import torch import json import torch.nn as nn from utils import prepare_trees, generate_plan_tree def left_child(x): if len(x) != 3: return None return x[1] def right_child(x): if len(x) != 3: return None return x[2] def features(x): return x[0] class BinaryTreeConv(nn.Module): def __init__(self, in_channels, out_channels): super(BinaryTreeConv, self).__init__() self.__in_channels = in_channels self.__out_channels = out_channels # we can think of the tree conv as a single dense layer # that we "drag" across the tree. self.weights = nn.Conv1d(in_channels, out_channels, stride=3, kernel_size=3) def forward(self, flat_data): trees, idxes = flat_data orig_idxes = idxes idxes = idxes.expand(-1, -1, self.__in_channels).transpose(1, 2) expanded = torch.gather(trees, 2, idxes) results = self.weights(expanded) # add a zero vector back on zero_vec = torch.zeros((trees.shape[0], self.__out_channels)).unsqueeze(2) zero_vec = zero_vec.to(results.device) results = torch.cat((zero_vec, results), dim=2) return (results, orig_idxes) class TreeActivation(nn.Module): def __init__(self, activation): super(TreeActivation, self).__init__() self.activation = activation def forward(self, x): return (self.activation(x[0]), x[1]) class TreeLayerNorm(nn.Module): def forward(self, x): data, idxes = x mean = torch.mean(data, dim=(1, 2)).unsqueeze(1).unsqueeze(1) std = torch.std(data, dim=(1, 2)).unsqueeze(1).unsqueeze(1) normd = (data - mean) / (std + 0.00001) return (normd, idxes) class DynamicPooling(nn.Module): def forward(self, x): return torch.max(x[0], dim=2).values class TreeNet(nn.Module): def __init__(self, in_channels, output=8): super(TreeNet, self).__init__() self.__in_channels = in_channels self.__cuda = False self.tree_conv = nn.Sequential( BinaryTreeConv(self.__in_channels, 256), TreeLayerNorm(), TreeActivation(nn.LeakyReLU()), BinaryTreeConv(256, 128), TreeLayerNorm(), TreeActivation(nn.LeakyReLU()), BinaryTreeConv(128, 64), TreeLayerNorm(), DynamicPooling(), nn.Linear(64, output), ) def in_channels(self): return self.__in_channels def forward(self, x): trees = prepare_trees(x, features, left_child, right_child, cuda=self.__cuda) return self.tree_conv(trees) def cuda(self): self.__cuda = True return super().cuda() class TCNNEncoder(nn.Module): def __init__(self, plan_in_channel, knobs_dim, output=32): super(TCNNEncoder, self).__init__() self.channel = plan_in_channel self.knobs_dim = knobs_dim self._net = TreeNet(plan_in_channel) self.m = torch.nn.Linear(11, output) def forward(self, plan, knobs): x = self._net(plan) tmp = torch.cat((x, torch.Tensor(knobs)), 1) return self.m(tmp) if __name__ == "__main__": encoder = TCNNEncoder(9, 3) with open("test_plan") as f: plan = eval(f.read()) plan = generate_plan_tree(plan) print(plan) vec = encoder([plan, plan], [[0.5, 0.4, 0.6], [0.2, 0.4, 0.5]]) print(vec)