Source code for auswahl._base


from __future__ import annotations

import numpy as np

from abc import ABCMeta, abstractmethod

from typing import Union, Tuple, List
from functools import cached_property

import sklearn.base
from sklearn import clone
from sklearn.base import BaseEstimator
from sklearn.cross_decomposition import PLSRegression
from sklearn.feature_selection import SelectorMixin
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import cross_val_score
from sklearn.utils import check_scalar
from sklearn.utils.validation import check_is_fitted
from numpy.random import RandomState

from functools import wraps


[docs]class FeatureDescriptor: """ The class FeatureDescriptor abstracts the configuration of features the selection methods are to retrieve from the spectral data. The FeatureDescriptor wraps either a number of arbitrary features to be selected or a specific number of intervals of features of a fix length. Parameters ---------- key: int, Tuple[int, int], FeatureDescriptor Feature configuration to be abstracted by the object. A single integer is interpreted as a number of arbitrarily selectable features. A tuple is a (#intervals, width of intervals) configuration of features to be selected. If a FeatureDescriptor is passed, it is copied. All passed integers are required to be non-negative. resolve_intervals: bool, default=False Flag indicating whether interval feature configurations are to be resolved to a single integer of arbitrary features to be selected. Attributes ---------- org_key: int, Tuple[int, int] Originally passed feature configuration key: int, Tuple[int, int] Resolved key. Equal to org_key, if org_key is not a tuple or if argument resolve_tuples is False resolve_intervals: bool Passed argument resolve_intervals """
[docs] def __init__(self, key: Union[int, Tuple[int, int], FeatureDescriptor], resolve_intervals: bool = False): if isinstance(key, FeatureDescriptor): self._build_from_descriptor(key) else: self._check_consistency(key) self.org_key = key self.key = self._resolve_intervals(key, resolve_intervals) self.resolve_tuples = resolve_intervals
def _build_from_descriptor(self, descriptor): self.key = descriptor.key self.org_key = descriptor.org_key self.resolve_tuples = descriptor.resolve_tuples def __len__(self): return self.comparator[0] @cached_property def string_rep(self): """ Provides a printing representation for the FeatureDescriptor printing interval configurations as number of intervals and interval width separated via a forward slash. Returns ------- string representation: str """ if isinstance(self.key, int): return str(self.key) return f'{self.key[0]}/{self.key[1]}' @cached_property def comparator(self): """Provides a feature configuration representation allowing comparison of FeatureDescriptors. """ if isinstance(self.key, int): return [self.key] return [self.key[0] * self.key[1], self.key[0], self.key[1]] # # Consistency checks # def _check_consistency(self, x): if isinstance(x, int): self._check_positive_integer(x) elif isinstance(x, tuple): self._check_diploid_positive_integer_tuple(x) else: raise ValueError(f'The specification of features requires either a positive integer' f' or a tuple of two positive integers') def _check_positive_integer(self, x): if not isinstance(x, int): raise ValueError(f'The specification of features requires integers. Got {type(x)}') if x <= 0: raise ValueError(f'The specification of features requires positive integers. Got {x}') def _check_diploid_positive_integer_tuple(self, x): if len(x) != 2: raise ValueError("Feature specification with tuples requires a tuple of length 2.") v1, v2 = x self._check_positive_integer(v1) self._check_positive_integer(v2) return x def _resolve_intervals(self, key, resolve_tuple): if resolve_tuple and isinstance(key, tuple): return key[0] * key[1] # consistency has already been checked at this point return key def __hash__(self): return self.key.__hash__() # # Comparator implementations # def _feature_cast(f): @wraps(f) def wrapper(s, x): if not isinstance(x, FeatureDescriptor): x = FeatureDescriptor(x, resolve_intervals=s.resolve_tuples) return f(s, x) return wrapper @_feature_cast def __le__(self, x: Union[FeatureDescriptor, int, Tuple[int, int]]): """A FeatureDescriptor is less or equal to another FeatureDescriptor, if it selects more features (intervals resolved to the number of constituent features) or, in case of equality, if the number of intervals is smaller. """ for i in range(len(self.comparator)): if self.comparator[i] < x.comparator[i]: return True elif self.comparator[i] > x.comparator[i]: return False return True @_feature_cast def __ge__(self, x: FeatureDescriptor): """A FeatureDescriptor is greater or equal to another FeatureDescriptor, if it selects more features (intervals resolved to the number of constituent features) or, in case of equality, if the number of intervals is larger. """ for i in range(len(self.comparator)): if self.comparator[i] > x.comparator[i]: return True elif self.comparator[i] < x.comparator[i]: return False return True # # Derived comparison functions # def __eq__(self, x: FeatureDescriptor): return self.__le__(x) and self.__ge__(x) def __gt__(self, x: FeatureDescriptor): return not self.__le__(x) def __ne__(self, x: FeatureDescriptor): return not self.__eq__(x) def __lt__(self, x: FeatureDescriptor): return not self.__ge__(x) # # Printing # def __repr__(self): return self.string_rep def __str__(self): return self.string_rep
[docs] def get_configuration_for(self, selector: SpectralSelector): """ Translate and return the feature configuration for a given :class:`~auswahl.SpectralSelector`. Parameters ---------- selector: SpectralSelector SpectralSelector instance """ if isinstance(selector, PointSelector): if self.resolve_tuples: return self.key else: # return the number of overall features to be selected return self.key[0] * self.key[1] else: # return the interval configuration return self.key[0], self.key[1]
[docs]class SpectralSelector(SelectorMixin, BaseEstimator, metaclass=ABCMeta): """ Top level base class for all Auswahl selectors. Provides subclassing of all relevant sklearn classes, common cross validationa and hyperparameter optimization functionality. Parameters ---------- model_hyperparams: dict Dictionary of hyperparameters following the sklearn convention for the estimator underlying the selection algorithm. n_cv_folds: int Number of cross validation runs during model fitting random_state: Union[int, np.random.RandomState] random state of the selector n_jobs: int, default=1 Number of threads to be used to execute the selection method """
[docs] def __init__(self, model_hyperparams: Union[dict, List[dict]], n_cv_folds: int, random_state: Union[int, RandomState] = None, n_jobs: int = 1): if model_hyperparams is not None and not isinstance(model_hyperparams, (list, dict)): raise ValueError("Keyword argument 'model_hyperparams' is expected to be of type dict or list of dicts") if not isinstance(n_cv_folds, int) or n_cv_folds <= 0: raise ValueError(f'Keyword argument "n_cv_folds" is expected to be a positive integer. Got {n_cv_folds}') self.model_hyperparams = model_hyperparams self.n_cv_folds = n_cv_folds self.random_state = random_state self.n_jobs = n_jobs
[docs] def evaluate(self, X, y, model, do_cv=True, *args): """Conduct a cross validationand hyperparameter optimization of the underlying estimator model. Parameters ---------- X: array-like, shape (n_samples, n_features) Spectral data to be fitted y: array-like, shape (n_samples,) Regression targets model: BaseEstimator Regression model do_cv: bool, default=True If True, the model is fitted to the data and a cross validation score is provided *args: arbitrary payload Arbitrary payload returned with the evaluation result. Used for instance for identification of threads, if multiple models are evaluated in parallel Returns ------- tuple: float, BaseEstimator cross validation score if requested (otherwise None) and fitted estimator """ model = PLSRegression() if model is None else clone(model) model.n_components = min(model.n_components, X.shape[1]) if self.model_hyperparams is None: # no hyperparameter optimization; conduct a simple CV cv_scores = None if do_cv: cv_scores = np.mean(cross_val_score(model, X, y, cv=self.n_cv_folds, scoring='neg_mean_squared_error')) model.fit(X, y) return cv_scores, model, *args else: cv = GridSearchCV(model, self.model_hyperparams, cv=self.n_cv_folds, scoring='neg_mean_squared_error') cv.fit(X, y) return cv.best_score_, cv.best_estimator_, *args
[docs] def fit(self, X, y, mask=None): """Run the feature selection process. Parameters ---------- X : array-like of shape (n_samples, n_features) The input samples. y : array-like of shape (n_samples,) The target values. mask: array-like of shape (n_features,) Mask indicating (values == 0), which features are not to be taken into account during the feature selection Returns ------- SpectralSelector : self Returns the instance itself. """ if mask is not None: if mask.shape != (X.shape[1],): raise ValueError(f'Expected mask to have shape {(X.shape[1],)}. Got {mask.shape}') mask_indices = np.nonzero(mask)[0] n_features = X.shape[1] X = X[:, mask_indices] X, y = self._validate_data(X, y, accept_sparse=False, ensure_min_samples=2, ensure_min_features=2) self._dispatch_fit(X, y) if mask is not None: selected = mask_indices[np.nonzero(self.support_)] self.support_ = np.zeros((n_features,), dtype=bool) self.support_[selected] = 1 return self
[docs] def get_best_estimator(self) -> sklearn.base.BaseEstimator: """Retrieve the best estimator model fitted on the selected features Returns ------- best model fitted on selected features: sklearn.base.BaseEstimator """ check_is_fitted(self) if not hasattr(self, 'best_model_'): raise NotImplementedError("Make sure, that after fit has been called on the selector, the selector " "provides the optimally configured estimator for the selected features as " "attribute 'best_model_'") return self.best_model_
[docs] def reseed(self, seed: Union[int, RandomState]): """ Random state updating interface for benchmarking. Selector methods with more complex internal structure (such as methods wrapping other methods) are required to override this function accordingly. """ self.random_state = seed
[docs] def rethread(self, n_jobs: int): """ n_jobs updating interface for benchmarking. Selector methods with more complex internal structure (such as methods wrapping other methods) are required to override this function accordingly. """ self.n_jobs = n_jobs
def _get_support_mask(self): check_is_fitted(self) return self.support_ @abstractmethod def _dispatch_fit(self, X, y): ... @abstractmethod def reparameterize(self, feature_descriptor: FeatureDescriptor): ...
[docs]class PointSelector(SpectralSelector, metaclass=ABCMeta): """Base class for feature selection methods that select features independently. Parameters ---------- n_features_to_select : int or float, default=1 Number of features to select model_hyperparams: dict Dictionary of estimator hyperparameters following the sklearn convention. n_cv_folds: int Number of cross validation runs during model fitting random_state: Union[int, np.random.RandomState] Random state of the selector n_jobs: int, default=1 Number of threads to be used to execute the selection method """
[docs] def __init__(self, n_features_to_select: Union[int, float] = 1, model_hyperparams: Union[dict, List[dict]] = None, n_cv_folds: int = 2, random_state: Union[int, RandomState] = None, n_jobs: int = 1): self.n_features_to_select = n_features_to_select super().__init__(model_hyperparams, n_cv_folds, random_state, n_jobs)
def _dispatch_fit(self, X, y): n_features_to_select = self._check_n_features_to_select(X) self._fit(X, y, n_features_to_select) @abstractmethod def _fit(self, X, y, n_features_to_select): pass def _check_n_features_to_select(self, X): n_features = X.shape[1] n_features_to_select = self.n_features_to_select if n_features_to_select is None: n_features_to_select = n_features // 2 else: check_scalar(n_features_to_select, name='n_features_to_select', target_type=(int, float)) if 0 < n_features_to_select < 1: n_features_to_select = int(n_features_to_select * n_features) if (n_features_to_select <= 0) or (n_features_to_select >= n_features): raise ValueError('n_features_to_select has to be either an int in {1, ..., n_features-1}' 'or a float in (0, 1) with (n_features_to_select*n_features) >= 1; ' f'got {self.n_features_to_select}') return n_features_to_select def reparameterize(self, feature_descriptor: FeatureDescriptor): self.n_features_to_select = feature_descriptor.get_configuration_for(self)
[docs]class IntervalSelector(SpectralSelector, metaclass=ABCMeta): """Base class for feature selection methods that select consecutive chunks (intervals) of features. Parameters ---------- n_intervals_to_select : int, default=1 Number of intervals to select. interval_width : int or float, default=1 Number of features that form an interval model_hyperparams: dict Dictionary of estimator hyperparameters following the sklearn convention. n_cv_folds: int Number of cross validation runs during model fitting random_state: Union[int, np.random.RandomState] Random state of the selector n_jobs: int, default=1 Number of threads to be used to execute the selection method """
[docs] def __init__(self, n_intervals_to_select: int = 1, interval_width: Union[int, float] = 1, n_cv_folds: int = 1, model_hyperparams: Union[dict, List[dict]] = None, random_state: Union[int, RandomState] = None, n_jobs: int = 1): self.n_intervals_to_select = n_intervals_to_select self.interval_width = interval_width super().__init__(model_hyperparams, n_cv_folds, random_state, n_jobs)
def _dispatch_fit(self, X, y): self._check_n_intervals_to_select(X) interval_width = self._check_interval_width(X) self._fit(X, y, self.n_intervals_to_select, interval_width) @abstractmethod def _fit(self, X, y, n_intervals_to_select, interval_width): pass def _check_n_intervals_to_select(self, X): check_scalar(self.n_intervals_to_select, name='n_intervals_to_select', target_type=int, min_val=1, max_val=X.shape[1]-1) def _check_interval_width(self, X): n_features = X.shape[1] interval_width = self.interval_width if interval_width is None: interval_width = n_features // 2 elif 0 < interval_width < 1: interval_width = max(2, int(interval_width * n_features)) if (interval_width <= 0) \ or (interval_width >= n_features) \ or (self.n_intervals_to_select * interval_width >= n_features): raise ValueError('interval_width has to be either an int in {1, ..., n_features-1}' f'or a float in (0, 1); got {self.interval_width}') return interval_width def reparameterize(self, feature_descriptor: FeatureDescriptor): self.n_intervals_to_select, self.interval_width = feature_descriptor.get_configuration_for(self)
[docs]class Convertible(metaclass=ABCMeta): """Selectors subclassing :class:`~auswah.PointSelector`, which provide a global score for each feature, can be made eligible for a :class:`~auswahl.PointSelector` to :class:`~auswahl.IntervalSelector` conversion facilitated by :class:`~auswahl.PseudoIntervalSelector` by inheriting from this class. """
[docs] @abstractmethod def get_feature_scores(self) -> np.ndarray: """Provide scores of all features Returns ------- feature scores: np.ndarray of shape [n_features,] """ ...