Source code for modbp.GenerateGraphs

import numpy as np
import sklearn.metrics as skm
import igraph as ig
import itertools  as it
import scipy.sparse as scispa
import matplotlib.pyplot as plt
import seaborn as sbn


"""Set of wrapper classes for easier access to graph information used by multimodbp"""

class RandomGraph(object):
    """
    Wrapper class for igraph with different accessor methods.
    """
    def __init__(self):
        pass

    def get_adjacency(self):
        return np.array(self.graph.get_adjacency().data)

    def get_edgelist(self):
        return self.graph.get_edgelist()
    @property
    def m(self):
        return self.graph.ecount()

    @property
    def n(self):
        return self.graph.vcount()



class RandomERGraph(RandomGraph):
    """
    Wrapper class to create igraph ER graph
    """

    def __init__(self,n,p):
        self.graph=ig.Graph.Erdos_Renyi(n=n,p=p,directed=False,loops=False)
        super(RandomERGraph,self).__init__()



class RandomSBMGraph(RandomGraph):
    """
    Wrapper class for a realization of non-degree corrected stochastic block model.
    """
    def __init__(self,n,comm_prob_mat,block_sizes=None,graph=None,use_gcc=False):
        """

        :param n: number of communities
        :param comm_prob_mat: Probabilities for node in community i to connect to node in community j
        :param block_sizes: How many nodes are in each community.  len(block_sizes) = num communities and \
        sum(block_sizes) = n
        :param graph: can create object from already existing igraph object or create from scratch given parameters.
        :param use_gcc:  if true, prune nodes node connected to largest component
        """
        if block_sizes is None:
            block_sizes = [int(n / (1.0 * comm_prob_mat.shape[0])) for _ in range(comm_prob_mat.shape[0] - 1)]
            block_sizes += [int(n - np.sum(block_sizes))]  # make sure it sums to one

        if graph is not None:
            self.graph=graph
        else:

            try:
                comm_prob_mat=comm_prob_mat.tolist()
            except TypeError:
                pass
            self.graph = ig.Graph.SBM(n=n, pref_matrix=comm_prob_mat, block_sizes=list(block_sizes), directed=False,loops=False)

        self.use_gcc=use_gcc
        self.block_sizes=np.array(block_sizes,dtype=int)
        self.comm_prob_mat=comm_prob_mat


        #nodes are assigned on bases of block
        block=[]
        for i,blk in enumerate(self.block_sizes):
            block+=[i for _ in range(blk)]
        self.graph.vs['block']=block

        if use_gcc==True: #in this case the block sizes need ot be recalculated
            self.graph=self.graph.components().giant()
            _,cnts=np.unique(self.graph.vs['block'],return_counts=True)
            self.block_sizes=cnts

    @property
    def block(self):
        '''vector denoting the membership of each node'''
        return self.graph.vs['block']

    def get_AMI_with_blocks(self,labels):
        """
        Compare partition (labels) of the nodes with the ground truth of the SBM

        :param labels: a labeling of the nodes
        :type labels: iterable
        :return: AMI
        :rtype: float
        """
        return skm.adjusted_mutual_info_score(labels_pred=labels,labels_true=self.block)

    def get_pin_pout_ratio(self):
        """

        Calculate :math: `\epsilon = \frac{p_{in}}{p_{out}}` for the SBM.

        :return:
        """
        totpin=0
        totpout=0
        for ei,ej in self.get_edgelist():
            if self.graph.vs['block'][ei]!=self.graph.vs['block'][ej]:
                totpout+=1
            else:
                totpin+=1

        #total number of out edges
        pin_possible=np.sum([ bs*(bs-1.0) for bs in self.block_sizes])
        pout_possible=self.n*(self.n-1.0)-pin_possible #all other possible edges have to be external

        return (totpin/pin_possible)/(totpout/pout_possible)

    def get_observed_group_sizes(self):
        """
        Get the block sizes of the underlying graph.  Especially useful when we have taken the GCC \
        so block sizes might have changed from specified parameters.
        :return:
        """
        coms,cnts=np.unique(self.graph.vs['block'],return_counts=True)
        return cnts

    def get_observed_cin_cout(self):
        """

        :return: degree for each of the blocks
        """
        coms,cnts=np.unique(self.graph.vs['block'],return_counts=True)

        ncoms=len(coms)

        totalcnts=np.divide(np.ones((ncoms, ncoms)) * sum(cnts),
                     np.outer(cnts,cnts)-np.diag(cnts)) #N/(N_A*N_B)
        label2num=dict(list(zip(coms,range(ncoms))))
        observed_cnts=np.zeros((ncoms,ncoms))

        for ei, ej in self.get_edgelist():
            ind1=label2num[self.graph.vs['block'][ei]]
            ind2=label2num[self.graph.vs['block'][ej]]
            observed_cnts[ind1,ind2]+=1
            observed_cnts[ind2,ind1]+=1
        return np.multiply(observed_cnts,totalcnts)



    def get_accuracy(self, labels):
        """
        :param labels: labels to compare to known blocks
        :type labels: list
        :return:
        :rtype:
        """
        return skm.accuracy_score(labels, self.block)

