Source code for skmatter.sample_selection._voronoi_fps

import numbers
from time import time

import numpy as np
from sklearn.utils import check_random_state

from .._selection import GreedySelector


[docs] class VoronoiFPS(GreedySelector): """ In FPS, points are selected based upon their Hausdorff distance to previous selections, i.e. the minimum distance between a given point and any previously selected points. This implicitly constructs a Voronoi tessellation which is updated with each new selection, as each unselected point "belongs" to the Voronoi polyhedron of the nearest previous selection. This implicit tessellation enabled a more efficient evaluation of the FPS -- at each iteration, we need only consider for selection those points at the boundaries of the Voronoi polyhedra, and when updating the tessellation we need only consider moving those points whose Hausdorff distance is greater than half of the distance between the corresponding Voronoi center and the newly selected point, per the triangle equality. .. image:: VoronoiFPS-Schematic.pdf To demonstrate the algorithm behind Voronoi FPS, let :math:`*_{m+1}` be a new chosen point, :math:`v(j)` was chosen earlier, :math:`j` is a point in the polyhedron with center :math:`v(j)`. From the inequalities of the triangle one can easily see that if :math:`d(v(j),j)<d(*_{m+1}, j)/2`, then point :math:`j` is guaranteed not to fall into the formed polyhedron and the distance to it can be uncalculated. This algorithm is particularly appealing when using a non-Euclidean or computationally-intensive distance metric, for which the decrease in computational time due to the reduction in distance calculations outweighs the increase from book-keeping. For simple metrics (such as Euclidean distance), VoronoiFPS will likely not accelerate, and may decelerate, computations when compared to FPS. Parameters ---------- n_trial_calculation: int, default=4 Number of calculations used for the switching point between Voronoi FPS and traditional FPS (for detail look at full_fraction). full_fraction: float, default=None Proportion of calculated distances from the total number of features at which the switch from Voronoi FPS to FPS occurs. At a certain number of distances to be calculated, the use of Voronoi FPS becomes unreasonably expensive due to the associated costs connected with reading data from the memory. The switching point depends on many conditions, and it is determined "in situ" for optimal use of the algorithm. Determination is done with a few test calculations and memory operations. Examples -------- >>> from skmatter.sample_selection import VoronoiFPS >>> selector = VoronoiFPS( ... n_to_select=2, ... progress_bar=True, ... score_threshold=1e-12, ... full=False, ... # n_trial_calculation used for calculation of full_fraction, ... # so you need to determine only one parameter ... n_trial_calculation=4, ... full_fraction=0.45, ... # int or 'random', default=0 ... # Index of the first selection. ... # If 'random', picks a random value when fit starts. ... initialize=0, ... ) >>> X = np.array( ... [ ... [0.12, 0.21, 0.02], # 3 samples, 3 features ... [-0.09, 0.32, -0.10], ... [-0.03, -0.53, 0.08], ... ] ... ) >>> selector.fit(X) VoronoiFPS(full_fraction=0.45) """ def __init__( self, n_trial_calculation=4, full_fraction=None, initialize=0, **kwargs ): self.n_trial_calculation = n_trial_calculation self.full_fraction = full_fraction self.initialize = initialize super().__init__(selection_type="sample", **kwargs)
[docs] def score(self, X=None, y=None): """ Returns the Hausdorff distances of all samples to previous selections NOTE: This function does not compute the importance score each time it is called, in order to avoid unnecessary computations. The hausdorff distance is updated in :py:func:`self._update_post_selection` Parameters ---------- X : ignored y : ignored Returns ------- hausdorff : Hausdorff distances """ return self.hausdorff_
[docs] def get_distance(self): r"""Traditional FPS employs a column-wise Euclidean distance for feature selection, which can be expressed using the covariance matrix :math:`\mathbf{C} = \mathbf{X} ^ T \mathbf{X}`. .. math:: \operatorname{d}_c(i, j) = C_{ii} - 2 C_{ij} + C_{jj}. For sample selection, this is a row-wise Euclidean distance, which can be expressed in terms of the Gram matrix :math:`\mathbf{K} = \mathbf{X} \mathbf{X} ^ T` .. math:: \operatorname{d}_r(i, j) = K_{ii} - 2 K_{ij} + K_{jj}. Returns ------- hausdorff : numpy.ndarray of shape (`n_to_select_from_`) the minimum distance from each point to the set of selected points. once a point is selected, the distance is not updated; the final list will reflect the distances when selected. """ return self.hausdorff_
[docs] def get_select_distance(self): """ Returns ------- hausdorff_at_select : numpy.ndarray of shape (`n_to_select`) at the time of selection, the minimum distance from each selected point to the set of previously selected points. """ mask = self.get_support(indices=True, ordered=True) return self.hausdorff_at_select_[mask]
def _init_greedy_search(self, X, y, n_to_select): """ Initializes the search. Prepares an array to store the selected features. This function also determines the switching point if it was not given during initialization. The point is that when calculating distances in Voronoi_FPS it is necessary to "jump" between indexes in the array. This results in a significant increase in time. Therefore, if you need to recalculate a large number of distances, it is more advantageous to run simple calculation along the whole matrix. """ n_to_select_from = X.shape[0] self.vlocation_of_idx = np.full(n_to_select_from, 1) # index of the voronoi cell associated with each of the columns of X self.dSL_ = np.zeros(self.n_to_select, float) # distance between new selected point and previously # selected points if self.full_fraction is None: simple_fps_timing = -time() if not isinstance(self.n_trial_calculation, numbers.Integral): raise TypeError("Number of trial calculation should be integer") if self.n_trial_calculation == 0: raise ValueError( "Number of trial calculation should be more or equal to 1" ) for _ in range(self.n_trial_calculation): X @ X[[0]].T simple_fps_timing += time() simple_fps_timing /= self.n_trial_calculation lower_fraction = 0 top_fraction = 1 while top_fraction - lower_fraction > 0.01: voronoi_fps_times = np.zeros(self.n_trial_calculation) self.full_fraction = (top_fraction + lower_fraction) / 2 for i in range(self.n_trial_calculation): random_state = check_random_state(self.random_state) sel = random_state.randint( n_to_select_from, size=int(n_to_select_from * self.full_fraction), ) voronoi_fps_times[i] = -time() _ = X[0] @ X[sel].T voronoi_fps_times[i] += time() voronoi_fps_timing = np.sum(voronoi_fps_times) voronoi_fps_timing /= self.n_trial_calculation if voronoi_fps_timing < simple_fps_timing: lower_fraction = self.full_fraction else: top_fraction = self.full_fraction self.full_fraction = lower_fraction else: if isinstance(self.full_fraction, numbers.Real): if not 0 < self.full_fraction <= 1: raise ValueError( "Switching point should be real and more than 0 and less than " f"1. Received {self.full_fraction}" ) else: raise ValueError( "Switching point should be real and more than 0 and less than 1. " f"Received {self.full_fraction}" ) super()._init_greedy_search(X, y, n_to_select) self.norms_ = (X**2).sum(axis=abs(self._axis - 1)) if self.initialize == "random": random_state = check_random_state(self.random_state) initialize = random_state.randint(X.shape[self._axis]) elif isinstance(self.initialize, numbers.Integral): initialize = self.initialize else: raise ValueError("Invalid value of the initialize parameter") self.selected_idx_[0] = initialize self.hausdorff_ = np.full(X.shape[self._axis], np.inf) self.hausdorff_at_select_ = np.full(X.shape[self._axis], np.inf) self._update_post_selection(X, y, self.selected_idx_[0]) def _continue_greedy_search(self, X, y, n_to_select): """Continues the search. Prepares an array to store the selected features. """ super()._continue_greedy_search(X, y, n_to_select) n_pad = n_to_select - self.n_selected_ self.dSL_ = np.pad(self.dSL_, (0, n_pad), "constant", constant_values=0) def _get_active(self, X, last_selected): """ Finds the indices of the Voronoi cells that might change due to the fact that a new point has been selected (index is last_selected, because we haven't called super() yet, self.n_selected_ has not been incremented yet) This function calculates the distances between the last selected point and the previously selected points. Next, the list of points to which the distance is to be calculated is shortened. These are the points for which need to recompute distances. Let: L is the last point selected; S are the selected points from before this iteration; X are the candidates; The logic here is that we want to check if d(XL) can be smaller than min(d(X,S)) (which is stored in self.hausdorff_) now, if a point belongs to the Voronoi cell of S then min(d(X,S_i))=d(X,S). Triangle inequality implies that d(S,L) < |d(X,S) + d(L,X)| so we just need to check if |d(X,S) - d(S,L)|>= d(X,S) to know that we don't need to check X. but |d(X,S) - d(S,L)|^2>= d(X,S)^2 if and only if d(S,L)/2 > d(S,X) """ if not hasattr(self, "n_selected_") or self.n_selected_ == 0: return np.arange(X.shape[0], dtype=int) else: self.dSL_[: self.n_selected_] = ( self.norms_[self.selected_idx_[: self.n_selected_]] + self.norms_[last_selected] - 2 * (self.X_selected_[: self.n_selected_] @ X[last_selected].T) ) * 0.25 # calculation in a single block active_points = np.where( self.dSL_[self.vlocation_of_idx] < self.hausdorff_ )[0] return active_points def _update_post_selection(self, X, y, last_selected): """Saves the most recently selected feature, increments the feature counter and update the hausdorff distances Let: L is the last point selected; S are the selected points from before this iteration; X is the one active point; This function calculates d(L, X) and checks the condition d(L, X)< min d(X, S_i). If so, we move X to a new polyhedron. If the number of active points is too high, it is faster to calculate the distances between L and all the points in the dataset. """ self.hausdorff_at_select_[last_selected] = self.hausdorff_[last_selected] active_points = self._get_active(X, last_selected) if len(active_points) > 0: if len(active_points) / X.shape[0] > self.full_fraction: self.new_dist_ = ( self.norms_ + self.norms_[last_selected] - 2 * X[last_selected] @ X.T ) else: self.new_dist_ = self.hausdorff_.copy() self.new_dist_[active_points] = ( self.norms_[active_points] + self.norms_[last_selected] - 2 * X[last_selected] @ X[active_points].T ) self.new_dist_[last_selected] = 0 updated_points = np.where(self.new_dist_ < self.hausdorff_)[0] np.minimum( self.hausdorff_, self.new_dist_, self.hausdorff_, casting="unsafe" ) else: updated_points = np.array([]) if len(updated_points) > 0: self.vlocation_of_idx[updated_points] = self.n_selected_ self.vlocation_of_idx[last_selected] = self.n_selected_ super()._update_post_selection(X, y, last_selected)