Source code for robustipy.models

"""robustipy.models

This module implements multivariate regression classes for Robust Inference.
It includes classes for OLS (OLSRobust and OLSResult) and logistic regression (LRobust)
analysis, along with utilities for model merging, plotting, and Bayesian model averaging.
"""

from __future__ import annotations
import _pickle
import os
import warnings
from typing import Any, Optional, Sequence, List, Tuple, Union
import numpy as np
import pandas as pd
import sys
import sklearn
from joblib import Parallel, delayed
from rich.progress import track
from scipy.stats import norm, t as student_t
from sklearn.metrics import log_loss, root_mean_squared_error
from sklearn.model_selection import GroupKFold, KFold, StratifiedKFold, train_test_split
from statsmodels.tools.tools import add_constant

from robustipy.prototypes import Protoresult, BaseRobust
from robustipy.utils import (
    all_subsets,
    group_demean,
    logistic_regression_sm,
    simple_ols,
    space_size,
    mcfadden_r2,
    pseudo_r2,
    calculate_imv_score,
    sample_z_masks,
    _ensure_single_constant,
    _find_group_invariant_columns,
    rescale,
    make_inquiry,
    stripped_ols
)


def _progress_iter(iterable, *, total: int, description: str = "Working..."):
    """
    Return a progress-enabled iterator in terminal sessions and a plain iterator
    in notebooks / non-interactive outputs.
    """
    in_jupyter = _in_jupyter()
    if in_jupyter:
        # Notebook progress is handled with draw-level tqdm bars.
        return iterable
    has_tty_output = sys.stdout.isatty() or sys.stderr.isatty()
    return track(
        iterable,
        total=total,
        description=description,
        disable=not (has_tty_output or in_jupyter),
    )


def _in_jupyter() -> bool:
    """True when running inside a Jupyter kernel."""
    try:
        from IPython import get_ipython
        ip = get_ipython()
        if ip is None:
            return bool(os.environ.get("JPY_PARENT_PID"))
        shell_name = ip.__class__.__name__
        if shell_name == "ZMQInteractiveShell":
            return True
        if hasattr(ip, "kernel"):
            return True
        cfg = getattr(ip, "config", {})
        if isinstance(cfg, dict):
            if "IPKernelApp" in cfg:
                return True
        else:
            if "IPKernelApp" in str(cfg):
                return True
        return bool(os.environ.get("JPY_PARENT_PID"))
    except (ImportError, AttributeError):
        return bool(os.environ.get("JPY_PARENT_PID"))


def _make_notebook_draws_bar(*, total: int, description: str):
    """
    Build a tqdm progress bar for notebooks. Returns None outside notebooks or
    when tqdm isn't available.
    """
    if not _in_jupyter():
        return None
    try:
        from tqdm.auto import tqdm
    except ImportError:
        return None
    return tqdm(
        total=total,
        desc=description,
        leave=True,
        miniters=1,
        mininterval=1.0,
        smoothing=0,
        unit="draw",
        unit_scale=False,
        bar_format=(
            "{desc}: {percentage:5.1f}%|{bar}| "
            "{n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]"
        ),
    )


def _run_parallel_seed_batches(
    *,
    seeds,
    n_cpu: int,
    run_one_seed,
    draws_bar=None,
    dispatch_mode: str = "streaming",
    task_batch_size: Optional[int] = None,
):
    """
    Execute bootstrap seeds while preserving seed order and allowing notebook
    progress to advance during a specification.
    """
    outputs = []
    if len(seeds) == 0:
        return outputs

    cpu = max(1, int(n_cpu))
    progress_batch_size = max(8, cpu)
    if dispatch_mode not in {"streaming", "batched", "chunked"}:
        raise ValueError("dispatch_mode must be 'streaming', 'batched', or 'chunked'.")
    parallel_kwargs = {
        "n_jobs": cpu,
        # Keep process parallelism for CPU-bound bootstrap fits, but disable
        # joblib's automatic memmapping so large profiler runs do not accumulate
        # thousands of temporary memmap folders.
        "backend": "loky",
        "max_nbytes": None,
        "mmap_mode": None,
    }

    def _update_draws_bar(n: int) -> None:
        if draws_bar is None or n <= 0:
            return
        draws_bar.update(n)

    if cpu == 1:
        pending_progress = 0
        for seed in seeds:
            outputs.append(run_one_seed(int(seed)))
            pending_progress += 1
            if pending_progress >= progress_batch_size:
                _update_draws_bar(pending_progress)
                pending_progress = 0
        _update_draws_bar(pending_progress)
        return outputs

    if dispatch_mode == "batched":
        batch_size = max(8, cpu)
        for start in range(0, len(seeds), batch_size):
            seed_batch = seeds[start:start + batch_size]
            batch_output = Parallel(**parallel_kwargs)(
                delayed(run_one_seed)(int(seed))
                for seed in seed_batch
            )
            outputs.extend(batch_output)
            _update_draws_bar(len(batch_output))
        return outputs

    if dispatch_mode == "chunked":
        if task_batch_size is None:
            task_batch_size = max(32, int(np.ceil(len(seeds) / (cpu * 4))))
        task_batch_size = max(1, int(task_batch_size))

        def _run_seed_chunk(seed_chunk):
            return [run_one_seed(int(seed)) for seed in seed_chunk]

        seed_chunks = [
            seeds[start:start + task_batch_size]
            for start in range(0, len(seeds), task_batch_size)
        ]
        parallel_output = Parallel(**parallel_kwargs)(
            delayed(_run_seed_chunk)(seed_chunk)
            for seed_chunk in seed_chunks
        )
        for batch_output in parallel_output:
            outputs.extend(batch_output)
            _update_draws_bar(len(batch_output))
        return outputs

    parallel_output = Parallel(**parallel_kwargs, return_as="generator")(
        delayed(run_one_seed)(int(seed))
        for seed in seeds
    )
    pending_progress = 0
    for output in parallel_output:
        outputs.append(output)
        pending_progress += 1
        if pending_progress >= progress_batch_size:
            _update_draws_bar(pending_progress)
            pending_progress = 0
    _update_draws_bar(pending_progress)
    return outputs


def _spec_bitmask(spec: Sequence[str], control_positions: dict[str, int]) -> int:
    """
    Encode a control specification into an integer bitmask.

    The i-th control variable in `control_positions` maps to bit i.
    """
    mask = 0
    for name in spec:
        pos = control_positions.get(name)
        if pos is not None:
            mask |= (1 << pos)
    return mask


def _make_group_bootstrap_lookup(
    temp_data: pd.DataFrame,
    group: str
) -> tuple[np.ndarray, dict[Any, np.ndarray]]:
    """Precompute the group order and row positions used by grouped bootstrap."""
    unique_groups = temp_data[group].unique()
    if unique_groups.size == 0:
        raise ValueError(f"No groups available in column '{group}'.")

    group_positions = {
        g: np.asarray(indexes, dtype=np.intp)
        for g, indexes in temp_data.groupby(group, sort=False, observed=True).indices.items()
    }
    return unique_groups, group_positions


def _prepare_ols_bootstrap_data(
    comb_var: pd.DataFrame,
    y_star: np.ndarray
) -> pd.DataFrame:
    """Build the OLS bootstrap DataFrame once per spec instead of once per draw."""
    temp_data = comb_var.copy()
    temp_data['y_star'] = y_star
    return temp_data


