try:
from collections.abc import Sized
except ImportError:
from collections import Sized
from collections import defaultdict
from functools import partial
import numpy as np
from scipy.stats import rankdata
import sklearn
from sklearn.base import is_classifier, clone
from joblib import Parallel, delayed
from sklearn.model_selection._search import BaseSearchCV
from sklearn.utils import check_random_state
from sklearn.utils.fixes import MaskedArray
from sklearn.utils.validation import indexable, check_is_fitted
try:
from sklearn.metrics import check_scoring
except ImportError:
from sklearn.metrics.scorer import check_scoring
from . import Optimizer
from .utils import point_asdict, dimensions_aslist, eval_callbacks
from .space import check_dimension
from .callbacks import check_callback
[docs]class BayesSearchCV(BaseSearchCV):
"""Bayesian optimization over hyper parameters.
BayesSearchCV implements a "fit" and a "score" method.
It also implements "predict", "predict_proba", "decision_function",
"transform" and "inverse_transform" if they are implemented in the
estimator used.
The parameters of the estimator used to apply these methods are optimized
by cross-validated search over parameter settings.
In contrast to GridSearchCV, not all parameter values are tried out, but
rather a fixed number of parameter settings is sampled from the specified
distributions. The number of parameter settings that are tried is
given by n_iter.
Parameters are presented as a list of skopt.space.Dimension objects.
Parameters
----------
estimator : estimator object.
An object of that type is instantiated for each search point.
This object is assumed to implement the scikit-learn estimator api.
Either estimator needs to provide a ``score`` function,
or ``scoring`` must be passed.
search_spaces : dict, list of dict or list of tuple containing
(dict, int).
One of these cases:
1. dictionary, where keys are parameter names (strings)
and values are skopt.space.Dimension instances (Real, Integer
or Categorical) or any other valid value that defines skopt
dimension (see skopt.Optimizer docs). Represents search space
over parameters of the provided estimator.
2. list of dictionaries: a list of dictionaries, where every
dictionary fits the description given in case 1 above.
If a list of dictionary objects is given, then the search is
performed sequentially for every parameter space with maximum
number of evaluations set to self.n_iter.
3. list of (dict, int > 0): an extension of case 2 above,
where first element of every tuple is a dictionary representing
some search subspace, similarly as in case 2, and second element
is a number of iterations that will be spent optimizing over
this subspace.
n_iter : int, default=50
Number of parameter settings that are sampled. n_iter trades
off runtime vs quality of the solution. Consider increasing
``n_points`` if you want to try more parameter settings in
parallel.
optimizer_kwargs : dict, optional
Dict of arguments passed to :class:`Optimizer`. For example,
``{'base_estimator': 'RF'}`` would use a Random Forest surrogate
instead of the default Gaussian Process.
scoring : string, callable or None, default=None
A string (see model evaluation documentation) or
a scorer callable object / function with signature
``scorer(estimator, X, y)``.
If ``None``, the ``score`` method of the estimator is used.
fit_params : dict, optional
Parameters to pass to the fit method.
n_jobs : int, default=1
Number of jobs to run in parallel. At maximum there are
``n_points`` times ``cv`` jobs available during each iteration.
n_points : int, default=1
Number of parameter settings to sample in parallel. If this does
not align with ``n_iter``, the last iteration will sample less
points. See also :func:`~Optimizer.ask`
pre_dispatch : int, or string, optional
Controls the number of jobs that get dispatched during parallel
execution. Reducing this number can be useful to avoid an
explosion of memory consumption when more jobs get dispatched
than CPUs can process. This parameter can be:
- None, in which case all the jobs are immediately
created and spawned. Use this for lightweight and
fast-running jobs, to avoid delays due to on-demand
spawning of the jobs
- An int, giving the exact number of total jobs that are
spawned
- A string, giving an expression as a function of n_jobs,
as in '2*n_jobs'
iid : boolean, default=True
If True, the data is assumed to be identically distributed across
the folds, and the loss minimized is the total loss per sample,
and not the mean loss across the folds.
cv : int, cross-validation generator or an iterable, optional
Determines the cross-validation splitting strategy.
Possible inputs for cv are:
- None, to use the default 3-fold cross validation,
- integer, to specify the number of folds in a `(Stratified)KFold`,
- An object to be used as a cross-validation generator.
- An iterable yielding train, test splits.
For integer/None inputs, if the estimator is a classifier and ``y`` is
either binary or multiclass, :class:`StratifiedKFold` is used. In all
other cases, :class:`KFold` is used.
refit : boolean, default=True
Refit the best estimator with the entire dataset.
If ``False``, it is impossible to make predictions using
this BayesSearchCV instance after fitting.
verbose : integer
Controls the verbosity: the higher, the more messages.
random_state : int or RandomState
Pseudo random number generator state used for random uniform sampling
from lists of possible values instead of scipy.stats distributions.
error_score : 'raise' (default) or numeric
Value to assign to the score if an error occurs in estimator fitting.
If set to 'raise', the error is raised. If a numeric value is given,
FitFailedWarning is raised. This parameter does not affect the refit
step, which will always raise the error.
return_train_score : boolean, default=False
If ``True``, the ``cv_results_`` attribute will include training
scores.
Examples
--------
>>> from skopt import BayesSearchCV
>>> # parameter ranges are specified by one of below
>>> from skopt.space import Real, Categorical, Integer
>>>
>>> from sklearn.datasets import load_iris
>>> from sklearn.svm import SVC
>>> from sklearn.model_selection import train_test_split
>>>
>>> X, y = load_iris(True)
>>> X_train, X_test, y_train, y_test = train_test_split(X, y,
... train_size=0.75,
... random_state=0)
>>>
>>> # log-uniform: understand as search over p = exp(x) by varying x
>>> opt = BayesSearchCV(
... SVC(),
... {
... 'C': Real(1e-6, 1e+6, prior='log-uniform'),
... 'gamma': Real(1e-6, 1e+1, prior='log-uniform'),
... 'degree': Integer(1,8),
... 'kernel': Categorical(['linear', 'poly', 'rbf']),
... },
... n_iter=32,
... random_state=0
... )
>>>
>>> # executes bayesian optimization
>>> _ = opt.fit(X_train, y_train)
>>>
>>> # model can be saved, used for predictions or scoring
>>> print(opt.score(X_test, y_test))
0.973...
Attributes
----------
cv_results_ : dict of numpy (masked) ndarrays
A dict with keys as column headers and values as columns, that can be
imported into a pandas ``DataFrame``.
For instance the below given table
+--------------+-------------+-------------------+---+---------------+
| param_kernel | param_gamma | split0_test_score |...|rank_test_score|
+==============+=============+===================+===+===============+
| 'rbf' | 0.1 | 0.8 |...| 2 |
+--------------+-------------+-------------------+---+---------------+
| 'rbf' | 0.2 | 0.9 |...| 1 |
+--------------+-------------+-------------------+---+---------------+
| 'rbf' | 0.3 | 0.7 |...| 1 |
+--------------+-------------+-------------------+---+---------------+
will be represented by a ``cv_results_`` dict of::
{
'param_kernel' : masked_array(data = ['rbf', 'rbf', 'rbf'],
mask = False),
'param_gamma' : masked_array(data = [0.1 0.2 0.3], mask = False),
'split0_test_score' : [0.8, 0.9, 0.7],
'split1_test_score' : [0.82, 0.5, 0.7],
'mean_test_score' : [0.81, 0.7, 0.7],
'std_test_score' : [0.02, 0.2, 0.],
'rank_test_score' : [3, 1, 1],
'split0_train_score' : [0.8, 0.9, 0.7],
'split1_train_score' : [0.82, 0.5, 0.7],
'mean_train_score' : [0.81, 0.7, 0.7],
'std_train_score' : [0.03, 0.03, 0.04],
'mean_fit_time' : [0.73, 0.63, 0.43, 0.49],
'std_fit_time' : [0.01, 0.02, 0.01, 0.01],
'mean_score_time' : [0.007, 0.06, 0.04, 0.04],
'std_score_time' : [0.001, 0.002, 0.003, 0.005],
'params' : [{'kernel' : 'rbf', 'gamma' : 0.1}, ...],
}
NOTE that the key ``'params'`` is used to store a list of parameter
settings dict for all the parameter candidates.
The ``mean_fit_time``, ``std_fit_time``, ``mean_score_time`` and
``std_score_time`` are all in seconds.
best_estimator_ : estimator
Estimator that was chosen by the search, i.e. estimator
which gave highest score (or smallest loss if specified)
on the left out data. Not available if refit=False.
optimizer_results_ : list of `OptimizeResult`
Contains an `OptimizeResult` for each search space. The search space
parameters are sorted by name.
best_score_ : float
Score of best_estimator on the left out data.
best_params_ : dict
Parameter setting that gave the best results on the hold out data.
best_index_ : int
The index (of the ``cv_results_`` arrays) which corresponds to the best
candidate parameter setting.
The dict at ``search.cv_results_['params'][search.best_index_]`` gives
the parameter setting for the best model, that gives the highest
mean score (``search.best_score_``).
scorer_ : function
Scorer function used on the held out data to choose the best
parameters for the model.
n_splits_ : int
The number of cross-validation splits (folds/iterations).
Notes
-----
The parameters selected are those that maximize the score of the held-out
data, according to the scoring parameter.
If `n_jobs` was set to a value higher than one, the data is copied for each
parameter setting (and not `n_jobs` times). This is done for efficiency
reasons if individual jobs take very little time, but may raise errors if
the dataset is large and not enough memory is available. A workaround in
this case is to set `pre_dispatch`. Then, the memory is copied only
`pre_dispatch` many times. A reasonable value for `pre_dispatch` is `2 *
n_jobs`.
See Also
--------
:class:`GridSearchCV`:
Does exhaustive search over a grid of parameters.
"""
def __init__(self, estimator, search_spaces, optimizer_kwargs=None,
             n_iter=50, scoring=None, fit_params=None, n_jobs=1,
             n_points=1, iid=True, refit=True, cv=None, verbose=0,
             pre_dispatch='2*n_jobs', random_state=None,
             error_score='raise', return_train_score=False):
    """Record the search configuration and validate ``search_spaces``.

    The arguments specific to this class (``search_spaces``, ``n_iter``,
    ``n_points``, ``random_state``, ``optimizer_kwargs`` and
    ``fit_params``) are stored on the instance; the remaining ones are
    forwarded to the parent class constructor.

    Raises
    ------
    ValueError, TypeError
        Propagated from ``_check_search_space`` when ``search_spaces``
        is malformed.
    """
    # Settings owned by the Bayesian search itself.
    self.search_spaces = search_spaces
    self.n_iter = n_iter
    self.n_points = n_points
    self.random_state = random_state
    self.optimizer_kwargs = optimizer_kwargs

    # Reject a malformed search space before anything else happens.
    self._check_search_space(search_spaces)

    # Temporary fix for compatibility with sklearn 0.20 and 0.21
    # See scikit-optimize#762
    # To be consistent with sklearn 0.21+, fit_params should be deprecated
    # in the constructor and be passed in ``fit``.
    self.fit_params = fit_params

    # Everything else is handled by the generic search base class.
    parent_kwargs = dict(
        estimator=estimator,
        scoring=scoring,
        n_jobs=n_jobs,
        iid=iid,
        refit=refit,
        cv=cv,
        verbose=verbose,
        pre_dispatch=pre_dispatch,
        error_score=error_score,
        return_train_score=return_train_score,
    )
    super(BayesSearchCV, self).__init__(**parent_kwargs)
