class BaseMSS(RegressorDictionary, metaclass=ABCMeta):
    """Base class for Model Structure Selection."""

    @abstractmethod
    def __init__(self):
        # BUG FIX: previously this called ``super().__init__(self)``,
        # passing the instance twice (``super()`` already binds ``self``),
        # which would reach the parent initializer as a spurious
        # positional argument.
        super().__init__()
        # Fitted-model state, populated by concrete ``fit`` implementations.
        self.max_lag = None  # largest lag used by the regressors
        self.n_inputs = None  # number of exogenous input columns
        self.theta = None  # estimated parameter vector
        self.final_model = None  # encoded regressor codes of the chosen model
        self.pivv = None  # pivot vector selecting regressors
        # Cache for the polynomial-NARMAX fast-predict exponent matrix
        # (see _get_polynomial_narmax_predict_exponents).
        self._polynomial_narmax_predict_cache = None
        self._polynomial_narmax_predict_cache_key = None
@abstractmethod
def fit(self, *, X, y):
    """Fit the model: select the structure and estimate its parameters.

    Abstract; concrete subclasses implement the actual selection and
    populate ``max_lag``, ``n_inputs``, ``theta``, ``final_model`` and
    ``pivv`` (see ``__init__``).

    Parameters
    ----------
    X : ndarray or None
        Input (exogenous) data. Presumably ``None`` for models without
        inputs — confirm against concrete implementations.
    y : ndarray
        Output (endogenous) data used for training.
    """
@abstractmethod
def predict(
    self,
    *,
    X: Optional[np.ndarray] = None,
    y: np.ndarray,
    steps_ahead: Optional[int] = None,
    forecast_horizon: int = 1,
) -> np.ndarray:
    """Predict output values with the fitted model.

    Abstract; implemented by concrete subclasses.

    Parameters
    ----------
    X : ndarray, optional
        Input data for the simulation; ``None`` for models without inputs.
    y : ndarray
        Initial conditions (must hold at least ``max_lag`` samples — see
        the length checks in the n-step-ahead helpers).
    steps_ahead : int, optional
        ``None`` for free-run simulation, otherwise the number of steps
        predicted before re-seeding with measured data.
    forecast_horizon : int, default 1
        Number of samples to forecast.

    Returns
    -------
    ndarray
        Predicted values.
    """
def _predict_on_cpu(
    self,
    *,
    X: Optional[np.ndarray],
    y: np.ndarray,
    steps_ahead: Optional[int],
    forecast_horizon: Optional[int],
    original_xp,
    target_device,
) -> np.ndarray:
    """Run ``predict`` on CPU, then convert back to the caller's backend.

    Sequential NARX simulation is inherently recursive (y[t] needs
    y[t-1]) and works on tiny per-step arrays, so accelerator
    kernel-launch latency makes it far slower than plain NumPy. This
    helper moves the inputs — and, temporarily, ``self.theta`` — to
    NumPy, runs the regular ``predict`` path, restores ``theta``, and
    returns the result converted to ``original_xp`` on ``target_device``.
    """
    cpu_inputs = None if X is None else _to_numpy(X)
    cpu_targets = _to_numpy(y)
    saved_theta = self.theta
    try:
        if saved_theta is not None:
            self.theta = _to_numpy(saved_theta)
        cpu_prediction = self.predict(
            X=cpu_inputs,
            y=cpu_targets,
            steps_ahead=steps_ahead,
            forecast_horizon=forecast_horizon,
        )
    finally:
        # Always restore the backend-native theta, even if predict raised.
        self.theta = saved_theta
    return _asarray(
        cpu_prediction,
        xp=original_xp,
        dtype=y.dtype,
        target_device=target_device,
    )
