Source code for pysequd.sequd

import time
import numpy as np
import pandas as pd
from joblib import Parallel
from joblib import delayed
from matplotlib import pylab as plt
from sklearn.model_selection import cross_val_score

import pyunidoe as pydoe

EPS = 10**(-8)


[docs]class SeqUD(object): """ Implementation of sequential uniform design. Parameters ---------- :type para_space: dict or list of dictionaries :param para_space: It has three types: Continuous: Specify `Type` as `continuous`, and include the keys of `Range` (a list with lower-upper elements pair) and `Wrapper`, a callable function for wrapping the values. Integer: Specify `Type` as `integer`, and include the keys of `Mapping` (a list with all the sortted integer elements). Categorical: Specify `Type` as `categorical`, and include the keys of `Mapping` (a list with all the possible categories). :type n_runs_per_stage: int, optional, default=20 :param n_runs_per_stage: The positive integer which represent the number of levels in generating uniform design. :type max_runs: int, optional, default=100 :param max_runs: The maximum number of trials to be evaluated. When this values is reached, then the algorithm will stop. :type max_search_iter: int, optional, default=100 :param max_search_iter: The maximum number of iterations used to generate uniform design or augmented uniform design. :type n_jobs: int or None, optional, optional, default=None :param n_jobs: Number of jobs to run in parallel. If -1 all CPUs are used. If 1 is given, no parallel computing code is used at all, which is useful for debugging. See the package `joblib` for details. :type estimator: estimator object :param estimator: This is assumed to implement the scikit-learn estimator interface. :type cv: cross-validation method, an sklearn object. :param cv: e.g., `StratifiedKFold` and KFold` is used. :type scoring: string, callable, list/tuple, dict or None, optional, default=None :param scoring: A sklearn type scoring function. If None, the estimator's default scorer (if available) is used. See the package `sklearn` for details. :type refit: boolean, or string, optional, default=True :param refit: It controls whether to refit an estimator using the best found parameters on the whole dataset. :type random_state: int, optional, default=0 :param random_state: The random seed for optimization. :type verbose: boolean, optional, default=False :param verbose: It controls whether the searching history will be printed. Examples ---------- >>> import numpy as np >>> from sklearn import svm >>> from sklearn import datasets >>> from sequd import SeqUD >>> from sklearn.model_selection import KFold >>> iris = datasets.load_iris() >>> ParaSpace = {'C':{'Type': 'continuous', 'Range': [-6, 16], 'Wrapper': np.exp2}, 'gamma': {'Type': 'continuous', 'Range': [-16, 6], 'Wrapper': np.exp2}} >>> estimator = svm.SVC() >>> cv = KFold(n_splits=5, random_state=1, shuffle=True) >>> clf = SeqUD(ParaSpace, n_runs_per_stage=20, max_runs=100, max_search_iter=100, n_jobs=None, estimator=None, cv=None, scoring=None, refit=None, random_state=0, verbose=False) >>> clf.fit(iris.data, iris.target) Attributes ---------- :vartype best_score\_: float :ivar best_score\_: The best average cv score among the evaluated trials. :vartype best_params\_: dict :ivar best_params\_: Parameters that reaches `best_score_`. :vartype best_estimator\_: sklearn estimator :ivar best_estimator\_: The estimator refitted based on the `best_params_`. Not available if estimator = None or `refit=False`. :vartype search_time_consumed\_: float :ivar search_time_consumed\_: Seconds used for whole searching procedure. :vartype refit_time\_: float :ivar refit_time\_: Seconds used for refitting the best model on the whole dataset. Not available if estimator=None or `refit=False`. """ def __init__(self, para_space, n_runs_per_stage=20, max_runs=100, max_search_iter=100, n_jobs=None, estimator=None, cv=None, scoring=None, refit=True, random_state=0, verbose=False): self.para_space = para_space self.n_runs_per_stage = n_runs_per_stage self.max_runs = max_runs self.max_search_iter = max_search_iter self.n_jobs = n_jobs if isinstance(n_jobs, int) else 1 self.random_state = random_state self.verbose = verbose self.cv = cv self.refit = refit self.scoring = scoring self.estimator = estimator self.stage = 0 self.stop_flag = False self.para_ud_names = [] self.variable_number = [0] self.factor_number = len(self.para_space) self.para_names = list(self.para_space.keys()) for items, values in self.para_space.items(): if (values['Type'] == "categorical"): self.variable_number.append(len(values['Mapping'])) self.para_ud_names.extend( [items + "_UD_" + str(i + 1) for i in range(len(values['Mapping']))]) else: self.variable_number.append(1) self.para_ud_names.append(items + "_UD") self.extend_factor_number = sum(self.variable_number)
[docs] def plot_scores(self): """ Visualize the scores history. """ if self.logs.shape[0] > 0: cum_best_score = self.logs["score"].cummax() plt.figure(figsize=(6, 4)) plt.plot(cum_best_score) plt.xlabel('# of Runs') plt.ylabel('Best Scores') plt.title('The best found scores during optimization') plt.grid(True) plt.show() else: print("No available logs!")
def _summary(self): """ This function summarizes the evaluation results and makes records. Parameters ---------- para_set_ud: A pandas dataframe where each row represents a UD trial point, and columns are used to represent variables. para_set: A pandas dataframe which contains the trial points in original form. score: A numpy vector, which contains the evaluated scores of trial points in para_set. """ self.best_index_ = self.logs.loc[:, "score"].idxmax() self.best_params_ = {self.logs.loc[:, self.para_names].columns[j]: self.logs.loc[:, self.para_names].iloc[self.best_index_, j] for j in range(self.logs.loc[:, self.para_names].shape[1])} self.best_score_ = self.logs.loc[:, "score"].iloc[self.best_index_] if self.verbose: print("SeqUD completed in %.2f seconds." % self.search_time_consumed_) print("The best score is: %.5f." % self.best_score_) print("The best configurations are:") print("\n".join("%-20s: %s" % (k, v if self.para_space[k]['Type'] == "categorical" else round(v, 5)) for k, v in self.best_params_.items())) def _para_mapping(self, para_set_ud): """ This function maps trials points in UD space ([0, 1]) to original scales. There are three types of variables: - continuous:Perform inverse Maxmin scaling for each value. - integer: Evenly split the UD space, and map each partition to the corresponding integer values. - categorical: The UD space uses one-hot encoding, and this function selects the one with the maximal value as class label. Parameters ---------- para_set_ud: A pandas dataframe where each row represents a UD trial point, and columns are used to represent variables. Returns ---------- para_set: The transformed variables. """ para_set = pd.DataFrame( np.zeros((para_set_ud.shape[0], self.factor_number)), columns=self.para_names) for item, values in self.para_space.items(): if (values['Type'] == "continuous"): para_set[item] = values['Wrapper']( para_set_ud[item + "_UD"] * (values['Range'][1] - values['Range'][0]) + values['Range'][0]) elif (values['Type'] == "integer"): temp = np.linspace(0, 1, len(values['Mapping']) + 1) for j in range(1, len(temp)): para_set.loc[(para_set_ud[item + "_UD"] >= (temp[j - 1] - EPS)) & (para_set_ud[item + "_UD"] < (temp[j] + EPS)), item] = values['Mapping'][j - 1] para_set.loc[np.abs(para_set_ud[item + "_UD"] - 1) <= EPS, item] = values['Mapping'][-1] para_set[item] = para_set[item].round().astype(int) elif (values['Type'] == "categorical"): column_bool = [ item in para_name for para_name in self.para_ud_names] col_index = np.argmax( para_set_ud.loc[:, column_bool].values, axis=1).tolist() para_set[item] = np.array(values['Mapping'])[col_index] return para_set def _generate_init_design(self): """ This function generates the initial uniform design. Returns ---------- para_set_ud: A pandas dataframe where each row represents a UD trial point, and columns are used to represent variables. """ self.logs = pd.DataFrame() ud_space = np.repeat(np.linspace(1 / (2 * self.n_runs_per_stage), 1 - 1 / (2 * self.n_runs_per_stage), self.n_runs_per_stage).reshape([-1, 1]), self.extend_factor_number, axis=1) base_ud = pydoe.design_query(n=self.n_runs_per_stage, s=self.extend_factor_number, q=self.n_runs_per_stage, crit="CD2", show_crit=False) if base_ud is None: base_ud = pydoe.gen_ud_ms(n=self.n_runs_per_stage, s=self.extend_factor_number, q=self.n_runs_per_stage, crit="CD2", maxiter=self.max_search_iter, random_state=self.random_state, n_jobs=10, nshoot=10) if (not isinstance(base_ud, np.ndarray)): raise ValueError('Uniform design is not correctly constructed!') para_set_ud = np.zeros((self.n_runs_per_stage, self.extend_factor_number)) for i in range(self.factor_number): loc_min = np.sum(self.variable_number[:(i + 1)]) loc_max = np.sum(self.variable_number[:(i + 2)]) for k in range(int(loc_min), int(loc_max)): para_set_ud[:, k] = ud_space[base_ud[:, k] - 1, k] para_set_ud = pd.DataFrame(para_set_ud, columns=self.para_ud_names) return para_set_ud def _generate_augment_design(self, ud_center): """ This function refines the search space to a subspace of interest, and generates augmented uniform designs given existing designs. Parameters ---------- ud_center: A numpy vector representing the center of the subspace, and corresponding elements denote the position of the center for each variable. Returns ---------- para_set_ud: A pandas dataframe where each row represents a UD trial point, and columns are used to represent variables. """ # 1. Transform the existing Parameters to Standardized Horizon (0-1) ud_space = np.zeros((self.n_runs_per_stage, self.extend_factor_number)) ud_grid_size = 1.0 / (self.n_runs_per_stage * 2**(self.stage - 1)) left_radius = np.floor((self.n_runs_per_stage - 1) / 2) * ud_grid_size right_radius = (self.n_runs_per_stage - np.floor((self.n_runs_per_stage - 1) / 2) - 1) * ud_grid_size for i in range(self.extend_factor_number): if ((ud_center[i] - left_radius) < (0 - EPS)): lb = 0 ub = ud_center[i] + right_radius - (ud_center[i] - left_radius) elif ((ud_center[i] + right_radius) > (1 + EPS)): ub = 1 lb = ud_center[i] - left_radius - \ (ud_center[i] + right_radius - 1) else: lb = max(ud_center[i] - left_radius, 0) ub = min(ud_center[i] + right_radius, 1) ud_space[:, i] = np.linspace(lb, ub, self.n_runs_per_stage) # 2. Map existing Runs' Parameters to UD Levels "x0" (1 - n_runs_per_stage) flag = True for i in range(self.extend_factor_number): flag = flag & ( self.logs.loc[:, self.para_ud_names].iloc[:, i] >= (ud_space[0, i] - EPS)) flag = flag & ( self.logs.loc[:, self.para_ud_names].iloc[:, i] <= (ud_space[-1, i] + EPS)) x0 = self.logs.loc[flag, self.para_ud_names].values for i in range(x0.shape[0]): for j in range(x0.shape[1]): x0[i, j] = ( np.where(abs(x0[i, j] - ud_space[:, j]) <= EPS)[0][0] + 1) x0 = np.round(x0).astype(int) # 3. Delete existing UD points on the same levels grids for i in range(self.extend_factor_number): keep_list = [] unique = np.unique(x0[:, i]) for j in range(len(unique)): xx_loc = np.where(np.abs(x0[:, i] - unique[j]) <= EPS)[0].tolist() keep_list.extend(np.random.choice(xx_loc, 1)) x0 = x0[keep_list, :].reshape([-1, self.extend_factor_number]) # Return if the maximum run has been reached. if ((self.logs.shape[0] + self.n_runs_per_stage - x0.shape[0]) > self.max_runs): self.stop_flag = True if self.verbose: print("Maximum number of runs reached, stop!") return if (x0.shape[0] >= self.n_runs_per_stage): self.stop_flag = True if self.verbose: print("Search space already full, stop!") return # 4. Generate Sequential UD base_ud = pydoe.gen_aud_ms(x0, n=self.n_runs_per_stage, s=self.extend_factor_number, q=self.n_runs_per_stage, crit="CD2", maxiter=self.max_search_iter, random_state=self.random_state, n_jobs=10, nshoot=10) if (not isinstance(base_ud, np.ndarray)): raise ValueError('Uniform design is not correctly constructed!') base_ud_aug = base_ud[(x0.shape[0]):base_ud.shape[0], :].reshape([-1, self.extend_factor_number]) para_set_ud = np.zeros( (base_ud_aug.shape[0], self.extend_factor_number)) for i in range(self.factor_number): loc_min = np.sum(self.variable_number[:(i + 1)]) loc_max = np.sum(self.variable_number[:(i + 2)]) for k in range(int(loc_min), int(loc_max)): para_set_ud[:, k] = ud_space[base_ud_aug[:, k] - 1, k] para_set_ud = pd.DataFrame(para_set_ud, columns=self.para_ud_names) return para_set_ud def _evaluate_runs(self, obj_func, para_set_ud): """ This function evaluates the performance scores of given trials. Parameters ---------- obj_func: A callable function. It takes the values stored in each trial as input parameters, and output the corresponding scores. para_set_ud: A pandas dataframe where each row represents a UD trial point, and columns are used to represent variables. """ para_set = self._para_mapping(para_set_ud) para_set_ud.columns = self.para_ud_names candidate_params = [{para_set.columns[j]: para_set.iloc[i, j] for j in range(para_set.shape[1])} for i in range(para_set.shape[0])] if self.n_jobs > 1: out = Parallel(n_jobs=self.n_jobs)(delayed(obj_func)(parameters) for parameters in candidate_params) else: out = [] for parameters in candidate_params: out.append(obj_func(parameters)) out = np.array(out) logs_aug = para_set_ud.to_dict() logs_aug.update(para_set) logs_aug.update(pd.DataFrame(out, columns=["score"])) logs_aug = pd.DataFrame(logs_aug) logs_aug["stage"] = self.stage self.logs = pd.concat([self.logs, logs_aug]).reset_index(drop=True) if self.verbose: print("Stage %d completed (%d/%d) with best score: %.5f." % (self.stage, self.logs.shape[0], self.max_runs, self.logs["score"].max())) def _run(self, obj_func): """ This function controls the procedures for implementing the sequential uniform design method. Parameters ---------- obj_func: A callable function. It takes the values stored in each trial as input parameters, and output the corresponding scores. """ para_set_ud = self._generate_init_design() self._evaluate_runs(obj_func, para_set_ud) self.stage += 1 while (True): ud_center = self.logs.sort_values( "score", ascending=False).loc[:, self.para_ud_names].values[0, :] para_set_ud = self._generate_augment_design(ud_center) if not self.stop_flag: self._evaluate_runs(obj_func, para_set_ud) self.stage += 1 else: break
[docs] def fmax(self, wrapper_func): """ Search the optimal value of a function. Parameters ---------- :type func: callable function :param func: the function to be optimized. """ self.stage = 1 np.random.seed(self.random_state) search_start_time = time.time() self._run(wrapper_func) search_end_time = time.time() self.search_time_consumed_ = search_end_time - search_start_time self._summary()
[docs] def fit(self, x, y=None): """ Run fit with all sets of parameters. Parameters ---------- :type x: array, shape = [n_samples, n_features] :param x: input variales. :type y: array, shape = [n_samples] or [n_samples, n_output], optional :param y: target variable. """ def sklearn_wrapper(parameters): self.estimator.set_params(**parameters) out = cross_val_score(self.estimator, x, y, cv=self.cv, scoring=self.scoring) score = np.mean(out) return score self.stage = 1 self.logs = pd.DataFrame() np.random.seed(self.random_state) index = np.where(["random_state" in param for param in list(self.estimator.get_params().keys())])[0] for idx in index: self.estimator.set_params(**{list(self.estimator.get_params().keys())[idx]:self.random_state}) search_start_time = time.time() self._run(sklearn_wrapper) search_end_time = time.time() self.search_time_consumed_ = search_end_time - search_start_time self._summary() if self.refit: self.best_estimator_ = self.estimator.set_params( **self.best_params_) refit_start_time = time.time() if y is not None: self.best_estimator_.fit(x, y) else: self.best_estimator_.fit(x) refit_end_time = time.time() self.refit_time_ = refit_end_time - refit_start_time