"""
The core partitioning algorithms.

PartitionAlgorithm builds partition-tree layouts (QdTree, logical join tree,
AdaptDB and JoinTree) for one table; its ``partition_tree`` attribute holds
the most recently constructed tree.
"""
import numpy as np
import time
import pickle
import os
import datetime
import pandas as pd
import sys
sys.path.append(os.path.dirname(os.path.abspath(__file__)) + '/..')
from model.partition_tree import PartitionTree
from model.join_eval import JoinEvaluator
from model.query_example import synetic_queries

base_dir = os.path.dirname(os.path.abspath(__file__))

# Query-set file per ``join_indeuced`` mode; any other mode uses the PAW set.
_QUERY_FILES = {'MTO': 'mto_queries.pkl', 'PAC': 'pac_queries.pkl'}
_DEFAULT_QUERY_FILE = 'paw_queries.pkl'


def _load_pickle(path):
    '''
    Unpickle and return the object stored at ``path``, closing the file.

    NOTE: pickle.load can execute arbitrary code; only use it on trusted
    dataset/queryset files.
    '''
    with open(path, 'rb') as f:
        return pickle.load(f)


def _to_vector_value(value, keep_str=True):
    '''
    Normalize one range bound for a domain or query vector.

    Dates (including datetimes) become 'YYYY-MM-DD' strings; strings are
    returned unchanged when ``keep_str`` is true; anything else goes
    through float().
    '''
    if isinstance(value, datetime.date):
        return value.strftime('%Y-%m-%d')
    if keep_str and isinstance(value, str):
        return value
    return float(value)


def _range_vector(table_ranges, predicate_ranges):
    '''
    Build the [min_1..min_k, max_1..max_k] vector of one table for one query.

    table_ranges: the table's metadata ranges ({column: {'min', 'max'}});
        its iteration order fixes the column order of the vector.
    predicate_ranges: the query's ranges for that table; columns it does not
        constrain fall back to the full domain from ``table_ranges``.
    Returns [] when ``table_ranges`` is empty.
    '''
    min_vector, max_vector = [], []
    for col_name, col_domain in table_ranges.items():
        bounds = predicate_ranges[col_name] if col_name in predicate_ranges else col_domain
        min_vector.append(_to_vector_value(bounds['min']))
        max_vector.append(_to_vector_value(bounds['max']))
    return min_vector + max_vector