def _code2exponents(self, *, code: np.ndarray) -> np.ndarray:
    """Convert an encoded regressor into its flattened exponent vector.

    The result has ``max_lag`` slots for the output followed by
    ``max_lag`` slots for each input; slot ``-i`` of a variable's block
    holds the multiplicity of lag ``i`` in the regressor.

    Parameters
    ----------
    code : 1D-array of int
        Codification of one regressor: each element is
        ``variable_id * 1000 + lag``; 0 encodes the constant term.

    Returns
    -------
    ndarray of floats of shape (max_lag * (1 + n_inputs),)
        Exponent of every lagged-variable slot.
    """
    regressors = np.array(list(set(code)))
    regressors_count = Counter(code)
    # The all-zero code is the constant regressor: every exponent is 0.
    if np.all(regressors == 0):
        return np.zeros(self.max_lag * (1 + self.n_inputs))
    # Variable ids present in the code (1 = output, 2..n_inputs+1 = inputs).
    elements = np.round(np.divide(regressors, 1000), 0)[regressors > 0].astype(int)
    # Build one max_lag-wide block per variable and concatenate once.
    # (The previous version grew the array with np.append per block —
    # O(n^2) reallocation — and duplicated the append in both branches.)
    blocks = []
    for j in range(1, self.n_inputs + 2):
        base_exponents = np.zeros(self.max_lag, dtype=float)
        if j in elements:
            for i in range(1, self.max_lag + 1):
                # Slot -i corresponds to lag i of variable j.
                base_exponents[-i] = regressors_count[int(j * 1000 + i)]
        blocks.append(base_exponents)
    return np.concatenate(blocks)
def _get_polynomial_narmax_predict_cache_key(self):
    """Build a hashable key identifying the current fitted-model state.

    The key changes whenever the model type, lags, input count, basis
    degree, or the selected regressor codes change, invalidating the
    cached exponent matrix.
    """
    model_codes = np.asarray(self.final_model)
    return (
        self.model_type,
        self.max_lag,
        self.n_inputs,
        getattr(self.basis_function, "degree", None),
        model_codes.shape,
        model_codes.dtype.str,
        model_codes.tobytes(),
    )
def _get_polynomial_narmax_predict_exponents(self) -> np.ndarray:
    """Return the exponent matrix for the selected regressors, cached.

    Recomputes the matrix only when the fitted-model cache key changes
    (or when the cache attribute is missing entirely).
    """
    key = self._get_polynomial_narmax_predict_cache_key()
    stale = getattr(self, "_polynomial_narmax_predict_cache_key", None) != key
    if stale or not hasattr(self, "_polynomial_narmax_predict_cache"):
        codes = np.asarray(self.final_model)
        if codes.shape[0]:
            # One exponent row per selected regressor code.
            matrix = np.vstack(
                [self._code2exponents(code=model_code) for model_code in codes]
            )
        else:
            # Empty model: keep the column layout consistent.
            matrix = np.zeros(
                (0, self.max_lag * (1 + self.n_inputs)),
                dtype=float,
            )
        self._polynomial_narmax_predict_cache = matrix
        self._polynomial_narmax_predict_cache_key = key
    return self._polynomial_narmax_predict_cache
def _should_use_polynomial_narmax_fast_path(
    self,
    x: Optional[np.ndarray],
    y_initial: np.ndarray,
    forecast_horizon: int,
) -> bool:
    """Decide whether the vectorized polynomial-NARMAX predictor applies.

    Requires a fitted polynomial NARMAX model with inputs, enough
    initial conditions, a horizon beyond ``max_lag``, and a NumPy,
    torch, or cupy array namespace.
    """
    if x is None:
        return False
    # Guard clauses mirror the original short-circuit order.
    if self.model_type != "NARMAX":
        return False
    if not isinstance(self.basis_function, Polynomial):
        return False
    if self.max_lag is None or self.n_inputs is None:
        return False
    if self.theta is None or self.final_model is None:
        return False
    if self.n_inputs <= 0:
        return False
    if y_initial.shape[0] <= self.max_lag:
        return False
    if forecast_horizon < self.max_lag:
        return False
    xp = get_namespace(x, y_initial)
    backend = getattr(xp, "__name__", xp.__class__.__name__)
    return (
        _is_numpy_namespace(xp)
        or "torch" in backend
        or "cupy" in backend
    )
