Lancelot / src / gpudb / CacheManager.h
CacheManager.h
Raw
#ifndef _CACHE_MANAGER_H_
#define _CACHE_MANAGER_H_

#pragma once

#include "common.h"

#define CUB_STDERR

class Statistics;
class CacheManager;
class Segment;
class ColumnInfo;
class TableInfo;
class priority_stack;

enum ReplacementPolicy {
    LRU, LFU, LFUSegmented, LRUSegmented, Segmented, LRU2, LRU2Segmented
};

enum Distribution {
    None, Zipf, Norm
};

class Statistics{
public:
	Statistics() {
		col_freq = 0;
		timestamp = 0;
		speedup = 0;
		backward_t = 0;
		// real_timestamp = 0;
	};
	double col_freq;
	double timestamp;
	double speedup;
	double backward_t;
	// double real_timestamp;
};

class Segment {
public:
	Segment(ColumnInfo* _column, int* _seg_ptr, int _priority);
	Segment(ColumnInfo* _column, int* _seg_ptr);
	ColumnInfo* column;
	int segment_id;
	int* col_ptr; //ptr to the beginning of the column
	int* seg_ptr; //ptr to the beginning of the segment
	int priority;
	int seg_size;
	double weight;
	double repl_weight;

	Statistics* stats;
};

class ColumnInfo{
public:
	ColumnInfo(TableInfo* _table, string _column_name, int _column_id, int* _col_ptr);
	Statistics* stats;
	string column_name;
	string table_name;
	int table_id;
	int LEN;
	int column_id;
	int column_table_id;
	int* col_ptr; //ptr to the beginning of the column
	int* seg_ptr; //ptr to the last segment in the column
	int tot_seg_in_GPU; //total segments in GPU (based on current weight) (this is regardless of replication, which means that even if the segment is replicated, it is still counted as one segment)
	double weight;
	int total_segment;
	TableInfo* table;

	Segment* getSegment(int index);
};

class TableInfo{
public:
	TableInfo(string _table_name, int _LEN, int _table_id);
	string table_name;
	int LEN;
	int table_id;
	int total_segment;
	vector<ColumnInfo*> columns;
};

class priority_stack {
public:
	vector<Segment*> stack;
    bool empty() { return stack.size()==0; } 
    void push(Segment* x) { //const so that it can't be modified, passed by reference so that large object not get copied 
        stack.push_back(x);
        percolateDown();
    } 
    void pop() {
        if (!empty())
            stack.resize(stack.size()-1);
    }
    Segment* top() { 
        if (!empty()) 
        	return stack[stack.size()-1]; 
        else
        	return NULL;
    }
    void percolateDown() {
        for (int i=stack.size()-1; i>0; i--)
            if (stack[i]->priority > stack[i-1]->priority)
                swap(stack[i-1], stack[i]);
    }
    void percolateUp() {
    	for (int i=0; i<stack.size()-1; i++)
    		if (stack[i]->priority < stack[i+1]->priority)
    			swap(stack[i], stack[i+1]);
    }
    vector<Segment*> return_stack() {
    	return stack;
    }
};

class CacheManager {
public:
	int** gpuCache; //each gpu has one
	int** gpuBroadcast; //each gpu has one
	uint64_t** gpuProcessing, *cpuProcessing, *pinnedMemory; //each gpu has one
	unsigned int* gpuPointer; //each gpu has one
	unsigned int* broadcastPointer;
	unsigned int cpuPointer, pinnedPointer; 
	int cache_total_seg, cache_each_tot_seg; //total cache size and total cache size for each gpu (in segments)
	size_t tot_cache_size, each_cache_size;
	size_t tot_processing_size, each_processing_size, pinned_memsize;
	size_t each_broadcast_size, broadcast_each_tot_seg;
	int TOT_COLUMN;
	int TOT_TABLE;
	vector<ColumnInfo*> allColumn;
	vector<TableInfo*> allTable;

	int seg_idx_min; //store the segment id which hold threshold for zipf distribution

