#include "main.h"
#include "buffer.h"
// NOTE(review): the original #include list lost its header names (the <...>
// contents were stripped from the source). The standard headers below are
// reconstructed from what the visible code actually uses (std::string,
// std::filesystem, std::cerr, ceil/log/pow, INT_MIN/INT_MAX, strcmp, stat,
// open/pread/pwrite/close, assert, std::vector, uint32_t, malloc/free).
// Confirm against the project's build before merging.
#include <algorithm>
#include <cassert>
#include <climits>
#include <cmath>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <filesystem>
#include <iostream>
#include <string>
#include <vector>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include "xxhash32.h"

#define NO_KEY INT_MIN
#define TOMBSTONE INT_MAX
#define PAGE_SIZE 4096
#define B 512 // Can be changed, but should be 2^n for some n such that B DB entries will fit within one page.
#define BIN_SEARCH 0
#define BTREE_SEARCH 1
#ifndef O_DIRECT          // On machines that have O_DIRECT, our code is designed to use it.
#define O_DIRECT O_RDONLY // On machines where O_DIRECT is disallowed, use O_RDONLY instead.
#endif
#define DEBUG 0 // Set to 0 for experimental use / real-world, 1 for testing.

// Constructs a DB handle. The memtable is attached later by Open().
// buffer_args caches the last query served (kind, key1, key2) so repeated
// identical queries can skip buffer-pool insertion.
DB::DB(std::string name): name(name), mem_table(nullptr) {
    buffer_args[0] = 0;
    buffer_args[1] = NO_KEY;
    buffer_args[2] = NO_KEY;
}

// Converts a directory size (entry count) into an extendible-hash directory
// depth, i.e. ceil(log2(size)).
// FIX(review): the original guarded `size < 0`, so size == 0 evaluated
// std::log2(0) = -inf, which is garbage once cast to int. Open() calls this
// with 0 when max_directory_size == 0, so the guard must cover 0 too.
int sizeToDepth(int size) {
    if (size <= 0) return 0; // log2 is only defined for positive inputs
    return std::ceil(std::log2(size));
}

// Sums the array elements arr[start .. size-1].
int array_sum(int* arr, int start, int size) {
    int sum = 0;
    for (int i = start; i < size; i++) {
        sum += arr[i];
    }
    return sum;
}

// Public Methods

// Opens (or creates) the database in directory `name`.
// max_size: memtable capacity; initial/max_directory_size: buffer-pool
// directory sizing (0 max disables the buffer entirely); search_alg:
// BIN_SEARCH or BTREE_SEARCH; bloomfilter_bits: Monkey bits-per-entry baseline.
// If the directory already exists, sst_counter is recovered from the highest
// numbered sst<N>.bin file present.
DB* Open(std::string name, int max_size, int initial_directory_size, int max_directory_size, int search_alg, int bloomfilter_bits) {
    DB* db = new DB(name);
    db->mem_table = new AVLTree(max_size, name);
    db->buffer = new BufferPool(sizeToDepth(initial_directory_size), sizeToDepth(max_directory_size));
    db->sst_counter = 0; // Update this to be persistent
    db->max_size = max_size;
    db->buffer->no_buffer = false;
    db->bloomfilter_num_bits = bloomfilter_bits;
    if (max_directory_size == 0) db->buffer->no_buffer = true;
    // Moved from AVLTree Class: recover sst_counter from existing SST files.
    struct stat sb;
    if (stat(name.c_str(), &sb) == 0){
        for (const auto & entry : std::filesystem::directory_iterator(name)){
            if ((strcmp(entry.path().c_str(), (name + "/sizes.bin").c_str()) != 0)) {
                // Extract the trailing number from "sstN.bin".
                std::string filename = entry.path().c_str();
                size_t back = filename.find_last_of(".");
                std::string no_extension = filename.substr(0, back);
                size_t front = no_extension.find_last_not_of("0123456789");
                std::string result = no_extension.substr(front + 1);
                db->sst_counter = std::max(db->sst_counter, std::stoi(result) + 1);
            }
        }
    }
    db->search_alg = search_alg;
    return db;
}

// Flushes the memtable to disk, tears down the buffer pool, and destroys this
// DB handle. The handle must not be used after Close() returns.
int DB::Close() {
    int new_size = transform_to_sst(); // Flush the current memtable to disk
    if (sst_counter == 0) sst_counter++;
    sst_counter = std::max(sst_counter, new_size + 1);
    delete(DB::mem_table);
    // FIX(review): the original read `DB:buffer->destroyBuffer();` — a single
    // colon, which the compiler parses as a (silently unused) label `DB:`
    // rather than a qualified name. The call itself still ran, but the typo
    // hid the intent; corrected to a plain member access.
    buffer->destroyBuffer(); // Currently is freeing nothing, and failing a malloc.
    delete this; // suicide, be careful
    return 0;
}

// Checks for invalid keys / values, then inserts the pair.
// Returns 0 on success, -1 if key or value collides with a sentinel.
int DB::put(int key, int value) {
    if(key == NO_KEY || key == TOMBSTONE || value == NO_KEY || value == TOMBSTONE){
        std::cerr << "Entry (" << key << ", " << value << ") not inserted. \nThe values " << NO_KEY << " and " << TOMBSTONE << " are not allowed as keys or values in this DB.\n";
        return -1;
    }
    return insert(key, value);
}

// Returns the value matching key, or NO_KEY if the key is not in the database
// or deleted. Searches the memtable first, then SSTs newest-to-oldest,
// consulting each SST's bloom filter before touching its file.
int DB::get(int key) {
    int tree_search_result = mem_table->get(key);
    if(tree_search_result == TOMBSTONE){ // key has been deleted.
        std::cerr << "Key " << key << " not found.\n";
        return NO_KEY;
    }
    if (tree_search_result != NO_KEY) return tree_search_result;
    // Iterate over SSTs. `buf` is false when this exact get() was the last
    // query served, in which case pages are not re-inserted into the buffer.
    bool buf = true;
    if ((DB::buffer_args[0] == 1) && (DB::buffer_args[1] == key)) buf = false;
    for(int sst_num = 0; sst_num < sst_counter; sst_num++){
        std::string sst_name = DB::name + "/sst" + std::to_string(sst_num) + ".bin";
        if (std::filesystem::exists(sst_name)){
            if (check_bloom_filters(sst_num, key)) {
                int sst_search_result;
                if (search_alg == BIN_SEARCH) {
                    sst_search_result = get_from_SST_Binary(sst_num, key, buf);
                } else { // btree search
                    sst_search_result = get_from_SST_BTree(sst_num, key, buf);
                }
                if ((sst_search_result != NO_KEY) && (sst_search_result != TOMBSTONE)){
                    DB::buffer_args[0] = 1;
                    DB::buffer_args[1] = key;
                    DB::buffer_args[2] = NO_KEY;
                    return sst_search_result;
                }
            }
        }
    }
    DB::buffer_args[0] = 1;
    DB::buffer_args[1] = key;
    DB::buffer_args[2] = NO_KEY;
    // Key not found
    std::cerr << "Key " << key << " not found.\n";
    return NO_KEY;
}

// An update is identical to a put in our current DB, so update() simply calls put().
int DB::update(int key, int value){
    return put(key, value);
}

// To remove a key, we insert a tombstone. We can check for users trying to delete invalid keys, still.
int DB::remove(int key){
    if(key == NO_KEY || key == TOMBSTONE){
        std::cerr << "Entry with key " << key << " is not permitted in the DB, and can not be deleted. \nThe values " << NO_KEY << " and " << TOMBSTONE << " are not allowed as keys or values in this DB.\n";
        return -1;
    }
    return insert(key, TOMBSTONE);
}

