import os
import sys
import glob
import gzip
import json
import ecole
import queue
import pickle
import shutil
import argparse
import threading
import ecole.instance
import numpy as np
from pathlib import Path
# make the repository root importable so `common.environments` resolves
sys.path.append('.')
from common.environments import Branching as Environment
class ExploreThenStrongBranch:
"""
Custom observation function.
Queries the expert with a given probability. Returns variable scores given
by the expert (if queried), or pseudocost scores otherwise.
Parameters
----------
expert_probability : float in [0, 1]
Probability of running the expert strategy and collecting samples.
"""
def __init__(self, expert_probability):
self.expert_probability = expert_probability
self.pseudocosts_function = ecole.observation.Pseudocosts()
self.strong_branching_function = ecole.observation.StrongBranchingScores()
def before_reset(self, model):
"""
Reset internal data at the start of episodes.
Called before environment dynamics are reset.
Parameters
----------
model : ecole.scip.Model
Model defining the current state of the solver.
"""
self.pseudocosts_function.before_reset(model)
self.strong_branching_function.before_reset(model)
def extract(self, model, done):
"""
Extract the observation on the given state.
Parameters
----------
model : ecole.scip.Model
Model defining the current state of the solver.
done : bool
Flag indicating if the state is terminal.
Returns
-------
scores : np.ndarray
Variable scores.
scores_are_expert : bool
Flag indicating whether scores are given by the expert.
"""
        probabilities = [1 - self.expert_probability, self.expert_probability]
        expert_chosen = bool(np.random.choice(np.arange(2), p=probabilities))
        if expert_chosen:
            return (self.strong_branching_function.extract(model, done), True)
        else:
            return (self.pseudocosts_function.extract(model, done), False)
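
# A minimal usage sketch for ExploreThenStrongBranch. In this script it is
# handed to the environment (see `make_samples`) rather than called directly;
# `model` stands for an `ecole.scip.Model` mid-solve and is hypothetical here:
#
#     obs_fn = ExploreThenStrongBranch(expert_probability=0.05)
#     obs_fn.before_reset(model)  # once, at the start of an episode
#     scores, scores_are_expert = obs_fn.extract(model, done=False)
#     # `scores` is a per-variable np.ndarray; `scores_are_expert` tells the
#     # caller whether the expensive strong-branching expert was queried.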
def send_orders(orders_queue, instances, seed, query_expert_prob, time_limit, out_dir, stop_flag):
"""
Continuously send sampling orders to workers (relies on limited
queue capacity).
Parameters
----------
orders_queue : queue.Queue
Queue to which to send orders.
instances : list
Instance file names from which to sample episodes.
seed : int
Random seed for reproducibility.
query_expert_prob : float in [0, 1]
Probability of running the expert strategy and collecting samples.
time_limit : float in [0, 1e+20]
Maximum running time for an episode, in seconds.
out_dir: str
Output directory in which to write samples.
stop_flag: threading.Event
A flag to tell the thread to stop.
"""
rng = np.random.RandomState(seed)
episode = 0
while not stop_flag.is_set():
        instance = Path(rng.choice(instances))
        # the instance's metadata (including its primal bound) is expected in a
        # sibling .json file with the same stem
        with open(instance.with_name(instance.stem).with_suffix('.json')) as f:
            instance_info = json.load(f)
        initial_primal_bound = instance_info["primal_bound"]
seed = rng.randint(2**32)
orders_queue.put([episode, instance, initial_primal_bound, seed, query_expert_prob, time_limit, out_dir])
episode += 1
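
# NOTE: `send_orders` relies on the bounded `orders_queue` created in
# `collect_samples` (maxsize=2*n_jobs): `put` blocks once the queue is full,
# which throttles the dispatcher to the pace of the workers.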
def make_samples(in_queue, out_queue, stop_flag):
"""
Worker loop: fetch an instance, run an episode and record samples.
Parameters
----------
in_queue : queue.Queue
Input queue from which orders are received.
out_queue : queue.Queue
Output queue in which to send samples.
stop_flag: threading.Event
A flag to tell the thread to stop.
"""
sample_counter = 0
while not stop_flag.is_set():
episode, instance, initial_primal_bound, seed, query_expert_prob, time_limit, out_dir = in_queue.get()
        observation_function = {
            'scores': ExploreThenStrongBranch(expert_probability=query_expert_prob),
            'node_observation': ecole.observation.NodeBipartite(),
        }
env = Environment(
time_limit=time_limit,
observation_function=observation_function,
)
print(f"[w {threading.current_thread().name}] episode {episode}, seed {seed}, "
f"processing instance '{instance}'...\n", end='')
out_queue.put({
'type': 'start',
'episode': episode,
'instance': instance,
'seed': seed,
})
env.seed(seed)
        observation, action_set, _, done, _ = env.reset(str(instance), objective_limit=initial_primal_bound)
while not done:
scores, scores_are_expert = observation["scores"]
node_observation = observation["node_observation"]
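            # unpack the bipartite graph: constraint (row) features, sparse
            # edge structure (indices and values), and variable features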
node_observation = (node_observation.row_features,
(node_observation.edge_features.indices,
node_observation.edge_features.values),
node_observation.variable_features)
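            # greedy policy: branch on the candidate with the highest score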
action = action_set[scores[action_set].argmax()]
if scores_are_expert and not stop_flag.is_set():
data = [node_observation, action, action_set, scores]
filename = f'{out_dir}/sample_{episode}_{sample_counter}.pkl'
with gzip.open(filename, 'wb') as f:
                    pickle.dump({
                        'episode': episode,
                        'seed': seed,
                        'data': data,
                    }, f)
out_queue.put({
'type': 'sample',
'episode': episode,
'instance': instance,
'seed': seed,
'filename': filename,
})
sample_counter += 1
            try:
                observation, action_set, _, done, _ = env.step(action)
            except Exception as e:
                # a solver error should not kill the worker: log it and end the episode
                done = True
                with open("error_log.txt", "a") as f:
                    f.write(f"Error occurred solving {instance} with seed {seed}\n")
                    f.write(f"{e}\n")
print(f"[w {threading.current_thread().name}] episode {episode} done, {sample_counter} samples\n", end='')
out_queue.put({
'type': 'done',
'episode': episode,
'instance': instance,
'seed': seed,
})
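
# Sketch for reading a written sample back (matches the gzip+pickle layout in
# `make_samples`; the path is illustrative):
#
#     with gzip.open('sample_1.pkl', 'rb') as f:
#         sample = pickle.load(f)
#     node_observation, action, action_set, scores = sample['data']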
def collect_samples(instances, out_dir, rng, n_samples, n_jobs, query_expert_prob, time_limit):
"""
    Runs branch-and-bound episodes on the given set of instances and randomly
    collects (state, action) pairs from the 'vanilla-fullstrong' expert
    brancher.
Parameters
----------
instances : list
Instance files from which to collect samples.
out_dir : str
Directory in which to write samples.
rng : numpy.random.RandomState
A random number generator for reproducibility.
n_samples : int
Number of samples to collect.
n_jobs : int
Number of jobs for parallel sampling.
query_expert_prob : float in [0, 1]
Probability of using the expert policy and recording a (state, action)
pair.
time_limit : float in [0, 1e+20]
Maximum running time for an episode, in seconds.
"""
os.makedirs(out_dir, exist_ok=True)
    # communication queues: a bounded orders queue (throttles the dispatcher)
    # and an unbounded answers queue (drained continuously by the main loop)
    orders_queue = queue.Queue(maxsize=2*n_jobs)
    answers_queue = queue.SimpleQueue()
tmp_samples_dir = f'{out_dir}/tmp'
os.makedirs(tmp_samples_dir, exist_ok=True)
# start dispatcher
dispatcher_stop_flag = threading.Event()
dispatcher = threading.Thread(
target=send_orders,
args=(orders_queue, instances, rng.randint(2**32), query_expert_prob,
time_limit, tmp_samples_dir, dispatcher_stop_flag),
daemon=True)
dispatcher.start()
    # start workers
    workers = []
workers_stop_flag = threading.Event()
for i in range(n_jobs):
p = threading.Thread(
target=make_samples,
args=(orders_queue, answers_queue, workers_stop_flag),
daemon=True)
workers.append(p)
p.start()
# record answers and write samples
buffer = {}
current_episode = 0
i = 0
in_buffer = 0
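    # workers finish out of order, so samples are buffered per episode and
    # written in episode order: the final sample numbering is deterministic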
while i < n_samples:
sample = answers_queue.get()
# add received sample to buffer
if sample['type'] == 'start':
buffer[sample['episode']] = []
else:
buffer[sample['episode']].append(sample)
if sample['type'] == 'sample':
in_buffer += 1
# if any, write samples from current episode
while current_episode in buffer and buffer[current_episode]:
samples_to_write = buffer[current_episode]
buffer[current_episode] = []
for sample in samples_to_write:
# if no more samples here, move to next episode
if sample['type'] == 'done':
del buffer[current_episode]
current_episode += 1
# else write sample
else:
os.rename(sample['filename'], f'{out_dir}/sample_{i+1}.pkl')
in_buffer -= 1
i += 1
print(f"[m {threading.current_thread().name}] {i} / {n_samples} samples written, "
f"ep {sample['episode']} ({in_buffer} in buffer).\n", end='')
# early stop dispatcher
if in_buffer + i >= n_samples and dispatcher.is_alive():
dispatcher_stop_flag.set()
print(f"[m {threading.current_thread().name}] dispatcher stopped...\n", end='')
# as soon as enough samples are collected, stop
if i == n_samples:
buffer = {}
break
    # stop all workers
workers_stop_flag.set()
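    # workers are daemon threads, so any episode still in flight is simply
    # abandoned when the main program exits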
shutil.rmtree(tmp_samples_dir, ignore_errors=True)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument(
'problem',
help='MILP instance type to process.',
        choices=['item_placement', 'load_balancing', 'set_cover', 'combinatorial_auction', 'capacitated_facility_location', 'independent_set'],
)
parser.add_argument(
'-s', '--seed',
help='Random generator seed.',
type=int,
default=0,
)
parser.add_argument(
'-j', '--njobs',
help='Number of parallel jobs.',
type=int,
default=1,
)
args = parser.parse_args()
print(f"seed {args.seed}")
# parameters
    node_record_prob = 0.05  # probability of querying the expert and recording a sample
    time_limit = 3600        # time limit per episode, in seconds
    train_size = 10000       # number of training samples to collect
    valid_size = 2000        # number of validation samples to collect
# get instances
if args.problem == 'item_placement':
instances_train = glob.glob('instances/1_item_placement/train/*.mps.gz')
instances_valid = glob.glob('instances/1_item_placement/valid/*.mps.gz')
out_dir = 'baseline/dual/train_files/samples/1_item_placement'
elif args.problem == 'load_balancing':
instances_train = glob.glob('instances/2_load_balancing/train/*.mps.gz')
instances_valid = glob.glob('instances/2_load_balancing/valid/*.mps.gz')
out_dir = 'baseline/dual/train_files/samples/2_load_balancing'
elif args.problem == 'set_cover':
instances_train = ecole.instance.SetCoverGenerator(n_rows=500, n_cols=500, density=0.05)
instances_train.seed(1)
instances_valid = ecole.instance.SetCoverGenerator(n_rows=500, n_cols=500, density=0.05)
instances_valid.seed(2)
out_dir = 'baseline/dual/train_files/samples/4_set_cover'
elif args.problem == 'combinatorial_auction':
instances_train = ecole.instance.CombinatorialAuctionGenerator(n_items=100, n_bids=250)
instances_train.seed(1)
instances_valid = ecole.instance.CombinatorialAuctionGenerator(n_items=100, n_bids=250)
instances_valid.seed(2)
out_dir = 'baseline/dual/train_files/samples/5_combinatorial_auction'
elif args.problem == 'capacitated_facility_location':
instances_train = ecole.instance.CapacitatedFacilityLocationGenerator(n_customers=100, n_facilities=50)
instances_train.seed(1)
instances_valid = ecole.instance.CapacitatedFacilityLocationGenerator(n_customers=100, n_facilities=50)
instances_valid.seed(2)
out_dir = 'baseline/dual/train_files/samples/6_capacitated_facility_location'
elif args.problem == 'independent_set':
instances_train = ecole.instance.IndependentSetGenerator(n_nodes=500, graph_type='erdos_renyi')
instances_train.seed(1)
instances_valid = ecole.instance.IndependentSetGenerator(n_nodes=500, graph_type='erdos_renyi')
instances_valid.seed(2)
out_dir = 'baseline/dual/train_files/samples/7_independent_set'
else:
raise NotImplementedError
# print(f"{len(instances_train)} train instances for {train_size} samples")
# print(f"{len(instances_valid)} validation instances for {valid_size} samples")
print(f"{0} train instances for {train_size} samples")
print(f"{0} validation instances for {valid_size} samples")
# create output directory, throws an error if it already exists
os.makedirs(out_dir)
# generate train samples
rng = np.random.RandomState(args.seed+100)
collect_samples(instances_train, out_dir + '/train', rng, train_size,
args.njobs, query_expert_prob=node_record_prob,
time_limit=time_limit)
# generate validation samples
rng = np.random.RandomState(args.seed + 1)
collect_samples(instances_valid, out_dir + '/valid', rng, valid_size,
args.njobs, query_expert_prob=node_record_prob,
time_limit=time_limit)