# Source code for sysidentpy.base

```"""Base classes for NARX estimator."""

# Authors:
#           Wilson Rocha Lacerda Junior <wilsonrljr@outlook.com>
#           Luan Pascoal da Costa Andrade <luan_pascoal13@hotmail.com>
#           Samuel Carlos Pessoa Oliveira <samuelcpoliveira@gmail.com>
#           Samir Angelo Milani Martins <martins@ufsj.edu.br>
# License: BSD 3 clause

import numpy as np
from itertools import combinations_with_replacement
from itertools import chain
from collections import Counter

def _get_max_lag(ylag, xlag):
"""Get the max lag defined by the user.

Parameters
----------
ylag : int
The maximum lag of output regressors.
xlag : int
The maximum lag of input regressors.

Returns
-------
max_lag = int
The max lag value defined by the user.
"""
ny = np.max(list(chain.from_iterable([[ylag]])))
nx = np.max(list(chain.from_iterable([[xlag]])))
return np.max([ny, np.max(nx)])

[docs]class GenerateRegressors:
"""Polynomial NARMAX model

Provides the main functions to generate the regressor dictionary
and regressor codes for polynomial basis.
"""

[docs]    def regressor_space(self, non_degree, xlag, ylag, n_inputs):
"""Create the code representation of the regressors.

This function generates a codification from all possibles
regressors given the maximum lag of the input and output.
This is used to write the final terms of the model in a
readable form. [1001] -> y(k-1).
This code format was based on a dissertation from UFMG. See
reference below.

Parameters
----------
non_degree : int
The desired maximum nonlinearity degree.
ylag : int
The maximum lag of output regressors.
xlag : int
The maximum lag of input regressors.

Returns
-------
max_lag : int
This value can be used by another functions.
regressor_code : ndarray of int
Matrix codification of all possible regressors.

Examples
--------
The codification is defined as:

>>> 100n = y(k-n)
>>> 200n = u(k-n)
>>> [100n 100n] = y(k-n)y(k-n)
>>> [200n 200n] = u(k-n)u(k-n)

References
----------
[1] Master Thesis: Barbosa, Alípio Monteiro.
Técnicas de otimizaçao bi-objetivo para a determinaçao
da estrutura de modelos NARX (2010).

"""
if not isinstance(non_degree, int) or non_degree < 1:
raise ValueError(
"non_degree must be integer and > zero. Got %f" % non_degree
)

if not isinstance(ylag, (int, list)) or np.min(np.minimum(ylag, 1)) < 1:
raise ValueError("ylag must be integer or list and > zero. Got %f" % ylag)

if (
not isinstance(xlag, (int, list))
# or np.min(np.minimum(xlag, 1)) < 1):
or np.min(np.min(list(chain.from_iterable([[xlag]])))) < 1
):
raise ValueError("xlag must be integer or list and > zero. Got %f" % xlag)

if not isinstance(n_inputs, int) or n_inputs < 1:
raise ValueError("n_inputs must be integer and > zero. Got %f" % n_inputs)

if isinstance(ylag, list):
# create only the lags passed from list
y_vec = []
y_vec.extend([lag + 1000 for lag in ylag])
y_vec = np.array(y_vec)
else:
# create a range of lags if passed a int value
y_vec = np.arange(1001, 1001 + ylag)

if isinstance(xlag, list) and n_inputs == 1:
# create only the lags passed from list
x_vec_tmp = []
x_vec_tmp.extend([lag + 2000 for lag in xlag])
x_vec_tmp = np.array(x_vec_tmp)
elif isinstance(xlag, int) and n_inputs == 1:
# create a range of lags if passed a int value
x_vec_tmp = np.arange(2001, 2001 + xlag)
elif n_inputs > 1:
# only list are allowed if n_inputs > 1
# the user must entered list of the desired lags explicity
x_vec_tmp = []
for i in range(n_inputs):
if isinstance(xlag[i], list) and n_inputs > 1:
# create 200n, 300n,..., 400n to describe each input
x_vec_tmp.extend([lag + 2000 + i * 1000 for lag in xlag[i]])
elif isinstance(xlag[i], int) and n_inputs > 1:
x_vec_tmp.extend(
[np.arange(2001 + i * 1000, 2001 + i * 1000 + xlag[i])]
)

reg_aux = np.array([0])
if n_inputs > 1:
# if x_vec is a nested list, ensure all elements are arrays
all_arrays = [np.array([i]) if isinstance(i, int) else i for i in x_vec_tmp]
x_vec = np.concatenate([i for i in all_arrays])
else:
x_vec = x_vec_tmp

reg_aux = np.concatenate([reg_aux, y_vec, x_vec])

regressor_code = list(combinations_with_replacement(reg_aux, non_degree))

regressor_code = np.array(regressor_code)
regressor_code = regressor_code[:, regressor_code.shape[1] :: -1]
max_lag = _get_max_lag(ylag, xlag)
return regressor_code, max_lag

[docs]class HouseHolder:
"""Householder reflection and transformation."""

def _house(self, x):
"""Perform a Househoulder reflection of vector.

Parameters
----------
x : array-like of shape = number_of_training_samples
The respective column of the matrix of regressors in each
iteration of ERR function.

Returns
-------
v : array-like of shape = number_of_training_samples
The reflection of the array x.

References
----------
[1] Manuscript: Chen, S., Billings, S. A., & Luo, W. (1989).
Orthogonal least squares methods and their application to non-linear system identification.

"""
u = np.linalg.norm(x, 2)
if u != 0:
aux_b = x[0] + np.sign(x[0]) * u
x = x[1:] / aux_b
x = np.concatenate((np.array([1]), x))
return x

def _rowhouse(self, RA, v):
"""Perform a row Househoulder transformation.

Parameters
----------
RA : array-like of shape = number_of_training_samples
The respective column of the matrix of regressors in each
iteration of ERR function.
v : array-like of shape = number_of_training_samples
The reflected vector obtained by using the householder reflection.

Returns
-------
B : array-like of shape = number_of_training_samples

References
----------
[1] Manuscript: Chen, S., Billings, S. A., & Luo, W. (1989).
Orthogonal least squares methods and their application to
non-linear system identification. International Journal of
control, 50(5), 1873-1896.

"""
b = -2 / np.dot(v.T, v)
w = b * np.dot(RA.T, v)
w = w.reshape(1, -1)
v = v.reshape(-1, 1)
RA = RA + v * w
B = RA
return B

[docs]class InformationMatrix:
"""Class for methods regarding preprocessing of columns"""

[docs]    def shift_column(self, col_to_shift, lag):
"""Shift values based on a lag.

Parameters
----------
col_to_shift : array-like of shape = n_samples
The samples of the input or output.
lag : int
The respective lag of the regressor.

Returns
-------
tmp_column : array-like of shape = n_samples
The shifted array of the input or output.

Examples
--------
>>> y = [1, 2, 3, 4, 5]
>>> shift_column(y, 1)
[0, 1, 2, 3, 4]

"""
n_samples = col_to_shift.shape[0]
tmp_column = np.zeros((n_samples, 1))
aux = col_to_shift[0 : n_samples - lag]
aux = np.reshape(aux, (len(aux), 1))
tmp_column[lag:, 0] = aux[:, 0]
return tmp_column

def _process_xlag(self, X, xlag):
"""Create the list of lags to be used for the inputs

Parameters
----------
X : array-like
Input data used on training phase.
xlag : int
The maximum lag of input regressors.

Returns
-------
x_lag : ndarray of int
The range of lags according to user definition.

"""
n_inputs = X.shape[1]
if isinstance(xlag, int) and n_inputs > 1:
raise ValueError(
"If n_inputs > 1, xlag must be a nested list. Got %f" % xlag
)

if isinstance(xlag, int):
xlag = range(1, xlag + 1)

return n_inputs, xlag

def _process_ylag(self, ylag):
"""Create the list of lags to be used for the outputs

Parameters
----------
ylag : int, list
The maximum lag of input regressors.

Returns
-------
y_lag : ndarray of int
The range of lags according to user definition.

"""
if isinstance(ylag, int):
ylag = range(1, ylag + 1)

return ylag

def _create_lagged_X(self, X, xlag, n_inputs):
"""Create a lagged matrix of inputs without combinations.

Parameters
----------
X : array-like
Input data used on training phase.
xlag : int
The maximum lag of input regressors.
n_inputs : int
Number of input variables.

Returns
-------
x_lagged : ndarray of floats
A lagged input matrix formed by the input regressors
without combinations.

"""
if n_inputs == 1:
x_lagged = np.column_stack(
[self.shift_column(X[:, 0], lag) for lag in xlag]
)
else:
x_lagged = np.zeros([len(X), 1])  # just to stack other columns
# if user input a nested list like [[1, 2], 4], the following
# line convert it to [[1, 2], [4]].
# Remember, for multiple inputs all lags must be entered explicity
xlag = [[i] if isinstance(i, int) else i for i in xlag]
for col in range(n_inputs):
x_lagged_col = np.column_stack(
[self.shift_column(X[:, col], lag) for lag in xlag[col]]
)
x_lagged = np.column_stack([x_lagged, x_lagged_col])

x_lagged = x_lagged[:, 1:]  # remove the column of 0 created above

return x_lagged

def _create_lagged_y(self, y, ylag):
"""Create a lagged matrix of the output without combinations.

Parameters
----------
y : array-like
Output data used on training phase.
ylag : int
The maximum lag of output regressors.

Returns
-------
y_lagged : ndarray of floats
A lagged input matrix formed by the output regressors
without combinations.

"""
y_lagged = np.column_stack([self.shift_column(y[:, 0], lag) for lag in ylag])
return y_lagged

[docs]    def initial_lagged_matrix(self, X, y, xlag, ylag):
"""Build a lagged matrix concerning each lag for each column.

Parameters
----------
model : ndarray of int
The model code representation.
y : array-like
Target data used on training phase.
X : array-like
Input data used on training phase.
ylag : int
The maximum lag of output regressors.
xlag : int
The maximum lag of input regressors.

Returns
-------
lagged_data : ndarray of floats
The lagged matrix built in respect with each lag and column.

Examples
--------
Let X and y be the input and output values of shape Nx1.
If the chosen lags are 2 for both input and output
the initial lagged matrix will be formed by Y[k-1], Y[k-2],
X[k-1], and X[k-2].

"""
n_inputs, xlag = self._process_xlag(X, xlag)

ylag = self._process_ylag(ylag)

x_lagged = self._create_lagged_X(X, xlag, n_inputs)

y_lagged = self._create_lagged_y(y, ylag)
lagged_data = np.concatenate([y_lagged, x_lagged], axis=1)
return lagged_data

[docs]    def build_information_matrix(self, X, y, xlag, ylag, non_degree, predefined_regressors=None):
"""Build the information matrix.

Each columns of the information matrix represents a candidate
regressor. The set of candidate regressors are based on xlag,
ylag, and non_degree entered by the user.

Parameters
----------
model : ndarray of int
The model code representation.
y : array-like
Target data used on training phase.
X : array-like
Input data used on training phase.
ylag : int
The maximum lag of output regressors.
xlag : int
The maximum lag of input regressors.
non_degree : int
The desired maximum nonlinearity degree.

Returns
-------
lagged_data = ndarray of floats
The lagged matrix built in respect with each lag and column.

"""
max_lag = _get_max_lag(ylag, xlag)

# Generate a lagged data which each column is a input or output
# related to its respective lags. With this approach we can create
# the information matrix by using all possible combination of
# the columns as a product in the iterations
lagged_data = self.initial_lagged_matrix(X, y, xlag=xlag, ylag=ylag)

constant = np.ones([lagged_data.shape[0], 1])
data = np.concatenate([constant, lagged_data], axis=1)

# Create combinations of all columns based on its index
iterable_list = range(data.shape[1])
combinations = list(combinations_with_replacement(iterable_list, non_degree))
if predefined_regressors is not None:
combinations = [combinations[index] for index in predefined_regressors]

psi = np.column_stack(
[
np.prod(data[:, combinations[i]], axis=1)
for i in range(len(combinations))
]
)
psi = psi[max_lag:, :]
return psi

class ModelInformation:
def _list_output_regressor_code(self, model_code):
"""Create a flattened array of input or output regressors.

Parameters
----------
model_code : ndarray of int
Model defined by the user to simulate.

Returns
-------
model_code : ndarray of int
Flattened list of output regressors.

"""
regressor_code = [
code for code in model_code.ravel() if (code != 0) and (str(code)[0] == "1")
]

return np.asarray(regressor_code)

def _list_input_regressor_code(self, model_code):
"""Create a flattened array of input or output regressors.

Parameters
----------
model_code : ndarray of int
Model defined by the user to simulate.

Returns
-------
model_code : ndarray of int
Flattened list of output regressors.

"""
regressor_code = [
code for code in model_code.ravel() if (code != 0) and (str(code)[0] != "1")
]
return np.asarray(regressor_code)

def _get_lag_from_regressor_code(self, regressors):
"""Get the maximum lag from array of regressors.

Parameters
----------
regressors : ndarray of int
Flattened list of input or output regressors.

Returns
-------
max_lag : int
Maximum lag of list of regressors.

"""
lag_list = [
int(i) for i in regressors.astype("str") for i in [np.sum(int(i[2:]))]
]
if len(lag_list) != 0:
return max(lag_list)
else:
return 1

def _get_max_lag_from_model_code(self, model_code):
xlag_code = self._list_input_regressor_code(model_code)
ylag_code = self._list_output_regressor_code(model_code)
xlag = self._get_lag_from_regressor_code(xlag_code)
ylag = self._get_lag_from_regressor_code(ylag_code)
return max(xlag, ylag)
```