def _shift_regressor_block(
    self,
    xp,
    raw_regressor,
    start: int,
    stop: int,
    value,
):
    """Shift the lag window ``[start, stop)`` one slot left, in place.

    The oldest sample falls off the front and ``value`` becomes the most
    recent entry at ``stop - 1``.
    """
    width = stop - start
    if width > 1:
        # _copy avoids aliasing between the overlapping source and target.
        shifted = _copy(xp, raw_regressor[start + 1 : stop])
        raw_regressor[start : stop - 1] = shifted
    raw_regressor[stop - 1] = value
def _polynomial_narmax_predict_fast(
    self,
    x: np.ndarray,
    y_initial: np.ndarray,
    forecast_horizon: int,
) -> np.ndarray:
    """Vectorized free-run simulation for polynomial NARMAX models.

    Keeps one flattened lag buffer (``raw_regressor``) laid out as
    ``[y lags | input-1 lags | ... | input-n lags]``. At each step every
    selected regressor is evaluated as a product of powers of that
    buffer, and the buffer is shifted in place before the next step.

    Parameters
    ----------
    x : ndarray
        Input data; reshaped to ``(-1, n_inputs)``.
    y_initial : ndarray
        Initial outputs; the first ``max_lag`` samples seed the buffer.
    forecast_horizon : int
        Total horizon; ``forecast_horizon - max_lag`` values are predicted.

    Returns
    -------
    ndarray of shape (forecast_horizon - max_lag, 1)
        Predicted output values.
    """
    xp = get_namespace(x, y_initial)
    _dtype = x.dtype
    target_device = _device(x, y_initial)
    x = xp.reshape(x, (-1, self.n_inputs))
    n_predictions = forecast_horizon - self.max_lag
    if n_predictions <= 0:
        # Nothing to forecast beyond the initial conditions.
        return xp.reshape(
            _zeros(xp, 0, dtype=_dtype, target_device=target_device),
            (-1, 1),
        )
    # One row of exponents per selected regressor (cached between calls).
    model_exponents = _asarray(
        self._get_polynomial_narmax_predict_exponents(),
        xp=xp,
        dtype=_dtype,
        target_device=target_device,
    )
    raw_regressor = _zeros(
        xp,
        model_exponents.shape[1],
        dtype=_dtype,
        target_device=target_device,
    )
    # Seed the buffer: max_lag outputs, then max_lag samples per input.
    raw_regressor[: self.max_lag] = y_initial[: self.max_lag, 0]
    for input_index in range(self.n_inputs):
        start = self.max_lag * (1 + input_index)
        stop = start + self.max_lag
        raw_regressor[start:stop] = x[: self.max_lag, input_index]
    theta = xp.reshape(
        _asarray(
            self.theta,
            xp=xp,
            dtype=_dtype,
            target_device=target_device,
        ),
        (-1,),
    )
    predictions = _zeros(
        xp,
        n_predictions,
        dtype=_dtype,
        target_device=target_device,
    )
    for step in range(n_predictions):
        # Each regressor value is prod(raw_regressor ** exponent_row).
        regressor_powers = _pow(xp, raw_regressor, model_exponents)
        regressor_value = xp.prod(regressor_powers, axis=1)
        y_next = regressor_value @ theta
        predictions[step] = y_next
        if step == n_predictions - 1:
            # Last step: no need to roll the lag buffer forward.
            continue
        # Feed the new output back into the y-lag window.
        self._shift_regressor_block(
            xp,
            raw_regressor,
            0,
            self.max_lag,
            y_next,
        )
        new_input_index = self.max_lag + step
        for input_index in range(self.n_inputs):
            start = self.max_lag * (1 + input_index)
            stop = start + self.max_lag
            self._shift_regressor_block(
                xp,
                raw_regressor,
                start,
                stop,
                x[new_input_index, input_index],
            )
    return xp.reshape(predictions, (-1, 1))