// Retrieves all KV-pairs in a key range in key order.
// Retrieves all KV pairs with keys in [key1, key2], in key order.
// Starts from a memtable scan, then merges in each SST's in-range pairs,
// newest SST first (merge_kv_pairs prefers the first argument on duplicate
// keys, so newer versions win). Tombstoned pairs are stripped at the end.
// Caller owns the returned struct; release it with freeKVpairs().
struct kv_pairs* DB::scan(int key1, int key2) {
    kv_pairs *scanned_kv = mem_table->scan(key1, key2); // Scan the memtable
    kv_pairs *SST_kv = new kv_pairs;    // temporary struct for each SST to use
    kv_pairs *merged_kv = new kv_pairs; // temporary struct to hold the merged kv pairs before it is re-assigned to scanned_kv
    // One shared scratch array, reused for every SST's scan output.
    SST_kv->kv = (int**) malloc(sizeof(int*) * mem_table->max_size * sst_counter * 2); // todo: find a better bound
    // `buf` is false when this exact scan was the last query served; pages
    // read during the scan are then not inserted into the buffer pool.
    bool buf = true;
    if ((DB::buffer_args[0] == 2) && (DB::buffer_args[1] == key1) && (DB::buffer_args[2] == key2)) buf = false;
    // Iterate over SSTs
    for (int sst_num = 0; sst_num < sst_counter; sst_num++) {
        // some layers of the sst may not exist due to lsm structure
        if (std::filesystem::exists((name + "/sst" + std::to_string(sst_num) + ".bin").c_str()) == 0) continue;
        SST_kv->kv_size = 0; // reset the scratch struct for this SST
        if (search_alg == BIN_SEARCH) {
            scan_SST_Binary(sst_num, key1, key2, SST_kv, buf);
        } else {
            scan_SST_BTree(sst_num, key1, key2, SST_kv, buf);
        }
        // allocate enough space for all pairs in both
        merged_kv->kv = (int**) malloc(sizeof(int*) * (scanned_kv->kv_size + SST_kv->kv_size));
        merge_kv_pairs(scanned_kv, SST_kv, merged_kv);
        free(scanned_kv->kv); // free the old scanned kv so it can be replaced by the new merged one
        // Transfer ownership of the merged array into scanned_kv.
        scanned_kv->kv = merged_kv->kv;
        scanned_kv->kv_size = merged_kv->kv_size;
    }
    free(SST_kv->kv);
    delete(SST_kv); // no longer needed
    delete(merged_kv); // its kv array now belongs to scanned_kv, so only the struct is freed
    remove_scanned_tombstones(scanned_kv); // clean scanned kv of tombstones for returning
    // Remember this query so an identical follow-up scan skips buffer inserts.
    DB::buffer_args[0] = 2;
    DB::buffer_args[1] = key1;
    DB::buffer_args[2] = key2;
    return scanned_kv;
}

// Helper function for end-user. Frees a KV pairs object once the user no longer requires it.
void DB::freeKVpairs(kv_pairs* p) {
    mem_table->freeKVpairs(p);
}

// Setting the maximum directory size for the buffer pool (converted to a depth).
void DB::set_max_dir_size(int size) {
    int depth = sizeToDepth(size);
    DB::buffer->setMaxDirectoryDepth(depth);
}

// DB private methods