def _check_search_space(self, search_space):
"""Checks whether the search space argument is correct"""
if len(search_space) == 0:
raise ValueError(
"The search_spaces parameter should contain at least one"
"non-empty search space, got %s" % search_space
)
# check if space is a single dict, convert to list if so
if isinstance(search_space, dict):
search_space = [search_space]
# check if the structure of the space is proper
if isinstance(search_space, list):
# convert to just a list of dicts
dicts_only = []
# 1. check the case when a tuple of space, n_iter is provided
for elem in search_space:
if isinstance(elem, tuple):
if len(elem) != 2:
raise ValueError(
"All tuples in list of search spaces should have"
"length 2, and contain (dict, int), got %s" % elem
)
subspace, n_iter = elem
if (not isinstance(n_iter, int)) or n_iter < 0:
raise ValueError(
"Number of iterations in search space should be"
"positive integer, got %s in tuple %s " %
(n_iter, elem)
)
# save subspaces here for further checking
dicts_only.append(subspace)
elif isinstance(elem, dict):
dicts_only.append(elem)
else:
raise TypeError(
"A search space should be provided as a dict or"
"tuple (dict, int), got %s" % elem)
# 2. check all the dicts for correctness of contents
for subspace in dicts_only:
for k, v in subspace.items():
check_dimension(v)
else:
raise TypeError(
"Search space should be provided as a dict or list of dict,"
"got %s" % search_space)
# copied for compatibility with 0.19 sklearn from 0.18 BaseSearchCV
@property
def best_score_(self):
    """Mean test score stored in ``cv_results_`` at ``best_index_``."""
    check_is_fitted(self, 'cv_results_')
    mean_test_scores = self.cv_results_['mean_test_score']
    return mean_test_scores[self.best_index_]
