# Pyrea: Multi-view clustering with deep ensemble structures
# Copyright (C) 2022 Marcus D. Bloice, Bastian Pfeifer
#
#
# Licenced under the terms of the MIT license.
#
# structure.py
# Contains the classes required for the structuring of ensembles, for
# example Views, Ensembles, Clusterers, and so on.
"""
The :py:mod:`pyrea.structure` module contains the classes used for the internal functionality
of Pyrea. The classes contained here are not generally called or instantiated
by the user; see the :py:mod:`pyrea.core` module for the user-facing API.
Developers who wish to extend Pyrea, such as by creating a custom clustering
algorithm, should consult the documentation of the :class:`Clusterer` abstract
base class, for example. The :class:`Fusion` class is another such abstract base
class that must be used if a developer wishes to create a custom fusion
algorithm for use within Pyrea.
"""
import numpy as np
from sklearn.cluster import AgglomerativeClustering, SpectralClustering, DBSCAN, OPTICS
from typing import List, Union, Any
from scipy.cluster import hierarchy
from scipy import spatial
class Clusterer:
    """
    Abstract base class from which every clustering algorithm derives.

    A clustering algorithm must be a subclass of this class in order to be
    accepted by functions such as :func:`~pyrea.core.execute_ensemble()`.

    To add a custom clustering algorithm to Pyrea, subclass
    :class:`Clusterer` and provide your own implementation of
    :func:`Clusterer.execute`.
    """

    def __init__(self) -> None:
        # The base class carries no state of its own.
        return None

    def execute(self) -> list:
        """
        Run the clustering algorithm.

        The base implementation does nothing and returns ``None``;
        subclasses override it to perform the actual clustering.
        """
        return None
class HierarchicalClusteringPyrea(Clusterer):
    """
    Hierarchical clustering built on SciPy's ``linkage`` and ``cut_tree``.

    :param precomputed: If truthy, the data given to :func:`execute` is
        treated as a square distance matrix and condensed with
        ``scipy.spatial.distance.squareform``. Otherwise the pairwise
        distances are computed with ``scipy.spatial.distance.pdist``.
    :param method: Linkage method forwarded to
        ``scipy.cluster.hierarchy.linkage``. The additional value
        ``'ward2'`` squares the distances and then uses SciPy's ``'ward'``
        method.
    :param metric: ``metric`` argument forwarded to ``linkage``.
    :param optimal_ordering: ``optimal_ordering`` argument forwarded to
        ``linkage``.
    :param distance_metric: ``metric`` argument forwarded to ``pdist``
        (only used when :attr:`precomputed` is falsy).
    :param out: ``out`` argument forwarded to ``pdist``.
    :param n_clusters: ``n_clusters`` argument forwarded to ``cut_tree``.
    :param height: ``height`` argument forwarded to ``cut_tree``.
    """

    def __init__(self, precomputed,
                 # linkage arguments:
                 method='single',
                 metric='euclidean',
                 optimal_ordering=False,
                 # pdist arguments:
                 distance_metric='euclidean',
                 out=None,
                 # cut_tree arguments:
                 n_clusters=None,
                 height=None
                 ) -> None:
        super().__init__()

        self.n_clusters = n_clusters
        self.metric = metric
        self.method = method
        self.optimal_ordering = optimal_ordering
        self.distance_metric = distance_metric
        self.precomputed = precomputed
        self.out = out
        self.height = height

    def execute(self, data) -> list:
        """
        Cluster :attr:`data` and return the result of
        ``scipy.cluster.hierarchy.cut_tree``.

        :param data: A square distance matrix when :attr:`precomputed` is
            truthy, otherwise the observations handed to ``pdist``.
        """
        super().execute()

        # TODO: Both pdist and linkage can take a distance metric.
        # It must be possible for the user to provide both.
        # Currently distance_metric is used for pdist although this then
        # breaks compatibility with the SciPy docs. Fix.
        if self.precomputed:
            distances = spatial.distance.squareform(data)
        else:
            distances = spatial.distance.pdist(data,
                                               metric=self.distance_metric,
                                               out=self.out)

        linkage_method = self.method
        if linkage_method == 'ward2':
            # SciPy has no 'ward2' linkage method and would raise a
            # ValueError; square the distances and fall back to 'ward'.
            distances = distances ** 2
            linkage_method = 'ward'

        tree = hierarchy.linkage(distances,
                                 method=linkage_method,
                                 metric=self.metric,
                                 optimal_ordering=self.optimal_ordering)

        return hierarchy.cut_tree(tree,
                                  n_clusters=self.n_clusters,
                                  height=self.height)