// Inserts a KV pair into the DB, without validating the key or value.
Used by both put() and delete(). int DB::insert(int key, int value){ // If full, move to SST before inserting if (mem_table->cur_size == mem_table->max_size){ int new_size = transform_to_sst(); if (sst_counter == 0) sst_counter++; sst_counter = std::max(sst_counter, new_size + 1); mem_table->root = nullptr; // reset root } mem_table->put(key, value); return 0; } // Because we implement Monkey, the number of bits assigned to a bloom filter depends on the level of the LSM tree it corresponds to. int DB::get_bloom_size(int lvl) { int num_entry = pow(2, lvl) * (DB::mem_table->max_size); if (lvl >= DB::sst_counter - 1) return num_entry * (DB::bloomfilter_num_bits - 1); // lowest level of LSM tree return num_entry * (DB::bloomfilter_num_bits + (DB::sst_counter - 1 - lvl)); // other levels } // For a given key and LSM tree level, return the ln(2) * m hashes of our key. std::vector DB::bloom_hash(int lvl, int key){ int m; if (lvl >= DB::sst_counter - 1){ m = (DB::bloomfilter_num_bits - 1); } else { m = (DB::bloomfilter_num_bits + (DB::sst_counter - 1 - lvl)); } int num_hash = round(m * log(2)); int k = key; std::vector hashes; for (int i = 0; i < num_hash; i++){ uint32_t keyHash = (XXHash32::hash(&k, sizeof(int), i)) % (get_bloom_size(lvl)); hashes.push_back(keyHash); } return hashes; } // Creates a bloom filter from a set of KV pairs. We assume this KV pairs corresponds to all of the pairs from level 'lvl' of the LSM tree. 
std::vector DB::create_bloom_filter(int lvl, kv_pairs* scanned_kv){ int size = DB::get_bloom_size(lvl); std::vector filter(size, false); for (int i = 0; i < scanned_kv->kv_size; i++) { std::vector hashes = bloom_hash(lvl, scanned_kv->kv[i][0]); for (uint32_t h : hashes){ filter[h] = true; } } return filter; } int DB::get_offset(int sst_num, int index, int num_leaf_pages, int sst_size, int leaf_page) { int offset; if (num_leaf_pages <= 1) { offset = leaf_page * PAGE_SIZE + floor(index/B) * PAGE_SIZE + (index % B) * sizeof(int) * 2; } // The last two pages needed to be split so that they'd have between B/2 and B leaves each // Check if the element on index is at one of the last two pages else if (index >= num_leaf_pages - 2 && sst_size % B != 0 && max_size % B != 0 && max_size % B < ceil(float(B)/float(2))) { if (index < (num_leaf_pages - 2) * B + ceil(float(B)/float(2))) { // on the first half --> second last page offset = leaf_page * PAGE_SIZE + (num_leaf_pages - 2) * PAGE_SIZE + (index % B) * sizeof(int) * 2; } else { // on last page offset = leaf_page * PAGE_SIZE + (num_leaf_pages - 1) * PAGE_SIZE + (index - (sst_size - (num_leaf_pages-2) * B - ceil(float(B)/float(2)))) * sizeof(int) * 2; } } else { offset = leaf_page * PAGE_SIZE + floor(index/B) * PAGE_SIZE + (index % B) * sizeof(int) * 2; } return offset; } // Returns the size of the th SST int DB::get_SST_size(int sst_num) { int fd = open((name + "/sizes.bin").c_str(), O_RDONLY); int sst_size; pread(fd, &sst_size, sizeof(int), sizeof(int) * sst_num); // read sst size close(fd); return sst_size; } // Gets the offset in the SST of the key at index in cur_page of the SST int DB::get_internal_node_offset(int cur_page, int index) { return cur_page * PAGE_SIZE + index * sizeof(int) * 2; } // Scans the SST with sst_num via B-Tree search for keys in range key1 to key2, and adds valid pairs into scanned_kv. 
// Scans SST `sst_num` for keys in [key1, key2] by descending the B-tree's
// internal-node pages to the first in-range leaf, then walking the leaf pages
// in order. Each matching pair is appended to scanned_kv as a freshly
// allocated int[2]; scanned_kv->kv must already be large enough.
// `args` is forwarded to read_SST: when false, fetched pages are not inserted
// into the buffer pool.
void DB::scan_SST_BTree(int sst_num, int key1, int key2, struct kv_pairs *scanned_kv, bool args) {
    // Initializing variables that will help with sizing of the internal nodes
    int sst_size = get_SST_size(sst_num);
    int num_layers = ceil(log(sst_size) / log(B));
    int cur_layer = 1;
    int node_size = 0;
    int cur_page = 0;
    // Calculate the number of leaf pages
    int num_leaf_pages = ceil((float) sst_size / (float) B);
    // Store the number of pages needed in each layer, starting with leaves and going up
    int num_pages [num_layers];
    num_pages[0] = num_leaf_pages;
    int leaf_page = 0; // page index at which the leaf region starts
    // Count the number of pages that we need on each layer
    for (int i = 1; i < num_layers; i++) {
        // Check if we have already found a layer with the root node
        if (num_pages[i-1] == 1) {
            break;
        }
        num_pages[i] = ceil(float(num_pages[i-1]) / float(B));
        leaf_page += num_pages[i];
    }
    // Set up binary search
    int left = 1;          // left boundary of binary search
    int right = node_size; // right boundary of binary search
    int middle;            // current index to query in binary search
    int middle_value;      // buffer to hold value of middle element. This will be the key.
    int idx;               // will hold the least index in the binary search whose key is >= key1
    int offset;            // offset to search at
    std::string sst_name = (name + "/sst" + std::to_string(sst_num) + ".bin").c_str();
    // Assertion: sst_size >= 1 (an empty SST is illogical).
    // NOTE(review): original comment flagged this with "DOUBLE CHECK THIS".
    if (sst_size < 1) return;
    // While cur_layer is not num_layers (we have not reached the leaf nodes yet),
    // keep searching the internal nodes with binary search
    while (cur_layer != num_layers) {
        if (cur_page >= leaf_page) break; // already at/inside the leaf region
        // Binary search to find the first element in the range.
        // The first int of an internal page is its entry count.
        node_size = read_SST(sst_name, sst_num, cur_page * PAGE_SIZE, args);
        left = 1; // 1-indexed, as node size takes up the first sizeof(int) of the SST
        right = node_size;
        // If the rightmost key is less than key1, then it is on the last offset
        if (key1 > read_SST(sst_name, sst_num, get_internal_node_offset(cur_page, right), args)) {
            cur_layer++;
            // Assign cur_page to the offset suggested by the BTree (the child stored after the last key)
            cur_page = read_SST(sst_name, sst_num, get_internal_node_offset(cur_page, right) + sizeof(int), args);
            continue;
        }
        // Otherwise, binary search the internal nodes to find the first key that is
        // greater than or equal to key1 and navigate to the internal node inferred.
        // Because the rightmost key is >= key1 here, the else branch below runs
        // at least once, so idx is always assigned before use.
        while (left <= right) {
            middle = (left + right) / 2;
            middle_value = read_SST(sst_name, sst_num, get_internal_node_offset(cur_page, middle), args);
            // If key1 is greater than middle, then move left side to mid + 1
            if (middle_value < key1) {
                left = middle + 1;
            } else {
                idx = middle;
                right = middle - 1;
            }
        }
        // Set cur page to the value indicated by the offset for idx and then increment cur_layer
        cur_page = read_SST(sst_name, sst_num, get_internal_node_offset(cur_page, idx) - sizeof(int), args); // offset should be stored at index before the found key
        cur_layer++;
    }
    // We should now have the page that our leaf node is in - binary search again here to find it
    // First we need to find the size of this page
    int page_size;
    if (num_leaf_pages <= 1) {
        // if there's only one leaf page, must contain all the leaves
        page_size = sst_size;
    } else if (cur_page < num_leaf_pages + leaf_page - 2 || sst_size % B == 0) {
        // not on last two pages, or the sst size is a multiple of B, so page size must be B
        page_size = B;
    } else if (sst_size % B < ceil(float(B)/float(2))) {
        // the last two pages needed to be balanced
        if (cur_page == num_leaf_pages + leaf_page - 1) { // last page
            page_size = B - ceil(float(B)/float(2));
        } else { // second last page
            page_size = ceil(float(B)/float(2));
        }
    } else if (cur_page == num_leaf_pages + leaf_page - 1) {
        // on last page, did not need to be balanced
        page_size = B;
    } else {
        // on second last page, did not need to be balanced
        // NOTE(review): this uses num_leaf_pages % B where sibling logic is
        // phrased in terms of sst_size % B — presumably intentional, but verify.
        page_size = num_leaf_pages % B;
    }
    // Binary search the leaves for the smallest in-range element.
    left = 0;
    right = page_size;
    while(left < right){
        middle = (left + right) / 2;
        middle_value = read_SST(sst_name, sst_num, cur_page * PAGE_SIZE + middle * sizeof(int) * 2, args);
        if (middle_value < key1){left = middle + 1;} // if value is less than key, nothing before it is in-range.
        else {right = middle;} // otherwise, the value could possibly be the smallest in-range element.
    }
    // Scan forward from `left` to collect all the elements up to key2.
    int kvpair_buffer[2]; // buffer to hold current KV pair being read in
    offset = cur_page * PAGE_SIZE + left * sizeof(int) * 2; // Get the offset
    while (offset < leaf_page * PAGE_SIZE + num_leaf_pages * PAGE_SIZE) {
        kvpair_buffer[0] = read_SST(sst_name, sst_num, offset, args);
        if(kvpair_buffer[0] > key2) break; // stop reading from this SST once we have surpassed range
        // NO_KEY padding marks the end of this page's data: move to next page
        if (kvpair_buffer[0] == NO_KEY) {
            cur_page += 1;
            offset = cur_page * PAGE_SIZE;
            continue;
        }
        kvpair_buffer[1] = read_SST(sst_name, sst_num, offset + sizeof(int), args);
        int * pair;
        pair = new int [2]; // caller (scan) takes ownership of each pair
        pair[0] = kvpair_buffer[0];
        pair[1] = kvpair_buffer[1];
        scanned_kv->kv[scanned_kv->kv_size] = pair;
        scanned_kv->kv_size += 1;
        offset += sizeof(int) * 2;
    }
}

// Scans the SST with sst_num via Binary search for keys in range key1 to key2, and adds valid pairs into scanned_kv.
// Scans SST `sst_num` via binary search over its leaf region for keys in
// [key1, key2], appending each matching pair to scanned_kv as a freshly
// allocated int[2]. `args` is forwarded to read_SST and controls whether
// fetched pages are inserted into the buffer pool.
void DB::scan_SST_Binary(int sst_num, int key1, int key2, struct kv_pairs *scanned_kv, bool args){
    int sst_size = get_SST_size(sst_num);
    if (sst_size < 1) return; // confirm that SST is non-empty.
    // we are going to binary search to find the smallest element that is in-range, and then
    // iterate through each element to add to our list of kv-pairs until we reach one out of range.
    // set up binary search
    int left = 0;             // left boundary of binary search
    int right = sst_size - 1; // right boundary of binary search
    int middle;               // current index to query in binary search
    int middle_value;         // buffer to hold value of middle element. This will be the key.
    int offset;               // offset in SST to read at.
    std::string sst_name = (name + "/sst" + std::to_string(sst_num) + ".bin").c_str();
    int num_leaf_pages = ceil((float) sst_size / (float) B); // Calculate the page number of the leaf page
    int num_layers = ceil(log(sst_size) / log(B)); // O(log_B(max_size)) internal node layers
    int num_pages [num_layers]; // Store the number of pages needed in each layer, starting with leaves and going up
    num_pages[0] = num_leaf_pages;
    int leaf_page = 0; // page index at which the leaf region starts
    // Count the number of pages that we need on each layer
    for (int i = 1; i < num_layers; i++) {
        // Check if we have already found a layer with the root node
        if (num_pages[i-1] == 1) break;
        num_pages[i] = ceil(float(num_pages[i-1]) / float(B));
        leaf_page += num_pages[i];
    }
    // Once we exit the loop, we will have left = right = smallest element in-range.
    while(left < right){
        middle = (left + right) / 2;
        // Get the offset of the middle index
        offset = get_offset(sst_num, middle, num_leaf_pages, sst_size, leaf_page);
        middle_value = read_SST(sst_name, sst_num, offset, args);
        if (middle_value < key1){left = middle + 1;} // if value is less than key, nothing before it is in-range.
        else {right = middle;} // otherwise, the value could possibly be the smallest in-range element.
    }
    int kvpair_buffer[2]; // buffer to hold current KV pair being read in
    // Get the page that left is on
    int cur_page = floor(float(get_offset(sst_num, left, num_leaf_pages, sst_size, leaf_page)) / PAGE_SIZE);
    for (int i = left; i < sst_size; i++) {
        offset = get_offset(sst_num, i, num_leaf_pages, sst_size, leaf_page);
        kvpair_buffer[0] = read_SST(sst_name, sst_num, offset, args);
        if(kvpair_buffer[0] > key2) break; // stop reading from this SST once we have surpassed range
        // INT_MIN padding marks the end of this page's data: move to next page
        if (kvpair_buffer[0] == INT_MIN) {
            cur_page += 1;
            offset = cur_page * PAGE_SIZE;
            continue;
        }
        kvpair_buffer[1] = read_SST(sst_name, sst_num, offset + sizeof(int), args);
        int * pair;
        pair = new int [2]; // caller (scan) takes ownership of each pair
        pair[0] = kvpair_buffer[0];
        pair[1] = kvpair_buffer[1];
        scanned_kv->kv[scanned_kv->kv_size] = pair;
        scanned_kv->kv_size += 1;
    }
}

// Merges kv_1 and kv_2 together, storing them in merged_kv.
// Assumes kv_1 is newer than kv_2, and thus always prefers kv_1 over kv_2 when keys match.
// merged_kv->kv must already be allocated with room for both inputs; pointers
// are moved, not copied, so ownership of the pairs transfers to merged_kv.
void DB::merge_kv_pairs(struct kv_pairs *kv_1, struct kv_pairs *kv_2, struct kv_pairs *merged_kv){
    int kv_1_count = 0;
    int kv_2_count = 0;
    int merged_count = 0;
    // Standard two-pointer sorted merge; on duplicate keys the older (kv_2)
    // pair's pointer is dropped from the merge.
    while ((kv_1_count < kv_1->kv_size) && (kv_2_count < kv_2->kv_size)){
        if (kv_1->kv[kv_1_count][0] < kv_2->kv[kv_2_count][0]){
            merged_kv->kv[merged_count] = kv_1->kv[kv_1_count];
            kv_1_count++;
        } else if (kv_1->kv[kv_1_count][0] > kv_2->kv[kv_2_count][0]){
            merged_kv->kv[merged_count] = kv_2->kv[kv_2_count];
            kv_2_count++;
        } else{ // matching keys, discard older version
            merged_kv->kv[merged_count] = kv_1->kv[kv_1_count];
            kv_1_count++;
            kv_2_count++;
        }
        merged_count++;
    }
    // insert all remaining elements from kv_1
    while(kv_1_count < kv_1->kv_size){
        merged_kv->kv[merged_count] = kv_1->kv[kv_1_count];
        kv_1_count++;
        merged_count++;
    }
    // insert all remaining elements from kv_2. Only one of these two while loops will execute.
    while(kv_2_count < kv_2->kv_size){
        merged_kv->kv[merged_count] = kv_2->kv[kv_2_count];
        kv_2_count++;
        merged_count++;
    }
    merged_kv->kv_size = merged_count;
}

// Takes final kv pairs struct from scan, and removes any tombstones present.
// Only the pointer array is compacted; the int[2] pairs holding tombstones are
// not freed here.
void DB::remove_scanned_tombstones(struct kv_pairs *scanned_kv){
    // Create new array of kv-pairs
    int** clean_kv = (int**) malloc(sizeof(int*) * (scanned_kv->kv_size));
    int clean_index = 0;
    for(int i = 0; i < scanned_kv->kv_size; i++){
        if(scanned_kv->kv[i][1] != TOMBSTONE){ // kv[i][1] is the value
            clean_kv[clean_index] = scanned_kv->kv[i];
            clean_index++;
        }
    }
    free(scanned_kv->kv); // remove old kv array
    scanned_kv->kv = clean_kv;
    scanned_kv->kv_size = clean_index;
}

// gets the element matching key from SST sst_num, using B-Tree search.
// Descends the internal-node pages exactly like scan_SST_BTree, then binary
// searches the single leaf page. Returns the value, or NO_KEY if absent.
int DB::get_from_SST_BTree(int sst_num, int key, bool args) {
    // Initializing variables that will help with sizing of the internal nodes
    int sst_size = get_SST_size(sst_num);
    int num_layers = ceil(log(sst_size) / log(B));
    int cur_layer = 1;
    int node_size = 0;
    int cur_page = 0;
    // Calculate the number of leaf pages
    int num_leaf_pages = ceil((float) sst_size / (float) B);
    // Store the number of pages needed in each layer, starting with leaves and going up
    int num_pages [num_layers];
    num_pages[0] = num_leaf_pages;
    int leaf_page = 0; // page index at which the leaf region starts
    // Count the number of pages that we need on each layer
    for (int i = 1; i < num_layers; i++) {
        // Check if we have already found a layer with the root node
        if (num_pages[i-1] == 1) break;
        num_pages[i] = ceil(float(num_pages[i-1]) / float(B));
        leaf_page += num_pages[i];
    }
    // Set up binary search
    int left = 1;          // left boundary of binary search
    int right = node_size; // right boundary of binary search
    int middle;            // current index to query in binary search
    int middle_value;      // buffer to hold value of middle element. This will be the key.
    int idx;               // will hold the least index in the binary search whose key is >= key
    int offset;            // offset to search at
    std::string sst_name = (name + "/sst" + std::to_string(sst_num) + ".bin").c_str();
    // Assertion: sst_size >= 1 (an empty SST is illogical).
    if (sst_size < 1) return NO_KEY;
    // While cur_layer is not num_layers (we have not reached the leaf nodes yet),
    // keep searching the internal nodes with binary search
    while (cur_layer != num_layers) {
        if (cur_page >= leaf_page) break; // already at/inside the leaf region
        // Binary search to find the first element in the range.
        // The first int of an internal page is its entry count.
        node_size = read_SST(sst_name, sst_num, cur_page * PAGE_SIZE, args);
        left = 1; // 1-indexed, as node size takes up the first sizeof(int) of the SST
        right = node_size;
        // If the rightmost key is less than key, then it is on the last offset
        if (key > read_SST(sst_name, sst_num, get_internal_node_offset(cur_page, right), args)) {
            cur_layer++;
            // Assign cur_page to the offset suggested by the BTree (the child stored after the last key)
            cur_page = read_SST(sst_name, sst_num, get_internal_node_offset(cur_page, right) + sizeof(int), args);
            continue;
        }
        // Otherwise, binary search the internal nodes to find the first key that is
        // greater than or equal to key and navigate to the internal node inferred.
        // Because the rightmost key is >= key here, the else branch assigns idx
        // at least once before it is used.
        while (left <= right) {
            middle = (left + right) / 2;
            middle_value = read_SST(sst_name, sst_num, get_internal_node_offset(cur_page, middle), args);
            // If key is greater than middle, then move left side to mid + 1
            if (middle_value < key) {
                left = middle + 1;
            } else {
                idx = middle;
                right = middle - 1;
            }
        }
        // Set cur page to the value indicated by the offset for idx and then increment cur_layer
        cur_page = read_SST(sst_name, sst_num, get_internal_node_offset(cur_page, idx) - sizeof(int), args); // offset should be stored at index before the found key
        cur_layer++;
    }
    // We should now have the page that our leaf node is in - binary search again here to find it
    // First we need to find the size of this page
    int page_size;
    if (num_leaf_pages <= 1) {
        // if there's only one leaf page, must contain all the leaves
        page_size = sst_size;
    } else if (cur_page < num_leaf_pages + leaf_page - 2 || sst_size % B == 0) {
        // not on last two pages, or the sst size is a multiple of B, so page size must be B
        page_size = B;
    } else if (sst_size % B < ceil(float(B)/float(2))) {
        // the last two pages needed to be balanced
        if (cur_page == num_leaf_pages + leaf_page - 1) { // last page
            page_size = B - ceil(float(B)/float(2));
        } else { // second last page
            page_size = ceil(float(B)/float(2));
        }
    } else if (cur_page == num_leaf_pages + leaf_page - 1) {
        // on last page, did not need to be balanced
        page_size = B;
    } else {
        // on second last page, did not need to be balanced
        // NOTE(review): num_leaf_pages % B here vs sst_size % B elsewhere — verify.
        page_size = num_leaf_pages % B;
    }
    // Binary search the leaves
    // NOTE(review): right starts at page_size (not page_size - 1), so the
    // probe can touch index page_size — looks like an off-by-one; confirm
    // against the on-disk layout (a padding slot may make it benign).
    left = 0;
    right = page_size;
    while(left <= right){
        middle = (left + right) / 2;
        middle_value = read_SST(sst_name, sst_num, cur_page * PAGE_SIZE + middle * sizeof(int) * 2, args);
        if (middle_value < key){left = middle + 1;}
        if (middle_value > key){right = middle - 1;}
        if (middle_value == key) return read_SST(sst_name, sst_num, cur_page * PAGE_SIZE + middle * sizeof(int) * 2 + sizeof(int), args);
    }
    return NO_KEY;
}

// gets the element matching key from SST sst_num, using Binary search.
int DB::get_from_SST_Binary(int sst_num, int key, bool args){ int sst_size = get_SST_size(sst_num); int num_leaf_pages = ceil((float) sst_size / (float) B); // Calculate the number of leaf pages // Calculate the page number of the leaf page int num_layers = ceil(log(sst_size) / log(B)); // O(log_B(max_size)) internal node layers int num_pages [num_layers]; // Store the number of pages needed in each layer, starting with leaves and going up num_pages[0] = num_leaf_pages; int leaf_page = 0; std::string sst_name = (name + "/sst" + std::to_string(sst_num) + ".bin").c_str(); // Count the number of pages that we need on each layer for (int i = 1; i < num_layers; i++) { // Check if we have already found a layer with the root node if (num_pages[i-1] == 1) break; num_pages[i] = ceil(float(num_pages[i-1]) / float(B)); leaf_page += num_pages[i]; } // set up binary search int left = 0; // left boundry of binary search int right = sst_size - 1; // right boundry of binary search int middle; // current index to query in binary search int middle_value; // buffer to hold value of middle element. This will be the key, but if the key matches we can write the value to here instead. int offset; // offset in SST to read middle at. while(left <= right){ middle = (left + right) / 2; offset = get_offset(sst_num, middle, num_leaf_pages, sst_size, leaf_page); middle_value = read_SST(sst_name, sst_num, offset, args); if (middle_value < key){ left = middle + 1; } if (middle_value > key){ right = middle - 1; } if (middle_value == key) { return read_SST(sst_name, sst_num, offset + sizeof(int), args); } } return NO_KEY; } // Reads the value at offset in SST number sst_num. First checks the buffer, and only reads from file if it is not in the buffer. // Note that the buffer page is defined by memtable_maxsize * 2^(sst_num) + (offset / PAGE_SIZE). 
int DB::read_SST(std::string sst_name, int sst_num, int offset, bool args){ int page_num = (((1 << sst_num) - 1) * mem_table->max_size) + (offset / PAGE_SIZE); // page we are reading from if(DEBUG) assert(page_num < (((1 << (sst_num + 1)) - 1) * mem_table->max_size) ); // assert all reads from SST N do not bleed into SST N + 1 (page numbering is valid) int page_offset = offset % PAGE_SIZE; // offset within page // Case where there is no buffer, so we read directly from file. DOES NOT USE DIRECT I/O. if(buffer->no_buffer){ int read_val; int fd = open(sst_name.c_str(), O_RDONLY); pread(fd, &read_val, sizeof(int), offset); close(fd); return read_val; } // Case where we have a buffer. USES DIRECT I/O char* page; page = buffer->getPage(page_num); if (page == nullptr){ // if page is not in buffer posix_memalign(reinterpret_cast(&page), PAGE_SIZE, PAGE_SIZE); // Allocate 4KB of page-alligned memory for new page // Read in page int fd = open((name + "/sst" + std::to_string(sst_num) + ".bin").c_str(), O_DIRECT); pread(fd, page, PAGE_SIZE, (offset / PAGE_SIZE) * PAGE_SIZE); close(fd); if (args) buffer->insertPage(page_num, page); } // We now have the needed page. int* value = (int*) (page + page_offset); // is this legal? 
"I will make it so" return *value; } void AVLTree::update_size(int sst_num, int sst_size) { int fd = open((db_name + "/sizes.bin").c_str(), O_RDWR | O_CREAT, 0777); pwrite(fd, &sst_size, sizeof(int), sst_num * sizeof(int)); close(fd); } // Get the page where the leaf nodes start on the SST with sst_num int AVLTree::get_leaf_page(int sst_size) { // Calculate the number of leaf pages int num_leaf_pages = ceil((float) sst_size / (float) B); // Calculate the page number of the leaf page int num_layers = ceil(log(sst_size) / log(B)); // O(log_B(max_size)) internal node layers int num_pages [num_layers]; // Store the number of pages needed in each layer, starting with leaves and going up num_pages[0] = num_leaf_pages; int leaf_page = 0; // Count the number of pages that we need on each layer for (int i = 1; i < num_layers; i++) { // Check if we have already found a layer with the root node if (num_pages[i-1] == 1) break; num_pages[i] = ceil(float(num_pages[i-1]) / float(B)); leaf_page += num_pages[i]; } return leaf_page; } int AVLTree::count_num_repeats(int size1, int size2, int fd1, int fd2, int in1, int in2) { int num_read1 = 0; int num_read2 = 0; int page1 = get_leaf_page(size1); int page2 = get_leaf_page(size2); int offset1 = PAGE_SIZE * page1; int offset2 = PAGE_SIZE * page2; int num_repeats = 0; while (num_read1 < size1 || num_read2 < size2) { // Reads the next key into the buffer if we are out of data in the page. // INT_MIN means nothing is there, need to move to the next page to find more data. if (in1 == INT_MIN && num_read1 < size1) { page1 += 1; offset1 = page1 * PAGE_SIZE; pread(fd1, &in1, sizeof(int), offset1); } if (in2 == INT_MIN && num_read2 < size2) { page2 += 1; offset2 = page2 * PAGE_SIZE; pread(fd2, &in2, sizeof(int), offset2); } // Compare the keys and put the lesser one into the output buffer. If they are equal, we ignore sst2 (as it is older) // Then, read the next page. 
if ((in1 <= in2 && num_read1 < size1) || num_read2 >= size2) { // Read in1 if in1 <= in2 or sst2 has already been fully read if (in1 == in2 && num_read2 < size2) { // If equal, we need to skip over input2 and read the next input from SST2 num_read2 += 1; num_repeats++; offset2 += sizeof(int) * 2; pread(fd2, &in2, sizeof(int), offset2); } pread(fd1, &in1, sizeof(int), offset1 + sizeof(int)); num_read1 += 1; offset1 += sizeof(int) * 2; if (num_read1 < size1) { pread(fd1, &in1, sizeof(int), offset1); } } else if ((in1 > in2 && num_read2 < size2) || num_read1 >= size1) { // Read in2 if in2 < in1 or sst1 has already been fully read pread(fd2, &in2, sizeof(int), offset2 + sizeof(int)); num_read2 += 1; offset2 += sizeof(int) * 2; if (num_read2 < size2) { pread(fd2, &in2, sizeof(int), offset2); } } } return num_repeats; } // Compacts the SSTs with IDs sst_num1 and sst_num2 and returns the sst_num of the highest compacted SST // sst_num1 is on level i, sst_num2 is on level i + 1, they get merged to level i + 2 // NOTE: always pass the younger SST (the one on the lower level) as sst_num1 // If temp is > 0, then we have recursively called compact while merging from a lower layer // If temp = 0, then we compact sst_num1 and sst_num2 // If temp = 1 then the first SST to merge is temp1.bin // If temp = 2 then the first SST to merge is is temp2.bin // The need for two temps is because we may need to continuously merge upwards int DB::compact_sst(int sst_num1, int sst_num2, int has_temp, int temp_size) { // Variables needed for compaction int size1; if (has_temp == 0) { size1 = get_SST_size(sst_num1); } else {size1 = temp_size; } int size2 = get_SST_size(sst_num2); int new_size = size1 + size2; int page1 = DB::mem_table->get_leaf_page(size1); int page2 = DB::mem_table->get_leaf_page(size2); int offset1 = PAGE_SIZE * page1; int offset2 = PAGE_SIZE * page2; int num_read1 = 0, num_read2 = 0; // The number of elements read from each SST bool final_out = false; // Is this the final 
call that outputs sst file int final_level = -1; std::vector Bfilter; // Names of the SST files std::string sst1_name; std::string sst2_name = (DB::name + "/sst" + std::to_string(sst_num2) + ".bin").c_str(); std::string sst_out_name; if (has_temp == 0) { sst1_name = (DB::name + "/sst" + std::to_string(sst_num1) + ".bin").c_str(); DB::mem_table->update_size(sst_num1, 0); // we are merging this one upwards, so it is now 0 sized } else if (has_temp == 1) { sst1_name = (DB::name + "/temp1.bin").c_str(); } else { sst1_name = (DB::name + "/temp2.bin").c_str(); } // If there is already an SST on the level we want to sort-merge to, we need to create a temp // file to hold the sort-merged SST before sort-merging it on the next level int temp_file_num = 0; bool need_temp = false; int fd_new; if (std::filesystem::exists((DB::name + "/sst" + std::to_string(sst_num2 + 1) + ".bin").c_str()) != 0) { need_temp = true; if (has_temp == 1) { temp_file_num = 2; } else { temp_file_num = 1; } sst_out_name = (DB::name + "/temp" + std::to_string(temp_file_num) + ".bin").c_str(); } else { final_out = true; final_level = sst_num2 + 1; Bfilter.assign(get_bloom_size(final_level), false); sst_out_name = (DB::name + "/sst" + std::to_string(sst_num2 + 1) + ".bin").c_str(); } fd_new = open(sst_out_name.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0777); int fd1 = open((sst1_name).c_str(), O_RDONLY); int fd2 = open((sst2_name).c_str(), O_RDONLY); // Buffer related variables int kvpair_buffer[2]; // buffer to hold current KV pair being read in int in1; int in2; pread(fd1, &in1, sizeof(int), offset1); pread(fd2, &in2, sizeof(int), offset2); int out [B * 2]; // Output buffer size B as there are B leaves per page int curr_out = 0; // The current size of the output buffer int num_repeats = DB::mem_table->count_num_repeats(size1, size2, fd1, fd2, in1, in2); new_size -= num_repeats; // Variables needed for constructing the BTree in the merged SST int out_leaf_page = DB::mem_table->get_leaf_page(new_size); // The 
location where we start writing the leaf pages in the new SST int cur_out_page = 0; int num_leaf_pages = ceil((float) (new_size) / (float)B); int max_keys [num_leaf_pages + 1]; // the max value of each leaf page, so that we can assign internal nodes later int max_offsets [num_leaf_pages + 1]; // the page where each value in max_keys is contained (then do offset * PAGE_SIZE on retrieval) int cur_idx = 0; // index for max keys and max offsets int num_layers = ceil(log(new_size) / log(B)); // O(log_B(max_size)) internal node layers int num_pages [num_layers]; // Store the number of pages needed in each layer, starting with leaves and going up num_pages[0] = num_leaf_pages; int no_idx = INT_MIN; // Count the number of pages that we need on each layer for (int i = 1; i < num_layers; i++) { // Check if we have already found a layer with the root node if (num_pages[i-1] == 1) break; num_pages[i] = ceil(float(num_pages[i-1]) / float(B)); } int nodes_per_page = B; while (num_read1 < size1 || num_read2 < size2) { // Reads the next key into the buffer if we are out of data in the page. // INT_MIN means nothing is there, need to move to the next page to find more data. if (in1 == INT_MIN && num_read1 < size1) { page1 += 1; offset1 = page1 * PAGE_SIZE; pread(fd1, &in1, sizeof(int), offset1); } if (in2 == INT_MIN && num_read2 < size2) { page2 += 1; offset2 = page2 * PAGE_SIZE; pread(fd2, &in2, sizeof(int), offset2); } // Compare the keys and put the lesser one into the output buffer. If they are equal, we ignore sst2 (as it is older) // Then, read the next page. 
if ((in1 <= in2 && num_read1 < size1) || num_read2 >= size2) { // Read in1 if in1 <= in2 or sst2 has already been fully read if (in1 == in2 && num_read2 < size2) { // If equal, we need to skip over input2 and read the next input from SST2 num_read2 += 1; offset2 += sizeof(int) * 2; pread(fd2, &in2, sizeof(int), offset2); } out[curr_out * 2] = in1; // Key if (final_out){ std::vector hashes = bloom_hash(final_level, in1); for (uint32_t h : hashes){ Bfilter[h] = true; } // hash in1 } pread(fd1, &in1, sizeof(int), offset1 + sizeof(int)); out[curr_out * 2 + 1] = in1; // Value curr_out += 1; num_read1 += 1; offset1 += sizeof(int) * 2; if (num_read1 < size1) { pread(fd1, &in1, sizeof(int), offset1); } } else if ((in1 > in2 && num_read2 < size2) || num_read1 >= size1) { // Read in2 if in2 < in1 or sst1 has already been fully read out[curr_out * 2] = in2; if (final_out){ std::vector hashes = bloom_hash(final_level, in2); for (uint32_t h : hashes){ Bfilter[h] = true; } } pread(fd2, &in2, sizeof(int), offset2 + sizeof(int)); out[curr_out * 2 + 1] = in2; curr_out += 1; num_read2 += 1; offset2 += sizeof(int) * 2; if (num_read2 < size2) { pread(fd2, &in2, sizeof(int), offset2); } } // Flush the buffer if it is full (also need to be aware of the splitting at the end) if ((curr_out >= B) || (cur_out_page == num_leaf_pages - 2 && new_size % B != 0 && new_size % B < ceil(float(B) / float(2)) && curr_out >= ceil(float(B) / float(2)))) { // Store max keys and max offsets max_keys[cur_idx] = out[curr_out * 2 - 2]; max_offsets[cur_idx] = out_leaf_page + cur_out_page; cur_idx += 1; // Flush pwrite(fd_new, out, sizeof(int) * curr_out * 2, (out_leaf_page + cur_out_page) * PAGE_SIZE); // Need to fill the next slot with INT_MIN pwrite(fd_new, &no_idx, sizeof(int), (out_leaf_page + cur_out_page) * PAGE_SIZE + (curr_out * 2) * sizeof(int)); cur_out_page += 1; curr_out = 0; } } // If there is still data in the buffer, flush it if (curr_out != 0) { // Store max keys and max offsets 
max_keys[cur_idx] = out[curr_out * 2 - 2]; max_offsets[cur_idx] = out_leaf_page + cur_out_page; cur_idx += 1; // Flush pwrite(fd_new, out, sizeof(int) * curr_out * 2, (out_leaf_page + cur_out_page) * PAGE_SIZE); pwrite(fd_new, &no_idx, sizeof(int), (out_leaf_page + cur_out_page) * PAGE_SIZE + (curr_out * 2) * sizeof(int)); cur_out_page += 1; curr_out = 0; } // Remake the internal nodes // Loop through the remaining layers to write their internal nodes int cur_psize; int cur_page; int offset; for (int i = 1; i < num_layers; i++) { if (num_pages[i] == 0) break; // no elements in this layer // Get the layer offset if (num_pages[i] == 1) { offset = 0; } else { offset = array_sum(num_pages, i+1, num_layers); } cur_psize = 0; cur_page = 0; nodes_per_page = B-1; // Write to the SST for (int j=0; j < num_pages[i-1]; j++) { // Check if we're on the second last page and if we need to split it to avoid inbalance if (cur_page == num_pages[i] - 2 && (num_pages[i-1] % (B) != 0) && ((num_pages[i-1] % (B)) < (B)/2)) { nodes_per_page = ceil(float(B) / float(2)); } // If we have filled up this page, move to write to the next page if (cur_psize >= nodes_per_page || j == num_pages[i-1] - 1 || j == num_pages[i-1]-1) { // Write the offset as we need it to access this last node pwrite(fd_new, &max_offsets[j], sizeof(int), (offset * PAGE_SIZE) + (PAGE_SIZE * cur_page) + (cur_psize * sizeof(int) * 2) + sizeof(int)); pwrite(fd_new, &cur_psize, sizeof(int), (offset * PAGE_SIZE) + (PAGE_SIZE * cur_page)); // Also set the max value of this internal node to its rightmost max value max_keys[cur_page] = max_keys[j]; max_offsets[cur_page] = cur_page + offset; cur_page ++; cur_psize = 0; } else { // Write to (Start of internal node pages + page offset + element offset) // Write alternating offsets and keys pwrite(fd_new, &max_offsets[j], sizeof(int), (offset * PAGE_SIZE) + (PAGE_SIZE * cur_page) + (cur_psize * sizeof(int) * 2) + sizeof(int)); pwrite(fd_new, &max_keys[j], sizeof(int), (offset * 
// --- tail of DB::compact_sst() (the function's opening lines are earlier in the file) ---
// Final write of one (offset, key) slot for an internal B-tree node, then the
// internal-node loops of the compaction are closed out.
PAGE_SIZE) + (PAGE_SIZE * cur_page) + (cur_psize * sizeof(int) * 2) + sizeof(int) * 2);
                cur_psize++;
            }
        }
    }
    // If we have a temp file, we need to merge upwards again with tempfile and (sst2 + 1).bin
    int rv;
    if (need_temp) {
        // merge temp sst and sst_num2+1 sst
        rv = compact_sst(-1, sst_num2 + 1, temp_file_num, new_size);
    } else {
        rv = sst_num2 + 1;
    }
    // Persist the bloom filter only when we produced a final (non-temp) SST.
    if (final_out) write_bloom_filters(Bfilter, final_level);
    close(fd1);
    close(fd2);
    close(fd_new);
    // Both merge inputs (and their bloom filter files) are now stale: remove them.
    std::string fp_bf1 = DB::name + "/bloomFilter" + std::to_string(sst_num1) + ".bin";
    std::string fp_bf2 = DB::name + "/bloomFilter" + std::to_string(sst_num2) + ".bin";
    if (std::filesystem::exists(fp_bf1)) std::remove(fp_bf1.c_str()); // Delete bloom filter file for sst1
    if (std::filesystem::exists(fp_bf2)) std::remove(fp_bf2.c_str()); // Delete bloom filter file for sst2
    std::remove(sst1_name.c_str()); // Delete sst1 file
    std::remove(sst2_name.c_str()); // Delete sst2 file
    DB::mem_table->update_size(sst_num2, 0); // we merged this one upwards as well, so nothing is left here
    DB::mem_table->update_size(sst_num2 + 1, new_size); // the sort-merged file on the new level that we created
    buffer->emptyBuffer(); // clear the buffer, to ensure that there are no stale pages within it
    return rv;
}

