import random import time import numpy as np import pandas as pd from db.conf import table_suffix """ A tool class that provides QDG algorithm and computes hyper / shuffle join cost. """ class JoinEvaluator: def __init__(self,join_queryset, joined_cols,joined_trees,joined_tables,block_size,metadata,benchmark='tpch'): self.A_join_queries=[join_queryset[0]] self.B_join_queries=[join_queryset[1]] self.join_tables=joined_tables self.pa_A=joined_trees[0] self.pa_B=joined_trees[1] self.join_attr=joined_cols self.metadata=metadata self.benchmark=benchmark self.dim_nums1,self.dim_nums2=len(join_queryset[0])//2,len(join_queryset[1])//2 self.worker_mermory=10 def generate_join_queries(self,a_training_set_for_join,b_training_set_for_join,join_amount=20): join_attr=self.join_attr def __overlap(q1, q2, dim1, dim2): if q1[dim] <= q2[dim] <= q1[dim + self.dim_nums1] or q2[dim] <= q1[dim] <= q2[dim + self.dim_nums2]: return True return False a_training_set=a_training_set_for_join b_training_set=b_training_set_for_join # pick join query which will be measure b_join_index = [] for _ in range(join_amount): b_join_index.append( list(set([random.randint(0, len(b_training_set) - 1) for _ in range(random.randint(1, 10))]))) # remove block id with overlap join attribute range b_join_queries = [] for ids in b_join_index: item = [] for idx in ids: flag = True for em in item: if __overlap(b_training_set[idx], em, join_attr): flag = False break if flag: item.append(b_training_set[idx]) b_join_queries.append(item) a_join_queries = {} for bid, item in enumerate(b_join_queries): for qb in item: a_join_queries[bid] = [] for qa in a_training_set: if __overlap(qa, qb, join_attr): # remove overlap range queries flag = True for qa2 in a_join_queries[bid]: # if __overlap(qa2,qa,join_attr): if qa2 == qa: flag = False break if flag: a_join_queries[bid].append(qa) return a_join_queries,b_join_queries # def rough_join_cost(self,group_type): # hyper_read_cost, hyper_read_bytes, temp_joined_df, group_time= self.compute_total_shuffle_hyper_cost(group_type) # # if shuffle: # # return shuffle_read_cost # return hyper_read_cost,hyper_read_bytes,temp_joined_df,group_time def pandas_hash_join(self,dataset_A, dataset_B, key_A, key_B): A_used_columns=self.pa_A.used_columns B_used_columns=self.pa_B.used_columns if self.benchmark=='imdb': A_used_columns=[table_suffix[self.benchmark][self.join_tables[0]]+'_'+col for col in A_used_columns] B_used_columns=[table_suffix[self.benchmark][self.join_tables[1]]+'_'+col for col in B_used_columns] df_A = pd.DataFrame(dataset_A, columns=A_used_columns) df_B = pd.DataFrame(dataset_B, columns=B_used_columns) left_col = A_used_columns[key_A] right_col = B_used_columns[key_B] if df_A.shape[0]>df_B.shape[0]: df_A,df_B=df_B,df_A left_col,right_col=right_col,left_col df_B = df_B.drop_duplicates(subset=[right_col], keep='first') merged_df = df_A.merge(df_B, how='inner', left_on=left_col, right_on=right_col) return merged_df def compute_total_shuffle_hyper_cost(self,group_type,is_real_hyper): start_time = time.time() pa_A=self.pa_A pa_B=self.pa_B key_A,key_B=self.join_attr[0],self.join_attr[1] blocks_A_ids = {} blocks_B_ids = {} for qid, query in enumerate(self.A_join_queries): blocks_A_ids[qid]=list(set(pa_A.query_single(query))) for qid, query in enumerate(self.B_join_queries): blocks_B_ids[qid]=list(set(pa_B.query_single(query))) print(f"Table {self.join_tables[0]} has {len(blocks_A_ids[0])} Blocks, Table {self.join_tables[1]} has {len(blocks_B_ids[0])} Blocks.") print(f"Read Cost: {sum([pa_A.nid_node_dict[a_id].node_size for a_id in blocks_A_ids[0]])} -> {sum([pa_B.nid_node_dict[b_id].node_size for b_id in blocks_B_ids[0]])}") static_A_blocks=blocks_A_ids[0].copy() static_B_blocks=blocks_B_ids[0].copy() def is_overlay(aid, bid): bucket_a = pa_A.nid_node_dict[aid].boundary bucket_b = pa_B.nid_node_dict[bid].boundary return __overlap(bucket_a, bucket_b, key_A,key_B) def __overlap(q1, q2, dim1, dim2): if q1[dim1] <= q2[dim2] <= q1[dim1 + self.dim_nums1] or q2[dim2] <= q1[dim1] <= q2[dim2 + self.dim_nums2]: return True return False final_resized_splits = [] overlap_chunks_for_queries = [] build_time = 0 for qid in range(len(blocks_A_ids)): A_join_block_ids=blocks_A_ids[qid] B_join_block_ids=blocks_B_ids[qid] # group algorithm # step1: generate overlap_chunks overlap_chunks = {} for A_bid in A_join_block_ids: if A_bid not in overlap_chunks.keys(): overlap_chunks[A_bid] = [] for B_bid in B_join_block_ids: if is_overlay(A_bid, B_bid): overlap_chunks[A_bid].append(B_bid) overlap_chunks_for_queries.append(overlap_chunks) # step2: group time0 = time.time() if group_type==0: resizedSplits = self.group(overlap_chunks, A_join_block_ids, min_partition_size=self.worker_mermory) elif group_type==1: resizedSplits = self.group1(overlap_chunks, A_join_block_ids, min_partition_size=self.worker_mermory,max_partition_size=1.2*self.worker_mermory) build_time += time.time() - time0 final_resized_splits.append(resizedSplits) group_time = time.time() - start_time total_shuffle_hyper_read_cost = 0 a_hyper_cost,b_hyper_cost=0,0 a_shuffle_cost,b_shuffle_cost=0,0 shuffle_weight = 3 for q_no, resizedSplits in enumerate(final_resized_splits): total_A_ids = [] total_B_ids = [] overlap_chunks = overlap_chunks_for_queries[q_no] group_a_cost,group_b_cost=0,0 for group in resizedSplits: B_ids = [] for a_id in group: group_a_cost+=pa_A.nid_node_dict[a_id].node_size B_ids += overlap_chunks[a_id] total_A_ids+=group B_ids = list(set(B_ids)) total_B_ids += B_ids for b_id in B_ids: group_b_cost+=pa_B.nid_node_dict[b_id].node_size a_hyper_cost+=group_a_cost b_hyper_cost+=group_b_cost total_B_ids = list(set(total_B_ids)) for a_id in total_A_ids: a_shuffle_cost += shuffle_weight * pa_A.nid_node_dict[a_id].node_size for b_id in total_B_ids: b_shuffle_cost += shuffle_weight * pa_B.nid_node_dict[b_id].node_size dataset_A,dataset_B=[],[] for a_id in static_A_blocks: dataset_A.extend(pa_A.nid_node_dict[a_id].dataset) for b_id in static_B_blocks: dataset_B.extend(pa_B.nid_node_dict[b_id].dataset) joined_df=self.pandas_hash_join(np.array(dataset_A),np.array(dataset_B),key_A,key_B) if is_real_hyper: total_shuffle_hyper_read_cost=a_hyper_cost+b_hyper_cost total_hyper_shuffle_read_bytes=a_hyper_cost*self.metadata[self.join_tables[0]]['read_line']+b_hyper_cost*self.metadata[self.join_tables[1]]['read_line'] else: total_shuffle_hyper_read_cost=a_shuffle_cost+b_shuffle_cost total_hyper_shuffle_read_bytes=a_shuffle_cost*self.metadata[self.join_tables[0]]['read_line']+b_shuffle_cost*self.metadata[self.join_tables[1]]['read_line'] print(f"Hyper Join: {a_hyper_cost} -> {b_hyper_cost}") print(f"Real Join Type: {is_real_hyper}, Final Cost: {total_shuffle_hyper_read_cost}") return total_shuffle_hyper_read_cost,total_hyper_shuffle_read_bytes,joined_df,group_time def print_shuffle_hyper_blocks(self,a_join_queries,b_join_queries,group_type): pa_A=self.pa_A pa_B=self.pa_B join_attr=self.join_attr blocks_a_ids = [] blocks_b_ids = [] a_join_info = [] b_join_info = [] # how to get join attr range base on block id. for key, queries in enumerate(b_join_queries): map_content = {} join_keys = [] block_ids = [] for query in queries: join_keys += pa_B.query_single_toplayer(query) block_ids += pa_B.query_single(query) map_content[key] = list(set(block_ids)) blocks_b_ids.append(map_content) join_keys = list(set(join_keys)) join_info = {"nums": len(join_keys), "length": []} for join_id in join_keys: node = pa_B.nid_node_dict[join_id] join_info["length"].append(node.boundary[join_attr + node.num_dims] - node.boundary[join_attr]) b_join_info.append(join_info) for key in a_join_queries: map_content = {} join_keys = [] block_ids = [] for query in a_join_queries[key]: join_keys += pa_A.query_single_toplayer(query) block_ids += pa_A.query_single(query) map_content[key] = list(set(block_ids)) blocks_a_ids.append(map_content) join_keys = list(set(join_keys)) join_info = {"nums": len(join_keys), "length": []} for join_id in join_keys: node = pa_A.nid_node_dict[join_id] join_info["length"].append(node.boundary[join_attr + node.num_dims] - node.boundary[join_attr]) a_join_info.append(join_info) join_infos = [a_join_info, b_join_info] for join_info in join_infos: total_nums, total_length = 0, 0 for item in join_info: total_nums += item['nums'] total_length += sum(item['length']) # print(f"{total_nums} , {round(total_length, 2)}") # compute hyper join cost (Use Group 4) def is_overlay(aid, bid): bucket_a = pa_A.nid_node_dict[aid].boundary bucket_b = pa_B.nid_node_dict[bid].boundary return __overlap(bucket_a, bucket_b, join_attr) def __overlap(q1, q2, dim): if q1[dim] <= q2[dim] <= q1[dim + self.dim_nums1] or q2[dim] <= q1[dim] <= q2[dim + self.dim_nums2]: return True return False final_resized_splits = [] overlap_chunks_for_queries = [] intersection_reward = 0 total_hyper_cost = 0 build_time = 0 for idx in range(len(blocks_a_ids)): # if idx<=2:continue A_join_block_ids = [] for key in blocks_a_ids[idx].keys(): A_join_block_ids += blocks_a_ids[idx][key] B_join_block_ids = [] for key in blocks_b_ids[idx].keys(): B_join_block_ids += blocks_b_ids[idx][key] # group algorithm # step1: generate overlap_chunks overlap_chunks = {} for aid in A_join_block_ids: if aid not in overlap_chunks.keys(): overlap_chunks[aid] = [] for bid in B_join_block_ids: if is_overlay(aid, bid): overlap_chunks[aid].append(bid) # print(f"overlap chunks: ",overlap_chunks) overlap_chunks_for_queries.append(overlap_chunks) # step2: group # print(overlap_chunks) # print(A_join_block_ids) time0 = time.time() if group_type==3: resizedSplits = self.group3(overlap_chunks, A_join_block_ids, partition_size=8) elif group_type==1: resizedSplits = self.group1(overlap_chunks, A_join_block_ids, partition_size=8) build_time += time.time() - time0 for group in resizedSplits: all_b_ids = [] for a_id in group: all_b_ids += overlap_chunks[a_id] # print(overlap_chunks[a_id]) actual_b_ids = list(set(all_b_ids)) intersection_reward += len(all_b_ids) - len(actual_b_ids) total_hyper_cost += sum([pa_B.nid_node_dict[_].node_size for _ in actual_b_ids]) final_resized_splits.append(resizedSplits) # print("total_hyper_cost: ", total_hyper_cost) # print("average build time: ", build_time / len(blocks_a_ids)) A_ids_for_q, B_ids_for_q = [], [] for q_no, resizedSplits in enumerate(final_resized_splits): cnt = 0 total_B_ids = [] total_A_ids = [] overlap_chunks = overlap_chunks_for_queries[q_no] for group in resizedSplits: b_ids = [] for a_id in group: b_ids += overlap_chunks[a_id] cnt += 1 total_A_ids += group b_ids = list(set(b_ids)) if b_ids: total_B_ids.append(b_ids) A_ids_for_q.append(total_A_ids) B_ids_for_q.append(total_B_ids) return A_ids_for_q, B_ids_for_q def compute_join_blocks_for_main_table(self,a_join_queries,b_join_queries): pa_A=self.pa_A pa_B=self.pa_B join_attr=self.join_attr blocks_a_ids = [] blocks_b_ids = [] a_join_info = [] b_join_info = [] # how to get join attr range base on block id. for key, queries in enumerate(b_join_queries): map_content = {} join_keys = [] block_ids = [] for query in queries: join_keys += pa_B.query_single_toplayer(query) block_ids += pa_B.query_single(query) map_content[key] = list(set(block_ids)) blocks_b_ids.append(map_content) join_keys = list(set(join_keys)) join_info = {"nums": len(join_keys), "length": []} for join_id in join_keys: node = pa_B.nid_node_dict[join_id] join_info["length"].append(node.boundary[join_attr + node.num_dims] - node.boundary[join_attr]) b_join_info.append(join_info) for key in a_join_queries: map_content = {} join_keys = [] block_ids = [] for query in a_join_queries[key]: join_keys += pa_A.query_single_toplayer(query) block_ids += pa_A.query_single(query) map_content[key] = list(set(block_ids)) blocks_a_ids.append(map_content) join_keys = list(set(join_keys)) join_info = {"nums": len(join_keys), "length": []} for join_id in join_keys: node = pa_A.nid_node_dict[join_id] join_info["length"].append(node.boundary[join_attr + node.num_dims] - node.boundary[join_attr]) a_join_info.append(join_info) join_infos = [a_join_info, b_join_info] for join_info in join_infos: total_nums, total_length = 0, 0 for item in join_info: total_nums += item['nums'] total_length += sum(item['length']) a_hyper_blocks_size=0 b_hyper_blocks_size=0 for idx in range(len(blocks_a_ids)): # if idx<=2:continue for key in blocks_a_ids[idx].keys(): a_hyper_blocks_size+=sum([pa_A.nid_node_dict[ida].node_size for ida in blocks_a_ids[idx][key]]) for key in blocks_b_ids[idx].keys(): b_hyper_blocks_size+=sum([pa_B.nid_node_dict[idb].node_size for idb in blocks_b_ids[idx][key]]) return a_hyper_blocks_size,0 # (AdaptDB grouping) def group(self,overlap_chunks,join_a_block_ids, min_partition_size): def get_intersection_size_count(setValues, listValues): size = 0 for lv in listValues: if lv in setValues: size += 1 return size def get_group_size(a_ids): chunks = [] for a_id in a_ids: chunks += overlap_chunks[a_id] chunks = list(set(chunks)) return sum([self.pa_B.nid_node_dict[b_id].node_size for b_id in chunks])+sum([self.pa_A.nid_node_dict[a_id].node_size for a_id in a_ids]) def __get_group_size(a_ids): # return sum([self.pa_A.nid_node_dict[a_id].node_size for a_id in a_ids]) return len(a_ids) resizedSplits = [] rest_count = len(join_a_block_ids) while rest_count > 0: cur_splits = [] chunks = [] available_size=min_partition_size # max block size limit for every split. while rest_count > 0: maxIntersection = -1 best_offset = -1 for offset, bid in enumerate(join_a_block_ids): cur_intersection = get_intersection_size_count(chunks, overlap_chunks[bid]) if cur_intersection > maxIntersection: maxIntersection = cur_intersection best_offset = offset bucket_id = join_a_block_ids[best_offset] # if _get_group_size(cur_splits+[bucket_id]) > available_size: # if len(cur_splits) == 0: # cur_splits.append(bucket_id) # chunks += overlap_chunks[bucket_id] # chunks = list(set(chunks)) # join_a_block_ids.remove(bucket_id) # rest_count -= 1 # break # break cur_splits.append(bucket_id) chunks += overlap_chunks[bucket_id] chunks = list(set(chunks)) join_a_block_ids.remove(bucket_id) rest_count -= 1 if __get_group_size(cur_splits) == available_size: break resizedSplits.append(cur_splits) return resizedSplits # Our QDG grouping algorithm def group1(self, overlap_chunks, join_a_block_ids, min_partition_size, max_partition_size): def list_solved_list(l1, l2): for item1 in l1: if item1 in l2: return True return False def get_group_size(a_ids): chunks = [] for a_id in a_ids: chunks += overlap_chunks[a_id] chunks = list(set(chunks)) return sum([self.pa_B.nid_node_dict[b_id].node_size for b_id in chunks]) def __get_group_size(a_ids): # return sum([self.pa_A.nid_node_dict[a_id].node_size for a_id in a_ids]) return len(a_ids) def get_intersection_size_count(setValues, listValues): size = 0 for lv in listValues: if lv in setValues: size += 1 return size resizedSplits = [] rest_count = len(join_a_block_ids) affinity_tab = [] pre_save_ids = [] computed_ids_dict = {} for bid in join_a_block_ids: computed_ids_dict[bid] = {} a_block_len = len(join_a_block_ids) for no1 in range(a_block_len): bid1 = join_a_block_ids[no1] max_intersection = -1 max_bid = [] for exist_bid in computed_ids_dict[bid1].keys(): cur_intersection = computed_ids_dict[bid1][exist_bid] if cur_intersection > max_intersection: max_intersection = cur_intersection max_bid = [exist_bid] for no2 in range(no1 + 1, a_block_len): bid2 = join_a_block_ids[no2] cur_intersection = get_intersection_size_count(overlap_chunks[bid1], overlap_chunks[bid2]) computed_ids_dict[bid1][bid2] = cur_intersection computed_ids_dict[bid2][bid1] = cur_intersection if cur_intersection > max_intersection: max_intersection = cur_intersection max_bid = [bid2] if max_intersection == 0: pre_save_ids.append(bid1) else: affinity_tab.append({'item': [[bid1], max_bid], 'val': max_intersection, 'chunk': overlap_chunks[bid1]}) cur_index = 0 # pre-save these ids which doesn't have any overlap blocks last_index=0 while last_index < len(pre_save_ids): if __get_group_size(pre_save_ids[last_index:cur_index + 1]) == min_partition_size: merge_ids = pre_save_ids[last_index:cur_index + 1] resizedSplits.append(merge_ids) rest_count -= len(merge_ids) last_index = cur_index + 1 elif cur_index >= len(pre_save_ids)-1: merge_ids = pre_save_ids[last_index:] resizedSplits.append(merge_ids) rest_count -= len(merge_ids) break else: cur_index += 1 while rest_count > 0: affinity_tab.sort(key=lambda item: (item['val'], len(item['item'][0])), reverse=True) sel_tab = affinity_tab.pop(0) merge_ids = sel_tab['item'][0] + sel_tab['item'][1] merge_ids_length = len(merge_ids) is_completed = False if __get_group_size==min_partition_size or len(affinity_tab)==0 or sel_tab['val']==-1: is_completed = True resizedSplits.append(merge_ids) rest_count -= merge_ids_length else: # add key=chunk new_overlap_chunks = sel_tab['chunk'] for bid in sel_tab['item'][1]: new_overlap_chunks += overlap_chunks[bid] new_tab = {'item': [merge_ids, []], 'val': -1, 'chunk': list(set(new_overlap_chunks))} # update affinity_tab for tab in reversed(affinity_tab): # delete tab if list_solved_list(tab['item'][0], sel_tab['item'][1]): affinity_tab.remove(tab) continue # update tab if list_solved_list(tab['item'][1], merge_ids): if is_completed or __get_group_size(tab['item'][0]) + __get_group_size(merge_ids) > min_partition_size: tab['item'][1] = [] tab['val'] = -1 else: tab['item'][1] = merge_ids tab['val'] = get_intersection_size_count(tab['chunk'], new_tab['chunk']) if not is_completed: affinity_tab.append(new_tab) # Case: the affinity_tab only has one item. if len(affinity_tab) == 1: last_tab = affinity_tab.pop(0) merge_ids = last_tab['item'][0] + last_tab['item'][1] resizedSplits.append(merge_ids) rest_count -= len(merge_ids) # Update overlap_chunks and intersection benefits for merged column groups for ud_item1 in affinity_tab: if ud_item1['val'] == -1: ud1_key = ud_item1['item'][0] flag1 = False if len(ud1_key) == 1: flag1 = True overlap_chunks1 = ud_item1['chunk'] max_allocate_size = min_partition_size - __get_group_size(ud1_key) max_intersection = -1 max_target_ids = [] for ud_item2 in affinity_tab: ud2_key = ud_item2['item'][0] if ud1_key == ud2_key: continue if __get_group_size(ud2_key) > max_allocate_size: continue if ud_item2['item'][1] == ud1_key: cur_intersection = ud_item2['val'] else: # if flag1 and len(ud2_key)==1:continue flag2 = False if len(ud2_key) == 1: flag2 = True if flag1 and flag2: cur_intersection = computed_ids_dict[ud1_key[0]][ud2_key[0]] else: overlap_chunks2 = ud_item2['chunk'] cur_intersection = get_intersection_size_count(overlap_chunks1, overlap_chunks2) if cur_intersection > max_intersection: max_intersection = cur_intersection max_target_ids = ud2_key ud_item1['val'] = max_intersection ud_item1['item'][1] = max_target_ids return resizedSplits