Skip to content

Documentation for OFRBase

Base methods for Orthogonal Forward Regression algorithm.

OFRBase

Bases: BaseMSS

Base class for Model Structure Selection.

Source code in sysidentpy/model_structure_selection/ofr_base.py
 320
 321
 322
 323
 324
 325
 326
 327
 328
 329
 330
 331
 332
 333
 334
 335
 336
 337
 338
 339
 340
 341
 342
 343
 344
 345
 346
 347
 348
 349
 350
 351
 352
 353
 354
 355
 356
 357
 358
 359
 360
 361
 362
 363
 364
 365
 366
 367
 368
 369
 370
 371
 372
 373
 374
 375
 376
 377
 378
 379
 380
 381
 382
 383
 384
 385
 386
 387
 388
 389
 390
 391
 392
 393
 394
 395
 396
 397
 398
 399
 400
 401
 402
 403
 404
 405
 406
 407
 408
 409
 410
 411
 412
 413
 414
 415
 416
 417
 418
 419
 420
 421
 422
 423
 424
 425
 426
 427
 428
 429
 430
 431
 432
 433
 434
 435
 436
 437
 438
 439
 440
 441
 442
 443
 444
 445
 446
 447
 448
 449
 450
 451
 452
 453
 454
 455
 456
 457
 458
 459
 460
 461
 462
 463
 464
 465
 466
 467
 468
 469
 470
 471
 472
 473
 474
 475
 476
 477
 478
 479
 480
 481
 482
 483
 484
 485
 486
 487
 488
 489
 490
 491
 492
 493
 494
 495
 496
 497
 498
 499
 500
 501
 502
 503
 504
 505
 506
 507
 508
 509
 510
 511
 512
 513
 514
 515
 516
 517
 518
 519
 520
 521
 522
 523
 524
 525
 526
 527
 528
 529
 530
 531
 532
 533
 534
 535
 536
 537
 538
 539
 540
 541
 542
 543
 544
 545
 546
 547
 548
 549
 550
 551
 552
 553
 554
 555
 556
 557
 558
 559
 560
 561
 562
 563
 564
 565
 566
 567
 568
 569
 570
 571
 572
 573
 574
 575
 576
 577
 578
 579
 580
 581
 582
 583
 584
 585
 586
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
class OFRBase(BaseMSS, metaclass=ABCMeta):
    """Base class for Model Structure Selection.

    Provides the machinery shared by Orthogonal Forward Regression (OFR)
    algorithms: ERR-based term selection, information-criterion model-order
    selection, parameter estimation via ``fit``, and the prediction entry
    points. Concrete subclasses implement ``run_mss_algorithm``.
    """

    @abstractmethod
    def __init__(
        self,
        *,
        ylag: Union[int, list] = 2,
        xlag: Union[int, list] = 2,
        elag: Union[int, list] = 2,
        order_selection: bool = True,
        info_criteria: str = "aic",
        n_terms: Union[int, None] = None,
        n_info_values: int = 15,
        estimator: Estimators = RecursiveLeastSquares(),
        basis_function: Union[Polynomial, Fourier] = Polynomial(),
        model_type: str = "NARMAX",
        eps: np.float64 = np.finfo(np.float64).eps,
        alpha: float = 0,
        err_tol: Optional[float] = None,
        apress_lambda: float = 1.0,
    ):
        """Store the configuration, validate it, and reset fitted state."""
        self.order_selection = order_selection
        self.ylag = ylag
        self.xlag = xlag
        self.max_lag = self._get_max_lag()
        self.info_criteria = info_criteria
        self.apress_lambda = apress_lambda
        self.info_criteria_function = get_info_criteria(info_criteria, apress_lambda)
        self.n_info_values = n_info_values
        self.n_terms = n_terms
        self.estimator = estimator
        self.elag = elag
        self.model_type = model_type
        self.basis_function = basis_function
        self.eps = eps
        # A RidgeRegression estimator carries its own regularization strength;
        # prefer it over the `alpha` argument so the two never disagree.
        if isinstance(self.estimator, RidgeRegression):
            self.alpha = self.estimator.alpha
        else:
            self.alpha = alpha

        self.err_tol = err_tol
        self._validate_params()
        # Attributes populated by `fit`.
        self.n_inputs = None
        self.regressor_code = None
        self.info_values = None
        self.err = None
        self.final_model = None
        self.theta = None
        self.pivv = None

    def _default_estimation_target(self, y: np.ndarray) -> np.ndarray:
        """Compute the standard estimation target used across MSS algorithms."""
        xp = get_namespace(y)
        # Drop the first `max_lag` samples (initial conditions) and return
        # column 0 reshaped as a column vector.
        return xp.reshape(y[self.max_lag :, 0], (-1, 1))

    def _unpack_mss_output(
        self,
        mss_output: Tuple[np.ndarray, ...],
        y: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Normalize MSS outputs to (err, piv, psi, target)."""
        if len(mss_output) == 3:
            # 3-tuple variant: derive the estimation target from `y`.
            err, piv, psi = mss_output
            estimation_target = self._default_estimation_target(y)
        elif len(mss_output) == 4:
            err, piv, psi, estimation_target = mss_output
        else:
            raise ValueError(
                "run_mss_algorithm must return"
                " (err, piv, psi) or (err, piv, psi, target)."
            )
        return err, piv, psi, estimation_target

    def _validate_params(self):
        """Validate input params.

        Raises
        ------
        ValueError
            If any numeric/choice parameter is out of its valid domain.
        TypeError
            If `order_selection` or `apress_lambda` has the wrong type.
        """
        if not isinstance(self.n_info_values, int) or self.n_info_values < 1:
            raise ValueError(
                f"n_info_values must be integer and > zero. Got {self.n_info_values}"
            )

        if isinstance(self.ylag, int) and self.ylag < 1:
            raise ValueError(f"ylag must be integer and > zero. Got {self.ylag}")

        if isinstance(self.xlag, int) and self.xlag < 1:
            raise ValueError(f"xlag must be integer and > zero. Got {self.xlag}")

        if not isinstance(self.xlag, (int, list)):
            raise ValueError(f"xlag must be integer and > zero. Got {self.xlag}")

        if not isinstance(self.ylag, (int, list)):
            raise ValueError(f"ylag must be integer and > zero. Got {self.ylag}")

        if not isinstance(self.order_selection, bool):
            raise TypeError(
                f"order_selection must be False or True. Got {self.order_selection}"
            )

        if self.info_criteria not in ["aic", "aicc", "bic", "fpe", "lilc", "apress"]:
            raise ValueError(
                "info_criteria must be aic, aicc, bic, fpe, lilc"
                f" or apress. Got {self.info_criteria}"
            )

        # apress_lambda is only meaningful (and only checked) for APRESS.
        if self.info_criteria == "apress":
            if not isinstance(self.apress_lambda, (int, float)):
                raise TypeError(
                    "apress_lambda must be a numeric value."
                    f" Got {type(self.apress_lambda)}"
                )
            if self.apress_lambda <= 0:
                raise ValueError(f"apress_lambda must be > 0. Got {self.apress_lambda}")

        if self.model_type not in ["NARMAX", "NAR", "NFIR"]:
            raise ValueError(
                f"model_type must be NARMAX, NAR or NFIR. Got {self.model_type}"
            )

        # n_terms may be None (auto-selected later) or a positive integer.
        if (
            not isinstance(self.n_terms, int) or self.n_terms < 1
        ) and self.n_terms is not None:
            raise ValueError(f"n_terms must be integer and > zero. Got {self.n_terms}")

        if not isinstance(self.eps, float) or self.eps < 0:
            raise ValueError(f"eps must be float and > zero. Got {self.eps}")

    @abstractmethod
    def run_mss_algorithm(
        self, psi: np.ndarray, y: np.ndarray, process_term_number: int
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Run the model structure selection algorithm (default: ERR)."""
        return self.error_reduction_ratio(psi, y, process_term_number)

    def error_reduction_ratio(
        self, psi: np.ndarray, y: np.ndarray, process_term_number: int
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
        """Perform the Error Reduction Ratio algorithm.

        Parameters
        ----------
        y : array-like of shape = n_samples
            The target data used in the identification process.
        psi : ndarray of floats
            The information matrix of the model.
        process_term_number : int
            Number of Process Terms defined by the user.

        Returns
        -------
        err : array-like of shape = number_of_model_elements
            The respective ERR calculated for each regressor.
        piv : array-like of shape = number_of_model_elements
            Contains the index to put the regressors in the correct order
            based on err values.
        psi_orthogonal : ndarray of floats
            The updated and orthogonal information matrix.

        References
        ----------
        - Manuscript: Orthogonal least squares methods and their application
           to non-linear system identification
           https://eprints.soton.ac.uk/251147/1/778742007_content.pdf
        - Manuscript (portuguese): Identificação de Sistemas não Lineares
           Utilizando Modelos NARMAX Polinomiais - Uma Revisão
           e Novos Resultados

        """
        # Resolve the array-API namespace and device so the same code runs on
        # NumPy and on non-NumPy (e.g. GPU) backends.
        xp = get_namespace(psi, y)
        target_device = _device(psi, y)
        squared_y = xp.sum(y[self.max_lag :, :] ** 2)
        # Clamp to machine eps so an all-zero target cannot cause a division
        # by zero in the ERR ratio.
        squared_y = float(
            xp.asarray(max(float(squared_y), float(np.finfo(np.float64).eps)))
        )
        tmp_psi = _copy(xp, psi)
        y = xp.reshape(y[self.max_lag :, 0], (-1, 1))
        tmp_y = _copy(xp, y)
        dimension = tmp_psi.shape[1]
        piv = _asarray(np.arange(dimension), xp=xp, target_device=target_device)
        tmp_err = _zeros(
            xp, dimension, dtype=tmp_psi.dtype, target_device=target_device
        )
        err = _zeros(xp, dimension, dtype=tmp_psi.dtype, target_device=target_device)

        for i in range(dimension):
            # ERR of every remaining candidate regressor, given the
            # regressors already selected in columns [0, i).
            tmp_err_slice = _compute_err_slice(
                tmp_psi,
                tmp_y,
                i,
                squared_y,
                self.alpha,
                self.eps,
            )
            # Use index assignment for the slice; non-NumPy array-API arrays
            # may be immutable, so rebuild via concat on those backends.
            if _is_numpy_namespace(xp):
                tmp_err[i:] = tmp_err_slice
            else:
                tmp_err = xp.concat([tmp_err[:i], tmp_err_slice])

            # Greedy step: pick the candidate with the largest ERR.
            piv_index = int(_to_numpy(xp.argmax(tmp_err[i:]))) + i
            err_val = tmp_err[piv_index]
            if _is_numpy_namespace(xp):
                err[i] = err_val
            else:
                # Immutable backend: rebuild `err` with element i replaced.
                err = xp.concat(
                    [
                        err[:i],
                        xp.reshape(
                            _asarray(
                                err_val,
                                xp=xp,
                                dtype=err.dtype,
                                target_device=target_device,
                            ),
                            (1,),
                        ),
                        err[i + 1 :],
                    ]
                )
            if i == process_term_number:
                break

            # Early stop once the accumulated ERR reaches the user tolerance.
            if _is_numpy_namespace(xp):
                cumsum_val = float(err.cumsum()[i])
            else:
                cumsum_val = float(xp.sum(err[: i + 1]))
            if (self.err_tol is not None) and (cumsum_val >= self.err_tol):
                self.n_terms = i + 1
                process_term_number = i + 1
                break

            # Move the chosen regressor into position i, both in the working
            # matrix and in the pivot bookkeeping.
            if _is_numpy_namespace(xp):
                tmp_psi[:, [piv_index, i]] = tmp_psi[:, [i, piv_index]]
            else:
                tmp_psi = _swap_matrix_columns(xp, tmp_psi, piv_index, i)
            if _is_numpy_namespace(xp):
                piv_i = piv[i]
                piv_p = piv[piv_index]
                piv[[piv_index, i]] = piv[[i, piv_index]]
            else:
                piv_i = _copy(xp, piv[i])
                piv_p = _copy(xp, piv[piv_index])
                piv = _set_element(xp, piv, piv_index, piv_i)
                piv = _set_element(xp, piv, i, piv_p)
            # Householder reflection: orthogonalize the remaining columns
            # (and the target) against the regressor just selected.
            v = house(tmp_psi[i:, i])
            row_result = rowhouse(tmp_psi[i:, i:], v)
            y_slice = (slice(i, None), slice(None))
            transformed_y = rowhouse(tmp_y[y_slice], v)
            if _is_numpy_namespace(xp):
                tmp_y[y_slice] = transformed_y
            else:
                tmp_y = _set_element(xp, tmp_y, y_slice, transformed_y)
            tmp_psi[i:, i:] = _copy(xp, row_result)

        # Keep only the selected terms; reorder the original psi accordingly.
        tmp_piv = piv[0:process_term_number]
        if _is_numpy_namespace(xp):
            psi_orthogonal = psi[:, tmp_piv]
        else:
            psi_orthogonal = _take_columns_by_index(xp, psi, tmp_piv)
        return err, tmp_piv, psi_orthogonal

    def information_criterion(self, x: np.ndarray, y: np.ndarray) -> np.ndarray:
        """Determine the model order.

        This function uses a information criterion to determine the model size.
        'Akaike'-  Akaike's Information Criterion with
               critical value 2 (AIC) (default).
        'AICc' -   Akaike's Information Criterion with finite-sample correction.
        'Bayes' -  Bayes Information Criterion (BIC).
        'FPE'   -  Final Prediction Error (FPE).
        'LILC'  -  Khundrin's law of iterated logarithm criterion (LILC).
        'APRESS'- Adjustable Prediction Error Sum of Squares (paper eq. 9).

        Parameters
        ----------
        y : array-like of shape = n_samples
            Target values of the system.
        x : array-like of shape = n_samples
            Input system values measured by the user.

        Returns
        -------
        output_vector : array-like of shape = n_regressor
            Vector with values of akaike's information criterion
            for models with N terms (where N is the
            vector position + 1).

        """
        xp = get_namespace(x, y)
        # Cap n_info_values at the number of available regressors.
        if self.n_info_values is not None and self.n_info_values > x.shape[1]:
            self.n_info_values = x.shape[1]
            warnings.warn(
                "n_info_values is greater than the maximum number of all"
                " regressors space considering the chosen y_lag, u_lag, and"
                f" non_degree. We set as {x.shape[1]}",
                stacklevel=2,
            )
        output_vector = _full(
            xp,
            self.n_info_values,
            xp.nan,
            dtype=xp.float64,
            target_device=_device(x),
        )

        n_samples = y.shape[0] - self.max_lag

        # Evaluate the criterion for models of 1..n_info_values terms.
        for i in range(self.n_info_values):
            n_theta = i + 1
            mss_result = self.run_mss_algorithm(x, y, n_theta)
            _, _, regressor_matrix, estimation_target = self._unpack_mss_output(
                mss_result, y
            )

            tmp_theta = self.estimator.optimize(regressor_matrix, estimation_target)

            tmp_yhat = regressor_matrix @ tmp_theta
            tmp_residual = estimation_target - tmp_yhat

            if self.info_criteria == "apress":
                # APRESS works on the residual MSE directly (paper eq. 9).
                mse = float(xp.mean(tmp_residual**2))
                output_vector[i] = apress(n_theta, n_samples, mse, self.apress_lambda)
            else:
                # The remaining criteria use the unbiased (ddof=1) residual
                # variance; compute it manually on non-NumPy backends.
                if _is_numpy_namespace(xp):
                    e_var = float(np.var(tmp_residual, ddof=1))
                else:
                    n = tmp_residual.shape[0]
                    e_var = float(
                        xp.sum((tmp_residual - xp.mean(tmp_residual)) ** 2) / (n - 1)
                    )
                output_vector[i] = self.info_criteria_function(
                    n_theta, n_samples, e_var
                )

        return output_vector

    def fit(self, *, X: Optional[np.ndarray] = None, y: np.ndarray):
        """Fit polynomial NARMAX model.

        This is an 'alpha' version of the 'fit' function which allows
        a friendly usage by the user. Given two arguments, x and y, fit
        training data.

        Parameters
        ----------
        X : ndarray of floats
            The input data to be used in the training process.
        y : ndarray of floats
            The output data to be used in the training process.

        Returns
        -------
        model : ndarray of int
            The model code representation.
        piv : array-like of shape = number_of_model_elements
            Contains the index to put the regressors in the correct order
            based on err values.
        theta : array-like of shape = number_of_model_elements
            The estimated parameters of the model.
        err : array-like of shape = number_of_model_elements
            The respective ERR calculated for each regressor.
        info_values : array-like of shape = n_regressor
            Vector with values of akaike's information criterion
            for models with N terms (where N is the
            vector position + 1).

        """
        if y is None:
            raise ValueError("y cannot be None")

        self.max_lag = self._get_max_lag()
        lagged_data = build_lagged_matrix(X, y, self.xlag, self.ylag, self.model_type)

        reg_matrix = self.basis_function.fit(
            lagged_data,
            self.max_lag,
            self.ylag,
            self.xlag,
            self.model_type,
            predefined_regressors=None,
        )

        if X is not None:
            self.n_inputs = num_features(X)
        else:
            self.n_inputs = 1  # just to create the regressor space base

        self.regressor_code = self.regressor_space(self.n_inputs)

        if self.order_selection is True:
            self.info_values = self.information_criterion(reg_matrix, y)

        # Decide model size: auto-select from the information criterion, or
        # require an explicit n_terms when order selection is disabled.
        if self.n_terms is None and self.order_selection is True:
            if self.info_criteria == "apress":
                # APRESS uses the minimizer of the criterion (eq. 10)
                xp = get_namespace(self.info_values)
                model_length = int(_to_numpy(_nanargmin(xp, self.info_values))) + 1
            else:
                model_length = get_min_info_value(self.info_values)
            self.n_terms = model_length
        elif self.n_terms is None and self.order_selection is not True:
            raise ValueError(
                "If order_selection is False, you must define n_terms value."
            )
        else:
            model_length = self.n_terms

        mss_result = self.run_mss_algorithm(reg_matrix, y, model_length)
        self.err, self.pivv, psi, estimation_target = self._unpack_mss_output(
            mss_result, y
        )
        # Pivot indices become plain CPU integer indices whatever the backend.
        self.pivv = np.asarray(_to_numpy(self.pivv), dtype=np.intp).reshape(-1)

        tmp_piv = self.pivv[0:model_length]
        repetition = reg_matrix.shape[0]
        if isinstance(self.basis_function, Polynomial):
            self.final_model = self.regressor_code[tmp_piv, :].copy()
        else:
            # Non-polynomial basis: expand the regressor codes to match the
            # basis-expanded matrix before selecting the final model.
            self.regressor_code = np.sort(
                np.tile(self.regressor_code[1:, :], (repetition, 1)),
                axis=0,
            )
            self.final_model = self.regressor_code[tmp_piv, :].copy()

        self.theta = self.estimator.optimize(psi, estimation_target)
        if self.estimator.unbiased is True:
            self.theta = self.estimator.unbiased_estimator(
                psi,
                estimation_target,
                self.theta,
                self.elag,
                self.max_lag,
                self.estimator,
                self.basis_function,
                self.estimator.uiter,
            )
        return self

    def predict(
        self,
        *,
        X: Optional[np.ndarray] = None,
        y: np.ndarray,
        steps_ahead: Optional[int] = None,
        forecast_horizon: Optional[int] = None,
    ) -> np.ndarray:
        """Return the predicted values given an input.

        The predict function allows a friendly usage by the user.
        Given a previously trained model, predict values given
        a new set of data.

        This method accept y values mainly for prediction n-steps ahead
        (to be implemented in the future)

        Parameters
        ----------
        X : ndarray of floats
            The input data to be used in the prediction process.
        y : ndarray of floats
            The output data to be used in the prediction process.
        steps_ahead : int (default = None)
            The user can use free run simulation, one-step ahead prediction
            and n-step ahead prediction.
        forecast_horizon : int, default=None
            The number of predictions over the time.

        Returns
        -------
        yhat : ndarray of floats
            The predicted values of the model.

        """
        xp, target_device = _get_namespace_and_device(X, y)
        # Sequential predict (free-run / n-step) on GPU backends is dominated
        # by kernel-launch overhead.  Fall back to the fast NumPy path and
        # convert the result back to the original device.
        if steps_ahead != 1 and not _is_numpy_namespace(xp):
            return self._predict_on_cpu(
                X=X,
                y=y,
                steps_ahead=steps_ahead,
                forecast_horizon=forecast_horizon,
                original_xp=xp,
                target_device=target_device,
            )

        # Predictions start after the initial conditions; prepend the first
        # max_lag samples of y so the output is aligned with the input.
        prefix = y[: self.max_lag, ...]
        if isinstance(self.basis_function, Polynomial):
            if steps_ahead is None:
                yhat = self._model_prediction(X, y, forecast_horizon=forecast_horizon)
                yhat = _concat(xp, [prefix, yhat], axis=0)
                return yhat
            if steps_ahead == 1:
                yhat = self._one_step_ahead_prediction(X, y)
                yhat = _concat(xp, [prefix, yhat], axis=0)
                return yhat

            check_positive_int(steps_ahead, "steps_ahead")
            yhat = self._n_step_ahead_prediction(X, y, steps_ahead=steps_ahead)
            yhat = _concat(xp, [prefix, yhat], axis=0)
            return yhat

        if steps_ahead is None:
            yhat = self._basis_function_predict(X, y, forecast_horizon)
            yhat = _concat(xp, [prefix, yhat], axis=0)
            return yhat
        if steps_ahead == 1:
            yhat = self._one_step_ahead_prediction(X, y)
            yhat = _concat(xp, [prefix, yhat], axis=0)
            return yhat

        yhat = self._basis_function_n_step_prediction(
            X, y, steps_ahead, forecast_horizon
        )
        yhat = _concat(xp, [prefix, yhat], axis=0)
        return yhat

    def _one_step_ahead_prediction(
        self, x_base: np.ndarray, y: Optional[np.ndarray] = None
    ) -> np.ndarray:
        """Perform the 1-step-ahead prediction of a model.

        Parameters
        ----------
        y : array-like of shape = max_lag
            Initial conditions values of the model
            to start recursive process.
        x : ndarray of floats of shape = n_samples
            Vector with input values to be used in model simulation.

        Returns
        -------
        yhat : ndarray of floats
               The 1-step-ahead predicted values of the model.

        """
        lagged_data = build_lagged_matrix(
            x_base, y, self.xlag, self.ylag, self.model_type
        )

        # Only the regressors selected during `fit` (pivv) are transformed.
        x_tmp = self.basis_function.transform(
            lagged_data,
            self.max_lag,
            self.ylag,
            self.xlag,
            self.model_type,
            predefined_regressors=self.pivv[: len(self.final_model)],
        )

        yhat = super()._one_step_ahead_prediction(x_tmp)
        return yhat.reshape(-1, 1)

    def _n_step_ahead_prediction(
        self, x: Optional[np.ndarray], y: Optional[np.ndarray], steps_ahead: int
    ) -> np.ndarray:
        """Perform the n-steps-ahead prediction of a model.

        Parameters
        ----------
        y : array-like of shape = max_lag
            Initial conditions values of the model
            to start recursive process.
        x : ndarray of floats of shape = n_samples
            Vector with input values to be used in model simulation.

        Returns
        -------
        yhat : ndarray of floats
               The n-steps-ahead predicted values of the model.

        """
        yhat = super()._n_step_ahead_prediction(x, y, steps_ahead)
        return yhat

    def _model_prediction(
        self,
        x: Optional[np.ndarray],
        y_initial: np.ndarray,
        forecast_horizon: int = 0,
    ) -> np.ndarray:
        """Perform the infinity steps-ahead simulation of a model.

        Parameters
        ----------
        y_initial : array-like of shape = max_lag
            Number of initial conditions values of output
            to start recursive process.
        x : ndarray of floats of shape = n_samples
            Vector with input values to be used in model simulation.

        Returns
        -------
        yhat : ndarray of floats
               The predicted values of the model.

        """
        # Dispatch on model type; NARMAX and NAR share the recursive path.
        if self.model_type in ["NARMAX", "NAR"]:
            return self._narmax_predict(x, y_initial, forecast_horizon)

        if self.model_type == "NFIR":
            return self._nfir_predict(x, y_initial)

        raise ValueError(
            f"model_type must be NARMAX, NAR or NFIR. Got {self.model_type}"
        )

    def _narmax_predict(
        self,
        x: Optional[np.ndarray],
        y_initial: np.ndarray,
        forecast_horizon: int = 0,
    ) -> np.ndarray:
        """Free-run simulation for NARMAX/NAR models via the parent class."""
        if y_initial.shape[0] < self.max_lag:
            raise ValueError(
                "Insufficient initial condition elements! Expected at least"
                f" {self.max_lag} elements."
            )

        # With exogenous input the horizon is fixed by the input length;
        # otherwise extend the requested horizon by the initial conditions.
        if x is not None:
            forecast_horizon = x.shape[0]
        else:
            forecast_horizon = forecast_horizon + self.max_lag

        if self.model_type == "NAR":
            self.n_inputs = 0

        y_output = super()._narmax_predict(x, y_initial, forecast_horizon)
        return y_output

    def _nfir_predict(
        self, x: Optional[np.ndarray], y_initial: Optional[np.ndarray]
    ) -> np.ndarray:
        """Simulation for NFIR models; delegates to the parent class."""
        y_output = super()._nfir_predict(x, y_initial)
        return y_output

    def _basis_function_predict(
        self,
        x: Optional[np.ndarray],
        y_initial: Optional[np.ndarray],
        forecast_horizon: int = 0,
    ) -> np.ndarray:
        """Free-run simulation for non-polynomial basis functions."""
        # Same horizon rule as `_narmax_predict`.
        if x is not None:
            forecast_horizon = x.shape[0]
        else:
            forecast_horizon = forecast_horizon + self.max_lag

        if self.model_type == "NAR":
            self.n_inputs = 0

        yhat = super()._basis_function_predict(x, y_initial, forecast_horizon)
        return yhat.reshape(-1, 1)

    def _basis_function_n_step_prediction(
        self,
        x: Optional[np.ndarray],
        y: np.ndarray,
        steps_ahead: Optional[int],
        forecast_horizon: int,
    ) -> np.ndarray:
        """Perform the n-steps-ahead prediction of a model.

        Parameters
        ----------
        y : array-like of shape = max_lag
            Initial conditions values of the model
            to start recursive process.
        x : ndarray of floats of shape = n_samples
            Vector with input values to be used in model simulation.

        Returns
        -------
        yhat : ndarray of floats
               The n-steps-ahead predicted values of the model.

        """
        if y.shape[0] < self.max_lag:
            raise ValueError(
                "Insufficient initial condition elements! Expected at least"
                f" {self.max_lag} elements."
            )

        if x is not None:
            forecast_horizon = x.shape[0]
        else:
            forecast_horizon = forecast_horizon + self.max_lag

        yhat = super()._basis_function_n_step_prediction(
            x, y, steps_ahead, forecast_horizon
        )
        return yhat.reshape(-1, 1)

    def _basis_function_n_steps_horizon(
        self,
        x: Optional[np.ndarray],
        y: Optional[np.ndarray],
        steps_ahead: Optional[int],
        forecast_horizon: int,
    ) -> np.ndarray:
        """n-steps-ahead prediction over a fixed horizon (parent-class logic)."""
        yhat = super()._basis_function_n_steps_horizon(
            x, y, steps_ahead, forecast_horizon
        )
        return yhat.reshape(-1, 1)

error_reduction_ratio(psi, y, process_term_number)

Perform the Error Reduction Ratio algorithm.

Parameters:

Name Type Description Default
y array-like of shape = n_samples

The target data used in the identification process.

required
psi ndarray of floats

The information matrix of the model.

required
process_term_number int

Number of Process Terms defined by the user.

required

Returns:

Name Type Description
err array-like of shape = number_of_model_elements

The respective ERR calculated for each regressor.

piv array-like of shape = number_of_model_elements

Contains the index to put the regressors in the correct order based on err values.

psi_orthogonal ndarray of floats

The updated and orthogonal information matrix.

References
  • Manuscript: Orthogonal least squares methods and their application to non-linear system identification https://eprints.soton.ac.uk/251147/1/778742007_content.pdf
  • Manuscript (portuguese): Identificação de Sistemas não Lineares Utilizando Modelos NARMAX Polinomiais - Uma Revisão e Novos Resultados
Source code in sysidentpy/model_structure_selection/ofr_base.py
def error_reduction_ratio(
    self, psi: np.ndarray, y: np.ndarray, process_term_number: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Perform the Error Reduction Ratio algorithm.

    Parameters
    ----------
    y : array-like of shape = n_samples
        The target data used in the identification process.
    psi : ndarray of floats
        The information matrix of the model.
    process_term_number : int
        Number of Process Terms defined by the user.

    Returns
    -------
    err : array-like of shape = number_of_model_elements
        The respective ERR calculated for each regressor.
    piv : array-like of shape = number_of_model_elements
        Contains the index to put the regressors in the correct order
        based on err values.
    psi_orthogonal : ndarray of floats
        The updated and orthogonal information matrix.

    References
    ----------
    - Manuscript: Orthogonal least squares methods and their application
       to non-linear system identification
       https://eprints.soton.ac.uk/251147/1/778742007_content.pdf
    - Manuscript (portuguese): Identificação de Sistemas não Lineares
       Utilizando Modelos NARMAX Polinomiais - Uma Revisão
       e Novos Resultados

    """
    # Resolve the array-API namespace and device once so the same code
    # path serves NumPy and accelerator-backed array libraries.
    xp = get_namespace(psi, y)
    target_device = _device(psi, y)
    # Total output energy after dropping the first max_lag samples,
    # floored at float64 eps to avoid a zero denominator in the ERR ratio.
    squared_y = xp.sum(y[self.max_lag :, :] ** 2)
    squared_y = float(
        xp.asarray(max(float(squared_y), float(np.finfo(np.float64).eps)))
    )
    # Work on copies: the untouched psi is reused to build psi_orthogonal.
    tmp_psi = _copy(xp, psi)
    y = xp.reshape(y[self.max_lag :, 0], (-1, 1))
    tmp_y = _copy(xp, y)
    dimension = tmp_psi.shape[1]
    # piv tracks the original column index of every (possibly swapped) regressor.
    piv = _asarray(np.arange(dimension), xp=xp, target_device=target_device)
    tmp_err = _zeros(
        xp, dimension, dtype=tmp_psi.dtype, target_device=target_device
    )
    err = _zeros(xp, dimension, dtype=tmp_psi.dtype, target_device=target_device)

    # Greedy forward selection: at step i, score every still-unselected
    # regressor, swap the best one into column i, then orthogonalize the
    # remaining columns against it.
    for i in range(dimension):
        tmp_err_slice = _compute_err_slice(
            tmp_psi,
            tmp_y,
            i,
            squared_y,
            self.alpha,
            self.eps,
        )
        # Use index assignment for the slice
        if _is_numpy_namespace(xp):
            tmp_err[i:] = tmp_err_slice
        else:
            # Immutable array APIs: rebuild the array instead of assigning.
            tmp_err = xp.concat([tmp_err[:i], tmp_err_slice])

        piv_index = int(_to_numpy(xp.argmax(tmp_err[i:]))) + i
        err_val = tmp_err[piv_index]
        if _is_numpy_namespace(xp):
            err[i] = err_val
        else:
            err = xp.concat(
                [
                    err[:i],
                    xp.reshape(
                        _asarray(
                            err_val,
                            xp=xp,
                            dtype=err.dtype,
                            target_device=target_device,
                        ),
                        (1,),
                    ),
                    err[i + 1 :],
                ]
            )
        # Stop once the requested number of terms has been evaluated.
        if i == process_term_number:
            break

        # Early stop when the accumulated ERR reaches the user tolerance.
        if _is_numpy_namespace(xp):
            cumsum_val = float(err.cumsum()[i])
        else:
            cumsum_val = float(xp.sum(err[: i + 1]))
        if (self.err_tol is not None) and (cumsum_val >= self.err_tol):
            self.n_terms = i + 1
            process_term_number = i + 1
            break

        # Swap the winning regressor (and its bookkeeping index) into slot i.
        if _is_numpy_namespace(xp):
            tmp_psi[:, [piv_index, i]] = tmp_psi[:, [i, piv_index]]
        else:
            tmp_psi = _swap_matrix_columns(xp, tmp_psi, piv_index, i)
        if _is_numpy_namespace(xp):
            piv_i = piv[i]
            piv_p = piv[piv_index]
            piv[[piv_index, i]] = piv[[i, piv_index]]
        else:
            piv_i = _copy(xp, piv[i])
            piv_p = _copy(xp, piv[piv_index])
            piv = _set_element(xp, piv, piv_index, piv_i)
            piv = _set_element(xp, piv, i, piv_p)
        # Householder reflection on the trailing block: zeros column i below
        # row i and applies the same transform to the target vector.
        v = house(tmp_psi[i:, i])
        row_result = rowhouse(tmp_psi[i:, i:], v)
        y_slice = (slice(i, None), slice(None))
        transformed_y = rowhouse(tmp_y[y_slice], v)
        if _is_numpy_namespace(xp):
            tmp_y[y_slice] = transformed_y
        else:
            tmp_y = _set_element(xp, tmp_y, y_slice, transformed_y)
        tmp_psi[i:, i:] = _copy(xp, row_result)

    # Keep only the selected columns of the original psi, in selection order.
    tmp_piv = piv[0:process_term_number]
    if _is_numpy_namespace(xp):
        psi_orthogonal = psi[:, tmp_piv]
    else:
        psi_orthogonal = _take_columns_by_index(xp, psi, tmp_piv)
    return err, tmp_piv, psi_orthogonal

fit(*, X=None, y)

Fit polynomial NARMAX model.

This is an 'alpha' version of the 'fit' function which allows a friendly usage by the user. Given two arguments, x and y, fit training data.

Parameters:

Name Type Description Default
X ndarray of floats

The input data to be used in the training process.

None
y ndarray of floats

The output data to be used in the training process.

required

Returns:

Name Type Description
model ndarray of int

The model code representation.

piv array-like of shape = number_of_model_elements

Contains the index to put the regressors in the correct order based on err values.

theta array-like of shape = number_of_model_elements

The estimated parameters of the model.

err array-like of shape = number_of_model_elements

The respective ERR calculated for each regressor.

info_values array-like of shape = n_regressor

Vector with values of Akaike's information criterion for models with N terms (where N is the vector position + 1).

Source code in sysidentpy/model_structure_selection/ofr_base.py
def fit(self, *, X: Optional[np.ndarray] = None, y: np.ndarray):
    """Fit polynomial NARMAX model.

    This is an 'alpha' version of the 'fit' function which allows
    a friendly usage by the user. Given two arguments, x and y, fit
    training data.

    Parameters
    ----------
    X : ndarray of floats
        The input data to be used in the training process.
    y : ndarray of floats
        The output data to be used in the training process.

    Returns
    -------
    model : ndarray of int
        The model code representation.
    piv : array-like of shape = number_of_model_elements
        Contains the index to put the regressors in the correct order
        based on err values.
    theta : array-like of shape = number_of_model_elements
        The estimated parameters of the model.
    err : array-like of shape = number_of_model_elements
        The respective ERR calculated for each regressor.
    info_values : array-like of shape = n_regressor
        Vector with values of Akaike's information criterion
        for models with N terms (where N is the
        vector position + 1).

    """
    if y is None:
        raise ValueError("y cannot be None")

    # Build the lagged regressor matrix from the raw input/output data.
    self.max_lag = self._get_max_lag()
    lagged_data = build_lagged_matrix(X, y, self.xlag, self.ylag, self.model_type)

    reg_matrix = self.basis_function.fit(
        lagged_data,
        self.max_lag,
        self.ylag,
        self.xlag,
        self.model_type,
        predefined_regressors=None,
    )

    if X is not None:
        self.n_inputs = num_features(X)
    else:
        self.n_inputs = 1  # just to create the regressor space base

    self.regressor_code = self.regressor_space(self.n_inputs)

    # Optionally score candidate model sizes with an information criterion.
    if self.order_selection is True:
        self.info_values = self.information_criterion(reg_matrix, y)

    if self.n_terms is None and self.order_selection is True:
        if self.info_criteria == "apress":
            # APRESS uses the minimizer of the criterion (eq. 10)
            xp = get_namespace(self.info_values)
            model_length = int(_to_numpy(_nanargmin(xp, self.info_values))) + 1
        else:
            # Other criteria: take the size at which the curve stops decreasing.
            model_length = get_min_info_value(self.info_values)
        self.n_terms = model_length
    elif self.n_terms is None and self.order_selection is not True:
        raise ValueError(
            "If order_selection is False, you must define n_terms value."
        )
    else:
        model_length = self.n_terms

    # Run the model-structure-selection algorithm for the chosen size.
    mss_result = self.run_mss_algorithm(reg_matrix, y, model_length)
    self.err, self.pivv, psi, estimation_target = self._unpack_mss_output(
        mss_result, y
    )
    # Normalize pivots to a flat NumPy integer array usable for fancy indexing.
    self.pivv = np.asarray(_to_numpy(self.pivv), dtype=np.intp).reshape(-1)

    tmp_piv = self.pivv[0:model_length]
    repetition = reg_matrix.shape[0]
    if isinstance(self.basis_function, Polynomial):
        self.final_model = self.regressor_code[tmp_piv, :].copy()
    else:
        # Non-polynomial bases: expand the regressor codes to match the
        # repeated basis columns before picking the selected terms.
        self.regressor_code = np.sort(
            np.tile(self.regressor_code[1:, :], (repetition, 1)),
            axis=0,
        )
        self.final_model = self.regressor_code[tmp_piv, :].copy()

    # Estimate the parameters of the selected structure.
    self.theta = self.estimator.optimize(psi, estimation_target)
    if self.estimator.unbiased is True:
        # Re-estimate with the extended (noise) model to reduce bias.
        self.theta = self.estimator.unbiased_estimator(
            psi,
            estimation_target,
            self.theta,
            self.elag,
            self.max_lag,
            self.estimator,
            self.basis_function,
            self.estimator.uiter,
        )
    return self

information_criterion(x, y)

Determine the model order.

This function uses an information criterion to determine the model size. 'Akaike'- Akaike's Information Criterion with critical value 2 (AIC) (default). 'AICc' - Akaike's Information Criterion with finite-sample correction. 'Bayes' - Bayes Information Criterion (BIC). 'FPE' - Final Prediction Error (FPE). 'LILC' - Khundrin's law of iterated logarithm criterion (LILC). 'APRESS'- Adjustable Prediction Error Sum of Squares (paper eq. 9).

Parameters:

Name Type Description Default
y array-like of shape = n_samples

Target values of the system.

required
x array-like of shape = n_samples

Input system values measured by the user.

required

Returns:

Name Type Description
output_vector array-like of shape = n_regressor

Vector with values of Akaike's information criterion for models with N terms (where N is the vector position + 1).

Source code in sysidentpy/model_structure_selection/ofr_base.py
def information_criterion(self, x: np.ndarray, y: np.ndarray) -> np.ndarray:
    """Determine the model order.

    This function uses an information criterion to determine the model size.
    'Akaike'-  Akaike's Information Criterion with
           critical value 2 (AIC) (default).
    'AICc' -   Akaike's Information Criterion with finite-sample correction.
    'Bayes' -  Bayes Information Criterion (BIC).
    'FPE'   -  Final Prediction Error (FPE).
    'LILC'  -  Khundrin's law of iterated logarithm criterion (LILC).
    'APRESS'- Adjustable Prediction Error Sum of Squares (paper eq. 9).

    Parameters
    ----------
    y : array-like of shape = n_samples
        Target values of the system.
    x : array-like of shape = n_samples
        Input system values measured by the user.

    Returns
    -------
    output_vector : array-like of shape = n_regressor
        Vector with values of Akaike's information criterion
        for models with N terms (where N is the
        vector position + 1).

    """
    xp = get_namespace(x, y)
    # Cap the number of evaluated model sizes at the number of candidate
    # regressors actually available.
    if self.n_info_values is not None and self.n_info_values > x.shape[1]:
        self.n_info_values = x.shape[1]
        warnings.warn(
            "n_info_values is greater than the maximum number of all"
            " regressors space considering the chosen y_lag, u_lag, and"
            f" non_degree. We set as {x.shape[1]}",
            stacklevel=2,
        )
    output_vector = _full(
        xp,
        self.n_info_values,
        xp.nan,
        dtype=xp.float64,
        target_device=_device(x),
    )

    n_samples = y.shape[0] - self.max_lag

    # Fit a model of every size from 1 to n_info_values and score each one.
    for i in range(self.n_info_values):
        n_theta = i + 1
        mss_result = self.run_mss_algorithm(x, y, n_theta)
        _, _, regressor_matrix, estimation_target = self._unpack_mss_output(
            mss_result, y
        )

        tmp_theta = self.estimator.optimize(regressor_matrix, estimation_target)

        tmp_yhat = regressor_matrix @ tmp_theta
        tmp_residual = estimation_target - tmp_yhat

        if self.info_criteria == "apress":
            # APRESS scores the raw mean squared error directly.
            mse = float(xp.mean(tmp_residual**2))
            output_vector[i] = apress(n_theta, n_samples, mse, self.apress_lambda)
        else:
            # Remaining criteria use the unbiased (ddof=1) residual variance.
            if _is_numpy_namespace(xp):
                e_var = float(np.var(tmp_residual, ddof=1))
            else:
                n = tmp_residual.shape[0]
                e_var = float(
                    xp.sum((tmp_residual - xp.mean(tmp_residual)) ** 2) / (n - 1)
                )
            output_vector[i] = self.info_criteria_function(
                n_theta, n_samples, e_var
            )

    return output_vector

predict(*, X=None, y, steps_ahead=None, forecast_horizon=None)

Return the predicted values given an input.

The predict function allows a friendly usage by the user. Given a previously trained model, predict values given a new set of data.

This method accepts y values mainly for prediction n-steps ahead (to be implemented in the future)

Parameters:

Name Type Description Default
X ndarray of floats

The input data to be used in the prediction process.

None
y ndarray of floats

The output data to be used in the prediction process.

required
steps_ahead int(default=None)

The user can use free run simulation, one-step ahead prediction and n-step ahead prediction.

None
forecast_horizon int

The number of predictions over the time.

None

Returns:

Name Type Description
yhat ndarray of floats

The predicted values of the model.

Source code in sysidentpy/model_structure_selection/ofr_base.py
def predict(
    self,
    *,
    X: Optional[np.ndarray] = None,
    y: np.ndarray,
    steps_ahead: Optional[int] = None,
    forecast_horizon: Optional[int] = None,
) -> np.ndarray:
    """Return the predicted values given an input.

    The predict function allows a friendly usage by the user.
    Given a previously trained model, predict values given
    a new set of data.

    This method accepts y values mainly for prediction n-steps ahead
    (to be implemented in the future)

    Parameters
    ----------
    X : ndarray of floats
        The input data to be used in the prediction process.
    y : ndarray of floats
        The output data to be used in the prediction process.
    steps_ahead : int (default = None)
        The user can use free run simulation, one-step ahead prediction
        and n-step ahead prediction.
    forecast_horizon : int, default=None
        The number of predictions over the time.

    Returns
    -------
    yhat : ndarray of floats
        The predicted values of the model.

    """
    xp, target_device = _get_namespace_and_device(X, y)
    # Sequential predict (free-run / n-step) on GPU backends is dominated
    # by kernel-launch overhead.  Fall back to the fast NumPy path and
    # convert the result back to the original device.
    if steps_ahead != 1 and not _is_numpy_namespace(xp):
        return self._predict_on_cpu(
            X=X,
            y=y,
            steps_ahead=steps_ahead,
            forecast_horizon=forecast_horizon,
            original_xp=xp,
            target_device=target_device,
        )

    # Initial conditions (first max_lag samples) are kept at the head of
    # every returned series so the output aligns with the input y.
    prefix = y[: self.max_lag, ...]
    if isinstance(self.basis_function, Polynomial):
        # Polynomial basis: dispatch on steps_ahead (free run / 1-step / n-step).
        if steps_ahead is None:
            yhat = self._model_prediction(X, y, forecast_horizon=forecast_horizon)
            yhat = _concat(xp, [prefix, yhat], axis=0)
            return yhat
        if steps_ahead == 1:
            yhat = self._one_step_ahead_prediction(X, y)
            yhat = _concat(xp, [prefix, yhat], axis=0)
            return yhat

        check_positive_int(steps_ahead, "steps_ahead")
        yhat = self._n_step_ahead_prediction(X, y, steps_ahead=steps_ahead)
        yhat = _concat(xp, [prefix, yhat], axis=0)
        return yhat

    # Non-polynomial basis: same dispatch, through the basis-function paths.
    if steps_ahead is None:
        yhat = self._basis_function_predict(X, y, forecast_horizon)
        yhat = _concat(xp, [prefix, yhat], axis=0)
        return yhat
    if steps_ahead == 1:
        yhat = self._one_step_ahead_prediction(X, y)
        yhat = _concat(xp, [prefix, yhat], axis=0)
        return yhat

    yhat = self._basis_function_n_step_prediction(
        X, y, steps_ahead, forecast_horizon
    )
    yhat = _concat(xp, [prefix, yhat], axis=0)
    return yhat

aic(n_theta, n_samples, e_var)

Compute the Akaike information criteria value.

Parameters:

Name Type Description Default
n_theta int

Number of parameters of the model.

required
n_samples int

Number of samples given the maximum lag.

required
e_var float

Variance of the residues

required

Returns:

Name Type Description
info_criteria_value float

The computed value given the information criteria selected by the user.

Source code in sysidentpy/model_structure_selection/ofr_base.py
def aic(n_theta: int, n_samples: int, e_var: float) -> float:
    """Compute the Akaike information criteria value.

    Parameters
    ----------
    n_theta : int
        Number of parameters of the model.
    n_samples : int
        Number of samples given the maximum lag.
    e_var : float
        Variance of the residues.

    Returns
    -------
    info_criteria_value : float
        The computed value given the information criteria selected by the
        user.

    """
    # AIC = n * ln(residual variance) + 2 * (number of parameters)
    return n_samples * np.log(e_var) + 2 * n_theta

aicc(n_theta, n_samples, e_var)

Compute the Akaike information Criteria corrected value.

Parameters:

Name Type Description Default
n_theta int

Number of parameters of the model.

required
n_samples int

Number of samples given the maximum lag.

required
e_var float

Variance of the residues

required

Returns:

Name Type Description
aicc float

The computed aicc value.

References
Source code in sysidentpy/model_structure_selection/ofr_base.py
def aicc(n_theta: int, n_samples: int, e_var: float) -> float:
    """Compute the Akaike information Criteria corrected value.

    Parameters
    ----------
    n_theta : int
        Number of parameters of the model.
    n_samples : int
        Number of samples given the maximum lag.
    e_var : float
        Variance of the residues.

    Returns
    -------
    aicc : float
        The computed aicc value.

    References
    ----------
    - https://www.mathworks.com/help/ident/ref/idmodel.aic.html

    """
    # AICc = AIC + 2k(k + 1) / (n - k - 1): the plain AIC score plus a
    # penalty that corrects it for small sample sizes.
    plain_aic = n_samples * np.log(e_var) + 2 * n_theta
    finite_sample_penalty = 2 * n_theta * (n_theta + 1) / (n_samples - n_theta - 1)
    return plain_aic + finite_sample_penalty

apress(n_theta, n_samples, mse, apress_lambda)

Compute the APRESS criterion (eq. 9 of RMSS paper).

Parameters:

Name Type Description Default
n_theta int

Number of selected terms (parameters).

required
n_samples int

Number of available samples after lag alignment.

required
mse float

Mean squared error of the model with n_theta terms.

required
apress_lambda float

Small positive constant (lambda in the paper).

required

Returns:

Type Description
float

The APRESS score for the current model size.

Source code in sysidentpy/model_structure_selection/ofr_base.py
def apress(n_theta: int, n_samples: int, mse: float, apress_lambda: float) -> float:
    """Compute the APRESS criterion (eq. 9 of RMSS paper).

    Parameters
    ----------
    n_theta : int
        Number of selected terms (parameters).
    n_samples : int
        Number of available samples after lag alignment.
    mse : float
        Mean squared error of the model with ``n_theta`` terms.
    apress_lambda : float
        Small positive constant (lambda in the paper).

    Returns
    -------
    float
        The APRESS score for the current model size.
    """
    remaining = n_samples - apress_lambda * n_theta
    if remaining > 0:
        # Inflate the MSE by the squared ratio n / (n - lambda * k).
        return (n_samples / remaining) ** 2 * mse
    # Non-positive denominator: the scaling is undefined, so return the
    # worst possible score instead of dividing by zero.
    return np.inf

bic(n_theta, n_samples, e_var)

Compute the Bayesian information criteria value.

Parameters:

Name Type Description Default
n_theta int

Number of parameters of the model.

required
n_samples int

Number of samples given the maximum lag.

required
e_var float

Variance of the residues

required

Returns:

Name Type Description
info_criteria_value float

The computed value given the information criteria selected by the user.

Source code in sysidentpy/model_structure_selection/ofr_base.py
def bic(n_theta: int, n_samples: int, e_var: float) -> float:
    """Compute the Bayesian information criteria value.

    Parameters
    ----------
    n_theta : int
        Number of parameters of the model.
    n_samples : int
        Number of samples given the maximum lag.
    e_var : float
        Variance of the residues.

    Returns
    -------
    info_criteria_value : float
        The computed value given the information criteria selected by the
        user.

    """
    # BIC = n * ln(residual variance) + k * ln(n)
    return n_samples * np.log(e_var) + n_theta * np.log(n_samples)

fpe(n_theta, n_samples, e_var)

Compute the Final Error Prediction value.

Parameters:

Name Type Description Default
n_theta int

Number of parameters of the model.

required
n_samples int

Number of samples given the maximum lag.

required
e_var float

Variance of the residues

required

Returns:

Name Type Description
info_criteria_value float

The computed value given the information criteria selected by the user.

Source code in sysidentpy/model_structure_selection/ofr_base.py
def fpe(n_theta: int, n_samples: int, e_var: float) -> float:
    """Compute the Final Error Prediction value.

    Parameters
    ----------
    n_theta : int
        Number of parameters of the model.
    n_samples : int
        Number of samples given the maximum lag.
    e_var : float
        Variance of the residues.

    Returns
    -------
    info_criteria_value : float
        The computed value given the information criteria selected by the
        user.

    """
    # FPE = n * ln(residual variance) + n * ln((n + k) / (n - k))
    size_penalty = n_samples * np.log((n_samples + n_theta) / (n_samples - n_theta))
    return n_samples * np.log(e_var) + size_penalty

get_info_criteria(info_criteria, apress_lambda=1.0)

Get info criteria.

Source code in sysidentpy/model_structure_selection/ofr_base.py
def get_info_criteria(info_criteria: str, apress_lambda: float = 1.0):
    """Get info criteria.

    Returns the scoring function registered under `info_criteria`, or
    None when the name is unknown.  The apress criterion has its lambda
    bound up front so all returned callables share the same signature.
    """
    options = dict(
        aic=aic,
        aicc=aicc,
        bic=bic,
        fpe=fpe,
        lilc=lilc,
        apress=partial(apress, apress_lambda=apress_lambda),
    )
    return options.get(info_criteria)

get_min_info_value(info_values)

Find the index of the first increasing value in an array.

Parameters:

Name Type Description Default
info_values array - like

A sequence of numeric values to be analyzed.

required

Returns:

Type Description
int

The index of the first element where the values start to increase monotonically. If no such element exists, the length of info_values is returned.

Notes
  • The function assumes that info_values is a 1-dimensional array-like structure.
  • The function uses np.diff to compute the difference between consecutive elements in the sequence.
  • The function checks if any differences are positive, indicating an increase in value.

Examples:

>>> class MyClass:
...     def __init__(self, values):
...         self.info_values = values
...     def get_min_info_value(self):
...         is_monotonique = np.diff(self.info_values) > 0
...         if any(is_monotonique):
...             return np.where(is_monotonique)[0][0] + 1
...         return len(self.info_values)
>>> instance = MyClass([3, 2, 1, 4, 5])
>>> instance.get_min_info_value()
3
Source code in sysidentpy/model_structure_selection/ofr_base.py
def get_min_info_value(info_values):
    """Find the index of the first increasing value in an array.

    Scans `info_values` for the first position where the sequence starts
    to grow; the returned count (position + 1) is the model size at which
    the information criterion stops improving.

    Parameters
    ----------
    info_values : array-like
        A sequence of numeric values to be analyzed.

    Returns
    -------
    int
        The index of the first element where the values start to increase
        monotonically. If no such element exists, the length of
        `info_values` is returned.

    Examples
    --------
    >>> get_min_info_value(np.array([3, 2, 1, 4, 5]))
    3
    """
    xp = get_namespace(info_values)

    if not _is_numpy_namespace(xp):
        # Generic array-API path: compare consecutive elements directly
        # instead of relying on NumPy-only helpers.
        grew = info_values[1:] > info_values[:-1]
        if bool(_to_numpy(xp.any(grew))):
            first_growth = xp.nonzero(grew)[0][0]
            return int(_to_numpy(first_growth)) + 1
        return info_values.shape[0]

    # NumPy fast path: positions of every positive step between neighbors.
    positive_steps = np.flatnonzero(np.diff(info_values) > 0)
    if positive_steps.size > 0:
        return positive_steps[0] + 1
    return info_values.shape[0]

lilc(n_theta, n_samples, e_var)

Compute the Lilc information criteria value.

Parameters:

Name Type Description Default
n_theta int

Number of parameters of the model.

required
n_samples int

Number of samples given the maximum lag.

required
e_var float

Variance of the residues

required

Returns:

Name Type Description
info_criteria_value float

The computed value given the information criteria selected by the user.

Source code in sysidentpy/model_structure_selection/ofr_base.py
def lilc(n_theta: int, n_samples: int, e_var: float) -> float:
    """Compute the Lilc information criteria value.

    Parameters
    ----------
    n_theta : int
        Number of parameters of the model.
    n_samples : int
        Number of samples given the maximum lag.
    e_var : float
        Variance of the residues.

    Returns
    -------
    info_criteria_value : float
        The computed value given the information criteria selected by the
        user.

    """
    # LILC = n * ln(residual variance) + 2 * k * ln(ln(n))
    return n_samples * np.log(e_var) + 2 * n_theta * np.log(np.log(n_samples))