def _narmax_predict_reference(
    self,
    x: Optional[np.ndarray],
    y_initial: np.ndarray,
    forecast_horizon: int = 1,
) -> np.ndarray:
    """Return the reference recursive NARMAX prediction.

    Step-by-step free-run simulation: at sample ``i`` the regressor
    vector is rebuilt from the last ``max_lag`` (already predicted)
    outputs and the last ``max_lag`` samples of each input, then the
    next output is ``prod(raw_regressor ** exponents) @ theta``.

    NOTE(review): ``x`` is only guarded against ``None`` for the dtype
    choice; the inner loop indexes ``x`` whenever ``n_inputs > 0`` —
    presumably ``x is None`` only occurs with ``n_inputs == 0``. TODO
    confirm against callers.
    """
    xp = get_namespace(x, y_initial)
    _dtype = x.dtype if x is not None else y_initial.dtype
    target_device = _device(x, y_initial)
    y_output = _zeros(
        xp,
        forecast_horizon,
        dtype=_dtype,
        target_device=target_device,
    )
    # NaN-fill so unwritten positions are visible as NaN.
    y_output = y_output * float("nan")
    y_output[: self.max_lag] = y_initial[: self.max_lag, 0]
    # One exponent row per selected regressor (not cached on this path).
    model_exponents = _vstack(
        xp,
        [
            _asarray(
                self._code2exponents(code=model),
                xp=xp,
                target_device=target_device,
            )
            for model in self.final_model
        ],
    )
    raw_regressor = _zeros(
        xp,
        model_exponents.shape[1],
        dtype=_dtype,
        target_device=target_device,
    )
    theta = xp.reshape(
        _asarray(
            self.theta,
            xp=xp,
            dtype=_dtype,
            target_device=target_device,
        ),
        (-1,),
    )
    for i in range(self.max_lag, forecast_horizon):
        init = 0
        final = self.max_lag
        k = int(i - self.max_lag)
        # y-lag window comes from the already-computed outputs.
        raw_regressor[:final] = y_output[k:i]
        for j in range(self.n_inputs):
            init += self.max_lag
            final += self.max_lag
            raw_regressor[init:final] = x[k:i, j]
        regressor_powers = _pow(xp, raw_regressor, model_exponents)
        regressor_value = xp.prod(regressor_powers, axis=1)
        y_output[i] = regressor_value @ theta
    # Drop the initial conditions; return a column vector.
    return xp.reshape(y_output[self.max_lag : :], (-1, 1))
def _one_step_ahead_prediction(
    self,
    x_base: np.ndarray,
    y: Optional[np.ndarray] = None,
) -> np.ndarray:
    """Compute the 1-step-ahead prediction ``x_base @ theta``.

    Parameters
    ----------
    x_base : ndarray
        Pre-built regressor matrix with input-output lagged data.
    y : ndarray, optional
        Ignored; present only so subclasses can share the signature.

    Returns
    -------
    ndarray of shape (n, 1)
        The 1-step-ahead predicted values of the model.
    """
    del y  # intentionally unused; signature kept for subclass compatibility
    xp = get_namespace(x_base)
    flat_theta = _asarray(
        self.theta,
        xp=xp,
        dtype=x_base.dtype,
        target_device=_device(x_base),
    )
    flat_theta = xp.reshape(flat_theta, (-1,))
    return xp.reshape(x_base @ flat_theta, (-1, 1))
@abstractmethod
def _model_prediction(
    self,
    x: Optional[np.ndarray],
    y_initial: np.ndarray,
    forecast_horizon: int = 1,
) -> np.ndarray:
    """Model prediction wrapper (abstract).

    Called by the n-step-ahead helpers either with ``x=None`` (NAR path,
    see ``_nar_step_ahead``) or with an input window (NARMAX path, see
    ``narmax_n_step_ahead``). Concrete subclasses dispatch to the
    appropriate prediction routine.
    """
def _narmax_predict(
    self,
    x: np.ndarray,
    y_initial: np.ndarray,
    forecast_horizon: int = 1,
) -> np.ndarray:
    """Dispatch NARMAX prediction to the fast or the reference path.

    Uses the vectorized polynomial fast path whenever its preconditions
    hold (see ``_should_use_polynomial_narmax_fast_path``), otherwise
    falls back to the step-by-step reference implementation.
    """
    use_fast_path = self._should_use_polynomial_narmax_fast_path(
        x, y_initial, forecast_horizon
    )
    if use_fast_path:
        return self._polynomial_narmax_predict_fast(x, y_initial, forecast_horizon)
    return self._narmax_predict_reference(x, y_initial, forecast_horizon)
