|
|
@ -1,14 +1,18 @@ |
|
|
|
from dataclasses import dataclass, replace |
|
|
|
from math import log |
|
|
|
from random import random |
|
|
|
from typing import List, Optional, Tuple |
|
|
|
from typing import Callable, List, Optional, Tuple |
|
|
|
|
|
|
|
from gbso.program.test_case import Output, TestCase, eq_on_testcase |
|
|
|
from gbso.program.mutate import mutate_program |
|
|
|
from gbso.program.mutate import create_random_program, mutate_program |
|
|
|
from gbso.program.program import Program |
|
|
|
|
|
|
|
EPSILON = 0.00001 |
|
|
|
|
|
|
|
DEFAULT_NUM_ITERS = 1_000_000 |
|
|
|
DEFAULT_ANNEALING_CONSTANT = 0.5 |
|
|
|
DEFAULT_SYNTHESIS_ITERS = 0 |
|
|
|
DEFAULT_OPTIMIZE_ITERS = 5_000_000 |
|
|
|
DEFAULT_NUM_CANDIDATES = 1 |
|
|
|
|
|
|
|
DEFAULT_PROB_OPCODE = 0.25 |
|
|
|
DEFAULT_PROB_OPERAND = 0.25 |
|
|
@ -18,7 +22,9 @@ DEFAULT_PROB_INSN = 0.25 |
|
|
|
DEFAULT_PROB_INSN_UNUSED = 0.1 |
|
|
|
|
|
|
|
|
|
|
|
def cost(orig_prgm, test_cases, outputs, prgm) -> Tuple[int, bool]: |
|
|
|
def cost( |
|
|
|
orig_prgm: Program, test_cases: List[TestCase], outputs: List[Output], prgm: Program |
|
|
|
) -> Tuple[float, bool]: |
|
|
|
# Since each instruction executes in 4*k cycles (for some k), this can have |
|
|
|
# the undesirable effect of performance improvements being weighted much |
|
|
|
# higher than correctness. This hurts convergence pretty badly, so we scale |
|
|
@ -32,35 +38,63 @@ def cost(orig_prgm, test_cases, outputs, prgm) -> Tuple[int, bool]: |
|
|
|
return perf + eq, eq == 0 |
|
|
|
|
|
|
|
|
|
|
|
def optimize( |
|
|
|
def cost_noperf( |
|
|
|
orig_prgm: Program, test_cases: List[TestCase], outputs: List[Output], prgm: Program |
|
|
|
) -> Tuple[float, bool]: |
|
|
|
eq = 0 |
|
|
|
for test_case in test_cases: |
|
|
|
eq += eq_on_testcase(orig_prgm, prgm, test_case, outputs) |
|
|
|
return eq, eq == 0 |
|
|
|
|
|
|
|
|
|
|
|
@dataclass |
|
|
|
class OptimizationParameters: |
|
|
|
max_size: int |
|
|
|
beta: float = DEFAULT_ANNEALING_CONSTANT |
|
|
|
synthesis_iters: int = DEFAULT_SYNTHESIS_ITERS |
|
|
|
optimize_iters: int = DEFAULT_OPTIMIZE_ITERS |
|
|
|
num_candidates: int = DEFAULT_NUM_CANDIDATES |
|
|
|
prob_opcode: float = DEFAULT_PROB_OPCODE |
|
|
|
prob_operand: float = DEFAULT_PROB_OPERAND |
|
|
|
prob_swap: float = DEFAULT_PROB_SWAP |
|
|
|
prob_insn: float = DEFAULT_PROB_INSN |
|
|
|
prob_insn_unused: float = DEFAULT_PROB_INSN_UNUSED |
|
|
|
cost_fn: Callable[ |
|
|
|
[Program, List[TestCase], List[Output], Program], Tuple[float, bool] |
|
|
|
] = cost |
|
|
|
|
|
|
|
|
|
|
|
# Perform one round of optimization |
|
|
|
def _optimize( |
|
|
|
target_prgm: Program, |
|
|
|
max_size: int, |
|
|
|
test_cases: List[TestCase], |
|
|
|
outputs: List[Output], |
|
|
|
beta: int = 0.5, # How far away in cost you are allowed to search |
|
|
|
params: OptimizationParameters, |
|
|
|
num_iters: int = DEFAULT_OPTIMIZE_ITERS, |
|
|
|
init_prgm: Optional[Program] = None, |
|
|
|
num_iters: int = DEFAULT_NUM_ITERS, |
|
|
|
prob_opcode: float = DEFAULT_PROB_OPCODE, |
|
|
|
prob_operand: float = DEFAULT_PROB_OPERAND, |
|
|
|
prob_swap: float = DEFAULT_PROB_SWAP, |
|
|
|
prob_insn: float = DEFAULT_PROB_INSN, |
|
|
|
prob_insn_unused: float = DEFAULT_PROB_INSN_UNUSED, |
|
|
|
) -> Program: |
|
|
|
padded_prgm = (init_prgm or target_prgm).pad(max_size) |
|
|
|
padded_prgm = target_prgm.pad(params.max_size) |
|
|
|
if init_prgm is not None: |
|
|
|
padded_prgm = init_prgm.pad(params.max_size) |
|
|
|
|
|
|
|
last_prgm = padded_prgm |
|
|
|
last_cost, _last_eq = cost(target_prgm, test_cases, outputs, last_prgm) |
|
|
|
last_cost, _last_eq = params.cost_fn(target_prgm, test_cases, outputs, last_prgm) |
|
|
|
|
|
|
|
best_prgm = target_prgm.pad(max_size) |
|
|
|
best_cost = 0 |
|
|
|
best_prgm = target_prgm.pad(params.max_size) |
|
|
|
best_cost = 0.0 |
|
|
|
|
|
|
|
num_candidates = 0 |
|
|
|
|
|
|
|
for _ in range(num_iters): |
|
|
|
candidate_prgm = mutate_program( |
|
|
|
last_prgm, prob_opcode, prob_operand, prob_swap, prob_insn, prob_insn_unused |
|
|
|
last_prgm, |
|
|
|
params.prob_opcode, |
|
|
|
params.prob_operand, |
|
|
|
params.prob_swap, |
|
|
|
params.prob_insn, |
|
|
|
params.prob_insn_unused, |
|
|
|
) |
|
|
|
candidate_cost, candidate_eq = cost( |
|
|
|
candidate_cost, candidate_eq = params.cost_fn( |
|
|
|
target_prgm, test_cases, outputs, candidate_prgm |
|
|
|
) |
|
|
|
|
|
|
@ -69,9 +103,40 @@ def optimize( |
|
|
|
best_cost = candidate_cost |
|
|
|
num_candidates += 1 |
|
|
|
|
|
|
|
if candidate_cost < last_cost - log(random()) / beta: |
|
|
|
if candidate_cost < last_cost - log(random()) / params.beta: |
|
|
|
last_prgm = candidate_prgm |
|
|
|
last_cost = candidate_cost |
|
|
|
|
|
|
|
print(f"Optimization complete. Total candidates: {num_candidates}") |
|
|
|
return best_prgm |
|
|
|
|
|
|
|
|
|
|
|
def optimize( |
|
|
|
target_prgm: Program, |
|
|
|
test_cases: List[TestCase], |
|
|
|
outputs: List[Output], |
|
|
|
params: OptimizationParameters, |
|
|
|
) -> Program: |
|
|
|
print("Synthesizing candidates...") |
|
|
|
candidates = [ |
|
|
|
_optimize( |
|
|
|
target_prgm, |
|
|
|
test_cases, |
|
|
|
outputs, |
|
|
|
replace(params, cost_fn=cost_noperf), |
|
|
|
num_iters=params.synthesis_iters, |
|
|
|
init_prgm=create_random_program(params.max_size), |
|
|
|
) |
|
|
|
for _ in range(params.num_candidates) |
|
|
|
] |
|
|
|
best_candidate = min( |
|
|
|
candidates, key=lambda p: cost(target_prgm, test_cases, outputs, p)[0] |
|
|
|
) |
|
|
|
print("Optimizing...") |
|
|
|
return _optimize( |
|
|
|
target_prgm, |
|
|
|
test_cases, |
|
|
|
outputs, |
|
|
|
params, |
|
|
|
num_iters=params.optimize_iters, |
|
|
|
init_prgm=best_candidate, |
|
|
|
) |