def _prepare_non_group_ols_bootstrap_arrays(
    temp_data: pd.DataFrame
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Precompute arrays for the non-grouped OLS bootstrap path."""
    y_values = temp_data.iloc[:, [0]].to_numpy()
    y_star_values = temp_data.iloc[:, [-1]].to_numpy()
    x_df = temp_data.drop(columns=[temp_data.columns[0], 'y_star']).copy()
    x_df['const'] = 1
    x_values = x_df.to_numpy()
    return y_values, y_star_values, x_values


def _stripped_ols_arrays(y: np.ndarray, x: np.ndarray) -> dict:
    """Array equivalent of stripped_ols() for pre-sampled numeric OLS data."""
    if x.size == 0 or y.size == 0:
        raise ValueError("Inputs must not be empty.")
    with np.errstate(divide='ignore', invalid='ignore'):
        try:
            inv_xx = np.linalg.inv(np.dot(x.T, x))
        except np.linalg.LinAlgError:
            inv_xx = np.linalg.pinv(np.dot(x.T, x))
        xy = np.dot(x.T, y)
        b = np.dot(inv_xx, xy)
        nobs = y.shape[0]
        ncoef = x.shape[1]
        df_e = nobs - ncoef
        e = y - np.dot(x, b)
        sse = np.dot(e.T, e) / df_e
        se = np.sqrt(np.diagonal(sse * inv_xx))
        se_col = se.reshape(-1, 1)
        t_values = b / se_col
        t_values = np.where(se_col == 0, np.nan, t_values)
        p = np.where(
            np.isfinite(t_values),
            (1 - student_t.cdf(abs(t_values), df_e)) * 2,
            np.nan
        )
        r2 = 1 - e.var() / y.var()
        r2_adj = 1 - (1 - r2) * ((nobs - 1) / (nobs - ncoef))
    return {'b': b, 'p': p, 'r2': r2_adj}


def _strap_non_group_OLS_arrays(
    y_values: np.ndarray,
    y_star_values: np.ndarray,
    x_values: np.ndarray,
    sample_size: int,
    seed: int,
) -> tuple:
    """Run one non-grouped OLS bootstrap draw from precomputed arrays."""
    n_rows = y_values.shape[0]
    row_positions = np.random.RandomState(seed).choice(
        np.arange(n_rows),
        size=sample_size,
        replace=True,
    )
    y = np.asfortranarray(y_values[row_positions])
    y_star = np.asfortranarray(y_star_values[row_positions])
    x = np.asfortranarray(x_values[row_positions])

    output = _stripped_ols_arrays(y=y, x=x)
    output_ystar = _stripped_ols_arrays(y=y_star, x=x)

    return (
        output['b'][0][0],
        output['p'][0][0],
        output['r2'],
        output_ystar['b'][0][0],
        output_ystar['p'][0][0],
    )


def _cluster_bootstrap_by_rows(
    temp_data: pd.DataFrame,
    group: str,
    seed: int,
    target_rows: int,
    group_lookup: Optional[tuple[np.ndarray, dict[Any, np.ndarray]]] = None,
) -> pd.DataFrame:
    """
    Draw a cluster bootstrap sample by repeatedly sampling groups (with replacement)
    until at least `target_rows` observations are collected.

    Notes
    -----
    - Preserves cluster integrity (whole groups are appended each draw).
    - Output may exceed `target_rows` due to whole-cluster appends.
    """
    if group_lookup is None:
        group_lookup = _make_group_bootstrap_lookup(temp_data, group)
    unique_groups, group_positions = group_lookup

    target_rows = max(1, int(target_rows))
    rng = np.random.default_rng(seed)

    sampled_positions = []
    n_rows = 0
    while n_rows < target_rows:
        g = rng.choice(unique_groups)
        positions = group_positions[g]
        sampled_positions.append(positions)
        n_rows += positions.size

    row_positions = np.concatenate(sampled_positions)
    return temp_data.iloc[row_positions].reset_index(drop=True)


def _cluster_bootstrap_singleton_safe(
    temp_data: pd.DataFrame,
    group: str,
    seed: int,
    target_rows: int,
    min_rows_after_filter: int,
    max_attempts: int = 8,
    group_lookup: Optional[tuple[np.ndarray, dict[Any, np.ndarray]]] = None,
) -> tuple[pd.DataFrame, bool, int, int, int]:
    """
    Draw grouped bootstrap samples and remove singleton groups when possible.

    If singleton filtering repeatedly yields too few rows, fall back to the
    unfiltered grouped bootstrap sample to avoid hard crashes.

    Returns
    -------
    sample_df : pd.DataFrame
        Chosen sample (filtered or fallback unfiltered).
    used_filtered : bool
        True when the returned sample passed singleton filtering.
    filtered_rows : int
        Number of rows in the last singleton-filtered sample.
    selected_rows : int
        Number of rows in the corresponding unfiltered sample.
    attempts_used : int
        Number of attempts performed.
    """
    min_rows_after_filter = max(1, int(min_rows_after_filter))
    max_attempts = max(1, int(max_attempts))

    last_selected = None
    last_filtered = None

    for attempt in range(max_attempts):
        attempt_seed = int(seed) + attempt
        selected = _cluster_bootstrap_by_rows(
            temp_data=temp_data,
            group=group,
            seed=attempt_seed,
            target_rows=target_rows,
            group_lookup=group_lookup,
        )
        group_sizes = selected.groupby(group)[group].transform("size")
        filtered = selected.loc[group_sizes > 1].copy()

        last_selected = selected
        last_filtered = filtered

        if len(filtered) >= min_rows_after_filter:
            return filtered, True, len(filtered), len(selected), attempt + 1

    # Singleton filtering remained too aggressive; use clustered sample as fallback.
    assert last_selected is not None
    filtered_rows = 0 if last_filtered is None else len(last_filtered)
    return last_selected.copy(), False, filtered_rows, len(last_selected), max_attempts


[docs] def stouffer_method( p_values, *, two_sided=True, betas=None, # if missing with two_sided=True, use unsigned z's p_values_ystar=None, # null/bootstrap p-values, shape (n_specs, n_draws) betas_ystar=None, # null/bootstrap betas, shape (n_specs, n_draws) weights=None, # optional Stouffer weights (per spec) clip_floor=1e-300, # must be >= np.finfo(float).tiny na_action="omit", warn: bool = True ): """ Combine p-values via a Stouffer test aligned with OLSResult._compute_inference. Uses observed full-sample p-values and coefficients to compute ``Z_obs``. If null draws (``p_values_ystar`` and ``betas_ystar``) are supplied, the p-value is calibrated by two-sided Monte Carlo: ``p = (1 + sum(abs(Z_null) >= abs(Z_obs))) / (B + 1)``. Dependence is estimated from null z-vectors (PSD projection + ridge). If null draws are unavailable, the method falls back to an asymptotic two-sided p-value based on ``Z_obs``. Returns ------- tuple[float, float] (Z_obs, p_value). For two_sided=True this is a two-sided p-value. """ import numpy as np from scipy.stats import norm # ---- 0) observed inputs p_obs_all = np.asarray(p_values, dtype=np.float64).reshape(-1) n_specs = p_obs_all.size if n_specs == 0: raise ValueError("`p_values` must contain at least one element.") if betas is None: if warn and two_sided: print("WARNING: `betas` not provided; using unsigned z-scores for observed statistics.") b_obs_all = np.ones(n_specs, dtype=np.float64) else: b_obs_all = np.asarray(betas, dtype=np.float64).reshape(-1) if b_obs_all.size != n_specs: raise ValueError("`betas` length must match `p_values` length.") # ---- 1) validate observed p and optionally omit invalid obs_valid = np.isfinite(p_obs_all) & (p_obs_all >= 0.0) & (p_obs_all <= 1.0) & np.isfinite(b_obs_all) if not obs_valid.all() and na_action == "raise": bad = np.where(~obs_valid)[0].tolist() raise ValueError(f"Invalid observed inputs at indices {bad}.") # ---- 2) prepare null matrices (optional) and keep mask from null variability p_matrix = None beta_matrix = None keep_null = None z_corr = None if (p_values_ystar is not None) and (betas_ystar is not None): p_matrix = np.asarray(p_values_ystar, dtype=np.float64) beta_matrix = np.asarray(betas_ystar, dtype=np.float64) if p_matrix.shape != beta_matrix.shape: raise ValueError("`p_values_ystar` and `betas_ystar` must have identical shape.") if p_matrix.ndim != 2: raise ValueError("`p_values_ystar` and `betas_ystar` must be 2D (n_specs, n_draws).") # Accept transposed input if obvious. if p_matrix.shape[0] != n_specs and p_matrix.shape[1] == n_specs: p_matrix = p_matrix.T beta_matrix = beta_matrix.T if p_matrix.shape[0] != n_specs: raise ValueError("Null matrices must have n_specs rows matching observed p-values.") if p_matrix.shape[1] > 1: floor_min = np.finfo(float).tiny clip_floor_eff = max(float(clip_floor), floor_min) if (clip_floor is not None and np.isfinite(clip_floor) and clip_floor > 0.0) else floor_min clip_ceiling = 1.0 - 1e-16 p_clip_null = np.clip(p_matrix, clip_floor_eff, clip_ceiling) if two_sided: z_matrix = np.sign(beta_matrix) * norm.isf(p_clip_null / 2.0) else: z_matrix = norm.isf(p_clip_null) # draws x specs Z_null_all = z_matrix.T z_std = np.std(Z_null_all, axis=0, ddof=1) keep_null = np.isfinite(z_std) & (z_std > 1e-14) if np.sum(keep_null) >= 2: Zk = Z_null_all[:, keep_null] R_hat = np.corrcoef(Zk, rowvar=False) evals, evecs = np.linalg.eigh(R_hat) evals[evals < 0.0] = 0.0 R_psd = (evecs * evals) @ evecs.T R_psd = 0.5 * (R_psd + R_psd.T) np.fill_diagonal(R_psd, 1.0) z_corr = R_psd + 1e-12 * np.eye(R_psd.shape[0], dtype=R_psd.dtype) # ---- 3) final keep mask if keep_null is None: keep = obs_valid.copy() else: keep = obs_valid & keep_null if np.sum(keep) == 0: if na_action == "raise": raise ValueError("No valid tests available after applying observed/null filtering.") return np.nan, np.nan # ---- 4) prepare observed z floor_min = np.finfo(float).tiny if clip_floor is None or not np.isfinite(clip_floor) or clip_floor <= 0.0: clip_floor_eff = floor_min else: clip_floor_eff = max(float(clip_floor), floor_min) clip_ceiling = 1.0 - 1e-16 p_kept = p_obs_all[keep] b_kept = b_obs_all[keep] n_zeros = int(np.sum(p_kept == 0.0)) n_ones = int(np.sum(p_kept == 1.0)) if warn and (n_zeros or n_ones): print(f"WARNING: clipping {n_zeros} p=0 and {n_ones} p=1 to " f"[{clip_floor_eff:g}, {clip_ceiling:g}].") p_kept = np.clip(p_kept, clip_floor_eff, clip_ceiling) if two_sided: z_obs = np.sign(b_kept) * norm.isf(p_kept / 2.0) else: z_obs = norm.isf(p_kept) # Guard against residual inf/-inf due to extreme p if not np.all(np.isfinite(z_obs)): Z_MAX = 37.0 if warn: print("WARNING: non-finite z encountered; clipping z to ±37.") z_obs = np.clip(z_obs, -Z_MAX, Z_MAX) # ---- 5) weights (aligned to kept specs) if weights is None: w = np.ones(int(np.sum(keep)), dtype=np.float64) else: w_all = np.asarray(weights, dtype=np.float64).reshape(-1) if w_all.size != n_specs: raise ValueError("`weights` must have same length as observed p-values.") if not np.all(np.isfinite(w_all)): raise ValueError("`weights` must be finite.") w = w_all[keep] if not np.any(w != 0.0): raise ValueError("At least one kept weight must be nonzero.") # ---- 6) denominator (mirrors _compute_inference behavior) denom = None if (z_corr is not None) and (keep_null is not None) and np.array_equal(keep, keep_null): denom2 = float(w @ (z_corr @ w)) if np.isfinite(denom2) and denom2 > 0.0: denom = float(np.sqrt(denom2)) if denom is None: denom = float(np.sqrt(float(np.dot(w, w)))) Z_obs = float(np.dot(w, z_obs) / denom) # ---- 7) p-value: null-calibrated if null draws exist; else asymptotic fallback has_null = ( (p_matrix is not None) and (beta_matrix is not None) and (p_matrix.size > 0) and (p_matrix.shape[1] >= 1) and (not np.isnan(p_matrix).all()) ) if not has_null: if two_sided: p_val = float(2.0 * norm.sf(abs(Z_obs))) else: p_val = float(norm.sf(Z_obs)) return Z_obs, p_val # Null z matrix aligned to kept specs p_clip_null = np.clip(p_matrix, clip_floor_eff, clip_ceiling) if two_sided: z_null_matrix = np.sign(beta_matrix) * norm.isf(p_clip_null / 2.0) else: z_null_matrix = norm.isf(p_clip_null) Z_null_specs = z_null_matrix.T[:, keep] # draws x kept_specs Z_null = (Z_null_specs @ w) / denom B = int(Z_null.shape[0]) p_val = float((1.0 + np.sum(np.abs(Z_null) >= abs(Z_obs))) / (B + 1.0)) return Z_obs, p_val
[docs] class MergedResult(Protoresult): """ Combine and summarize results exclusively from one or more OLSResult runs. Parameters ---------- y : str Dependent variable name shared by all merged results. specs : Sequence[Sequence[str]] List of specifications; each inner sequence names the controls defining one spec. estimates : array-like or pandas.DataFrame Coefficient estimates for each spec (rows) and bootstrap draw (columns). p_values : array-like or pandas.DataFrame Corresponding p-values for each estimate. r2_values : array-like or pandas.DataFrame R² values for each spec and draw. Attributes ---------- y_name : str Name of the dependent variable. specs_names : pandas.Series[frozenset] Each entry is the set of control variables defining a spec. estimates : pandas.DataFrame Coefficient estimates by spec and draw. p_values : pandas.DataFrame P-values by spec and draw. r2_values : pandas.DataFrame R² values by spec and draw. summary_df : pandas.DataFrame Per-spec summary with median, min, max, and 95% confidence intervals. """ def __init__( self, *, y: str, specs: Sequence[Sequence[str]], estimates: Union[np.ndarray, pd.DataFrame], p_values: Union[np.ndarray, pd.DataFrame], r2_values: Union[np.ndarray, pd.DataFrame], ) -> None: super().__init__() self.y_name = y self.specs_names = pd.Series(specs) self.estimates = pd.DataFrame(estimates) self.p_values = pd.DataFrame(p_values) self.r2_values = pd.DataFrame(r2_values) self.summary_df = self._compute_summary() self.summary_df['spec_name'] = self.specs_names def _compute_summary(self) -> pd.DataFrame: """ Computes summary statistics based on coefficient estimates. Returns: pd.DataFrame: DataFrame containing median, min, max, and quantiles. """ data = self.estimates.copy() out = pd.DataFrame() out['median'] = data.median(axis=1) out['max'] = data.max(axis=1) out['min'] = data.min(axis=1) out['ci_up'] = data.quantile(q=0.975, axis=1, interpolation='nearest') out['ci_down'] = data.quantile(q=0.025, axis=1, interpolation='nearest') return out
[docs] def plot( self, loess: bool = True, ci: float = 1, specs: Optional[List[List[str]]] = None, colormap: str = 'viridis', figsize: Tuple[int, int] = (16, 14), ext: str = 'pdf', figpath: str = None, highlights: bool = True, project_name: str = None, oddsratio: bool = False, ) -> plt.Figure: """ Plot specification results highlighting up to three specs. Parameters ---------- loess : bool Whether to apply LOESS smoothing to confidence intervals. specs : list of list of str, optional Specifications to highlight. colormap : str Matplotlib colormap name. figsize : tuple Figure size (width, height). ext : str File extension for saving. figpath : str or Path, optional Directory in which to save outputs; if None, uses current working dir. project_name : str Prefix for saved figure. bighlights: bool Whether to highlight specs oddsratio bool, default=False Whether to exponentiate the coefficients (e.g. for odds ratios). Returns ------- matplotlib.figure.Figure: Plot showing the regression results. """ if specs is not None and len(specs) == 0: specs = None if specs is not None: if not all(isinstance(spec, list) for spec in specs): raise TypeError("'specs' must be a list of lists.") if len(specs) > 3: raise ValueError("The max number of specifications to highlight is 3") if not all(frozenset(spec) in self.specs_names.to_list() for spec in specs): raise TypeError("All specifications in 'specs' must be in the valid computed specifications.") warnings.warn( "MergedResult plotting is not supported yet. " "Plot each constituent OLSResult separately.", UserWarning ) return None
[docs] def summary(self, digits: int = 3) -> None: """ Print a compact summary for merged OLS-style results. """ print("==============================") print("Merged Model Summary") print("==============================") print(f"Dependent variable: {self.y_name}") print(f"Number of specifications: {len(self.specs_names)}") print("==============================") print("Coefficient Distribution") print("==============================") est = self.estimates print(f"Median β: {est.stack().median():.{digits}f}") print(f"Min β: {est.min().min():.{digits}f}") print(f"Max β: {est.max().max():.{digits}f}") if hasattr(self, "r2_values") and isinstance(self.r2_values, pd.DataFrame): print(f"Median R²: {self.r2_values.stack().median():.{digits}f}") print("==============================")
[docs] def merge( self, result_obj: "OLSResult", left_prefix: str, right_prefix: str, ) -> "MergedResult": """ Merge the current OLSResult object with another, tagging each specification with a prefix to indicate origin. Parameters ---------- result_obj : OLSResult Another OLSResult object to merge. left_prefix : str Prefix to tag the specifications from the current object. right_prefix : str Prefix to tag the specifications from the result_obj object. Returns ------- MergedResult A new MergedResult object containing combined estimates and metadata. Raises ------ TypeError If result_obj is not an instance of OLSResult or prefixes are not strings. ValueError If the dependent variable names do not match between the two objects. """ if not isinstance(result_obj, OLSResult): raise TypeError("'result_obj' must be an instance of OLSResult.") if not isinstance(left_prefix, str) or not isinstance(right_prefix, str): raise TypeError("'prefixes' must be of type 'str'.") if self.y_name != result_obj.y_name: raise ValueError('Dependent variable names must match.') specs_original = [frozenset(list(s) + [left_prefix]) for s in self.specs_names] specs_new = [frozenset(list(s) + [right_prefix]) for s in result_obj.specs_names] y = self.y_name specs = specs_original + specs_new estimates = pd.concat([self.estimates, result_obj.estimates], ignore_index=True) p_values = pd.concat([self.p_values, result_obj.p_values], ignore_index=True) r2_values = pd.concat([self.r2_values, result_obj.r2_values], ignore_index=True) return MergedResult( y=y, specs=specs, estimates=estimates, p_values=p_values, r2_values=r2_values )
[docs] class OLSResult(Protoresult): """ Encapsulates the results of an OLSRobust run Attributes ---------- y_name : str Dependent variable name. x_name : str Main predictor name. data : pd.DataFrame Original DataFrame used for all fits. specs_names : pd.Series[frozenset[str]] Specification sets (which controls are included, etc.). all_predictors : list[list[str]] List of predictor+control sets for each specification. controls : list[str] Pool of all control variables considered. draws : int Number of bootstrap draws. kfold : int Number of folds for out-of-sample evaluation. estimates : pd.DataFrame Shape (n_specs, draws), bootstrap estimates of β₁. p_values : pd.DataFrame Same shape, bootstrap p-values for β₁. estimates_ystar : pd.DataFrame Bootstrap estimates under the null (for joint inference). p_values_ystar : pd.DataFrame Bootstrap p-values under the null. r2_values : pd.DataFrame Shape (n_specs, draws), bootstrapped R². summary_df : pd.DataFrame Per-spec summary (median, CI, info criteria, cross-val metric). inference : dict[str, Any] Aggregated inference statistics (proportions, Stouffer’s Z, etc.). shap_return : tuple[np.ndarray, pd.DataFrame] | None Optional SHAP values and the matrix they came from. """ @staticmethod def _empirical_p_two_sided(null_samples: np.ndarray, observed: float) -> float: t = np.asarray(null_samples, dtype=float) t = t[np.isfinite(t)] if t.size == 0: return np.nan n = t.size p_hi = (np.sum(t >= observed) + 1) / (n + 1) p_lo = (np.sum(t <= observed) + 1) / (n + 1) return float(min(1.0, 2.0 * min(p_hi, p_lo))) def __init__( self, *, y: str, x: str, data: pd.DataFrame, specs: list[frozenset[str]], all_predictors: list[list[str]], controls: list[str], draws: int, kfold: int, estimates: np.ndarray | pd.DataFrame, estimates_ystar: np.ndarray | pd.DataFrame, all_b: list[np.ndarray], all_p: list[np.ndarray], p_values: np.ndarray | pd.DataFrame, p_values_ystar: np.ndarray | pd.DataFrame, r2_values: np.ndarray | pd.DataFrame, r2i_array: list[float], ll_array: list[float], aic_array: list[float], bic_array: list[float], hqic_array: list[float], ll_raw_array: Optional[list[float]] = None, ll_null_array: Optional[list[float]] = None, ll_gain_array: Optional[list[float]] = None, ll_gain_per_obs_array: Optional[list[float]] = None, nobs_array: Optional[list[int]] = None, av_k_metric_array: list[float] | None = None, model_name: str, name_av_k_metric: str | None = None, shap_return: Any = None, ) -> None: """ Initialize an OLSResult container. Parameters ---------- y Dependent variable name. x Main predictor name. data Original DataFrame for all model fits. specs A list of frozensets indicating which controls each spec includes. all_predictors For each spec, the full list of predictors (x + controls). controls Pool of all candidate controls. draws Number of bootstrap draws. kfold Number of folds for cross-validation. estimates Bootstrap coefficient estimates (shape: n_specs × draws). estimates_ystar Bootstrap estimates under null (same shape). all_b Raw (non-resampled) β₁, one per spec. all_p Raw (non-resampled) p-values, one per spec. p_values Bootstrap p-values for β₁. p_values_ystar Bootstrap p-values under null. r2_values Bootstrapped R² (n_specs × draws). r2i_array In-sample R² for each spec. ll_array Primary likelihood metric for each spec. For OLS this is the null-relative log-likelihood gain per observation; for logistic it is the raw full-model log-likelihood. aic_array, bic_array, hqic_array Information criteria per spec. ll_raw_array, ll_null_array, ll_gain_array, ll_gain_per_obs_array Optional detailed OLS likelihood diagnostics. nobs_array Optional number of observations used in each spec. av_k_metric_array Out-of-sample metric (e.g. CV R²) per spec. model_name Human-readable name of the model (“OLS Robust”). name_av_k_metric Label for the CV metric. shap_return Optional return of (shap_values, input_matrix). """ super().__init__() self.y_name = y self.x_name = x self.data = data self.specs_names = pd.Series(specs) self.all_predictors = all_predictors self.controls = controls self.draws = draws self.kfold = kfold self.estimates = pd.DataFrame(estimates) self.p_values = pd.DataFrame(p_values) self.estimates_ystar = pd.DataFrame(estimates_ystar) self.p_values_ystar = pd.DataFrame(p_values_ystar) self.r2_values = pd.DataFrame(r2_values) self.all_b = all_b self.all_p = all_p self.summary_df = self._compute_summary() self.summary_df['r2'] = pd.Series(r2i_array) self.summary_df['ll'] = pd.Series(ll_array) self.summary_df['aic'] = pd.Series(aic_array) self.summary_df['bic'] = pd.Series(bic_array) self.summary_df['hqic'] = pd.Series(hqic_array) if ll_raw_array is not None: self.summary_df['ll_raw'] = pd.Series(ll_raw_array) if ll_null_array is not None: self.summary_df['ll_null'] = pd.Series(ll_null_array) if ll_gain_array is not None: self.summary_df['ll_gain'] = pd.Series(ll_gain_array) if ll_gain_per_obs_array is not None: self.summary_df['ll_gain_per_obs'] = pd.Series(ll_gain_per_obs_array) if nobs_array is not None: self.summary_df['nobs'] = pd.Series(nobs_array) self._compute_inference() self.summary_df['av_k_metric'] = pd.Series(av_k_metric_array) self.summary_df['spec_name'] = self.specs_names self.summary_df['y'] = self.y_name self.model_name = model_name self.name_av_k_metric = name_av_k_metric self.shap_return = shap_return
[docs] def save(self, filename: str) -> None: """ Pickle this OLSResult to disk. Parameters ---------- filename Destination path for the pickle file. """ with open(filename, 'wb') as f: _pickle.dump(self, f, -1)
[docs] @classmethod def load(cls, filename: str) -> OLSResult: """ Loads an OLSResult object from a pickle file. Parameters ---------- filename Path to the pickle file. Returns ------- OLSResult """ with open(filename, 'rb') as f: return _pickle.load(f)
def _compute_inference(self) -> None: """ Compute summary statistics of the bootstrap coefficient distribution. For each model specification, returns the median, minimum, maximum, and the 2.5%/97.5% quantile bounds of the estimated coefficient across all bootstrap draws. Returns ------- summary_df : pandas.DataFrame A DataFrame indexed by specification, with columns: - median : float Median coefficient across bootstrap draws. - min : float Minimum coefficient across bootstrap draws. - max : float Maximum coefficient across bootstrap draws. - ci_down : float 2.5th percentile coefficient (lower 95% bound). - ci_up : float 97.5th percentile coefficient (upper 95% bound). """ df_model_result = pd.DataFrame({ 'betas': [b[0][0] for b in self.all_b], 'p_values': [p[0][0] for p in self.all_p], }) inference = not self.estimates_ystar.isna().all().all() df_model_result['positive_beta'] = df_model_result['betas'].apply(lambda x: 1 if x > 0 else 0) df_model_result['negative_beta'] = df_model_result['betas'].apply(lambda x: 1 if x < 0 else 0) df_model_result['significant'] = df_model_result['p_values'].apply(lambda x: 1 if x < 0.05 else 0) self.inference = {} self.inference['median_ns'] = df_model_result['betas'].median() # note: ns for 'no sampling' self.inference['median'] = self.estimates.stack().median() for ic in ['aic', 'bic', 'hqic']: if max(len(t) for t in self.y_name) == 1: ic_array = np.array(self.summary_df[ic].to_list()) all_b = [arr[0] for arr in self.all_b] coef_mat = np.vstack(all_b) delta = ic_array - ic_array.min() w = np.exp(-0.5 * delta) w /= w.sum() beta_avg = w @ coef_mat self.inference[ic + '_average'] = beta_avg[0] else: self.inference[ic + '_average'] = np.nan obs_med = float(df_model_result['betas'].median()) if inference: null_meds = self.estimates_ystar.median(axis=0).to_numpy(dtype=float) null_meds = null_meds[np.isfinite(null_meds)] if null_meds.size == 0: self.inference['median_p'] = np.nan else: self.inference['median_p'] = float(np.mean(np.abs(null_meds) >= abs(obs_med))) else: self.inference['median_p'] = np.nan # Estimate dependence under the NULL bootstrap (y_star), not the sampling bootstrap. # We estimate correlation of null z's (diag=1) after dropping degenerate specs. p_matrix = self.p_values_ystar.to_numpy(dtype=float) # (n_specs, n_draws) beta_matrix = self.estimates_ystar.to_numpy(dtype=float) # (n_specs, n_draws) R_hat = None rho_estimate = None z_corr = None _keep_stouffer = None self.inference['stouffer_keep_mask'] = None if (p_matrix.ndim == 2) and (p_matrix.shape[1] > 1): p_clip = np.clip(p_matrix, 1e-300, 1.0 - 1e-16) z_matrix = np.sign(beta_matrix) * norm.isf(p_clip / 2.0) # (n_specs, n_draws) Z_null = z_matrix.T # draws × specs z_std = np.std(Z_null, axis=0, ddof=1) keep = np.isfinite(z_std) & (z_std > 1e-14) self.inference['stouffer_keep_mask'] = keep if np.sum(keep) >= 2: Zk = Z_null[:, keep] # draws × m_eff R_hat = np.corrcoef(Zk, rowvar=False) # (m_eff, m_eff) m_eff = R_hat.shape[0] rho_estimate = float((np.sum(R_hat) - np.trace(R_hat)) / (m_eff * (m_eff - 1))) # Project to PSD (eigenvalue clipping) and add tiny ridge for stability evals, evecs = np.linalg.eigh(R_hat) evals[evals < 0.0] = 0.0 R_psd = (evecs * evals) @ evecs.T R_psd = 0.5 * (R_psd + R_psd.T) np.fill_diagonal(R_psd, 1.0) z_corr = R_psd + 1e-12 * np.eye(R_psd.shape[0], dtype=R_psd.dtype) _keep_stouffer = keep else: R_hat = None rho_estimate = None z_corr = None _keep_stouffer = None self.inference['min_ns'] = df_model_result['betas'].min() self.inference['min'] = self.estimates.min().min() self.inference['max_ns'] = df_model_result['betas'].max() self.inference['max'] = self.estimates.max().max() self.inference['pos_ns'] = df_model_result['positive_beta'].sum() self.inference['pos_prop_ns'] = df_model_result['positive_beta'].mean() self.inference['pos'] = (self.estimates > 0.0).sum().sum() self.inference['pos_prop'] = (self.estimates > 0.0).mean().mean() self.inference['neg_ns'] = df_model_result['negative_beta'].sum() self.inference['neg_prop_ns'] = df_model_result['negative_beta'].mean() self.inference['neg'] = (self.estimates < 0.0).sum().sum() self.inference['neg_prop'] = (self.estimates < 0.0).mean().mean() # observed counts obs_pos = int((df_model_result['betas'] > 0).sum()) obs_neg = int((df_model_result['betas'] < 0).sum()) # null counts per null draw (across specs) null_pos = self.estimates_ystar.gt(0).sum(axis=0).to_numpy() null_neg = self.estimates_ystar.lt(0).sum(axis=0).to_numpy() self.inference['pos_p'] = self.__class__._empirical_p_two_sided(null_pos, obs_pos) self.inference['neg_p'] = self.__class__._empirical_p_two_sided(null_neg, obs_neg) self.inference['sig_ns'] = df_model_result['significant'].sum() self.inference['sig_prop_ns'] = df_model_result['significant'].mean() self.inference['sig'] = (self.p_values.stack() < 0.05).sum().sum() self.inference['sig_prop'] = float((self.p_values.stack() < 0.05).mean()) alpha = 0.05 obs_sig = int((self.p_values.stack() < alpha).sum()) null_sig = self.p_values_ystar.lt(alpha).sum(axis=0).to_numpy() self.inference['sig_p'] = self.__class__._empirical_p_two_sided(null_sig, obs_sig) self.inference['pos_sig_ns'] = (df_model_result['positive_beta'] & df_model_result['significant']).sum() self.inference['pos_sig_prop_ns'] = (df_model_result['positive_beta'] & df_model_result['significant']).mean() self.inference['pos_sig'] = ((self.estimates > 0.0) & (self.p_values < 0.05)).sum().sum() self.inference['pos_sig_prop'] = ((self.estimates > 0.0) & (self.p_values < 0.05)).mean().mean() self.inference['neg_sig_ns'] = (df_model_result['negative_beta'] & df_model_result['significant']).sum() self.inference['neg_sig_prop_ns'] = (df_model_result['negative_beta'] & df_model_result['significant']).mean() self.inference['neg_sig'] = ((self.estimates < 0.0) & (self.p_values < 0.05)).sum().sum() self.inference['neg_sig_prop'] = ((self.estimates < 0.0) & (self.p_values < 0.05)).mean().mean() obs_pos_sig = int(((self.estimates > 0.0) & (self.p_values < alpha)).sum().sum()) obs_neg_sig = int(((self.estimates < 0.0) & (self.p_values < alpha)).sum().sum()) null_pos_sig = ((self.estimates_ystar.gt(0) & self.p_values_ystar.lt(alpha)) .sum(axis=0).to_numpy()) null_neg_sig = ((self.estimates_ystar.lt(0) & self.p_values_ystar.lt(alpha)) .sum(axis=0).to_numpy()) self.inference['pos_sig_p'] = self.__class__._empirical_p_two_sided(null_pos_sig, obs_pos_sig) self.inference['neg_sig_p'] = self.__class__._empirical_p_two_sided(null_neg_sig, obs_neg_sig) # ---------------------------- # Stouffer (NULL-calibrated) # ---------------------------- # We compute: # Z_obs = (w^T z_obs) / sqrt(w^T Sigma_hat w) # and calibrate p via the null draws: # p = (1 + #{ |Z_null| >= |Z_obs| }) / (B + 1) # # where z_null are computed from (p_values_ystar, estimates_ystar). # # Note: Sigma_hat is estimated from null z's (already in z_corr). # If z_corr is unavailable, we fall back to independence denom. # Observed per-spec p and beta (full-sample, not bootstrap) p_obs_all = df_model_result['p_values'].to_numpy(dtype=float) b_obs_all = df_model_result['betas'].to_numpy(dtype=float) # Valid mask for observed quantities obs_valid = np.isfinite(p_obs_all) & (p_obs_all >= 0.0) & (p_obs_all <= 1.0) & np.isfinite(b_obs_all) # Use the same "keep" mask you computed from null z-variance if available, # but intersect with observed validity. keep_null = self.inference.get('stouffer_keep_mask', None) if keep_null is None: keep = obs_valid else: keep = obs_valid & keep_null if np.sum(keep) == 0: self.inference['Stouffers'] = (np.nan, np.nan) self.inference['correlation_matrix'] = None self.inference['Sigma_hat'] = None self.inference['mean_correlation'] = None else: # ---- observed z (signed, two-sided) with clipping p_clip_obs = np.clip(p_obs_all[keep], 1e-300, 1.0 - 1e-16) z_obs = np.sign(b_obs_all[keep]) * norm.isf(p_clip_obs / 2.0) # If no null draws (all-NaN), Stouffer cannot be null-calibrated if (p_matrix.size == 0) or (p_matrix.shape[1] < 1) or np.isnan(p_matrix).all(): # Fallback: just report the Z statistic with asymptotic p (diagnostic only) w = np.ones_like(z_obs, dtype=float) denom = float(np.sqrt(np.dot(w, w))) Z_obs = float(np.dot(w, z_obs) / denom) p_two = float(2.0 * norm.sf(abs(Z_obs))) self.inference['Stouffers'] = (Z_obs, p_two) self.inference['correlation_matrix'] = None self.inference['Sigma_hat'] = None self.inference['mean_correlation'] = None else: # Build null z scores (signed) for all specs/draws p_clip_null = np.clip(p_matrix, 1e-300, 1.0 - 1e-16) z_matrix = np.sign(beta_matrix) * norm.isf(p_clip_null / 2.0) # (n_specs, n_draws) # Keep only the selected specs (columns after transpose) Z_null_specs = z_matrix.T[:, keep] # (n_draws, m_eff) # Weights (currently uniform; if you later add weights, replace here) w = np.ones(Z_null_specs.shape[1], dtype=float) # Denominator: # - if z_corr exists and matches kept specs, use it # - else fall back to independence denom = None if z_corr is not None and _keep_stouffer is not None: # _keep_stouffer corresponds to the mask used when building z_corr. # We must ensure we are using the same kept specs subset. # Here, we require keep == _keep_stouffer on the indices considered. # If they differ (because obs_valid removed some), we fall back to independence. if np.array_equal(keep, _keep_stouffer): denom2 = float(w @ (z_corr @ w)) if np.isfinite(denom2) and denom2 > 0.0: denom = float(np.sqrt(denom2)) if denom is None: denom = float(np.sqrt(np.dot(w, w))) # Combined observed statistic Z_obs = float(np.dot(w, z_obs) / denom) # Combined null statistics for calibration (vectorized) Z_null = (Z_null_specs @ w) / denom # Two-sided null-calibrated p-value with +1 correction B = int(Z_null.shape[0]) p_two = float((1.0 + np.sum(np.abs(Z_null) >= abs(Z_obs))) / (B + 1.0)) self.inference['Stouffers'] = (Z_obs, p_two) self.inference['correlation_matrix'] = R_hat self.inference['Sigma_hat'] = z_corr self.inference['mean_correlation'] = rho_estimate
[docs] def summary(self, digits=3) -> None: """ Print a comprehensive textual summary of the fitted model. """ def print_separator(title=None): print("=" * 30) if title: print(title) print("=" * 30) inference = not self.estimates_ystar.isna().all().all() # Display basic model information print_separator("1. Model Summary") print(f"Model: {self.model_name}") print("Inference Tests:", "Yes" if inference else "No") if max(len(t) for t in self.y_name) > 1: print(f"Dependent variable: {set().union(*self.y_name)}") else: print(f"Dependent variable: {self.y_name}") print(f"Independent variable: {self.x_name}") print(f"Number of possible controls: {len(self.controls)}") print(f"Number of draws: {self.draws}") print(f"Number of folds: {self.kfold}") print(f"Number of specifications: {len(self.specs_names)}") # Print model robustness metrics print_separator("2. Model Robustness Metrics") print('2.1 Inference Metrics') print_separator() # Medians / extrema if not inference: print(f"Median β (specs, no resampling): {self.inference['median_ns']:.{digits}f}") else: print(f"Median β (specs, no resampling): {self.inference['median_ns']:.{digits}f} " f"(null-calibrated p: {self.inference['median_p']:.{digits}g})") print(f"Median β (bootstraps × specs): {self.inference['median']:.{digits}f}") print(f"Min β (specs, no resampling): {self.inference['min_ns']:.{digits}f}") print(f"Min β (bootstraps × specs): {self.inference['min']:.{digits}f}") print(f"Max β (specs, no resampling): {self.inference['max_ns']:.{digits}f}") print(f"Max β (bootstraps × specs): {self.inference['max']:.{digits}f}") # IC-weighted averages (fix NaN checks) if not np.isnan(self.inference.get('aic_average', np.nan)): print(f"AIC-weighted β (specs, no resampling): {self.inference['aic_average']:.{digits}f}") if not np.isnan(self.inference.get('bic_average', np.nan)): print(f"BIC-weighted β (specs, no resampling): {self.inference['bic_average']:.{digits}f}") if not np.isnan(self.inference.get('hqic_average', np.nan)): print(f"HQIC-weighted β (specs, no resampling): {self.inference['hqic_average']:.{digits}f}") # Descriptive shares + null-calibrated tests label_share = lambda s: f"{s:.{digits}f}" label_p = lambda p: f"{p:.{digits}g}" # Significant share print(f"Share significant (specs, no resampling) [descriptive]: " f"{label_share(self.inference['sig_prop_ns'])}") print(f"Share significant (bootstraps × specs) [descriptive]: " f"{label_share(self.inference['sig_prop'])}" + ("" if not inference else f" | null-calibrated p: {label_p(self.inference['sig_p'])}")) # Positive share print(f"Share β>0 (specs, no resampling) [descriptive]: " f"{label_share(self.inference['pos_prop_ns'])}") print(f"Share β>0 (bootstraps × specs) [descriptive]: " f"{label_share(self.inference['pos_prop'])}" + ("" if not inference else f" | null-calibrated p: {label_p(self.inference['pos_p'])}")) # Negative share print(f"Share β<0 (specs, no resampling) [descriptive]: " f"{label_share(self.inference['neg_prop_ns'])}") print(f"Share β<0 (bootstraps × specs) [descriptive]: " f"{label_share(self.inference['neg_prop'])}" + ("" if not inference else f" | null-calibrated p: {label_p(self.inference['neg_p'])}")) # Positive & significant share print(f"Share β>0 & significant (specs, no resampling) [descriptive]: " f"{label_share(self.inference['pos_sig_prop_ns'])}") print(f"Share β>0 & significant (bootstraps × specs) [descriptive]: " f"{label_share(self.inference['pos_sig_prop'])}" + ("" if not inference else f" | null-calibrated p: {label_p(self.inference['pos_sig_p'])}")) # Negative & significant share print(f"Share β<0 & significant (specs, no resampling) [descriptive]: " f"{label_share(self.inference['neg_sig_prop_ns'])}") print(f"Share β<0 & significant (bootstraps × specs) [descriptive]: " f"{label_share(self.inference['neg_sig_prop'])}" + ("" if not inference else f" | null-calibrated p: {label_p(self.inference['neg_sig_p'])}")) # Stouffer’s Z, p_two = self.inference['Stouffers'] print(f"Stouffer’s Z = {Z:.{digits}f} (two-sided p = {p_two:.{digits}g})") print_separator() # print in-sample metrics if max(len(t) for t in self.y_name) == 1: print('2.2 In-Sample Metrics (Full Sample)') print_separator() print(f"Min AIC: {round(self.summary_df['aic'].min(), digits)}, Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['aic'].idxmin()])}") print(f"Min BIC: {round(self.summary_df['bic'].min(), digits)}, Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['bic'].idxmin()])}") print(f"Min HQIC: {round(self.summary_df['hqic'].min(), digits)}, Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['hqic'].idxmin()])}") if 'll_gain_per_obs' in self.summary_df.columns: ll_col = 'll_gain_per_obs' print( "Max Relative Log-Likelihood Gain (per obs): " f"{round(self.summary_df[ll_col].max(), digits)}, " f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df[ll_col].idxmax()])}" ) print( "Min Relative Log-Likelihood Gain (per obs): " f"{self.summary_df[ll_col].min():.{digits}f}, " f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df[ll_col].idxmin()])}" ) if 'll_raw' in self.summary_df.columns: print( f"Max Raw Log Likelihood: {round(self.summary_df['ll_raw'].max(), digits)}, " f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['ll_raw'].idxmax()])}" ) print( f"Min Raw Log Likelihood: {self.summary_df['ll_raw'].min():.{digits}f}, " f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['ll_raw'].idxmin()])}" ) else: print(f"Max Log Likelihood: {round(self.summary_df['ll'].max(), digits)}, Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['ll'].idxmax()])}") print( f"Min Log Likelihood: {self.summary_df['ll'].min():.{digits}f}, " f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['ll'].idxmin()])}" ) print( f"Max {'Adj-' if 'OLS' in self.model_name else 'Pseudo-'}R²: " f"{self.summary_df['r2'].max():.{digits}f}, " f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['r2'].idxmax()])}" ) print( f"Min {'Adj-' if 'OLS' in self.model_name else 'Pseudo-'}R²: " f"{self.summary_df['r2'].min():.{digits}f}, " f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['r2'].idxmin()])}" ) print_separator() if max(len(t) for t in self.y_name) == 1: print(f'2.3 Out-Of-Sample Metrics ({self.name_av_k_metric} averaged across folds)') else: print(f'2.2 Out-Of-Sample Metrics ({self.name_av_k_metric} averaged across folds)') print_separator() oos_max_row = self.summary_df.loc[self.summary_df['av_k_metric'].idxmax(),] print(f'Max Average: {round(oos_max_row["av_k_metric"], digits)}, Specs: {list(oos_max_row["spec_name"])} ') oos_min_row = self.summary_df.loc[self.summary_df['av_k_metric'].idxmin(),] print(f'Min Average: {round(oos_min_row["av_k_metric"], digits)}, Specs: {list(oos_min_row["spec_name"])} ') print(f"Mean Average: {round(self.summary_df['av_k_metric'].mean(), digits)}") print(f"Median Average: {round(self.summary_df['av_k_metric'].median(), digits)}")
[docs] def plot(self, loess: bool = True, specs: Optional[List[List[str]]] = None, ic: str = 'aic', ci: float = 1, colormap: str = 'viridis', figsize: Tuple[int, int] = (12, 6), ext: str = ' pdf', figpath = None, project_name: str = 'no_project_name', highlights: bool = True, oddsratio: bool = False ) -> plt.Figure: """ Plots the regression results using specified options. Parameters ---------- loess : bool, default=True Whether to add a LOESS smoothed trend line. specs : list of list of str, optional Up to three specific model specifications to highlight. ic : {'bic', 'aic', 'hqic'}, default='aic' Which information criterion to display. ci: float, default=1 Confidence interval. colormap : str, default='viridis' Name of the matplotlib colormap for the plot. figpath : str or Path, optional Directory in which to save outputs; if None, uses current working dir. figsize : tuple of int, default=(12, 6) Figure width and height in inches. ext : str, default='pdf' File extension if saving the figure (unused if not saving). project_name : str, default='no_project_name' Project identifier used in saved filename (unused if not saving). highlights bool, default = True Whether to highlight individual plots. oddsratio bool, default=False Whether to exponentiate the coefficients (e.g. for odds ratios). Returns ------- fig : matplotlib.figure.Figure The figure object containing the plot. Raises ------ ValueError If `ic` is not one of {'bic', 'aic', 'hqic'}. or if more than three specs are given, TypeError If `specs` is provided but is not a list of lists of str, or any spec is not in the computed specifications. """ valid_ic = ['bic', 'aic', 'hqic'] if ic not in valid_ic: raise ValueError( f"Unsupported information criterion: expected one of {valid_ic}, " f"received: '{ic}'." ) if specs is not None: if not all(isinstance(l, list) for l in specs): raise TypeError("'specs' must be a list of lists.") if len(specs) > 3: raise ValueError("The max number of specifications to highlight is 3") if not all(frozenset(spec) in self.specs_names.to_list() for spec in specs): raise TypeError("All specifications in 'spec' must be in the valid computed specifications.") if ic not in valid_ic: raise ValueError(f"'ic' must be one of the following: {valid_ic}") from robustipy.figures import plot_results return plot_results(results_object=self, loess=loess, specs=specs, ic=ic, ci=ci, colormap=colormap, figsize=figsize, ext=ext, figpath=figpath, project_name=project_name, highlights=highlights, oddsratio=oddsratio)
def _compute_summary(self): """ Computes summary statistics based on coefficient estimates. Returns: pd.DataFrame: DataFrame containing summary statistics. """ data = self.estimates.copy() out = pd.DataFrame() out['median'] = data.median(axis=1) out['max'] = data.max(axis=1) out['min'] = data.min(axis=1) out['ci_up'] = data.quantile(q=0.975, axis=1, interpolation='nearest') out['ci_down'] = data.quantile(q=0.025, axis=1, interpolation='nearest') return out
[docs] def compute_bma(self) -> pd.DataFrame: """ Performs Bayesian Model Averaging (BMA) using BIC-implied priors. Returns ------- pd.DataFrame DataFrame containing BMA results with control variable inclusion probabilities and average coefficients. """ if isinstance(self.y_name, list) and len({str(y) for y in self.y_name}) > 1: raise ValueError( "BMA weights are only likelihood-comparable within a common outcome, " "likelihood family, and estimation sample. Compute BMA separately " "within comparable outcome sets." ) likelihood_per_var = [] weighted_coefs = [] # Compute model likelihoods using BIC-implied log posterior max_ll = np.max(-self.summary_df.bic / 2) shifted_ll = (-self.summary_df.bic / 2) - max_ll models_likelihood = np.exp(shifted_ll) sum_likelihoods = np.nansum(models_likelihood) # Flatten list of coefficient arrays and variable names coefs = [[i[0] for i in x] for x in self.all_b] coefs = [i for sl in coefs for i in sl] var_names = [i for sl in self.all_predictors for i in sl] coefs_df = pd.DataFrame({'coef': coefs, 'var_name': var_names}) # Compute weighted likelihood and average coefficients for each control variable for ele in self.controls: idx = [ele in spec for spec in self.specs_names] likelihood_per_var.append(np.nansum(models_likelihood[idx])) coefs = coefs_df[coefs_df.var_name == ele].coef.to_numpy() likelihood = models_likelihood[idx] weighted_coef = coefs * likelihood weighted_coefs.append(np.nansum(weighted_coef)) # Normalize to get posterior inclusion probabilities and BMA-weighted coefficients likelihood_per_var = np.asarray(likelihood_per_var, dtype=float) weighted_coefs = np.asarray(weighted_coefs, dtype=float) probs = likelihood_per_var / sum_likelihoods final_coefs = weighted_coefs / sum_likelihoods conditional_coefs = np.divide( weighted_coefs, likelihood_per_var, out=np.full_like(weighted_coefs, np.nan, dtype=float), where=likelihood_per_var != 0, ) # Return summary DataFrame summary_bma = pd.DataFrame({ "control_var": self.controls, "probs": probs, "average_coefs": final_coefs, "conditional_average_coefs": conditional_coefs, }) return summary_bma
[docs] def merge( self, result_obj: "OLSResult", left_prefix: str, right_prefix: str ) -> "MergedResult": """ Merge this OLSResult with another, tagging each spec by prefix. Parameters ---------- result_obj : OLSResult Another result object with the same dependent variable. left_prefix : str Tag to append to this object's specifications. right_prefix : str Tag to append to the other object's specifications. Returns ------- merged : MergedResult A new MergedResult containing all specs, estimates, p_values, and r2_values from both. Raises ------ TypeError If `result_obj` is not an OLSResult, or prefixes are not strings. ValueError If the dependent variable names do not match. """ # Validate input types if not isinstance(result_obj, OLSResult): raise TypeError("'result_obj' must be an instance of OLSResult.") if not isinstance(left_prefix, str) or not isinstance(right_prefix, str): raise TypeError("'prefixes' must be of type 'str.'") # Ensure dependent variables match before merging if self.y_name != result_obj.y_name: raise ValueError('Dependent variable names must match.') # Apply prefixes to spec sets specs_original = [frozenset(list(s) + [left_prefix]) for s in self.specs_names] specs_new = [frozenset(list(s) + [right_prefix]) for s in result_obj.specs_names] # Merge all relevant fields y = self.y_name specs = specs_original + specs_new estimates = pd.concat([self.estimates, result_obj.estimates], ignore_index=True) p_values = pd.concat([self.p_values, result_obj.p_values], ignore_index=True) r2_values = pd.concat([self.r2_values, result_obj.r2_values], ignore_index=True) # Return a new MergedResult object return MergedResult( y=y, specs=specs, estimates=estimates, p_values=p_values, r2_values=r2_values )
[docs] def save_to_csv(self, path: str) -> None: """Function to save summary dataframe to a csv""" self.summary_df.to_csv(path)
[docs] class OLSRobust(BaseRobust): """ Class for multi-variate regression using OLS Parameters ---------- y : list[str] Dependent variable column name(s). If multiple dependent variables are supplied, OLSRobust constructs non-empty standardised outcome composites. x : list[str] Predictor column name(s) included in every specification. The first element is treated as the reported focal estimand; any additional elements are fixed predictors that do not vary across the control-subset space. data : pandas.DataFrame The full dataset containing `y`, `x`, and any controls. model_name : str, default='OLS Robust' A custom label for this model run, used in outputs and plots Attributes ---------- results : OLSResult Populated after calling `.fit()`, contains all estimates and diagnostics. """ def __init__( self, *, y: List[str], x: List[str], data: pd.DataFrame, model_name: str = 'OLS Robust' ) -> None: super().__init__(y=y, x=x, data=data, model_name=model_name)
[docs] def get_results(self) -> 'OLSResult': """ Return the OLSResult object once .fit() has been called. Returns ------- OLSResult The result object encapsulating all analysis outputs. """ return self.results
[docs] def sample_z_specs(self, controls, z_specs_sample_size): """ Generate a sample of z specifications by randomly selecting subsets of control variables. This method creates a set of binary masks to determine which subsets of the given `controls` should be included in each z specification. The number of subsets sampled is controlled by `z_specs_sample_size`. Parameters ---------- controls : list of str A list of control variable names from which to build z specifications. z_specs_sample_size : int The number of z specifications to sample. Must be a positive integer. Returns ------- space_n_sample : int The number of sampled z specifications. z_specs_sample : list of tuple of str A list where each element is a tuple of selected control variable names representing one sampled z specification. Notes ----- - The method uses `sample_z_masks` to generate binary inclusion masks. - If `self.seed` is defined, it is passed to `sample_z_masks` to ensure reproducibility. - Each sampled mask determines a unique subset of `controls`. """ z_cols = controls n_z = len(z_cols) masks = sample_z_masks( n_z = n_z, n_masks = z_specs_sample_size, seed = getattr(self, "seed", None) ) z_specs_sample = [ tuple(z_cols[i] for i in range(n_z) if (m >> i) & 1) for m in masks ] space_n_sample = len(z_specs_sample) return space_n_sample, z_specs_sample
[docs] def fit( self, *, controls: List[str], group: Optional[str] = None, draws: int = None, kfold: int = None, oos_metric: str = None, n_cpu: Optional[int] = None, seed: Optional[int] = None, composite_sample: Optional[int] = None, z_specs_sample_size: Optional[int] = None, rescale_y: Optional[bool] = False, rescale_x: Optional[bool] = False, rescale_z: Optional[bool] = False, compute_shap: bool = True, threshold: int = 1000000 ) -> 'OLSRobust': """ Fit the OLS models across the specification space and over bootstrap resamples. This method explores a variety of control variable specifications by constructing different combinations (z-specs) and optionally sampling from this space. It performs bootstrap resampling and/or cross-validation depending on the arguments provided. Parameters ---------- controls : list of str Candidate control variables to include in model specifications. group : str, optional Column name for grouping fixed effects. If provided, outcomes are de-meaned by group. draws : int, optional Number of bootstrap resamples per specification. If None, bootstrapping is skipped. kfold : int, optional Number of folds for out-of-sample evaluation. Requires oos_metric to be specified. oos_metric : {'pseudo-r2', 'rmse'}, optional Metric to evaluate out-of-sample performance when kfold is set. n_cpu : int, optional Number of parallel processes to use. Defaults to all available CPUs minus one if None. seed : int, optional Random seed for reproducibility. Propagated to all random operations. composite_sample : int, optional Number of non-empty outcome-composite subsets to sample when multiple dependent variables are supplied. If None, all non-empty outcome subsets are enumerated. Ignored for a single dependent variable. z_specs_sample_size : int, optional Number of z specifications to randomly sample from the full set of possible combinations. If None, the full specification space is used. rescale_y : bool, default=False If True, rescale the dependent variable to have mean 0 and standard deviation 1. rescale_x : bool, default=False If True, rescale the x variable(s) to have mean 0 and standard deviation 1. rescale_z : bool, default=False If True, rescale the z variable(s) to have mean 0 and standard deviation 1. compute_shap : bool, default=True If True, compute SHAP values for plotting. Set False for fit-only profiling. threshold : int, default=1_000_000 If the total number of model fits (specs × draws × folds) exceeds this number, a warning is raised. Returns ------- self : OLSRobust The fitted model instance. Results are stored in the .results attribute. Notes ----- - At least one of draws or kfold must be set to perform model fitting. - z_specs_sample_size samples the covariate-subset space before fitting. - composite_sample samples the outcome-composite space before fitting when len(y) > 1. - This method may be computationally intensive; parallelisation is recommended via n_cpu. """ # Validate control variable input if not isinstance(controls, list): raise TypeError(f"'controls' must be a list. Received types: {type(controls).__name__}.") # Validate z_specs_sample_size input if z_specs_sample_size is not None: if not isinstance(z_specs_sample_size, int) or z_specs_sample_size <= 0: raise ValueError("'z_specs_sample_size' must be an integer greater than 0 if provided.") # Optionally rescale y, x, and z variables if rescale_y: if len(self.y) > 1: warnings.warn("Rescaling of the dependent variable already occurs with multiple y variables. Skipping additional rescaling.", UserWarning) else: self.data[self.y] = rescale(self.data[self.y]) if rescale_x: for column in self.x: self.data[column] = rescale(self.data[column]) if rescale_z: for column in controls: self.data[column] = rescale(self.data[column]) # Standardize and propagate user arguments draws, kfold, oos_metric, n_cpu, seed = make_inquiry( self.model_name, self.y, self.data, draws, kfold, oos_metric, n_cpu, seed ) self.seed = seed self.composite_sample = composite_sample # Ensure constant term and prepare x, controls if group: # With group demeaning, an intercept is not identified. Drop any existing constant. if "const" in self.data.columns: self.data = self.data.drop(columns=["const"]) if "const" in self.x: self.x = [c for c in self.x if c != "const"] if "const" in controls: controls = [c for c in controls if c != "const"] warnings.warn( "[OLSRobust] Group fixed effects requested: dropping constant term " "before demeaning so t/p are identified.", UserWarning ) invariant = _find_group_invariant_columns(self.data, self.x + controls, group) inv_x = [c for c in invariant if c in self.x] inv_controls = [c for c in invariant if c in controls] if inv_x or inv_controls: warnings.warn( "[OLSRobust] These variables are constant within each group and " "are not identified under group demeaning. t/p will be NaN for them: " f"x={inv_x!r}, controls={inv_controls!r}.", UserWarning ) else: self.x, controls = _ensure_single_constant(self.data, self.y, self.x, controls) # Handle multiple y case if len(self.y) > 1: self.multiple_y() # Check that provided arguments are valid self._validate_fit_args( controls=controls, group=group, draws=draws, kfold=kfold, oos_metric=oos_metric, n_cpu=n_cpu, seed=self.seed, valid_oos_metrics=['pseudo-r2', 'rmse'], threshold=threshold ) # Set global seed np.random.seed(self.seed) # Inform user of configuration print( f"OLSRobust is running with n_cpu={n_cpu}, draws={draws}, folds={kfold}, seed={seed}.\n" f"We're evaluating our out-of-sample predictions with the {oos_metric} metric.\n" f"The estimand of interest is {self.x[0]}. Let's begin the calculations..." ) # Determine sample size and metric sample_size = self.data.shape[0] self.oos_metric_name = oos_metric # Optionally sample from the specification space if z_specs_sample_size: space_n, z_specs = self.sample_z_specs(controls=controls, z_specs_sample_size=z_specs_sample_size) else: space_n = space_size(controls) z_specs = all_subsets(controls) z_specs = [tuple(spec) for spec in z_specs] space_n = len(z_specs) # Cache predictor slices by control-spec bitmask to avoid repeated # DataFrame slicing in large specification spaces. control_positions = {name: idx for idx, name in enumerate(controls)} predictor_prefix = self.x + ([group] if group else []) base_predictors_df = self.data[predictor_prefix + controls] spec_predictor_cache: dict[int, pd.DataFrame] = {} def _get_predictors_for_spec(spec_tuple: tuple[str, ...]) -> pd.DataFrame: mask = _spec_bitmask(spec_tuple, control_positions) cached = spec_predictor_cache.get(mask) if cached is None: cols = predictor_prefix + list(spec_tuple) cached = base_predictors_df.loc[:, cols] spec_predictor_cache[mask] = cached return cached # Count y-composite specifications (1 if single y) n_y_comps = len(getattr(self, "y_specs", [None])) # Warn if model fit will be too large self._warn_if_large_draws( draws=draws, n_control_specs=space_n, n_y_composites=n_y_comps, threshold=threshold ) notebook_draws_bar = _make_notebook_draws_bar( total=space_n * draws * n_y_comps, description="Bootstrap draws" ) # Delegate to single or multiple y logic if len(self.y) > 1: # Initialize containers to collect results across all composite outcomes list_b_array = [] list_p_array = [] list_b_array_ystar = [] list_p_array_ystar = [] list_r2_array = [] list_r2i_array = [] list_ll_array = [] list_ll_raw_array = [] list_ll_null_array = [] list_ll_gain_array = [] list_ll_gain_per_obs_array = [] list_nobs_array = [] list_aic_array = [] list_bic_array = [] list_hqic_array = [] list_av_k_metric_array = [] y_names = [] specs = [] all_predictors = [] b_all_list = [] p_all_list = [] # Loop over each composite y variable for y, y_name in _progress_iter( zip(self.y_composites, self.y_specs), total=len(self.y_composites), description="Composite Ys", ): # Initialize arrays to store draws for this y-spec b_array = np.empty([space_n, draws]) p_array = np.empty([space_n, draws]) b_array_ystar = np.empty([space_n, draws]) p_array_ystar = np.empty([space_n, draws]) r2_array = np.empty([space_n, draws]) r2i_array = np.empty([space_n]) ll_array = np.empty([space_n]) ll_raw_array = np.empty([space_n]) ll_null_array = np.empty([space_n]) ll_gain_array = np.empty([space_n]) ll_gain_per_obs_array = np.empty([space_n]) nobs_array = np.empty([space_n], dtype=int) aic_array = np.empty([space_n]) bic_array = np.empty([space_n]) hqic_array = np.empty([space_n]) av_k_metric_array = np.empty([space_n]) # Loop over all z-specs (control combinations) for index, spec in enumerate(z_specs): spec_predictors = _get_predictors_for_spec(spec) comb = pd.concat([y, spec_predictors], axis=1) comb = comb.dropna() comb = comb.reset_index(drop=True).copy() # Demean by group if required if group: comb = group_demean(comb, group=group) X_design = comb.drop(columns=comb.columns[0]) try: self._check_colinearity(X_design) except ValueError as e: warnings.warn( f"[OLSRobust] Spec {spec!r} may be unstable due to perfect collinearity:\n{e}\n" "Proceeding with pseudoinverse; coefficients may not be uniquely identified.", UserWarning ) # Fit the full sample OLS mode ( b_all, p_all, r2_i, ll_i, aic_i, bic_i, hqic_i, av_k_metric_i, ll_raw_i, ll_null_i, ll_gain_i, ll_gain_per_obs_i, nobs_i ) = self._full_sample_OLS( comb, kfold=kfold, group=group, oos_metric_name=self.oos_metric_name ) # Calculate pseudo-outcome residuals for bootstrap y_star = comb.iloc[:, [0]] - np.dot(comb.iloc[:, [1]], b_all[0][0]) bootstrap_data = _prepare_ols_bootstrap_data(comb, y_star) if group: n_predictors = bootstrap_data.drop( columns=[bootstrap_data.columns[0], "y_star", group], errors="ignore" ).shape[1] min_rows_after_filter = max(5, n_predictors + 2) bootstrap_arrays = None else: min_rows_after_filter = None bootstrap_arrays = _prepare_non_group_ols_bootstrap_arrays(bootstrap_data) seeds = np.random.randint(0, 2**31, size=draws, dtype=np.int64) # Run bootstrap regressions in parallel def _run_one_seed(seed_i: int): if group is None: assert bootstrap_arrays is not None return _strap_non_group_OLS_arrays( *bootstrap_arrays, sample_size=sample_size, seed=seed_i, ) return self._strap_OLS( bootstrap_data, group, sample_size, seed_i, None, min_rows_after_filter=min_rows_after_filter, ) bootstrap_out = _run_parallel_seed_batches( seeds=seeds, n_cpu=n_cpu, run_one_seed=_run_one_seed, draws_bar=notebook_draws_bar, dispatch_mode="streaming" if group else "chunked", ) b_list, p_list, r2_list, b_list_ystar, p_list_ystar = zip(*bootstrap_out) # Collect and store results for this spec y_names.append(y_name) specs.append(frozenset(list(y_name) + list(spec))) all_predictors.append(self.x + list(spec)) b_array[index, :] = b_list p_array[index, :] = p_list b_array_ystar[index, :] = b_list_ystar p_array_ystar[index, :] = p_list_ystar r2_array[index, :] = r2_list r2i_array[index] = r2_i ll_array[index] = ll_i ll_raw_array[index] = ll_raw_i ll_null_array[index] = ll_null_i ll_gain_array[index] = ll_gain_i ll_gain_per_obs_array[index] = ll_gain_per_obs_i nobs_array[index] = nobs_i aic_array[index] = aic_i bic_array[index] = bic_i hqic_array[index] = hqic_i av_k_metric_array[index] = av_k_metric_i b_all_list.append(b_all) p_all_list.append(p_all) # Store arrays across all y-specs list_b_array.append(b_array) list_p_array.append(p_array) list_b_array_ystar.append(b_array_ystar) list_p_array_ystar.append(p_array_ystar) list_r2_array.append(r2_array) list_r2i_array.append(r2i_array) list_ll_array.append(ll_array) list_ll_raw_array.append(ll_raw_array) list_ll_null_array.append(ll_null_array) list_ll_gain_array.append(ll_gain_array) list_ll_gain_per_obs_array.append(ll_gain_per_obs_array) list_nobs_array.append(nobs_array) list_aic_array.append(aic_array) list_bic_array.append(bic_array) list_hqic_array.append(hqic_array) list_av_k_metric_array.append(av_k_metric_array) # Assemble final result object across all composite outcomes results = OLSResult( y=y_names, x=self.x, data=self.data, specs=specs, all_predictors=all_predictors, controls=controls, draws=draws, kfold=kfold, all_b=b_all_list, all_p=p_all_list, estimates=np.vstack(list_b_array), p_values=np.vstack(list_p_array), estimates_ystar=np.vstack(list_b_array_ystar), p_values_ystar=np.vstack(list_p_array_ystar), r2_values=np.vstack(list_r2_array), r2i_array=np.hstack(list_r2i_array), ll_array=np.hstack(list_ll_array), ll_raw_array=np.hstack(list_ll_raw_array), ll_null_array=np.hstack(list_ll_null_array), ll_gain_array=np.hstack(list_ll_gain_array), ll_gain_per_obs_array=np.hstack(list_ll_gain_per_obs_array), nobs_array=np.hstack(list_nobs_array), aic_array=np.hstack(list_aic_array), bic_array=np.hstack(list_bic_array), hqic_array=np.hstack(list_hqic_array), av_k_metric_array=np.hstack(list_av_k_metric_array), model_name=self.model_name, name_av_k_metric=self.oos_metric_name, shap_return=None ) self.results = results else: # Initialize containers for storing results across z-specs specs = [] all_predictors = [] b_all_list = [] p_all_list = [] # Preallocate arrays for bootstrap and model stats b_array = np.empty([space_n, draws]) p_array = np.empty([space_n, draws]) b_array_ystar = np.empty([space_n, draws]) p_array_ystar = np.empty([space_n, draws]) r2_array = np.empty([space_n, draws]) r2i_array = np.empty([space_n]) ll_array = np.empty([space_n]) ll_raw_array = np.empty([space_n]) ll_null_array = np.empty([space_n]) ll_gain_array = np.empty([space_n]) ll_gain_per_obs_array = np.empty([space_n]) nobs_array = np.empty([space_n], dtype=int) aic_array = np.empty([space_n]) bic_array = np.empty([space_n]) hqic_array = np.empty([space_n]) av_k_metric_array = np.empty([space_n]) shap_return = None if compute_shap: import shap # Prepare SHAP dataset (includes y, x, z, and optional group) if group: SHAP_comb = self.data[self.y + self.x + [group] + controls] SHAP_comb = group_demean(SHAP_comb, group=group) else: SHAP_comb = self.data[self.y + self.x + controls] SHAP_comb = SHAP_comb.dropna() SHAP_comb = SHAP_comb.reset_index(drop=True).copy() # Fit linear model for SHAP values x_train, x_test, y_train, _ = train_test_split( SHAP_comb[self.x + controls].drop(columns=['const'], errors='ignore'), SHAP_comb[self.y], test_size=0.2, random_state=self.seed ) model = sklearn.linear_model.LinearRegression() model.fit(x_train, y_train) # Explain model predictions with SHAP explainer = shap.LinearExplainer(model, x_train) shap_return = [explainer.shap_values(x_test), x_test] # Generate shared bootstrap seeds ONCE for all specifications # This ensures all specs use the same bootstrap samples, creating proper correlation shared_seeds = np.random.randint(0, 2 ** 32 - 1, size=draws, dtype=np.int64) y_single_df = self.data[self.y] # Loop over all z-specs (combinations of control variables) for index, spec in _progress_iter( enumerate(z_specs), total=space_n, description="Control specs", ): spec_predictors = _get_predictors_for_spec(spec) comb = pd.concat([y_single_df, spec_predictors], axis=1) # Clean combined dataset comb = comb.dropna() comb = comb.reset_index(drop=True).copy() # Demean by group if required if group: comb = group_demean(comb, group=group) # Fit model on full sample ( b_all, p_all, r2_i, ll_i, aic_i, bic_i, hqic_i, av_k_metric_i, ll_raw_i, ll_null_i, ll_gain_i, ll_gain_per_obs_i, nobs_i ) = self._full_sample_OLS( comb, kfold=kfold, group=group, oos_metric_name=self.oos_metric_name ) # Calculate pseudo-outcome residuals for bootstrapping y_star = comb.iloc[:, [0]] - np.dot(comb.iloc[:, [1]], b_all[0][0]) bootstrap_data = _prepare_ols_bootstrap_data(comb, y_star) if group: n_predictors = bootstrap_data.drop( columns=[bootstrap_data.columns[0], "y_star", group], errors="ignore" ).shape[1] min_rows_after_filter = max(5, n_predictors + 2) bootstrap_arrays = None else: min_rows_after_filter = None bootstrap_arrays = _prepare_non_group_ols_bootstrap_arrays(bootstrap_data) # Bootstrap OLS model draws in parallel using SHARED seeds # All specs use the same seeds to create proper correlation structure def _run_one_seed(seed_i: int): if group is None: assert bootstrap_arrays is not None return _strap_non_group_OLS_arrays( *bootstrap_arrays, sample_size=sample_size, seed=seed_i, ) return self._strap_OLS( bootstrap_data, group, sample_size, seed_i, None, min_rows_after_filter=min_rows_after_filter, ) bootstrap_out = _run_parallel_seed_batches( seeds=shared_seeds, n_cpu=n_cpu, run_one_seed=_run_one_seed, draws_bar=notebook_draws_bar, dispatch_mode="streaming" if group else "chunked", ) b_list, p_list, r2_list, b_list_ystar, p_list_ystar = zip(*bootstrap_out) # Store results for current spec specs.append(frozenset(spec)) all_predictors.append(self.x + list(spec)) b_array[index, :] = b_list p_array[index, :] = p_list b_array_ystar[index, :] = b_list_ystar p_array_ystar[index, :] = p_list_ystar r2_array[index, :] = r2_list r2i_array[index] = r2_i ll_array[index] = ll_i ll_raw_array[index] = ll_raw_i ll_null_array[index] = ll_null_i ll_gain_array[index] = ll_gain_i ll_gain_per_obs_array[index] = ll_gain_per_obs_i nobs_array[index] = nobs_i aic_array[index] = aic_i bic_array[index] = bic_i hqic_array[index] = hqic_i av_k_metric_array[index] = av_k_metric_i b_all_list.append(b_all) p_all_list.append(p_all) # Assemble final results object results = OLSResult(y=self.y[0], x=self.x[0], data=self.data, specs=specs, all_predictors=all_predictors, controls=controls, draws=draws, kfold=kfold, all_b=b_all_list, all_p=p_all_list, estimates=b_array, p_values=p_array, estimates_ystar=b_array_ystar, p_values_ystar=p_array_ystar, r2_values=r2_array, r2i_array=r2i_array, ll_array=ll_array, ll_raw_array=ll_raw_array, ll_null_array=ll_null_array, ll_gain_array=ll_gain_array, ll_gain_per_obs_array=ll_gain_per_obs_array, nobs_array=nobs_array, aic_array=aic_array, bic_array=bic_array, hqic_array=hqic_array, av_k_metric_array=av_k_metric_array, model_name=self.model_name, name_av_k_metric=self.oos_metric_name, shap_return=shap_return) self.results = results if notebook_draws_bar is not None: notebook_draws_bar.close()
def _predict( self, x_test: np.ndarray, betas: np.ndarray ) -> np.ndarray: """ Predict the dependent variable based on the test data and coefficients. Parameters ---------- x_test : array-like, shape=(n_obs, n_features) Independent variables. betas : array-like, shape=(n_features,) Regression coefficients. Returns ------- y_pred : array Predicted values for the dependent variable. """ return np.dot(x_test, betas) def _full_sample_OLS( self, comb_var, kfold, group, oos_metric_name ): """ Call stripped_ols() over the full data containing y, x, and controls. Parameters ---------- comb_var : pd.DataFrame Combined y, x, and controls dataset (cleaned). kfold : int Number of CV folds; if 0 or None, skips CV. group : str or None Grouping column for fixed effects (dropped before fit). oos_metric_name : str Out-of-sample metric ('pseudo-r2' or 'rmse').. Returns ------- b : np.ndarray Full-sample coefficient estimates. p : np.ndarray Full-sample p-values. r2 : float In-sample R². ll_metric : float Scale-invariant null-relative log-likelihood gain per observation. ll_raw : float Raw Gaussian log-likelihood of the full model. ll_null : float Raw Gaussian log-likelihood of the intercept-only model. ll_gain : float Difference ll_raw - ll_null. ll_gain_per_obs : float ll_gain divided by nobs. nobs : int Number of observations used in this specification. aic : float Akaike Information Criterion. bic : float Bayesian Information Criterion. hqic : float Hannan-Quinn Information Criterion. av_k_metric : float Average out-of-sample metric across folds. """ # Separate response and predictors y = comb_var.iloc[:, [0]] x_temp = comb_var.drop(comb_var.columns[0], axis=1) # Drop group column if provided if group: x = x_temp.drop(columns=group) else: x = x_temp # Fit full sample OLS out = simple_ols(y=y, x=x) # Initialize cross-validated metric av_k_metric = None # Cross-validation (with or without grouping) if kfold: metric = [] if group: n_groups = pd.Series(x_temp[group]).nunique(dropna=False) if kfold > n_groups: raise ValueError( f"kfold={kfold} cannot exceed number of unique groups " f"({n_groups}) for grouped OLS CV." ) k_fold = GroupKFold(kfold) for k, (train, test) in enumerate(k_fold.split(x, y, groups=x_temp[group])): out_k = simple_ols(y=y.loc[train], x=x.loc[train]) y_pred = self._predict(x.loc[test], out_k['b']) y_true = y.loc[test] if oos_metric_name == 'rmse': k_rmse = root_mean_squared_error(y_true, y_pred) metric.append(k_rmse) elif oos_metric_name == 'pseudo-r2': k_r2 = pseudo_r2(y_true, y_pred, np.mean(y.loc[train])) metric.append(k_r2) else: raise ValueError('No valid OOS metric provided.') else: k_fold = KFold(kfold) for k, (train, test) in enumerate(k_fold.split(x, y)): out_k = simple_ols(y=y.loc[train], x=x.loc[train]) y_pred = self._predict(x.loc[test], out_k['b']) y_true = y.loc[test] if oos_metric_name == 'rmse': k_rmse = root_mean_squared_error(y_true, y_pred) metric.append(k_rmse) elif oos_metric_name == 'pseudo-r2': k_r2 = pseudo_r2(y_true, y_pred, np.mean(y.loc[train])) metric.append(k_r2) else: raise ValueError('No valid OOS metric provided.') av_k_metric = np.mean(metric) # Return model fit statistics and OOS metric return ( out['b'], out['p'], out['r2'], out['ll_gain_per_obs'], out['aic'], out['bic'], out['hqic'], av_k_metric, out['ll'], out['ll_null'], out['ll_gain'], out['ll_gain_per_obs'], out['nobs'], ) def _strap_OLS( self, comb_var: pd.DataFrame, group: Optional[str], sample_size: int, seed: int, y_star: Optional[np.ndarray], group_bootstrap_lookup: Optional[tuple[np.ndarray, dict[Any, np.ndarray]]] = None, min_rows_after_filter: Optional[int] = None, ) -> tuple: """ Call stripped_ols() over a random sample of the data containing y, x, and controls. Parameters ---------- comb_var : pd.DataFrame Combined y, x, and controls dataset. group : str or None Grouping column for fixed effects; sampling respects groups if provided. sample_size : int Number of observations in bootstrap sample. seed : int Random seed for resampling. y_star : array-like, optional Pseudo-outcome residuals for stratified bootstrap. If None, ``comb_var`` is expected to already include a ``y_star`` column. This optimized path is output-equivalent to passing ``comb_var`` and ``y_star`` separately. Returns ------- beta : float Bootstrap coefficient estimate for focal predictor. p : float Associated p-value. r2 : float In-sample R² for the bootstrap sample. """ if y_star is None: temp_data = comb_var else: temp_data = _prepare_ols_bootstrap_data(comb_var, y_star) if group is None: # Sample randomly from full data (no group structure) samp_df = temp_data.sample(n=sample_size, replace=True, random_state=seed) y = samp_df.iloc[:, [0]] y_star = samp_df.iloc[:, [-1]] x = samp_df.drop(columns=[samp_df.columns[0], 'y_star']) else: if min_rows_after_filter is None: n_predictors = temp_data.drop( columns=[temp_data.columns[0], "y_star", group], errors="ignore" ).shape[1] min_rows_after_filter = max(5, n_predictors + 2) # Cluster bootstrap with singleton guard. sample_df, used_filtered, n_filtered, n_selected, attempts_used = _cluster_bootstrap_singleton_safe( temp_data=temp_data, group=group, seed=seed, target_rows=sample_size, min_rows_after_filter=min_rows_after_filter, group_lookup=group_bootstrap_lookup, ) if not used_filtered: warnings.warn( f"Grouped bootstrap singleton filtering kept {n_filtered} rows (< {min_rows_after_filter}) after " f"{attempts_used} attempt(s); falling back to unfiltered grouped sample ({n_selected} rows).", UserWarning ) # Drop grouping variable and extract y, y_star, and x no_singleton = sample_df.drop(columns=[group]) y = no_singleton.iloc[:, [0]] y_star = no_singleton.iloc[:, no_singleton.columns.get_loc('y_star')].to_frame() x = no_singleton.drop(columns=[no_singleton.columns[0], 'y_star']) # Fit standard OLS and y_star model # When using group-demeaning (fixed effects), don't add const since intercept is absorbed output = stripped_ols(y=y, x=x, add_const=(group is None)) output_ystar = stripped_ols(y=y_star, x=x, add_const=(group is None)) b = output['b'] p = output['p'] r2 = output['r2'] b_ystar = output_ystar['b'] p_ystar = output_ystar['p'] return b[0][0], p[0][0], r2, b_ystar[0][0], p_ystar[0][0]
[docs] class LRobust(BaseRobust): """ A class to perform logistic regression analysis using statsmodels.Logit. Parameters ---------- y : list[str] Name of the dependent binary variable, supplied as a one-element list. Multiple binary outcomes are not currently supported in a single LRobust fit. x : list[str] Predictor column name(s) included in every specification. The first element is treated as the reported focal estimand; any additional elements are fixed predictors that do not vary across the control-subset space. data : pandas.DataFrame The dataset containing `y`, `x`, and any optional controls. model_name : str, default='Logistic Regression Robust' Custom label for this model, used in outputs and plots. Attributes ---------- results : OLSResult Populated after calling `.fit()`. Contains all coefficient, p-value, and metric outputs. """ def __init__( self, *, y: List[str], x: List[str], data: pd.DataFrame, model_name: str = 'Logistic Regression Robust' ) -> None: """ Initialize the logistic robust analysis. Parameters ---------- y : str or list of str Dependent variable name(s). x : str or list of str Predictor variable name(s). data : pandas.DataFrame Input dataset. model_name : str, optional Descriptive name for the model. Default 'Logistic Regression Robust'. """ super().__init__(y=y, x=x, data=data, model_name=model_name)
[docs] def get_results(self) -> Any: """ Get the results of the logistic regression. Returns ------- results : OLSResult Object containing the regression results. """ return self.results
[docs] def multiple_y(self) -> None: raise NotImplementedError("Not implemented yet")
def _full_sample( self, comb_var: pd.DataFrame, kfold: int, group: Optional[str], oos_metric_name: str ) -> Tuple[np.ndarray, np.ndarray, float, float, float, float, float, Optional[float]]: """ Run logistic regression over the full dataset (with y, x, controls), and optionally compute cross-validated out-of-sample metrics. Parameters ---------- comb_var : pandas.DataFrame DataFrame with columns [y, x, controls...]. kfold : int Number of folds for cross-validation. group : str or None Grouping variable name (reserved for future use with fixed effects). oos_metric_name : str Out-of-sample metric to compute (if kfold > 1). Options: 'mcfaddens-r2', 'pseudo-r2', 'rmse', 'cross-entropy', or 'imv'. Returns ------- beta : ndarray Coefficient estimates from full sample regression. p : ndarray P-value estimates from full sample regression. r2 : float In-sample pseudo-R² from full model. ll : float Log-likelihood. aic : float Akaike Information Criterion. bic : float Bayesian Information Criterion. hqic : float Hannan-Quinn Information Criterion. av_k_metric : float or None Average k-fold cross-validation metric, or None if no cross-validation. """ # TODO: Implement Fixed Effects Logistic Regression (FE) # Split into response (y) and predictors (x). If `group` is provided, # exclude it from the model matrix but use it for grouped CV splits. y = comb_var.iloc[:, [0]] x_temp = comb_var.drop(comb_var.columns[0], axis=1) if group is not None: if group not in x_temp.columns: raise ValueError( f"Grouping column '{group}' must be present in comb_var when group CV is requested." ) groups = x_temp[group] x = x_temp.drop(columns=[group]) else: groups = None x = x_temp # Fit logistic regression on the full sample out = logistic_regression_sm(y=y, x=x) # Initialize output for cross-validation metric av_k_metric = None # Perform k-fold cross-validation if requested if kfold: if group is not None: n_groups = pd.Series(groups).nunique(dropna=False) if kfold > n_groups: raise ValueError( f"kfold={kfold} cannot exceed number of unique groups ({n_groups})." ) k_fold = GroupKFold(n_splits=kfold) split_iter = k_fold.split(x, y, groups=groups) else: y_vec = y.iloc[:, 0].to_numpy() class_counts = pd.Series(y_vec).value_counts(dropna=False) if class_counts.shape[0] < 2: raise ValueError( "Logistic cross-validation requires at least two outcome classes " "after preprocessing." ) min_class_count = int(class_counts.min()) if kfold > min_class_count: raise ValueError( f"kfold={kfold} cannot exceed the minority-class count " f"({min_class_count}) for stratified logistic CV." ) k_fold = StratifiedKFold(n_splits=kfold, shuffle=True, random_state=self.seed) split_iter = k_fold.split(x, y_vec) metric = [] for _, (train, test) in enumerate(split_iter): # Fit model on training fold out_k = logistic_regression_sm(y=y.iloc[train], x=x.iloc[train]) # Predict on test fold y_pred = self._predict_LR(x.iloc[test], out_k['b']) y_true = y.iloc[test] # Evaluate prediction accuracy using selected OOS metric if oos_metric_name == 'rmse': k_rmse = root_mean_squared_error(y_true, y_pred) metric.append(k_rmse) elif oos_metric_name in {'mcfaddens-r2', 'mcfadden-r2'}: k_r2 = mcfadden_r2(y_true, y_pred, np.mean(y.iloc[train])) metric.append(k_r2) elif oos_metric_name == 'pseudo-r2': k_r2 = pseudo_r2(y_true, y_pred, np.mean(y.iloc[train])) metric.append(k_r2) elif oos_metric_name == 'cross-entropy': k_cross_entropy = log_loss(y_true, y_pred) metric.append(k_cross_entropy) elif oos_metric_name == 'imv': imv = calculate_imv_score( y_true, y_pred, null_mean=float(np.mean(y.iloc[train])), ) metric.append(imv) else: raise ValueError('No valid OOS metric provided.') # Average metric across all folds av_k_metric = np.mean(metric) # Return model statistics and cross-validation result return ( out['b'], # Coefficients out['p'], # P-values out['r2'], # In-sample pseudo-R² out['ll'], # Log-likelihood out['aic'], # AIC out['bic'], # BIC out['hqic'], # HQIC av_k_metric # Cross-validation metric (if any) ) def _predict_LR( self, x_test: pd.DataFrame, betas: np.ndarray ) -> np.ndarray: """ Predict the dependent variable using the estimated coefficients. Parameters ---------- x_test : pandas.DataFrame Test set covariates (no constant column). betas : ndarray Fitted coefficients (including intercept at end). Returns ------- ndarray Predicted probabilities for the positive class. """ return 1 / (1 + np.exp(-x_test.dot(betas)))
[docs] def fit( self, *, controls: List[str], group: Optional[str] = None, draws: int = None, sample_size: Optional[int] = None, kfold: int = None, oos_metric: str = None, n_cpu: Optional[int] = None, seed: Optional[int] = None, rescale_x: Optional[bool] = False, rescale_y: Optional[bool] = False, rescale_z: Optional[bool] = False, compute_shap: bool = True, threshold: int = 1000000 ) -> 'LRobust': """ Fit the logistic regression models over the specification space and bootstrap samples. Parameters ---------- controls : list of str Names of optional control variables to include in every spec. group : str, optional Grouping variable used for grouped cross-validation and cluster bootstrap resampling. Logistic fixed-effects demeaning is not currently implemented. draws : int, default=None Number of bootstrap resamples per specification. sample_size : int, optional Number of observations per bootstrap draw; defaults to full dataset. kfold : int, default=None Folds for out-of-sample CV; set to 0 to disable. oos_metric : default None. Options: {'pseudo-r2', 'mcfaddens-r2', 'imv', 'rmse','cross-entropy'}. Metric to compute on held-out folds. n_cpu : int, optional Number of parallel jobs; defaults to all available. seed : int, optional Random seed for reproducibility. rescale_y : bool, default=False Rescale the dependent variable. rescale_x : bool, default=False Rescale the x variable. rescale_z : bool, default=False Rescale the z variables. compute_shap : bool, default=True If True, compute SHAP values for plotting. Set False for fit-only profiling. threshold : int, default=1000000 Warn if `draws * n_specs` exceeds this. Returns ------- self : LRobust Self, with `.results` populated as an `OLSResult`. """ # Rescale dependent variable (y) if needed if rescale_y is True: if len(self.y) > 1: warnings.warn( "Rescaling of the dependent variable already occurs with multiple " "y variables. Skipping additional rescaling.", UserWarning ) else: self.data[self.y] = rescale(self.data[self.y]) # Rescale independent variables (x) if rescale_x is True: for column in self.x: self.data[column] = rescale(self.data[column]) # Rescale control variables (z) if rescale_z is True: for column in controls: self.data[column] = rescale(self.data[column]) # Prepare modeling setup (includes seed, CPU count, metric, etc.) draws, kfold, oos_metric, n_cpu, seed = make_inquiry( self.model_name, self.y, self.data, draws, kfold, oos_metric, n_cpu, seed ) self.seed = seed self.data = self.data.copy() if oos_metric == 'mcfadden-r2': oos_metric = 'mcfaddens-r2' # Logistic fixed-effects demeaning is not implemented. If `group` is # provided, we still use it for grouped CV/bootstrap resampling, but do # not demean y/x/z. if group is not None: warnings.warn( "[LRobust] Logistic fixed-effects demeaning is unsupported; " "`group` will be used for grouped CV/bootstrap resampling only (no demeaning).", UserWarning ) # Ensure a single constant term in the design matrix. self.x, controls = _ensure_single_constant(self.data, self.y, self.x, controls) # Validate modeling parameters self._validate_fit_args( controls=controls, group=group, draws=draws, kfold=kfold, oos_metric=oos_metric, n_cpu=n_cpu, seed=seed, valid_oos_metrics=['pseudo-r2', 'mcfaddens-r2', 'rmse', 'cross-entropy', 'imv'], threshold=threshold ) # Set seed for reproducibility np.random.seed(self.seed) # Store the selected out-of-sample evaluation metric self.oos_metric_name = oos_metric # Log model configuration print( f"LRobust is running with n_cpu={n_cpu}, draws={draws}, " f"folds={kfold}, seed={seed}.\n" f"We're evaluating our out-of-sample predictions with the " f"{oos_metric} metric.\n" f"The target variable of interest is {self.x[0]}. " f"Let's begin the calculations..." ) # Default to using the full dataset size if sample_size is not provided if sample_size is None: sample_size = self.data.shape[0] # Currently only supports one dependent variable for logistic regression if len(self.y) > 1: raise NotImplementedError("Not implemented yet for logistic regression") # TODO: Add multi-y logistic support else: z_specs = [tuple(spec) for spec in all_subsets(controls)] space_n = len(z_specs) control_positions = {name: idx for idx, name in enumerate(controls)} predictor_prefix = self.x + ([group] if group else []) base_predictors_df = self.data[predictor_prefix + controls] y_single_df = self.data[self.y] spec_predictor_cache: dict[int, pd.DataFrame] = {} def _get_lr_predictors_for_spec(spec_tuple: tuple[str, ...]) -> pd.DataFrame: mask = _spec_bitmask(spec_tuple, control_positions) cached = spec_predictor_cache.get(mask) if cached is None: cols = predictor_prefix + list(spec_tuple) cached = base_predictors_df.loc[:, cols] spec_predictor_cache[mask] = cached return cached notebook_draws_bar = _make_notebook_draws_bar( total=space_n * draws, description="Bootstrap draws" ) specs = [] all_predictors = [] b_all_list = [] p_all_list = [] # Initialize arrays to store model statistics across all control sets b_array = np.empty([space_n, draws]) p_array = np.empty([space_n, draws]) b_array_ystar = np.empty([space_n, draws]) p_array_ystar = np.empty([space_n, draws]) r2_array = np.empty([space_n, draws]) r2i_array = np.empty([space_n]) ll_array = np.empty([space_n]) aic_array = np.empty([space_n]) bic_array = np.empty([space_n]) hqic_array = np.empty([space_n]) av_k_metric_array = np.empty([space_n]) shap_return = None if compute_shap: import shap # Preprocess data for SHAP values. Do not demean for logistic models. if group: SHAP_comb = self.data[self.y + self.x + [group] + controls] else: SHAP_comb = self.data[self.y + self.x + controls] SHAP_comb = SHAP_comb.dropna().reset_index(drop=True).copy() # Split data for training and testing x_train, x_test, y_train, _ = train_test_split( SHAP_comb[self.x + controls].drop(columns=['const'], errors='ignore'), SHAP_comb[self.y], test_size=0.2, random_state=self.seed ) # Train logistic regression for SHAP explainability model = sklearn.linear_model.LogisticRegression(penalty="l2", C=0.1) model.fit(x_train, y_train.squeeze()) explainer = shap.LinearExplainer(model, x_train) shap_return = [explainer.shap_values(x_test), x_test] # Loop through all subsets of control variables for index, spec in _progress_iter( enumerate(z_specs), total=space_n, description="Control specs", ): spec_predictors = _get_lr_predictors_for_spec(spec) comb = pd.concat([y_single_df, spec_predictors], axis=1) comb = comb.dropna().reset_index(drop=True).copy() if group: X_design = comb.drop(columns=[comb.columns[0], group], errors='ignore') else: X_design = comb.drop(columns=comb.columns[0]) # Drop y column self._check_colinearity(X_design) # Estimate full-sample model metrics (b_all, p_all, r2_i, ll_i, aic_i, bic_i, hqic_i, av_k_metric_i) = self._full_sample( comb, kfold=kfold, group=group, oos_metric_name=self.oos_metric_name ) # Bootstrap sampling (parallelized) seeds = np.random.randint(0, 2**32 - 1, size=draws, dtype=np.int64) group_bootstrap_lookup = ( _make_group_bootstrap_lookup(comb, group) if group else None ) if group: n_predictors = comb.drop( columns=[comb.columns[0], group], errors="ignore" ).shape[1] min_rows_after_filter = max(5, n_predictors + 2) else: min_rows_after_filter = None def _run_one_seed(seed_i: int): return self._strap_regression( comb, group, sample_size, seed_i, group_bootstrap_lookup=group_bootstrap_lookup, min_rows_after_filter=min_rows_after_filter, ) bootstrap_out = _run_parallel_seed_batches( seeds=seeds, n_cpu=n_cpu, run_one_seed=_run_one_seed, draws_bar=notebook_draws_bar ) b_list, p_list, r2_list = zip(*bootstrap_out) # Save results specs.append(frozenset(spec)) all_predictors.append(self.x + list(spec)) b_array[index, :] = b_list p_array[index, :] = p_list b_array_ystar[index, :] = np.nan * len(b_list) p_array_ystar[index, :] = np.nan * len(p_list) r2_array[index, :] = r2_list r2i_array[index] = r2_i ll_array[index] = ll_i aic_array[index] = aic_i bic_array[index] = bic_i hqic_array[index] = hqic_i av_k_metric_array[index] = av_k_metric_i b_all_list.append(b_all) p_all_list.append(p_all) # Store all modeling results in an OLSResult object self.results = OLSResult( y=self.y[0], x=self.x[0], data=self.data, specs=specs, all_predictors=all_predictors, controls=controls, draws=draws, kfold=kfold, all_b=b_all_list, all_p=p_all_list, estimates=b_array, p_values=p_array, estimates_ystar=b_array_ystar, p_values_ystar=p_array_ystar, r2_values=r2_array, r2i_array=r2i_array, ll_array=ll_array, aic_array=aic_array, bic_array=bic_array, hqic_array=hqic_array, av_k_metric_array=av_k_metric_array, model_name=self.model_name, name_av_k_metric=self.oos_metric_name, shap_return=shap_return ) if notebook_draws_bar is not None: notebook_draws_bar.close()
def _strap_regression( self, comb_var: pd.DataFrame, group: Optional[str], sample_size: int, seed: int, group_bootstrap_lookup: Optional[tuple[np.ndarray, dict[Any, np.ndarray]]] = None, min_rows_after_filter: Optional[int] = None, ) -> Tuple[float, float, float]: """ Perform one bootstrap draw of logistic regression. Parameters ---------- comb_var : pandas.DataFrame DataFrame with `y` in first column and features in remaining columns. group : str, optional Grouping variable name used for cluster bootstrap resampling. sample_size : int Number of rows to sample with replacement. seed : int Random seed for sampling. Returns ------- beta : float Estimated coefficient for the primary predictor. p : float Corresponding p-value. r2 : float In-sample pseudo-R² of this bootstrap sample. """ temp_data = comb_var if group is None: samp_df = temp_data.sample(n=sample_size, replace=True, random_state=seed) y = samp_df.iloc[:, [0]] x = samp_df.drop(samp_df.columns[0], axis=1) else: if min_rows_after_filter is None: n_predictors = temp_data.drop( columns=[temp_data.columns[0], group], errors="ignore" ).shape[1] min_rows_after_filter = max(5, n_predictors + 2) # Cluster bootstrap with singleton guard. sample_df, used_filtered, n_filtered, n_selected, attempts_used = _cluster_bootstrap_singleton_safe( temp_data=temp_data, group=group, seed=seed, target_rows=sample_size, min_rows_after_filter=min_rows_after_filter, group_lookup=group_bootstrap_lookup, ) if not used_filtered: warnings.warn( f"Grouped bootstrap singleton filtering kept {n_filtered} rows (< {min_rows_after_filter}) after " f"{attempts_used} attempt(s); falling back to unfiltered grouped sample ({n_selected} rows).", UserWarning ) no_singleton = sample_df.drop(columns=[group]) y = no_singleton.iloc[:, [0]] x = no_singleton.drop(columns=no_singleton.columns[0]) output = logistic_regression_sm(y, x) return output['b'][0][0], output['p'][0][0], output['r2']