@abstractmethod
def _nfir_predict(self, x: np.ndarray, y_initial: np.ndarray) -> np.ndarray:
    """NFIR free-run prediction (abstract with a default body).

    Subclasses must override but may reuse this implementation. At each
    sample the regressor buffer is rebuilt from the lag windows and the
    output is ``prod(raw_regressor ** exponents) @ theta``.

    NOTE(review): the loop still copies past *outputs* into the first
    ``max_lag`` slots of the regressor buffer even though NFIR models
    have no autoregressive terms — presumably those slots always carry
    zero exponents, making the copy a no-op in the product. TODO confirm
    against ``_code2exponents``.
    """
    xp = get_namespace(x, y_initial)
    target_device = _device(x, y_initial)
    y_output = _zeros(xp, x.shape[0], dtype=x.dtype, target_device=target_device)
    # NaN-fill so unwritten positions are visible as NaN.
    y_output = y_output * float("nan")
    y_output[: self.max_lag] = y_initial[: self.max_lag, 0]
    x = xp.reshape(x, (-1, self.n_inputs))
    # One exponent row per selected regressor.
    model_exponents = _vstack(
        xp,
        [
            _asarray(
                self._code2exponents(code=model),
                xp=xp,
                target_device=target_device,
            )
            for model in self.final_model
        ],
    )
    raw_regressor = _zeros(
        xp,
        model_exponents.shape[1],
        dtype=x.dtype,
        target_device=target_device,
    )
    theta = xp.reshape(
        _asarray(
            self.theta,
            xp=xp,
            dtype=x.dtype,
            target_device=target_device,
        ),
        (-1,),
    )
    for i in range(self.max_lag, x.shape[0]):
        init = 0
        final = self.max_lag
        k = int(i - self.max_lag)
        raw_regressor[:final] = y_output[k:i]
        for j in range(self.n_inputs):
            init += self.max_lag
            final += self.max_lag
            raw_regressor[init:final] = x[k:i, j]
        regressor_powers = _pow(xp, raw_regressor, model_exponents)
        regressor_value = xp.prod(regressor_powers, axis=1)
        y_output[i] = regressor_value @ theta
    # Drop the initial conditions; return a column vector.
    return xp.reshape(y_output[self.max_lag : :], (-1, 1))
def _nar_step_ahead(self, y: np.ndarray, steps_ahead: int) -> np.ndarray:
    """n-steps-ahead prediction for NAR models.

    Repeatedly runs ``_model_prediction`` for ``steps_ahead`` samples,
    re-seeding each chunk with measured values from ``y``.

    Parameters
    ----------
    y : ndarray
        Measured output; must hold at least ``max_lag`` samples.
    steps_ahead : int
        Number of samples predicted per chunk before re-seeding.

    Returns
    -------
    ndarray of shape (n, 1)
        Predictions with the initial conditions removed.

    Raises
    ------
    ValueError
        If ``y`` has fewer than ``max_lag`` samples.
    """
    xp = get_namespace(y)
    if len(y) < self.max_lag:
        raise ValueError(
            "Insufficient initial condition elements! Expected at least"
            f" {self.max_lag} elements."
        )
    # Number of prediction chunks needed to cover the rest of y.
    to_remove = math.ceil((len(y) - self.max_lag) / steps_ahead)
    yhat_length = len(y) + steps_ahead
    yhat = _zeros(xp, yhat_length, dtype=y.dtype, target_device=_device(y))
    # NaN-fill so still-unpredicted slots can be counted below.
    yhat = yhat * float("nan")
    yhat[: self.max_lag] = y[: self.max_lag, 0]
    i = self.max_lag
    # Chunk start offsets into y.
    steps = [step for step in range(0, to_remove * steps_ahead, steps_ahead)]
    if len(steps) > 1:
        for step in steps[:-1]:
            yhat[i : i + steps_ahead] = xp.reshape(
                self._model_prediction(
                    x=None, y_initial=y[step:i], forecast_horizon=steps_ahead
                )[-steps_ahead:],
                (-1,),
            )
            i += steps_ahead
        # Remaining tail: however many slots are still NaN.
        steps_ahead = int(xp.sum(xp.asarray(xp.isnan(yhat), dtype=xp.int32)))
        # NOTE(review): this final call omits forecast_horizon (defaults
        # to 1) yet slices the last `steps_ahead` values — confirm the
        # concrete _model_prediction derives the horizon internally.
        yhat[i : i + steps_ahead] = xp.reshape(
            self._model_prediction(x=None, y_initial=y[steps[-1] : i])[
                -steps_ahead:
            ],
            (-1,),
        )
    else:
        yhat[i : i + steps_ahead] = xp.reshape(
            self._model_prediction(
                x=None, y_initial=y[0:i], forecast_horizon=steps_ahead
            )[-steps_ahead:],
            (-1,),
        )
    # Drop the initial conditions; return a column vector.
    yhat = xp.reshape(yhat, (-1,))[self.max_lag : :]
    return xp.reshape(yhat, (-1, 1))
