Grace-Hash-1 / Join.cpp
Join.cpp
Raw
#include "Join.hpp"
#include <functional>
/*
 * TODO: Student implementation
 * Input: Disk, Memory, Disk page ids for left relation, Disk page ids for right relation
 * Output: Vector of Buckets of size (MEM_SIZE_IN_PAGE - 1) after partition
 */
vector<Bucket> partition(
    Disk* disk, 
    Mem* mem, 
    pair<unsigned int, unsigned int> left_rel, 
    pair<unsigned int, unsigned int> right_rel) 
{
    vector<Bucket> buckets;
    for (unsigned int i = 0; i < MEM_SIZE_IN_PAGE - 1; i++) {
        Bucket b(disk);        
        buckets.push_back(b);
    }

    Page* input_buffer = mem->mem_page(MEM_SIZE_IN_PAGE - 1);
    unsigned int partition_num = 0;
    unsigned int page_written = 0;
    unsigned int loop_start = left_rel.first;
    unsigned int loop_end = left_rel.second;
    bool right = 0;
    

    for (int k = 0; k < 2; k++) {       // runs onces for left partition, once for right
        if (right) {                    // false on first loop
            loop_start = right_rel.first;
            loop_end = right_rel.second;
        }
        mem->reset();
        for (unsigned int i = loop_start; i < loop_end; i++) {              // Partitioning LEFT/RIGHT relation
            mem->loadFromDisk(disk, i, MEM_SIZE_IN_PAGE - 1);                                  // load the next page of relation into input_buffer     
            for (unsigned int j = 0; j < input_buffer->size(); j++) {       // iterate records in input_buffer
                partition_num = input_buffer->get_record(j).partition_hash() % (MEM_SIZE_IN_PAGE - 1);  // get this record, hash it, get partition # (1,2,..,(B-1))
                mem->mem_page(partition_num)->loadRecord(input_buffer->get_record(j));  // add this record to the mem buffer it hashed to
                if (mem->mem_page(partition_num)->full()) {                 // check if that load filled the buffer page
                    page_written = mem->flushToDisk(disk, partition_num);
                    if (!right) {       // partitioning left_rel 
                        buckets[partition_num].add_left_rel_page(page_written); // add the full page just written to memory to this partition bucket
                    }
                    else {
                        buckets[partition_num].add_right_rel_page(page_written);
                    }
                }// output flushed or wasn't full, go to next record in input_buffer                
            } // exhausted input_buffer, get new page
        } // end partitioning of this relation
        for (unsigned int n = 0; n < MEM_SIZE_IN_PAGE - 1; n++) {           // flush all memory buffers to disk and update bucket vector before partitioning next relation
            if (mem->mem_page(n)->size() > 0) {                             // writing mem buffers that contain records
                page_written = mem->flushToDisk(disk, n);
                if (!right) {
                    buckets[n].add_left_rel_page(page_written);
                }
                else {
                    buckets[n].add_right_rel_page(page_written);
                }                
            }   
        }        
        right = 1;
    }      
    return buckets;    
}
/*
 * TODO: Student implementation
 * Input: Disk, Memory, Vector of Buckets after partition
 * Output: Vector of disk page ids for join result
 */
vector<unsigned int> probe(Disk* disk, Mem* mem, vector<Bucket>& partitions) {     
    Page* input_buffer = mem->mem_page(MEM_SIZE_IN_PAGE - 1);
    Page* output_buffer = mem->mem_page(MEM_SIZE_IN_PAGE - 2);
    unsigned int input_page_id = MEM_SIZE_IN_PAGE - 1, unsigned int output_page_id = MEM_SIZE_IN_PAGE - 2;
    unsigned int hash_bucket = 0, page_written = 0, left_rel_page_num = 0, right_rel_page_num = 0, smaller_rel = 1; //0-left_rel is smaller. 
    vector<unsigned int> join_res, smaller_rel_vec, larger_rel_vec;                                                 //1-right_rel is smaller or left==right

    for (unsigned int i = 0; i < partitions.size(); i++) {          // Find smaller relation
        left_rel_page_num += partitions[i].get_left_rel().size();
        right_rel_page_num += partitions[i].get_right_rel().size();
    }
    if (left_rel_page_num < right_rel_page_num) {                   // else right_rel is smaller & smaller_rel = 1
        smaller_rel = 0;
    }
    // Build and probe for each partition
    for (unsigned int i = 0; i < partitions.size(); i++) {          // input mem_page was never pushed as a bucket, read all 
        if (smaller_rel == 0) {                                     // start of next partition, build hashtable
            smaller_rel_vec = partitions[i].get_left_rel();
            larger_rel_vec = partitions[i].get_right_rel();
        }
        else {
            smaller_rel_vec = partitions[i].get_right_rel();
            larger_rel_vec = partitions[i].get_left_rel();
        }                
        for (unsigned int b = 0; b < MEM_SIZE_IN_PAGE - 2; b++) {   // reset B-2 buffer pages (hash table) for next hashtable
            mem->mem_page(b)->reset();
        }
        for (unsigned int j = 0; j < smaller_rel_vec.size(); j++) { // iterate this bucket's vector of page_ids and build hashtable from smaller_rel            
            mem->loadFromDisk(disk, smaller_rel_vec[j], input_page_id);                   // load records in disk[page_id] to input_buffer            
            for (unsigned int k = 0; k < input_buffer->size(); k++) {                     // iterate records in input_buffer
                hash_bucket = input_buffer->get_record(k).probe_hash() % (MEM_SIZE_IN_PAGE - 2); // calculate hash of record k
                mem->mem_page(hash_bucket)->loadRecord(input_buffer->get_record(k));      // load each record into corresponding bucket in hashtable                
            } // hashed entire input_buffer
        }// repeat until exhausted all page_id's for smaller_rel in partition i
        // finished building in-memory hashtable for partition i
        // Probe against hashtable for partition i 
        for (unsigned int j = 0; j < larger_rel_vec.size(); j++) {        // iterate this bucket's vector of pages for larger_rel            
            mem->loadFromDisk(disk, larger_rel_vec[j], input_page_id);    // load probing disk page's records into input_buffer, buffer cleared before load
            for (unsigned int k = 0; k < input_buffer->size(); k++) {                     // iterate records in input_buffer                
                hash_bucket = input_buffer->get_record(k).probe_hash() % (MEM_SIZE_IN_PAGE - 2);    // calculate record k's hash
                for (unsigned int n = 0; n < mem->mem_page(hash_bucket)->size(); n++) {   // iterate records in hashed-to bucket                    
                    if (input_buffer->get_record(k) == mem->mem_page(hash_bucket)->get_record(n)) { // check if input_record == hash_table_record
                        output_buffer->loadPair(input_buffer->get_record(k), mem->mem_page(hash_bucket)->get_record(n));    // loadPair to output_buffer
                        if (output_buffer->full()) {
                            page_written = mem->flushToDisk(disk, output_page_id);        // flush output to disk, push page_id written-to to result vector 
                            join_res.push_back(page_written);
                        }
                    }                                                                    
                }                    
            } // done with records in input_buffer, read next probe page_id
        } // done with partition's probing and hashtable
        // if !empty, flush output buffer before repeating for next partition        
    } // outer for
    if (output_buffer->size() > 0) {   
        page_written = mem->flushToDisk(disk, output_page_id); 
        join_res.push_back(page_written);
    }  // only flush a partially-full buffer at the end of 
       // all partitions
    return join_res;
}