import numpy as np
import copy


class QueryMBR:
    """The minimum bounding rectangle (MBR) that bounds a set of overlapping queries."""

    def __init__(self, boundary, added_as_fist_query=True):
        # boundary is in plain form [l1,..,ln, u1,..,un].
        # Use integer division: len(boundary) / 2 yields a float in Python 3,
        # which breaks any later use of num_dims as a range/index bound.
        self.num_dims = len(boundary) // 2
        self.boundary = boundary
        self.num_query = 1  # number of queries bounded by this MBR
        self.queries = []  # the plain-form queries bounded by this MBR
        self.bound_size = None  # number of records this MBR overlaps
        self.total_query_result_size = (
            None  # total query results size of all the queries inside this MBR
        )
        self.query_result_size = []  # record each query's result size
        self.is_extended = False
        self.ill_extended = False
        # NOTE: parameter name keeps the historical typo ("fist") so existing
        # keyword-argument callers are not broken.
        if added_as_fist_query:
            self.queries = [boundary]

    def check_condition3(self, data_threshold):
        """
        check whether this MBR satisfies the new bounding split condition 3:
        1. every query size > bound_size - data_threshold
        2. total_query_result_size + data_threshold > bound_size * num_query
        """
        # condition 1: every single query must be "large" relative to the bound
        if any(
            size <= self.bound_size - data_threshold
            for size in self.query_result_size
        ):
            return False
        # condition 2: the aggregate query results must be dense inside the bound
        return (
            self.total_query_result_size + data_threshold
            > self.bound_size * self.num_query
        )


"""
A node class used to construct partition tree.
"""


