Source code for auswahl._random_frog

from abc import ABCMeta, abstractmethod
from typing import Union, Dict, List
from warnings import warn

import numpy as np
from sklearn import clone
from sklearn.base import BaseEstimator
from sklearn.cross_decomposition import PLSRegression
from sklearn.utils.validation import check_is_fitted, check_random_state, check_scalar

from ._base import PointSelector, IntervalSelector, SpectralSelector
from .util import get_coef_from_pls


class _RandomFrog(SpectralSelector, BaseEstimator, metaclass=ABCMeta):
    """Mixin for the Random Frog feature selection method.

    The py:class:`auswahl.RandomFrogPointSelector` can be used for Wavelength Point selection and the
    py:class:`auswahl.RandomFrogIntervalSelector` can be used for Wavelength Interval selection.
    """

    def _select(self, X, y,
                n_features: int,
                n_features_to_select: Union[int, float] = None,
                n_iterations: int = 10000,
                n_initial_features: Union[int, float] = 0.1,
                variance_factor: float = 0.3,
                subset_expansion_factor: float = 3,
                acceptance_factor: float = 0.1,
                pls: PLSRegression = None,
                random_state: Union[int, np.random.RandomState] = None):
        # Perform parameter checks
        self._check_n_iterations(n_iterations)
        self._check_subset_expansion_factor(subset_expansion_factor)
        self._check_acceptance_factor(acceptance_factor)

        n_initial_features = self._check_n_initial_features(X, n_initial_features)
        variance_factor = self._check_variance_factor(variance_factor)
        random_state = check_random_state(random_state)

        # Initialize estimator, feature sets and frequency counter
        pls = PLSRegression() if pls is None else clone(pls)
        n_components = pls.n_components

        all_features = np.arange(n_features)
        selected_features = random_state.choice(n_features, n_initial_features, replace=False)
        self.frequencies_ = np.zeros(n_features)

        # Random Frog Iteration
        for i in range(n_iterations):
            n_selected_features = len(selected_features)
            n_candidate_features = random_state.normal(n_selected_features, n_selected_features * variance_factor)
            n_candidate_features = np.clip(round(n_candidate_features), n_components, n_features)

            if n_candidate_features < n_selected_features:
                # Reduction step
                features_to_explore = selected_features
            elif n_candidate_features > n_selected_features:
                # Expansion step
                n_diff = n_candidate_features - n_selected_features
                non_selected_features = np.setdiff1d(all_features, selected_features)
                n_features_to_explore = min(subset_expansion_factor * n_diff, len(non_selected_features))
                additional_features = random_state.choice(non_selected_features, n_features_to_explore, replace=False)
                features_to_explore = np.union1d(selected_features, additional_features)
            else:
                # Skip step
                self.frequencies_[selected_features] += 1
                continue

            # Determine the candidate feature selection
            _, pls = self.evaluate(X[:, self._idx_to_mask(features_to_explore)], y, pls)
            absolute_coefficients = self._get_feature_score_from_model(pls, features_to_explore)
            selection_idx = np.argsort(absolute_coefficients)[-n_candidate_features:]
            candidate_features = features_to_explore[selection_idx]

            # Score the current feature selection and the candidate feature selection
            pls.n_components = min(pls.n_components, len(selected_features))
            selected_features_score, _ = self.evaluate(X=X[:, self._idx_to_mask(selected_features)], y=y, model=pls)

            pls.n_components = min(pls.n_components, len(candidate_features))
            candidate_features_score, _ = self.evaluate(X=X[:, self._idx_to_mask(candidate_features)], y=y, model=pls)

            # Update the feature selection
            if candidate_features_score >= selected_features_score:
                selected_features = candidate_features
            elif random_state.random() < acceptance_factor * (selected_features_score / candidate_features_score):
                selected_features = candidate_features
            self.frequencies_[selected_features] += 1

        self.support_ = self._generate_mask_from_frequencies(n_features_to_select)
        self.best_model_ = pls.fit(self.transform(X), y)

        return self

    @abstractmethod
    def _idx_to_mask(self, feature_idx):
        pass

    @abstractmethod
    def _generate_mask_from_frequencies(self, n_features_to_select):
        pass

    @abstractmethod
    def _get_feature_score_from_model(self, pls, feature_idx):
        pass

    @staticmethod
    def _check_n_iterations(n_iterations):
        check_scalar(n_iterations, name='n_iterations', target_type=int, min_val=1)

    @staticmethod
    def _check_n_initial_features(X, n_initial_features):
        n_features = X.shape[1]

        if n_initial_features is None:
            n_initial_features = 0.1
        if 0 < n_initial_features < 1:
            n_initial_features = max(1, int(n_initial_features * n_features))

        if (n_initial_features < 1) or (n_initial_features > n_features):
            raise ValueError('n_initial_features has to be either an int in {1, ..., n_features}'
                             f'or a float in (0, 1); got {n_initial_features}')

        return n_initial_features

    @staticmethod
    def _check_variance_factor(variance_factor):
        if variance_factor <= 0:
            warn('variance_factor is negative! abs(variance_factor) will be used for the fit method.')
            variance_factor = abs(variance_factor)
        return variance_factor

    @staticmethod
    def _check_subset_expansion_factor(subset_expansion_factor):
        check_scalar(subset_expansion_factor, name='subset_expansion_factor', target_type=(int, float), min_val=1)

    @staticmethod
    def _check_acceptance_factor(acceptance_factor):
        check_scalar(acceptance_factor, name='acceptance_factor', target_type=(int, float), min_val=0, max_val=1)


