from dataclasses import dataclass, replace
|
|
from math import log
|
|
from random import random
|
|
from typing import Callable, List, Optional, Tuple
|
|
|
|
from gbso.program.test_case import Output, TestCase, eq_on_testcase
|
|
from gbso.program.mutate import create_random_program, mutate_program
|
|
from gbso.program.program import Program
|
|
|
|
# Small float tolerance.
# NOTE(review): not referenced in the code visible here — confirm it is used
# elsewhere before removing.
EPSILON = 0.00001

# Inverse-temperature (beta) for the Metropolis acceptance test in _optimize.
DEFAULT_ANNEALING_CONSTANT = 0.5
# Iteration budget for the main optimization round.
DEFAULT_OPTIMIZE_ITERS = 5_000_000

# By default, we set synthesis iters = 0 and num candidates = 1 to skip synthesis
DEFAULT_SYNTHESIS_ITERS = 0
DEFAULT_NUM_CANDIDATES = 1

# Per-mutation-kind probabilities forwarded to mutate_program.
DEFAULT_PROB_OPCODE = 0.25
DEFAULT_PROB_OPERAND = 0.25
DEFAULT_PROB_SWAP = 0.25
DEFAULT_PROB_INSN = 0.25

# NOTE(review): semantics inferred from the name (chance a replaced instruction
# is an unused/no-op one) — confirm against mutate_program.
DEFAULT_PROB_INSN_UNUSED = 0.1
|
|
|
|
|
|
def cost(
    orig_prgm: Program, test_cases: List[TestCase], outputs: List[Output], prgm: Program
) -> Tuple[float, bool]:
    """Score `prgm` against `orig_prgm`: performance delta plus correctness penalty.

    Returns (cost, is_equivalent) where is_equivalent is True iff `prgm` agrees
    with `orig_prgm` on every test case.
    """
    # Since each instruction executes in 4*k cycles (for some k), this can have
    # the undesirable effect of performance improvements being weighted much
    # higher than correctness. This hurts convergence pretty badly, so we scale
    # by 1/4 to compensate.
    perf_delta = (prgm.perf() - orig_prgm.perf()) / 4.0

    # Total mismatch penalty across all test cases; zero means equivalent.
    mismatch = sum(
        eq_on_testcase(orig_prgm, prgm, case, outputs) for case in test_cases
    )

    return perf_delta + mismatch, mismatch == 0
|
|
|
|
|
|
def cost_noperf(
    orig_prgm: Program, test_cases: List[TestCase], outputs: List[Output], prgm: Program
) -> Tuple[float, bool]:
    """Correctness-only cost: total test-case mismatch, ignoring performance.

    Returns (cost, is_equivalent); is_equivalent is True iff the mismatch
    total is zero.
    """
    mismatch = sum(
        eq_on_testcase(orig_prgm, prgm, case, outputs) for case in test_cases
    )
    return mismatch, mismatch == 0
|
|
|
|
|
|
@dataclass
class OptimizationParameters:
    """Tunable knobs for the synthesis + optimization search (see `optimize`)."""

    # Programs are padded to this instruction count before mutation.
    max_size: int
    # Inverse temperature for the Metropolis acceptance test in _optimize.
    beta: float = DEFAULT_ANNEALING_CONSTANT
    # Whether to run the candidate-synthesis phase before optimizing.
    synthesize: bool = True
    # Iteration budget per synthesis round (default 0 effectively skips it).
    synthesis_iters: int = DEFAULT_SYNTHESIS_ITERS
    # Iteration budget for the main optimization round.
    optimize_iters: int = DEFAULT_OPTIMIZE_ITERS
    # Number of independent synthesis candidates to generate.
    num_candidates: int = DEFAULT_NUM_CANDIDATES
    # Mutation-kind probabilities, forwarded verbatim to mutate_program.
    prob_opcode: float = DEFAULT_PROB_OPCODE
    prob_operand: float = DEFAULT_PROB_OPERAND
    prob_swap: float = DEFAULT_PROB_SWAP
    prob_insn: float = DEFAULT_PROB_INSN
    prob_insn_unused: float = DEFAULT_PROB_INSN_UNUSED
    # Cost function: (target, test_cases, outputs, candidate) -> (cost, is_equivalent).
    cost_fn: Callable[
        [Program, List[TestCase], List[Output], Program], Tuple[float, bool]
    ] = cost
