# External Dependencies: import numpy as np from numpy.random import exponential import scipy.stats from scipy import Infinity from scipy.sparse import coo_matrix # From Python stdlib: from heapq import heappush, heappop, heapify # https://docs.python.org/3/library/heapq.html import bisect import csv import os from empirical_distribution_outdegree import sample_outdegree, sample_empirical_outdegree """ This algorithm generates a network according to a preferential attachment mechanism with fitness and aging. It uses the continuous-time branching process (CTBP) formulation of Garavaglia2019 that grows a tree which is finally collapsed into a graph. Every node in the tree is a stochastic process with jump times generated by update_fn, dependent on the in-degree and age of the node. When the tree is fully generated, it can be collapsed into a graph as described in Garavaglia2019. To do this correctly birth order should be kept for each node. This is a list of nodes ordered by the earliest upcoming birth times in the tree. The python implementation uses a number of optimizations that greatly improve speed on large graphs: - A heap (with Heapq) is used to maintain the birth order. TreeNode implements __lt__ to do a sorted insert into the heap. This makes the algorithm scale loglinear O(nlog(M)) in the number of edges M. """ class TreeNode: # __slots__ saves us a lot of RAM when the graph is very large. # Otherwise there is the overhead of Dict: ~100B per graph edge or tree node. # See: https://book.pythontips.com/en/latest/__slots__magic.html __slots__ = ("graph_idx", "arrival_time", "stationary", "rescaled", "next_arrival", "fitness", "pa_b", "num_children") def __init__(self, graph_idx, fitness, pa_b, arrival_time=0.0, stationary=0.0, rescaled=0.0, num_children=0): self.graph_idx = graph_idx # The graph node this node belongs to in the collapsed tree self.arrival_time = arrival_time # When was the node created (in (calendar) rescaled time) self.stationary = stationary # Current stationary time (age without age effect) self.rescaled = rescaled # Rescaled time of the process (age in calender time) self.fitness = fitness # fitness self.pa_b = pa_b # the b parameter in the preferential attachment fucntion "ak + b" self.num_children = num_children # Number of children in the BP tree. def update(self, t, rescaled): self.stationary = t self.next_arrival = self.arrival_time + rescaled def __lt__(self, other): # Required for heapq.heapify() return self.next_arrival < other.next_arrival ''' Returns the appropriate rate function out of 8 possible options. Note that if fitness is disabled, all eta's will be 1, so they will not break the method. @param: pa: (a,b) preferential attachment parameters (f(k)=ak + b). If None, no Preferential Attachment is used. @param: age: (G_mu, G_sigma) lognormal rescaling . If None, no Age is used @returns the rate function ''' def get_rate_function(pa_a=1, age=(1, 1)): if age is None: if pa_a is None: def inner_fn(k, t, eta, pa_b): rate = eta dt = np.random.exponential(scale=rate ** -1) t_new = t + dt return t_new, t_new else: def inner_fn(k, t, eta, pa_b): rate = (pa_a*k + pa_b) * eta dt = np.random.exponential(scale=rate ** -1) t_new = t+dt return t_new, t_new else: # apply age rescaling """This function uses the preferential attachment function to calculate a jump time for the branching process. The rate depends on stationary time t, degree k, and fitness eta. Aging function G(t) gives a time rescaled version of the process (Garavaglia2019, section 1.9). """ # scipy version of LogNormal distribution uses a different parameterization. See: # https://docs.scipy.org/doc/scipy/reference/generated/scipy.stats.lognorm.html #log_normal = scipy.stats.lognorm(G_sigma, scale=scipy.exp(G_mu)) log_normal = scipy.stats.lognorm(age[1], scale=np.exp(age[0])) if pa_a is None: def inner_fn(k, t, eta, pa_b): # next event, no pa rate = eta dt = np.random.exponential(scale=rate ** -1) t_new = t + dt if t_new <= 1.0: rescaled = log_normal.ppf(t_new) else: rescaled = Infinity return t_new, rescaled else: def inner_fn(k, t, eta, pa_b): # next birth event with pa rate = (pa_a*k + pa_b) * eta dt = np.random.exponential(scale=rate ** -1) t_new = t+dt #print("params", k, t, eta, pa_b) #print("time and rate", t_new, rate) # rescale aging effect if t_new <= 1.0: rescaled = log_normal.ppf(t_new) else: rescaled = Infinity return t_new, rescaled return inner_fn class BirthOrder: """ maintains the birth order of the nodes in the BP tree. Has methods to get nodes from different graphidx """ def __init__(self): self.birth_order = [] def non_empty(self) -> bool: return len(self.birth_order) > 0 def add(self, node): bisect.insort(self.birth_order, node) def bulk_add(self, nodes): for node in nodes: bisect.insort(self.birth_order, node) def get_number_of_root_nodes(self): # TODO make this a maintained state variable root_nodes = set() for node in self.birth_order: if node.graph_idx not in root_nodes: root_nodes.add(node.graph_idx) return len(root_nodes) def pop_next_n_distinct_root_nodes(self, n): """ returns the next n distinct root nodes in the birth order. If there are less than n distinct root nodes, returns all of them. """ root_nodes_yielded = set() nodes = [] for node in self.birth_order: if node.graph_idx not in root_nodes_yielded: root_nodes_yielded.add(node.graph_idx) nodes.append(node) if len(nodes) == n: break for node in nodes: self.birth_order.remove(node) return nodes class BranchingProcess: def __init__(self, pa_function, fitness=[], degree=[], pa_b=[]): self.fitness = fitness self.degree = degree self.pa_b = pa_b self.birth_order = BirthOrder() self.graph_edges = [] # edges in the collapsed graph self.graph_nodes = [] # nodes in the collapsed graph self.pa_function = pa_function def alive(self): return self.birth_order.non_empty() def seed(self): root = self.create_bp_node(0, arrival_time=0.) root.update(0,0) self.birth_order.add(root) ''' Generate a graph - optional @param save_to = (fp: str, name: str) @param debug = False; whether to store and print extra information about the simulation. ''' def generate(self, save_to=None, debug=False): if save_to: assert len(save_to) == 2, "to save graph, pass save_to=(filepath, name)" if debug: all_individiuals = [] N = len(self.degree) #initalize the root (this is a dummy node) self.seed() # Main loop of the algorithm. Grow the graph by N nodes. for n in range(N): # print("birthorder", [(x.graph_idx, round(x.next_arrival,2)) for x in self.birth_order.birth_order]) if debug: print(n) # get the M collapsed nodes the new root connects to. if self.alive(): # out degree M = self.degree[n] # get the M collapsed nodes the new root connects to. # if there are less than M nodes, connect to all of them. to_connect_to = self.birth_order.pop_next_n_distinct_root_nodes(M) # print([(x.graph_idx, round(x.next_arrival,2)) for x in to_connect_to]) # print("birthorder", [(x.graph_idx, round(x.next_arrival,2)) for x in self.birth_order.birth_order]) # the collapsed-node arrival time is the arrival time of its last out-degree child name_node_arrival_time = to_connect_to[0].next_arrival else: return False # events that need to be rescheduled after the new node is added to_be_rescheduled = [] # Add the new node to collapsed graph registry self.graph_nodes.append((n, name_node_arrival_time)) for parent in to_connect_to: # The algorithm stops if all nodes in the branching tree have died out. """ A new node is added as child to a BP node with the earliest arrival time. After adding the child, both parent and child calculate new arrival times and update the birth order heap. """ # Create a child node whose birth time is based on the stationary time of the name node. child = self.create_bp_node(n, arrival_time=name_node_arrival_time) # Update the stationary time of parent and child nodes and update the birth order. # Only alive nodes are added to the birth order (i.e. stationary time < 1) # t is the stationary time, ts is the rescaled stationary time. t, ts = self.pa_function(k=child.num_children, t=child.stationary, eta=child.fitness, pa_b=child.pa_b) child.update(t, ts) if ts < Infinity: # If rescaled time reaches Infinity the node died. to_be_rescheduled.append(child) parent.num_children += 1 t, ts = self.pa_function(k=parent.num_children, t=parent.stationary, eta=parent.fitness, pa_b=parent.pa_b) parent.update(t, ts) if ts < Infinity: # If rescaled time reaches Infinity the node died. to_be_rescheduled.append(parent) if debug: all_individiuals.append((child.graph_idx, parent.next_arrival, child.fitness, child.pa_b, child.arrival_time)) # Add the corresponding edge in the collapsed graph. self.graph_edges.append((child.graph_idx, parent.graph_idx)) # add the birth events for all children and parents # print('reschedule', [(r.graph_idx, round(r.next_arrival,2)) for r in to_be_rescheduled]) self.birth_order.bulk_add(to_be_rescheduled) if save_to: name = save_to[1] fp = save_to[0] self.save_edges(os.path.join(fp,f"{name}_edges.csv")) self.save_nodes(os.path.join(fp,f"{name}_nodes.csv")) if debug: # store the all individual diagnostic print("==========DIAGNOSTIC OUTPUT============") print(f"In total, there were {len(all_individiuals)} born.") print(f"From the inital out_degrees, we expected {sum(self.degree)}") print(f"There were {len(self.graph_edges)} edges.") # filename = f"{name}_all_individuals.csv" # print(f"Saving a log of all individuals to {filename} in {fp}") # header = ["name_node", "actual birth time", "fitness", "PA_B", "stored birth time"] # with open(os.path.join(fp,filename), "w", newline="") as f: # writer = csv.writer(f) # writer.writerow(header) # writer.writerows(all_individiuals) # print("=======================================") # the simulation was succesfull return True def create_bp_node(self, graph_idx, arrival_time=0.0): """Create a new branching process node, corresponding to node graph_idx in the collapsed graph""" return TreeNode( graph_idx=graph_idx, arrival_time=arrival_time, fitness=self.fitness[graph_idx], pa_b=self.pa_b[graph_idx] ) def as_sparse(self): """ Return the collapsed graph as a sparse matrix""" edges = self.graph_edges if len(edges) == 0: return coo_matrix((0, 0)) i, j, t = zip(*edges) ones = [1 for _ in i] N = len(self.graph_nodes) return coo_matrix((ones, (i, j)), shape=(N, N)) def save_edges(self, path): """Save edges to csv""" with open(path, 'w', newline="") as f: writer = csv.writer(f) for edge in self.graph_edges: writer.writerow(edge) def save_nodes(self, path): """Save nodes to csv""" with open(path, 'w', newline="") as f: writer = csv.writer(f) for node in self.graph_nodes: writer.writerow(node) ''' Generate a graph from scratch. All @pre's are asserted in code, so method is reasonably robust. Note that the super-criticality condition is NOT checked. # required arguments @param N - size of the final graph # optional arguments @param pa - preferential attachment parameters - pa = None: no prefferential attachment is used - pa = (a, b): corresponds to preferential attachment sequence f_k = a*k + b @pre: a, b > 0 @param fitness - fitness parameter(s) - fitness = None: no fitness is used (= setting fitness to 1 for each vertex) - fitness = l: an exponentially distributed fitness with mean = 1/l is used/ @pre: l > 0 - fitness = (min, tau): a powerlaw distributed fitness. @pre: min > 0 AND tau > 1 @param aging - aging = None: no aging effect is used - aging = (mu, sigma): lognormal aging rescaling @pre: sigma > 0 ''' def simulator_from_parameters(N=100,sample_outdegree_method=sample_outdegree, pa=None, fitness=None, aging=None): description = f"N={N}, " # Sample the degree sequence node_degree = sample_outdegree_method(N) # Sample the fitness sequence if fitness is None: # No fitness: use a constant fitness of 1. description += f"Fitness=None, " node_fitness = [1.0 for i in range(N)] elif isinstance(fitness, (int, float)): # use Exponential fitness assert fitness > 0, "exponential parameter has to be larger than 0" description += f"Fitness=Exponential(lambda={fitness}), " node_fitness = list(scipy.stats.expon.rvs( scale=1/fitness, size=N)) # this is a very ugly way of doing this but I am not refactoring the code. elif len(fitness) == 3: assert fitness[0] == "uniform", "error in parameters for fitness" assert fitness[1] < fitness[2], "bounds for uniform fitness are incorrect." description += f"Fitness=Uniform({fitness[0]},{fitness[1]}), " node_fitness = list(scipy.stats.uniform.rvs(fitness[1], scale=(fitness[2] - fitness[1]), size=N)) else: description += f"Fitness=Powerlaw(min={fitness[0]}, τ={fitness[1]}), " # Use power-law fitness. # https://stackoverflow.com/questions/17882907/python-scipy-stats-powerlaw-negative-exponent # This results in P(x) = x^-τ, with τ = opts.fitness assert fitness[1] > 1 and fitness[0] > 0, "incorrect powerlaw fitness parameters" xmin = fitness[0] def powerlaw(r): # fitness[1] is the negative exponent! return xmin * (1-r) ** (-1/(fitness[1]-1)) node_fitness = [powerlaw(r) for r in np.random.rand(N)] if aging is None: description += f"Aging=None, " else: assert aging[1] > 0, "σ in lognomral aging cannot be negative" # Use the pa function with lognormal aging description += f"Aging=LogNormal(μ={aging[0]}, σ={aging[1]}), " if pa is None: # No pa: a = 0, b_node = 1/m_i, then the effect cancels out description += f"PA=None." nodes_pa_b = [1/node_degree[i] for i in range(N)] else: assert (pa[0] > 0 and pa[1] > 0), "In f_k = a*k + b. a,b > 0" # No fitness: use a constant fitness of 1. description += f"PA=({pa[0]}*k + {pa[1]})." # distribute the PA_b parameter over the number of nodes nodes_pa_b = [pa[1]/node_degree[i] for i in range(N)] pa = pa[0] # get_rate_function selects the appropriate rate function rate_fn = get_rate_function(pa, aging) # Create a CTBP ctbp = BranchingProcess(rate_fn, fitness=node_fitness,degree=node_degree, pa_b=nodes_pa_b) #print(description) return ctbp if __name__=="__main__": # ctbp = simulator_from_parameters(N=20000, sample_outdegree_method=sample_empirical_outdegree, fitness=0.7, aging=(2, 1)) ctbp = simulator_from_parameters(N=20000, sample_outdegree_method=sample_empirical_outdegree, fitness=0.7) # ctbp = simulator_from_parameters(N=2000, sample_outdegree_method=sample_empirical_outdegree, fitness=(0.5, 2.1), aging=(2, 1)) # ctbp = simulator_from_parameters(N=1000, sample_outdegree_method=sample_empirical_outdegree, pa=(2, 3), fitness=(0.5, 2.1), aging=(2, 1)) if ctbp.generate(debug=False): ctbp.save_edges('ctbp_edges.csv') ctbp.save_nodes('ctbp_nodes.csv') else: print('failed')