Bouncer / bouncer / linear_sketches / tugofwar.py
tugofwar.py
Raw
from _typeshed import Self
import numpy as np
import math
from typing import List 
from numpy.lib.function_base import median 
from ..utils.utils import nextPrime, LinearSketch
from numba import njit, prange

#F2 estimation 

#space complexity : 18 ln(2/delta)*6/epsilon**2 ** (log(z) + log(p)) bits + 18 ln(2/delta)*6/epsilon**2 ** (4log(p)) bits

# log z < log m < log n 

#F2 linear sketch 
class TugOfWar(LinearSketch):
    # https://cs-web.bu.edu/faculty/homer/537/talks/SarahAdelBargal_UniversalHashingnotes.pdf , hashing algorithm
    def __init__(self, n): # current hash function needs the size of the universe N , add version option? 
        assert(n > 0), "Universe size must be a positive integer"
        self.p = nextPrime(n) # n buckets
        self.s = math.ceil(6/(self.epsilon**2))
        self.t = math.ceil(18 * math.log(2/self.delta))
        self.random_sign_bits = np.random.choice([-1,1], size = (self.t, self.s, self.p)) 
        self.random_bits = np.random.choice(range(self.p), size = (self.t, self.s, 4)) 
        self.X = np.zeros(shape=(self.t, self.s))
        self.hash = lambda row, column, token, count: count * self.random_sign_bits[self.hash_sum(row, column, token)]

    @njit(parallel=True)
    def average(self, row):
        sum = 0
        for i in prange(self.s):
            sum += self.X[row][i] ** 2
        return sum/(self.s)

    @njit(parallel=True)
    def hash_sum(self, row, column, token):
        sum = 0
        for i in prange(4):
            sum += self.random_bits[row][column][i] * (token**i)
        return sum % self.p

    def process(self, token_j, c):
        rows = np.array(range(self.t))
        columns = np.array(range(self.s))
        #rows,columns = np.meshgrid(R,C) # is meshgrid necessary since I am only using the range, I don't think so. 
        self.X += np.array(map(lambda x,y,t,z: self.hash(x,y,t,z) , rows, columns, token_j, c)) # return in the right shape? 
            
    def query(self):
        rows = np.array(range(self.t))
        #rows,columns = np.meshgrid(R,C)
        return median(list(map(lambda x:  self.average(x), rows)))