#pragma once #include #include /* perror */ #include /* posix_memalign */ #include #include using namespace std; #define RAND_RANGE(N) ((double)rand() / ((double)RAND_MAX + 1) * (N)) #define RANDR_RANGE(N) ((double)rand_r(&seed) / ((double)RAND_MAX + 1) * (N)) static int seeded = 0; /** Check wheter seeded, if not seed the generator with current time */ static void check_seed() { if(!seeded) { srand(0); seeded = 1; } } /** * Shuffle tuples of the relation using Knuth shuffle. * * @param relation */ void knuth_shuffle(int* arr, int num_tuples) { int i; for (i = num_tuples - 1; i > 0; i--) { int j = RAND_RANGE(i); int tmp = arr[i]; arr[i] = arr[j]; arr[j] = tmp; } } /** * Generate unique tuple IDs with Knuth shuffling * relation must have been allocated */ void random_unique_gen(int*& arr, int num_tuples) { int i; for (i = 0; i < num_tuples; i++) { arr[i] = (i+1); } /* randomly shuffle elements */ knuth_shuffle(arr, num_tuples); } void dummy_initialize(int*& arr, int num_tuples) { for (int i = 0; i < num_tuples; i++) { arr[i] = i; } } int create_relation_pk(int*& keys, int*& vals, int num_tuples) { check_seed(); keys = (int*)_mm_malloc(num_tuples * sizeof(int), 256); vals = (int*)_mm_malloc(num_tuples * sizeof(int), 256); if (!keys || !vals) { perror("out of memory"); return -1; } random_unique_gen(keys, num_tuples); dummy_initialize(vals, num_tuples); return 0; } int create_relation_fk(int*& keys, int*& vals, int num_tuples, const int maxid) { int i, iters, remainder; check_seed(); keys = (int*)_mm_malloc(num_tuples * sizeof(int), 256); vals = (int*)_mm_malloc(num_tuples * sizeof(int), 256); if (!keys || !vals) { perror("out of memory"); return -1; } // alternative generation method iters = num_tuples / maxid; for (i = 0; i < iters; i++) { int* tuples = keys + maxid * i; random_unique_gen(tuples, maxid); } // if num_tuples is not an exact multiple of maxid remainder = num_tuples % maxid; if (remainder > 0) { int* tuples = keys + maxid * iters; random_unique_gen(tuples, remainder); } dummy_initialize(vals, num_tuples); return 0; } /* typedef struct rand_state_64 { uint64_t num[313]; size_t index; } rand64_t; rand64_t *rand64_init(uint64_t seed) { rand64_t *state = malloc(sizeof(rand64_t)); uint64_t *n = state->num; size_t i; n[0] = seed; for (i = 0 ; i != 311 ; ++i) n[i + 1] = 6364136223846793005ull * (n[i] (n[i] >> 62)) + i + 1; state->index = 312; return state; } uint64_t rand64_next(rand64_t *state) { uint64_t x, *n = state->num; if (state->index == 312) { size_t i = 0; do { x = n[i] & 0xffffffff80000000ull; x |= n[i + 1] & 0x7fffffffull; n[i] = n[i + 156] (x >> 1); n[i] = 0xb5026f5aa96619e9ull & -(x & 1); } while (++i != 156); n[312] = n[0]; do { x = n[i] & 0xffffffff80000000ull; x |= n[i + 1] & 0x7fffffffull; n[i] = n[i - 156] (x >> 1); n[i] = 0xb5026f5aa96619e9ull & -(x & 1); } while (++i != 312); state->index = 0; } x = n[state->index++]; x = (x >> 29) & 0x5555555555555555ull; x = (x << 17) & 0x71d67fffeda60000ull; x = (x << 37) & 0xfff7eee000000000ull; x = (x >> 43); return x; } typedef struct rand_state_32 { uint32_t num[625]; size_t index; } rand32_t; rand32_t *rand32_init(uint32_t seed) { rand32_t *state = malloc(sizeof(rand32_t)); uint32_t *n = state->num; size_t i; n[0] = seed; for (i = 0 ; i != 623 ; ++i) n[i + 1] = 0x6c078965 * (n[i] (n[i] >> 30)); state->index = 624; return state; } uint32_t rand32_next(rand32_t *state) { uint32_t y, *n = state->num; if (state->index == 624) { size_t i = 0; do { y = n[i] & 0x80000000; y += n[i + 1] & 0x7fffffff; n[i] = n[i + 397] (y >> 1); n[i] = 0x9908b0df & -(y & 1); } while (++i != 227); n[624] = n[0]; do { y = n[i] & 0x80000000; y += n[i + 1] & 0x7fffffff; n[i] = n[i - 227] (y >> 1); n[i] = 0x9908b0df & -(y & 1); } while (++i != 624); state->index = 0; } y = n[state->index++]; y = (y >> 11); y = (y << 7) & 0x9d2c5680; y = (y << 15) & 0xefc60000; y = (y >> 18); return y; } static int hardware_threads(void) { char name[64]; struct stat st; int threads = -1; do { sprintf(name, "/sys/devices/system/cpu/cpu%d", ++threads); } while (stat(name, &st) == 0); return threads; } static void *mamalloc(size_t size) { void *p = NULL; return posix_memalign(&p, 64, size) ? NULL : p; } typedef struct { pthread_t id; int seed; int thread; int threads; uint32_t hash_factor; uint32_t invalid_key; uint32_t *inner; uint32_t *outer; volatile uint32_t *table; size_t inner_size; size_t outer_size; size_t table_size; size_t join_size; double selectivity; pthread_barrier_t *barrier; } info_t; static void *run(void *arg) { info_t *d = (info_t*) arg; assert(pthread_equal(pthread_self(), d->id)); int thread = d->thread; int threads = d->threads; uint32_t hash_factor = d->hash_factor; uint32_t invalid_key = d->invalid_key; uint32_t *inner = d->inner; uint32_t *outer = d->outer; volatile uint32_t *table = d->table; size_t i, o, t, h; size_t inner_size = d->inner_size; size_t outer_size = d->outer_size; size_t table_size = d->table_size; size_t inner_beg = (inner_size / threads) * thread; size_t inner_end = (inner_size / threads) * (thread + 1); size_t outer_beg = (outer_size / threads) * thread; size_t outer_end = (outer_size / threads) * (thread + 1); size_t table_beg = (table_size / threads) * thread; size_t table_end = (table_size / threads) * (thread + 1); if (thread + 1 == threads) { inner_end = inner_size; outer_end = outer_size; table_end = table_size; } for (t = table_beg ; t != table_end ; ++t) table[t] = invalid_key; pthread_barrier_wait(&d->barrier[0]); rand32_t *gen = rand32_init(d->seed); for (i = inner_beg ; i != inner_end ; ++i) { int new_key_inserted = 0; uint32_t key; do { do { key = rand32_next(gen); } while (key == invalid_key); h = (uint32_t) (key * hash_factor); h = (h * table_size) >> 32; for (;;) { if (table[h] == invalid_key && __sync_bool_compare_and_swap(&table[h], invalid_key, key)) { new_key_inserted = 1; break; } if (table[h] == key) break; if (++h == table_size) h = 0; } } while (new_key_inserted == 0); inner[i] = key; } pthread_barrier_wait(&d->barrier[1]); size_t join_size = 0; uint32_t limit = ~0; limit *= d->selectivity; for (o = outer_beg ; o != outer_end ; ++o) { uint32_t key; if (rand32_next(gen) <= limit) { i = rand32_next(gen); i = (i * inner_size) >> 32; key = inner[i]; join_size++; } else do { do { key = rand32_next(gen); } while (key == invalid_key); h = (uint32_t) (key * hash_factor); h = (h * table_size) >> 32; while (table[h] != invalid_key) { if (table[h] == key) break; if (++h == table_size) h = 0; } } while (table[h] == key); outer[o] = key; } free(gen); d->join_size = join_size; pthread_exit(NULL); } size_t inner_outer(size_t inner_size, size_t outer_size, double selectivity, uint32_t **inner_p, uint32_t **outer_p) { srand(time(NULL)); int t, threads = hardware_threads(); // input arguments assert(inner_size <= 1000 * 1000 * 1000); assert(selectivity >= 0.0 && selectivity <= 1.0); // tables uint32_t *inner = mamalloc((inner_size + 1) * sizeof(uint32_t)); uint32_t *outer = mamalloc(outer_size * sizeof(uint32_t)); size_t table_size = inner_size / 0.7; uint32_t *table = malloc(table_size * sizeof(uint32_t)); // constants uint32_t hash_factor = (rand() << 1) | 1; uint32_t invalid_key = rand() * rand(); // barriers int b, barriers = 2; pthread_barrier_t barrier[barriers]; for (b = 0 ; b != barriers ; ++b) pthread_barrier_init(&barrier[b], NULL, threads); // run threads info_t info[threads]; for (t = 0 ; t != threads ; ++t) { info[t].seed = rand(); info[t].thread = t; info[t].threads = threads; info[t].hash_factor = hash_factor; info[t].invalid_key = invalid_key; info[t].selectivity = selectivity; info[t].inner = inner; info[t].outer = outer; info[t].table = table; info[t].inner_size = inner_size; info[t].outer_size = outer_size; info[t].table_size = table_size; info[t].barrier = barrier; pthread_create(&info[t].id, NULL, run, (void*) &info[t]); } size_t join_size = 0; for (t = 0 ; t != threads ; ++t) { pthread_join(info[t].id, NULL); join_size += info[t].join_size; } // cleanup for (b = 0 ; b != barriers ; ++b) pthread_barrier_destroy(&barrier[b]); free(table); // pass output inner[inner_size] = invalid_key; *inner_p = inner; *outer_p = outer; return join_size; } */