class PartitionAlgorithm:
    '''
    Partition-tree layout builders for one table: QdTree, logical join tree,
    AdaptDB and JoinTree.
    '''

    def __init__(self, block_size=10000, benchmark='tpch', table_name='lineitem'):
        self.block_size = block_size
        self.partition_tree = None
        self.benchmark = benchmark
        self.table_name = table_name
        self.machine_num = 10

    # ------------------------------------------------------------------ #
    # Loading
    # ------------------------------------------------------------------ #
    def load_data(self):
        '''
        Load the table's metadata and rows from the dataset directory.

        Sets used_columns (metadata 'numeric_columns'), table_domains
        ([all column mins..., all column maxes...]), column_width and
        tabledata (a NumPy array whose columns follow used_columns order).
        '''
        metadata = _load_pickle(f'{base_dir}/../dataset/{self.benchmark}/metadata.pkl')
        table_meta = metadata[self.table_name]
        self.used_columns = table_meta['numeric_columns']

        # Table domain: dates become 'yyyy-mm-dd' strings, everything else float.
        col_ranges = list(table_meta['ranges'].values())
        table_min_domains = [_to_vector_value(r['min'], keep_str=False) for r in col_ranges]
        table_max_domains = [_to_vector_value(r['max'], keep_str=False) for r in col_ranges]
        self.table_domains = table_min_domains + table_max_domains
        self.column_width = table_meta['width']

        # pandas ignores the order given in ``usecols`` (columns come back in
        # file order), so reindex: the tree code addresses data columns by
        # their position in used_columns.
        ordered_columns = list(self.used_columns)
        data = pd.read_csv(
            f'{base_dir}/../dataset/{self.benchmark}/{self.table_name}.csv',
            usecols=ordered_columns,
        )[ordered_columns]
        self.tabledata = data.values

    def _load_query_dict(self, using_example, join_indeuced):
        '''
        Return the raw query dict: the bundled example queries when
        ``using_example`` is true, otherwise the pickled query set selected
        by ``join_indeuced`` ('MTO', 'PAC', anything else -> PAW).
        '''
        if using_example:
            return synetic_queries
        file_name = _QUERY_FILES.get(join_indeuced, _DEFAULT_QUERY_FILE)
        return _load_pickle(f'{base_dir}/../queryset/{self.benchmark}/{file_name}')

    def load_join_query(self, using_example=False, join_indeuced='MTO'):
        '''
        Load the workload as join queries and count join-column usage.

        Sets:
          join_queries: one dict per query with
            'vectors': {table: range vector} for every table the query filters
                or joins (tables without a filter get their full domain);
            'join_relations': unique {table: column_index} dicts, one per
                join predicate, with column indexes into 'numeric_columns'.
          join_freqs: {table: {column_index: count}}, each inner dict ordered
              by count, most frequent first.
        '''
        metadata = _load_pickle(f'{base_dir}/../dataset/{self.benchmark}/metadata.pkl')
        query_dict = self._load_query_dict(using_example, join_indeuced)

        join_freqs = {table: {} for table in metadata}
        join_query_vectors = []
        for item in query_dict.values():
            relations = []
            # Work on a copy: the query dict may be shared (synetic_queries),
            # and adding join-only tables to it would leak into later loads.
            ranges = dict(item['ranges'])
            for join_info in item['join_info']:
                for join_preds in join_info['join_keys']:
                    left_cond, right_cond = join_preds.split('=')
                    left_table, left_col_name = left_cond.split('.')
                    right_table, right_col_name = right_cond.split('.')
                    left_col = metadata[left_table]['numeric_columns'].index(left_col_name)
                    right_col = metadata[right_table]['numeric_columns'].index(right_col_name)
                    # Joined tables need a vector even without filter predicates.
                    ranges.setdefault(left_table, {})
                    ranges.setdefault(right_table, {})
                    for table, col in ((left_table, left_col), (right_table, right_col)):
                        join_freqs[table][col] = join_freqs[table].get(col, 0) + 1
                    join_dict = {left_table: left_col, right_table: right_col}
                    if join_dict not in relations:
                        relations.append(join_dict)

            query_vectors = {}
            for table, col_ranges in ranges.items():
                vector = _range_vector(metadata[table]['ranges'], col_ranges)
                if vector:
                    query_vectors[table] = vector
            join_query_vectors.append({'vectors': query_vectors, 'join_relations': relations})

        self.join_queries = join_query_vectors
        # Order each table's join columns by frequency, descending.
        self.join_freqs = {
            table: dict(sorted(freqs.items(), key=lambda kv: kv[1], reverse=True))
            for table, freqs in join_freqs.items()
        }

    def load_query(self, using_example=False, join_indeuced='MTO'):
        '''
        Load the range vectors of queries that filter this table.

        Sets queries: one [mins..., maxes...] vector per query whose 'ranges'
        mention self.table_name; unconstrained columns use the full domain.
        '''
        query_dict = self._load_query_dict(using_example, join_indeuced)
        metadata = _load_pickle(f'{base_dir}/../dataset/{self.benchmark}/metadata.pkl')

        query_vectors = []
        for item in query_dict.values():
            if self.table_name not in item['ranges']:
                continue
            vector = _range_vector(metadata[self.table_name]['ranges'],
                                   item['ranges'][self.table_name])
            if vector:
                query_vectors.append(vector)
        self.queries = query_vectors

    # ------------------------------------------------------------------ #
    # Shared tree-building helpers
    # ------------------------------------------------------------------ #
    def _new_tree(self, name):
        '''
        Create, store and return a PartitionTree whose root holds every row
        and every loaded query. The dimension of queries must match the
        dimension of the boundary and the dataset.
        '''
        tree = PartitionTree(len(self.used_columns), self.table_domains)
        tree.name = name
        tree.used_columns = self.used_columns
        tree.column_width = self.column_width
        tree.pt_root.node_size = len(self.tabledata)
        tree.pt_root.dataset = self.tabledata
        # assume all queries overlap with the boundary
        tree.pt_root.queryset = self.queries
        self.partition_tree = tree
        return tree

    def _join_depth_for_machines(self, default):
        '''
        Return i + 1 for the smallest i >= 1 with 2**i >= machine_num
        (depth i yields up to 2**i partitions: 1->2, 2->4, 3->8, 4->16),
        or ``default`` when no i in range(1, machine_num) qualifies.
        '''
        for i in range(1, self.machine_num):
            if 2 ** i >= self.machine_num:
                return i + 1
        return default

    def _finish_build(self, start_time, layout_path, report=True):
        '''Record the build time on the current tree, optionally print it, and save the tree.'''
        elapsed = time.time() - start_time
        if report:
            print(f"{self.table_name} Build Time (s):", elapsed)
        self.partition_tree.build_time = elapsed
        self.partition_tree.save_tree(layout_path)

    def _greedy_split(self, block_size, extended=False, first_cut_msg=None):
        '''
        QdTree-style greedy partitioning of the current tree.

        Repeatedly visits every leaf with at least 2 * block_size rows and
        applies the valid candidate cut with the highest ``skip`` score from
        leaf.if_split, until a full pass makes no split. ``extended`` asks
        get_candidate_cuts for its extended candidate set. ``first_cut_msg``,
        if given, is printed once before the first applied cut.
        '''
        can_split = True
        while can_split:
            can_split = False
            for leaf in self.partition_tree.get_leaves():
                if leaf.node_size < 2 * block_size:
                    continue
                if extended:
                    candidate_cuts = leaf.get_candidate_cuts(extended=True)
                else:
                    candidate_cuts = leaf.get_candidate_cuts()
                max_skip, best_dim, best_value = -1, 0, 0
                for split_dim, split_value in candidate_cuts:
                    valid, skip, _, _ = leaf.if_split(split_dim, split_value, block_size)
                    if valid and skip > max_skip:
                        max_skip, best_dim, best_value = skip, split_dim, split_value
                # max_skip stays -1 only when no candidate cut was valid.
                if max_skip >= 0:
                    if first_cut_msg is not None:
                        print(first_cut_msg)
                        first_cut_msg = None
                    self.partition_tree.apply_split(leaf.nid, best_dim, best_value)
                    can_split = True

    def _join_top_layer(self, block_size, join_cols, join_depth, first_cut_msg):
        '''
        Top layer of AdaptDB/JoinTree: median-split leaves on join_cols[0]
        (the most frequent join column when join_cols comes from join_freqs),
        level by level, up to join_depth. Does nothing if join_cols is empty.
        '''
        if not join_cols:
            return
        split_dim = join_cols[0]
        cur_depth = 1
        can_split = True
        while can_split:
            can_split = False
            leaves = self.partition_tree.get_leaves()
            cur_depth += 1
            if cur_depth > join_depth:
                break
            for leaf in leaves:
                # NOTE(review): leaves shallower than cur_depth are skipped, so
                # the first level only splits if PartitionTree gives the root a
                # depth >= 2 -- confirm how node depth is initialized.
                if leaf.node_size < 2 * block_size or leaf.depth < cur_depth:
                    continue
                split_value = np.median(leaf.dataset[:, split_dim])
                valid, _, _, _ = leaf.if_split(split_dim, split_value, block_size)
                if valid:
                    if first_cut_msg is not None:
                        print(first_cut_msg)
                        first_cut_msg = None
                    child_node1, child_node2 = self.partition_tree.apply_split(leaf.nid, split_dim, split_value)
                    child_node1.depth = leaf.depth + 1
                    child_node2.depth = leaf.depth + 1
                    can_split = True

    # ------------------------------------------------------------------ #
    # MTO layout
    # ------------------------------------------------------------------ #
    def InitializeWithQDT(self):
        '''
        Build a QdTree layout for this table and save it to
        layouts/<benchmark>/<table>-QDT.pkl.
        '''
        start_time = time.time()
        self._new_tree('QDT')
        self.__QDT(self.block_size)
        self._finish_build(start_time, f'{base_dir}/../layouts/{self.benchmark}/{self.table_name}-QDT.pkl')

    def __QDT(self, block_size):
        '''
        the QdTree partition algorithm
        '''
        self._greedy_split(block_size, first_cut_msg="QDTREE CUT!")

    def LogicalJoinTree(self, join_col, join_max_depth=3):
        '''
        Build a logical join tree keyed on ``join_col`` and save it to
        layouts/<benchmark>/logical/<table>-<join_col>-tree.pkl.

        The top levels median-split on join_col (depth derived from
        machine_num when possible, else join_max_depth); the remaining leaves
        are then refined greedily with extended candidate cuts.
        '''
        start_time = time.time()
        tree_path = f'{base_dir}/../layouts/{self.benchmark}/logical/{self.table_name}-{join_col}-tree.pkl'
        join_max_depth = self._join_depth_for_machines(join_max_depth)

        tree = self._new_tree('LJT')
        join_dim = self.used_columns.index(join_col)
        tree.join_attr = join_dim
        tree.join_depth = join_max_depth

        # Top layer: one median split on the join column per level.
        for _ in range(join_max_depth):
            split_any = False
            for leaf in tree.get_leaves():
                if leaf.node_size < 2 * self.block_size:
                    continue
                split_value = np.median(leaf.dataset[:, join_dim])
                valid, _, _, _ = leaf.if_split(join_dim, split_value, self.block_size)
                if valid:
                    tree.apply_split(leaf.nid, join_dim, split_value)
                    split_any = True
            # No level can split further on the join column; move on to the
            # bottom layer instead of abandoning oversized leaves.
            if not split_any:
                break

        # Bottom layer: greedy refinement over all columns.
        self._greedy_split(self.block_size, extended=True)
        self._finish_build(start_time, tree_path, report=False)

    # ------------------------------------------------------------------ #
    # AdaptDB layout
    # ------------------------------------------------------------------ #
    def InitializeWithADP(self, join_depth=3):
        '''
        Build an AdaptDB layout (join-column top layer of depth join_depth,
        then greedy bottom layer) and save it to
        layouts/<benchmark>/<table>-ADP.pkl. Requires load_join_query().
        '''
        join_cols = list(self.join_freqs[self.table_name].keys())
        tree = self._new_tree('AdaptDB')
        tree.join_attr = join_cols
        tree.join_depth = join_depth
        start_time = time.time()
        self.__ADP(self.block_size, join_cols, join_depth)
        self._finish_build(start_time, f'{base_dir}/../layouts/{self.benchmark}/{self.table_name}-ADP.pkl')

    def __ADP(self, block_size, join_cols, join_depth):
        '''AdaptDB partitioning: join-column top layer, then QdTree-style bottom layer.'''
        self._join_top_layer(block_size, join_cols, join_depth, "AdaptDB CUT!")
        self._greedy_split(block_size)

    # ------------------------------------------------------------------ #
    # JoinTree layout
    # ------------------------------------------------------------------ #
    def InitializeWithJT(self, join_depth=3):
        '''
        Build a JoinTree layout and save it to layouts/<benchmark>/<table>-JT.pkl.

        The top-layer depth is derived from machine_num when possible
        (else join_depth). Requires load_join_query().
        '''
        join_cols = list(self.join_freqs[self.table_name].keys())
        join_depth = self._join_depth_for_machines(join_depth)
        tree = self._new_tree('JoinTree')
        tree.join_attr = join_cols
        tree.join_depth = join_depth
        start_time = time.time()
        self.__JT(self.block_size, join_cols, join_depth)
        self._finish_build(start_time, f'{base_dir}/../layouts/{self.benchmark}/{self.table_name}-JT.pkl')

    def __JT(self, block_size, join_cols, join_depth):
        '''JoinTree partitioning: join-column top layer, then QdTree-style bottom layer.'''
        self._join_top_layer(block_size, join_cols, join_depth, "JoinTree CUT!")
        self._greedy_split(block_size)

    # ------------------------------------------------------------------ #
    # Evaluation
    # ------------------------------------------------------------------ #
    def evaluate_single_table_access_cost(self):
        '''Evaluate self.queries on the current tree and return the tree's reported cost.'''
        return self.partition_tree.evaluate_query_cost(
            self.queries, len(self.tabledata), self.column_width, self.used_columns)

    def evaluate_multiple_table_access_cost(self, trees):
        '''
        Estimate the cost of self.join_queries over per-table partition trees.

        trees: {table_name: partition tree} covering every table the queries use.
        Returns a dict mapping
          table_name -> summed evaluate_query_cost of queries without joins;
          (left_table, right_table) -> summed hyper + shuffle read cost of
              that join across queries.
        '''
        first_tree = next(iter(trees.values()), None)
        # Join grouping strategy: 1 for JoinTree layouts, 0 otherwise (AdaptDB etc.).
        group_type = 1 if first_tree is not None and first_tree.name == 'JoinTree' else 0

        tot_cost = {}
        for join_query in self.join_queries:
            vectors = join_query['vectors']
            if join_query['join_relations']:
                # Evaluate each join of the query separately.
                for join_op in join_query['join_relations']:
                    join_queryset = [vectors[table] for table in join_op]
                    joined_cols = list(join_op.values())
                    joined_trees = [trees[table] for table in join_op]
                    join_eval = JoinEvaluator(join_queryset, joined_cols, joined_trees)
                    hyper_read_cost, shuffle_read_cost, _ = join_eval.compute_total_shuffle_hyper_cost(group_type)
                    join_key = tuple(join_op)
                    tot_cost[join_key] = tot_cost.get(join_key, 0) + hyper_read_cost + shuffle_read_cost
            else:
                for tablename, vector in vectors.items():
                    # NOTE(review): evaluate_single_table_access_cost passes four
                    # arguments to evaluate_query_cost; confirm the one-argument
                    # form used here is supported by PartitionTree.
                    tot_cost[tablename] = tot_cost.get(tablename, 0) + trees[tablename].evaluate_query_cost([vector])
        return tot_cost