// Serializes bloom-filter flags for level `lvl` to "<db>/bloomFilter<lvl>.bin",
// packing 8 flags per byte, least-significant bit first (matches the unpacking
// done in check_bloom_filters below). Logs and returns if the file cannot be opened.
// NOTE(review): template arguments were stripped from this copy of the file (see
// the bare `#include` lines at the top); `data` is the flag vector built by
// compact_sst / create_bloom_filter — confirm its element type against the header.
void DB::write_bloom_filters(const std::vector& data, int lvl) {
    std::string filename = name + "/bloomFilter" + std::to_string(lvl) + ".bin";
    std::ofstream outFile(filename, std::ios::binary);
    if (!outFile.is_open()) {
        std::cerr << "Error opening file for writing." << std::endl;
        return;
    }
    // Calculate the number of bytes needed (ceil)
    std::size_t byteCount = (data.size() + 7) / 8;
    // Write the data to the file, packing bits into bytes
    for (std::size_t i = 0; i < byteCount; ++i) {
        char byte = 0;
        for (std::size_t j = 0; j < 8 && i * 8 + j < data.size(); ++j) {
            if (data[i * 8 + j]) byte |= (1 << j);
        }
        outFile.put(byte);
    }
    outFile.close();
}

// Maps (SST level `num1`, filter-page index `num2`) to a unique buffer-pool page
// id. The result is negated so bloom-filter pages can never collide with the
// ids used for ordinary data pages in the shared buffer pool.
int DB::bloom_filter_id(int num1, int num2){
    // magic, empirically impossible to overflow
    // NOTE(review): pow(2, num1) grows exponentially with the level and the double
    // result is truncated to int — verify the claimed no-overflow bound for the
    // deepest level this store can reach.
    return - (pow(2, num1) * ceil((DB::mem_table->max_size) * 0.001) + num2);
}

