#!/usr/bin/env python
"""
The clustering algorithm works in several steps to generate cluster assignments.
1. Generate a scoring matrix using a network flow strategy
2. Cluster on the scoring matrix.
3. If the network displays memory effects, define weak nodes.
The scoring matrix is first clustered with the AgglomerativeClustering algorithm.
Because the network flow strategy can separate central nodes from the clusters,
agglomerative clustering is repeated on score matrices with high-scoring nodes removed
until larger clusters are identified.
After this step, clustering assignments are set.
However, if the network caused the scoring matrix to enter a flip-flop state,
nodes can still be defined as 'weak'.
In this case, manta uses a shortest path strategy to assess whether nodes belong to a cluster
or are in conflict with the cluster oscillator.
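
A minimal sketch of these steps for a small weighted, undirected networkx graph
(the graph, limit, iteration and cluster numbers below are illustrative, not defaults):

    import networkx as nx
    from sklearn.cluster import AgglomerativeClustering
    from manta.flow import diffusion

    g = nx.Graph()
    g.add_edge('a', 'b', weight=1.0)
    g.add_edge('b', 'c', weight=-0.8)
    g.add_edge('c', 'd', weight=1.0)
    scoremat, memory, diffs = diffusion(graph=g, limit=2, iterations=20, verbose=False)  # step 1
    assignments = AgglomerativeClustering(n_clusters=2).fit_predict(scoremat)            # step 2
    # step 3 only applies when the scoring matrix flip-flops; see cluster_weak below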
"""
__author__ = 'Lisa Rottjers'
__email__ = 'lisa.rottjers@kuleuven.be'
__status__ = 'Development'
__license__ = 'Apache 2.0'
import networkx as nx
import numpy as np
# from sklearn.mixture import GaussianMixture # This works quite well, slightly better Sn
from sklearn.cluster import AgglomerativeClustering
import sys
from manta.flow import diffusion, partial_diffusion, harary_components
from itertools import combinations, chain
from copy import deepcopy
import os
import logging.handlers
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
# handler to sys.stdout
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
sh.setFormatter(formatter)
logger.addHandler(sh)
# handler to file
# only handler with 'w' mode, rest is 'a'
# once this handler is started, the file writing is cleared
# other handlers append to the file
logpath = "\\".join(os.getcwd().split("\\")[:-1]) + '\\manta.log'
# filelog path is one folder above manta
# pyinstaller creates a temporary folder, so log would be deleted
fh = logging.handlers.RotatingFileHandler(maxBytes=500,
filename=logpath, mode='a')
fh.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
logger.addHandler(fh)
def cluster_graph(graph, limit, max_clusters, min_clusters, min_cluster_size,
                  iterations, subset, ratio, edgescale, permutations, verbose):
"""
Takes a networkx graph and carries out network clustering.
The returned graph contains cluster assignments and weak assignments.
If weight is available, this is considered during the diffusion process.
Parameters
----------
:param graph: Weighted, undirected networkx graph.
    :param limit: Percentage in error decrease below which the matrix is considered converged.
    :param max_clusters: Maximum number of clusters to evaluate with agglomerative clustering.
    :param min_clusters: Minimum number of clusters to evaluate with agglomerative clustering.
    :param min_cluster_size: Minimum cluster size as fraction of network size.
    :param iterations: Maximum number of iterations; if the algorithm has not converged by then, it stops.
:param subset: Fraction of edges used in subsetting procedure
:param ratio: Ratio of scores that need to be positive or negative for a stable edge
:param edgescale: Mean edge weight for node removal
:param permutations: Number of permutations for partial iterations
:param verbose: Verbosity level of function
    :return: NetworkX graph with cluster assignments and the converged score matrix.
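
    A minimal call sketch for a weighted, undirected networkx graph g
    (the parameter values below are illustrative, not recommended defaults):

        graph, scoremat = cluster_graph(g, limit=2, max_clusters=4, min_clusters=2,
                                        min_cluster_size=0.2, iterations=20, subset=0.8,
                                        ratio=0.8, edgescale=0.5, permutations=100,
                                        verbose=False)
        clusters = nx.get_node_attributes(graph, 'cluster')
        weak = nx.get_node_attributes(graph, 'assignment')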
"""
adj_index = dict()
for i in range(len(graph.nodes)):
adj_index[list(graph.nodes)[i]] = i
rev_index = {v: k for k, v in adj_index.items()}
# next part is to define scoring matrix
balanced = [False]
scoremat, memory, diffs = diffusion(graph=graph, limit=limit, iterations=iterations, verbose=verbose)
if not nx.is_directed(graph):
balanced = harary_components(graph, verbose=verbose).values()
# partial diffusion results in unclosed graphs for directed graphs,
# and can therefore not be used here.
if not all(balanced):
if verbose:
logger.info("Carrying out diffusion on partial graphs. ")
# ratio from 0.7 to 0.9 appears to give good results on 3 clusters
scoremat, partials = partial_diffusion(graph=graph, iterations=iterations, limit=limit, subset=subset,
ratio=ratio, permutations=permutations, verbose=verbose)
    # cluster_hard compares each assignment to a random separation into two clusters
    # if clustering can't beat this random baseline, the user is given a warning
    # the optimal cluster assignment is selected by sparsity score
bestcluster = cluster_hard(graph=graph, adj_index=adj_index, rev_index=rev_index, scoremat=scoremat,
max_clusters=max_clusters,
min_clusters=min_clusters, min_cluster_size=min_cluster_size, verbose=verbose)
flatcluster = _cluster_vector(bestcluster, adj_index)
if not all(balanced):
weak_nodes = cluster_weak(graph, diffs=diffs, cluster=flatcluster,
edgescale=edgescale,
adj_index=adj_index, rev_index=rev_index, verbose=verbose)
weak_dict = dict()
for node in graph.nodes:
if adj_index[node] in weak_nodes:
weak_dict[node] = 'weak'
else:
weak_dict[node] = 'strong'
nx.set_node_attributes(graph, values=weak_dict, name='assignment')
nx.set_node_attributes(graph, values=bestcluster, name='cluster')
return graph, scoremat
def sparsity_score(graph, clusters, rev_index):
"""
Given a graph, and a list of cluster identities,
this function calculates how many edges need to be cut
to assign these cluster identities.
Cuts through positively-weighted edges are penalized,
whereas cuts through negatively-weighted edges are rewarded.
The lowest sparsity score represents the best clustering.
    Because this sparsity score rewards cluster assignments
    that separate out negative hubs, a penalty is applied;
    this penalty ensures that nodes with some intra-cluster
    negative edges are still assigned to clusters where
    they predominantly co-occur with other cluster members.
Parameters
----------
:param graph: NetworkX weighted, undirected graph
:param clusters: List of cluster identities
:param rev_index: Index matching node ID to matrix index
:return: Sparsity score
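
    A small worked example (node names and weights chosen for illustration):

        g = nx.Graph()
        g.add_edge('a', 'b', weight=1.0)
        g.add_edge('c', 'd', weight=1.0)
        g.add_edge('b', 'c', weight=-1.0)
        clusters = np.array([0, 0, 1, 1])
        rev_index = {0: 'a', 1: 'b', 2: 'c', 3: 'd'}
        # both positive edges fall inside clusters and the negative edge is cut,
        # so the score reaches its maximum value of 1.0
        sparsity_score(g, clusters, rev_index)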
"""
# set up scale for positive + negative edges
# the sparsity score scales from -1 to 1
# -1 is the worst possible assignment,
# all negative edges inside clusters and positive outside clusters
# 1 is the best,
# with clusters containing only positive edges
cut_score = 1/len(graph.edges)
sparsity = 0
edges = list()
for cluster_id in set(clusters):
        # collect the edges that fall within this cluster; edges not collected here are cut edges
node_ids = list(np.where(clusters == cluster_id)[0])
node_ids = [rev_index.get(item, item) for item in node_ids]
cluster = graph.subgraph(node_ids)
edges.extend(list(cluster.edges))
# penalize for having negative edges inside cluster
weights = nx.get_edge_attributes(cluster, 'weight')
for x in weights:
if weights[x] < 0:
sparsity -= cut_score
else:
sparsity += cut_score
all_edges = list(graph.edges)
cuts = list()
for edge in all_edges:
if edge not in edges and (edge[1], edge[0]) not in edges:
            # cluster subgraph edges can have swapped node order, so check both orientations
cuts.append(edge)
for edge in cuts:
cut = graph[edge[0]][edge[1]]['weight']
if cut > 0:
sparsity -= cut_score
else:
sparsity += cut_score
return sparsity
def cluster_hard(graph, adj_index, rev_index, scoremat,
                 max_clusters, min_clusters, min_cluster_size, verbose):
"""
Agglomerative clustering is used to separate nodes based on the scoring matrix.
Because the scoring matrix generally results in separation of 'central' nodes,
these nodes are removed and clustering is repeated until larger clusters are identified.
    Afterwards, the removed nodes are assigned to clusters based on a shortest path strategy.
Parameters
----------
:param graph: NetworkX weighted, undirected graph
:param adj_index: Index matching matrix index to node ID
:param rev_index: Index matching node ID to matrix index
:param scoremat: Converged diffusion matrix
:param max_clusters: Maximum cluster number
:param min_clusters: Minimum cluster number
:param min_cluster_size: Minimum cluster size as fraction of network size
:param verbose: Verbosity level of function
:return: Dictionary of nodes with cluster assignments
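
    A minimal call sketch, mirroring how cluster_graph prepares the arguments
    (the graph g, cluster numbers and diffusion settings are illustrative only):

        adj_index = {node: i for i, node in enumerate(g.nodes)}
        rev_index = {i: node for node, i in adj_index.items()}
        scoremat, memory, diffs = diffusion(graph=g, limit=2, iterations=20, verbose=False)
        assignments = cluster_hard(graph=g, adj_index=adj_index, rev_index=rev_index,
                                   scoremat=scoremat, max_clusters=4, min_clusters=2,
                                   min_cluster_size=0.2, verbose=False)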
"""
    # get the mean sparsity score of 100 random 2-cluster assignments
randomscores = list()
for i in range(100):
randomclust = np.random.randint(2, size=len(scoremat))
randomscores.append(sparsity_score(graph, randomclust, rev_index))
scores = dict()
scores['random'] = np.mean(randomscores)
if verbose:
logger.info('Sparsity level for 2 clusters, randomly assigned labels: ' + str(scores['random']))
bestclusters = dict()
clusnum = min_clusters
scoremat_index = rev_index.copy()
outliers = dict()
outliers[clusnum] = list()
clustermat = scoremat.copy()
    # minimum cluster size as a fraction of the total number of nodes
minclus = len(graph) * min_cluster_size
while clusnum < max_clusters + 1:
clusters = AgglomerativeClustering(n_clusters=clusnum).fit_predict(clustermat)
counts = np.bincount(clusters)
        # outlier nodes are later added to clusters based on shortest paths
if len(np.where(counts < minclus)[0]) > 0:
            # if there is a cluster smaller than the minimum cluster size,
            # remove a node that separates into this tiny cluster
# get cluster ID and location for this cluster
locs = np.where(counts < minclus)[0]
            # we only remove one node position at a time,
            # since repeated clustering may assign nodes differently
loc = locs[0]
clusid = list(set(clusters))[loc]
clusloc = np.where(clusters == clusid)[0][0]
# we need to update the rev_index so that adjacency indices point to taxon IDs
# outlier nodes are added to a list, to be dealt with later
outliers[clusnum].append(scoremat_index[clusloc])
clustermat, scoremat_index = _remove_node(clusloc, clustermat, scoremat_index)
            # now that the smaller clusters are deleted, we can cluster on the updated scoring matrix
if clustermat.shape[0] <= max_clusters:
# indicates that there is no good clustering possible for this cluster number
clusnum += 1
outliers[clusnum] = list()
                # reset the scoring matrix in case a different cluster number does assign the outliers
clustermat = scoremat.copy()
scoremat_index = rev_index.copy()
else:
scores[clusnum] = sparsity_score(graph, clusters, rev_index)
bestclusters[clusnum] = clusters
if verbose:
logger.info('Sparsity level of k=' + str(clusnum) + ' clusters: '
+ str(scores[clusnum]) + '.')
clusnum += 1
outliers[clusnum] = list()
            # reset the scoring matrix in case a different cluster number does assign the outliers
clustermat = scoremat
scoremat_index = rev_index.copy()
topscore = max(scores, key=scores.get)
if topscore != 'random':
if verbose:
logger.info('Highest score for k=' + str(topscore) + ' clusters: ' + str(scores[topscore]))
else:
logger.warning('Warning: random clustering performed best.'
' \n Setting cluster amount to minimum value.')
topscore = min_clusters
# it is possible that all evaluated cluster assignments did not work out
# in that case, the assignment below is without the binning strategy
if min_clusters not in bestclusters:
bestclusters[min_clusters] = AgglomerativeClustering(n_clusters=min_clusters).fit_predict(clustermat)
# given a topscore, clustering is carried out on scoremat without outliers
outlier_locs = [adj_index[x] for x in outliers[topscore]]
scoremat_index = rev_index.copy()
clustermat = scoremat.copy()
clustermat, scoremat_index = _remove_node(outlier_locs, clustermat, scoremat_index)
bestcluster = bestclusters[topscore]
# we need to assign outlier nodes to clusters after clustering on main network
corrdict = _path_weights(outliers[topscore], graph, verbose)
scoremat_index = {v: k for k, v in scoremat_index.items()}
# generate dictionary of cluster assignments
# use this to reconstruct bestcluster vector
cluster_index = adj_index.copy()
for key in cluster_index:
try:
cluster_index[key] = float(bestcluster[scoremat_index[key]])
except KeyError:
# key error happens for outlier that has not been assigned cluster ID yet
pass
# replace node values in corrdict with cluster ids
for node in corrdict:
# initialize list to store path values per cluster
clusdict = dict.fromkeys(set(bestcluster))
for key in clusdict:
clusdict[key] = list()
# lookup cluster ID of node
for value in corrdict[node]:
try:
clusid = cluster_index[value]
clusdict[clusid].append(corrdict[node][value])
except KeyError:
pass
closest_cluster = list(set(bestcluster))[np.argmax([np.mean(clusdict[x]) for x in clusdict])]
cluster_index[node] = float(closest_cluster)
return cluster_index
def cluster_weak(graph, diffs, cluster, edgescale, adj_index, rev_index, verbose):
"""
Although clusters can be assigned with cluster_hard, cluster_weak tests
whether cluster assignments are in conflict with oscillator nodes present in clusters.
Oscillators can only be defined from flip-flopping states;
hence, this function should not be called for scoring matrices that converge to -1 and 1.
Parameters
----------
:param graph: NetworkX weighted, undirected graph
:param diffs: Diffusion matrices from flip-flop states generated by diffusion
:param cluster: Cluster assignment
:param edgescale: Mean edge weight for node removal
:param adj_index: Node index
:param rev_index: Reverse node index
:param verbose: Verbosity level of function
    :return: List of node indices for nodes that are considered weak
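
    A minimal call sketch, mirroring how cluster_graph uses this function after cluster_hard
    (assignments, adj_index, rev_index and diffs are prepared as in cluster_graph;
    the edgescale value is illustrative only):

        flatcluster = _cluster_vector(assignments, adj_index)
        weak_nodes = cluster_weak(g, diffs=diffs, cluster=flatcluster, edgescale=0.5,
                                  adj_index=adj_index, rev_index=rev_index, verbose=False)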
"""
# clustering on the 1st iteration already yields reasonable results
# however, we can't separate nodes that are 'intermediates' between clusters
# solution: find nodes with the lowest cumulative edge amplitudes
# the nodes that oscillate the most, have the largest self-loop amplitude
# these are on the cluster periphery
# nodes that are in-between clusters do not have large self-loop amplitudes
#bestcluster = cluster_hard(graph=graph, rev_index=rev_index, scoremat=scoremat,
# max_clusters=max_clusters, min_clusters=min_clusters, cluster=cluster)
# cluster assignment 0 is reserved for weak clusters
if verbose:
logger.info('Determining weak nodes.')
    # diffs is a 3-dimensional array; we need to extract a time series per node from the matrix diagonals
    # each time series is 5 flip-flops long
diffs = np.array(diffs)
# only upper triangle of matrix is indexed this way
core, anti = _core_oscillators(difmats=diffs, assignment=cluster,
adj_index=adj_index, rev_index=rev_index, verbose=verbose)
# for each node with contrasting edge products,
# we can check whether the sparsity score is improved by
# removing the node or adding it to another cluster.
remove_cluster = _oscillator_paths(graph=graph, core_oscillators=core, assignment=cluster,
adj_index=adj_index, edgescale=edgescale, verbose=verbose)
# we only remove nodes with conflicting shortest paths if
# they have a large impact on sparsity score
#posfreq = np.zeros((len(graph), len(graph)))
#negfreq = np.zeros((len(graph), len(graph)))
# count how many times specific values in matrix have
# been assigned positive or negative values
#for b in range(len(partials)):
# posfreq[partials[b] > 0] += 1
# negfreq[partials[b] < 0] += 1
    # count relative frequency - values close to 1 or 0 are ok
#posratio = posfreq[np.where(posfreq != 0)] / \
# (posfreq[np.where(posfreq != 0)] + negfreq[np.where(posfreq != 0)])
#weak_edges = np.where(np.logical_and(posratio < 0.8, posratio > 0.2))
#weak_ids = np.where(posfreq != 0)
#indices = weak_ids[0][weak_edges[0]]
#weak_freq = np.unique(indices, return_counts=True)
# determine how to go from weak edges to weak nodes
# [0] or [1] does not matter, weak_edges is symmetrical
#weak_nodes = list()
#for i in range(len(weak_freq[0])):
# node = weak_freq[0][i]
# freq = weak_freq[1][i]
# deg = nx.degree(graph, list(graph.nodes)[0])
# if freq / deg > edgescale:
# weak_nodes.append(i)
return remove_cluster
def _core_oscillators(difmats, assignment, adj_index, rev_index, verbose):
"""
Given a list of diffusion matrices calculated during a flip-flop state,
this function identifies core oscillators as well as their anti-correlated partners.
Parameters
----------
:param difmats: Diffusion matrices during flip-flop state
:param assignment: Cluster assignment
:param adj_index: Dictionary for indexing
:param rev_index: Dictionary for indexing
:param verbose: Verbosity level of function
    :return: Tuple with the set of core oscillators and a dictionary mapping each cluster to its anti-correlated cluster
"""
oscillators = list()
oscillators_series = list()
for index in range(len(assignment)):
# node amplitude is NOT correlated to position in network
seq = difmats[:, index, index]
ampli = np.max(seq) - np.min(seq)
if ampli > 0.5:
# if the amplitude is this large,
# the node may be an oscillator
# in that case, mean amplitude may be low
oscillators.append(index)
oscillators_series.append(seq)
oscillators = [rev_index[x] for x in oscillators]
if verbose:
logger.info('Found the following strong oscillators: ' + str(oscillators))
amplis = dict()
clusdict = dict.fromkeys(oscillators)
for x in clusdict:
clusdict[x] = assignment[adj_index[x]]
# we find anti-correlated oscillator nodes
# there should be at least one node represented for each cluster
for pair in combinations(range(len(oscillators)), 2):
total = oscillators_series[pair[0]] - oscillators_series[pair[1]]
# need to be careful with this number,
# the core oscillators should converge to 1 and -1
# but may stick a little below that value
amplis[(oscillators[pair[0]], oscillators[pair[1]])] = (np.max(total) - np.min(total))
# need to find the largest anti-correlation per cluster
clus_corrs = dict.fromkeys(set(assignment), 0)
clus_nodes = dict.fromkeys(set(assignment))
for corr in amplis:
cluster1 = clusdict[corr[0]]
cluster2 = clusdict[corr[1]]
if amplis[corr] > clus_corrs[cluster1]:
clus_nodes[cluster1] = corr
clus_corrs[cluster1] = amplis[corr]
if amplis[corr] > clus_corrs[cluster2]:
clus_nodes[cluster2] = corr
clus_corrs[cluster2] = amplis[corr]
clus_nodes = {k: v for k, v in clus_nodes.items() if v is not None}
# it is possible for clusters to not have a strong oscillator
core_oscillators = set(list(chain.from_iterable(list(clus_nodes.values()))))
id_corrs = dict.fromkeys(core_oscillators, 0)
anti_sizes = dict.fromkeys(core_oscillators, 0)
for nodes in combinations(core_oscillators, 2):
try:
size = amplis[nodes]
except KeyError:
size = amplis[(nodes[1], nodes[0])]
if size > anti_sizes[nodes[0]]:
id_corrs[nodes[0]] = nodes[1]
anti_sizes[nodes[0]] = size
if size > anti_sizes[nodes[1]]:
id_corrs[nodes[1]] = nodes[0]
anti_sizes[nodes[1]] = size
    # drop oscillators that are not core oscillators from the cluster lookup
    for x in list(clusdict.keys()):
        if x not in core_oscillators:
            clusdict.pop(x)
anti_corrs = dict()
for core in core_oscillators:
anti_corrs[clusdict[core]] = clusdict[id_corrs[core]]
# oscillator is defined as strongest anti-correlation
return core_oscillators, anti_corrs
def _oscillator_paths(graph, core_oscillators,
assignment, adj_index, edgescale, verbose):
"""
Given optimal cluster assignments and a set of core oscillators,
    this function computes shortest paths to and from the oscillators and identifies nodes
    whose path weights do not match their cluster and oscillator assignment.
    A list of these nodes is then returned.
Parameters
----------
:param graph: NetworkX weighted, undirected graph
:param core_oscillators: List of core oscillators
:param assignment: Cluster assignment
:param adj_index: Dictionary for indexing
:param edgescale: Threshold for node removal
:param verbose: Verbosity level of function
:return: List of node indices that are in conflict with oscillators
"""
# get all shortest paths to/from oscillators
corrdict = _path_weights(core_oscillators, graph, verbose)
varweights = list() # stores nodes that have low weights of mean shortest paths
clus_matches = list() # stores nodes that have matching signs for oscillators
clus_assign = list() # stores nodes that have negative shortest paths to cluster oscillator
# first need to scale weight variables for this
clusdict = dict.fromkeys(core_oscillators)
for x in clusdict:
clusdict[x] = assignment[adj_index[x]]
clusdict = {v: k for k, v in clusdict.items()}
for target in graph.nodes:
clus_id = assignment[adj_index[target]]
try:
weight = corrdict[clusdict[clus_id]][target]
if np.sign(weight) == -1:
# this criterion filters out nodes that have negative path weight to their oscillator
clus_assign.append(target)
if -edgescale < weight < edgescale:
# this criterion filters out nodes that have low cumulative path weights
varweights.append(target)
except KeyError:
pass
# cannot check oscillator sign for clusters w/o oscillators
if verbose:
logger.info('Sign of mean edge products does not match cluster assignment for: \n' +
str(clus_assign))
logger.info('Mean edge products are small for: \n' +
str(varweights))
remove_cluster = set([adj_index[x] for x in clus_assign + varweights])
return list(remove_cluster)
def _node_sparsity(graph, removals, assignment, rev_index):
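    """
    Given a graph, a cluster assignment and a list of candidate node indices flagged for removal,
    this helper tests whether moving each candidate to any other cluster
    gives a sparsity score close to the current assignment.
    If every alternative assignment scores much worse,
    the node is considered well placed and is taken off the removal list.
    Parameters
    ----------
    :param graph: NetworkX weighted, undirected graph
    :param removals: List of candidate node indices flagged for removal
    :param assignment: Cluster assignment
    :param rev_index: Index matching node ID to matrix index
    :return: Updated list of node indices flagged for removal
    """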
default_sparsity = sparsity_score(graph, assignment, rev_index)
clusters = set(assignment)
updated_removals = deepcopy(removals)
for node in removals:
clus_id = list()
clus_id.append(assignment[node])
other_ids = list(clusters.difference(clus_id))
other_sparsities = list()
        for other_id in other_ids:
            updated_assignment = deepcopy(assignment)
            updated_assignment[node] = other_id
other_sparsities.append(sparsity_score(graph, updated_assignment, rev_index))
if np.max(other_sparsities) < (default_sparsity - 0.3):
updated_removals.remove(node)
return updated_removals
def _path_weights(source, graph, verbose):
"""
Given a list of nodes, this function returns a dictionary of all shortest path weights
from the sources to all other nodes in the network.
Parameters
----------
:param source: List of source nodes
:param graph: NetworkX graph object
:param verbose: Verbosity level of function
:return: Dictionary of shortest path weights
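
    A small worked example (node names and weights chosen for illustration):

        g = nx.Graph()
        g.add_edge('a', 'b', weight=1.0)
        g.add_edge('b', 'c', weight=0.5)
        # with 'a' as the only source node, the mean shortest path products are
        # {'a': {'a': 1.0, 'b': 1.0, 'c': 0.5}}
        _path_weights(['a'], g, verbose=False)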
"""
corrdict = dict.fromkeys(source)
weights = nx.get_edge_attributes(graph, 'weight')
max_weight = max(weights.values())
weights = {k: v / max_weight for k, v in weights.items()}
rev_weights = dict()
for key in weights:
newkey = (key[1], key[0])
rev_weights[newkey] = weights[key]
weights = {**weights, **rev_weights}
    # edge weights are now scaled by the maximum weight
for node in source:
targets = list(graph.nodes)
corrdict[node] = dict()
for target in targets:
try:
shortest_paths = list(nx.all_shortest_paths(graph, source=node, target=target))
total_weight = 0
for path in shortest_paths:
edge_weight = 1
for i in range(len(path) - 1):
edge_weight *= weights[(path[i], path[i + 1])]
total_weight += edge_weight
total_weight = total_weight / len(shortest_paths)
except nx.exception.NetworkXNoPath:
if verbose:
logger.warning("Could not find shortest path for: " + target)
total_weight = -1
corrdict[node][target] = total_weight
return corrdict
def _remove_node(loc, mat, mat_index):
"""
Given an outlier node to remove,
this function updates the matrix and matrix index.
Parameters
----------
:param loc: Node(s) to remove
:param mat: Scoring matrix
:param mat_index: Matrix index
:return: Tuple of matrix and matrix_index
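
    A small worked example (values chosen for illustration):

        mat = np.arange(9).reshape(3, 3)
        mat_index = {0: 'a', 1: 'b', 2: 'c'}
        mat, mat_index = _remove_node(1, mat, mat_index)
        # mat is now the 2x2 matrix without row and column 1,
        # and mat_index is {0: 'a', 1: 'c'}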
"""
mat = np.delete(mat, loc, axis=0)
mat = np.delete(mat, loc, axis=1)
    if isinstance(loc, list):
        loc.sort()
    else:
        loc = [loc]
for i in range(len(loc)):
item = loc[i]
mat_index.pop(item)
        # if the removed index was not the last one, shift the remaining keys down and drop the last key
if item != (len(mat_index)):
for remainder in range(item, len(mat_index)):
mat_index[remainder] = mat_index[remainder + 1]
mat_index.pop(len(mat_index) - 1)
loc = [x - 1 for x in loc]
# update scoring matrix with removed nodes
return mat, mat_index
def _cluster_vector(assignment, adj_index):
"""
Given a dictionary of cluster assignments,
this helper function returns a vector with cluster IDs.
Parameters
----------
:param assignment: Dictionary with nodes as keys, cluster IDs as values
:param adj_index: Dictionary with nodes as keys, matrix index as values
:return: Numpy array with cluster assignments
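
    A small worked example (values chosen for illustration):

        assignment = {'a': 0.0, 'b': 1.0, 'c': 1.0}
        adj_index = {'a': 0, 'b': 1, 'c': 2}
        _cluster_vector(assignment, adj_index)  # returns array([0., 1., 1.])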
"""
result = np.zeros(shape=(1, len(assignment)))[0]
for key in adj_index:
result[adj_index[key]] = assignment[key]
return result