class PartitionNode:
    """
    A partition node, including both the internal and leaf nodes in the partition tree
    """

    def __init__(
        self,
        num_dims=0,
        boundary=None,
        nid=None,
        pid=None,
        is_irregular_shape_parent=False,
        is_irregular_shape=False,
        num_children=0,
        children_ids=None,
        is_leaf=True,
        node_size=0,
    ):
        self.num_dims = num_dims  # number of dimensions
        # the domain, [l1,l2,..,ln, u1,u2,..,un]; for an irregular shape partition
        # one needs to exempt its siblings.
        # Lower side is inclusive, upper side is exclusive (per original comment).
        # boundary/children_ids use None sentinels: the original mutable default
        # arguments ([]) were shared across every PartitionNode instance.
        self.boundary = [] if boundary is None else boundary
        self.nid = nid  # node id
        self.pid = pid  # parent id
        # whether the [last] child is an irregular shape partition
        self.is_irregular_shape_parent = is_irregular_shape_parent
        # an irregular shape partition cannot be further split, and it must be a leaf node
        self.is_irregular_shape = is_irregular_shape
        self.num_children = num_children  # number of children, should be 0, 2, or 3
        # if it's the irregular shape parent, then the last child should be the irregular partition
        self.children_ids = [] if children_ids is None else children_ids
        self.is_leaf = is_leaf
        self.node_size = node_size  # number of records in this partition

        # the following attributes will not be serialized
        self.dataset = None  # temporary: records that fall within this partition
        self.queryset = None  # temporary: queries that overlap this partition
        self.query_MBRs = None  # temporary: MBRs of the overlapping queries

        # beam search
        self.depth = 0  # only used in beam search, root node depth is 0
        self.no_valid_partition = False

    def is_overlap(self, query):
        """
        query is in plain form, i.e., [l1,l2,...,ln, u1,u2,...,un]
        !query dimension should match the partition dimensions!
        i.e., all projected or all not projected
        return -1 on dimension mismatch (error)
        return 0 if no overlap
        return 1 if overlap
        return 2 if the query lies inside this partition (all bounds inclusive)
        """
        if len(query) != 2 * self.num_dims:
            return -1  # error
        inside = True
        for i in range(self.num_dims):
            # disjoint on any single dimension means no overlap at all
            if (
                query[i] > self.boundary[self.num_dims + i]
                or query[self.num_dims + i] < self.boundary[i]
            ):
                return 0
            # query sticks out of the partition on this dimension
            if (
                query[i] < self.boundary[i]
                or query[self.num_dims + i] > self.boundary[self.num_dims + i]
            ):
                inside = False
        return 2 if inside else 1

    def is_overlap_np(self, query):
        """
        the numpy version of the is_overlap function
        the query here and the boundary class attribute should be numpy arrays
        NOTE(review): semantics differ from is_overlap — the "no overlap" test
        mixes strict (>) and non-strict (<=) comparisons, and the return value 2
        means the PARTITION lies inside the query (not the query inside the
        partition). Confirm callers expect this before unifying the two.
        """
        if all(
            (self.boundary[0 : self.num_dims] > query[self.num_dims :])
            | (self.boundary[self.num_dims :] <= query[0 : self.num_dims])
        ):
            return 0  # no overlap
        elif all(
            (self.boundary[0 : self.num_dims] >= query[0 : self.num_dims])
            & (self.boundary[self.num_dims :] <= query[self.num_dims :])
        ):
            return 2  # inside
        else:
            return 1  # overlap

    def is_redundant_contain(self, point):
        """
        whether a data point is contained by any of this node's redundant boundaries
        point: [dim1_value, dim2_value, ...], same dimensions as this node
        NOTE(review): relies on self.redundant_boundaries, which is not assigned
        in __init__ — it must be set externally before calling this method.
        """
        for boundary in self.redundant_boundaries:
            # contained iff the point is inside this boundary on every dimension
            if all(
                boundary[i] <= point[i] <= boundary[self.num_dims + i]
                for i in range(self.num_dims)
            ):
                return True
        return False

    def is_contain(self, point):
        """
        whether a data point is contained in this node (all bounds inclusive)
        point: [dim1_value, dim2_value, ...], same dimensions as this node
        """
        for i in range(self.num_dims):
            if (
                point[i] > self.boundary[self.num_dims + i]
                or point[i] < self.boundary[i]
            ):
                return False
        return True

    def if_bounding_split(self, data_threshold, approximate=False, force_extend=False):
        """
        Check whether a bounding split is available (the split node is assumed >= 2b).
        approximate: use an even-distribution approximation for the number of
            records within a partition instead of counting exactly.
        force_extend: extend the bounding partition so its size reaches
            data_threshold, if possible.
        return availability, skip gain, and the (possibly extended) bound
        """
        max_bound = self.__max_bound(self.queryset)
        bound_size = self.query_result_size(max_bound, approximate)
        if bound_size is None:  # no query overlaps this node
            return False, None, None
        extended_bound = copy.deepcopy(max_bound)
        if bound_size < data_threshold:
            # assume the partition is >= 2b, then we must be able to find a valid extension
            if force_extend:
                for dim in range(self.num_dims):
                    valid, extended_bound, bound_size = self.__try_extend(
                        extended_bound, dim, 0, data_threshold
                    )  # lower side
                    if valid:
                        break
                    valid, extended_bound, bound_size = self.__try_extend(
                        extended_bound, dim, 1, data_threshold
                    )  # upper side
                    if valid:
                        break
            else:
                return False, None, None
        remaining_size = self.node_size - bound_size
        if remaining_size < data_threshold:
            return False, None, None
        cost_before_split = len(self.queryset) * self.node_size
        cost_bound_split = len(self.queryset) * bound_size
        skip_gain = cost_before_split - cost_bound_split
        if force_extend:
            return True, skip_gain, extended_bound
        else:
            # TODO: should we also return the extended bound here?
            return True, skip_gain, max_bound
    def get_candidate_cuts(self, extended=True, begin_pos=0):
        """
        get the candidate cut positions
        if extended is set to True, also add medians from all dimensions
        """
        candidate_cut_pos = []
        for query in self.queryset:
            for dim in range(begin_pos, self.num_dims):
                # check if the cut position is inside the partition, as the queryset are queries overlap this partition
                if (
                    query[dim] > self.boundary[dim]
                    and query[dim] < self.boundary[self.num_dims + dim]
                ):
                    candidate_cut_pos.append((dim, query[dim]))
                if (
                    query[self.num_dims + dim] > self.boundary[dim]
                    and query[self.num_dims + dim] < self.boundary[self.num_dims + dim]
                ):
                    candidate_cut_pos.append((dim, query[self.num_dims + dim]))
        if extended:
            # also propose the data median on every dimension as a cut position
            for dim in range(self.num_dims):
                try:
                    split_value = float(np.median(self.dataset[:, dim]))
                    candidate_cut_pos.append((dim, split_value))
                except Exception as e:
                    # median may fail (e.g. self.dataset is None or empty); skip silently
                    pass
        return candidate_cut_pos

    def get_candidate_join_cuts(self, join_attr):
        """Candidate cut positions restricted to a single join attribute (dimension).

        Unlike get_candidate_cuts, boundary-touching query endpoints (>=/<=) are
        also accepted here, and the data median on that dimension is always added.
        """
        dim = join_attr
        candidate_cut_pos = []
        for query in self.queryset:
            if (
                query[dim] >= self.boundary[dim]
                and query[dim] <= self.boundary[self.num_dims + dim]
            ):
                candidate_cut_pos.append((dim, query[dim]))
            if (
                query[self.num_dims + dim] >= self.boundary[dim]
                and query[self.num_dims + dim] <= self.boundary[self.num_dims + dim]
            ):
                candidate_cut_pos.append((dim, query[self.num_dims + dim]))
        split_value = np.median(self.dataset[:, dim])
        candidate_cut_pos.append((dim, split_value))
        return candidate_cut_pos

    def if_split(
        self, split_dim, split_value, data_threshold, sample_rate=1, test=False
    ):  # rename: if_split_get_gain
        """
        return the skip gain and children partition size
        if split a node from a given split dimension and split value
        Returns (valid, skip_gain, size_child1, size_child2); valid is False when
        either child would fall below data_threshold.
        """
        # count records strictly below the split value; divide by sample_rate to
        # scale a sampled dataset back up to the full partition size
        sub_dataset1_size = int(
            np.count_nonzero(self.dataset[:, split_dim] < split_value) // sample_rate
        )  # process time: 0.007
        sub_dataset2_size = self.node_size - sub_dataset1_size
        if sub_dataset1_size < data_threshold or sub_dataset2_size < data_threshold:
            return False, 0, sub_dataset1_size, sub_dataset2_size
        left_part, right_part, mid_part = self.split_queryset(split_dim, split_value)
        # queries crossing the cut (mid_part) overlap BOTH children
        num_overlap_child1 = len(left_part) + len(mid_part)
        num_overlap_child2 = len(right_part) + len(mid_part)
        if test:
            print(
                "num left part:",
                len(left_part),
                "num right part:",
                len(right_part),
                "num mid part:",
                len(mid_part),
            )
            print(
                "left part:",
                left_part,
                "right part:",
                right_part,
                "mid part:",
                mid_part,
            )
        # skip gain = (records scanned before split) - (records scanned after split)
        skip_gain = (
            len(self.queryset) * self.node_size
            - num_overlap_child1 * sub_dataset1_size
            - num_overlap_child2 * sub_dataset2_size
        )
        return True, skip_gain, sub_dataset1_size, sub_dataset2_size

    def num_query_crossed(self, split_dim, split_value):
        """
        similar to the split_queryset function, but just return how many queries
        the intended split will cross
        Returns None when self.queryset has not been assigned.
        """
        count = 0
        if self.queryset is not None:
            for query in self.queryset:
                if (
                    query[split_dim] < split_value
                    and query[self.num_dims + split_dim] > split_value
                ):
                    count += 1
            return count
        return None

    def split_queryset(self, split_dim, split_value):
        """
        split the queryset into 3 parts: the left part, the right part,
        and those cross the split value
        NOTE(review): returns None implicitly when self.queryset is None.
        """
        if self.queryset is not None:
            left_part = []
            right_part = []
            mid_part = []
            for query in self.queryset:
                if query[split_dim] >= split_value:
                    # whole query lies at or right of the cut
                    right_part.append(query)
                elif query[self.num_dims + split_dim] <= split_value:
                    # whole query lies at or left of the cut
                    left_part.append(query)
                elif (
                    query[split_dim] < split_value
                    and query[self.num_dims + split_dim] > split_value
                ):
                    # query straddles the cut
                    mid_part.append(query)
                else:
                    # unreachable given the branches above; kept for safety
                    pass
            return left_part, right_part, mid_part

    def if_general_group_split(self, data_threshold):
        """
        the general group split in PAW (this one use merge and doesn't handle overlap)
        Greedily merges the cheapest MBR pair until every MBR bounds at least
        data_threshold records, then checks feasibility.
        Returns False when infeasible, otherwise the skip gain (an int).
        NOTE(review): mixed return types (bool/int) — callers must test with
        `is False` or truthiness carefully, since a skip of 0 is falsy.
        """
        if self.query_MBRs is None or len(self.query_MBRs) == 0:
            # print("PAW: no MBRs")
            return False

        def check_MBR_sizes():
            # True when every MBR already bounds at least data_threshold records
            for MBR in self.query_MBRs:
                if MBR.bound_size < data_threshold:
                    return False
            return True

        while (
            not check_MBR_sizes() and len(self.query_MBRs) >= 2
        ):  # what if only 1 MBR and its size is less than b
            # merge MBRs: evaluate every pair and keep the cheapest merge
            merged_records = []  # (cost, MBR index1, MBR_index2)
            for i in range(len(self.query_MBRs) - 1):
                for j in range(i + 1, len(self.query_MBRs)):
                    merged_MBR = self.__if_merge_2MBRs(
                        self.query_MBRs[i], self.query_MBRs[j]
                    )
                    cost = merged_MBR.num_query * merged_MBR.bound_size
                    merged_records.append((cost, i, j, merged_MBR))
            # lexicographic sort on (cost, i, j); (i, j) pairs are unique so the
            # QueryMBR object in position 3 is never compared
            merged_records.sort()
            merged_MBR = merged_records[0][-1]
            i, j = merged_records[0][1], merged_records[0][2]
            # delete j before i so the first deletion doesn't shift index i
            del self.query_MBRs[j]
            del self.query_MBRs[i]  # i < j
            self.query_MBRs.append(merged_MBR)
            # print("merged MBR size:", merged_MBR.bound_size, "boundary:",merged_MBR.boundary)

        # check if every partition size is greater than b
        remaining_size = self.node_size
        for MBR in self.query_MBRs:
            remaining_size -= MBR.bound_size
            if MBR.bound_size < data_threshold:
                # print("PAW: MBR size < b, len(MBRs):",len(self.query_MBRs))
                return False
        if remaining_size < data_threshold:
            # print("PAW: remaining size < b")
            return False

        # get the cost
        cost = 0
        for MBR in self.query_MBRs:
            cost += MBR.num_query * MBR.bound_size
        skip = len(self.queryset) * self.node_size - cost
        # print("PAW: skip", skip)
        return skip

    def if_dual_bounding_split(self, split_dim, split_value, data_threshold, approximate = False):
        '''
        check whether it's available to perform dual bounding split
        return availability and skip gain
        '''
        # split the queries first
        left_part, right_part, mid_part = self.split_queryset(split_dim, split_value)
        max_bound_left = self.__max_bound(left_part)
        max_bound_right = self.__max_bound(right_part)
        # Should we only consider the case when left and right cannot be further split? i.e., [b,2b)
        # this check logic is given in the PartitionAlgorithm, not here, as the split action should be general
        naive_left_size = np.count_nonzero(self.dataset[:, split_dim] < split_value)
        naive_right_size = self.node_size - naive_left_size

        # get (irregular-shape) sub-partition size
        left_size = self.query_result_size(max_bound_left, approximate)
        if left_size is None:  # there is no query within the left
            left_size = naive_left_size  # use the whole left part as its size
        if left_size < data_threshold:
            return False, None
        right_size = self.query_result_size(max_bound_right, approximate)
        if right_size is None:  # there is no query within the right
            right_size = naive_right_size  # use the whole right part as its size
        if right_size < data_threshold:
            return False, None
        remaining_size = self.node_size - left_size - right_size
        if remaining_size < data_threshold:
            return False, None

        # check cost
        cost_before_split = len(self.queryset) * self.node_size
        cost_dual_split = len(left_part) * left_size + len(right_part) * right_size + len(mid_part) * remaining_size
        for query in mid_part:
            # if it overlaps the left bounding box, it also pays the left scan
            if max_bound_left is None or self.__is_overlap(max_bound_left, query) > 0:
                cost_dual_split += left_size
            # if it overlaps the right bounding box, it also pays the right scan
            if max_bound_right is None or self.__is_overlap(max_bound_right, query) > 0:
                cost_dual_split += right_size
        skip_gain = cost_before_split - cost_dual_split
        return True, skip_gain

    def __if_merge_2MBRs(self, MBR1, MBR2):
        """Return a NEW MBR covering both inputs (inputs are not mutated).

        Unlike __merge_2MBRs, this also recomputes bound_size via an exact count.
        """
        merged_MBR = copy.deepcopy(MBR1)
        for i in range(self.num_dims):
            merged_MBR.boundary[i] = min(merged_MBR.boundary[i], MBR2.boundary[i])
            merged_MBR.boundary[self.num_dims + i] = max(
                merged_MBR.boundary[self.num_dims + i], MBR2.boundary[self.num_dims + i]
            )
        merged_MBR.queries += MBR2.queries
        merged_MBR.num_query += MBR2.num_query
        merged_MBR.bound_size = self.query_result_size(merged_MBR.boundary)
        return merged_MBR

    def split_query_MBRs(self, split_dim, split_value):
        """Split this node's query MBRs at (split_dim, split_value).

        Returns (left_part, right_part) lists of MBRs. MBRs that straddle the
        cut are decomposed: their queries are clipped to each side and re-bounded
        via generate_query_MBRs.
        NOTE(review): generate_query_MBRs assigns self.query_MBRs as a side
        effect, so this method clobbers self.query_MBRs — confirm callers
        reassign the returned parts to the child nodes.
        Returns None implicitly when self.query_MBRs is None.
        """
        if self.query_MBRs is not None:
            left_part = []  # totally in left
            right_part = []  # totally in right
            mid_part = []
            for MBR in self.query_MBRs:
                if MBR.boundary[split_dim] >= split_value:
                    right_part.append(MBR)
                elif MBR.boundary[self.num_dims + split_dim] <= split_value:
                    left_part.append(MBR)
                elif (
                    MBR.boundary[split_dim] < split_value
                    and MBR.boundary[self.num_dims + split_dim] > split_value
                ):
                    mid_part.append(MBR)

            # process each mid_part MBR: distribute its queries to the sides they touch
            overlap_left_part_queries = []
            overlap_right_part_queries = []
            for MBR in mid_part:
                for query in MBR.queries:
                    if query[split_dim] < split_value:
                        overlap_left_part_queries.append(query)
                    if query[self.num_dims + split_dim] > split_value:
                        overlap_right_part_queries.append(query)

            # generate MBRs for both part. Notice we cannot simply adjust the shape using original MBRs
            mid_part_left_MBRs = self.generate_query_MBRs(overlap_left_part_queries)
            mid_part_right_MBRs = self.generate_query_MBRs(overlap_right_part_queries)

            left_part += mid_part_left_MBRs
            right_part += mid_part_right_MBRs
            return left_part, right_part

    def generate_query_MBRs(self, queryset=None):
        """
        bound the overlapped queries in this partition into MBRs
        the MBRs will only contains the part inside this partition
        Iterates merge passes until a fixpoint (no pass reduces the MBR count).
        Side effect: assigns the result to self.query_MBRs.
        """
        if queryset is None:
            queryset = self.queryset
        if len(queryset) == 0:
            return []
        query_MBRs = []
        for query in queryset:
            query_MBRs.append(QueryMBR(query, True))
        # print("before merged, number of query MBRs:", len(query_MBRs))
        while len(query_MBRs) >= 2:
            new_query_MBRs = []
            merged_qids = []
            for i in range(len(query_MBRs) - 1):
                # NOTE(review): this deepcopy runs before the merged_qids check,
                # so already-merged entries are copied and discarded — wasteful
                # but harmless; reorder if this shows up in profiles
                new_MBR = copy.deepcopy(query_MBRs[i])
                if i in merged_qids:
                    continue
                for j in range(i + 1, len(query_MBRs)):
                    if j in merged_qids:
                        continue
                    if self.__is_overlap(
                        query_MBRs[i].boundary, query_MBRs[j].boundary
                    ):
                        # print("merge:",i,j,query_MBRs[i].boundary,query_MBRs[j].boundary)
                        new_MBR = self.__merge_2MBRs(new_MBR, query_MBRs[j])
                        merged_qids.append(j)
                new_query_MBRs.append(new_MBR)
                # print("for iteration",i, "current new_query_MBRs size:",len(new_query_MBRs))
            # the last MBR is only carried over if nothing merged it
            if len(query_MBRs) - 1 not in merged_qids:
                new_query_MBRs.append(query_MBRs[-1])
            # fixpoint reached: no merge happened in this pass
            if len(query_MBRs) == len(new_query_MBRs):
                break
            else:
                query_MBRs = copy.deepcopy(new_query_MBRs)
        # print("after merged, number of query MBRs:", len(query_MBRs))

        # bound each query MBRs by its partition boundary, and calculate the result size
        for MBR in query_MBRs:
            MBR.boundary = self.__max_bound_single(MBR.boundary)
            MBR.bound_size = self.query_result_size(MBR.boundary)
            for query in MBR.queries:
                MBR.query_result_size.append(self.query_result_size(query))
            MBR.total_query_result_size = sum(MBR.query_result_size)
        self.query_MBRs = query_MBRs
        return query_MBRs

    def __merge_2MBRs(self, MBR1, MBR2):
        """
        merge 2 MBRs into 1 (the first one, which is mutated in place)
        in this step we do not consider whether the merged MBR exceeds the current partition
        """
        for i in range(self.num_dims):
            MBR1.boundary[i] = min(MBR1.boundary[i], MBR2.boundary[i])
            MBR1.boundary[self.num_dims + i] = max(
                MBR1.boundary[self.num_dims + i], MBR2.boundary[self.num_dims + i]
            )
        MBR1.queries += MBR2.queries
        MBR1.num_query += MBR2.num_query
        return MBR1

    def get_query_result(self, query):
        """Return the rows of self.dataset that fall inside query (bounds inclusive)."""
        constraints = []
        for d in range(self.num_dims):
            constraint_L = self.dataset[:, d] >= query[d]
            constraint_U = self.dataset[:, d] <= query[self.num_dims + d]
            constraints.append(constraint_L)
            constraints.append(constraint_U)
        # a row qualifies only if every per-dimension constraint holds
        constraint = np.all(constraints, axis=0)
        return self.dataset[np.argwhere(constraint == True).flatten()]

    def query_result_size(self, query, approximate=False):
        """
        get the query result's size on this node
        if the approximate parameter is set to True, then use even distribution to approximate
        Returns None when query is None.
        """
        if query is None:
            return None
        result_size = 0
        if approximate:
            # assume records are evenly distributed: size ~ volume ratio * node_size
            query_volume = 1
            volume = 1
            for d in range(self.num_dims):
                query_volume *= query[self.num_dims + d] - query[d]
                volume *= self.boundary[self.num_dims + d] - self.boundary[d]
            result_size = int(query_volume / volume * self.node_size)
        else:
            # exact count over self.dataset (bounds inclusive on both sides)
            constraints = []
            for d in range(self.num_dims):
                constraint_L = self.dataset[:, d] >= query[d]
                constraint_U = self.dataset[:, d] <= query[self.num_dims + d]
                constraints.append(constraint_L)
                constraints.append(constraint_U)
            constraint = np.all(constraints, axis=0)
            result_size = np.count_nonzero(constraint)
        return result_size

    def extend_bound(self, bound, data_threshold, print_info=False, algorithm=2):
        """
        extend a bound to be at least b, assume the bound is within the partition boundary
        algorithm == 1: binary search on each dimension
        algorithm == 2: Ken's extend bound method (uniform scaling around the center)
        Returns (bound, bound_size).
        """
        # safe guard: already large enough, nothing to do
        current_size = self.query_result_size(bound, approximate=False)
        if current_size >= data_threshold:
            return bound, current_size

        if algorithm == 1:
            # NOTE(review): `side` is never used below
            side = 0
            for dim in range(
                self.num_dims
            ):  # or it cannot adapted to other dataset !  #[2,0,1,4,3,5,6]: reranged by distinct values
                # NOTE(review): dim + 1 > self.num_dims is never true for
                # dim in range(self.num_dims) — this guard is dead code
                if dim + 1 > self.num_dims:
                    continue
                valid, bound, bound_size = self.__try_extend(
                    bound, dim, 0, data_threshold, print_info
                )  # lower side
                if print_info:
                    print("dim:", dim, "current bound:", bound, valid, bound_size)
                if valid:
                    break
                valid, bound, bound_size = self.__try_extend(
                    bound, dim, 1, data_threshold, print_info
                )  # upper side
                if print_info:
                    print("dim:", dim, "current bound:", bound, valid, bound_size)
                if valid:
                    break
            return bound, bound_size

        elif algorithm == 2:
            # scale the bound uniformly around its center until it covers at
            # least data_threshold records (by the Chebyshev-like max ratio)
            center = [
                (bound[i] + bound[i + self.num_dims]) / 2 for i in range(self.num_dims)
            ]
            radius = [
                (bound[i + self.num_dims] - bound[i]) / 2 for i in range(self.num_dims)
            ]
            f_records = []
            for point in self.dataset:
                dist_ratio = [
                    abs(point[i] - center[i]) / radius[i] for i in range(self.num_dims)
                ]
                max_dist_ratio = max(dist_ratio)
                f_records.append(max_dist_ratio)
            f_records.sort()
            # NOTE(review): raises IndexError when the node holds <= data_threshold
            # points; callers appear to guarantee node_size >= 2b — confirm
            threshold_ratio = f_records[data_threshold]
            extend_bound_lower = [
                center[i] - threshold_ratio * radius[i] for i in range(self.num_dims)
            ]
            extend_bound_upper = [
                center[i] + threshold_ratio * radius[i] for i in range(self.num_dims)
            ]
            extended_bound = extend_bound_lower + extend_bound_upper
            # clip the extension back inside the partition boundary
            extended_bound = self.__max_bound_single(extended_bound)
            bound_size = self.query_result_size(extended_bound, approximate=False)
            return extended_bound, bound_size

    # = = = = = internal functions = = = = =

    def __try_extend(
        self, current_bound, try_dim, side, data_threshold, print_info=False
    ):
        """
        side = 0: lower side
        side = 1: upper side
        return whether this extend has made bound greater than b,
        the current extended bound, and the size
        """
        # first try the extreme case: push this side all the way to the partition boundary
        dim = try_dim
        if side == 1:
            dim += self.num_dims
        extended_bound = copy.deepcopy(current_bound)
        extended_bound[dim] = self.boundary[dim]
        bound_size = self.query_result_size(extended_bound, approximate=False)
        if bound_size < data_threshold:
            # even the maximal extension on this side is not enough
            return False, extended_bound, bound_size

        # binary search in this extend direction (capped at 30 iterations)
        L, U = None, None
        if side == 0:
            L, U = self.boundary[dim], current_bound[dim]
        else:
            L, U = current_bound[dim], self.boundary[dim]
        if print_info:
            print("L,U:", L, U)
        loop_count = 0
        while L < U and loop_count < 30:
            mid = (L + U) / 2
            extended_bound[dim] = mid
            bound_size = self.query_result_size(extended_bound, approximate=False)
            if bound_size < data_threshold:
                L = mid
            elif bound_size > data_threshold:
                U = mid
                if U - L < 0.00001:
                    break
            else:
                # exactly data_threshold: done
                break
            if print_info:
                print(
                    "loop,L:",
                    L,
                    "U:",
                    U,
                    "mid:",
                    mid,
                    "extended_bound:",
                    extended_bound,
                    "size:",
                    bound_size,
                )
            loop_count += 1
        return bound_size >= data_threshold, extended_bound, bound_size

    def __is_overlap(self, boundary, query):
        """
        the difference between this function and the public is_overlap function
        lies in the boundary parameter
        NOTE(review): also differs in strictness — here boundary-touching
        (>=/<=) counts as NO overlap, while the public is_overlap counts it.
        return -1 on dimension mismatch, 0 no overlap, 1 overlap, 2 query inside
        """
        if len(query) != 2 * self.num_dims:
            return -1  # error
        overlap_flag = True
        inside_flag = True
        for i in range(self.num_dims):
            if (
                query[i] >= boundary[self.num_dims + i]
                or query[self.num_dims + i] <= boundary[i]
            ):
                overlap_flag = False
                inside_flag = False
                return 0
            elif (
                query[i] < boundary[i]
                or query[self.num_dims + i] > boundary[self.num_dims + i]
            ):
                inside_flag = False
        if inside_flag:
            return 2
        elif overlap_flag:
            return 1
        else:
            return 0

    def __max_bound(self, queryset):
        """
        bound the queries by their maximum bounding rectangle
        !NOTE it is for a collection of queries!!!
        then constraint the MBR by the node's boundary!
        the return bound is in the same form as boundary
        Returns None for an empty queryset.
        """
        if len(queryset) == 0:
            return None
        # if len(queryset) == 1: pass, I don't think there will be shape issue here
        max_bound_L = np.amin(np.array(queryset)[:, 0 : self.num_dims], axis=0).tolist()
        # bound the lower side with the boundary's lower side
        max_bound_L = np.amax(
            np.array([max_bound_L, self.boundary[0 : self.num_dims]]), axis=0
        ).tolist()
        max_bound_U = np.amax(np.array(queryset)[:, self.num_dims :], axis=0).tolist()
        # bound the upper side with the boundary's upper side
        max_bound_U = np.amin(
            np.array([max_bound_U, self.boundary[self.num_dims :]]), axis=0
        ).tolist()
        max_bound = max_bound_L + max_bound_U  # concat
        return max_bound

    def max_bound_for_query(self, q):
        """Public wrapper: clip a single query to this partition's boundary (copy, no mutation)."""
        query = q.copy()
        return self.__max_bound_single(query)

    def __max_bound_single(self, query, parent_boundary=None):
        """
        bound anything in the shape of query by the current partition boundary
        (or by parent_boundary when given); mutates and returns `query`
        """
        if parent_boundary is None:
            for i in range(self.num_dims):
                query[i] = max(query[i], self.boundary[i])
                query[self.num_dims + i] = min(
                    query[self.num_dims + i], self.boundary[self.num_dims + i]
                )
            return query
        else:
            for i in range(self.num_dims):
                query[i] = max(query[i], parent_boundary[i])
                query[self.num_dims + i] = min(
                    query[self.num_dims + i], parent_boundary[self.num_dims + i]
                )
            return query