class AgglomerativeClusteringPyrea(Clusterer):
    """
    Wrapper around scikit-learn's ``AgglomerativeClustering`` estimator.

    See https://scikit-learn.org/stable/modules/generated/sklearn.cluster.AgglomerativeClustering.html
    """

    def __init__(self, n_clusters=2,
                 linkage: str='ward',
                 affinity: str='euclidean',
                 memory: Union[None, Any]=None,
                 connectivity=None,
                 compute_full_tree='auto',
                 distance_threshold=None,
                 compute_distances=False) -> None:
        """
        Perform agglomerative clustering.

        The arguments are stored and later forwarded to
        ``AgglomerativeClustering`` by :func:`execute`. :attr:`affinity` is
        passed as the estimator's ``metric`` argument where the installed
        scikit-learn supports it, and as ``affinity`` otherwise.

        See https://scikit-learn.org/stable/modules/generated/sklearn.cluster.AgglomerativeClustering.html
        """
        super().__init__()

        self.n_clusters = n_clusters
        self.linkage = linkage
        self.affinity = affinity
        self.memory = memory
        self.connectivity = connectivity
        self.compute_full_tree = compute_full_tree
        self.distance_threshold = distance_threshold
        self.compute_distances = compute_distances

    def execute(self, data: list) -> list:
        """
        Fit the estimator on :attr:`data` and return its ``labels_``.
        """
        super().execute()

        shared_kwargs = dict(n_clusters=self.n_clusters,
                             linkage=self.linkage,
                             memory=self.memory,
                             connectivity=self.connectivity,
                             compute_full_tree=self.compute_full_tree,
                             distance_threshold=self.distance_threshold,
                             compute_distances=self.compute_distances)

        # scikit-learn renamed ``affinity`` to ``metric`` (deprecated in 1.2,
        # removed in 1.4). Try the new keyword first and fall back to the old
        # one for releases that do not know ``metric`` yet.
        try:
            model = AgglomerativeClustering(metric=self.affinity,
                                            **shared_kwargs)
        except TypeError:
            model = AgglomerativeClustering(affinity=self.affinity,
                                            **shared_kwargs)

        return model.fit(data).labels_
class SpectralClusteringPyrea(Clusterer):
    """
    Wrapper around scikit-learn's ``SpectralClustering`` estimator.

    See: https://scikit-learn.org/stable/modules/generated/sklearn.cluster.SpectralClustering.html
    """

    def __init__(self, n_clusters=8,
                 eigen_solver=None,
                 n_components=None,
                 random_state=None,
                 n_init=10,
                 gamma=1.0,
                 affinity='rbf',
                 n_neighbors=10,
                 eigen_tol=0.0,
                 assign_labels='kmeans',
                 degree=3,
                 coef0=1,
                 kernel_params=None,
                 n_jobs=None,
                 verbose=False) -> None:
        """
        Perform spectral clustering.

        The arguments are stored and later forwarded unchanged to
        ``SpectralClustering`` by :func:`execute`.

        See: https://scikit-learn.org/stable/modules/generated/sklearn.cluster.SpectralClustering.html
        """
        super().__init__()

        self.n_clusters = n_clusters
        self.eigen_solver = eigen_solver
        self.n_components = n_components
        self.random_state = random_state
        self.n_init = n_init
        self.gamma = gamma
        self.affinity = affinity
        self.n_neighbors = n_neighbors
        self.eigen_tol = eigen_tol
        self.assign_labels = assign_labels
        self.degree = degree
        self.coef0 = coef0
        self.kernel_params = kernel_params
        self.n_jobs = n_jobs
        self.verbose = verbose

    def execute(self, data: list) -> list:
        """
        Fit the estimator on :attr:`data` and return its ``labels_``.
        """
        super().execute()

        model = SpectralClustering(n_clusters=self.n_clusters,
                                   eigen_solver=self.eigen_solver,
                                   n_components=self.n_components,
                                   random_state=self.random_state,
                                   n_init=self.n_init,
                                   gamma=self.gamma,
                                   affinity=self.affinity,
                                   n_neighbors=self.n_neighbors,
                                   eigen_tol=self.eigen_tol,
                                   assign_labels=self.assign_labels,
                                   degree=self.degree,
                                   coef0=self.coef0,
                                   kernel_params=self.kernel_params,
                                   n_jobs=self.n_jobs,
                                   verbose=self.verbose)

        # The data must be handed to fit(); calling fit() without it raises
        # a TypeError before any clustering happens.
        return model.fit(data).labels_
