""" Simulation methods for NARMAX models """
# Authors:
# Wilson Rocha Lacerda Junior <wilsonrljr@outlook.com>
# License: BSD 3 clause
import numpy as np
from ..utils._check_arrays import check_X_y, _num_features, _check_positive_int
from ..narmax_base import GenerateRegressors, ModelPrediction
from ..narmax_base import HouseHolder
from ..narmax_base import InformationMatrix
from ..narmax_base import ModelInformation
from ..narmax_base import ModelPrediction
from ..parameter_estimation.estimators import Estimators
[docs]class SimulateNARMAX(
Estimators,
GenerateRegressors,
HouseHolder,
ModelInformation,
InformationMatrix,
ModelPrediction,
):
"""Simulation of Polynomial NARMAX model
The NARMAX model is described as:
.. math::
y_k= F^\ell[y_{k-1}, \dotsc, y_{k-n_y},x_{k-d}, x_{k-d-1}, \dotsc, x_{k-d-n_x} + e_{k-1}, \dotsc, e_{k-n_e}] + e_k
where :math:`n_y\in \mathbb{N}^*`, :math:`n_x \in \mathbb{N}`, :math:`n_e \in \mathbb{N}`,
are the maximum lags for the system output and input respectively;
:math:`x_k \in \mathbb{R}^{n_x}` is the system input and :math:`y_k \in \mathbb{R}^{n_y}`
is the system output at discrete time :math:`k \in \mathbb{N}^n`;
:math:`e_k \in \mathbb{R}^{n_e}` stands for uncertainties and possible noise
at discrete time :math:`k`. In this case, :math:`\mathcal{F}^\ell` is some nonlinear function
of the input and output regressors with nonlinearity degree :math:`\ell \in \mathbb{N}`
and :math:`d` is a time delay typically set to :math:`d=1`.
Parameters
----------
estimator : str, default="least_squares"
The parameter estimation method.
extended_least_squares : bool, default=False
Whether to use extended least squares method
for parameter estimation.
Note that we define a specific set of noise regressors.
estimate_parameter : bool, default=False
Whether to use a method for parameter estimation.
Must be True if the user do not enter the pre-estimated parameters.
Note that we define a specific set of noise regressors.
calculate_err : bool, default=False
Whether to use a ERR algorithm to the pre-defined regressors.
lam : float, default=0.98
Forgetting factor of the Recursive Least Squares method.
delta : float, default=0.01
Normalization factor of the P matrix.
offset_covariance : float, default=0.2
The offset covariance factor of the affine least mean squares
filter.
mu : float, default=0.01
The convergence coefficient (learning rate) of the filter.
eps : float
Normalization factor of the normalized filters.
gama : float, default=0.2
The leakage factor of the Leaky LMS method.
weight : float, default=0.02
Weight factor to control the proportions of the error norms
and offers an extra degree of freedom within the adaptation
of the LMS mixed norm method.
Examples
--------
>>> import numpy as np
>>> import matplotlib.pyplot as plt
>>> from sysidentpy.simulation import SimulateNARMAX
>>> from sysidentpy.basis_function._basis_function import Polynomial
>>> from sysidentpy.metrics import root_relative_squared_error
>>> from sysidentpy.utils.generate_data import get_miso_data, get_siso_data
>>> x_train, x_valid, y_train, y_valid = get_siso_data(n=1000,
... colored_noise=True,
... sigma=0.2,
... train_percentage=90)
>>> basis_function = Polynomial(degree=2)
>>> s = SimulateNARMAX(basis_function=basis_function)
>>> model = np.array(
... [
... [1001, 0], # y(k-1)
... [2001, 1001], # x1(k-1)y(k-1)
... [2002, 0], # x1(k-2)
... ]
... )
>>> # theta must be a numpy array of shape (n, 1) where n is the number of regressors
>>> theta = np.array([[0.2, 0.9, 0.1]]).T
>>> yhat = s.simulate(
... X_test=x_test,
... y_test=y_test,
... model_code=model,
... theta=theta,
... )
>>> r = pd.DataFrame(
... results(
... model.final_model, model.theta, model.err,
... model.n_terms, err_precision=8, dtype='sci'
... ),
... columns=['Regressors', 'Parameters', 'ERR'])
>>> print(r)
Regressors Parameters ERR
0 x1(k-2) 0.9000 0.0
1 y(k-1) 0.1999 0.0
2 x1(k-1)y(k-1) 0.1000 0.0
"""
def __init__(
self,
*,
estimator="recursive_least_squares",
extended_least_squares=False,
lam=0.98,
delta=0.01,
offset_covariance=0.2,
mu=0.01,
eps=np.finfo(np.float64).eps,
gama=0.2,
weight=0.02,
estimate_parameter=True,
calculate_err=False,
model_type="NARMAX",
basis_function=None,
):
super().__init__(
lam=lam,
delta=delta,
offset_covariance=offset_covariance,
mu=mu,
eps=eps,
gama=gama,
weight=weight,
)
self.model_type = model_type
self.basis_function = basis_function
self.estimator = estimator
self._extended_least_squares = extended_least_squares
self.estimate_parameter = estimate_parameter
self.calculate_err = calculate_err
self._validate_simulate_params()
def _validate_simulate_params(self):
if not isinstance(self.estimate_parameter, bool):
raise TypeError(
f"estimate_parameter must be False or True. Got {self.estimate_parameter}"
)
if not isinstance(self.calculate_err, bool):
raise TypeError(
f"calculate_err must be False or True. Got {self.calculate_err}"
)
if self.basis_function is None:
raise TypeError(f"basis_function can't be. Got {self.basis_function}")
if self.model_type not in ["NARMAX", "NAR", "NFIR"]:
raise ValueError(
"model_type must be NARMAX, NAR, or NFIR. Got %s" % self.model_type
)
[docs] def simulate(
self,
*,
X_train=None,
y_train=None,
X_test=None,
y_test=None,
model_code=None,
steps_ahead=None,
theta=None,
forecast_horizon=None,
):
"""Simulate a model defined by the user.
Parameters
----------
X_train : ndarray of floats
The input data to be used in the training process.
y_train : ndarray of floats
The output data to be used in the training process.
X_test : ndarray of floats
The input data to be used in the prediction process.
y_test : ndarray of floats
The output data (initial conditions) to be used in the prediction process.
model_code : ndarray of int
Flattened list of input or output regressors.
steps_ahead = int, default = None
The forecast horizon.
theta : array-like of shape = number_of_model_elements
The parameters of the model.
plot : bool, default=True
Indicate if the user wants to plot or not.
Returns
-------
yhat : ndarray of floats
The predicted values of the model.
results : string
Where:
First column represents each regressor element;
Second column represents associated parameter;
Third column represents the error reduction ratio associated
to each regressor.
"""
if self.basis_function.__class__.__name__ != "Polynomial":
raise NotImplementedError(
"Currently, SimulateNARMAX only works for polynomial" " models."
)
if y_test is None:
raise ValueError("y_test cannot be None")
if not isinstance(model_code, np.ndarray):
raise TypeError(f"model_code must be an np.np.ndarray. Got {model_code}")
if not isinstance(steps_ahead, (int, type(None))):
raise ValueError(
f"steps_ahead must be None or integer > zero. Got {steps_ahead}"
)
if not isinstance(theta, np.ndarray) and not self.estimate_parameter:
raise TypeError(
f"If estimate_parameter is False, theta must be an np.np.ndarray. Got {theta}"
)
if self.estimate_parameter:
if not all(isinstance(i, np.ndarray) for i in [y_train]):
raise TypeError(
f"If estimate_parameter is True, X_train and y_train must be an np.ndarray. Got {type(y_train)}"
)
if y_train is None:
raise ValueError("y_train cannot be None")
# self._n_inputs = _num_features(X_test) ####
if X_test is not None:
self._n_inputs = _num_features(X_test)
else:
self._n_inputs = 1 # just to create the regressor space base
xlag_code = self._list_input_regressor_code(model_code)
ylag_code = self._list_output_regressor_code(model_code)
self.xlag = self._get_lag_from_regressor_code(xlag_code)
self.ylag = self._get_lag_from_regressor_code(ylag_code)
self.max_lag = max(self.xlag, self.ylag)
if self._n_inputs != 1:
self.xlag = self._n_inputs * [list(range(1, self.max_lag + 1))]
# for MetaMSS NAR modelling
if self.model_type == "NAR" and forecast_horizon is None:
forecast_horizon = y_test.shape[0] - self.max_lag
self.non_degree = model_code.shape[1]
regressor_code = self.regressor_space(
self.non_degree, self.xlag, self.ylag, self._n_inputs, self.model_type
)
self.pivv = self._get_index_from_regressor_code(regressor_code, model_code)
self.final_model = regressor_code[self.pivv]
# to use in the predict function
self.n_terms = self.final_model.shape[0]
if self.estimate_parameter and not self.calculate_err:
if self.model_type == "NARMAX":
self.max_lag = self._get_max_lag(ylag=self.ylag, xlag=self.xlag)
lagged_data = self.build_input_output_matrix(
X_train, y_train, self.xlag, self.ylag
)
elif self.model_type == "NAR":
lagged_data = self.build_output_matrix(y_train, self.ylag)
self.max_lag = self._get_max_lag(ylag=self.ylag)
elif self.model_type == "NFIR":
lagged_data = self.build_input_matrix(X_train, self.xlag)
self.max_lag = self._get_max_lag(xlag=self.xlag)
else:
raise ValueError(
"Unrecognized model type. The model_type should be NARMAX, NAR or NFIR."
)
psi = self.basis_function.fit(
lagged_data, self.max_lag, predefined_regressors=self.pivv
)
self.theta = getattr(self, self.estimator)(psi, y_train)
if self._extended_least_squares is True:
self.theta = self._unbiased_estimator(
psi, y_train, self.theta, self.non_degree, self.elag, self.max_lag
)
self.err = self.n_terms * [0]
elif not self.estimate_parameter:
self.theta = theta
self.err = self.n_terms * [0]
else:
if self.model_type == "NARMAX":
self.max_lag = self._get_max_lag(ylag=self.ylag, xlag=self.xlag)
lagged_data = self.build_input_output_matrix(
X_train, y_train, self.xlag, self.ylag
)
elif self.model_type == "NAR":
lagged_data = self.build_output_matrix(y_train, self.ylag)
self.max_lag = self._get_max_lag(ylag=self.ylag)
elif self.model_type == "NFIR":
lagged_data = self.build_input_matrix(X_train, self.xlag)
self.max_lag = self._get_max_lag(xlag=self.xlag)
else:
raise ValueError(
"Unrecognized model type. The model_type should be NARMAX, NAR or NFIR."
)
psi = self.basis_function.fit(
lagged_data, self.max_lag, predefined_regressors=self.pivv
)
_, self.err, self.pivv, _ = self.error_reduction_ratio(
psi, y_train, self.n_terms, self.final_model
)
self.theta = getattr(self, self.estimator)(psi, y_train)
if self._extended_least_squares is True:
self.theta = self._unbiased_estimator(
psi, y_train, self.theta, self.non_degree, self.elag, self.max_lag
)
# yhat = self.predict(X_test, y_test, steps_ahead)
# return yhat
if self.basis_function.__class__.__name__ == "Polynomial":
if steps_ahead is None:
return self._model_prediction(
X_test, y_test, forecast_horizon=forecast_horizon
)
elif steps_ahead == 1:
return self._one_step_ahead_prediction(X_test, y_test)
else:
_check_positive_int(steps_ahead, "steps_ahead")
return self._n_step_ahead_prediction(
X_test, y_test, steps_ahead=steps_ahead
)
[docs] def error_reduction_ratio(self, psi, y, process_term_number, regressor_code):
"""Perform the Error Reduction Ration algorithm.
Parameters
----------
y : array-like of shape = n_samples
The target data used in the identification process.
psi : ndarray of floats
The information matrix of the model.
process_term_number : int
Number of Process Terms defined by the user.
Returns
-------
err : array-like of shape = number_of_model_elements
The respective ERR calculated for each regressor.
piv : array-like of shape = number_of_model_elements
Contains the index to put the regressors in the correct order
based on err values.
psi_orthogonal : ndarray of floats
The updated and orthogonal information matrix.
References
----------
.. [1] Manuscript: Orthogonal least squares methods and their application
to non-linear system identification
https://eprints.soton.ac.uk/251147/1/778742007_content.pdf
.. [2] Manuscript (portuguese): Identificação de Sistemas não Lineares
Utilizando Modelos NARMAX Polinomiais – Uma Revisão
e Novos Resultados
"""
squared_y = np.dot(y[self.max_lag :].T, y[self.max_lag :])
tmp_psi = psi.copy()
y = y[self.max_lag :, 0].reshape(-1, 1)
tmp_y = y.copy()
dimension = tmp_psi.shape[1]
piv = np.arange(dimension)
tmp_err = np.zeros(dimension)
err = np.zeros(dimension)
for i in np.arange(0, dimension):
for j in np.arange(i, dimension):
# Add `eps` in the denominator to omit division by zero if
# denominator is zero
tmp_err[j] = (np.dot(tmp_psi[i:, j].T, tmp_y[i:]) ** 2) / (
np.dot(tmp_psi[i:, j].T, tmp_psi[i:, j]) * squared_y + self._eps
)
if i == process_term_number:
break
piv_index = np.argmax(tmp_err[i:]) + i
err[i] = tmp_err[piv_index]
tmp_psi[:, [piv_index, i]] = tmp_psi[:, [i, piv_index]]
piv[[piv_index, i]] = piv[[i, piv_index]]
v = self._house(tmp_psi[i:, i])
row_result = self._rowhouse(tmp_psi[i:, i:], v)
tmp_y[i:] = self._rowhouse(tmp_y[i:], v)
tmp_psi[i:, i:] = np.copy(row_result)
tmp_piv = piv[0:process_term_number]
psi_orthogonal = psi[:, tmp_piv]
model_code = regressor_code[tmp_piv, :].copy()
return model_code, err, piv, psi_orthogonal