def narmax_n_step_ahead(
    self,
    x: np.ndarray,
    y: np.ndarray,
    steps_ahead: Optional[int],
) -> np.ndarray:
    """n_steps ahead prediction method for NARMAX model.

    Predicts ``steps_ahead`` samples at a time, re-seeding each chunk
    with measured values from ``y`` and the matching input window.

    Parameters
    ----------
    x : ndarray
        Input data; reshaped to ``(-1, n_inputs)``.
    y : ndarray
        Measured output; must hold at least ``max_lag`` samples.
    steps_ahead : int
        Number of samples predicted per chunk before re-seeding.

    Returns
    -------
    ndarray of shape (n, 1)
        Predictions with the initial conditions removed.

    Raises
    ------
    ValueError
        If ``y`` has fewer than ``max_lag`` samples.
    """
    xp = get_namespace(x, y)
    if len(y) < self.max_lag:
        raise ValueError(
            "Insufficient initial condition elements! Expected at least"
            f" {self.max_lag} elements."
        )
    # Number of prediction chunks needed to cover the rest of y.
    to_remove = math.ceil((len(y) - self.max_lag) / steps_ahead)
    x = xp.reshape(x, (-1, self.n_inputs))
    yhat = _zeros(xp, x.shape[0], dtype=x.dtype, target_device=_device(x, y))
    # NaN-fill so still-unpredicted slots can be counted below.
    yhat = yhat * float("nan")
    yhat[: self.max_lag] = y[: self.max_lag, 0]
    i = self.max_lag
    # Chunk start offsets into x and y.
    steps = [step for step in range(0, to_remove * steps_ahead, steps_ahead)]
    if len(steps) > 1:
        for step in steps[:-1]:
            yhat[i : i + steps_ahead] = xp.reshape(
                self._model_prediction(
                    x=x[step : i + steps_ahead],
                    y_initial=y[step:i],
                )[-steps_ahead:],
                (-1,),
            )
            i += steps_ahead
        # Remaining tail: however many slots are still NaN.
        steps_ahead = int(xp.sum(xp.asarray(xp.isnan(yhat), dtype=xp.int32)))
        yhat[i : i + steps_ahead] = xp.reshape(
            self._model_prediction(
                x=x[steps[-1] : i + steps_ahead],
                y_initial=y[steps[-1] : i],
            )[-steps_ahead:],
            (-1,),
        )
    else:
        yhat[i : i + steps_ahead] = xp.reshape(
            self._model_prediction(
                x=x[0 : i + steps_ahead],
                y_initial=y[0:i],
            )[-steps_ahead:],
            (-1,),
        )
    # Drop the initial conditions; return a column vector.
    yhat = xp.reshape(yhat, (-1,))[self.max_lag : :]
    return xp.reshape(yhat, (-1, 1))