class DBSCANPyrea(Clusterer):
    """
    Wrapper around scikit-learn's ``DBSCAN`` estimator.

    The constructor arguments are stored and later forwarded unchanged to
    ``DBSCAN`` by :func:`execute`.
    """

    def __init__(self, eps=0.5,
                 min_samples=5,
                 metric='euclidean',
                 metric_params=None,
                 algorithm='auto',
                 leaf_size=30,
                 p=None,
                 n_jobs=None) -> None:
        super().__init__()

        self.eps = eps
        self.min_samples = min_samples
        self.metric = metric
        self.metric_params = metric_params
        self.algorithm = algorithm
        self.leaf_size = leaf_size
        self.p = p
        self.n_jobs = n_jobs

    def execute(self, data) -> list:
        """
        Fit the estimator on :attr:`data` and return its ``labels_``.
        """
        super().execute()

        model = DBSCAN(eps=self.eps,
                       min_samples=self.min_samples,
                       metric=self.metric,
                       metric_params=self.metric_params,
                       algorithm=self.algorithm,
                       leaf_size=self.leaf_size,
                       p=self.p,
                       n_jobs=self.n_jobs)

        # The labels must be returned explicitly; without the return the
        # clustering result is computed and then discarded.
        return model.fit(data).labels_
class OPTICSPyrea(Clusterer):
    """
    Wrapper around scikit-learn's ``OPTICS`` estimator.

    The constructor records the estimator settings; :func:`execute` builds
    the estimator from them, fits it and returns its ``labels_``.
    """

    def __init__(self, min_samples=5,
                 max_eps=np.inf,
                 metric='minkowski',
                 p=2,
                 metric_params=None,
                 cluster_method='xi',
                 eps=None,
                 xi=0.05,
                 predecessor_correction=True,
                 min_cluster_size=None,
                 algorithm='auto',
                 leaf_size=30,
                 # memory=None,
                 n_jobs=None) -> None:
        super().__init__()

        # Neighbourhood / density settings.
        self.min_samples = min_samples
        self.max_eps = max_eps
        self.min_cluster_size = min_cluster_size

        # Distance computation settings.
        self.metric = metric
        self.metric_params = metric_params
        self.p = p
        self.algorithm = algorithm
        self.leaf_size = leaf_size

        # Cluster extraction settings.
        self.cluster_method = cluster_method
        self.eps = eps
        self.xi = xi
        self.predecessor_correction = predecessor_correction

        # TODO: ``memory`` must be a new parameter since some version. Check.
        self.n_jobs = n_jobs

    def execute(self, data: list) -> list:
        """
        Fit the estimator on :attr:`data` and return its ``labels_``.
        """
        settings = {
            'max_eps': self.max_eps,
            'min_samples': self.min_samples,
            'min_cluster_size': self.min_cluster_size,
            'algorithm': self.algorithm,
            'metric': self.metric,
            'metric_params': self.metric_params,
            'p': self.p,
            'leaf_size': self.leaf_size,
            'cluster_method': self.cluster_method,
            'eps': self.eps,
            'xi': self.xi,
            'predecessor_correction': self.predecessor_correction,
            # 'memory' is intentionally not forwarded (see TODO in __init__).
            'n_jobs': self.n_jobs,
        }

        estimator = OPTICS(**settings)
        estimator.fit(data)
        return estimator.labels_
class Fusion:
    """
    Abstract base class from which every fusion algorithm derives.

    A fusion algorithm must be a subclass of this class in order to be
    accepted by functions such as :func:`~pyrea.core.execute_ensemble()`.

    To add a custom fusion algorithm to Pyrea, subclass :class:`Fusion`
    and provide your own implementation of :func:`Fusion.execute`.
    """

    def __init__(self) -> None:
        # The base class carries no state of its own.
        return None

    def execute(self, views: list) -> list:
        """
        Run the fusion algorithm on the supplied :attr:`views`.

        The base implementation does nothing and returns ``None``.
        """
        # TODO: Fix views type to List[View] (requires reshuffle of class order)
        return None
class Parea(Fusion):
    """
    Placeholder for the Parea fusion algorithm, which is not implemented
    yet. Creating an instance always raises an ``Exception``.
    """

    def __init__(self) -> None:
        super().__init__()

        # Fail at construction time so the unfinished algorithm cannot be used.
        raise Exception("Not yet implemented.")

    def execute(self, views: list) -> list:
        """
        Intended to fuse a set of views.

        Not yet implemented; does nothing and returns ``None``.
        """
        # TODO: Check name, is it HCfused?
        return None