[docs]class MultilayerGraph(object): """ Wrapper class for storing a 'multilayer graph' that can be used to call the modularity belief propagation\ A graph here is represented by a collection of igraphs (each one representing a "layer") as well as a set of \ edges between the layers. In this formulation, each node can only be present in a single layer """
[docs] def __init__(self,intralayer_edges,layer_vec,interlayer_edges=None,comm_vec=None, directed=False): """ :param intralayer_edges: list of intralayer edges between the nodes. If intralayer_edges.shape[1] > 2\ intralayer_edges[:,2] is assumed to represent the weights of the edges. Default weight is 1. :param layer_vec: vector denoting layer membership for each edge. Size of network is taken to be\ len(layer_vec) :param interlayer_edges: list of edges across layers. If interlayer_edges.shape[1] > 2\ interlayer_edges[:,2] is assumed to represent the weights of the edges. Default weight is 1. :param comm_vec: Underlying known communitiies of the network. Default is None :param directed: Are intralayer and interlayer edges directed. #TODO: allow one or the other to be directed. """ self.N=len(layer_vec) layers=np.unique(layer_vec) #assure that it is sorted by appearance with elements from 0 to len(nlayers) layer_dict=dict(zip(layers,range(len(layers)))) self.layer_vec=np.array([layer_dict[x] for x in layer_vec]) self.intralayer_edges=intralayer_edges self.is_directed=directed self.unweighted=True #create an vector length zero if interlayer_edges is None: #Assume that it is single layer self.interlayer_edges=np.zeros((0,2),dtype='int') self.interlayer_weights=None else: self.interlayer_edges=interlayer_edges #are interlayer weights presen if len(self.interlayer_edges)>0 and len(self.interlayer_edges[0])>2:#weights are present self.interlayer_weights = [e[2] for e in self.interlayer_edges] self.interlayer_edges = [ (e[0],e[1]) for e in self.interlayer_edges] self.unweighted=False else: self.interlayer_weights=[ 1.0 for _ in range(len(self.interlayer_edges))] #are intralayer weights present if len(self.intralayer_edges)>0 and len(self.intralayer_edges[0]) > 2: # weights are present self._intralayer_weights = [e[2] for e in self.intralayer_edges] self.intralayer_edges = [(e[0], e[1]) for e in self.intralayer_edges] self.unweighted=False else: self._intralayer_weights = [1.0 for _ in range(len(self.intralayer_edges))] if not self.is_directed: self._prune_intra_edges_for_undirected() # make sure each edge is unique self._prune_inter_edges_for_undirected() self.layers=self._create_layer_graphs() self.nlayers=len(self.layers) self.intradegrees=self.get_intralayer_degrees() #by default these are weighted self.interdegrees=self.get_interlayer_degrees() self.intra_edge_counts=self.get_layer_edgecounts() if self.is_directed: self.totaledgeweight=np.sum(self.interdegrees)+np.sum(self.intradegrees) else: self.totaledgeweight=np.sum(self.interdegrees)/2.0+np.sum(self.intradegrees)/2.0 self.comm_vec=comm_vec #for known community labels of nodes if self.comm_vec is not None: self._label_layers(self.comm_vec) self.interedgesbylayers=self._create_interlayeredges_by_layers()
@property def intralayer_weights(self): return self._intralayer_weights @intralayer_weights.setter def intralayer_weights(self, intra_weights): """We have to recreate the layer graphs to be able to access degrees and\ strengths if these have changed""" self._intralayer_weights = intra_weights self.layers=self._create_layer_graphs() def _prune_intra_edges_for_undirected(self): eset={} for i,e in enumerate(self.intralayer_edges): #note that we assume here BOTH WEIGHTs will be the same if edges are duplicated if e[0]<e[1]: eset[(e[0], e[1])] = self._intralayer_weights[i] else: eset[(e[1], e[0])] = self._intralayer_weights[i] edges=[] weights=[] for k,val in eset.items(): edges.append(k) weights.append(val) edge_weights=sorted(list(zip(edges,weights)),key=lambda x:x[0]) edges,weights=list(zip(*edge_weights)) self.intralayer_edges=edges self.intralayer_weights=weights def _prune_inter_edges_for_undirected(self): #interlayer edges aren't weighted. We just remove duplicated edges eset=set([]) for i, e in enumerate(self.interlayer_edges): if e[0] < e[1]: eset.add((e[0], e[1])) else: eset.add((e[1], e[0])) eset=sorted(list(eset), key= lambda x:x[0]) self.interlayer_edges=eset def _create_layer_graphs(self): layers=[] uniq=np.unique(self.layer_vec) for val in uniq: node_inds=np.where(self.layer_vec==val)[0] min_ind=np.min(node_inds) node_inds=set(node_inds) # hash for look up celist=[] cweights=[] #subtract this off so that number of nodes created in igraph is correct for i,e in enumerate(self.intralayer_edges): ei,ej=e[0],e[1] weight = 1.0 if self._intralayer_weights is None else self._intralayer_weights[i] if ei in node_inds or ej in node_inds: if not (ei>=min_ind and ej>=min_ind): raise AssertionError('edge indicies not in layer {:d},{:d}'.format(ei,ej)) celist.append((ei-min_ind,ej-min_ind)) cweights.append(weight) layers.append(self._create_graph_from_elist(len(node_inds),celist,cweights)) return layers def _create_graph_from_elist(self,n,elist,weights=None,simplify=True): ''' :param n: number of nodes :param elist: list of edges :param weights: vector of weights of the edges. Len(weights) must equal len(elist). :param simplify: remove multi-edges :return: ''' cgraph=ig.Graph(n=n,edges=elist,directed=False) if weights is not None: cgraph.es['weight']=weights if simplify: cgraph=cgraph.simplify(multiple=True,combine_edges='sum') return cgraph def _create_interlayeredges_by_layers(self): layers2edges={} for e in self.interlayer_edges: i,j=e[0],e[1] lay_i=self.layer_vec[i] lay_j=self.layer_vec[j] layers2edges[(lay_i,lay_j)]=layers2edges.get((lay_i,lay_j),[])+[(i,j)] layers2edges[(lay_j, lay_i)]=layers2edges[(lay_i, lay_j)] #keep reference return layers2edges def _label_layers(self,comvec=None): """ Here we set the true community assignment for each node in each layer using commvec :return: None """ if comvec is None: assert self.comm_vec is not None, "Cannot set node communities if they are not provided" comvec=self.comm_vec assert len(comvec)==self.N,"length of comvec: {:d} does not equal number of nodes: {:}".format(len(comvec),self.N) coffset=0 #keep track of nodes already seen for layer in self.layers: layer.vs['block']=comvec[coffset:coffset+layer.vcount()] coffset+=layer.vcount()
[docs] def get_layer_edgecounts(self): """m for each layer""" ecounts=[] for i in range(self.nlayers): if self.is_directed: ecounts.append(np.sum(self.get_intralayer_degrees(i))) else: ecounts.append(np.sum(self.get_intralayer_degrees(i))/2.0) return np.array(ecounts)
def get_intralayer_degrees(self, i=None,weighted=True): if i is not None: if weighted and 'weight' in self.layers[i].es.attributes(): return np.array(self.layers[i].strength(weights='weight')) else: return np.array(self.layers[i].degree()) else: total_degrees=[] for i in range(len(self.layers)): if weighted and 'weight' in self.layers[i].es.attributes(): total_degrees.extend(list(self.layers[i].strength(weights='weight'))) else: total_degrees.extend(list(self.layers[i].degree())) return np.array(total_degrees) def get_interlayer_degrees(self): degrees=np.zeros(self.N) for i,e in enumerate(self.interlayer_edges): ei,ej=e[0],e[1] toadd=1 if self.interlayer_weights is None else self.interlayer_weights[i] degrees[ei]=degrees[ei]+toadd degrees[ej]=degrees[ej]+toadd return degrees
[docs] def get_AMI_with_communities(self,labels): """ Calculate adjusted mutual information of labels with underlying community of network. :param labels: commmunity to assess agreement with. Len(labels) must \ equal self.N :return: """ if self.comm_vec is None: raise ValueError("Must provide communities lables for Multilayer Graph") return skm.adjusted_mutual_info_score(self.comm_vec,labels_pred=labels)
[docs] def get_AMI_layer_avg_with_communities(self,labels): """ Calculate AMI of each layer with corresponding community in labels. Return \ average AMI weighted by number of nodes in each layer. :param labels: commmunity to assess agreement with. Len(labels) must \ equal self.N :return: """ if self.comm_vec is None: raise ValueError("Must provide communities lables for Multilayer Graph") la_amis=[] lay_vals=np.unique(self.layer_vec) for lay_val in lay_vals: cinds=np.where(self.layer_vec==lay_val)[0] la_amis.append(len(cinds)/(1.0*self.N)*skm.adjusted_mutual_info_score(labels_true=labels[cinds],labels_pred=self.comm_vec[cinds])) return np.sum(la_amis) #take the average weighted by number of nodes in each layer
[docs] def get_accuracy_with_communities(self,labels,permute=True): """Calculate accuracy between supplied labels and the known communities of the networks. :param labels: commmunity to assess agreement with. Len(labels) must \ equal self.N :param permute: Should maximum accurracy across label permuations be identified? :return: """ if self.comm_vec is None: raise ValueError("Must provide communities lables for Multilayer Graph") #TODO # #this needs to be re-written to be more efficient. Can use bipartite matching here. if permute: vals=np.unique(labels) all_acc=[] ncoms=float(len(np.unique(self.comm_vec))) for perm in it.permutations(vals): cdict=dict(zip(vals,perm)) mappedlabels=list(map(lambda x : cdict[x],labels)) acc=skm.accuracy_score(y_pred=mappedlabels,y_true=self.comm_vec,normalize=False) acc=(acc-self.N/ncoms)/(self.N-self.N/ncoms) all_acc.append(acc) return np.max(all_acc) #return value with highest accuracy else: return skm.accuracy_score(y_true=self.comm_vec,y_pred=labels)
def get_accuracy_layer_averaged_with_communities(self,labels,permute=True): if self.comm_vec is None: raise ValueError("Must provide communities lables for Multilayer Graph") la_amis = [] lay_vals = np.unique(self.layer_vec) for lay_val in lay_vals: cinds = np.where(self.layer_vec == lay_val)[0] clabs=labels[cinds] ctrue=self.comm_vec[cinds] if permute: vals=np.unique(clabs) all_acc=[] ncoms=float(len(np.unique(ctrue))) for perm in it.permutations(vals): cdict=dict(zip(vals,perm)) mappedlabels=list(map(lambda x : cdict[x],clabs)) acc=skm.accuracy_score(y_pred=mappedlabels,y_true=ctrue,normalize=False) c_n=float(len(clabs)) #size of current layer acc=(acc-c_n/ncoms)/(c_n-c_n/ncoms) all_acc.append(acc) la_amis.append( (len(cinds)/(1.0*self.N))*np.max(all_acc) ) else: la_amis.append( (len(cinds)/(1.0*self.N))*skm.accuracy_score(y_true=ctrue,y_pred=clabs)) return np.sum(la_amis) def _to_sparse(self,intra=True): if intra: edgelist=np.array(self.intralayer_edges) data=self.intralayer_weights else: edgelist=np.array(self.interlayer_edges) data=self.interlayer_weights # if edgelist.shape[1]>2 : #assume data is 3rd # data=edgelist[:,2] # else: # data=np.array([1.0 for _ in range(edgelist.shape[0])]) row_ind=edgelist[:,0] col_ind=edgelist[:,1] N=self.N return scispa.csr_matrix((data,(row_ind,col_ind)),shape=(N,N),dtype=float)
[docs] def reorder_nodes(self): """ Resort all objects in the MultilayerGraph object by their community label :return: """ new_com_vec=np.array([]) A_old,C_old=self.to_scipy_csr() offset=0 total_perm_vec=[] for i,layer in enumerate(self.layers): cinds=np.where(self.layer_vec==i)[0] ccom_vec=self.comm_vec[cinds] #zip sort, unzip if i==0: c_new_comvec,cperm_vec=list(zip(*sorted(zip(ccom_vec,range(offset,len(ccom_vec)+offset)),key=lambda x:x[0]))) rev_perm_vec = [-1 for _ in range(len(cperm_vec))] for j, val in enumerate(cperm_vec): rev_perm_vec[val - offset] = j + offset # offset for nodes in previous layers assert -1 not in rev_perm_vec else: c_new_comvec = [0 for _ in range(len(ccom_vec))] #use the old rev_perm_vec for permuting this layer (just offset new values) for j,val in enumerate(rev_perm_vec): rev_perm_vec[j]=val+len(cinds) for j,val in enumerate(ccom_vec): c_new_comvec[rev_perm_vec[j]-offset]=val new_com_vec=np.append(new_com_vec,c_new_comvec) total_perm_vec+=rev_perm_vec offset+=len(cinds) new_intra_elist=[(total_perm_vec[e[0]],total_perm_vec[e[1]],self.intralayer_weights[i])\ for i,e in enumerate(self.intralayer_edges)] new_inter_elist = [(total_perm_vec[e[0]], total_perm_vec[e[1]], self.interlayer_weights[i]) \ for i,e in enumerate(self.interlayer_edges)] #reset with new permuted edges self.__init__(intralayer_edges=new_intra_elist,layer_vec=self.layer_vec,interlayer_edges=new_inter_elist, comm_vec=new_com_vec)
[docs] def to_scipy_csr(self): """Create sparse matrix representations of the multilayer network. :return: (A_sparse,C_sparse) = interlayer adjacency , interlayer adjacency """ A_sparse=self._to_sparse() C_sparse=self._to_sparse(intra=False) return (A_sparse,C_sparse)
def create_null_adj(self): P = np.zeros((self.N, self.N)) cind = 0 for layer in self.layers: if 'weight' in layer.es.attributes(): strength = np.array(layer.strength(weights='weight')) pcur = np.outer(strength, strength) pcur /= (2.0 * np.sum(layer.es['weight'])) else: strength = np.array(layer.strength()) pcur = np.outer(strength, strength) pcur /= (2.0 * np.sum(layer.degree())) cinds = range(cind, cind + layer.vcount()) P[np.ix_(cinds, cinds)] = pcur cind += layer.vcount() P=scispa.csr_matrix(P) return P
[docs] def plot_communities(self, comvec=None, layers=None, ax=None, cmap=None): """ Plot communities as an nlayers by nodes/layer heatmap. Note this only works for the multiplex case where the number of nodes is fixed throughout each layer. :param comvec: community label for each nodes. If none, used stored ground truth for\ the network. :param layers: Subset of the layers to plot. If None, plots all layers. :param ax: matplotlib.Axes to draw on :param cmap: color map to label communities with. Defaults to cube_helix. :return: """ if layers is None: layers = np.unique(self.layer_vec) def get_partition_matrix(partition, layer_vec): # assumes partiton in same ordering for each layer vals = np.unique(layer_vec) nodeperlayer = len(layer_vec) // len(vals) com_matrix = np.zeros((nodeperlayer, len(vals))) for i, val in enumerate(vals): cind = np.where(layer_vec == val)[0] ccoms = partition[cind] com_matrix[:, i] = ccoms return com_matrix cinds = np.where(np.isin(self.layer_vec, layers))[0] if comvec is None: # use baseline assert self.comm_vec is not None, "Must specify ground truth com_vec for graph" cpart = self.comm_vec else: cpart = comvec vmin = np.min(cpart) vmax = np.max(cpart) clayer_vec = self.layer_vec[cinds] part_mat = get_partition_matrix(cpart, clayer_vec) if ax is None: ax = plt.axes() if cmap is None: cmap = sbn.cubehelix_palette(as_cmap=True) ax.grid('off') ax.pcolormesh(part_mat, cmap=cmap, vmin=vmin, vmax=vmax) # numswitched = self.get_number_nodes_switched_all_layers(ind=ind, percent=True) # numswitched = numswitched[np.where(np.isin(self.layers_unique, layers))[0]] # filter for layers selected # for i, num in enumerate(numswitched): # ax.text(s="{:.2f}".format(num), x=i, y=-1, fontdict={"fontsize": 9, 'color': 'white'}) ax.set_xticks(range(0, len(layers))) ax.set_xticklabels(layers) return ax
[docs]class MultilayerSBM(MultilayerGraph): """ Subclass of MultilayerGraph to create the dynamic stochastic block model from Ghasemian et al. 2016. """
[docs] def __init__(self,n,comm_prob_mat,layers=2,transition_prob=.1,block_sizes0=None,use_gcc=False): """ :param n: number of nodes in each layer :param comm_prob_mat: probability of block connections in SBM(this is fixed across all layers) :param layers: number of layers :param transition_prob: probability of each node changing communities from one layer to next. \ if transistion_prob=0 then community structure is constant across all layers. :param block_sizes0: Initial size of the blocks. Default is to use even block sizes starting out.\ block sizes at subsequent layers are determined by number of nodes that randomly transition between \ communities :param use_gcc: use only giant connected component of starting SBM (option for single layer only) """ self.layer_sbms = [] #this is list of GenerateGraph.RandomSBMGraph objects. self.nlayers=layers self.transition_prob=transition_prob self.comm_prob_mat=comm_prob_mat if block_sizes0 is None: block_sizes0 = [int(n / (1.0 * comm_prob_mat.shape[0])) for _ in range(comm_prob_mat.shape[0] - 1)] block_sizes0 += [n - np.sum(block_sizes0)] # make sure it sums to one assert not (use_gcc and layers > 1), "use_gcc only applies to single layer network" #create first layer initalSBM = RandomSBMGraph(n=n, comm_prob_mat=comm_prob_mat, block_sizes=block_sizes0, use_gcc=use_gcc) self.n = initalSBM.n # number of nodes in each layer self.nlayers = layers # number of layers self.N = self.n * self.nlayers #Total number of nodes self._blocks=range(self.comm_prob_mat.shape[0]) # #initialize the first one initalSBM.graph.vs['id']=np.arange(n) #set id's in order self.layer_sbms.append(initalSBM) for _ in range(layers-1): #create the next sbm from the previous one and add it to the list. self.layer_sbms.append(self._get_next_sbm(self.layer_sbms[-1])) self.interedges=self.get_interlayer_edgelist() self.intraedges=self.get_intralayer_edgelist() self.layer_vec=self.get_node_layer_vec() #some redudancy between constructors here that could be cleaned up super(MultilayerSBM,self).__init__(intralayer_edges=self.intraedges, interlayer_edges=self.interedges, layer_vec=self.layer_vec,comm_vec=self.get_all_layers_block()) #switch this to point to the layer_sbms igraphs self.layers=[ lsbm.graph for i,lsbm in enumerate(self.layer_sbms)]
# self.layers=self.layer_sbms #we only need to store these once. def _get_next_sbm(self, sbm): """ Generate new block values for each nodes. And then create a new SBM for the next layer :param sbm: current sbm :return: GenerateGraph.RandomSBMGraph """ num_transition=np.random.binomial(self.n,self.transition_prob) nodes2switch=np.random.choice(np.arange(self.n),replace=False,size=num_transition) #generate all of the newblocks #choose uniformly from blocks. newblocks=np.random.choice(self._blocks,size=len(nodes2switch)) #increment and decrement the block sizes accordingly nxtblocksize=sbm.block_sizes.copy() #make deepcopy inds, cnts = np.unique(newblocks, return_counts=True) nxtblocksize[inds]+=1 * cnts #count old blocks to decrement inds, cnts = np.unique(np.array(sbm.graph.vs['block'])[nodes2switch], return_counts=True) nxtblocksize[inds] += -1 * cnts #decrement by number next_blocks=np.array(sbm.graph.vs['block']) newids=np.array(sbm.graph.vs['id']) nxtsbm=RandomSBMGraph(n=self.n,comm_prob_mat=self.comm_prob_mat,block_sizes=nxtblocksize) next_blocks[nodes2switch]=newblocks #switch the blocks # this permutes the new ids accordingly to keep blocks in order perm_ids=[ x[1] for x in sorted(zip(next_blocks,newids),key=lambda x :x[0])] nxtsbm.graph.vs['id']=perm_ids #permute the nodes to line up with ids nxtsbm.graph=nxtsbm.graph.permute_vertices(perm_ids) return nxtsbm
[docs] def get_intralayer_adj(self): """ Single adjacency matrix representing all layers. :return: np.array of intralayer adjacency representation """ intra_adj=np.zeros((self.n*self.nlayers,self.n*self.nlayers)) for i,layer in enumerate(self.layer_sbms): offset=self.n*i inds=np.ix_(range(offset,offset+self.n),range(offset,offset+self.n)) #index diagonal block intra_adj[inds]=layer.get_adjacency() return intra_adj
[docs] def get_interlayer_adj(self): """ Singer interlayer adjencency matrix created by connecting each node to it's equivalent\ node in the next layer. This is a temporal multiplex topology. :return: np.array of interlayer adjacency representation """ #TODO this can be changed for different couplings. Right now I do only adjacent layers inter_adj=np.zeros((self.n*self.nlayers,self.n*self.nlayers)) Iden_N=np.identity(self.n) for i in range(self.nlayers): if i < self.nlayers-1: inds=np.ix_(range(i*self.n,(i+1)*self.n),range((i+1)*self.n,(i+2)*self.n)) inds_t=np.ix_(range((i+1)*self.n,(i+2)*self.n),range(i*self.n,(i+1)*self.n)) inter_adj[inds]=np.array(Iden_N) inter_adj[inds_t]=np.array(Iden_N) return inter_adj
[docs] def get_interlayer_edgelist(self): """ Single list of edges giving the multilayer connections. For this model \ nodes are multiplex an connected to their neighboring slice identities. :return: np.array """ interedges=np.zeros((self.n*(self.nlayers-1),2),dtype='int') offset=0 for i in range(self.nlayers-1): cnet=self.layer_sbms[i] cnetnxt=self.layer_sbms[i+1] cedge=np.array(list(zip(range(offset,offset+cnet.n), range(offset+cnet.n,offset+2*cnet.n)))) interedges[offset:offset+cnet.n,:]=cedge offset+=cnet.n return interedges
[docs] def get_node_layer_vec(self): """ :return: np.array denoting which layer each node is in """ layers=[] for i,net in enumerate(self.layer_sbms): layers.extend([i for _ in range(net.n)]) return np.array(layers)
[docs] def get_intralayer_edgelist(self): """ Single list of edges treating the group of single layer SBM's as a \ surpra-adjacency format :return: np.array """ nedges=np.sum([net.m for net in self.layer_sbms]) intraedges=np.zeros((nedges,2),dtype='int') offset=0 m_offset=0 #for indexing for i in np.arange(self.nlayers): c_layernet=self.layer_sbms[i] if c_layernet.m==0: continue #no edges in this layer c_elist=c_layernet.get_edgelist() intraedges[m_offset:m_offset+c_layernet.m,:]=np.array(c_elist)+offset offset+=c_layernet.n m_offset+=c_layernet.m return intraedges
[docs] def get_all_layers_block(self): """ returns a single vector with block id for each node across all of the layers :return: """ merged_blocks=[] for layer in self.layer_sbms: merged_blocks+=layer.graph.vs['block'] return np.array(merged_blocks)
def generate_planted_partitions_sbm(n,epsilon,c,ncoms): """ :param n: The number of nodes :param c: total average degree for the network :param ep: detectability parameter, :math:`\epsilon=p_{out}/p_{in}`, where \ p is the internal and external connection probabilities :param ncoms: number of communities within the network :return: """ #based on planted partition model, we can calculated the values of #pin and pout using the number of nodes, average degree, and the ratio # of the internal and external edges noq=n/float(ncoms) pin=c/(((noq-1.0)+noq*(ncoms-1)*epsilon)) pout = epsilon * pin remain=n%ncoms if remain>0: nodesperblock=[int(n/ncoms)]*ncoms nodesperblock=[ x+1 if k<remain else x for k,x in enumerate(nodesperblock) ] else: nodesperblock=[n/ncoms]*ncoms assert np.sum(nodesperblock)==n # prob_mat=np.identity(ncoms) * pin+(np.ones((ncoms,ncoms))-np.identity(ncoms))*pout prob_mat=np.identity(ncoms)*(pin-pout)+np.ones((ncoms,ncoms))*pout sbm = MultilayerSBM(n, comm_prob_mat=prob_mat, layers=1, block_sizes0=nodesperblock, transition_prob=0, use_gcc=True) return sbm def generate_planted_partitions_dynamic_sbm(n, epsilon, c, ncoms,nlayers,eta): """ :param n: The number of nodes :param c: total average degree for the network :param ep: detectability parameter, :math:`\epsilon=p_{out}/p_{in}`, where \ p is the internal and external connection probabilities :param ncoms: number of communities within the network :param nlayers: number of layers of the dynamic stochastic block model :param eta: probability of each node switching community label in each layer :return: """ # based on planted partition model, we can calculated the values of # pin and pout using the number of nodes, average degree, and the ratio # of the internal and external edges noq = n / float(ncoms) pin = c / (((noq - 1.0) + noq * (ncoms - 1) * epsilon)) pout = epsilon * pin remain = n % ncoms if remain > 0: nodesperblock = [int(n / ncoms)] * ncoms nodesperblock = [x + 1 if k < remain else x for k, x in enumerate(nodesperblock)] else: nodesperblock = [n / ncoms] * ncoms assert np.sum(nodesperblock) == n # prob_mat=np.identity(ncoms) * pin+(np.ones((ncoms,ncoms))-np.identity(ncoms))*pout prob_mat=np.identity(ncoms)*(pin-pout)+np.ones((ncoms,ncoms))*pout dsbm = MultilayerSBM(n, comm_prob_mat=prob_mat, layers=nlayers, block_sizes0=nodesperblock, transition_prob=eta, use_gcc=False) return dsbm def adjacency_to_edges(A,directed=False): """Extract list of non zero values from adjacency, handling directedness.""" if not directed: try: nnz_inds=np.nonzero(np.triu(A)) except: nnz_inds=np.nonzero(scispa.triu(A)) else: nnz_inds = np.nonzero(A) nnzvals = np.array(A[nnz_inds]) if len(nnzvals.shape) > 1: nnzvals = nnzvals[0] # handle scipy sparse types return list(zip(nnz_inds[0], nnz_inds[1], nnzvals))