@abstractmethod
def _n_step_ahead_prediction(
    self,
    x: Optional[np.ndarray],
    y: Optional[np.ndarray],
    steps_ahead: Optional[int],
) -> np.ndarray:
    """Perform the n-steps-ahead prediction of a model.

    Abstract with a default dispatching body: NARMAX and NAR models are
    routed to their dedicated helpers; NFIR is not supported yet.

    Parameters
    ----------
    x : ndarray of floats of shape = n_samples
        Vector with input values to be used in model simulation.
    y : array-like of shape = max_lag
        Initial conditions values of the model to start the recursive
        process.
    steps_ahead : int (default = None)
        The user can use free run simulation, one-step ahead prediction
        and n-step ahead prediction.

    Returns
    -------
    yhat : ndarray of floats
        Predicted values for NARMAX and NAR models.

    Raises
    ------
    ValueError
        If ``self.model_type`` is neither NARMAX nor NAR.
    """
    dispatch = {
        "NARMAX": lambda: self.narmax_n_step_ahead(x, y, steps_ahead),
        "NAR": lambda: self._nar_step_ahead(y, steps_ahead),
    }
    handler = dispatch.get(self.model_type)
    if handler is None:
        raise ValueError(
            "n_steps_ahead prediction will be implemented for NFIR models in v0.4.*"
        )
    return handler()
@abstractmethod
def _basis_function_predict(
    self,
    x: Optional[np.ndarray],
    y_initial: np.ndarray,
    forecast_horizon: int = 1,
) -> np.ndarray:
    """Free-run prediction for general basis functions (default body).

    Abstract with a default body: at each step the current data window
    is turned into a lagged matrix, transformed by
    ``self.basis_function``, and reduced to one scalar prediction via
    ``theta``.

    NOTE(review): ``x`` is indexed without a ``None`` guard on the
    NARMAX/NFIR branches — presumably callers pass ``x=None`` only for
    NAR models. TODO confirm.
    """
    xp = get_namespace(y_initial)
    yhat = _zeros(
        xp,
        forecast_horizon,
        dtype=y_initial.dtype,
        target_device=_device(x, y_initial),
    )
    # NaN-fill so unwritten positions are visible as NaN.
    yhat = yhat * float("nan")
    yhat[: self.max_lag] = y_initial[: self.max_lag, 0]
    # Window length: max_lag past samples plus the current one.
    analyzed_elements_number = self.max_lag + 1
    for i in range(forecast_horizon - self.max_lag):
        if self.model_type == "NARMAX":
            lagged_data = build_input_output_matrix(
                x[i : i + analyzed_elements_number],
                xp.reshape(yhat[i : i + analyzed_elements_number], (-1, 1)),
                self.xlag,
                self.ylag,
            )
        elif self.model_type == "NAR":
            lagged_data = build_output_matrix(
                xp.reshape(yhat[i : i + analyzed_elements_number], (-1, 1)),
                self.ylag,
            )
        elif self.model_type == "NFIR":
            lagged_data = build_input_matrix(
                x[i : i + analyzed_elements_number], self.xlag
            )
        else:
            raise ValueError(
                f"model_type must be NARMAX, NAR or NFIR. Got {self.model_type}"
            )
        # Keep only the regressors selected during structure selection.
        x_tmp = self.basis_function.transform(
            lagged_data,
            self.max_lag,
            self.ylag,
            self.xlag,
            self.model_type,
            predefined_regressors=self.pivv[: len(self.final_model)],
        )
        a = x_tmp @ self.theta
        yhat[i + self.max_lag] = a.item()
    return xp.reshape(yhat[self.max_lag :], (-1, 1))
