"""Base classes for NARMAX estimator."""
# Authors:
# Wilson Rocha Lacerda Junior <wilsonrljr@outlook.com>
# License: BSD 3 clause
import warnings
from collections import Counter
from itertools import chain, combinations_with_replacement
import numpy as np
from .utils._check_arrays import _check_positive_int, _num_features, check_X_y
[docs]class GenerateRegressors:
"""Polynomial NARMAX model
Provides the main functions to generate the regressor dictionary
and regressor codes for polynomial basis.
"""
[docs] def create_narmax_code(self, non_degree, xlag, ylag, n_inputs):
"""Create the code representation of the regressors.
This function generates a codification from all possibles
regressors given the maximum lag of the input and output.
This is used to write the final terms of the model in a
readable form. [1001] -> y(k-1).
This code format was based on a dissertation from UFMG. See
reference below.
Parameters
----------
non_degree : int
The desired maximum nonlinearity degree.
ylag : int
The maximum lag of output regressors.
xlag : int
The maximum lag of input regressors.
Returns
-------
max_lag : int
This value can be used by another functions.
regressor_code : ndarray of int
Matrix codification of all possible regressors.
Examples
--------
The codification is defined as:
>>> 100n = y(k-n)
>>> 200n = u(k-n)
>>> [100n 100n] = y(k-n)y(k-n)
>>> [200n 200n] = u(k-n)u(k-n)
References
----------
[1] Master Thesis: Barbosa, Alípio Monteiro.
Técnicas de otimização bi-objetivo para a determinação
da estrutura de modelos NARX (2010).
"""
if not isinstance(non_degree, int) or non_degree < 1:
raise ValueError(
"non_degree must be integer and > zero. Got %f" % non_degree
)
if not isinstance(ylag, (int, list)) or np.min(np.minimum(ylag, 1)) < 1:
raise ValueError("ylag must be integer or list and > zero. Got %f" % ylag)
if (
not isinstance(xlag, (int, list))
# or np.min(np.minimum(xlag, 1)) < 1):
or np.min(np.min(list(chain.from_iterable([[xlag]])))) < 1
):
raise ValueError("xlag must be integer or list and > zero. Got %f" % xlag)
if not isinstance(n_inputs, int) or n_inputs < 1:
raise ValueError("n_inputs must be integer and > zero. Got %f" % n_inputs)
if isinstance(ylag, list):
# create only the lags passed from list
y_vec = []
y_vec.extend([lag + 1000 for lag in ylag])
y_vec = np.array(y_vec)
else:
# create a range of lags if passed a int value
y_vec = np.arange(1001, 1001 + ylag)
if isinstance(xlag, list) and n_inputs == 1:
# create only the lags passed from list
x_vec_tmp = []
x_vec_tmp.extend([lag + 2000 for lag in xlag])
x_vec_tmp = np.array(x_vec_tmp)
elif isinstance(xlag, int) and n_inputs == 1:
# create a range of lags if passed a int value
x_vec_tmp = np.arange(2001, 2001 + xlag)
elif n_inputs > 1:
# only list are allowed if n_inputs > 1
# the user must entered list of the desired lags explicitly
x_vec_tmp = []
for i in range(n_inputs):
if isinstance(xlag[i], list) and n_inputs > 1:
# create 200n, 300n,..., 400n to describe each input
x_vec_tmp.extend([lag + 2000 + i * 1000 for lag in xlag[i]])
elif isinstance(xlag[i], int) and n_inputs > 1:
x_vec_tmp.extend(
[np.arange(2001 + i * 1000, 2001 + i * 1000 + xlag[i])]
)
if n_inputs > 1:
# if x_vec is a nested list, ensure all elements are arrays
all_arrays = [np.array([i]) if isinstance(i, int) else i for i in x_vec_tmp]
x_vec = np.concatenate([i for i in all_arrays])
else:
x_vec = x_vec_tmp
return x_vec, y_vec
def regressor_space(self, non_degree, xlag, ylag, n_inputs, model_type="NARMAX"):
x_vec, y_vec = self.create_narmax_code(non_degree, xlag, ylag, n_inputs)
reg_aux = np.array([0])
if model_type == "NARMAX":
reg_aux = np.concatenate([reg_aux, y_vec, x_vec])
elif model_type == "NAR":
reg_aux = np.concatenate([reg_aux, y_vec])
elif model_type == "NFIR":
reg_aux = np.concatenate([reg_aux, x_vec])
else:
raise Exception(
"Unrecognized model type. Model type should be NARMAX, NAR or NFIR"
)
regressor_code = list(combinations_with_replacement(reg_aux, non_degree))
regressor_code = np.array(regressor_code)
regressor_code = regressor_code[:, regressor_code.shape[1] :: -1]
return regressor_code
[docs]class HouseHolder:
"""Householder reflection and transformation."""
def _house(self, x):
"""Perform a Householder reflection of vector.
Parameters
----------
x : array-like of shape = number_of_training_samples
The respective column of the matrix of regressors in each
iteration of ERR function.
Returns
-------
v : array-like of shape = number_of_training_samples
The reflection of the array x.
References
----------
[1] Manuscript: Chen, S., Billings, S. A., & Luo, W. (1989).
Orthogonal least squares methods and their application to non-linear system identification.
"""
u = np.linalg.norm(x, 2)
if u != 0:
aux_b = x[0] + np.sign(x[0]) * u
x = x[1:] / aux_b
x = np.concatenate((np.array([1]), x))
return x
def _rowhouse(self, RA, v):
"""Perform a row Householder transformation.
Parameters
----------
RA : array-like of shape = number_of_training_samples
The respective column of the matrix of regressors in each
iteration of ERR function.
v : array-like of shape = number_of_training_samples
The reflected vector obtained by using the householder reflection.
Returns
-------
B : array-like of shape = number_of_training_samples
References
----------
[1] Manuscript: Chen, S., Billings, S. A., & Luo, W. (1989).
Orthogonal least squares methods and their application to
non-linear system identification. International Journal of
control, 50(5), 1873-1896.
"""
b = -2 / np.dot(v.T, v)
w = b * np.dot(RA.T, v)
w = w.reshape(1, -1)
v = v.reshape(-1, 1)
RA = RA + v * w
B = RA
return B
class ModelInformation:
def _get_index_from_regressor_code(self, regressor_code, model_code):
"""Get the index of user regressor in regressor space.
Took from: https://stackoverflow.com/questions/38674027/find-the-row-indexes-of-several-values-in-a-numpy-array/38674038#38674038
Parameters
----------
regressor_code : ndarray of int
Matrix codification of all possible regressors.
model_code : ndarray of int
Model defined by the user to simulate.
Returns
-------
model_index : ndarray of int
Index of model code in the regressor space.
"""
dims = regressor_code.max(0) + 1
model_index = np.where(
np.in1d(
np.ravel_multi_index(regressor_code.T, dims),
np.ravel_multi_index(model_code.T, dims),
)
)[0]
return model_index
def _list_output_regressor_code(self, model_code):
"""Create a flattened array of output regressors.
Parameters
----------
model_code : ndarray of int
Model defined by the user to simulate.
Returns
-------
model_code : ndarray of int
Flattened list of output regressors.
"""
regressor_code = [
code for code in model_code.ravel() if (code != 0) and (str(code)[0] == "1")
]
return np.asarray(regressor_code)
def _list_input_regressor_code(self, model_code):
"""Create a flattened array of input regressors.
Parameters
----------
model_code : ndarray of int
Model defined by the user to simulate.
Returns
-------
model_code : ndarray of int
Flattened list of output regressors.
"""
regressor_code = [
code for code in model_code.ravel() if (code != 0) and (str(code)[0] != "1")
]
return np.asarray(regressor_code)
def _get_lag_from_regressor_code(self, regressors):
"""Get the maximum lag from array of regressors.
Parameters
----------
regressors : ndarray of int
Flattened list of input or output regressors.
Returns
-------
max_lag : int
Maximum lag of list of regressors.
"""
lag_list = [
int(i) for i in regressors.astype("str") for i in [np.sum(int(i[2:]))]
]
if len(lag_list) != 0:
return max(lag_list)
else:
return 1
def _get_max_lag_from_model_code(self, model_code):
xlag_code = self._list_input_regressor_code(model_code)
ylag_code = self._list_output_regressor_code(model_code)
xlag = self._get_lag_from_regressor_code(xlag_code)
ylag = self._get_lag_from_regressor_code(ylag_code)
return max(xlag, ylag)
def _get_max_lag(self, ylag=1, xlag=1):
"""Get the max lag defined by the user.
Parameters
----------
ylag : int
The maximum lag of output regressors.
xlag : int
The maximum lag of input regressors.
Returns
-------
max_lag = int
The max lag value defined by the user.
"""
ny = np.max(list(chain.from_iterable([[ylag]])))
nx = np.max(list(chain.from_iterable([[xlag]])))
return np.max([ny, np.max(nx)])
class ModelPrediction:
def predict(self, X, y, steps_ahead=None):
"""Return the predicted values given an input.
The predict function allows a friendly usage by the user.
Given a previously trained model, predict values given
a new set of data.
This method accept y values mainly for prediction n-steps ahead
(to be implemented in the future)
Parameters
----------
X : ndarray of floats
The input data to be used in the prediction process.
y : ndarray of floats
The output data to be used in the prediction process.
steps_ahead = int (default = None)
The forecast horizon.
Returns
-------
yhat : ndarray of floats
The predicted values of the model.
"""
if self.basis_function.__class__.__name__ == "Polynomial":
if steps_ahead is None:
return self._model_prediction(X, y)
elif steps_ahead == 1:
return self._one_step_ahead_prediction(X, y)
else:
_check_positive_int(steps_ahead, "steps_ahead")
return self._n_step_ahead_prediction(X, y, steps_ahead=steps_ahead)
else:
if steps_ahead is None:
return self._basis_function_predict(X, y)
elif steps_ahead == 1:
return self._one_step_ahead_prediction(X, y)
def _code2exponents(self, code):
"""
Convert regressor code to exponents array.
Parameters
----------
code : 1D-array of int
Codification of one regressor.
"""
regressors = np.array(list(set(code)))
regressors_count = Counter(code)
if np.all(regressors == 0):
return np.zeros(self.max_lag * (1 + self._n_inputs))
else:
exponents = np.array([], dtype=float)
elements = np.round(np.divide(regressors, 1000), 0)[
(regressors > 0)
].astype(int)
for j in range(1, self._n_inputs + 2):
base_exponents = np.zeros(self.max_lag, dtype=float)
if j in elements:
for i in range(1, self.max_lag + 1):
regressor_code = int(j * 1000 + i)
base_exponents[-i] = regressors_count[regressor_code]
exponents = np.append(exponents, base_exponents)
else:
exponents = np.append(exponents, base_exponents)
return exponents
def _one_step_ahead_prediction(self, X, y):
"""Perform the 1-step-ahead prediction of a model.
Parameters
----------
y : array-like of shape = max_lag
Initial conditions values of the model
to start recursive process.
X : ndarray of floats of shape = n_samples
Vector with input values to be used in model simulation.
Returns
-------
yhat : ndarray of floats
The 1-step-ahead predicted values of the model.
"""
if self.model_type == "NAR":
lagged_data = self.build_output_matrix(y, self.ylag)
# self.max_lag = ModelInformation()._get_max_lag(ylag=self.ylag)
elif self.model_type == "NFIR":
lagged_data = self.build_input_matrix(X, self.xlag)
# self.max_lag = ModelInformation()._get_max_lag(xlag=self.xlag)
elif self.model_type == "NARMAX":
# check_X_y(X, y)
# self.max_lag = ModelInformation()._get_max_lag(ylag=self.ylag, xlag=self.xlag)
lagged_data = self.build_input_output_matrix(X, y, self.xlag, self.ylag)
else:
raise ValueError(
"Unrecognized model type. The model_type should be NARMAX, NAR or NFIR."
)
if self.basis_function.__class__.__name__ == "Polynomial":
X_base = self.basis_function.transform(
lagged_data,
self.max_lag,
predefined_regressors=self.pivv[: len(self.final_model)],
)
else:
X_base, _ = self.basis_function.transform(
lagged_data,
self.max_lag,
predefined_regressors=self.pivv[: len(self.final_model)],
)
# piv_final_model = self.pivv[: len(self.final_model)]
# X_base = X_base[:, piv_final_model]
yhat = np.dot(X_base, self.theta.flatten())
yhat = np.concatenate([y[: self.max_lag].flatten(), yhat])
return yhat.reshape(-1, 1)
def _n_step_ahead_prediction(self, X, y, steps_ahead):
"""Perform the n-steps-ahead prediction of a model.
Parameters
----------
y : array-like of shape = max_lag
Initial conditions values of the model
to start recursive process.
X : ndarray of floats of shape = n_samples
Vector with input values to be used in model simulation.
Returns
-------
yhat : ndarray of floats
The n-steps-ahead predicted values of the model.
"""
if len(y) < self.max_lag:
raise Exception("Insufficient initial conditions elements!")
yhat = np.zeros(X.shape[0], dtype=float)
yhat.fill(np.nan)
yhat[: self.max_lag] = y[: self.max_lag, 0]
i = self.max_lag
X = X.reshape(-1, self._n_inputs)
while i < len(y):
k = int(i - self.max_lag)
if i + steps_ahead > len(y):
steps_ahead = len(y) - i # predicts the remaining values
yhat[i : i + steps_ahead] = self._model_prediction(
X[k : i + steps_ahead], y[k : i + steps_ahead]
)[-steps_ahead:].ravel()
i += steps_ahead
yhat = yhat.ravel()
return yhat.reshape(-1, 1)
def _model_prediction(self, X, y_initial, forecast_horizon=None):
"""Perform the infinity steps-ahead simulation of a model.
Parameters
----------
y_initial : array-like of shape = max_lag
Number of initial conditions values of output
to start recursive process.
X : ndarray of floats of shape = n_samples
Vector with input values to be used in model simulation.
Returns
-------
yhat : ndarray of floats
The predicted values of the model.
"""
if self.model_type in ["NARMAX", "NAR"]:
return self._narmax_predict(X, y_initial, forecast_horizon)
elif self.model_type == "NFIR":
return self._nfir_predict(X, y_initial)
else:
raise Exception(
"model_type do not exist! Model type must be NARMAX, NAR or NFIR"
)
def _narmax_predict(self, X, y_initial, forecast_horizon):
if len(y_initial) < self.max_lag:
raise Exception("Insufficient initial conditions elements!")
# X = X.reshape(-1, self._n_inputs)
if X is not None:
forecast_horizon = X.shape[0]
else:
forecast_horizon = forecast_horizon + self.max_lag
if self.model_type == "NAR":
self._n_inputs = 0
y_output = np.zeros(forecast_horizon, dtype=float)
y_output.fill(np.nan)
y_output[: self.max_lag] = y_initial[: self.max_lag, 0]
model_exponents = [self._code2exponents(model) for model in self.final_model]
raw_regressor = np.zeros(len(model_exponents[0]), dtype=float)
for i in range(self.max_lag, forecast_horizon):
init = 0
final = self.max_lag
k = int(i - self.max_lag)
raw_regressor[:final] = y_output[k:i]
for j in range(self._n_inputs):
init += self.max_lag
final += self.max_lag
raw_regressor[init:final] = X[k:i, j]
regressor_value = np.zeros(len(model_exponents))
for j in range(len(model_exponents)):
regressor_value[j] = np.prod(
np.power(raw_regressor, model_exponents[j])
)
y_output[i] = np.dot(regressor_value, self.theta.flatten())
return y_output.reshape(-1, 1)
def _nfir_predict(self, X, y_initial):
y_output = np.zeros(X.shape[0], dtype=float)
y_output.fill(np.nan)
y_output[: self.max_lag] = y_initial[: self.max_lag, 0]
X = X.reshape(-1, self._n_inputs)
model_exponents = [self._code2exponents(model) for model in self.final_model]
raw_regressor = np.zeros(len(model_exponents[0]), dtype=float)
for i in range(self.max_lag, X.shape[0]):
init = 0
final = self.max_lag
k = int(i - self.max_lag)
for j in range(self._n_inputs):
raw_regressor[init:final] = X[k:i, j]
init += self.max_lag
final += self.max_lag
regressor_value = np.zeros(len(model_exponents))
for j in range(len(model_exponents)):
regressor_value[j] = np.prod(
np.power(raw_regressor, model_exponents[j])
)
y_output[i] = np.dot(regressor_value, self.theta.flatten())
return y_output.reshape(-1, 1)
def _basis_function_predict(self, X, y_initial, theta, forecast_horizon=None):
if X is not None:
forecast_horizon = X.shape[0]
else:
forecast_horizon = forecast_horizon + self.max_lag
if self.model_type == "NAR":
self._n_inputs = 0
yhat = np.zeros(forecast_horizon, dtype=float)
yhat.fill(np.nan)
yhat[: self.max_lag] = y_initial[: self.max_lag, 0]
# Discard unnecessary initial values
# yhat[0:self.max_lag] = y_initial[0:self.max_lag]
analysed_elements_number = self.max_lag + 1
for i in range(0, forecast_horizon - self.max_lag):
if self.model_type == "NARMAX":
lagged_data = self.build_input_output_matrix(
X[i : i + analysed_elements_number],
yhat[i : i + analysed_elements_number].reshape(-1, 1),
self.xlag,
self.ylag,
)
elif self.model_type == "NAR":
lagged_data = self.build_output_matrix(
yhat[i : i + analysed_elements_number].reshape(-1, 1), self.ylag
)
elif self.model_type == "NFIR":
lagged_data = self.build_input_matrix(
X[i : i + analysed_elements_number], self.xlag
)
else:
raise ValueError(
"Unrecognized model type. The model_type should be NARMAX, NAR or NFIR."
)
X_tmp, _ = self.basis_function.transform(
lagged_data,
self.max_lag,
predefined_regressors=self.pivv[: len(self.final_model)],
)
a = X_tmp @ theta
yhat[i + self.max_lag] = a[:, 0]
return yhat.reshape(-1, 1)
def basis_function_n_step_prediction(self, X, y, steps_ahead, forecast_horizon):
"""Perform the n-steps-ahead prediction of a model.
Parameters
----------
y : array-like of shape = max_lag
Initial conditions values of the model
to start recursive process.
X : ndarray of floats of shape = n_samples
Vector with input values to be used in model simulation.
Returns
-------
yhat : ndarray of floats
The n-steps-ahead predicted values of the model.
"""
if len(y) < self.max_lag:
raise Exception("Insufficient initial conditions elements!")
if X is not None:
forecast_horizon = X.shape[0]
else:
forecast_horizon = forecast_horizon + self.max_lag
yhat = np.zeros(forecast_horizon, dtype=float)
yhat.fill(np.nan)
yhat[: self.max_lag] = y[: self.max_lag, 0]
# Discard unnecessary initial values
analysed_elements_number = self.max_lag + 1
i = self.max_lag
while i < len(y):
k = int(i - self.max_lag)
if i + steps_ahead > len(y):
steps_ahead = len(y) - i # predicts the remaining values
if self.model_type == "NARMAX":
yhat[i : i + steps_ahead] = self._basis_function_predict(
X[k : i + steps_ahead], y[k : i + steps_ahead], self.theta
)[-steps_ahead:].ravel()
elif self.model_type == "NAR":
yhat[i : i + steps_ahead] = self._basis_function_predict(
X=None,
y_initial=y[k : i + steps_ahead],
theta=self.theta,
forecast_horizon=forecast_horizon,
)[-forecast_horizon : -forecast_horizon + steps_ahead].ravel()
elif self.model_type == "NFIR":
yhat[i : i + steps_ahead] = self._basis_function_predict(
X=X[k : i + steps_ahead],
y_initial=y[k : i + steps_ahead],
theta=self.theta,
)[-steps_ahead:].ravel()
else:
raise ValueError(
"Unrecognized model type. The model_type should be NARMAX, NAR or NFIR."
)
# yhat[i : i + steps_ahead] = self._basis_function_predict(
# X[k : i + steps_ahead], y[k : i + steps_ahead], self.theta
# )[-steps_ahead:].ravel()
i += steps_ahead
# yhat = yhat.ravel()
return yhat.reshape(-1, 1)
def _basis_function_n_steps_horizon(self, X, y, steps_ahead, forecast_horizon):
yhat = np.zeros(forecast_horizon, dtype=float)
yhat.fill(np.nan)
yhat[: self.max_lag] = y[: self.max_lag, 0]
# Discard unnecessary initial values
analysed_elements_number = self.max_lag + 1
i = self.max_lag
while i < len(y):
k = int(i - self.max_lag)
if i + steps_ahead > len(y):
steps_ahead = len(y) - i # predicts the remaining values
if self.model_type == "NARMAX":
yhat[i : i + steps_ahead] = self._basis_function_predict(
X[k : i + steps_ahead], y[k : i + steps_ahead], self.theta
)[-forecast_horizon : -forecast_horizon + steps_ahead].ravel()
elif self.model_type == "NAR":
yhat[i : i + steps_ahead] = self._basis_function_predict(
X=None,
y_initial=y[k : i + steps_ahead],
theta=self.theta,
forecast_horizon=forecast_horizon,
)[-forecast_horizon : -forecast_horizon + steps_ahead].ravel()
elif self.model_type == "NFIR":
yhat[i : i + steps_ahead] = self._basis_function_predict(
X=X[k : i + steps_ahead],
y_initial=y[k : i + steps_ahead],
theta=self.theta,
)[-forecast_horizon : -forecast_horizon + steps_ahead].ravel()
else:
raise ValueError(
"Unrecognized model type. The model_type should be NARMAX, NAR or NFIR."
)
# yhat[i : i + steps_ahead] = self._basis_function_predict(
# X[k : i + steps_ahead], y[k : i + steps_ahead], self.theta
# )[-steps_ahead:].ravel()
i += steps_ahead
yhat = yhat.ravel()
return yhat.reshape(-1, 1)