from concurrent.futures.thread import _worker import numpy as np import time import pickle import os import datetime import pandas as pd import sys sys.path.append(os.path.dirname(os.path.abspath(__file__)) + "/..") from model.partition_tree import PartitionTree from model.query_example import synetic_queries base_dir = os.path.dirname(os.path.abspath(__file__)) """ The core partitioning algorithm class Attribute: partition_tree is constructed partition tree. """ class PartitionAlgorithm: """ The partition algorithms, inlcuding NORA, QdTree and kd-tree. """ def __init__(self, block_size=10000, benchmark="tpch", table_name="lineitem"): self.block_size = block_size self.partition_tree = None self.benchmark = benchmark self.table_name = table_name self.machine_num = 10 self.sample_rate = self.calculate_global_sample_rate() # default 1 def calculate_global_sample_rate( self, max_sample_size=1000000, dim_table_threshold=10000 ): """ Calculate the global sample rate uniformly based on the largest table size """ base_dir = os.path.dirname(os.path.abspath(__file__)) metadata_path = f"{base_dir}/../dataset/{self.benchmark}/metadata.pkl" if not os.path.exists(metadata_path): return 1.0 metadata = pickle.load(open(metadata_path, "rb")) max_rows = 0 for table, table_meta in metadata.items(): rows = table_meta["rows"] if rows > dim_table_threshold: max_rows = max(max_rows, rows) if max_rows > max_sample_size: return max_sample_size / max_rows return 1.0 def load_data(self, dim_table_threshold=10000): """ Load data from the data source. """ metadata = pickle.load( open(f"{base_dir}/../dataset/{self.benchmark}/metadata.pkl", "rb") ) self.used_columns = metadata[self.table_name]["numeric_columns"] # Get table domains table_min_domains, table_max_domains = [], [] for _, col_range in metadata[self.table_name]["ranges"].items(): # Convert datetime to 'yyyy-mm-dd' format, keep other types unchanged table_min_domains.append( col_range["min"].strftime("%Y-%m-%d") if isinstance(col_range["min"], datetime.date) else float(col_range["min"]) ) table_max_domains.append( col_range["max"].strftime("%Y-%m-%d") if isinstance(col_range["max"], datetime.date) else float(col_range["max"]) ) self.table_domains = table_min_domains + table_max_domains self.column_width = metadata[self.table_name]["width"] total_rows = metadata[self.table_name]["rows"] # Read CSV data file_path = f"{base_dir}/../dataset/{self.benchmark}/{self.table_name}.csv" if self.sample_rate < 1.0 and total_rows > dim_table_threshold: data = pd.read_csv( file_path, usecols=self.used_columns, skiprows=lambda i: i > 0 and np.random.random() > self.sample_rate, ) print( f"Sample {self.table_name}: Total rows {total_rows}, Sampled {len(data)} rows, Sample rate {self.sample_rate:.4f}" ) else: data = pd.read_csv(file_path, usecols=self.used_columns) self.tabledata = data.values def load_join_query(self, using_example=False, join_indeuced="MTO"): """ Load join query from the data source. """ metadata = pickle.load( open(f"{base_dir}/../dataset/{self.benchmark}/metadata.pkl", "rb") ) if using_example: query_dict = synetic_queries else: if join_indeuced == "MTO": query_dict = pickle.load( open( f"{base_dir}/../queryset/{self.benchmark}/mto_queries.pkl", "rb" ) ) elif join_indeuced == "PAC": query_dict = pickle.load( open( f"{base_dir}/../queryset/{self.benchmark}/pac_queries.pkl", "rb" ) ) else: query_dict = pickle.load( open( f"{base_dir}/../queryset/{self.benchmark}/paw_queries.pkl", "rb" ) ) join_freqs = {table: {} for table in metadata.keys()} join_query_vectors = [] # for queryid, item in query_dict.items(): # query_vectors={} # for table, col_range in item['ranges'].items(): # min_vector,max_vector=[],[] # for col in col_range: # min_vector.append( # col_range[col]['min'].strftime('%Y-%m-%d') if isinstance(col_range[col]['min'], datetime.date) else col_range[col]['min'] if isinstance(col_range[col]['min'], str) else float(col_range[col]['min']) # ) # max_vector.append( # col_range[col]['max'].strftime('%Y-%m-%d') if isinstance(col_range[col]['max'], datetime.date) else col_range[col]['max'] if isinstance(col_range[col]['max'], str) else float(col_range[col]['max']) # ) # if min_vector: # query_vectors[table]=min_vector+max_vector for queryid, item in query_dict.items(): join_vector = {"vectors": [], "join_relations": []} for join_info in item["join_info"]: for join_preds in join_info["join_keys"]: left_cond, right_cond = join_preds.split("=") left_table, left_col = left_cond.split(".") left_col = metadata[left_table]["numeric_columns"].index(left_col) right_table, right_col = right_cond.split(".") right_col = metadata[right_table]["numeric_columns"].index( right_col ) join_dict = {left_table: left_col, right_table: right_col} if left_table not in item["ranges"]: item["ranges"][left_table] = {} if right_table not in item["ranges"]: item["ranges"][right_table] = {} join_freqs[left_table][left_col] = ( join_freqs[left_table].setdefault(left_col, 0) + 1 ) join_freqs[right_table][right_col] = ( join_freqs[right_table].setdefault(right_col, 0) + 1 ) if join_dict not in join_vector["join_relations"]: join_vector["join_relations"].append(join_dict) query_vectors = {} for table, col_ranges in item["ranges"].items(): min_vector, max_vector = [], [] for col_name, col_domain in metadata[table]["ranges"].items(): if col_name not in col_ranges: min_vector.append( col_domain["min"].strftime("%Y-%m-%d") if isinstance(col_domain["min"], datetime.date) else ( col_domain["min"] if isinstance(col_domain["min"], str) else float(col_domain["min"]) ) ) max_vector.append( col_domain["max"].strftime("%Y-%m-%d") if isinstance(col_domain["max"], datetime.date) else ( col_domain["max"] if isinstance(col_domain["max"], str) else float(col_domain["max"]) ) ) else: min_vector.append( col_ranges[col_name]["min"].strftime("%Y-%m-%d") if isinstance(col_ranges[col_name]["min"], datetime.date) else ( col_ranges[col_name]["min"] if isinstance(col_ranges[col_name]["min"], str) else float(col_ranges[col_name]["min"]) ) ) max_vector.append( col_ranges[col_name]["max"].strftime("%Y-%m-%d") if isinstance(col_ranges[col_name]["max"], datetime.date) else ( col_ranges[col_name]["max"] if isinstance(col_ranges[col_name]["max"], str) else float(col_ranges[col_name]["max"]) ) ) if min_vector: query_vectors[table] = min_vector + max_vector join_vector["vectors"] = query_vectors join_query_vectors.append(join_vector) self.join_queries = join_query_vectors self.join_freqs = { table: dict( sorted( join_freqs[table].items(), key=lambda item: item[1], reverse=True ) ) for table in join_freqs.keys() } def load_query(self, using_example=False, join_indeuced="MTO"): """ Load query from the data source. """ if using_example: query_dict = synetic_queries else: if join_indeuced == "MTO": query_dict = pickle.load( open( f"{base_dir}/../queryset/{self.benchmark}/mto_queries.pkl", "rb" ) ) elif join_indeuced == "PAC": query_dict = pickle.load( open( f"{base_dir}/../queryset/{self.benchmark}/pac_queries.pkl", "rb" ) ) else: query_dict = pickle.load( open( f"{base_dir}/../queryset/{self.benchmark}/paw_queries.pkl", "rb" ) ) metadata = pickle.load( open(f"{base_dir}/../dataset/{self.benchmark}/metadata.pkl", "rb") ) query_vectors = [] for queryid, item in query_dict.items(): min_vector, max_vector = [], [] for table, col_ranges in item["ranges"].items(): if table != self.table_name: continue for col_name, col_domain in metadata[table]["ranges"].items(): if col_name not in col_ranges: min_vector.append( col_domain["min"].strftime("%Y-%m-%d") if isinstance(col_domain["min"], datetime.date) else ( col_domain["min"] if isinstance(col_domain["min"], str) else float(col_domain["min"]) ) ) max_vector.append( col_domain["max"].strftime("%Y-%m-%d") if isinstance(col_domain["max"], datetime.date) else ( col_domain["max"] if isinstance(col_domain["max"], str) else float(col_domain["max"]) ) ) else: min_vector.append( col_ranges[col_name]["min"].strftime("%Y-%m-%d") if isinstance(col_ranges[col_name]["min"], datetime.date) else ( col_ranges[col_name]["min"] if isinstance(col_ranges[col_name]["min"], str) else float(col_ranges[col_name]["min"]) ) ) max_vector.append( col_ranges[col_name]["max"].strftime("%Y-%m-%d") if isinstance(col_ranges[col_name]["max"], datetime.date) else ( col_ranges[col_name]["max"] if isinstance(col_ranges[col_name]["max"], str) else float(col_ranges[col_name]["max"]) ) ) if min_vector: query_vectors.append(min_vector + max_vector) self.queries = query_vectors def eval_len_node(self, dataset): return int(len(dataset) // self.sample_rate) # MTO layout def InitializeWithQDT(self): """ # should I also store the candidate cut positions in Partition Node ? The dimension of queries should match the dimension of boundary and dataset! """ start_time = time.time() num_dims = len(self.used_columns) boundary = self.table_domains self.partition_tree = PartitionTree(num_dims, boundary, self.sample_rate) self.partition_tree.name = "QDT" self.partition_tree.used_columns = self.used_columns self.partition_tree.column_width = self.column_width self.partition_tree.pt_root.node_size = self.eval_len_node(self.tabledata) self.partition_tree.pt_root.dataset = self.tabledata self.partition_tree.pt_root.queryset = ( self.queries ) # assume all queries overlap with the boundary self.__QDT(self.block_size) end_time = time.time() print(f"{self.table_name} Build Time (s):", end_time - start_time) self.partition_tree.build_time = end_time - start_time self.partition_tree.save_tree( f"{base_dir}/../layouts/bs={self.block_size}/{self.benchmark}/{self.table_name}-QDT.pkl" ) def __QDT(self, block_size): """ the QdTree partition algorithm """ CanSplit = True print_s = True while CanSplit: CanSplit = False # for leaf in self.partition_tree.get_leaves(): leaves = self.partition_tree.get_leaves() # print("# number of leaf nodes:",len(leaves)) for leaf in leaves: # print("current leaf node id:",leaf.nid, "leaf node dataset size:",len(leaf.dataset)) if leaf.node_size < 2 * block_size: continue candidate_cuts = leaf.get_candidate_cuts() # get best candidate cut position skip, max_skip, max_skip_split_dim, max_skip_split_value = 0, -1, 0, 0 for split_dim, split_value in candidate_cuts: valid, skip, _, _ = leaf.if_split( split_dim, split_value, block_size, self.sample_rate ) if valid and skip > max_skip: max_skip = skip max_skip_split_dim = split_dim max_skip_split_value = split_value if max_skip >= 0: # if the cost become smaller, apply the cut if print_s: print("QDTREE CUT!") print_s = False child_node1, child_node2 = self.partition_tree.apply_split( leaf.nid, max_skip_split_dim, max_skip_split_value ) # print(" Split on node id:", leaf.nid) CanSplit = True def LogicalJoinTree(self, join_col, join_max_depth=3): """ # should I also store the candidate cut positions in Partition Node ? The dimension of queries should match the dimension of boundary and dataset! """ start_time = time.time() logical_tree_path = ( f"{base_dir}/../layouts/bs={self.block_size}/{self.benchmark}/logical" ) os.makedirs(logical_tree_path, exist_ok=True) tree_path = f"{logical_tree_path}/{self.table_name}-{join_col}-tree.pkl" for i in range(1, self.machine_num): if 2**i >= self.machine_num: join_max_depth = i + 1 break num_dims = len(self.used_columns) boundary = self.table_domains self.partition_tree = PartitionTree(num_dims, boundary, self.sample_rate) self.partition_tree.name = "LJT" self.partition_tree.join_attr = self.used_columns.index(join_col) self.partition_tree.join_depth = join_max_depth self.partition_tree.used_columns = self.used_columns self.partition_tree.column_width = self.column_width self.partition_tree.pt_root.node_size = self.eval_len_node(self.tabledata) self.partition_tree.pt_root.dataset = self.tabledata self.partition_tree.pt_root.queryset = self.queries canSplit = True cur_depth = 0 while canSplit: canSplit = False leaves = self.partition_tree.get_leaves() cur_depth += 1 for leaf in leaves: if leaf.node_size < 2 * self.block_size: continue if cur_depth <= join_max_depth: split_dim = self.partition_tree.join_attr split_value = np.median(leaf.dataset[:, split_dim]) valid, skip, _, _ = leaf.if_split( split_dim, split_value, self.block_size, self.sample_rate ) if valid: child_node1, child_node2 = self.partition_tree.apply_split( leaf.nid, split_dim, split_value ) canSplit = True else: candidate_cuts = leaf.get_candidate_cuts(extended=True) skip, max_skip, max_skip_split_dim, max_skip_split_value = ( 0, -1, 0, 0, ) for split_dim, split_value in candidate_cuts: valid, skip, _, _ = leaf.if_split( split_dim, split_value, self.block_size, self.sample_rate ) if valid and skip > max_skip: max_skip = skip max_skip_split_dim = split_dim max_skip_split_value = split_value if max_skip >= 0: # if the cost become smaller, apply the cut child_node1, child_node2 = self.partition_tree.apply_split( leaf.nid, max_skip_split_dim, max_skip_split_value ) canSplit = True end_time = time.time() self.partition_tree.build_time = end_time - start_time self.partition_tree.save_tree(tree_path) # AD-MTO layout def InitializeWithADP(self, join_depth=3): """ # should I also store the candidate cut positions in Partition Node ? The dimension of queries should match the dimension of boundary and dataset! """ join_cols = list(self.join_freqs[self.table_name].keys()) num_dims = len(self.used_columns) boundary = self.table_domains self.partition_tree = PartitionTree(num_dims, boundary, self.sample_rate) self.partition_tree.name = "AdaptDB" self.partition_tree.join_attr = join_cols self.partition_tree.join_depth = join_depth self.partition_tree.used_columns = self.used_columns self.partition_tree.column_width = self.column_width self.partition_tree.pt_root.node_size = self.eval_len_node(self.tabledata) self.partition_tree.pt_root.dataset = self.tabledata self.partition_tree.pt_root.queryset = ( self.queries ) # assume all queries overlap with the boundary start_time = time.time() self.__ADP(self.block_size, join_cols, join_depth) end_time = time.time() print(f"{self.table_name} Build Time (s):", end_time - start_time) self.partition_tree.build_time = end_time - start_time self.partition_tree.save_tree( f"{base_dir}/../layouts/{self.benchmark}/{self.table_name}-ADP.pkl" ) def __ADP(self, block_size, join_cols, join_depth): print_s = True canSplit = True cur_depth = 1 if join_cols: while canSplit: canSplit = False leaves = self.partition_tree.get_leaves() cur_depth += 1 if cur_depth > join_depth: break for leaf in leaves: if leaf.node_size < 2 * block_size or leaf.depth < cur_depth: continue split_dim = join_cols[0] split_value = np.median(leaf.dataset[:, split_dim]) valid, skip, _, _ = leaf.if_split( split_dim, split_value, block_size, self.sample_rate ) if valid: if print_s: print("AdaptDB CUT!") print_s = False child_node1, child_node2 = self.partition_tree.apply_split( leaf.nid, split_dim, split_value ) child_node1.depth = leaf.depth + 1 child_node2.depth = leaf.depth + 1 canSplit = True # bottom-layer tree construction CanSplit = True while CanSplit: CanSplit = False leaves = self.partition_tree.get_leaves() for leaf in leaves: # print("current leaf node id:",leaf.nid, "leaf node dataset size:",len(leaf.dataset)) if leaf.node_size < 2 * block_size: continue candidate_cuts = leaf.get_candidate_cuts() # get best candidate cut position skip, max_skip, max_skip_split_dim, max_skip_split_value = 0, -1, 0, 0 for split_dim, split_value in candidate_cuts: valid, skip, _, _ = leaf.if_split( split_dim, split_value, block_size, self.sample_rate ) if valid and skip > max_skip: max_skip = skip max_skip_split_dim = split_dim max_skip_split_value = split_value if max_skip >= 0: # if the cost become smaller, apply the cut child_node1, child_node2 = self.partition_tree.apply_split( leaf.nid, max_skip_split_dim, max_skip_split_value ) CanSplit = True def InitializeWithNORA( self, saved=False, ): num_dims = len(self.used_columns) boundary = self.table_domains self.partition_tree = PartitionTree(num_dims, boundary, self.sample_rate) self.partition_tree.name = "NORATree" self.partition_tree.used_columns = self.used_columns self.partition_tree.column_width = self.column_width self.partition_tree.pt_root.node_size = self.eval_len_node(self.tabledata) self.partition_tree.pt_root.dataset = self.tabledata self.partition_tree.pt_root.queryset = ( self.queries ) # assume all queries overlap with the boundary self.partition_tree.pt_root.generate_query_MBRs() start_time = time.time() self.__NORA(self.block_size, depth_limit=None) end_time = time.time() print(f"{self.table_name} Build Time (s):", end_time - start_time) self.partition_tree.build_time = end_time - start_time if saved: self.partition_tree.save_tree( f"{base_dir}/../layouts/{self.benchmark}/{self.table_name}-NORA.pkl" ) def cal_optimal_join_depth(self, join_cols): # 1->2 2->4 3->8 4->16 try: best_depth,lowest_cost=0,float('inf') for xi in [1,2,3,4]: max_nodes=xi*self.machine_num join_depth=0 for i in range(1, max_nodes): if 2**i >= max_nodes: join_depth = i + 1 break print(f"candidate join depth:{join_depth}") partition_tree=self.__fake_JT(self.block_size, join_cols, join_depth,self.machine_num) join_cost=partition_tree.estimate_join_cost(self.table_name,self.join_queries) scan_cost=partition_tree.evaluate_query_cost(self.queries) if join_cost+scan_cost= max_nodes: best_depth = i + 1 break return best_depth def InitializeWithJT( self, join_depth=3, enable_bounding_split=False, saved=False, enable_median_extend=True, ): """ # should I also store the candidate cut positions in Partition Node ? The dimension of queries should match the dimension of boundary and dataset! """ join_cols = list(self.join_freqs[self.table_name].keys()) join_depth = self.cal_optimal_join_depth(join_cols) self.partition_tree = PartitionTree(len(self.used_columns), self.table_domains, self.sample_rate) self.partition_tree.name = "JoinTree" self.partition_tree.join_attr = join_cols self.partition_tree.join_depth = join_depth self.partition_tree.used_columns = self.used_columns self.partition_tree.column_width = self.column_width self.partition_tree.pt_root.node_size = self.eval_len_node(self.tabledata) self.partition_tree.pt_root.dataset = self.tabledata self.partition_tree.pt_root.queryset = ( self.queries ) # assume all queries overlap with the boundary self.partition_tree.pt_root.generate_query_MBRs() start_time = time.time() self.__JT( self.block_size, join_cols, join_depth, if_bounding_split=enable_bounding_split, if_median_extend=enable_median_extend, ) end_time = time.time() print(f"{self.table_name} Build Time (s):", end_time - start_time) self.partition_tree.build_time = end_time - start_time if saved: self.partition_tree.save_tree( f"{base_dir}/../layouts/{self.benchmark}/{self.table_name}-JT.pkl" ) def __fake_JT(self, block_size, join_cols, join_depth, worker_num): partition_tree=PartitionTree(len(self.used_columns), self.table_domains, self.sample_rate) canSplit = True cur_depth = 1 if join_cols: while canSplit: canSplit = False leaves = partition_tree.get_leaves() cur_depth += 1 if cur_depth > join_depth: break for leaf in leaves: if leaf.node_size < 2 * block_size or leaf.depth < cur_depth: continue split_dim = join_cols[0] split_value = np.median(leaf.dataset[:, split_dim]) valid, skip, _, _ = leaf.if_split( split_dim, split_value, block_size, self.sample_rate ) if valid: child_node1, child_node2 = partition_tree.apply_split( leaf.nid, split_dim, split_value ) child_node1.depth = leaf.depth + 1 child_node2.depth = leaf.depth + 1 canSplit = True join_cost=partition_tree.estimate_join_cost(self.table_name,self.join_queries, join_cols, worker_num) scan_cost=partition_tree.evaluate_query_cost(self.queries) return join_cost+scan_cost else: return float('inf') def __JT(self,block_size,join_cols,join_depth,if_bounding_split=False,if_median_extend=True): print_s = True # top-layer tree construction canSplit = True cur_depth = 1 bouding_active_ratio = 2 if not if_bounding_split and join_cols: while canSplit: canSplit = False leaves = self.partition_tree.get_leaves() cur_depth += 1 if cur_depth > join_depth: break for leaf in leaves: if leaf.node_size < 2 * block_size or leaf.depth < cur_depth: continue # temp_allocations = self.partition_tree.allocations.copy() split_dim = join_cols[0] split_value = np.median(leaf.dataset[:, split_dim]) valid, skip, _, _ = leaf.if_split( split_dim, split_value, block_size, self.sample_rate ) if valid: if print_s: print("JoinTree CUT!") print_s = False child_node1, child_node2 = self.partition_tree.apply_split( leaf.nid, split_dim, split_value ) child_node1.depth = leaf.depth + 1 child_node2.depth = leaf.depth + 1 # self.partition_tree.allocations[split_dim] -= 2.0 / pow(2, cur_depth) canSplit = True # bottom-layer tree construction CanSplit = True while CanSplit: CanSplit = False # for leaf in self.partition_tree.get_leaves(): leaves = self.partition_tree.get_leaves() # print("# number of leaf nodes:",len(leaves)) for leaf in leaves: # print("current leaf node id:",leaf.nid, "leaf node dataset size:",len(leaf.dataset)) if ( leaf.node_size < 2 * block_size or leaf.queryset is None or leaf.is_irregular_shape or leaf.no_valid_partition ): continue if if_median_extend: candidate_cuts = leaf.get_candidate_cuts() else: candidate_cuts = leaf.get_candidate_cuts(extended=False) # get best candidate cut position ( skip, max_skip, max_skip_split_dim, max_skip_split_value, max_skip_split_type, ) = (0, -1, 0, 0, 0) if ( if_bounding_split and leaf.node_size <= bouding_active_ratio * block_size ): skip = leaf.if_general_group_split(block_size) if skip != False and skip > max_skip: ( max_skip, max_skip_split_dim, max_skip_split_value, max_skip_split_type, ) = (skip, None, None, 1) # valid, skip = leaf.if_dual_bounding_split(split_dim, split_value, block_size, approximate = False) for split_dim, split_value in candidate_cuts: valid, skip, left_size, right_size = leaf.if_split( split_dim, split_value, block_size, self.sample_rate ) if valid and skip > max_skip: max_skip = skip max_skip_split_dim = split_dim max_skip_split_value = split_value if max_skip >= 0: print("max_skip:", max_skip) # if the cost become smaller, apply the cut child_node1, child_node2 = self.partition_tree.apply_split( leaf.nid, max_skip_split_dim, max_skip_split_value, max_skip_split_type, ) CanSplit = True else: leaf.no_valid_partition = True def __NORA(self, data_threshold, depth_limit=None): """ the general NORA algorithm, which utilize bounding split, daul-bounding split and extend candidate cuts with medians """ CanSplit = True while CanSplit: CanSplit = False # for leaf in self.partition_tree.get_leaves(): leaves = self.partition_tree.get_leaves() # print("# number of leaf nodes:",len(leaves)) for leaf in leaves: # print("current leaf node id:",leaf.nid, "leaf node dataset size:",len(leaf.dataset)) if ( leaf.node_size < 2 * data_threshold or leaf.queryset is None or (depth_limit is not None and leaf.depth >= depth_limit) ): continue # get best candidate cut position ( skip, max_skip, max_skip_split_dim, max_skip_split_value, max_skip_split_type, ) = (0, -1, 0, 0, 0) # extend the candidate cut with medians when it reach the bottom candidate_cuts = ( leaf.get_candidate_cuts(extended=True) if leaf.node_size < 4 * data_threshold else leaf.get_candidate_cuts(extended=False) ) for split_dim, split_value in candidate_cuts: # first try normal split valid, skip, left_size, right_size = leaf.if_split( split_dim, split_value, data_threshold ) if valid and skip > max_skip: ( max_skip, max_skip_split_dim, max_skip_split_value, max_skip_split_type, ) = (skip, split_dim, split_value, 0) # if it's available for bounding split, try it if leaf.node_size < 3 * data_threshold: # try bounding split valid, skip, _ = leaf.if_bounding_split( data_threshold, approximate=False ) if valid and skip > max_skip: ( max_skip, max_skip_split_dim, max_skip_split_value, max_skip_split_type, ) = (skip, split_dim, split_value, 1) # if it's availble for dual-bounding split, try it elif ( leaf.node_size < 4 * data_threshold and left_size < 2 * data_threshold and right_size < 2 * data_threshold ): # try dual-bounding split valid, skip = leaf.if_dual_bounding_split( split_dim, split_value, data_threshold, approximate=False ) if valid and skip > max_skip: ( max_skip, max_skip_split_dim, max_skip_split_value, max_skip_split_type, ) = (skip, split_dim, split_value, 2) if max_skip > 0: # if the cost become smaller, apply the cut child_node1, child_node2 = self.partition_tree.apply_split( leaf.nid, max_skip_split_dim, max_skip_split_value, max_skip_split_type, ) # print(" Split on node id:", leaf.nid) CanSplit = True def InitializeWithPando(self, max_depth=10000): """ Pando Layout: Build an independent partition tree for each column, with no size limit on leaf nodes. When querying, scan matching blocks in each tree separately, then combine results using column-wise logical operations. """ start_time = time.time() num_dims = len(self.used_columns) boundary = self.table_domains # Build independent trees for each column (stored in a list) self.pando_trees = [] for col_idx in range(num_dims): tree = PartitionTree( 1, [boundary[col_idx], boundary[num_dims + col_idx]], self.sample_rate ) tree.name = f"Pando-Col{col_idx}" tree.used_columns = [self.used_columns[col_idx]] tree.column_width = { self.used_columns[col_idx]: self.column_width[ self.used_columns[col_idx] ] } # Keep only current column's data col_data = self.tabledata[:, col_idx : col_idx + 1] tree.pt_root.node_size = self.eval_len_node(col_data) tree.pt_root.dataset = col_data # Keep only current column's query ranges col_queries = [] for q in self.queries: col_q = [q[col_idx], q[num_dims + col_idx]] col_queries.append(col_q) tree.pt_root.queryset = col_queries # Build single-column tree (no leaf size limit) self.__Pando_build_tree(tree, max_depth) self.pando_trees.append(tree) # Create actual physical blocks pando_blocks self._create_pando_blocks() end_time = time.time() print(f"{self.table_name} Pando Build Time (s):", end_time - start_time) self.pando_build_time = end_time - start_time def __Pando_build_tree(self, tree, max_depth): """ Recursively build single-column partition tree, using filters from query_set as split points, with no leaf size limit. """ can_split = True cur_depth = 1 # Collect all filters from queries as candidate split points all_filters = [] for query_range in tree.pt_root.queryset: lower, upper = query_range # Add query boundaries as potential split points if lower < upper: all_filters.extend([lower, upper]) # Deduplicate and sort split points split_candidates = sorted(list(set(all_filters))) while can_split and cur_depth <= max_depth: can_split = False leaves = tree.get_leaves() cur_depth += 1 for leaf in leaves: if leaf.depth >= max_depth: continue # Get current leaf node's data range current_range = [leaf.boundary[0], leaf.boundary[1]] # Find best split point within current range best_split = None best_balance = float("inf") for split_value in split_candidates: if current_range[0] < split_value < current_range[1]: # Check if this split point effectively divides the query set left_queries = 0 right_queries = 0 for query_range in leaf.queryset: q_lower, q_upper = query_range # Calculate query intersection with left and right subspaces (handle date strings) if ( isinstance(split_value, str) and isinstance(q_upper, str) and isinstance(q_lower, str) ): # Date string comparison left_intersect = ( 1 if split_value >= q_lower and current_range[0] <= q_upper else 0 ) right_intersect = ( 1 if current_range[1] >= q_lower and split_value <= q_upper else 0 ) else: # Numeric comparison left_intersect = max( 0, min(split_value, q_upper) - max(current_range[0], q_lower), ) right_intersect = max( 0, min(current_range[1], q_upper) - max(split_value, q_lower), ) if left_intersect > 0: left_queries += 1 if right_intersect > 0: right_queries += 1 # Evaluate split quality: balance left and right query counts if left_queries > 0 and right_queries > 0: balance = abs(left_queries - right_queries) if balance < best_balance: best_balance = balance best_split = split_value # Apply best split if best_split is not None: child_node1, child_node2 = tree.apply_split(leaf.nid, 0, best_split) child_node1.depth = leaf.depth + 1 child_node2.depth = leaf.depth + 1 can_split = True def _create_pando_blocks(self): """ Create actual physical blocks pando_blocks Using flattened dictionary structure with physical block index tuples as keys and tuple lists as values This ensures tuples are correctly assigned to corresponding physical blocks """ if not hasattr(self, "pando_trees") or not self.pando_trees: print("Warning: pando_trees not initialized") return # Get leaf node count for each column's subtree col_leaf_counts = [] for i, tree in enumerate(self.pando_trees): leaves = tree.get_leaves() col_leaf_counts.append(len(leaves)) print(f"Debug: Column {i} subtree leaf node count: {len(leaves)}") print(f"Debug: Leaf node counts for all columns: {col_leaf_counts}") # Use flattened dictionary structure: keys are physical block index tuples, values are tuple lists from collections import defaultdict self.pando_blocks = defaultdict(list) # Traverse all tuples and assign to physical blocks based on routing results total_rows = len(self.tabledata) print(f"Debug: Total tuples: {total_rows}") assigned_tuples = 0 for row_idx in range(total_rows): tuple_data = self.tabledata[row_idx] # Get logical block ID for this tuple in each column's subtree block_indices = [] for col_idx, tree in enumerate(self.pando_trees): col_value = tuple_data[col_idx] # Find corresponding logical leaf node in this column's subtree leaf_id = self._find_leaf_for_value(tree, col_value) block_indices.append(leaf_id) # Assign tuple to corresponding physical block # Use tuple as key to ensure correct indexing block_key = tuple(block_indices) self.pando_blocks[block_key].append(tuple_data.tolist()) assigned_tuples += 1 # Verify assignment results total_blocks = len(self.pando_blocks) print(f"Debug: Successfully assigned {assigned_tuples} tuples to {total_blocks} physical blocks") # Print contents of first few blocks for verification block_keys = list(self.pando_blocks.keys())[:5] for key in block_keys: tuples_in_block = len(self.pando_blocks[key]) print(f"Debug: Block {key} contains {tuples_in_block} tuples") # Verify total tuple count total_assigned = sum(len(tuples) for tuples in self.pando_blocks.values()) print(f"Debug: Verify total assigned tuples: {total_assigned} / {total_rows}") def get_pando_block_count(self): """ Get total number of blocks in pando_blocks """ if not hasattr(self, "pando_blocks") or not self.pando_blocks: return 0 # For flattened dictionary structure, directly return number of keys return len(self.pando_blocks) def get_pando_block(self, *block_indices): """ Get content of specific physical block by block indices Args: *block_indices: Variable parameters, each corresponding to a column's block index Returns: List of tuples in this physical block """ if not hasattr(self, "pando_blocks") or not self.pando_blocks: return [] try: block_key = tuple(block_indices) return self.pando_blocks.get(block_key, []) except Exception: return [] def _find_leaf_for_value(self, tree, value): """ Find leaf node ID corresponding to given value in single column subtree """ current_node = tree.pt_root # Traverse directly to leaf node while current_node.children_ids: if len(current_node.children_ids) == 2: left_child = tree.nid_node_dict[current_node.children_ids[0]] right_child = tree.nid_node_dict[current_node.children_ids[1]] # Choose child node based on boundary value if value <= left_child.boundary[1]: current_node = left_child else: current_node = right_child else: # Case of single child node current_node = tree.nid_node_dict[current_node.children_ids[0]] return current_node.nid def query_pando(self, query, logic="AND"): """ Execute query on Pando layout: 1. Query logical node IDs that meet conditions in each column's corresponding tree 2. Find all physical blocks that need to be accessed based on Cartesian product of these IDs 3. Count total tuples in these physical blocks Args: query: Complete query range [l1,...,ln, u1,...,un] logic: Logical relationship between columns, "AND" or "OR" Returns: Number of accessed tuples """ if not hasattr(self, "pando_trees"): raise ValueError( "Pando layout not initialized. Call InitializeWithPando first." ) if not hasattr(self, "pando_blocks"): raise ValueError( "Pando blocks not created. Call InitializeWithPando first." ) # Debug info: Check pando_blocks structure total_blocks = self.get_pando_block_count() print(f"Debug: Total Pando blocks: {total_blocks}") num_dims = len(self.used_columns) col_block_lists = [] # Query logical node IDs that meet conditions in each column's tree for col_idx, tree in enumerate(self.pando_trees): col_query = [query[col_idx], query[num_dims + col_idx]] col_blocks = tree.query_single(col_query) col_block_lists.append(col_blocks) # Debug info: Check block IDs returned by each column print(f"Debug: Logical block IDs returned by each column: {col_block_lists}") # Check if pando_blocks is empty if total_blocks == 0: print("Warning: pando_blocks is empty, possible creation issue") return 0 # Process block lists based on logical relationship if logic == "AND": # AND logic: all columns must meet conditions # Check for empty results if any(len(blocks) == 0 for blocks in col_block_lists): print("Debug: At least one column has empty results, AND query returns 0") return 0 # Use Cartesian product to find all possible physical block combinations from itertools import product physical_blocks = product(*col_block_lists) total_tuples = 0 accessed_blocks = 0 for block_indices in physical_blocks: block_content = self.get_pando_block(*block_indices) if block_content: # Only count non-empty blocks total_tuples += len(block_content) accessed_blocks += 1 print(f"Debug: AND logic accessed blocks: {accessed_blocks}, tuples: {total_tuples}") elif logic == "OR": # OR logic: any column meeting conditions is sufficient # Collect all involved tuples, avoid duplicate counting visited_tuples = set() # Get leaf node count for each column leaf_counts = [len(tree.get_leaves()) for tree in self.pando_trees] # Use set to collect all physical block indices that need to be accessed all_block_indices = set() # For each column with query conditions for col_idx, col_blocks in enumerate(col_block_lists): if col_blocks: # This column has query results # Generate all possible values for other columns other_ranges = [] for other_col_idx in range(len(self.pando_trees)): if other_col_idx == col_idx: other_ranges.append(col_blocks) else: other_ranges.append(range(leaf_counts[other_col_idx])) from itertools import product # Generate all possible complete block index combinations for block_indices in product(*other_ranges): all_block_indices.add(block_indices) # Count tuples in all accessed blocks total_tuples = 0 accessed_blocks = 0 for block_indices in all_block_indices: block_content = self.get_pando_block(*block_indices) if block_content: for tuple_data in block_content: visited_tuples.add(tuple(tuple_data)) total_tuples = len(visited_tuples) print(f"Debug: OR logic unique tuples accessed: {total_tuples}") else: raise ValueError("logic must be 'AND' or 'OR'") return total_tuples def evaluate_single_table_access_cost(self): return self.partition_tree.evaluate_query_cost(self.queries) def evaluate_tree_depth(self, node, depth): max_depth = depth for nid in node.children_ids: max_depth = max( max_depth, self.evaluate_tree_depth( self.partition_tree.nid_node_dict[nid], depth + 1 ), ) return max_depth def evaluate_pando_cost(self, queries=None, logic="AND"): """ Evaluate query cost under Pando layout, calculate data scan ratio similar to evaluate_single_table_access_cost Args: queries: Query list, use self.queries if None logic: Logical relationship between columns ("AND" or "OR") Returns: Average data scan ratio (ratio of accessed data volume to total data volume) """ if queries is None: queries = self.queries if not hasattr(self, "pando_trees"): raise ValueError( "Pando layout not initialized. Call InitializeWithPando first." ) total_scan_ratio = 0 total_rows = len(self.tabledata) for count, query in enumerate(queries): # Use query_pando to get block IDs meeting conditions query_rows = self.query_pando(query, logic=logic) scan_ratio = query_rows / total_rows total_scan_ratio += scan_ratio print(f"Pando Query#{count}: Scan ratio: {scan_ratio:.4f}") avg_scan_ratio = total_scan_ratio / len(queries) if queries else 0 print(f"Pando average data scan ratio (logic={logic}): {avg_scan_ratio:.4f}") return avg_scan_ratio