from dataclasses import dataclass, replace
from math import log
from random import random
from typing import Callable, List, Optional, Tuple

from gbso.program.test_case import Output, TestCase, eq_on_testcase
from gbso.program.mutate import create_random_program, mutate_program
from gbso.program.program import Program

EPSILON = 0.00001

DEFAULT_ANNEALING_CONSTANT = 0.5
DEFAULT_OPTIMIZE_ITERS = 5_000_000

# By default, we set synthesis iters = 0 and num candidates = 1 to skip synthesis
DEFAULT_SYNTHESIS_ITERS = 0
DEFAULT_NUM_CANDIDATES = 1

DEFAULT_PROB_OPCODE = 0.25
DEFAULT_PROB_OPERAND = 0.25
DEFAULT_PROB_SWAP = 0.25
DEFAULT_PROB_INSN = 0.25
DEFAULT_PROB_INSN_UNUSED = 0.1


def cost(
    orig_prgm: Program,
    test_cases: List[TestCase],
    outputs: List[Output],
    prgm: Program,
) -> Tuple[float, bool]:
    # Since each instruction executes in 4*k cycles (for some k), this can have
    # the undesirable effect of performance improvements being weighted much
    # higher than correctness. This hurts convergence pretty badly, so we scale
    # by 1/4 to compensate.
    perf = (prgm.perf() - orig_prgm.perf()) / 4.0

    eq = 0
    for test_case in test_cases:
        eq += eq_on_testcase(orig_prgm, prgm, test_case, outputs)

    return perf + eq, eq == 0


def cost_noperf(
    orig_prgm: Program,
    test_cases: List[TestCase],
    outputs: List[Output],
    prgm: Program,
) -> Tuple[float, bool]:
    eq = 0
    for test_case in test_cases:
        eq += eq_on_testcase(orig_prgm, prgm, test_case, outputs)

    return eq, eq == 0


@dataclass
class OptimizationParameters:
    max_size: int

    beta: float = DEFAULT_ANNEALING_CONSTANT

    synthesize: bool = True
    synthesis_iters: int = DEFAULT_SYNTHESIS_ITERS
    optimize_iters: int = DEFAULT_OPTIMIZE_ITERS
    num_candidates: int = DEFAULT_NUM_CANDIDATES

    prob_opcode: float = DEFAULT_PROB_OPCODE
    prob_operand: float = DEFAULT_PROB_OPERAND
    prob_swap: float = DEFAULT_PROB_SWAP
    prob_insn: float = DEFAULT_PROB_INSN
    prob_insn_unused: float = DEFAULT_PROB_INSN_UNUSED

    cost_fn: Callable[
        [Program, List[TestCase], List[Output], Program], Tuple[float, bool]
    ] = cost


# Perform one round of optimization
def _optimize(
    target_prgm: Program,
    test_cases: List[TestCase],
    outputs: List[Output],
    params: OptimizationParameters,
    num_iters: int = DEFAULT_OPTIMIZE_ITERS,
    init_prgm: Optional[Program] = None,
) -> Program:
    padded_prgm = target_prgm.pad(params.max_size)
    if init_prgm is not None:
        padded_prgm = init_prgm.pad(params.max_size)

    last_prgm = padded_prgm
    last_cost, _last_eq = params.cost_fn(target_prgm, test_cases, outputs, last_prgm)

    best_prgm = target_prgm.pad(params.max_size)
    best_cost = 0.0

    num_candidates = 0
    for _ in range(num_iters):
        candidate_prgm = mutate_program(
            last_prgm,
            params.prob_opcode,
            params.prob_operand,
            params.prob_swap,
            params.prob_insn,
            params.prob_insn_unused,
        )

        candidate_cost, candidate_eq = params.cost_fn(
            target_prgm, test_cases, outputs, candidate_prgm
        )

        if candidate_cost < best_cost and candidate_eq:
            best_prgm = candidate_prgm
            best_cost = candidate_cost
            num_candidates += 1

        if candidate_cost < last_cost - log(random()) / params.beta:
            last_prgm = candidate_prgm
            last_cost = candidate_cost

    return best_prgm


def optimize(
    target_prgm: Program,
    test_cases: List[TestCase],
    outputs: List[Output],
    params: OptimizationParameters,
) -> Program:
    best_candidate = target_prgm
    if params.synthesize:
        print("Synthesizing candidates...")
        candidates = [
            _optimize(
                target_prgm,
                test_cases,
                outputs,
                replace(params, cost_fn=cost_noperf),
                num_iters=params.synthesis_iters,
                init_prgm=create_random_program(params.max_size),
            )
            for _ in range(params.num_candidates)
        ]

        best_candidate = min(
            candidates, key=lambda p: cost(target_prgm, test_cases, outputs, p)[0]
        )

    print("Optimizing...")
    return _optimize(
        target_prgm,
        test_cases,
        outputs,
        params,
        num_iters=params.optimize_iters,
        init_prgm=best_candidate,
    )
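# ---------------------------------------------------------------------------
# Illustrative sketch, not used by the module above: the acceptance test in
# `_optimize` is the Metropolis criterion in disguise. Accepting a candidate
# whenever
#
#     candidate_cost < last_cost - log(random()) / beta
#
# is equivalent to accepting with probability min(1, exp(-beta * delta)),
# where delta = candidate_cost - last_cost: downhill moves always pass, and
# uphill moves pass with exponentially decaying probability. The helper below
# is a hypothetical, self-contained check of that equivalence (stdlib only).


def _metropolis_acceptance_demo(
    beta: float = DEFAULT_ANNEALING_CONSTANT, delta: float = 2.0, trials: int = 100_000
) -> None:
    from math import exp

    # Count how often an uphill move of size `delta` survives the same test
    # used in the optimization loop above.
    accepted = sum(1 for _ in range(trials) if delta < -log(random()) / beta)
    # Compare against the closed-form Metropolis acceptance probability.
    expected = min(1.0, exp(-beta * delta))
    print(f"empirical: {accepted / trials:.4f}, expected: {expected:.4f}")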
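# ---------------------------------------------------------------------------
# Usage sketch (hypothetical; how a Program and its TestCase/Output lists are
# built depends on the real gbso API, so those parts are left elided). The
# flow is: pin down the target program's observable behavior with test cases,
# then call `optimize`, which first synthesizes candidates from random
# programs using the correctness-only cost (`cost_noperf`) and then anneals
# the best one under the performance-aware cost.
#
#     prgm = ...        # the gbso Program to optimize
#     test_cases = ...  # List[TestCase] covering the inputs you care about
#     outputs = ...     # List[Output] naming the state that must be preserved
#
#     params = OptimizationParameters(
#         max_size=16,  # hypothetical: candidates may use up to 16 slots
#         synthesize=True,
#         synthesis_iters=1_000_000,
#         num_candidates=4,
#     )
#     optimized = optimize(prgm, test_cases, outputs, params)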