PAC-tree / model / join_eval.py
join_eval.py
Raw
import random
import time
import numpy as np
import pandas as pd
from db.conf import table_suffix
"""
A tool class that provides QDG algorithm and computes hyper / shuffle join cost.
"""
class JoinEvaluator:
    def __init__(self,join_queryset, joined_cols,joined_trees,joined_tables,block_size,metadata,benchmark='tpch'):
        self.A_join_queries=[join_queryset[0]]
        self.B_join_queries=[join_queryset[1]]
        self.join_tables=joined_tables
        self.pa_A=joined_trees[0]
        self.pa_B=joined_trees[1]
        self.join_attr=joined_cols
        self.metadata=metadata
        self.benchmark=benchmark
        self.dim_nums1,self.dim_nums2=len(join_queryset[0])//2,len(join_queryset[1])//2
        self.worker_mermory=10 


    def generate_join_queries(self,a_training_set_for_join,b_training_set_for_join,join_amount=20):
        join_attr=self.join_attr
        def __overlap(q1, q2, dim1, dim2):
            if q1[dim] <= q2[dim] <= q1[dim + self.dim_nums1] or q2[dim] <= q1[dim] <= q2[dim + self.dim_nums2]:
                return True
            return False
        a_training_set=a_training_set_for_join
        b_training_set=b_training_set_for_join

        # pick join query which will be measure
        b_join_index = [] 
        for _ in range(join_amount):
            b_join_index.append(
                list(set([random.randint(0, len(b_training_set) - 1) for _ in range(random.randint(1, 10))])))
        # remove block id with overlap join attribute range
        b_join_queries = []
        for ids in b_join_index:
            item = []
            for idx in ids:
                flag = True
                for em in item:
                    if __overlap(b_training_set[idx], em, join_attr):
                        flag = False
                        break
                if flag: item.append(b_training_set[idx])
            b_join_queries.append(item)
        a_join_queries = {}
        
        for bid, item in enumerate(b_join_queries):
            for qb in item:
                a_join_queries[bid] = []
                for qa in a_training_set:
                    if __overlap(qa, qb, join_attr):
                        # remove overlap range queries
                        flag = True
                        for qa2 in a_join_queries[bid]:
                            # if __overlap(qa2,qa,join_attr):
                            if qa2 == qa:
                                flag = False
                                break
                        if flag: a_join_queries[bid].append(qa)
        return a_join_queries,b_join_queries


    # def rough_join_cost(self,group_type):
    #     hyper_read_cost, hyper_read_bytes, temp_joined_df, group_time= self.compute_total_shuffle_hyper_cost(group_type)
    #     # if shuffle:
    #     #     return shuffle_read_cost
    #     return hyper_read_cost,hyper_read_bytes,temp_joined_df,group_time
    
    def pandas_hash_join(self,dataset_A, dataset_B, key_A, key_B):
        A_used_columns=self.pa_A.used_columns
        B_used_columns=self.pa_B.used_columns
        if self.benchmark=='imdb':
            A_used_columns=[table_suffix[self.benchmark][self.join_tables[0]]+'_'+col for col in A_used_columns]
            B_used_columns=[table_suffix[self.benchmark][self.join_tables[1]]+'_'+col for col in B_used_columns]
        df_A = pd.DataFrame(dataset_A, columns=A_used_columns)
        df_B = pd.DataFrame(dataset_B, columns=B_used_columns)

        left_col = A_used_columns[key_A]
        right_col = B_used_columns[key_B]

        if df_A.shape[0]>df_B.shape[0]:
            df_A,df_B=df_B,df_A
            left_col,right_col=right_col,left_col

        df_B = df_B.drop_duplicates(subset=[right_col], keep='first')
        merged_df = df_A.merge(df_B, how='inner', left_on=left_col, right_on=right_col)
        return merged_df

    def compute_total_shuffle_hyper_cost(self,group_type,is_real_hyper):
        start_time = time.time()
        pa_A=self.pa_A
        pa_B=self.pa_B
        key_A,key_B=self.join_attr[0],self.join_attr[1]
        blocks_A_ids = {}
        blocks_B_ids = {}
        for qid, query in enumerate(self.A_join_queries):
            blocks_A_ids[qid]=list(set(pa_A.query_single(query)))
        
        for qid, query in enumerate(self.B_join_queries):
            blocks_B_ids[qid]=list(set(pa_B.query_single(query)))

        print(f"Table {self.join_tables[0]} has {len(blocks_A_ids[0])} Blocks, Table {self.join_tables[1]} has {len(blocks_B_ids[0])} Blocks.")
        print(f"Read Cost: {sum([pa_A.nid_node_dict[a_id].node_size for a_id in blocks_A_ids[0]])} -> {sum([pa_B.nid_node_dict[b_id].node_size for b_id in blocks_B_ids[0]])}")
        static_A_blocks=blocks_A_ids[0].copy()
        static_B_blocks=blocks_B_ids[0].copy()

        def is_overlay(aid, bid):
            bucket_a = pa_A.nid_node_dict[aid].boundary
            bucket_b = pa_B.nid_node_dict[bid].boundary
            return __overlap(bucket_a, bucket_b, key_A,key_B)
        def __overlap(q1, q2, dim1, dim2):
            if q1[dim1] <= q2[dim2] <= q1[dim1 + self.dim_nums1] or q2[dim2] <= q1[dim1] <= q2[dim2 + self.dim_nums2]:
                return True
            return False
        
        final_resized_splits = []
        overlap_chunks_for_queries = []
        
        build_time = 0
        for qid in range(len(blocks_A_ids)):
            A_join_block_ids=blocks_A_ids[qid]
            B_join_block_ids=blocks_B_ids[qid]
            
            # group algorithm
            # step1: generate overlap_chunks
            overlap_chunks = {}
            for A_bid in A_join_block_ids:
                if A_bid not in overlap_chunks.keys(): 
                    overlap_chunks[A_bid] = []
                for B_bid in B_join_block_ids:
                    if is_overlay(A_bid, B_bid): 
                        overlap_chunks[A_bid].append(B_bid)
                    
            overlap_chunks_for_queries.append(overlap_chunks)
            # step2: group
            time0 = time.time()
            if group_type==0:
                resizedSplits = self.group(overlap_chunks, A_join_block_ids, min_partition_size=self.worker_mermory)
            elif group_type==1:
                resizedSplits = self.group1(overlap_chunks, A_join_block_ids, min_partition_size=self.worker_mermory,max_partition_size=1.2*self.worker_mermory)
            build_time += time.time() - time0
            final_resized_splits.append(resizedSplits)
        group_time = time.time() - start_time
        total_shuffle_hyper_read_cost = 0
        a_hyper_cost,b_hyper_cost=0,0
        a_shuffle_cost,b_shuffle_cost=0,0
        shuffle_weight = 3
        
        for q_no, resizedSplits in enumerate(final_resized_splits):
            total_A_ids = []
            total_B_ids = []
            overlap_chunks = overlap_chunks_for_queries[q_no]
            group_a_cost,group_b_cost=0,0
            for group in resizedSplits:
                B_ids = []
                for a_id in group:
                    group_a_cost+=pa_A.nid_node_dict[a_id].node_size
                    B_ids += overlap_chunks[a_id]
                    
                total_A_ids+=group
                B_ids = list(set(B_ids))
                total_B_ids += B_ids
                for b_id in B_ids:
                    group_b_cost+=pa_B.nid_node_dict[b_id].node_size

            a_hyper_cost+=group_a_cost
            b_hyper_cost+=group_b_cost
            
            total_B_ids = list(set(total_B_ids))
            for a_id in total_A_ids:
                a_shuffle_cost += shuffle_weight * pa_A.nid_node_dict[a_id].node_size
            for b_id in total_B_ids:
                b_shuffle_cost += shuffle_weight * pa_B.nid_node_dict[b_id].node_size

        dataset_A,dataset_B=[],[]
        for a_id in static_A_blocks:
            dataset_A.extend(pa_A.nid_node_dict[a_id].dataset)
        for b_id in static_B_blocks:
            dataset_B.extend(pa_B.nid_node_dict[b_id].dataset)
        joined_df=self.pandas_hash_join(np.array(dataset_A),np.array(dataset_B),key_A,key_B)
        if is_real_hyper:
            total_shuffle_hyper_read_cost=a_hyper_cost+b_hyper_cost
            total_hyper_shuffle_read_bytes=a_hyper_cost*self.metadata[self.join_tables[0]]['read_line']+b_hyper_cost*self.metadata[self.join_tables[1]]['read_line']
        else:
            total_shuffle_hyper_read_cost=a_shuffle_cost+b_shuffle_cost
            total_hyper_shuffle_read_bytes=a_shuffle_cost*self.metadata[self.join_tables[0]]['read_line']+b_shuffle_cost*self.metadata[self.join_tables[1]]['read_line']
        print(f"Hyper Join: {a_hyper_cost} -> {b_hyper_cost}")
        print(f"Real Join Type: {is_real_hyper}, Final Cost: {total_shuffle_hyper_read_cost}")

        return total_shuffle_hyper_read_cost,total_hyper_shuffle_read_bytes,joined_df,group_time
    
    
    

    def print_shuffle_hyper_blocks(self,a_join_queries,b_join_queries,group_type):
        pa_A=self.pa_A
        pa_B=self.pa_B
        join_attr=self.join_attr
        blocks_a_ids = []
        blocks_b_ids = []
        a_join_info = []
        b_join_info = []
        # how to get join attr range base on block id.
        for key, queries in enumerate(b_join_queries):
            map_content = {}
            join_keys = []
            block_ids = []
            for query in queries:
                join_keys += pa_B.query_single_toplayer(query)
                block_ids += pa_B.query_single(query)
            map_content[key] = list(set(block_ids))
            blocks_b_ids.append(map_content)

            join_keys = list(set(join_keys))
            join_info = {"nums": len(join_keys), "length": []}
            for join_id in join_keys:
                node = pa_B.nid_node_dict[join_id]
                join_info["length"].append(node.boundary[join_attr + node.num_dims] - node.boundary[join_attr])
            b_join_info.append(join_info)

        for key in a_join_queries:
            map_content = {}
            join_keys = []
            block_ids = []
            for query in a_join_queries[key]:
                join_keys += pa_A.query_single_toplayer(query)
                block_ids += pa_A.query_single(query)
            map_content[key] = list(set(block_ids))
            blocks_a_ids.append(map_content)

            join_keys = list(set(join_keys))
            join_info = {"nums": len(join_keys), "length": []}
            for join_id in join_keys:
                node = pa_A.nid_node_dict[join_id]
                join_info["length"].append(node.boundary[join_attr + node.num_dims] - node.boundary[join_attr])
            a_join_info.append(join_info)

        join_infos = [a_join_info, b_join_info]
        for join_info in join_infos:
            total_nums, total_length = 0, 0
            for item in join_info:
                total_nums += item['nums']
                total_length += sum(item['length'])
            # print(f"{total_nums} , {round(total_length, 2)}")

        # compute hyper join cost (Use Group 4)
        def is_overlay(aid, bid):
            bucket_a = pa_A.nid_node_dict[aid].boundary
            bucket_b = pa_B.nid_node_dict[bid].boundary
            return __overlap(bucket_a, bucket_b, join_attr)
        def __overlap(q1, q2, dim):
            if q1[dim] <= q2[dim] <= q1[dim + self.dim_nums1] or q2[dim] <= q1[dim] <= q2[dim + self.dim_nums2]:
                return True
            return False
        final_resized_splits = []
        overlap_chunks_for_queries = []
        intersection_reward = 0
        total_hyper_cost = 0
        build_time = 0
        for idx in range(len(blocks_a_ids)):
            # if idx<=2:continue
            A_join_block_ids = []
            for key in blocks_a_ids[idx].keys():
                A_join_block_ids += blocks_a_ids[idx][key]
            B_join_block_ids = []
            for key in blocks_b_ids[idx].keys():
                B_join_block_ids += blocks_b_ids[idx][key]
            # group algorithm
            # step1: generate overlap_chunks
            overlap_chunks = {}
            for aid in A_join_block_ids:
                if aid not in overlap_chunks.keys(): overlap_chunks[aid] = []
                for bid in B_join_block_ids:
                    if is_overlay(aid, bid): overlap_chunks[aid].append(bid)
            # print(f"overlap chunks: ",overlap_chunks)
            overlap_chunks_for_queries.append(overlap_chunks)
            # step2: group
            # print(overlap_chunks)
            # print(A_join_block_ids)
            time0 = time.time()
            if group_type==3:
                resizedSplits = self.group3(overlap_chunks, A_join_block_ids, partition_size=8)
            elif group_type==1:
                resizedSplits = self.group1(overlap_chunks, A_join_block_ids, partition_size=8)

            build_time += time.time() - time0
            for group in resizedSplits:
                all_b_ids = []
                for a_id in group:
                    all_b_ids += overlap_chunks[a_id]
                    # print(overlap_chunks[a_id])
                actual_b_ids = list(set(all_b_ids))
                intersection_reward += len(all_b_ids) - len(actual_b_ids)
                total_hyper_cost += sum([pa_B.nid_node_dict[_].node_size for _ in actual_b_ids])
            final_resized_splits.append(resizedSplits)
        # print("total_hyper_cost: ", total_hyper_cost)
        # print("average build time: ", build_time / len(blocks_a_ids))
        A_ids_for_q, B_ids_for_q = [], []
        for q_no, resizedSplits in enumerate(final_resized_splits):
            cnt = 0
            total_B_ids = []
            total_A_ids = []
            overlap_chunks = overlap_chunks_for_queries[q_no]
            for group in resizedSplits:
                b_ids = []
                for a_id in group:
                    b_ids += overlap_chunks[a_id]
                    cnt += 1
                total_A_ids += group
                b_ids = list(set(b_ids))
                if b_ids:
                    total_B_ids.append(b_ids)
            A_ids_for_q.append(total_A_ids)
            B_ids_for_q.append(total_B_ids)
        return A_ids_for_q, B_ids_for_q

    def compute_join_blocks_for_main_table(self,a_join_queries,b_join_queries):
        pa_A=self.pa_A
        pa_B=self.pa_B
        join_attr=self.join_attr
        blocks_a_ids = []
        blocks_b_ids = []
        a_join_info = []
        b_join_info = []
        # how to get join attr range base on block id.
        for key, queries in enumerate(b_join_queries):
            map_content = {}
            join_keys = []
            block_ids = []
            for query in queries:
                join_keys += pa_B.query_single_toplayer(query)
                block_ids += pa_B.query_single(query)
            map_content[key] = list(set(block_ids))
            blocks_b_ids.append(map_content)

            join_keys = list(set(join_keys))
            join_info = {"nums": len(join_keys), "length": []}
            for join_id in join_keys:
                node = pa_B.nid_node_dict[join_id]
                join_info["length"].append(node.boundary[join_attr + node.num_dims] - node.boundary[join_attr])
            b_join_info.append(join_info)

        for key in a_join_queries:
            map_content = {}
            join_keys = []
            block_ids = []
            for query in a_join_queries[key]:
                join_keys += pa_A.query_single_toplayer(query)
                block_ids += pa_A.query_single(query)
            map_content[key] = list(set(block_ids))
            blocks_a_ids.append(map_content)

            join_keys = list(set(join_keys))
            join_info = {"nums": len(join_keys), "length": []}
            for join_id in join_keys:
                node = pa_A.nid_node_dict[join_id]
                join_info["length"].append(node.boundary[join_attr + node.num_dims] - node.boundary[join_attr])
            a_join_info.append(join_info)


        join_infos = [a_join_info, b_join_info]
        for join_info in join_infos:
            total_nums, total_length = 0, 0
            for item in join_info:
                total_nums += item['nums']
                total_length += sum(item['length'])
        a_hyper_blocks_size=0
        b_hyper_blocks_size=0
        for idx in range(len(blocks_a_ids)):
            # if idx<=2:continue
            for key in blocks_a_ids[idx].keys():
                a_hyper_blocks_size+=sum([pa_A.nid_node_dict[ida].node_size for ida in blocks_a_ids[idx][key]])
            for key in blocks_b_ids[idx].keys():
                b_hyper_blocks_size+=sum([pa_B.nid_node_dict[idb].node_size for idb in blocks_b_ids[idx][key]])
        return a_hyper_blocks_size,0

    # (AdaptDB grouping)
    def group(self,overlap_chunks,join_a_block_ids, min_partition_size):
        def get_intersection_size_count(setValues, listValues):
            size = 0
            for lv in listValues:
                if lv in setValues: size += 1
            return size
        
        def get_group_size(a_ids):
            chunks = []
            for a_id in a_ids:
                chunks += overlap_chunks[a_id]
            chunks = list(set(chunks))
            return sum([self.pa_B.nid_node_dict[b_id].node_size for b_id in chunks])+sum([self.pa_A.nid_node_dict[a_id].node_size for a_id in a_ids])
        
        def __get_group_size(a_ids):
            # return sum([self.pa_A.nid_node_dict[a_id].node_size for a_id in a_ids])
            return len(a_ids)
        
        resizedSplits = []
        rest_count = len(join_a_block_ids)
        while rest_count > 0:
            cur_splits = []
            chunks = []
            available_size=min_partition_size
            # max block size limit for every split.
            while rest_count > 0:
                maxIntersection = -1
                best_offset = -1
                for offset, bid in enumerate(join_a_block_ids):
                    cur_intersection = get_intersection_size_count(chunks, overlap_chunks[bid])
                    if cur_intersection > maxIntersection:
                        maxIntersection = cur_intersection
                        best_offset = offset
                bucket_id = join_a_block_ids[best_offset]
                # if _get_group_size(cur_splits+[bucket_id]) > available_size:
                #     if len(cur_splits) == 0:
                #         cur_splits.append(bucket_id)
                #         chunks += overlap_chunks[bucket_id]
                #         chunks = list(set(chunks))
                #         join_a_block_ids.remove(bucket_id)
                #         rest_count -= 1
                #         break
                #     break
                cur_splits.append(bucket_id)
                chunks += overlap_chunks[bucket_id]
                chunks = list(set(chunks))
                join_a_block_ids.remove(bucket_id)
                rest_count -= 1
                if __get_group_size(cur_splits) == available_size:
                    break
            resizedSplits.append(cur_splits)
        return resizedSplits

    # Our QDG grouping algorithm
    def group1(self, overlap_chunks, join_a_block_ids, min_partition_size, max_partition_size):
        def list_solved_list(l1, l2):
            for item1 in l1:
                if item1 in l2:
                    return True
            return False
        
        def get_group_size(a_ids):
            chunks = []
            for a_id in a_ids:
                chunks += overlap_chunks[a_id]
            chunks = list(set(chunks))
            return sum([self.pa_B.nid_node_dict[b_id].node_size for b_id in chunks])
        
        def __get_group_size(a_ids):
            # return sum([self.pa_A.nid_node_dict[a_id].node_size for a_id in a_ids])
            return len(a_ids)

        def get_intersection_size_count(setValues, listValues):
            size = 0
            for lv in listValues:
                if lv in setValues: size += 1
            return size

        resizedSplits = []
        rest_count = len(join_a_block_ids)
        affinity_tab = []
        pre_save_ids = []
        computed_ids_dict = {}
        for bid in join_a_block_ids: computed_ids_dict[bid] = {}
        a_block_len = len(join_a_block_ids)
        for no1 in range(a_block_len):
            bid1 = join_a_block_ids[no1]
            max_intersection = -1
            max_bid = []
            for exist_bid in computed_ids_dict[bid1].keys():
                cur_intersection = computed_ids_dict[bid1][exist_bid]
                if cur_intersection > max_intersection:
                    max_intersection = cur_intersection
                    max_bid = [exist_bid]
            for no2 in range(no1 + 1, a_block_len):
                bid2 = join_a_block_ids[no2]
                cur_intersection = get_intersection_size_count(overlap_chunks[bid1], overlap_chunks[bid2])
                computed_ids_dict[bid1][bid2] = cur_intersection
                computed_ids_dict[bid2][bid1] = cur_intersection
                if cur_intersection > max_intersection:
                    max_intersection = cur_intersection
                    max_bid = [bid2]
            if max_intersection == 0:
                pre_save_ids.append(bid1)
            else:
                affinity_tab.append({'item': [[bid1], max_bid], 'val': max_intersection, 'chunk': overlap_chunks[bid1]})
        cur_index = 0
        # pre-save these ids which doesn't have any overlap blocks
        last_index=0
        while last_index < len(pre_save_ids):
            if __get_group_size(pre_save_ids[last_index:cur_index + 1]) == min_partition_size:
                merge_ids = pre_save_ids[last_index:cur_index + 1]
                resizedSplits.append(merge_ids)
                rest_count -= len(merge_ids)
                last_index = cur_index + 1
            
            elif cur_index >= len(pre_save_ids)-1:
                merge_ids = pre_save_ids[last_index:]
                resizedSplits.append(merge_ids)
                rest_count -= len(merge_ids)
                break
            else:
                cur_index += 1
        while rest_count > 0:
            affinity_tab.sort(key=lambda item: (item['val'], len(item['item'][0])), reverse=True)
            sel_tab = affinity_tab.pop(0)
            merge_ids = sel_tab['item'][0] + sel_tab['item'][1]
            merge_ids_length = len(merge_ids)
            is_completed = False
            if __get_group_size==min_partition_size or len(affinity_tab)==0 or sel_tab['val']==-1:
                is_completed = True
                resizedSplits.append(merge_ids)
                rest_count -= merge_ids_length
            else:
                # add key=chunk
                new_overlap_chunks = sel_tab['chunk']
                for bid in sel_tab['item'][1]:
                    new_overlap_chunks += overlap_chunks[bid]
                new_tab = {'item': [merge_ids, []], 'val': -1, 'chunk': list(set(new_overlap_chunks))}

            # update affinity_tab
            for tab in reversed(affinity_tab):
                # delete tab
                if list_solved_list(tab['item'][0], sel_tab['item'][1]):
                    affinity_tab.remove(tab)
                    continue
                # update tab
                if list_solved_list(tab['item'][1], merge_ids):
                    if is_completed or __get_group_size(tab['item'][0]) + __get_group_size(merge_ids) > min_partition_size:
                        tab['item'][1] = []
                        tab['val'] = -1
                    else:
                        tab['item'][1] = merge_ids
                        tab['val'] = get_intersection_size_count(tab['chunk'], new_tab['chunk'])
            if not is_completed: affinity_tab.append(new_tab)
            # Case: the affinity_tab only has one item.
            if len(affinity_tab) == 1:
                last_tab = affinity_tab.pop(0)
                merge_ids = last_tab['item'][0] + last_tab['item'][1]
                resizedSplits.append(merge_ids)
                rest_count -= len(merge_ids)
            # Update overlap_chunks and intersection benefits for merged column groups
            for ud_item1 in affinity_tab:
                if ud_item1['val'] == -1:
                    ud1_key = ud_item1['item'][0]
                    flag1 = False
                    if len(ud1_key) == 1: flag1 = True
                    overlap_chunks1 = ud_item1['chunk']
                    max_allocate_size = min_partition_size - __get_group_size(ud1_key)
                    max_intersection = -1
                    max_target_ids = []
                    for ud_item2 in affinity_tab:
                        ud2_key = ud_item2['item'][0]
                        if ud1_key == ud2_key: continue
                        if __get_group_size(ud2_key) > max_allocate_size: continue
                        if ud_item2['item'][1] == ud1_key:
                            cur_intersection = ud_item2['val']
                        else:
                            # if flag1 and len(ud2_key)==1:continue
                            flag2 = False
                            if len(ud2_key) == 1: flag2 = True
                            if flag1 and flag2:
                                cur_intersection = computed_ids_dict[ud1_key[0]][ud2_key[0]]
                            else:
                                overlap_chunks2 = ud_item2['chunk']
                                cur_intersection = get_intersection_size_count(overlap_chunks1, overlap_chunks2)
                        if cur_intersection > max_intersection:
                            max_intersection = cur_intersection
                            max_target_ids = ud2_key
                    ud_item1['val'] = max_intersection
                    ud_item1['item'][1] = max_target_ids
        return resizedSplits