@abstractmethod
def _basis_function_n_step_prediction(
    self,
    x: Optional[np.ndarray],
    y: np.ndarray,
    steps_ahead: int,
    forecast_horizon: int,
) -> np.ndarray:
    """n-steps-ahead prediction for general basis functions (default body).

    Abstract with a default body: walks through ``y`` in chunks of
    ``steps_ahead`` samples, re-seeding ``_basis_function_predict`` with
    measured data at every chunk boundary.

    NOTE(review): the NARMAX/NFIR branches slice ``[-steps_ahead:]``
    while the NAR branch slices
    ``[-forecast_horizon:-forecast_horizon + steps_ahead]`` — confirm
    the asymmetry is intentional.
    """
    xp = get_namespace(y)
    yhat = _zeros(xp, forecast_horizon, dtype=y.dtype, target_device=_device(x, y))
    # NaN-fill so unwritten positions are visible as NaN.
    yhat = yhat * float("nan")
    yhat[: self.max_lag] = y[: self.max_lag, 0]
    # Walk over y one chunk at a time, starting after the initial values.
    i = self.max_lag
    while i < len(y):
        k = int(i - self.max_lag)
        if i + steps_ahead > len(y):
            steps_ahead = len(y) - i  # predicts the remaining values
        if self.model_type == "NARMAX":
            yhat[i : i + steps_ahead] = xp.reshape(
                self._basis_function_predict(
                    x[k : i + steps_ahead],
                    y[k : i + steps_ahead],
                    forecast_horizon=forecast_horizon,
                )[-steps_ahead:],
                (-1,),
            )
        elif self.model_type == "NAR":
            yhat[i : i + steps_ahead] = xp.reshape(
                self._basis_function_predict(
                    x=None,
                    y_initial=y[k : i + steps_ahead],
                    forecast_horizon=forecast_horizon,
                )[-forecast_horizon : -forecast_horizon + steps_ahead],
                (-1,),
            )
        elif self.model_type == "NFIR":
            yhat[i : i + steps_ahead] = xp.reshape(
                self._basis_function_predict(
                    x=x[k : i + steps_ahead],
                    y_initial=y[k : i + steps_ahead],
                    forecast_horizon=forecast_horizon,
                )[-steps_ahead:],
                (-1,),
            )
        else:
            raise ValueError(
                f"model_type must be NARMAX, NAR or NFIR. Got {self.model_type}"
            )
        i += steps_ahead
    return xp.reshape(yhat[self.max_lag :], (-1, 1))
def _basis_function_n_steps_horizon(
    self,
    x: Optional[np.ndarray],
    y: np.ndarray,
    steps_ahead: int,
    forecast_horizon: int,
) -> np.ndarray:
    """n-steps-ahead prediction over a fixed horizon for basis functions.

    Same chunked walk as ``_basis_function_n_step_prediction``, but here
    every model-type branch keeps the slice
    ``[-forecast_horizon:-forecast_horizon + steps_ahead]`` of each
    inner free-run prediction.
    """
    xp = get_namespace(y)
    yhat = _zeros(xp, forecast_horizon, dtype=y.dtype, target_device=_device(x, y))
    # NaN-fill so unwritten positions are visible as NaN.
    yhat = yhat * float("nan")
    yhat[: self.max_lag] = y[: self.max_lag, 0]
    # Walk over y one chunk at a time, starting after the initial values.
    i = self.max_lag
    while i < len(y):
        k = int(i - self.max_lag)
        if i + steps_ahead > len(y):
            steps_ahead = len(y) - i  # predicts the remaining values
        if self.model_type == "NARMAX":
            yhat[i : i + steps_ahead] = xp.reshape(
                self._basis_function_predict(
                    x[k : i + steps_ahead], y[k : i + steps_ahead], forecast_horizon
                )[-forecast_horizon : -forecast_horizon + steps_ahead],
                (-1,),
            )
        elif self.model_type == "NAR":
            yhat[i : i + steps_ahead] = xp.reshape(
                self._basis_function_predict(
                    x=None,
                    y_initial=y[k : i + steps_ahead],
                    forecast_horizon=forecast_horizon,
                )[-forecast_horizon : -forecast_horizon + steps_ahead],
                (-1,),
            )
        elif self.model_type == "NFIR":
            yhat[i : i + steps_ahead] = xp.reshape(
                self._basis_function_predict(
                    x=x[k : i + steps_ahead],
                    y_initial=y[k : i + steps_ahead],
                    forecast_horizon=forecast_horizon,
                )[-forecast_horizon : -forecast_horizon + steps_ahead],
                (-1,),
            )
        else:
            raise ValueError(
                f"model_type must be NARMAX, NAR or NFIR. Got {self.model_type}"
            )
        i += steps_ahead
    yhat = xp.reshape(yhat, (-1,))
    return xp.reshape(yhat[self.max_lag :], (-1, 1))