# copied for compatibility with 0.19 sklearn from 0.18 BaseSearchCV
@property
def best_params_(self):
    """Parameter dict stored in ``cv_results_`` at ``best_index_``."""
    check_is_fitted(self, 'cv_results_')
    evaluated_params = self.cv_results_['params']
    return evaluated_params[self.best_index_]
@property
def optimizer_results_(self):
    """Optimizer results collected by ``fit``, one entry per search space."""
    check_is_fitted(self, '_optim_results')
    collected = self._optim_results
    return collected
# copied for compatibility with 0.19 sklearn from 0.18 BaseSearchCV
def _fit(self, X, y, groups, parameter_iterable):
    """Cross-validate every parameter setting in ``parameter_iterable``.

    Actual fitting, performing the search over parameters.
    Taken from https://github.com/scikit-learn/scikit-learn/blob/0.18.X
    .../sklearn/model_selection/_search.py

    Parameters
    ----------
    X, y : array-like
        Data handed to the cross-validation splitter and to the
        estimator.
    groups : array-like or None
        Group labels handed to ``cv.split``.
    parameter_iterable : iterable of dict
        Parameter settings to evaluate. It is only sized with ``len``
        (for the verbose message) when it is a ``Sized`` instance.

    Returns
    -------
    self
        With ``scorer_``, ``cv_results_``, ``best_index_`` and
        ``n_splits_`` set, plus ``best_estimator_`` when ``self.refit``
        is true.
    """
    estimator = self.estimator
    cv = sklearn.model_selection._validation.check_cv(
        self.cv, y, classifier=is_classifier(estimator))
    self.scorer_ = check_scoring(
        self.estimator, scoring=self.scoring)

    X, y, groups = indexable(X, y, groups)
    n_splits = cv.get_n_splits(X, y, groups)
    if self.verbose > 0 and isinstance(parameter_iterable, Sized):
        n_candidates = len(parameter_iterable)
        print("Fitting {0} folds for each of {1} candidates, totalling"
              " {2} fits".format(n_splits, n_candidates,
                                 n_candidates * n_splits))

    base_estimator = clone(self.estimator)
    pre_dispatch = self.pre_dispatch

    # materialise the splits once so every candidate sees the same folds
    cv_iter = list(cv.split(X, y, groups))
    out = Parallel(
        n_jobs=self.n_jobs, verbose=self.verbose,
        pre_dispatch=pre_dispatch
    )(delayed(sklearn.model_selection._validation._fit_and_score)(
        clone(base_estimator),
        X, y, self.scorer_,
        train, test, self.verbose, parameters,
        fit_params=self.fit_params,
        return_train_score=self.return_train_score,
        return_n_test_samples=True,
        return_times=True, return_parameters=True,
        error_score=self.error_score
    )
        for parameters in parameter_iterable
        for train, test in cv_iter)

    # if one choose to see train score, "out" will contain train score info
    if self.return_train_score:
        (train_scores, test_scores, test_sample_counts,
         fit_time, score_time, parameters) = zip(*out)
    else:
        (test_scores, test_sample_counts,
         fit_time, score_time, parameters) = zip(*out)

    # results are candidate-major, so every n_splits-th entry starts a
    # new candidate
    candidate_params = parameters[::n_splits]
    n_candidates = len(candidate_params)

    results = dict()

    def _store(key_name, array, weights=None, splits=False, rank=False):
        """A small helper to store the scores/times to the cv_results_"""
        array = np.array(array, dtype=np.float64).reshape(n_candidates,
                                                          n_splits)
        if splits:
            for split_i in range(n_splits):
                results["split%d_%s"
                        % (split_i, key_name)] = array[:, split_i]

        array_means = np.average(array, axis=1, weights=weights)
        results['mean_%s' % key_name] = array_means
        # Weighted std is not directly available in numpy
        array_stds = np.sqrt(np.average((array -
                                         array_means[:, np.newaxis]) ** 2,
                                        axis=1, weights=weights))
        results['std_%s' % key_name] = array_stds

        if rank:
            results["rank_%s" % key_name] = np.asarray(
                rankdata(-array_means, method='min'), dtype=np.int32)

    # Computed the (weighted) mean and std for test scores alone
    # NOTE test_sample counts (weights) remain the same for all candidates
    # The builtin ``int`` replaces the ``np.int`` alias, which newer
    # NumPy releases no longer provide.
    test_sample_counts = np.array(test_sample_counts[:n_splits],
                                  dtype=int)

    _store('test_score', test_scores, splits=True, rank=True,
           weights=test_sample_counts if self.iid else None)
    if self.return_train_score:
        _store('train_score', train_scores, splits=True)
    _store('fit_time', fit_time)
    _store('score_time', score_time)

    best_index = np.flatnonzero(results["rank_test_score"] == 1)[0]
    best_parameters = candidate_params[best_index]

    # Use one MaskedArray and mask all the places where the param is not
    # applicable for that candidate. Use defaultdict as each candidate may
    # not contain all the params
    param_results = defaultdict(partial(np.ma.array,
                                        np.empty(n_candidates,),
                                        mask=True,
                                        dtype=object))
    for cand_i, params in enumerate(candidate_params):
        for name, value in params.items():
            # An all masked empty array gets created for the key
            # `"param_%s" % name` at the first occurence of `name`.
            # Setting the value at an index also unmasks that index
            param_results["param_%s" % name][cand_i] = value

    results.update(param_results)

    # Store a list of param dicts at the key 'params'
    results['params'] = candidate_params

    self.cv_results_ = results
    self.best_index_ = best_index
    self.n_splits_ = n_splits

    if self.refit:
        # fit the best estimator using the entire dataset
        # clone first to work around broken estimators
        best_estimator = clone(base_estimator).set_params(
            **best_parameters)
        # ``fit_params`` defaults to None in the constructor, which
        # cannot be unpacked with ``**``
        fit_params = self.fit_params or {}
        if y is not None:
            best_estimator.fit(X, y, **fit_params)
        else:
            best_estimator.fit(X, **fit_params)
        self.best_estimator_ = best_estimator
    return self