|
|
|
|
|
|
# Perform one round of optimization
|
|
# Perform one round of optimization
def _optimize(
    target_prgm: Program,
    test_cases: List[TestCase],
    outputs: List[Output],
    params: OptimizationParameters,
    num_iters: int = DEFAULT_OPTIMIZE_ITERS,
    init_prgm: Optional[Program] = None,
) -> Program:
    """Run one MCMC search round and return the best equivalent program found.

    Args:
        target_prgm: Reference program; candidates are scored against it.
        test_cases: Test cases used by `params.cost_fn` to check equivalence.
        outputs: Expected outputs paired with the test cases.
        params: Search configuration (mutation probabilities, beta, cost_fn).
        num_iters: Number of mutate/score/accept iterations to run.
        init_prgm: Optional starting point for the walk; defaults to the
            target program itself.

    Returns:
        The lowest-cost candidate seen that was equivalent to the target
        (falls back to the padded target when nothing better is found).
    """
    # Start the walk from init_prgm when given, otherwise from the target.
    # (Pad only the program actually used — the original padded the target
    # and then discarded that work whenever init_prgm was supplied.)
    start = target_prgm if init_prgm is None else init_prgm
    last_prgm = start.pad(params.max_size)
    last_cost, _last_eq = params.cost_fn(target_prgm, test_cases, outputs, last_prgm)

    # The padded target is trivially equivalent to itself at cost 0.0, so only
    # strictly better equivalent candidates ever replace it.
    best_prgm = target_prgm.pad(params.max_size)
    best_cost = 0.0

    for _ in range(num_iters):
        candidate_prgm = mutate_program(
            last_prgm,
            params.prob_opcode,
            params.prob_operand,
            params.prob_swap,
            params.prob_insn,
            params.prob_insn_unused,
        )
        candidate_cost, candidate_eq = params.cost_fn(
            target_prgm, test_cases, outputs, candidate_prgm
        )

        # Track the best equivalent program seen so far.
        # (A dead `num_candidates` counter was removed here — it was
        # incremented but never read.)
        if candidate_cost < best_cost and candidate_eq:
            best_prgm = candidate_prgm
            best_cost = candidate_cost

        # Metropolis acceptance: improvements are always taken; a worse
        # candidate is taken with probability exp(-beta * cost_increase),
        # since -log(random()) / beta is a sample from Exp(beta).
        if candidate_cost < last_cost - log(random()) / params.beta:
            last_prgm = candidate_prgm
            last_cost = candidate_cost

    return best_prgm
|
|
|
|
|
|
def optimize(
    target_prgm: Program,
    test_cases: List[TestCase],
    outputs: List[Output],
    params: OptimizationParameters,
) -> Program:
    """Optionally synthesize starting candidates, then optimize the best one.

    When `params.synthesize` is set, runs `params.num_candidates` independent
    synthesis rounds (correctness-only cost, random initial programs), keeps
    the candidate with the lowest full cost, and uses it to seed the main
    optimization round. Otherwise the target program seeds the search.
    """
    if params.synthesize:
        print("Synthesizing candidates...")

        def _synthesize_one() -> Program:
            # Correctness-only search from a fresh random program.
            return _optimize(
                target_prgm,
                test_cases,
                outputs,
                replace(params, cost_fn=cost_noperf),
                num_iters=params.synthesis_iters,
                init_prgm=create_random_program(params.max_size),
            )

        def _full_cost(prgm: Program) -> float:
            # Rank candidates by the full (perf + correctness) cost.
            return cost(target_prgm, test_cases, outputs, prgm)[0]

        candidates = [_synthesize_one() for _ in range(params.num_candidates)]
        best_candidate = min(candidates, key=_full_cost)
    else:
        best_candidate = target_prgm

    print("Optimizing...")
    return _optimize(
        target_prgm,
        test_cases,
        outputs,
        params,
        num_iters=params.optimize_iters,
        init_prgm=best_candidate,
    )
|