	vector<queue<int>> empty_gpu_segment; //free list of empty segment for each gpu
	int*** segment_list; //list of segment indexes for each column for each GPU (the index is a local index)
	vector<unordered_map<Segment*, int>> cache_mapper; //map segment to index in each GPU (the index is a local index)
	vector<vector<Segment*>> index_to_segment; //track which segment has been created from a particular segment id (this index is segment id and not global or local index)
	unsigned char** segment_bitmap; //bitmap to store information which segment is in which GPU 00001001 means that the segment is in GPU 5 and GPU 8
	vector<vector<vector<int>>> segment_row_to_gpu; //for each segment row for each table, which gpu does it belong to
	int** seg_is_replicated; //for each segment row for each table, check if segment is replicated
	int** seg_row_to_single_gpu; //for each segment row for each table, which gpu does it belong to (but only store one GPU per segment)
	unsigned int*** gpu_segment_row_bitmap; //bitmap to tell for each gpu, which segment in a segment row in a particular table is in the GPU 00101000 means that segment from column 3 and column 5 in that segment row cached in this GPU

	vector<vector<int>> columns_in_table;
	int** segment_min;
	int** segment_max;

	int *h_lo_orderkey, *h_lo_orderdate, *h_lo_custkey, *h_lo_suppkey, *h_lo_partkey, *h_lo_revenue, *h_lo_discount, *h_lo_quantity, *h_lo_extendedprice, *h_lo_supplycost;
	int *h_c_custkey, *h_c_nation, *h_c_region, *h_c_city;
	int *h_s_suppkey, *h_s_nation, *h_s_region, *h_s_city;
	int *h_p_partkey, *h_p_brand1, *h_p_category, *h_p_mfgr;
	int *h_d_datekey, *h_d_year, *h_d_yearmonthnum;

	TableInfo *lo, *s, *c, *p, *d;
	ColumnInfo *lo_orderkey, *lo_orderdate, *lo_custkey, *lo_suppkey, *lo_partkey, *lo_revenue, *lo_discount, *lo_quantity, *lo_extendedprice, *lo_supplycost;
	ColumnInfo *c_custkey, *c_nation, *c_region, *c_city;
	ColumnInfo *s_suppkey, *s_nation, *s_region, *s_city;
	ColumnInfo *p_partkey, *p_brand1, *p_category, *p_mfgr;
	ColumnInfo *d_datekey, *d_year, *d_yearmonthnum;

	CacheManager(size_t _cache_size, size_t _broadcast_size, size_t _processing_size, size_t _pinned_memsize);

	void resetCache(size_t cache_size, size_t _processing_size, size_t _pinned_memsize);

	~CacheManager();

	void dumpStats(string filename, Distribution dist);

	void assignWeight(ColumnInfo* column, int start_seg, int end_seg, double weight, double repl_weight);

	bool couldReplicateCheck(Segment* seg, double weight, int cache_each_tot_seg, int* temp_buffer_size);

	bool couldReplicateCachedSegmentRow(Segment* seg, double weight, int cache_each_tot_seg, int* temp_buffer_size, map<Segment*, int> map_segment);

	unsigned long long ShuffleAwareDataPlacement();

	unsigned long long PartitionDataPlacement();

	unsigned long long ReplicationDataPlacement();

	void ShuffleAware(Distribution dist, bool opt);

	void PartitioningOnly(Distribution dist);

	void ReplicationOnly(Distribution dist);

	void cacheSegmentInGPU(Segment* seg, int gpu);

	void deleteSegmentInGPU(Segment* seg, int gpu);

	void cacheSegmentMultipleGPU(Segment* seg, vector<int> &gpu_list);

	void updateColumnFrequency(ColumnInfo* column);

	void updateColumnWeight(ColumnInfo* column, int freq, double speedup, double selectivity);

	void updateColumnWeightDirect(ColumnInfo* column, double speedup);

	void updateColumnTimestamp(ColumnInfo* column, double timestamp);

	void updateSegmentTimeDirect(ColumnInfo* column, Segment* segment, double timestamp);

	void updateSegmentWeightDirect(ColumnInfo* column, Segment* segment, double speedup);

	void updateSegmentWeightCostDirect(ColumnInfo* column, Segment* segment, double speedup);

	void updateSegmentFreqDirect(ColumnInfo* column, Segment* segment);

	void loadColumnToCPU();

	void newEpoch(double param = 0.75);

	template <typename T>
	T* customMalloc(int size);

	template <typename T>
	T* customCudaMalloc(int size, int gpu);

	template <typename T>
	T* customCudaHostAlloc(int size);

	void indexTransfer(int** col_idx, ColumnInfo* column, cudaStream_t stream, int gpu, bool custom = true);

	void resetPointer();

	void readSegmentMinMax();

	void deleteAll();
};

#endif