class Disagreement(Fusion):
    """
    Disagreement fusion function.

    Builds a matrix that counts, for every pair of samples, in how many of
    the supplied clusterings the two samples carry different labels.
    """

    def __init__(self) -> None:
        super().__init__()

    def execute(self, views: list) -> list:
        """
        Run the disagreement fusion on the clusterings in :attr:`views`.

        :param views: A sequence of label sequences, one per clustering.
            The first entry determines the size of the result.
        :return: A square matrix whose entry ``(i, j)`` is the number of
            clusterings in which samples ``i`` and ``j`` differ.
        """
        size = len(views[0])
        total = np.zeros((size, size), dtype=int)

        for clustering in views:
            differs = np.matrix([[int(a != b) for b in clustering]
                                 for a in clustering])
            # Deliberately out-of-place: ndarray + matrix yields the
            # np.matrix that is returned to the caller.
            total = total + differs

        return total
class Agreement(Fusion):
    """
    Agreement fusion function.

    Builds a matrix that counts, for every pair of distinct samples, in how
    many of the supplied clusterings the two samples carry the same label.
    """

    def __init__(self) -> None:
        super().__init__()

    # TODO: Rename parameter to labels
    def execute(self, views: list) -> list:
        """
        Run the agreement fusion on the clusterings in :attr:`views`.

        :param views: A sequence of label sequences, one per clustering.
            The first entry determines the size of the result.
        :return: A square matrix whose entry ``(i, j)`` is the number of
            clusterings in which samples ``i`` and ``j`` share a label. The
            diagonal is set to zero.
        """
        n_samp = len(views[0])
        labels = np.zeros((n_samp, n_samp), dtype=int)

        for clustering in views:
            res = np.matrix([[int(x == y) for y in clustering]
                             for x in clustering])
            labels = labels + res

        # np.fill_diagonal works in place and returns None, so its result
        # must not be returned directly.
        np.fill_diagonal(labels, 0)

        return labels
class Consensus(Fusion):
    """
    Consensus fusion function.

    Groups samples that every supplied clustering places together and
    returns the resulting co-membership as a binary matrix.
    """

    def __init__(self) -> None:
        super().__init__()

    def execute(self, views: list):
        """
        Run the consensus fusion on the clusterings in :attr:`views`.

        :param views: A sequence of label arrays, one per clustering. The
            first entry determines the number of samples.
        :return: A square integer matrix with a 1 wherever two samples share
            a consensus cluster and 0 elsewhere.
        """
        sample_count = len(views[0])
        view_count = len(views)

        # 0 marks "no consensus cluster assigned yet"; real labels start at 1.
        consensus = np.zeros((sample_count,), dtype=int)
        next_label = 1

        for sample in range(sample_count):
            # Samples that share this sample's label in the first view ...
            members = np.where(views[0] == views[0][sample])

            # ... narrowed down to those that also do so in every other view.
            for view_idx in range(1, view_count):
                same_here = np.where(views[view_idx] == views[view_idx][sample])
                members = np.intersect1d(members, same_here)

            # Open a new consensus cluster only if none of them has one yet.
            if np.sum(consensus[members]) == 0:
                consensus[members] = next_label
                next_label += 1

        # Turn the consensus labels into a binary co-membership matrix.
        binary = np.zeros((sample_count, sample_count), dtype=int)

        for sample in range(sample_count):
            peers = np.where(consensus == consensus[sample])
            binary[sample, peers] = 1
            binary[peers, sample] = 1

        return binary
class View(object):
    """
    A :class:`View` pairs some :attr:`data` with the clustering algorithm,
    :attr:`clusterer`, that is to be run on it.

    The data source, :attr:`data`, can be a Python matrix (a list of
    lists), a NumPy 2D array, or a Pandas DataFrame. It is converted with
    ``numpy.asmatrix`` when the view is created.

    Some examples follow (using a list of lists)::

        import pyrea

        data = [[1, 5, 3, 7],
                [4, 2, 9, 4],
                [8, 6, 1, 9],
                [7, 1, 8, 1]]

        v = pyrea.view(data, pyrea.cluster('ward'))

    Or by passing a Pandas DataFrame (``pandas.core.frame.DataFrame``)::

        import pyrea
        import pandas

        data = pandas.read_csv('iris.csv')

        v = pyrea.view(data, pyrea.cluster('ward'))

    Or (passing a numpy 2d array or matrix (``numpy.matrix`` or ``numpy.ndarray``))::

        import pyrea
        import numpy

        data = numpy.random.randint(0, 10, (4,4))

        v = pyrea.view(data, pyrea.cluster('ward'))

    .. seealso:: The :class:`Clusterer` class.

    :param data: The data from which to create your :class:`View`.
    :param clusterer: The clustering algorithm to use to cluster your
        :attr:`data`. Although annotated as a list, :func:`execute` calls
        ``clusterer.execute`` directly, so a single :class:`Clusterer` is
        what currently works.
    :ivar labels: Contains the calculated labels when the :attr:`clusterer`
        is run on the :attr:`data`. ``None`` until :func:`execute` is called.
    """

    def __init__(self, data, clusterer: List[Clusterer]) -> None:
        self.labels = None
        self.clusterer = clusterer

        # NumPy matrices have at most 2 dimensions, so no explicit check of
        # the number of dimensions of ``data`` is performed here.
        self.data = np.asmatrix(data)

    def execute(self) -> list:
        """
        Run the :attr:`clusterer` given at initialisation on the
        :attr:`data`, store the outcome in :attr:`labels` and return it.
        """
        # TODO: If a list is passed, then we need to execute them all.
        result = self.clusterer.execute(self.data)
        self.labels = result
        return result