[docs]class RandomFrog(PointSelector, _RandomFrog): r"""Feature selection with the Random Frog method. The selection frequencies are computed according to Li et al. [1]_. Read more in the :ref:`User Guide <rf>`. Parameters ---------- n_features_to_select : int or float, default=None Number of features to select. n_iterations : int, default=10000 Number of variable selection iterations. This variable is called N in the original publication. n_initial_features : int or float, default=0.1 Number of features in the initial feature subset. If `None`, 10 % of the features are used. If integer, the parameter is the size of the initial subset. If float between 0 and 1, it is the fraction of features to use. This variable is called Q in the original publication. variance_factor : float, default=0.3 Variance of the normal distribution which samples determine the amount of features that are added or removed to the candidate set in each iteration. This variable is called θ in the original publication. subset_expansion_factor : float, default=3 Multiple of the number of features that are explored if the candidate subset is expanded. If the current feature subset is n and m new features have to be added to the new feature subset, m*subset_expansion_factors features are added to a candidate set. After fitting a PLS model, only the n+m features with the highest coefficients are kept. This variable is called ω in the original publication. acceptance_factor : float, default=0.1 The factor is used to calculate the probability that a feature subset is selected even though it leads to a worse cross-validation performance of a fitted PLS model. The probability is computed by multiplying the acceptance_factor with the relative decrease of the cross-validated performance score. This variable is called η in the original publication. n_cv_folds : int, default=5 Number of cross validation folds used to evaluate the features. n_jobs : int, default=1 Number of parallel processes used to fit the PLS models on the cross-validation splits. pls : PLSRegression, default=None Estimator instance of the :py:class:`PLSRegression <sklearn.cross_decomposition.PLSRegression>` class. Use this to adjust the hyperparameters of the PLS method. random_state : int or numpy.random.RandomState, default=None Seed for the random subset sampling. Pass an int for reproducible output across function calls. Attributes ---------- frequencies_ : ndarray of shape (n_features,) Number of times each feature has been selected after all iterations. support_ : ndarray of shape (n_features,) Mask of selected features. References ---------- .. [1] Hong-Dong Li and Qing-Song Xu and Yi-Zeng Liang, 'Random frog: An efficient reversible jump Markov Chain Monte Carlo-like approach for variable selection with applications to gene selection and disease classification', Analytica Chimica Acta, 740, 20-26, 2012 Examples -------- >>> import numpy as np >>> from auswahl import RandomFrog >>> np.random.seed(1337) >>> X = np.random.randn(100, 10) >>> y = 5 * X[:, 0] - 2 * X[:, 5] # y only depends on two features >>> selector = RandomFrog(n_features_to_select=2, n_iterations=1000) >>> selector.fit(X, y).get_support() array([ True, False, False, False, False, True, False, False, False, False]) """
[docs] def __init__(self, n_features_to_select: Union[int, float] = None, n_iterations: int = 10000, n_initial_features: Union[int, float] = 0.1, variance_factor: float = 0.3, subset_expansion_factor: float = 3, acceptance_factor: float = 0.1, pls: PLSRegression = None, n_cv_folds: int = 5, n_jobs: int = 1, model_hyperparams: Union[Dict, List[Dict]] = None, random_state: Union[int, np.random.RandomState] = None): super().__init__(n_features_to_select, model_hyperparams=model_hyperparams, random_state=random_state, n_jobs=n_jobs) self.n_iterations = n_iterations self.n_initial_features = n_initial_features self.variance_factor = variance_factor self.subset_expansion_factor = subset_expansion_factor self.acceptance_factor = acceptance_factor self.n_cv_folds = n_cv_folds self.pls = pls
def _fit(self, X, y, n_features_to_select): self.n_features_ = X.shape[1] return super()._select(X, y, n_features=self.n_features_, n_features_to_select=n_features_to_select, n_iterations=self.n_iterations, n_initial_features=self.n_initial_features, variance_factor=self.variance_factor, subset_expansion_factor=self.subset_expansion_factor, acceptance_factor=self.acceptance_factor, pls=self.pls, random_state=self.random_state) def _idx_to_mask(self, feature_idx): mask = np.zeros(self.n_features_, dtype=bool) mask[feature_idx] = 1 return mask def _generate_mask_from_frequencies(self, n_features_to_select): mask = np.zeros(len(self.frequencies_), dtype=bool) selected_idx = np.argsort(self.frequencies_)[-n_features_to_select:] mask[selected_idx] = 1 return mask def _get_feature_score_from_model(self, pls, feature_idx): return abs(get_coef_from_pls(pls).squeeze())
[docs]class IntervalRandomFrog(IntervalSelector, _RandomFrog): r"""Feature selection with the Interval Random Frog (iRF) method. The selection frequencies are computed according to Yun et al. [1]_. Read more in the :ref:`User Guide <irf>`. Parameters ---------- n_intervals_to_select : int, default=1 Number of intervals to select. interval_width: int or float, default=None Size of the selected intervals. If `None`, the intervals are n_features/2 long. If integer, the parameter directly defines the number of consecutive features that form an interval. If float between 0 and 1, the intervals are n_features*n_intervals_to_select long. n_iterations : int, default=10000 Number of variable selection iterations. This variable is called N in the original publication. n_initial_intervals : int or float, default=0.1 Number of intervals in the initial interval subset. If `None`, 10 % of the intervals are used. If integer, the parameter is the size of the initial subset. If float between 0 and 1, it is the fraction of intervals to use. This variable is called Q in the original publication. variance_factor : float, default=0.3 Variance of the normal distribution which samples determine the amount of intervals that are added or removed to the candidate set in each iteration. This variable is called θ in the original publication. subset_expansion_factor : float, default=3 Multiple of the number of intervals that are explored if the candidate subset is expanded. If the current interval subset is n and m new intervals have to be added to the new interval subset, m*subset_expansion_factors intervals are added to a candidate set. After fitting a PLS model, only the n+m intervals with the highest coefficients are kept. This variable is called ω in the original publication. acceptance_factor : float, default=0.1 The factor is used to calculate the probability that an interval subset is selected even though it leads to a worse cross-validation performance of a fitted PLS model. The probability is computed by multiplying the acceptance_factor with the relative decrease of the cross-validated performance score. This variable is called η in the original publication. n_cv_folds : int, default=5 Number of cross validation folds used to evaluate the features. n_jobs : int, default=1 Number of parallel processes used to fit the PLS models on the cross-validation splits. pls : PLSRegression, default=None Estimator instance of the :py:class:`PLSRegression <sklearn.cross_decomposition.PLSRegression>` class. Use this to adjust the hyperparameters of the PLS method. random_state : int or numpy.random.RandomState, default=None Seed for the random subset sampling. Pass an int for reproducible output across function calls. Attributes ---------- frequencies_ : ndarray of shape (n_features,) Number of times each interval has been selected after all iterations. support_ : ndarray of shape (n_features,) Mask of selected intervals. References ---------- .. [1] Yong-Huan Yun and Hong-Dong Li and Leslie R. E. Wood and Wei Fan and Jia-Jun Wang and Dong-Sheng Cao and Qing-Song Xu and Yi-Zeng Liang, 'An efficient method of wavelength interval selection based on random frog for multivariate spectral calibration', Spectrochimica Acta Part A: Molecular and Biomolecular Spectroscopy, 111, 31-36, 2013 Examples -------- >>> import numpy as np >>> from auswahl import IntervalRandomFrog >>> np.random.seed(1337) >>> X = np.random.randn(100, 10) >>> y = 5 * X[:, 0] - 3 * X[:, 1] + 2 * X[:, 5] - 3 * X[:, 6] # y only depends on two intervals >>> selector = IntervalRandomFrog(n_intervals_to_select=2, interval_width=2, n_iterations=1000, random_state=7331) >>> selector.fit(X, y).get_support() array([ True, True, False, False, False, True, True, False, False, False]) """
[docs] def __init__(self, n_intervals_to_select: int = 1, interval_width: Union[int, float] = None, n_iterations: int = 10000, n_initial_intervals: Union[int, float] = 0.1, variance_factor: float = 0.3, subset_expansion_factor: float = 3, acceptance_factor: float = 0.1, n_cv_folds: int = 5, n_jobs: int = 1, pls: PLSRegression = None, model_hyperparams: Union[Dict, List[Dict]] = None, random_state: Union[int, np.random.RandomState] = None): super().__init__(n_intervals_to_select, interval_width, model_hyperparams=model_hyperparams, n_cv_folds=n_cv_folds, random_state=random_state, n_jobs=n_jobs) self.n_iterations = n_iterations self.n_initial_intervals = n_initial_intervals self.variance_factor = variance_factor self.subset_expansion_factor = subset_expansion_factor self.acceptance_factor = acceptance_factor self.n_cv_folds = n_cv_folds self.pls = pls
def _fit(self, X, y, n_intervals_to_select, interval_width): self.n_windows_ = X.shape[1] - interval_width + 1 self.interval_width_ = self._check_interval_width(X) return super()._select(X, y, n_features=self.n_windows_, n_features_to_select=n_intervals_to_select, n_iterations=self.n_iterations, n_initial_features=self.n_initial_intervals, variance_factor=self.variance_factor, subset_expansion_factor=self.subset_expansion_factor, acceptance_factor=self.acceptance_factor, pls=self.pls, random_state=self.random_state) def _idx_to_mask(self, feature_idx): mask = np.zeros(self.n_windows_ + self.interval_width_ - 1, dtype=bool) for idx in feature_idx: mask[idx:idx + self.interval_width_] = 1 return mask def _generate_mask_from_frequencies(self, n_features_to_select): mask = np.zeros(len(self.frequencies_) + self.interval_width_ - 1, dtype=bool) scores = self.frequencies_.copy() for i in range(n_features_to_select): best_idx = np.argmax(scores) mask[best_idx:best_idx + self.interval_width_] = True start = np.clip(best_idx - self.interval_width_ + 1, 0, np.inf).astype(int) end = best_idx + self.interval_width_ scores[start:end] = -1 return mask def _get_feature_score_from_model(self, pls, feature_idx): scores = np.zeros(self.n_windows_ + self.interval_width_ - 1) scores[self._idx_to_mask(feature_idx)] = abs(get_coef_from_pls(pls).squeeze()) scores = [sum(scores[idx:idx + self.interval_width_]) for idx in feature_idx] return scores