|
|
- from dataclasses import dataclass, replace
- from math import log
- from random import random
- from typing import Callable, List, Optional, Tuple
-
- from gbso.program.test_case import Output, TestCase, eq_on_testcase
- from gbso.program.mutate import create_random_program, mutate_program
- from gbso.program.program import Program
-
- EPSILON = 0.00001
-
- DEFAULT_ANNEALING_CONSTANT = 0.5
- DEFAULT_OPTIMIZE_ITERS = 5_000_000
-
- # By default, we set synthesis iters = 0 and num candidates = 1 to skip synthesis
- DEFAULT_SYNTHESIS_ITERS = 0
- DEFAULT_NUM_CANDIDATES = 1
-
- DEFAULT_PROB_OPCODE = 0.25
- DEFAULT_PROB_OPERAND = 0.25
- DEFAULT_PROB_SWAP = 0.25
- DEFAULT_PROB_INSN = 0.25
-
- DEFAULT_PROB_INSN_UNUSED = 0.1
-
-
def cost(
    orig_prgm: Program, test_cases: List[TestCase], outputs: List[Output], prgm: Program
) -> Tuple[float, bool]:
    """Full search cost of a candidate program.

    Combines a (scaled) performance delta against the original program with
    a correctness penalty accumulated over every test case.

    Returns:
        A ``(cost, is_equivalent)`` pair, where ``is_equivalent`` is True
        iff the candidate matched the original on all test cases.
    """
    # Each instruction executes in 4*k cycles (for some k), so a raw perf
    # delta would outweigh the correctness term and hurt convergence badly;
    # scaling by 1/4 compensates.
    perf_delta = (prgm.perf() - orig_prgm.perf()) / 4.0
    eq_penalty = sum(
        eq_on_testcase(orig_prgm, prgm, case, outputs) for case in test_cases
    )
    return perf_delta + eq_penalty, eq_penalty == 0
-
-
def cost_noperf(
    orig_prgm: Program, test_cases: List[TestCase], outputs: List[Output], prgm: Program
) -> Tuple[float, bool]:
    """Correctness-only cost function (ignores performance).

    Used during the synthesis phase, where we only care about finding any
    program equivalent to the target on the test suite.

    Returns:
        A ``(cost, is_equivalent)`` pair, where ``is_equivalent`` is True
        iff the candidate matched the original on all test cases.
    """
    eq_penalty = sum(
        eq_on_testcase(orig_prgm, prgm, case, outputs) for case in test_cases
    )
    return eq_penalty, eq_penalty == 0
-
-
@dataclass
class OptimizationParameters:
    """Tunable knobs for the synthesis + optimization search."""

    # Programs are padded to this instruction count before mutation.
    max_size: int
    # Annealing constant: larger beta makes the MCMC acceptance test
    # less likely to accept cost-increasing candidates.
    beta: float = DEFAULT_ANNEALING_CONSTANT
    # Whether to run the synthesis phase before optimization.
    synthesize: bool = True
    # Iterations per synthesis round (0 by default, skipping synthesis work).
    synthesis_iters: int = DEFAULT_SYNTHESIS_ITERS
    # Iterations for the final optimization round.
    optimize_iters: int = DEFAULT_OPTIMIZE_ITERS
    # Number of independently-synthesized candidate programs to produce.
    num_candidates: int = DEFAULT_NUM_CANDIDATES
    # Per-mutation probabilities forwarded to mutate_program.
    prob_opcode: float = DEFAULT_PROB_OPCODE
    prob_operand: float = DEFAULT_PROB_OPERAND
    prob_swap: float = DEFAULT_PROB_SWAP
    prob_insn: float = DEFAULT_PROB_INSN
    prob_insn_unused: float = DEFAULT_PROB_INSN_UNUSED
    # Cost function: (target, test_cases, outputs, candidate) ->
    # (cost, is_equivalent). Defaults to the perf-aware `cost`.
    cost_fn: Callable[
        [Program, List[TestCase], List[Output], Program], Tuple[float, bool]
    ] = cost