def _fit_best_model(self, X, y):
    """Train a fresh clone of the estimator with the best parameters.

    Parameters
    ----------
    X : array-like, shape = [n_samples, n_features]
        Input data, where n_samples is the number of samples and
        n_features is the number of features.

    y : array-like, shape = [n_samples] or [n_samples, n_output],
        Target relative to X for classification or regression.

    Returns
    -------
    self
        With ``best_estimator_`` set to the fitted clone.
    """
    # ``fit_params`` may be None; fall back to no extra fit arguments
    extra_fit_args = self.fit_params or {}

    model = clone(self.estimator)
    self.best_estimator_ = model
    model.set_params(**self.best_params_)
    model.fit(X, y, **extra_fit_args)
    return self
def _make_optimizer(self, params_space):
    """Instantiate skopt Optimizer class.

    Parameters
    ----------
    params_space : dict
        Represents parameter search space. The keys are parameter
        names (strings) and values are skopt.space.Dimension instances,
        one of Real, Integer or Categorical.

    Returns
    -------
    optimizer: Instance of the `Optimizer` class used for search
        in some parameter space.
    """
    kwargs = self.optimizer_kwargs_.copy()
    kwargs['dimensions'] = dimensions_aslist(params_space)
    optimizer = Optimizer(**kwargs)

    # Unnamed dimensions take the parameter name found at the same
    # position in the sorted key list. The keys are sorted once here
    # rather than once per dimension.
    sorted_names = sorted(params_space.keys())
    for position, dimension in enumerate(optimizer.space.dimensions):
        if dimension.name is None:
            dimension.name = sorted_names[position]

    return optimizer
