4 Commits

Author SHA1 Message Date
  Forest Belton e655d6c0fc Break up synthesis and optimization 2 years ago
  Forest Belton 22f5e10812 Reduce performance impact on cost function 2 years ago
  Forest Belton b210652718 Don't include UNUSED in list of all instructions 2 years ago
  Forest Belton e34eb15271 Allow searching from a random starting point 2 years ago
4 changed files with 123 additions and 42 deletions
  1. ex.py (+11, -9)
  2. gbso/cpu/insn.py (+1, -1)
  3. gbso/optimize.py (+101, -32)
  4. gbso/program/mutate.py (+10, -0)

ex.py (+11, -9)

@@ -1,3 +1,4 @@
+from gbso.program.mutate import create_random_program
 from time import time
 from gbso.cpu.state import CPUState
@@ -5,7 +6,7 @@ from gbso.program.test_case import TestCase
 from gbso.cpu.insn import *
 from gbso.cpu.regs import R8
-from gbso.optimize import cost, optimize
+from gbso.optimize import OptimizationParameters, cost, optimize
 from gbso.program.program import Program
 prgm = Program(
@@ -19,35 +20,36 @@ prgm = Program(
 outputs = [R8.A]
 test_cases = [TestCase()]
-max_size = 4
-num_iters = 100_000_000
+params = OptimizationParameters(max_size=4)
 initial_cost = cost(prgm, test_cases, outputs, prgm)
 initial_cycles = prgm.perf()
 print("Program to optimize:")
 prgm.display()
-print(f"Cost: {initial_cost}")
+print(f"Cost: {initial_cost[0]}")
 print(f"Cycles: {initial_cycles}")
 start_time = time()
 optimized_prgm = optimize(
     prgm,
-    max_size=max_size,
-    test_cases=test_cases,
-    outputs=outputs,
-    num_iters=num_iters,
+    test_cases,
+    outputs,
+    params,
 )
 end_time = time()
 print("Optimized result:")
 optimized_prgm.display()
 optimized_cost = cost(prgm, test_cases, outputs, optimized_prgm)
 optimized_cycles = optimized_prgm.perf()
-print(f"Cost: {optimized_cost}")
+print(f"Cost: {optimized_cost[0]}")
 print(f"Cycles: {optimized_cycles}")
 print(f"Took {round(end_time - start_time, 3)} seconds")

gbso/cpu/insn.py (+1, -1)

@@ -1694,7 +1694,7 @@ class UNUSED(Insn):
 ALL_INSN_CLASSES: List[Type[Insn]] = [
-    cls for cls in Insn.__subclasses__() if cls != Insn  # type: ignore
+    cls for cls in Insn.__subclasses__() if cls not in {Insn, UNUSED}  # type: ignore
 ]
 INSNS_BY_SIGNATURE: Dict[str, List[Type[Insn]]] = defaultdict(list)
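
Excluding UNUSED here means the generic instruction enumeration only yields real opcodes; the one call site that wants the padding placeholder (random program creation in gbso/program/mutate.py below) opts back in with ALL_INSN_CLASSES + [UNUSED]. A toy illustration of the filtering pattern, using stand-in classes rather than the real instruction set:

    class Insn: ...
    class ADD(Insn): ...      # a real instruction
    class UNUSED(Insn): ...   # padding placeholder

    # __subclasses__() yields ADD and UNUSED; the set filter drops the placeholder.
    ALL_INSN_CLASSES = [
        cls for cls in Insn.__subclasses__() if cls not in {Insn, UNUSED}
    ]
    assert ALL_INSN_CLASSES == [ADD]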

gbso/optimize.py (+101, -32)

@@ -1,14 +1,18 @@
-from math import exp, log
+from dataclasses import dataclass, replace
+from math import log
 from random import random
-from typing import List, Tuple
+from typing import Callable, List, Optional, Tuple
 from gbso.program.test_case import Output, TestCase, eq_on_testcase
-from gbso.program.mutate import mutate_program
+from gbso.program.mutate import create_random_program, mutate_program
 from gbso.program.program import Program
 EPSILON = 0.00001
-DEFAULT_NUM_ITERS = 1_000_000
+DEFAULT_ANNEALING_CONSTANT = 0.5
+DEFAULT_SYNTHESIS_ITERS = 0
+DEFAULT_OPTIMIZE_ITERS = 5_000_000
+DEFAULT_NUM_CANDIDATES = 1
 DEFAULT_PROB_OPCODE = 0.25
 DEFAULT_PROB_OPERAND = 0.25
@@ -18,56 +22,121 @@ DEFAULT_PROB_INSN = 0.25
 DEFAULT_PROB_INSN_UNUSED = 0.1
-def cost(orig_prgm, test_cases, outputs, prgm) -> Tuple[int, bool]:
-    c = prgm.perf() - orig_prgm.perf()
-    eq = c == 0
-    # print(f"init cost: {c}")
+def cost(
+    orig_prgm: Program, test_cases: List[TestCase], outputs: List[Output], prgm: Program
+) -> Tuple[float, bool]:
+    # Since each instruction executes in 4*k cycles (for some k), this can have
+    # the undesirable effect of performance improvements being weighted much
+    # higher than correctness. This hurts convergence pretty badly, so we scale
+    # by 1/4 to compensate.
+    perf = (prgm.perf() - orig_prgm.perf()) / 4.0
+    eq = 0
     for test_case in test_cases:
-        c += eq_on_testcase(orig_prgm, prgm, test_case, outputs)
-        # print(f"cost after testcase: {c}")
-    return c, eq
+        eq += eq_on_testcase(orig_prgm, prgm, test_case, outputs)
+    return perf + eq, eq == 0
+
+
+def cost_noperf(
+    orig_prgm: Program, test_cases: List[TestCase], outputs: List[Output], prgm: Program
+) -> Tuple[float, bool]:
+    eq = 0
+    for test_case in test_cases:
+        eq += eq_on_testcase(orig_prgm, prgm, test_case, outputs)
+    return eq, eq == 0
+
+
+@dataclass
+class OptimizationParameters:
+    max_size: int
+    beta: float = DEFAULT_ANNEALING_CONSTANT
+    synthesis_iters: int = DEFAULT_SYNTHESIS_ITERS
+    optimize_iters: int = DEFAULT_OPTIMIZE_ITERS
+    num_candidates: int = DEFAULT_NUM_CANDIDATES
+    prob_opcode: float = DEFAULT_PROB_OPCODE
+    prob_operand: float = DEFAULT_PROB_OPERAND
+    prob_swap: float = DEFAULT_PROB_SWAP
+    prob_insn: float = DEFAULT_PROB_INSN
+    prob_insn_unused: float = DEFAULT_PROB_INSN_UNUSED
+    cost_fn: Callable[
+        [Program, List[TestCase], List[Output], Program], Tuple[float, bool]
+    ] = cost
+
+
+# Perform one round of optimization
-def optimize(
-    prgm: Program,
-    max_size: int,
+def _optimize(
+    target_prgm: Program,
     test_cases: List[TestCase],
     outputs: List[Output],
-    beta: int = 0.5,  # How far away in cost you are allowed to search
-    num_iters: int = DEFAULT_NUM_ITERS,
-    prob_opcode: float = DEFAULT_PROB_OPCODE,
-    prob_operand: float = DEFAULT_PROB_OPERAND,
-    prob_swap: float = DEFAULT_PROB_SWAP,
-    prob_insn: float = DEFAULT_PROB_INSN,
-    prob_insn_unused: float = DEFAULT_PROB_INSN_UNUSED,
+    params: OptimizationParameters,
+    num_iters: int = DEFAULT_OPTIMIZE_ITERS,
+    init_prgm: Optional[Program] = None,
 ) -> Program:
-    padded_prgm = prgm.pad(max_size)
+    padded_prgm = target_prgm.pad(params.max_size)
+    if init_prgm is not None:
+        padded_prgm = init_prgm.pad(params.max_size)
     last_prgm = padded_prgm
-    last_cost, _last_eq = cost(padded_prgm, test_cases, outputs, last_prgm)
+    last_cost, _last_eq = params.cost_fn(target_prgm, test_cases, outputs, last_prgm)
-    best_prgm = padded_prgm
-    best_cost = last_cost
+    best_prgm = target_prgm.pad(params.max_size)
+    best_cost = 0.0
     num_candidates = 0
     for _ in range(num_iters):
         candidate_prgm = mutate_program(
-            last_prgm, prob_opcode, prob_operand, prob_swap, prob_insn, prob_insn_unused
+            last_prgm,
+            params.prob_opcode,
+            params.prob_operand,
+            params.prob_swap,
+            params.prob_insn,
+            params.prob_insn_unused,
         )
-        candidate_cost, candidate_eq = cost(
-            padded_prgm, test_cases, outputs, candidate_prgm
+        candidate_cost, candidate_eq = params.cost_fn(
+            target_prgm, test_cases, outputs, candidate_prgm
        )
         if candidate_cost < best_cost and candidate_eq:
             best_prgm = candidate_prgm
             best_cost = candidate_cost
             num_candidates += 1
-        if candidate_cost < last_cost - log(random()) / beta:
+        if candidate_cost < last_cost - log(random()) / params.beta:
             last_prgm = candidate_prgm
             last_cost = candidate_cost
-    print("last")
-    last_prgm.display()
     return best_prgm
+
+
+def optimize(
+    target_prgm: Program,
+    test_cases: List[TestCase],
+    outputs: List[Output],
+    params: OptimizationParameters,
+) -> Program:
+    print("Synthesizing candidates...")
+    candidates = [
+        _optimize(
+            target_prgm,
+            test_cases,
+            outputs,
+            replace(params, cost_fn=cost_noperf),
+            num_iters=params.synthesis_iters,
+            init_prgm=create_random_program(params.max_size),
+        )
+        for _ in range(params.num_candidates)
+    ]
+    best_candidate = min(
+        candidates, key=lambda p: cost(target_prgm, test_cases, outputs, p)[0]
+    )
+    print("Optimizing...")
+    return _optimize(
+        target_prgm,
+        test_cases,
+        outputs,
+        params,
+        num_iters=params.optimize_iters,
+        init_prgm=best_candidate,
+    )
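
Two things in this file are worth spelling out. First, the acceptance test candidate_cost < last_cost - log(random()) / params.beta is a Metropolis-style rule: rearranging, it accepts a candidate with probability min(1, exp(-beta * delta)) where delta is the cost increase, so improvements always pass and regressions pass with exponentially decaying probability. Second, optimize() now runs in two phases: synthesis searches from random programs using cost_noperf (correctness only), and the cheapest surviving candidate seeds the performance-aware pass. A standalone sketch of the acceptance rule (the function and variable names here are illustrative, not part of the library):

    from math import exp, log
    from random import random

    def accepts(last_cost: float, candidate_cost: float, beta: float) -> bool:
        # Equivalent to: random() < exp(-beta * (candidate_cost - last_cost)).
        return candidate_cost < last_cost - log(random()) / beta

    # With beta = 0.5 (DEFAULT_ANNEALING_CONSTANT), a candidate 2.0 units
    # worse than the current program is accepted roughly exp(-1.0) ~ 37%
    # of the time.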

gbso/program/mutate.py (+10, -0)

@@ -146,3 +146,13 @@ def create_random_operand(op: Operand) -> Union[int, R8, R16]:
         value = randint(-128, 127)
     return value
+
+
+def create_random_program(size: int) -> Program:
+    insns = []
+
+    for _ in range(size):
+        cls = choice(ALL_INSN_CLASSES + [UNUSED])
+        insns.append(create_random_insn(cls))
+
+    return Program(insns=insns)
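
create_random_program() is what gives the synthesis phase its random starting points. Note that it samples from ALL_INSN_CLASSES + [UNUSED], so a "size 4" program may contain fewer than four real instructions. A minimal usage sketch, reusing Program.display() as seen in ex.py:

    from gbso.program.mutate import create_random_program

    # Draw a fresh program with 4 slots; UNUSED may fill some of them.
    seed = create_random_program(4)
    seed.display()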