// Probes every bloom-filter bit for `key` in the filter of SST level `sst_num`.
// Returns false as soon as one required bit is 0 (key definitely absent);
// returns true when all probed bits are set (key possibly present).
bool DB::check_bloom_filters(int sst_num, int key) {
    std::vector hashes = bloom_hash(sst_num, key);
    for (uint32_t h : hashes) {
        int page = h / (PAGE_SIZE*8);        // which 4KB filter page holds bit h
        int page_id = bloom_filter_id(sst_num, page);
        int page_offset = h % (PAGE_SIZE*8); // bit position inside that page
        char* data;
        data = buffer->getPage(page_id);
        if (data == nullptr){
            // Buffer miss: read the filter page from disk and cache it.
            posix_memalign(reinterpret_cast(&data), PAGE_SIZE, PAGE_SIZE); // Allocate 4KB of page-alligned memory for new page
            // Read in page
            // NOTE(review): the return values of posix_memalign/open/pread are not
            // checked here — a missing bloomFilter file leaves `data` uninitialized.
            int fd = open((DB::name + "/bloomFilter" + std::to_string(sst_num) + ".bin").c_str(), O_DIRECT);
            pread(fd, data, PAGE_SIZE, (h / (PAGE_SIZE*8)) * PAGE_SIZE);
            close(fd);
            buffer->insertPage(page_id, data);
        }
        // read the actual bit
        std::size_t byteIndex = page_offset / 8;
        std::size_t bitIndex = page_offset % 8;
        // Extract the bit using bitwise operations
        char byte = data[byteIndex];
        if ((byte & (1 << bitIndex)) == 0) return false;
    }
    return true;
}

