import numpy as np import copy """ A node class used to construct partition tree. """ class PartitionNode: ''' A partition node, including both the internal and leaf nodes in the partition tree ''' def __init__(self, num_dims=0, boundary=[], nid=None, pid=None, is_irregular_shape_parent=False, is_irregular_shape=False, num_children=0, children_ids=[], is_leaf=True, node_size=0): # print("Initialize PartitionTree Root: num_dims",num_dims,"boundary:",boundary,"children_ids:",children_ids) self.num_dims = num_dims # number of dimensions # the domain, [l1,l2,..,ln, u1,u2,..,un,], for irregular shape partition, one need to exempt its siblings self.boundary = boundary # I think the lower side should be inclusive and the upper side should be exclusive? self.nid = nid # node id self.pid = pid # parent id self.is_irregular_shape_parent = is_irregular_shape_parent # whether the [last] child is an irregular shape partition self.is_irregular_shape = is_irregular_shape # an irregular shape partition cannot be further split, and it must be a leaf node self.num_children = num_children # number of children, should be 0, 2, or 3 self.children_ids = children_ids # if it's the irregular shape parent, then the last child should be the irregular partition self.is_leaf = is_leaf self.node_size = node_size # number of records in this partition # the following attributes will not be serialized self.dataset = None # only used in partition algorithms, temporary, should consist records that within this partition self.queryset = None # only used in partition algorithms, temporary, should consist queries that overlap this partition # beam search self.depth = 0 # only used in beam search, root node depth is 0 def is_overlap(self, query): ''' query is in plain form, i.e., [l1,l2,...,ln, u1,u2,...,un] !query dimension should match the partition dimensions! i.e., all projected or all not projected return 0 if no overlap return 1 if overlap return 2 if inside ''' if len(query) != 2 * self.num_dims: return -1 # error overlap_flag = True inside_flag = True for i in range(self.num_dims): if query[i] > self.boundary[self.num_dims + i] or query[self.num_dims + i] < self.boundary[i]: overlap_flag = False inside_flag = False return 0 elif query[i] < self.boundary[i] or query[self.num_dims + i] > self.boundary[self.num_dims + i]: inside_flag = False if inside_flag: return 2 elif overlap_flag: return 1 else: return 0 def is_overlap_np(self, query): ''' the numpy version of the is_overlap function the query here and boundary class attribute should in the form of numpy array ''' if all((self.boundary[0:self.num_dims] > query[self.num_dims:]) | ( self.boundary[self.num_dims:] <= query[0:self.num_dims])): return 0 # no overlap elif all((self.boundary[0:self.num_dims] >= query[0:self.num_dims]) & ( self.boundary[self.num_dims:] <= query[self.num_dims:])): return 2 # inside else: return 1 # overlap def is_redundant_contain(self, point): ''' used to determine wheter a data point is contained in this node point: [dim1_value, dim2_value,...], should has the same dimensions as this node ''' flag=False for boundary in self.redundant_boundaries: is_located=True for i in range(self.num_dims): if point[i] > boundary[self.num_dims + i] or point[i] < boundary[i]: is_located=False break if not is_located: continue else: flag=True break return flag def is_contain(self, point): ''' used to determine wheter a data point is contained in this node point: [dim1_value, dim2_value,...], should has the same dimensions as this node ''' for i in range(self.num_dims): if point[i] > self.boundary[self.num_dims + i] or point[i] < self.boundary[i]: return False return True def get_candidate_cuts(self, extended=True, begin_pos=0): ''' get the candidate cut positions if extended is set to True, also add medians from all dimensions ''' candidate_cut_pos = [] for query in self.queryset: for dim in range(begin_pos,self.num_dims): # check if the cut position is inside the partition, as the queryset are queries overlap this partition if query[dim] > self.boundary[dim] and query[dim] < self.boundary[self.num_dims + dim]: candidate_cut_pos.append((dim, query[dim])) if query[self.num_dims + dim] > self.boundary[dim] and query[self.num_dims + dim] < self.boundary[ self.num_dims + dim]: candidate_cut_pos.append((dim, query[self.num_dims + dim])) if extended: for dim in range(self.num_dims): try: split_value = float(np.median(self.dataset[:, dim])) candidate_cut_pos.append((dim, split_value)) except Exception as e: pass return candidate_cut_pos def get_candidate_join_cuts(self,join_attr): dim=join_attr candidate_cut_pos = [] for query in self.queryset: if query[dim] >= self.boundary[dim] and query[dim] <= self.boundary[self.num_dims + dim]: candidate_cut_pos.append((dim, query[dim])) if query[self.num_dims + dim] >= self.boundary[dim] and query[self.num_dims + dim] <= self.boundary[self.num_dims + dim]: candidate_cut_pos.append((dim, query[self.num_dims + dim])) split_value = np.median(self.dataset[:, dim]) candidate_cut_pos.append((dim, split_value)) return candidate_cut_pos def if_split(self, split_dim, split_value, data_threshold, sample_rate=1, test=False): # rename: if_split_get_gain ''' return the skip gain and children partition size if split a node from a given split dimension and split value ''' # print("current_node.nid:", current_node.nid) # print("current_node.is_leaf:", current_node.is_leaf) # print("current_node.dataset is None:", current_node.dataset is None) sub_dataset1_size = int(np.count_nonzero(self.dataset[:, split_dim] < split_value)//sample_rate) # process time: 0.007 sub_dataset2_size = self.node_size - sub_dataset1_size if sub_dataset1_size < data_threshold or sub_dataset2_size < data_threshold: return False, 0, sub_dataset1_size, sub_dataset2_size left_part, right_part, mid_part = self.split_queryset(split_dim, split_value) num_overlap_child1 = len(left_part) + len(mid_part) num_overlap_child2 = len(right_part) + len(mid_part) if test: print("num left part:", len(left_part), "num right part:", len(right_part), "num mid part:", len(mid_part)) print("left part:", left_part, "right part:", right_part, "mid part:", mid_part) # temp_child_node1, temp_child_node2 = self.__if_split_get_child(split_dim, split_value) skip_gain = len( self.queryset) * self.node_size - num_overlap_child1 * sub_dataset1_size - num_overlap_child2 * sub_dataset2_size return True, skip_gain, sub_dataset1_size, sub_dataset2_size def num_query_crossed(self, split_dim, split_value): ''' similar to the split_queryset function, but just return how many queries the intended split will cross ''' count = 0 if self.queryset is not None: for query in self.queryset: if query[split_dim] < split_value and query[self.num_dims + split_dim] > split_value: count += 1 return count return None def split_queryset(self, split_dim, split_value): ''' split the queryset into 3 parts: the left part, the right part, and those cross the split value ''' if self.queryset is not None: left_part = [] right_part = [] mid_part = [] for query in self.queryset: # print("[Split Queryset] query:",query, "split dim:", split_dim, "split value", split_value, "query[split dim]:",query[split_dim]) if query[split_dim] >= split_value: # print("[Split Queryset] query is right part") right_part.append(query) elif query[self.num_dims + split_dim] <= split_value: # print("[Split Queryset] query is left part") left_part.append(query) elif query[split_dim] < split_value and query[self.num_dims + split_dim] > split_value: # print("[Split Queryset] query is mid part") mid_part.append(query) else: # print("[Split Queryset] query is nothing") pass # print("[Split Queryset] left part:",len(left_part), "right part:",len(right_part),"mid part:",len(mid_part)) return left_part, right_part, mid_part def get_query_result(self,query): constraints = [] for d in range(self.num_dims): constraint_L = self.dataset[:, d] >= query[d] constraint_U = self.dataset[:, d] <= query[self.num_dims + d] constraints.append(constraint_L) constraints.append(constraint_U) constraint = np.all(constraints, axis=0) return self.dataset[np.argwhere(constraint==True).flatten()] def query_result_size(self, query, approximate=False): ''' get the query result's size on this node the approximate parameter is set to True, the use even distribution to approximate ''' if query is None: return None result_size = 0 if approximate: query_volume = 1 volume = 1 for d in range(self.num_dims): query_volume *= query[self.num_dims + d] - query[d] volume *= self.boundary[self.num_dims + d] - self.boundary[d] result_size = int(query_volume / volume * self.node_size) else: constraints = [] for d in range(self.num_dims): constraint_L = self.dataset[:, d] >= query[d] constraint_U = self.dataset[:, d] <= query[self.num_dims + d] constraints.append(constraint_L) constraints.append(constraint_U) constraint = np.all(constraints, axis=0) result_size = np.count_nonzero(constraint) return result_size def extend_bound(self, bound, data_threshold, print_info=False, algorithm=2): ''' extend a bound to be at least b, assume the bound is within the partition boundary algorithm == 1: binary search on each dimension algorithm == 2: Ken's extend bound method ''' # safe guard current_size = self.query_result_size(bound, approximate=False) if current_size >= data_threshold: return bound, current_size if algorithm == 1: side = 0 for dim in range( self.num_dims): # or it cannot adapted to other dataset ! #[2,0,1,4,3,5,6]: reranged by distinct values if dim + 1 > self.num_dims: continue valid, bound, bound_size = self.__try_extend(bound, dim, 0, data_threshold, print_info) # lower side if print_info: print("dim:", dim, "current bound:", bound, valid, bound_size) if valid: break valid, bound, bound_size = self.__try_extend(bound, dim, 1, data_threshold, print_info) # upper side if print_info: print("dim:", dim, "current bound:", bound, valid, bound_size) if valid: break return bound, bound_size elif algorithm == 2: center = [(bound[i] + bound[i + self.num_dims]) / 2 for i in range(self.num_dims)] radius = [(bound[i + self.num_dims] - bound[i]) / 2 for i in range(self.num_dims)] f_records = [] for point in self.dataset: dist_ratio = [abs(point[i] - center[i]) / radius[i] for i in range(self.num_dims)] max_dist_ratio = max(dist_ratio) f_records.append(max_dist_ratio) f_records.sort() threshold_ratio = f_records[data_threshold] extend_bound_lower = [center[i] - threshold_ratio * radius[i] for i in range(self.num_dims)] extend_bound_upper = [center[i] + threshold_ratio * radius[i] for i in range(self.num_dims)] extended_bound = extend_bound_lower + extend_bound_upper extended_bound = self.__max_bound_single(extended_bound) bound_size = self.query_result_size(extended_bound, approximate=False) return extended_bound, bound_size # = = = = = internal functions = = = = = def __try_extend(self, current_bound, try_dim, side, data_threshold, print_info=False): ''' side = 0: lower side side = 1: upper side return whether this extend has made bound greater than b, current extended bound, and the size ''' # first try the extreme case dim = try_dim if side == 1: dim += self.num_dims extended_bound = copy.deepcopy(current_bound) extended_bound[dim] = self.boundary[dim] bound_size = self.query_result_size(extended_bound, approximate=False) if bound_size < data_threshold: return False, extended_bound, bound_size # binary search in this extend direction L, U = None, None if side == 0: L, U = self.boundary[dim], current_bound[dim] else: L, U = current_bound[dim], self.boundary[dim] if print_info: print("L,U:", L, U) loop_count = 0 while L < U and loop_count < 30: mid = (L + U) / 2 extended_bound[dim] = mid bound_size = self.query_result_size(extended_bound, approximate=False) if bound_size < data_threshold: L = mid elif bound_size > data_threshold: U = mid if U - L < 0.00001: break else: break if print_info: print("loop,L:", L, "U:", U, "mid:", mid, "extended_bound:", extended_bound, "size:", bound_size) loop_count += 1 return bound_size >= data_threshold, extended_bound, bound_size def __is_overlap(self, boundary, query): ''' the difference between this function and the public is_overlap function lies in the boundary parameter ''' if len(query) != 2 * self.num_dims: return -1 # error overlap_flag = True inside_flag = True for i in range(self.num_dims): if query[i] >= boundary[self.num_dims + i] or query[self.num_dims + i] <= boundary[i]: overlap_flag = False inside_flag = False return 0 elif query[i] < boundary[i] or query[self.num_dims + i] > boundary[self.num_dims + i]: inside_flag = False if inside_flag: return 2 elif overlap_flag: return 1 else: return 0 def __max_bound(self, queryset): ''' bound the queries by their maximum bounding rectangle !NOTE it is for a collection of queries!!! then constraint the MBR by the node's boundary! the return bound is in the same form as boundary ''' if len(queryset) == 0: return None # if len(queryset) == 1: # pass, I don't think there will be shape issue here max_bound_L = np.amin(np.array(queryset)[:, 0:self.num_dims], axis=0).tolist() # bound the lower side with the boundary's lower side max_bound_L = np.amax(np.array([max_bound_L, self.boundary[0:self.num_dims]]), axis=0).tolist() max_bound_U = np.amax(np.array(queryset)[:, self.num_dims:], axis=0).tolist() # bound the upper side with the boundary's upper side max_bound_U = np.amin(np.array([max_bound_U, self.boundary[self.num_dims:]]), axis=0).tolist() max_bound = max_bound_L + max_bound_U # concat return max_bound def max_bound_for_query(self,q): query=q.copy() return self.__max_bound_single(query) def __max_bound_single(self, query, parent_boundary=None): ''' bound anything in the shape of query by the current partition boundary ''' if parent_boundary is None: for i in range(self.num_dims): query[i] = max(query[i], self.boundary[i]) query[self.num_dims + i] = min(query[self.num_dims + i], self.boundary[self.num_dims + i]) return query else: for i in range(self.num_dims): query[i] = max(query[i], parent_boundary[i]) query[self.num_dims + i] = min(query[self.num_dims + i], parent_boundary[self.num_dims + i]) return query