def _step(self, X, y, search_space, optimizer, groups=None, n_points=1):
"""Generate n_jobs parameters and evaluate them in parallel.
"""
# get parameter values to evaluate
params = optimizer.ask(n_points=n_points)
# convert parameters to python native types
params = [[np.array(v).item() for v in p] for p in params]
# make lists into dictionaries
params_dict = [point_asdict(search_space, p) for p in params]
# HACK: self.cv_results_ is reset at every call to _fit, keep current
all_cv_results = self.cv_results_
# HACK: this adds compatibility with different versions of sklearn
refit = self.refit
self.refit = False
self._fit(X, y, groups, params_dict)
self.refit = refit
# merge existing and new cv_results_
for k in self.cv_results_:
all_cv_results[k].extend(self.cv_results_[k])
all_cv_results["rank_test_score"] = list(np.asarray(
rankdata(-np.array(all_cv_results['mean_test_score']),
method='min'), dtype=np.int32))
if self.return_train_score:
all_cv_results["rank_train_score"] = list(np.asarray(
rankdata(-np.array(all_cv_results['mean_train_score']),
method='min'), dtype=np.int32))
self.cv_results_ = all_cv_results
self.best_index_ = np.argmax(self.cv_results_['mean_test_score'])
# feed the point and objective back into optimizer
local_results = self.cv_results_['mean_test_score'][-len(params):]
# optimizer minimizes objective, hence provide negative score
return optimizer.tell(params, [-score for score in local_results])
@property
def total_iterations(self):
    """
    Count total iterations that will be taken to explore
    all subspaces with `fit` method.

    A ``(dict, int)`` tuple contributes its own iteration count; any
    other search space contributes ``self.n_iter``.

    Returns
    -------
    max_iter: int, total number of iterations to explore
    """
    search_spaces = self.search_spaces

    # A single dict is one search space. Iterating over it directly
    # would walk its keys and count ``n_iter`` once per parameter.
    if isinstance(search_spaces, dict):
        search_spaces = [search_spaces]

    total_iter = 0
    for elem in search_spaces:
        if isinstance(elem, tuple):
            _, n_iter = elem
        else:
            n_iter = self.n_iter
        total_iter += n_iter

    return total_iter
