Source code for elm.elmk

# -*- coding: utf-8 -*-

"""
    This file contains ELMKernel classes and all developed methods.
"""

# Python2 support
from __future__ import unicode_literals
from __future__ import division
from __future__ import absolute_import
from __future__ import print_function

from .mltools import *

import numpy as np
import optunity
import ast

import sys
if sys.version_info < (3, 0):
    import ConfigParser as configparser
else:
    import configparser


# Find configuration file
from pkg_resources import Requirement, resource_filename
_ELMK_CONFIG = resource_filename(Requirement.parse("elm"), "elm/elmk.cfg")


[docs]class ELMKernel(MLTools): """ A Python implementation of ELM Kernel defined by Huang[1]. An ELM is a single-hidden layer feedforward network (SLFN) proposed by Huang back in 2006, in 2012 the author revised and introduced a new concept of using kernel functions to his previous work. This implementation currently accepts both methods proposed at 2012, random neurons and kernel functions to estimate classifier/regression functions. Let the dimensionality "d" of the problem be the sum of "t" size (number of targets per pattern) and "f" size (number of features per pattern). So, d = t + f The data will be set as Pattern = (Target | Features). If database has *N* patterns, its size follows *Nxd*. Note: [1] Paper reference: Huang, 2012, "Extreme Learning Machine for Regression and Multiclass Classification" Attributes: output_weight (numpy.ndarray): a column vector (*Nx1*) calculated after training, represent :math:\\beta. training_patterns (numpy.ndarray): a matrix (*Nxd*) containing all patterns used for training. Need to save all training patterns to perform kernel calculation at testing and prediction phase. param_kernel_function (str): kernel function that will be used for training. param_c (float): regularization coefficient (*C*) used for training. param_kernel_params (list of float): kernel function parameters that will be used for training. Other Parameters: regressor_name (str): The name of classifier/regressor. available_kernel_functions (list of str): List with all available kernel functions. default_param_kernel_function (str): Default kernel function if not set at class constructor. default_param_c (float): Default parameter c value if not set at class constructor. default_param_kernel_params (list of float): Default kernel function parameters if not set at class constructor. Note: * **regressor_name**: defaults to "elmk". * **default_param_kernel_function**: defaults to "rbf". * **default_param_c**: defaults to 9. * **default_param_kernel_params**: defaults to [-15]. """
[docs] def __init__(self, params=[]): """ Class constructor. Arguments: params (list): first argument (*str*) is an available kernel function, second argument (*float*) is the coefficient *C* of regularization and the third and last argument is a list of arguments for the kernel function. Example: >>> import elm >>> params = ["linear", 5, []] >>> elmk = elm.ELMKernel(params) """ super(self.__class__, self).__init__() self.regressor_name = "elmk" self.available_kernel_functions = ["rbf", "linear", "poly"] self.default_param_kernel_function = "rbf" self.default_param_c = 9 self.default_param_kernel_params = [-15] self.output_weight = [] self.training_patterns = [] # Initialized parameters values if not params: self.param_kernel_function = self.default_param_kernel_function self.param_c = self.default_param_c self.param_kernel_params = self.default_param_kernel_params else: self.param_kernel_function = params[0] self.param_c = params[1] self.param_kernel_params = params[2] # ######################## # Private Methods # ########################
def _kernel_matrix(self, training_patterns, kernel_type, kernel_param, test_patterns=None): """ Calculate the Omega matrix (kernel matrix). If test_patterns is None, then the training Omega matrix will be calculated. This matrix represents the kernel value from each pattern of the training matrix with each other. If test_patterns exists, then the test Omega matrix will be calculated. This matrix represents the kernel value from each pattern of the training matrix with the patterns of test matrix. Arguments: training_patterns (numpy.ndarray): A matrix containing the features from all training patterns. kernel_type (str): The type of kernel to be used e.g: rbf kernel_param (list of float): The parameters of the chosen kernel. test_patterns (numpy.ndarray): An optional parameter used to calculate the Omega test matrix. Returns: numpy.ndarray: Omega matrix """ number_training_patterns = training_patterns.shape[0] if kernel_type == "rbf": if test_patterns is None: temp_omega = np.dot( np.sum(training_patterns ** 2, axis=1).reshape(-1, 1), np.ones((1, number_training_patterns))) temp_omega = temp_omega + temp_omega.conj().T omega = np.exp( -(2 ** kernel_param[0]) * (temp_omega - 2 * (np.dot( training_patterns, training_patterns.conj().T)))) else: number_test_patterns = test_patterns.shape[0] temp1 = np.dot( np.sum(training_patterns ** 2, axis=1).reshape(-1, 1), np.ones((1, number_test_patterns))) temp2 = np.dot( np.sum(test_patterns ** 2, axis=1).reshape(-1, 1), np.ones((1, number_training_patterns))) temp_omega = temp1 + temp2.conj().T omega = \ np.exp(- (2 ** kernel_param[0]) * (temp_omega - 2 * np.dot(training_patterns, test_patterns.conj().T))) elif kernel_type == "linear": if test_patterns is None: omega = np.dot(training_patterns, training_patterns.conj().T) else: omega = np.dot(training_patterns, test_patterns.conj().T) elif kernel_type == "poly": # Power a**x is undefined when x is real and 'a' is negative, # so is necessary to force an integer value kernel_param[1] = round(kernel_param[1]) if test_patterns is None: temp = np.dot(training_patterns, training_patterns.conj().T)+ kernel_param[0] omega = temp ** kernel_param[1] else: temp = np.dot(training_patterns, test_patterns.conj().T)+ kernel_param[0] omega = temp ** kernel_param[1] else: print("Error: Invalid or unavailable kernel function.") return return omega def _local_train(self, training_patterns, training_expected_targets, params): # If params not provided, uses initialized parameters values if not params: pass else: self.param_kernel_function = params[0] self.param_c = params[1] self.param_kernel_params = params[2] # Need to save all training patterns to perform kernel calculation at # testing and prediction phase self.training_patterns = training_patterns number_training_patterns = self.training_patterns.shape[0] # Training phase omega_train = self._kernel_matrix(self.training_patterns, self.param_kernel_function, self.param_kernel_params) self.output_weight = np.linalg.solve( (omega_train + np.eye(number_training_patterns) / (2 ** self.param_c)), training_expected_targets).reshape(-1, 1) training_predicted_targets = np.dot(omega_train, self.output_weight) return training_predicted_targets def _local_test(self, testing_patterns, testing_expected_targets, predicting): omega_test = self._kernel_matrix(self.training_patterns, self.param_kernel_function, self.param_kernel_params, testing_patterns) testing_predicted_targets = np.dot(omega_test.conj().T, self.output_weight) return testing_predicted_targets # ######################## # Public Methods # ########################
[docs] def get_available_kernel_functions(self): """ Return available kernel functions. """ return self.available_kernel_functions
[docs] def print_parameters(self): """ Print parameters values. """ print() print("Regressor Parameters") print() print("Regularization coefficient: ", self.param_c) print("Kernel Function: ", self.param_kernel_function) print("Kernel parameters: ", self.param_kernel_params) print() print("CV error: ", self.cv_best_rmse) print()
[docs] def search_param(self, database, dataprocess=None, path_filename=("", ""), save=False, cv="ts", of="rmse", kf=None, eval=50): """ Search best hyperparameters for classifier/regressor based on optunity algorithms. Arguments: database (numpy.ndarray): a matrix containing all patterns that will be used for training/testing at some cross-validation method. dataprocess (DataProcess): an object that will pre-process database before training. Defaults to None. path_filename (tuple): *TODO*. save (bool): *TODO*. cv (str): Cross-validation method. Defaults to "ts". of (str): Objective function to be minimized at optunity.minimize. Defaults to "rmse". kf (list of str): a list of kernel functions to be used by the search. Defaults to None, this set all available functions. eval (int): Number of steps (evaluations) to optunity algorithm. Each set of hyperparameters will perform a cross-validation method chosen by param cv. Available *cv* methods: - "ts" :func:`mltools.time_series_cross_validation()` Perform a time-series cross-validation suggested by Hydman. - "kfold" :func:`mltools.kfold_cross_validation()` Perform a k-fold cross-validation. Available *of* function: - "accuracy", "rmse", "mape", "me". See Also: http://optunity.readthedocs.org/en/latest/user/index.html """ if kf is None: search_kernel_functions = self.available_kernel_functions elif type(kf) is list: search_kernel_functions = kf else: raise Exception("Invalid format for argument 'kf'.") print(self.regressor_name) print("##### Start search #####") config = configparser.ConfigParser() if sys.version_info < (3, 0): config.readfp(open(_ELMK_CONFIG)) else: config.read_file(open(_ELMK_CONFIG)) best_function_error = 99999.9 temp_error = best_function_error best_param_c = 0 best_param_kernel_function = "" best_param_kernel_param = [] for kernel_function in search_kernel_functions: if sys.version_info < (3, 0): elmk_c_range = ast.literal_eval(config.get("DEFAULT", "elmk_c_range")) n_parameters = config.getint(kernel_function, "kernel_n_param") kernel_p_range = \ ast.literal_eval(config.get(kernel_function, "kernel_params_range")) else: kernel_config = config[kernel_function] elmk_c_range = ast.literal_eval(kernel_config["elmk_c_range"]) n_parameters = int(kernel_config["kernel_n_param"]) kernel_p_range = \ ast.literal_eval(kernel_config["kernel_params_range"]) param_ranges = [[elmk_c_range[0][0], elmk_c_range[0][1]]] for param in range(n_parameters): param_ranges.append([kernel_p_range[param][0], kernel_p_range[param][1]]) def wrapper_0param(param_c): """ Wrapper for objective function. """ if cv == "ts": cv_tr_error, cv_te_error = \ time_series_cross_validation(self, database, [kernel_function, param_c, list([])], number_folds=10, dataprocess=dataprocess) elif cv == "kfold": cv_tr_error, cv_te_error = \ kfold_cross_validation(self, database, [kernel_function, param_c, list([])], number_folds=10, dataprocess=dataprocess) else: raise Exception("Invalid type of cross-validation.") if of == "accuracy": util = 1 / cv_te_error.get_accuracy() else: util = cv_te_error.get(of) # print("c:", param_c, "util: ", util) return util def wrapper_1param(param_c, param_kernel): """ Wrapper for optunity. """ if cv == "ts": cv_tr_error, cv_te_error = \ time_series_cross_validation(self, database, [kernel_function, param_c, list([param_kernel])], number_folds=10, dataprocess=dataprocess) elif cv == "kfold": cv_tr_error, cv_te_error = \ kfold_cross_validation(self, database, [kernel_function, param_c, list([param_kernel])], number_folds=10, dataprocess=dataprocess) else: raise Exception("Invalid type of cross-validation.") if of == "accuracy": util = 1 / cv_te_error.get_accuracy() else: util = cv_te_error.get(of) # print("c:", param_c, " gamma:", param_kernel, "util: ", util) return util def wrapper_2param(param_c, param_kernel1, param_kernel2): """ Wrapper for optunity. """ if cv == "ts": cv_tr_error, cv_te_error = \ time_series_cross_validation(self, database, [kernel_function, param_c, list([param_kernel1, param_kernel2])], number_folds=10, dataprocess=dataprocess) elif cv == "kfold": cv_tr_error, cv_te_error = \ kfold_cross_validation(self, database, [kernel_function, param_c, list([param_kernel1, param_kernel2])], number_folds=10, dataprocess=dataprocess) else: raise Exception("Invalid type of cross-validation.") if of == "accuracy": util = 1 / cv_te_error.get_accuracy() else: util = cv_te_error.get(of) # print("c:", param_c, " param1:", param_kernel1, # " param2:", param_kernel2, "util: ", util) return util if kernel_function == "linear": optimal_parameters, details, _ = \ optunity.minimize(wrapper_0param, solver_name="cma-es", num_evals=eval, param_c=param_ranges[0]) elif kernel_function == "rbf": optimal_parameters, details, _ = \ optunity.minimize(wrapper_1param, solver_name="cma-es", num_evals=eval, param_c=param_ranges[0], param_kernel=param_ranges[1]) elif kernel_function == "poly": optimal_parameters, details, _ = \ optunity.minimize(wrapper_2param, solver_name="cma-es", num_evals=eval, param_c=param_ranges[0], param_kernel1=param_ranges[1], param_kernel2=param_ranges[2]) else: raise Exception("Invalid kernel function.") # Save best kernel result if details[0] < temp_error: temp_error = details[0] if of == "accuracy": best_function_error = 1 / temp_error else: best_function_error = temp_error best_param_kernel_function = kernel_function best_param_c = optimal_parameters["param_c"] if best_param_kernel_function == "linear": best_param_kernel_param = [] elif best_param_kernel_function == "rbf": best_param_kernel_param = [optimal_parameters["param_kernel"]] elif best_param_kernel_function == "poly": best_param_kernel_param = \ [optimal_parameters["param_kernel1"], optimal_parameters["param_kernel2"]] else: raise Exception("Invalid kernel function.") # print("best: ", best_param_kernel_function, # best_function_error, best_param_c, best_param_kernel_param) if of == "accuracy": print("Kernel function: ", kernel_function, " best cv value: ", 1/details[0]) else: print("Kernel function: ", kernel_function, " best cv value: ", details[0]) # MLTools attribute self.cv_best_rmse = best_function_error # ELM attribute self.param_c = best_param_c self.param_kernel_function = best_param_kernel_function self.param_kernel_params = best_param_kernel_param print("##### Search complete #####") self.print_parameters() return None
[docs] def train(self, training_matrix, params=[]): """ Calculate output_weight values needed to test/predict data. If params is provided, this method will use at training phase. Else, it will use the default value provided at object initialization. Arguments: training_matrix (numpy.ndarray): a matrix containing all patterns that will be used for training. params (list): a list of parameters defined at :func:`ELMKernel.__init__` Returns: :class:`Error`: training error object containing expected, predicted targets and all error metrics. Note: Training matrix must have target variables as the first column. """ return self._ml_train(training_matrix, params)
[docs] def test(self, testing_matrix, predicting=False): """ Calculate test predicted values based on previous training. Args: testing_matrix (numpy.ndarray): a matrix containing all patterns that will be used for testing. predicting (bool): Don't set. Returns: :class:`Error`: testing error object containing expected, predicted targets and all error metrics. Note: Testing matrix must have target variables as the first column. """ return self._ml_test(testing_matrix, predicting)
@copy_doc_of(MLTools._ml_predict)
[docs] def predict(self, horizon=1): # self.__doc__ = self._ml_predict.__doc__ return self._ml_predict(horizon)
@copy_doc_of(MLTools._ml_train_iterative)
[docs] def train_iterative(self, database_matrix, params=[], sliding_window=168, k=1): # self.__doc__ = self._ml_train_iterative.__doc__ return self._ml_train_iterative(database_matrix, params, sliding_window, k)