-
-
# Perform one round of optimization
def _optimize(
    target_prgm: Program,
    test_cases: List[TestCase],
    outputs: List[Output],
    params: OptimizationParameters,
    num_iters: int = DEFAULT_OPTIMIZE_ITERS,
    init_prgm: Optional[Program] = None,
) -> Program:
    """Run one MCMC (simulated-annealing style) search round.

    Starting from ``init_prgm`` (or ``target_prgm`` when absent), repeatedly
    mutates the current program and accepts/rejects candidates with a
    Metropolis-style test, tracking the best equivalent program seen.

    Args:
        target_prgm: Reference program that candidates are scored against.
        test_cases: Test cases used by ``params.cost_fn``.
        outputs: Expected outputs consumed by the cost function.
        params: Search knobs (mutation probabilities, beta, cost_fn, ...).
        num_iters: Number of mutation/acceptance iterations.
        init_prgm: Optional starting point (e.g. a random synthesis seed).

    Returns:
        The lowest-cost candidate that was equivalent on all test cases,
        or the padded target program if nothing better was found.
    """
    padded_prgm = target_prgm.pad(params.max_size)
    if init_prgm is not None:
        padded_prgm = init_prgm.pad(params.max_size)

    last_prgm = padded_prgm
    last_cost, _last_eq = params.cost_fn(target_prgm, test_cases, outputs, last_prgm)

    # The target itself has cost 0 by construction, so only strictly
    # cost-improving AND test-equivalent candidates can replace it.
    best_prgm = target_prgm.pad(params.max_size)
    best_cost = 0.0

    for _ in range(num_iters):
        candidate_prgm = mutate_program(
            last_prgm,
            params.prob_opcode,
            params.prob_operand,
            params.prob_swap,
            params.prob_insn,
            params.prob_insn_unused,
        )
        candidate_cost, candidate_eq = params.cost_fn(
            target_prgm, test_cases, outputs, candidate_prgm
        )

        if candidate_cost < best_cost and candidate_eq:
            best_prgm = candidate_prgm
            best_cost = candidate_cost

        # Metropolis acceptance: -log(u)/beta >= 0, so improvements are
        # always accepted and regressions sometimes are. `random()` can
        # return exactly 0.0 (its range is [0.0, 1.0)), which would make
        # log() raise ValueError; substitute EPSILON for that one value.
        if candidate_cost < last_cost - log(random() or EPSILON) / params.beta:
            last_prgm = candidate_prgm
            last_cost = candidate_cost

    return best_prgm
-
-
def optimize(
    target_prgm: Program,
    test_cases: List[TestCase],
    outputs: List[Output],
    params: OptimizationParameters,
) -> Program:
    """Top-level entry point: optional synthesis phase, then optimization.

    When ``params.synthesize`` is set, several candidates are grown from
    random seed programs using the correctness-only cost function, and the
    best one (by the full perf-aware cost) seeds the final round. Otherwise
    the target program itself is the starting point.

    Returns:
        The best program found by the final optimization round.
    """
    seed_prgm = target_prgm

    if params.synthesize:
        print("Synthesizing candidates...")
        synthesis_params = replace(params, cost_fn=cost_noperf)
        candidates = []
        for _ in range(params.num_candidates):
            candidate = _optimize(
                target_prgm,
                test_cases,
                outputs,
                synthesis_params,
                num_iters=params.synthesis_iters,
                init_prgm=create_random_program(params.max_size),
            )
            candidates.append(candidate)

        # Rank synthesized candidates with the full (perf-aware) cost.
        def full_cost(prgm: Program) -> float:
            return cost(target_prgm, test_cases, outputs, prgm)[0]

        seed_prgm = min(candidates, key=full_cost)

    print("Optimizing...")
    return _optimize(
        target_prgm,
        test_cases,
        outputs,
        params,
        num_iters=params.optimize_iters,
        init_prgm=seed_prgm,
    )
|