def _run_search(self, x):
pass
def fit(self, X, y=None, groups=None, callback=None):
    """Search every configured space and optionally refit the best model.

    For each search space an optimizer is created and repeatedly asked
    for batches of at most ``n_points`` candidates until the iteration
    budget of that space is used up or a callback requests a stop.

    Parameters
    ----------
    X : array-like or sparse matrix, shape = [n_samples, n_features]
        The training input samples.

    y : array-like, shape = [n_samples] or [n_samples, n_output]
        Target relative to X for classification or regression (class
        labels should be integers or strings).

    groups : array-like, with shape (n_samples,), optional
        Group labels for the samples used while splitting the dataset into
        train/test set.

    callback: [callable, list of callables, optional]
        If callable then `callback(res)` is called after each parameter
        combination tested. If list of callables, then each callable in
        the list is called.

    Returns
    -------
    self
    """
    # a lone dict stands for a one-element list of search spaces
    spaces = self.search_spaces
    if isinstance(spaces, dict):
        spaces = [spaces]

    callbacks = check_callback(callback)

    # work on a copy so the user-supplied kwargs are never modified
    self.optimizer_kwargs_ = (
        {} if self.optimizer_kwargs is None
        else dict(self.optimizer_kwargs)
    )
    self.optimizer_kwargs_['random_state'] = check_random_state(
        self.random_state)

    # Instantiate optimizers for all the search spaces; for a
    # (dict, int) tuple only the dict describes the space.
    optimizers = [
        self._make_optimizer(
            space[0] if isinstance(space, tuple) else space)
        for space in spaces
    ]
    self.optimizers_ = optimizers  # will save the states of the optimizers

    # start from a clean slate of results
    self.cv_results_ = defaultdict(list)
    self.best_index_ = None
    self.multimetric_ = False
    self._optim_results = []

    batch_size = self.n_points

    for space, optimizer in zip(spaces, optimizers):
        # a subspace given without its own budget uses self.n_iter
        if isinstance(space, tuple):
            space, remaining = space
        else:
            remaining = self.n_iter

        # do the optimization for particular search space
        while remaining > 0:
            # the last batch shrinks to whatever budget is left
            optim_result = self._step(
                X, y, space, optimizer,
                groups=groups, n_points=min(remaining, batch_size)
            )
            remaining -= batch_size

            if eval_callbacks(callbacks, optim_result):
                break
        self._optim_results.append(optim_result)

    # Refit the best model on the the whole dataset
    if self.refit:
        self._fit_best_model(X, y)

    return self