class Ward(Clusterer):
    """
    Deprecated clusterer for the 'Ward' clustering algorithm.

    Creating an instance always raises ``NotImplementedError``.
    """

    def __init__(self) -> None:
        super().__init__()

        # The class is kept for reference only and must not be instantiated.
        raise NotImplementedError("Deprecated.")

    def execute(self, data):
        """
        Fit scikit-learn's ``AgglomerativeClustering`` with its default
        settings on :attr:`data` and return the resulting labels.
        """
        model = AgglomerativeClustering()
        model.fit(data)
        return model.labels_
class Complete(Clusterer):
    """
    Deprecated clusterer for the 'complete' clustering algorithm.

    Creating an instance always raises ``NotImplementedError``.
    """

    def __init__(self) -> None:
        super().__init__()

        # The class is kept for reference only and must not be instantiated.
        raise NotImplementedError("Deprecated.")

    def execute(self, data):
        """
        Fit scikit-learn's ``AgglomerativeClustering`` with
        ``linkage='complete'`` on :attr:`data` and return the resulting
        labels.
        """
        model = AgglomerativeClustering(linkage='complete')
        model.fit(data)
        return model.labels_
class Average(Clusterer):
    """
    Deprecated clusterer for the 'average' clustering algorithm.

    Creating an instance always raises ``NotImplementedError``.
    """

    def __init__(self) -> None:
        super().__init__()

        # The class is kept for reference only and must not be instantiated.
        raise NotImplementedError("Deprecated.")

    def execute(self, data):
        """
        Fit scikit-learn's ``AgglomerativeClustering`` with
        ``linkage='average'`` on :attr:`data` and return the resulting
        labels.
        """
        model = AgglomerativeClustering(linkage='average')
        model.fit(data)
        return model.labels_
class Single(Clusterer):
    """
    Deprecated clusterer for the 'single' clustering algorithm.

    Creating an instance always raises ``NotImplementedError``.
    """

    def __init__(self) -> None:
        super().__init__()

        # The class is kept for reference only and must not be instantiated.
        raise NotImplementedError("Deprecated.")

    def execute(self, data):
        """
        Fit scikit-learn's ``AgglomerativeClustering`` with
        ``linkage='single'`` on :attr:`data` and return the resulting
        labels.
        """
        model = AgglomerativeClustering(linkage='single')
        model.fit(data)
        return model.labels_
class Ensemble(object):
    """
    The Ensemble class encapsulates the views and the fusion algorithm
    required to perform a multi-view clustering.

    :param views: The views that constitute the ensemble's multi-view data.
        Either a single :class:`View`, a list of views, or any other
        iterable of views.
    :param fuser: The fusion algorithm to use.
    :ivar labels: The labels produced by each view during the most recent
        call to :func:`execute`.
    """

    def __init__(self, views: List[View], fuser: Fusion):
        if isinstance(views, View):
            self.views = [views]
        elif isinstance(views, list):
            self.views = views
        else:
            # Accept other iterables (e.g. tuples) instead of leaving
            # ``self.views`` unset, which would only fail later in execute().
            self.views = list(views)

        self.fuser = fuser
        self.labels = []

    def execute(self):
        """
        Executes the ensemble: every view is clustered and the resulting
        labels are handed to the fusion algorithm.

        :return: Whatever the :attr:`fuser` returns for the collected labels.
        """
        # Execute each view's clustering algorithm on its data. The list is
        # rebuilt on every call so repeated executions do not fuse stale
        # labels from earlier runs.
        self.labels = [v.execute() for v in self.views]

        # Fuse the clusterings to a single fused matrix
        fusion_matrix = self.fuser.execute(self.labels)

        return fusion_matrix