// Flushes the memtable to disk by converting it to a SST.
// Return value is the sst_num of the SST created by flushing to disk, after compaction (if needed) has been performed.
int DB::transform_to_sst() {
    if(mem_table->cur_size == 0) return 0; // If memtable is empty, there is nothing to flush.
// --- interior of DB::transform_to_sst() (continued from the line above) ---
    // Create struct perfectly size-alligned to memtable
    kv_pairs *scanned_kv = new kv_pairs;
    scanned_kv->kv = (int**) malloc(sizeof(int*) * DB::mem_table->cur_size);
    scanned_kv->kv_size = 0;
    DB::mem_table->scan_node(DB::mem_table->root, 0, 0, scanned_kv, true); // Scan the full memtable
    // The max size of the directory when we transform the SST
    max_size = scanned_kv->kv_size;
    // Write the memtable to a file
    mkdir(DB::name.c_str(), 0777);
    int fd;
    std::string sst0 = (DB::name + "/sst0.bin").c_str();
    std::string sst_name;
    bool to_compact = false;
    // If sst0.bin does not exist, increment sst_counter and put this into sst0
    if (std::filesystem::exists(sst0.c_str()) == 0) {
        std::vector filter = create_bloom_filter(0, scanned_kv);
        write_bloom_filters(filter, 0);
        sst_name = (DB::name + "/sst0.bin").c_str();
        // Store the number of leaves that are being transformed to SST
        fd = open((DB::name + "/sizes.bin").c_str(), O_RDWR | O_CREAT, 0777);
        pwrite(fd, &(scanned_kv->kv_size), sizeof(int), 0); // because this is SST 0, we start at offset 0
        close(fd);
    } else {
        // If sst0.bin already exists, we need to create a temp file and then merge them together to new file sst1.bin
        sst_name = (DB::name + "/temp1.bin");
        to_compact = true;
    }
    fd = open(sst_name.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0777); // open binary file sst.bin
    // Converts to a B-Tree format before storing it into the STT
    // Counting the number of elements per layer
    int num_layers = ceil(log(max_size) / log(B)); // O(log_B(max_size)) internal node layers
    // NOTE(review): variable-length arrays are a compiler extension in C++, and
    // log(max_size) is 0 when max_size == 1 — confirm num_layers >= 1 for tiny flushes.
    int num_pages [num_layers]; // Store the number of pages needed in each layer, starting with leaves and going up
    int offset = 0;
    int num_leaf_pages = ceil((float)max_size / (float)B);
    num_pages[0] = num_leaf_pages;
    int max_keys [num_leaf_pages + 1]; // the max value of each leaf page, so that we can assign internal nodes later
    int max_offsets [num_leaf_pages + 1]; // the page where each value in max_keys is contained (then do offset * PAGE_SIZE on retrieval)
    // Count the number of pages that we need on each layer
    for (int i = 1; i < num_layers; i++) {
        // Check if we have already found a layer with the root node
        if (num_pages[i-1] == 1) break;
        num_pages[i] = ceil(float(num_pages[i-1]) / float(B));
        offset += num_pages[i];
    }
    int cur_psize = 0; // current size of the page (number of kv pairs currently in the page)
    int cur_page = 0; // the leaf page we are currently writing to
    int nodes_per_page = B; // the maximum number of nodes we can write to the page
    // Iterate through leaves and write them to their proper page with the proper offset
    for (int i = 0; i < scanned_kv->kv_size; i++) {
        // If the last page needs to be split (to avoid being unbalanced), then we only write B/2 elements on the second last page
        if (cur_page == num_leaf_pages - 2 && (max_size % B != 0) && ((max_size % B) < ceil(float(B)/float(2)))) {
            nodes_per_page = ceil(float(B) / float(2));
        }
        // Write to (Start of leaf pages + page offset + element offset)
        pwrite(fd, scanned_kv->kv[i], sizeof(int) * 2, (offset * PAGE_SIZE) + (PAGE_SIZE * cur_page) + (cur_psize * sizeof(int) * 2));
        cur_psize++;
        // If we have filled up this page, move to write to the next page and store the max value and its offset to use in internal nodes
        if (cur_psize >= nodes_per_page) {
            max_keys[cur_page] = scanned_kv->kv[i][0];
            max_offsets[cur_page] = offset + cur_page;
            // Write INT_MIN to indicate that we have filled up the page (if we have not entered the next page yet)
            if ((cur_psize * sizeof(int) * 2) % PAGE_SIZE != 0) {
                int no_key = NO_KEY;
                pwrite(fd, &(no_key), sizeof(int), offset * PAGE_SIZE + cur_page * PAGE_SIZE + (cur_psize - 1) * sizeof(int) * 2 + sizeof(int) * 2);
            }
            cur_page++;
            cur_psize = 0;
        } else if (i == scanned_kv->kv_size -1) {
            // last element, also add it to the max_keys and offset
            max_keys[cur_page] = scanned_kv->kv[i][0];
            max_offsets[cur_page] = offset + cur_page;
            if ((cur_psize * sizeof(int) * 2) % PAGE_SIZE != 0) {
                int no_key = NO_KEY;
                pwrite(fd, &(no_key), sizeof(int), offset * PAGE_SIZE + cur_page * PAGE_SIZE + (cur_psize - 1) * sizeof(int) * 2 + sizeof(int) * 2);
            }
        }
        delete[] scanned_kv->kv[i]; // each pair array was heap-allocated by scan_node
    }
    // Loop through the remaining layers to write their internal nodes
    for (int i = 1; i < num_layers; i++) {
        if (num_pages[i] == 0) break; // no elements in this layer
        // Get the layer offset
        if (num_pages[i] == 1) {
            offset = 0;
        } else {
            offset = array_sum(num_pages, i+1, num_layers);
        }
        cur_psize = 0;
        cur_page = 0;
        nodes_per_page = B-1;
        // Write to the SST
        for (int j=0; j < num_pages[i-1]; j++) {
            // Check if we're on the second last page and if we need to split it to avoid inbalance
            if (cur_page == num_pages[i] - 2 && (num_pages[i-1] % (B) != 0) && ((num_pages[i-1] % (B)) < (B)/2)) {
                nodes_per_page = ceil(float(B) / float(2));
            }
            // If we have filled up this page, move to write to the next page
            // NOTE(review): the last two disjuncts below are identical
            // (`j == num_pages[i-1] - 1` written twice) — one is redundant or a typo.
            if (cur_psize >= nodes_per_page || j == num_pages[i-1] - 1 || j == num_pages[i-1]-1) {
                // Write the offset as we need it to access this last node
                pwrite(fd, &max_offsets[j], sizeof(int), (offset * PAGE_SIZE) + (PAGE_SIZE * cur_page) + (cur_psize * sizeof(int) * 2) + sizeof(int));
                pwrite(fd, &cur_psize, sizeof(int), (offset * PAGE_SIZE) + (PAGE_SIZE * cur_page));
                // Also set the max value of this internal node to its rightmost max value
                max_keys[cur_page] = max_keys[j];
                max_offsets[cur_page] = cur_page + offset;
                cur_page ++;
                cur_psize = 0;
            } else {
                // Write to (Start of internal node pages + page offset + element offset)
                // Write alternating offsets and keys
                pwrite(fd, &max_offsets[j], sizeof(int), (offset * PAGE_SIZE) + (PAGE_SIZE * cur_page) + (cur_psize * sizeof(int) * 2) + sizeof(int));
                pwrite(fd, &max_keys[j], sizeof(int), (offset * PAGE_SIZE) + (PAGE_SIZE * cur_page) + (cur_psize * sizeof(int) * 2) + sizeof(int) * 2);
                cur_psize++;
            }
        }
    }
    int rv = 0;
    // If sst0.bin exists, we need to do a compaction
    if (to_compact) rv = compact_sst(-1, 0, 1, scanned_kv->kv_size);
    close(fd);
    DB::mem_table->cur_size = 0;
    free(scanned_kv->kv);
    delete(scanned_kv);
DB::mem_table->deleteTree(DB::mem_table->root); return rv; } TreeNode::TreeNode(int key, int val) : key(key), value(val), height(1), left(nullptr), right(nullptr) {} AVLTree::AVLTree(int max_size, std::string db_name): root(nullptr), max_size(max_size), cur_size(0), db_name(db_name) {} // public methods int AVLTree::put(int key, int value) { root = insert(root, key, value); return 0; } int AVLTree::get(int key) { return get_node(root, key); } struct kv_pairs* AVLTree::scan(int key1, int key2){ kv_pairs *scanned_kv = new kv_pairs; scanned_kv->kv = (int**) malloc(sizeof(int*) * max_size); // initially sized to fit max possible number of pairs in the memtable scanned_kv->kv_size = 0; scan_node(root, key1, key2, scanned_kv, false); // Recursive scan of tree using our helper function return scanned_kv; } // private methods void AVLTree::scan_node(TreeNode* node, int key1, int key2, struct kv_pairs *scanned_kv, bool fullscan) { if (node == nullptr) return; // Scan left side if ((key1 < node->key) || fullscan) scan_node(node->left, key1, key2, scanned_kv, fullscan); // If we are not doing a full scan of the memtable, and we are in range, add to kv pairs and increment size // If we are doing a fullscan, we always add the kv pair regardless of the range if ((!fullscan && node->key >= key1 && node->key <= key2) || fullscan) { int * pair; pair = new int [2]; pair[0] = node->key; pair[1] = node->value; scanned_kv->kv[scanned_kv->kv_size] = pair; scanned_kv->kv_size += 1; } // Scan right side if ((key2 > node->key) || fullscan) scan_node(node->right, key1, key2, scanned_kv, fullscan); } int AVLTree::getHeight(TreeNode* node) { if (node == nullptr) return 0; return node->height; } int AVLTree::getBalanceFactor(TreeNode* node) { if (node == nullptr) return 0; return getHeight(node->left) - getHeight(node->right); } TreeNode* AVLTree::rotateRight(TreeNode* node) { /* node / x / \ z y */ TreeNode* x = node->left; TreeNode* y = x->right; x->right = node; node->left = y; node->height 
= std::max(getHeight(node->right), getHeight(node->left)) + 1; x->height = std::max(getHeight(x->right), getHeight(x->left)) + 1; return x; } TreeNode* AVLTree::rotateLeft(TreeNode* node) { /* node \ x / \ z y */ TreeNode* x = node->right; TreeNode* z = x->left; x->left = node; node->right = z; node->height = std::max(getHeight(node->right), getHeight(node->left)) + 1; x->height = std::max(getHeight(x->right), getHeight(x->left)) + 1; return x; } // insert into the tree recursively TreeNode* AVLTree::insert(TreeNode* node, int key, int val) { if (node == nullptr) { cur_size ++; return new TreeNode(key, val); } if (key < node->key) { node->left = insert(node->left, key, val); } else if (key > node->key) { node->right = insert(node->right, key, val); } else{ node->value = val; // value updated for the key return node; } // new node is created node->height = std::max(getHeight(node->right), getHeight(node->left)) + 1; // check current balance int balanceFactor = getBalanceFactor(node); if (balanceFactor < -1 && getBalanceFactor(node->right) <= 0) return rotateLeft(node); if (balanceFactor > 1 && getBalanceFactor(node->left) >= 0) return rotateRight(node); if (balanceFactor < -1 && getBalanceFactor(node->right) > 0) { node->right = rotateRight(node->right); return rotateLeft(node); } if (balanceFactor > 1 && getBalanceFactor(node->left) < 0) { node->left = rotateLeft(node->left); return rotateRight(node); } return node; } // get from tree int AVLTree::get_node(TreeNode* node, int k) { if (node == nullptr) return NO_KEY; if (k < node->key) return get_node(node->left, k); if (k > node->key) return get_node(node->right, k); return node->value; } void AVLTree::deleteTree(TreeNode* node){ if (node == nullptr) return; deleteTree(node->left); deleteTree(node->right); delete(node); return; } void AVLTree::freeKVpairs(kv_pairs* p) { for(int i = 0; i < p->kv_size; i++){ delete[] p->kv[i]; } free(p->kv); delete(p); }