import ecole import pyscipopt class IntegralParameters(): def __init__(self, offset=None, initial_primal_bound=None, initial_dual_bound=None): self._offset = offset self._initial_primal_bound = initial_primal_bound self._initial_dual_bound = initial_dual_bound def fetch_values(self, model): # trick to allow the parameters to be set dynamically self.offset = self._offset() if callable(self._offset) else self._offset self.initial_primal_bound = self._initial_primal_bound() if callable(self._initial_primal_bound) else self._initial_primal_bound self.initial_dual_bound = self._initial_dual_bound() if callable(self._initial_dual_bound) else self._initial_dual_bound # fetch default values if none was provided if self.offset is None: self.offset = 0.0 if self.initial_primal_bound is None: self.initial_primal_bound = model.as_pyscipopt().getObjlimit() if self.initial_dual_bound is None: m = model.as_pyscipopt() self.initial_dual_bound = -m.infinity() if m.getObjectiveSense() == "minimize" else m.infinity() class TimeLimitDualIntegral(ecole.reward.DualIntegral): def __init__(self): self.parameters = IntegralParameters() super().__init__(wall=True, bound_function=lambda model: ( self.parameters.offset, self.parameters.initial_dual_bound)) def set_parameters(self, objective_offset=None, initial_primal_bound=None, initial_dual_bound=None): self.parameters = IntegralParameters( offset=objective_offset, initial_primal_bound=initial_primal_bound, initial_dual_bound=initial_dual_bound) def before_reset(self, model): self.parameters.fetch_values(model) super().before_reset(model) def extract(self, model, done): reward = super().extract(model, done) # adjust the final reward if the time limit has not been reached if done: m = model.as_pyscipopt() # keep integrating over the time left time_left = max(m.getParam("limits/time") - m.getSolvingTime(), 0) if m.getStage() < pyscipopt.scip.PY_SCIP_STAGE.TRANSFORMED: dual_bound = -m.infinity() if m.getObjectiveSense() == "minimize" else m.infinity() else: dual_bound = m.getDualbound() offset = self.parameters.offset initial_dual_bound = self.parameters.initial_dual_bound # account for the model's objective direction (maximization vs minimization) if m.getObjectiveSense() == "minimize": reward += -(max(dual_bound, initial_dual_bound) - offset) * time_left else: reward += (min(dual_bound, initial_dual_bound) - offset) * time_left return reward