Skip to content

Documentation for Orthogonal Floating Search (OSF, OIF, OOS/O2S)

Orthogonal floating search algorithms (OSF, OIF, OOS/O2S).

The algorithms implemented here follow the Orthogonal Floating Search framework described in the OFSA manuscript. They adapt well-known floating feature selection strategies to the NARX model structure selection problem by combining: (i) orthogonal projections of candidate regressors and (ii) the classical Error Reduction Ratio (ERR) criterion. All classes are drop-in compatible with the existing SysIdentPy API (estimators, basis functions, equation formatter, etc.).

OIF

Bases: OSF

Orthogonal Improved Floating search (Section 3.3 of the OFSA paper).

Extends OSF with a swap step that replaces weak terms with better ones when the ERR score increases.

Source code in sysidentpy/model_structure_selection/orthogonal_floating_search.py
class OIF(OSF):
    """Orthogonal Improved Floating search (Section 3.3 of the OFSA paper).

    Builds on :class:`OSF` by adding a swap step: each selected term is
    tentatively removed and replaced by the strongest available candidate,
    and the exchange is kept only when it raises the ERR score.
    """

    def run_mss_algorithm(
        self, psi: np.ndarray, y: np.ndarray, process_term_number: int
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Execute OIF and assemble the standard MSS outputs.

        Parameters
        ----------
        psi : np.ndarray
            Candidate regressor matrix with shape (n_samples, n_terms).
        y : np.ndarray
            Output signal aligned with ``psi``.
        process_term_number : int
            Maximum number of terms to select.

        Returns
        -------
        Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
            ``(err_vals, piv, psi_selected, target)`` matching the MSS API.
        """
        target = self._default_estimation_target(y)
        squared_y = self._compute_squared_y(target)

        # Forward floating search, with the OIF swap callback plugged in.
        chosen = self._floating_forward_search(
            psi,
            target,
            process_term_number,
            squared_y,
            swap_callback=self._swap_step,
        )

        _, err_vals = self._subset_err(psi, target, chosen, squared_y)
        piv = np.asarray(chosen, dtype=int)
        psi_selected = psi[:, piv] if piv.size else psi[:, :0]
        return err_vals, piv, psi_selected, target

    def _swap_step(
        self,
        psi: np.ndarray,
        target: np.ndarray,
        subset: List[int],
        current_score: float,
        best_by_size: Dict[int, Tuple[float, Tuple[int, ...]]],
        all_indices: List[int],
        squared_y: float,
    ) -> Tuple[List[int], float, Optional[int]]:
        """Return the best single-term swap of ``subset``, or the input unchanged.

        For every term in ``subset``, drop it and ask ``_best_addition`` for
        the strongest candidate to fill the gap; the overall best exchange is
        kept only when it strictly improves on ``current_score``.
        """
        winner_score = current_score
        winner_subset = list(subset)
        winner_added: Optional[int] = None

        for removed in subset:
            remainder = [term for term in subset if term != removed]
            # The removed term itself stays in the pool, so "put it back"
            # is a legal (but non-improving) outcome of the addition step.
            pool = [term for term in all_indices if term not in remainder]
            new_term, _ = self._best_addition(psi, target, remainder, pool, squared_y)
            if new_term is None or new_term in remainder:
                continue

            trial = remainder + [new_term]
            trial_score, _ = self._subset_err(psi, target, trial, squared_y)
            if trial_score > winner_score:
                winner_score = trial_score
                winner_subset = trial
                winner_added = new_term

        if winner_score > current_score:
            # Record the improved subset for its cardinality so the floating
            # search can compare against it later.
            best_by_size[len(winner_subset)] = (winner_score, tuple(winner_subset))
            return winner_subset, winner_score, winner_added

        return subset, current_score, None

run_mss_algorithm(psi, y, process_term_number)

Run OIF and return (err_vals, piv, psi_selected, target).

Parameters:

Name Type Description Default
psi ndarray

Candidate regressor matrix with shape (n_samples, n_terms).

required
y ndarray

Output signal aligned with psi.

required
process_term_number int

Maximum number of terms to select.

required

Returns:

Type Description
Tuple[ndarray, ndarray, ndarray, ndarray]

(err_vals, piv, psi_selected, target) matching the MSS API.

Source code in sysidentpy/model_structure_selection/orthogonal_floating_search.py
def run_mss_algorithm(
    self, psi: np.ndarray, y: np.ndarray, process_term_number: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Execute OIF and assemble the standard MSS outputs.

    Parameters
    ----------
    psi : np.ndarray
        Candidate regressor matrix with shape (n_samples, n_terms).
    y : np.ndarray
        Output signal aligned with ``psi``.
    process_term_number : int
        Maximum number of terms to select.

    Returns
    -------
    Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
        ``(err_vals, piv, psi_selected, target)`` matching the MSS API.
    """
    target = self._default_estimation_target(y)
    squared_y = self._compute_squared_y(target)

    # Forward floating search, with the OIF swap callback plugged in.
    chosen = self._floating_forward_search(
        psi,
        target,
        process_term_number,
        squared_y,
        swap_callback=self._swap_step,
    )

    _, err_vals = self._subset_err(psi, target, chosen, squared_y)
    piv = np.asarray(chosen, dtype=int)
    psi_selected = psi[:, piv] if piv.size else psi[:, :0]
    return err_vals, piv, psi_selected, target

OOS

Bases: _OrthogonalFloatingBase

Orthogonal Oscillating Search (paper Section 3.4).

This class is named OOS (Orthogonal Oscillating Search) to avoid the caret notation O^2S in code. It corresponds to the method described as O2S in the paper.

Source code in sysidentpy/model_structure_selection/orthogonal_floating_search.py
class OOS(_OrthogonalFloatingBase):
    """Orthogonal Oscillating Search (paper Section 3.4).

    This class is named ``OOS`` (Orthogonal Oscillating Search) to avoid
    the caret notation ``O^2S`` in code. It corresponds to the method
    described as O2S in the paper.

    The search starts from a greedy forward model of the requested size,
    then alternates "down swings" (remove ``depth`` terms, add ``depth``
    back) and "up swings" (add ``depth`` terms, remove ``depth``), keeping
    a swing only when it strictly raises the subset's ERR score.
    """

    def __init__(
        self,
        *,
        ylag: Union[int, List[int]] = 2,
        xlag: Union[int, List[int]] = 2,
        elag: Union[int, List[int]] = 2,
        order_selection: bool = True,
        info_criteria: str = "aic",
        n_terms: Optional[int] = None,
        n_info_values: int = 15,
        estimator: Estimators = RecursiveLeastSquares(),
        basis_function: Union[Polynomial, Fourier] = Polynomial(),
        model_type: str = "NARMAX",
        eps: float = np.finfo(np.float64).eps,
        alpha: float = 0.0,
        err_tol: Optional[float] = None,
        apress_lambda: float = 1.0,
        max_search_depth: Optional[int] = None,
    ):
        # Cap on the oscillation depth. ``None`` selects the paper's 25%
        # heuristic (see ``_resolve_search_depth``).
        self.max_search_depth = max_search_depth
        super().__init__(
            ylag=ylag,
            xlag=xlag,
            elag=elag,
            order_selection=order_selection,
            info_criteria=info_criteria,
            n_terms=n_terms,
            n_info_values=n_info_values,
            estimator=estimator,
            basis_function=basis_function,
            model_type=model_type,
            eps=eps,
            alpha=alpha,
            err_tol=err_tol,
            apress_lambda=apress_lambda,
        )
        # Fail fast on an invalid depth rather than deep inside the search.
        self._validate_oos_params()

    def _validate_oos_params(self) -> None:
        """Raise ``ValueError`` unless ``max_search_depth`` is ``None`` or an int >= 1."""
        if self.max_search_depth is None:
            return
        # NOTE(review): ``bool`` is an ``int`` subclass, so ``True`` passes
        # this check as depth 1 — confirm whether that is acceptable.
        if not isinstance(self.max_search_depth, int) or self.max_search_depth < 1:
            msg = (
                "max_search_depth must be a positive int or None; "
                f"got {self.max_search_depth!r} "
                f"(type={type(self.max_search_depth).__name__})"
            )
            raise ValueError(msg)

    def _resolve_search_depth(self, process_term_number: int, total_terms: int) -> int:
        """Choose search depth per OFSA guideline (25% of the smaller side)."""
        # An explicit user-provided depth always wins.
        if self.max_search_depth is not None:
            return self.max_search_depth

        # "Smaller side" = min(selected size, remaining candidates),
        # clamped to at least 1 so the floor below cannot yield depth 0.
        smaller_side = max(
            1, min(process_term_number, max(0, total_terms - process_term_number))
        )
        depth = int(np.floor(0.25 * smaller_side))
        return max(1, depth)

    def _down_swing(
        self,
        psi: np.ndarray,
        target: np.ndarray,
        subset: List[int],
        current_score: float,
        depth: int,
        squared_y: float,
        all_indices: List[int],
        all_indices_set: set,
    ) -> Tuple[List[int], float, bool, bool]:
        """Perform a down swing (remove then add) returning updated state.

        Returns ``(subset, score, improved, failed)``: the (possibly new)
        subset and score, whether the swing improved the score, and whether
        it failed (infeasible or non-improving).
        """
        # Not enough selected terms to remove ``depth`` of them.
        if len(subset) < depth:
            return subset, current_score, False, True

        working_subset = subset.copy()
        to_remove = self._select_least_significant_subset(
            psi, target, working_subset, depth, squared_y
        )

        if len(to_remove) != depth:
            return subset, current_score, False, True

        working_subset = [t for t in working_subset if t not in to_remove]
        available_set = all_indices_set.difference(working_subset)
        if len(available_set) < depth:
            return subset, current_score, False, True

        # Filter through ``all_indices`` to keep a deterministic ordering
        # (a bare set iteration would not be).
        available_down = [idx for idx in all_indices if idx in available_set]
        ms_terms = self._select_most_significant_subset(
            psi,
            target,
            working_subset,
            available_down,
            depth,
            squared_y,
        )

        if len(ms_terms) != depth:
            return subset, current_score, False, True

        working_subset = working_subset + ms_terms
        down_score, _ = self._subset_err(psi, target, working_subset, squared_y)

        # Keep the swing only on a strict improvement; ties revert.
        if down_score > current_score:
            return working_subset, down_score, True, False

        return subset, current_score, False, True

    def _up_swing(
        self,
        psi: np.ndarray,
        target: np.ndarray,
        subset: List[int],
        current_score: float,
        depth: int,
        squared_y: float,
        total_terms: int,
        all_indices: List[int],
        all_indices_set: set,
    ) -> Tuple[List[int], float, bool, bool]:
        """Perform an up swing (add then remove) returning updated state.

        Same return convention as ``_down_swing``:
        ``(subset, score, improved, failed)``.
        """
        # Cannot grow beyond the candidate pool.
        if len(subset) + depth > total_terms:
            return subset, current_score, False, True

        available_set = all_indices_set.difference(subset)
        if len(available_set) < depth:
            return subset, current_score, False, True

        # Deterministic ordering, as in ``_down_swing``.
        available_up = [idx for idx in all_indices if idx in available_set]
        ms_terms_up = self._select_most_significant_subset(
            psi,
            target,
            subset,
            available_up,
            depth,
            squared_y,
        )

        if len(ms_terms_up) != depth:
            return subset, current_score, False, True

        working_subset = subset + ms_terms_up
        to_remove_up = self._select_least_significant_subset(
            psi, target, working_subset, depth, squared_y
        )

        if len(to_remove_up) != depth:
            return subset, current_score, False, True

        working_subset = [t for t in working_subset if t not in to_remove_up]
        up_score, _ = self._subset_err(psi, target, working_subset, squared_y)

        # Keep the swing only on a strict improvement; ties revert.
        if up_score > current_score:
            return working_subset, up_score, True, False

        return subset, current_score, False, True

    def run_mss_algorithm(
        self, psi: np.ndarray, y: np.ndarray, process_term_number: int
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Run oscillating search and return (err_vals, piv, psi_selected, target).

        Parameters
        ----------
        psi : np.ndarray
            Candidate regressor matrix with shape (n_samples, n_terms).
        y : np.ndarray
            Output signal aligned with ``psi``.
        process_term_number : int
            Maximum number of terms to select.

        Returns
        -------
        Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
            ``(err_vals, piv, psi_selected, target)`` matching the MSS API.
        """
        target = self._default_estimation_target(y)
        squared_y = self._compute_squared_y(target)
        total_terms = psi.shape[1]
        all_indices = list(range(total_terms))
        all_indices_set = set(all_indices)

        # Resolve depth once we know xi and n, as suggested by the paper.
        max_depth = self._resolve_search_depth(process_term_number, total_terms)

        # Initial model: greedy inclusion of ``process_term_number`` terms.
        subset: List[int] = []
        available_set = all_indices_set.copy()
        for _ in range(min(process_term_number, total_terms)):
            available_sorted = [idx for idx in all_indices if idx in available_set]
            ms_idx, _ = self._best_addition(
                psi, target, subset, available_sorted, squared_y
            )
            if ms_idx is None:
                break
            subset.append(ms_idx)
            available_set.discard(ms_idx)

        current_score, _ = self._subset_err(psi, target, subset, squared_y)

        # Oscillation state: depth grows only after both swing directions
        # fail; any improvement resets it to 1.
        depth = 1
        failed_down = False
        failed_up = False

        while depth <= max_depth and len(subset) > 0:
            improvement = False

            subset, current_score, down_improved, failed_down = self._down_swing(
                psi,
                target,
                subset,
                current_score,
                depth,
                squared_y,
                all_indices,
                all_indices_set,
            )
            improvement = improvement or down_improved

            # NOTE(review): ``failed_up`` here is the value from the PREVIOUS
            # iteration's up swing; after this mid-loop depth bump the code
            # proceeds straight into an up swing at the new depth. Confirm
            # this ordering matches the paper's O2S schedule.
            if failed_down and failed_up:
                depth += 1
                failed_down = False
                failed_up = False
                if depth > max_depth:
                    break

            subset, current_score, up_improved, failed_up = self._up_swing(
                psi,
                target,
                subset,
                current_score,
                depth,
                squared_y,
                total_terms,
                all_indices,
                all_indices_set,
            )
            improvement = improvement or up_improved

            if improvement:
                # Progress at this depth: restart the oscillation shallow.
                depth = 1
                failed_down = False
                failed_up = False
            elif failed_down and failed_up:
                # Both directions stalled in the same iteration: go deeper.
                depth += 1
                failed_down = False
                failed_up = False

        _, err_vals = self._subset_err(psi, target, subset, squared_y)
        piv = np.array(subset, dtype=int)
        psi_selected = psi[:, piv] if len(piv) else psi[:, :0]
        return err_vals, piv, psi_selected, target

run_mss_algorithm(psi, y, process_term_number)

Run oscillating search and return (err_vals, piv, psi_selected, target).

Parameters:

Name Type Description Default
psi ndarray

Candidate regressor matrix with shape (n_samples, n_terms).

required
y ndarray

Output signal aligned with psi.

required
process_term_number int

Maximum number of terms to select.

required

Returns:

Type Description
Tuple[ndarray, ndarray, ndarray, ndarray]

(err_vals, piv, psi_selected, target) matching the MSS API.

Source code in sysidentpy/model_structure_selection/orthogonal_floating_search.py
def run_mss_algorithm(
    self, psi: np.ndarray, y: np.ndarray, process_term_number: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Run oscillating search and return (err_vals, piv, psi_selected, target).

    Parameters
    ----------
    psi : np.ndarray
        Candidate regressor matrix with shape (n_samples, n_terms).
    y : np.ndarray
        Output signal aligned with ``psi``.
    process_term_number : int
        Maximum number of terms to select.

    Returns
    -------
    Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
        ``(err_vals, piv, psi_selected, target)`` matching the MSS API.
    """
    target = self._default_estimation_target(y)
    squared_y = self._compute_squared_y(target)
    total_terms = psi.shape[1]
    all_indices = list(range(total_terms))
    all_indices_set = set(all_indices)

    # Resolve depth once we know xi and n, as suggested by the paper.
    max_depth = self._resolve_search_depth(process_term_number, total_terms)

    # Initial model: greedy inclusion of ``process_term_number`` terms.
    subset: List[int] = []
    available_set = all_indices_set.copy()
    for _ in range(min(process_term_number, total_terms)):
        available_sorted = [idx for idx in all_indices if idx in available_set]
        ms_idx, _ = self._best_addition(
            psi, target, subset, available_sorted, squared_y
        )
        if ms_idx is None:
            break
        subset.append(ms_idx)
        available_set.discard(ms_idx)

    current_score, _ = self._subset_err(psi, target, subset, squared_y)

    # Oscillation state: depth grows only after both swing directions fail;
    # any improvement resets it to 1.
    depth = 1
    failed_down = False
    failed_up = False

    while depth <= max_depth and len(subset) > 0:
        improvement = False

        subset, current_score, down_improved, failed_down = self._down_swing(
            psi,
            target,
            subset,
            current_score,
            depth,
            squared_y,
            all_indices,
            all_indices_set,
        )
        improvement = improvement or down_improved

        # NOTE(review): ``failed_up`` here is the value from the PREVIOUS
        # iteration's up swing; after this mid-loop depth bump the code
        # proceeds straight into an up swing at the new depth. Confirm this
        # ordering matches the paper's O2S schedule.
        if failed_down and failed_up:
            depth += 1
            failed_down = False
            failed_up = False
            if depth > max_depth:
                break

        subset, current_score, up_improved, failed_up = self._up_swing(
            psi,
            target,
            subset,
            current_score,
            depth,
            squared_y,
            total_terms,
            all_indices,
            all_indices_set,
        )
        improvement = improvement or up_improved

        if improvement:
            # Progress at this depth: restart the oscillation shallow.
            depth = 1
            failed_down = False
            failed_up = False
        elif failed_down and failed_up:
            # Both directions stalled in the same iteration: go deeper.
            depth += 1
            failed_down = False
            failed_up = False

    _, err_vals = self._subset_err(psi, target, subset, squared_y)
    piv = np.array(subset, dtype=int)
    psi_selected = psi[:, piv] if len(piv) else psi[:, :0]
    return err_vals, piv, psi_selected, target

OSF

Bases: _OrthogonalFloatingBase

Orthogonal Sequential Floating search (Section 3.2 of the OFSA paper).

Iteratively adds regressors using ERR and backtracks to drop weak terms. Keeps the same constructor surface as other SysIdentPy MSS classes.

Source code in sysidentpy/model_structure_selection/orthogonal_floating_search.py
class OSF(_OrthogonalFloatingBase):
    """Orthogonal Sequential Floating search (Section 3.2 of the OFSA paper).

    Adds regressors one at a time by ERR and floats backwards to discard
    weak terms. The constructor surface matches the other SysIdentPy MSS
    classes.
    """

    def run_mss_algorithm(
        self, psi: np.ndarray, y: np.ndarray, process_term_number: int
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """Execute OSF and assemble the standard MSS outputs.

        Parameters
        ----------
        psi : np.ndarray
            Candidate regressor matrix with shape (n_samples, n_terms).
        y : np.ndarray
            Output signal aligned with ``psi``.
        process_term_number : int
            Maximum number of terms to select.

        Returns
        -------
        Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
            ``(err_vals, piv, psi_selected, target)`` matching the MSS API.
        """
        target = self._default_estimation_target(y)
        squared_y = self._compute_squared_y(target)

        # Plain floating forward search (no swap callback for OSF).
        chosen = self._floating_forward_search(
            psi, target, process_term_number, squared_y
        )

        _, err_vals = self._subset_err(psi, target, chosen, squared_y)
        piv = np.asarray(chosen, dtype=int)
        psi_selected = psi[:, piv] if piv.size else psi[:, :0]
        return err_vals, piv, psi_selected, target

run_mss_algorithm(psi, y, process_term_number)

Run OSF and return (err_vals, piv, psi_selected, target).

Parameters:

Name Type Description Default
psi ndarray

Candidate regressor matrix with shape (n_samples, n_terms).

required
y ndarray

Output signal aligned with psi.

required
process_term_number int

Maximum number of terms to select.

required

Returns:

Type Description
Tuple[ndarray, ndarray, ndarray, ndarray]

(err_vals, piv, psi_selected, target) matching the MSS API.

Source code in sysidentpy/model_structure_selection/orthogonal_floating_search.py
def run_mss_algorithm(
    self, psi: np.ndarray, y: np.ndarray, process_term_number: int
) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
    """Execute OSF and assemble the standard MSS outputs.

    Parameters
    ----------
    psi : np.ndarray
        Candidate regressor matrix with shape (n_samples, n_terms).
    y : np.ndarray
        Output signal aligned with ``psi``.
    process_term_number : int
        Maximum number of terms to select.

    Returns
    -------
    Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]
        ``(err_vals, piv, psi_selected, target)`` matching the MSS API.
    """
    target = self._default_estimation_target(y)
    squared_y = self._compute_squared_y(target)

    # Plain floating forward search (no swap callback for OSF).
    chosen = self._floating_forward_search(
        psi, target, process_term_number, squared_y
    )

    _, err_vals = self._subset_err(psi, target, chosen, squared_y)
    piv = np.asarray(chosen, dtype=int)
    psi_selected = psi[:, piv] if piv.size else psi[:, :0]
    return err_vals, piv, psi_selected, target