Source code for skmatter.clustering._quick_shift

from typing import Callable, Optional

import numpy as np
from sklearn.base import BaseEstimator
from tqdm import tqdm

from ..metrics._pairwise import periodic_pairwise_euclidean_distances


[docs] class QuickShift(BaseEstimator): """Conducts quick shift clustering. This class is used to implement the quick shift clustering algorithm, which is used in probabilistic analysis of molecular motifs (PAMM). There are two ways of searching the next point: (1) search for the point within the given distance cutoff and (2) search for the point within the given number of neighbor shell of the Gabriel graph. If both of them are set, the distance cutoff is used. Parameters ---------- dist_cutoff_sq : float, default=None The squared distance cutoff for searching for the next point. Two points are considered as neighbors if they are within this distance. If :obj:`None`, the scheme of Gabriel graph is used. gabriel_shell : int, default=None The number of neighbor shell of Gabriel graph for searching for the next point. For example, if the number is 1, two points will be considered as neighbors if they have at least one common neighbor, like for the case "A-B-C", we will consider "A-C" as neighbors. If the number is 2, for the case "A-B-C-D", we will consider "A-D" as neighbors. If :obj:`None`, the scheme of distance cutoff is used. scale : float, default=1.0 Distance cutoff scaling factor used during the QS clustering. It will be squared since the squared distance is used in this class. metric : Callable, default=None The metric to use. Your metric should be able to take at least three arguments in secquence: `X`, `Y`, and `squared=True`. Here, `X` and `Y` are two array-like of shape (n_samples, n_components). The return of the metric is an array-like of shape (n_samples, n_samples). If you want to use periodic boundary conditions, be sure to provide the cell length in the ``metric_params`` and provide a metric that can take the cell argument. If :obj:`None`, the :func:`skmatter.metrics.periodic_pairwise_euclidean_distances()` is used. metric_params : dict, default=None Additional parameters to be passed to the use of metric. i.e. the dimension of a rectangular cell of side length :math:`a_i` for :func:`skmatter.metrics.periodic_pairwise_euclidean_distances()` ``{'cell_length': [a_1, a_2, ..., a_n]}`` Attributes ---------- labels_ : numpy.ndarray An array of labels for each input data. cluster_centers_idx_ : numpy.ndarray An array of indices of cluster centers. cluster_centers_ : numpy.ndarray An array of cluster centers. Examples -------- >>> import numpy as np >>> from skmatter.clustering import QuickShift Create some points and their weights for quick shift clustering >>> feature1 = np.array([-1.72, -4.44, 0.54, 3.19, -1.13, 0.55]) >>> feature2 = np.array([-1.32, -2.13, -2.43, -0.49, 2.33, 0.18]) >>> points = np.vstack((feature1, feature2)).T >>> weights = np.array([-3.94, -12.68, -7.07, -9.03, -8.26, -2.61]) Set cutoffs for seraching >>> cuts = np.array([6.99, 8.80, 7.68, 9.51, 8.07, 6.22]) Do the clustering >>> model = QuickShift(cuts).fit(points, samples_weight=weights) >>> print(model.labels_) [0 0 0 5 5 5] >>> print(model.cluster_centers_idx_) [0 5] We can also apply a periodic boundary condition >>> model = QuickShift(cuts, metric_params={"cell_length": [3, 3]}) >>> model = model.fit(points, samples_weight=weights) >>> print(model.labels_) [5 5 5 5 5 5] >>> print(model.cluster_centers_idx_) [5] Since the searching cuts are all larger than the maximum distance in the PBC box, it can be expected that all points are assigned to the same cluster, of the center that has the largest weight. """ def __init__( self, dist_cutoff_sq: Optional[float] = None, gabriel_shell: Optional[int] = None, scale: float = 1.0, metric: Optional[Callable] = None, metric_params: Optional[dict] = None, ): if (dist_cutoff_sq is None) and (gabriel_shell is None): raise ValueError("Either dist_cutoff or gabriel_depth must be set.") self.dist_cutoff_sq = dist_cutoff_sq self.gabriel_shell = gabriel_shell self.scale = scale if self.dist_cutoff_sq is not None: self.dist_cutoff_sq *= self.scale**2 self.metric_params = ( metric_params if metric_params is not None else {"cell_length": None} ) if metric is None: metric = periodic_pairwise_euclidean_distances self.metric = lambda X, Y: metric(X, Y, squared=True, **self.metric_params) if isinstance(self.metric_params, dict): self.cell = self.metric_params["cell_length"] else: self.cell = None def fit(self, X, y=None, samples_weight=None): """Fit the model using X as training data and y as target values. Parameters ---------- X : array-like of shape (n_samples, n_features) Training data, where `n_samples` is the number of samples and `n_features` is the number of features. Y : None Ignored. This parameter exists only for compatibility with :class:`~sklearn.pipeline.Pipeline`. samples_weight : array-like of shape (n_samples,), default=None List of sample weights attached to the data X. This parameter must be given in order to do the quick shift clustering. """ if (self.cell is not None) and (X.shape[1] != len(self.cell)): raise ValueError( "Dimension of the cell length does not match the data dimension." ) dist_matrix = self.metric(X, X) np.fill_diagonal(dist_matrix, np.inf) if self.dist_cutoff_sq is None: gabrial = _get_gabriel_graph(dist_matrix) idmindist = np.argmin(dist_matrix, axis=1) idxroot = np.full(dist_matrix.shape[0], -1, dtype=int) for i in tqdm(range(dist_matrix.shape[0]), desc="Quick-Shift"): if idxroot[i] != -1: continue qspath = [] qspath.append(i) current = qspath[-1] while current != idxroot[current]: if self.gabriel_shell is not None: idxroot[current] = self._gs_next( current, samples_weight, dist_matrix, gabrial ) else: idxroot[current] = self._qs_next( current, idmindist[current], samples_weight, dist_matrix, self.dist_cutoff_sq[current], ) if idxroot[idxroot[current]] != -1: # Found a path to a root break qspath.append(idxroot[current]) current = qspath[-1] idxroot[qspath] = idxroot[idxroot[current]] self.labels_ = idxroot self.cluster_centers_idx_ = np.concatenate( np.argwhere(idxroot == np.arange(dist_matrix.shape[0])) ) self.cluster_centers_ = X[self.cluster_centers_idx_] return self def _gs_next( self, idx: int, probs: np.ndarray, distmm: np.ndarray, gabriel: np.ndarray, ): """Find next cluster in Gabriel graph.""" ngrid = len(probs) neighs = np.copy(gabriel[idx]) for _ in range(1, self.gabriel_shell): nneighs = np.full(ngrid, False) for j in range(ngrid): if neighs[j]: # j can be accessed from idx # j's neighbors can also be accessed from idx nneighs |= gabriel[j] neighs |= nneighs next_idx = idx dmin = np.inf for j in range(ngrid): if probs[j] > probs[idx] and distmm[idx, j] < dmin and neighs[j]: # find the closest neighbor next_idx = j dmin = distmm[idx, j] return next_idx def _qs_next( self, idx: int, idxn: int, probs: np.ndarray, distmm: np.ndarray, cutoff: float ): """Find next cluster with respect to cutoff.""" ngrid = len(probs) dmin = np.inf next_idx = idx if probs[idxn] > probs[idx]: next_idx = idxn for j in range(ngrid): if probs[j] > probs[idx] and distmm[idx, j] < min(dmin, cutoff): next_idx = j dmin = distmm[idx, j] return next_idx
def _get_gabriel_graph(dist_matrix_sq: np.ndarray): """ Generate the Gabriel graph based on the given squared distance matrix. Parameters ---------- dist_matrix_sq : np.ndarray The squared distance matrix of shape (n_points, n_points). Returns ------- np.ndarray The Gabriel graph matrix of shape (n_points, n_points). """ n_points = dist_matrix_sq.shape[0] gabriel = np.full((n_points, n_points), True) for i in tqdm(range(n_points), desc="Calculating Gabriel graph"): gabriel[i, i] = False for j in range(i, n_points): if np.sum(dist_matrix_sq[i] + dist_matrix_sq[j] < dist_matrix_sq[i, j]): gabriel[i, j] = False gabriel[j, i] = False return gabriel