"""robustipy.models
This module implements multivariate regression classes for Robust Inference.
It includes classes for OLS (OLSRobust and OLSResult) and logistic regression (LRobust)
analysis, along with utilities for model merging, plotting, and Bayesian model averaging.
"""
from __future__ import annotations
import _pickle
import os
import warnings
from typing import Any, Optional, Sequence, List, Tuple, Union
import numpy as np
import pandas as pd
import sys
import sklearn
from joblib import Parallel, delayed
from rich.progress import track
from scipy.stats import norm, t as student_t
from sklearn.metrics import log_loss, root_mean_squared_error
from sklearn.model_selection import GroupKFold, KFold, StratifiedKFold, train_test_split
from statsmodels.tools.tools import add_constant
from robustipy.prototypes import Protoresult, BaseRobust
from robustipy.utils import (
all_subsets,
group_demean,
logistic_regression_sm,
simple_ols,
space_size,
mcfadden_r2,
pseudo_r2,
calculate_imv_score,
sample_z_masks,
_ensure_single_constant,
_find_group_invariant_columns,
rescale,
make_inquiry,
stripped_ols
)
def _progress_iter(iterable, *, total: int, description: str = "Working..."):
"""
Return a progress-enabled iterator in terminal sessions and a plain iterator
in notebooks / non-interactive outputs.
"""
in_jupyter = _in_jupyter()
if in_jupyter:
# Notebook progress is handled with draw-level tqdm bars.
return iterable
has_tty_output = sys.stdout.isatty() or sys.stderr.isatty()
return track(
iterable,
total=total,
description=description,
disable=not (has_tty_output or in_jupyter),
)
def _in_jupyter() -> bool:
"""True when running inside a Jupyter kernel."""
try:
from IPython import get_ipython
ip = get_ipython()
if ip is None:
return bool(os.environ.get("JPY_PARENT_PID"))
shell_name = ip.__class__.__name__
if shell_name == "ZMQInteractiveShell":
return True
if hasattr(ip, "kernel"):
return True
cfg = getattr(ip, "config", {})
if isinstance(cfg, dict):
if "IPKernelApp" in cfg:
return True
else:
if "IPKernelApp" in str(cfg):
return True
return bool(os.environ.get("JPY_PARENT_PID"))
except (ImportError, AttributeError):
return bool(os.environ.get("JPY_PARENT_PID"))
def _make_notebook_draws_bar(*, total: int, description: str):
"""
Build a tqdm progress bar for notebooks. Returns None outside notebooks or
when tqdm isn't available.
"""
if not _in_jupyter():
return None
try:
from tqdm.auto import tqdm
except ImportError:
return None
return tqdm(
total=total,
desc=description,
leave=True,
miniters=1,
mininterval=1.0,
smoothing=0,
unit="draw",
unit_scale=False,
bar_format=(
"{desc}: {percentage:5.1f}%|{bar}| "
"{n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]"
),
)
def _run_parallel_seed_batches(
*,
seeds,
n_cpu: int,
run_one_seed,
draws_bar=None,
dispatch_mode: str = "streaming",
task_batch_size: Optional[int] = None,
):
"""
Execute bootstrap seeds while preserving seed order and allowing notebook
progress to advance during a specification.
"""
outputs = []
if len(seeds) == 0:
return outputs
cpu = max(1, int(n_cpu))
progress_batch_size = max(8, cpu)
if dispatch_mode not in {"streaming", "batched", "chunked"}:
raise ValueError("dispatch_mode must be 'streaming', 'batched', or 'chunked'.")
parallel_kwargs = {
"n_jobs": cpu,
# Keep process parallelism for CPU-bound bootstrap fits, but disable
# joblib's automatic memmapping so large profiler runs do not accumulate
# thousands of temporary memmap folders.
"backend": "loky",
"max_nbytes": None,
"mmap_mode": None,
}
def _update_draws_bar(n: int) -> None:
if draws_bar is None or n <= 0:
return
draws_bar.update(n)
if cpu == 1:
pending_progress = 0
for seed in seeds:
outputs.append(run_one_seed(int(seed)))
pending_progress += 1
if pending_progress >= progress_batch_size:
_update_draws_bar(pending_progress)
pending_progress = 0
_update_draws_bar(pending_progress)
return outputs
if dispatch_mode == "batched":
batch_size = max(8, cpu)
for start in range(0, len(seeds), batch_size):
seed_batch = seeds[start:start + batch_size]
batch_output = Parallel(**parallel_kwargs)(
delayed(run_one_seed)(int(seed))
for seed in seed_batch
)
outputs.extend(batch_output)
_update_draws_bar(len(batch_output))
return outputs
if dispatch_mode == "chunked":
if task_batch_size is None:
task_batch_size = max(32, int(np.ceil(len(seeds) / (cpu * 4))))
task_batch_size = max(1, int(task_batch_size))
def _run_seed_chunk(seed_chunk):
return [run_one_seed(int(seed)) for seed in seed_chunk]
seed_chunks = [
seeds[start:start + task_batch_size]
for start in range(0, len(seeds), task_batch_size)
]
parallel_output = Parallel(**parallel_kwargs)(
delayed(_run_seed_chunk)(seed_chunk)
for seed_chunk in seed_chunks
)
for batch_output in parallel_output:
outputs.extend(batch_output)
_update_draws_bar(len(batch_output))
return outputs
parallel_output = Parallel(**parallel_kwargs, return_as="generator")(
delayed(run_one_seed)(int(seed))
for seed in seeds
)
pending_progress = 0
for output in parallel_output:
outputs.append(output)
pending_progress += 1
if pending_progress >= progress_batch_size:
_update_draws_bar(pending_progress)
pending_progress = 0
_update_draws_bar(pending_progress)
return outputs
def _spec_bitmask(spec: Sequence[str], control_positions: dict[str, int]) -> int:
"""
Encode a control specification into an integer bitmask.
The i-th control variable in `control_positions` maps to bit i.
"""
mask = 0
for name in spec:
pos = control_positions.get(name)
if pos is not None:
mask |= (1 << pos)
return mask
def _make_group_bootstrap_lookup(
temp_data: pd.DataFrame,
group: str
) -> tuple[np.ndarray, dict[Any, np.ndarray]]:
"""Precompute the group order and row positions used by grouped bootstrap."""
unique_groups = temp_data[group].unique()
if unique_groups.size == 0:
raise ValueError(f"No groups available in column '{group}'.")
group_positions = {
g: np.asarray(indexes, dtype=np.intp)
for g, indexes in temp_data.groupby(group, sort=False, observed=True).indices.items()
}
return unique_groups, group_positions
def _prepare_ols_bootstrap_data(
comb_var: pd.DataFrame,
y_star: np.ndarray
) -> pd.DataFrame:
"""Build the OLS bootstrap DataFrame once per spec instead of once per draw."""
temp_data = comb_var.copy()
temp_data['y_star'] = y_star
return temp_data
def _prepare_non_group_ols_bootstrap_arrays(
temp_data: pd.DataFrame
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Precompute arrays for the non-grouped OLS bootstrap path."""
y_values = temp_data.iloc[:, [0]].to_numpy()
y_star_values = temp_data.iloc[:, [-1]].to_numpy()
x_df = temp_data.drop(columns=[temp_data.columns[0], 'y_star']).copy()
x_df['const'] = 1
x_values = x_df.to_numpy()
return y_values, y_star_values, x_values
def _stripped_ols_arrays(y: np.ndarray, x: np.ndarray) -> dict:
"""Array equivalent of stripped_ols() for pre-sampled numeric OLS data."""
if x.size == 0 or y.size == 0:
raise ValueError("Inputs must not be empty.")
with np.errstate(divide='ignore', invalid='ignore'):
try:
inv_xx = np.linalg.inv(np.dot(x.T, x))
except np.linalg.LinAlgError:
inv_xx = np.linalg.pinv(np.dot(x.T, x))
xy = np.dot(x.T, y)
b = np.dot(inv_xx, xy)
nobs = y.shape[0]
ncoef = x.shape[1]
df_e = nobs - ncoef
e = y - np.dot(x, b)
sse = np.dot(e.T, e) / df_e
se = np.sqrt(np.diagonal(sse * inv_xx))
se_col = se.reshape(-1, 1)
t_values = b / se_col
t_values = np.where(se_col == 0, np.nan, t_values)
p = np.where(
np.isfinite(t_values),
(1 - student_t.cdf(abs(t_values), df_e)) * 2,
np.nan
)
r2 = 1 - e.var() / y.var()
r2_adj = 1 - (1 - r2) * ((nobs - 1) / (nobs - ncoef))
return {'b': b, 'p': p, 'r2': r2_adj}
def _strap_non_group_OLS_arrays(
y_values: np.ndarray,
y_star_values: np.ndarray,
x_values: np.ndarray,
sample_size: int,
seed: int,
) -> tuple:
"""Run one non-grouped OLS bootstrap draw from precomputed arrays."""
n_rows = y_values.shape[0]
row_positions = np.random.RandomState(seed).choice(
np.arange(n_rows),
size=sample_size,
replace=True,
)
y = np.asfortranarray(y_values[row_positions])
y_star = np.asfortranarray(y_star_values[row_positions])
x = np.asfortranarray(x_values[row_positions])
output = _stripped_ols_arrays(y=y, x=x)
output_ystar = _stripped_ols_arrays(y=y_star, x=x)
return (
output['b'][0][0],
output['p'][0][0],
output['r2'],
output_ystar['b'][0][0],
output_ystar['p'][0][0],
)
def _cluster_bootstrap_by_rows(
temp_data: pd.DataFrame,
group: str,
seed: int,
target_rows: int,
group_lookup: Optional[tuple[np.ndarray, dict[Any, np.ndarray]]] = None,
) -> pd.DataFrame:
"""
Draw a cluster bootstrap sample by repeatedly sampling groups (with replacement)
until at least `target_rows` observations are collected.
Notes
-----
- Preserves cluster integrity (whole groups are appended each draw).
- Output may exceed `target_rows` due to whole-cluster appends.
"""
if group_lookup is None:
group_lookup = _make_group_bootstrap_lookup(temp_data, group)
unique_groups, group_positions = group_lookup
target_rows = max(1, int(target_rows))
rng = np.random.default_rng(seed)
sampled_positions = []
n_rows = 0
while n_rows < target_rows:
g = rng.choice(unique_groups)
positions = group_positions[g]
sampled_positions.append(positions)
n_rows += positions.size
row_positions = np.concatenate(sampled_positions)
return temp_data.iloc[row_positions].reset_index(drop=True)
def _cluster_bootstrap_singleton_safe(
temp_data: pd.DataFrame,
group: str,
seed: int,
target_rows: int,
min_rows_after_filter: int,
max_attempts: int = 8,
group_lookup: Optional[tuple[np.ndarray, dict[Any, np.ndarray]]] = None,
) -> tuple[pd.DataFrame, bool, int, int, int]:
"""
Draw grouped bootstrap samples and remove singleton groups when possible.
If singleton filtering repeatedly yields too few rows, fall back to the
unfiltered grouped bootstrap sample to avoid hard crashes.
Returns
-------
sample_df : pd.DataFrame
Chosen sample (filtered or fallback unfiltered).
used_filtered : bool
True when the returned sample passed singleton filtering.
filtered_rows : int
Number of rows in the last singleton-filtered sample.
selected_rows : int
Number of rows in the corresponding unfiltered sample.
attempts_used : int
Number of attempts performed.
"""
min_rows_after_filter = max(1, int(min_rows_after_filter))
max_attempts = max(1, int(max_attempts))
last_selected = None
last_filtered = None
for attempt in range(max_attempts):
attempt_seed = int(seed) + attempt
selected = _cluster_bootstrap_by_rows(
temp_data=temp_data,
group=group,
seed=attempt_seed,
target_rows=target_rows,
group_lookup=group_lookup,
)
group_sizes = selected.groupby(group)[group].transform("size")
filtered = selected.loc[group_sizes > 1].copy()
last_selected = selected
last_filtered = filtered
if len(filtered) >= min_rows_after_filter:
return filtered, True, len(filtered), len(selected), attempt + 1
# Singleton filtering remained too aggressive; use clustered sample as fallback.
assert last_selected is not None
filtered_rows = 0 if last_filtered is None else len(last_filtered)
return last_selected.copy(), False, filtered_rows, len(last_selected), max_attempts
[docs]
def stouffer_method(
p_values,
*,
two_sided=True,
betas=None, # if missing with two_sided=True, use unsigned z's
p_values_ystar=None, # null/bootstrap p-values, shape (n_specs, n_draws)
betas_ystar=None, # null/bootstrap betas, shape (n_specs, n_draws)
weights=None, # optional Stouffer weights (per spec)
clip_floor=1e-300, # must be >= np.finfo(float).tiny
na_action="omit",
warn: bool = True
):
"""
Combine p-values via a Stouffer test aligned with OLSResult._compute_inference.
Uses observed full-sample p-values and coefficients to compute ``Z_obs``.
If null draws (``p_values_ystar`` and ``betas_ystar``) are supplied, the
p-value is calibrated by two-sided Monte Carlo:
``p = (1 + sum(abs(Z_null) >= abs(Z_obs))) / (B + 1)``.
Dependence is estimated from null z-vectors (PSD projection + ridge). If
null draws are unavailable, the method falls back to an asymptotic two-sided
p-value based on ``Z_obs``.
Returns
-------
tuple[float, float]
(Z_obs, p_value). For two_sided=True this is a two-sided p-value.
"""
import numpy as np
from scipy.stats import norm
# ---- 0) observed inputs
p_obs_all = np.asarray(p_values, dtype=np.float64).reshape(-1)
n_specs = p_obs_all.size
if n_specs == 0:
raise ValueError("`p_values` must contain at least one element.")
if betas is None:
if warn and two_sided:
print("WARNING: `betas` not provided; using unsigned z-scores for observed statistics.")
b_obs_all = np.ones(n_specs, dtype=np.float64)
else:
b_obs_all = np.asarray(betas, dtype=np.float64).reshape(-1)
if b_obs_all.size != n_specs:
raise ValueError("`betas` length must match `p_values` length.")
# ---- 1) validate observed p and optionally omit invalid
obs_valid = np.isfinite(p_obs_all) & (p_obs_all >= 0.0) & (p_obs_all <= 1.0) & np.isfinite(b_obs_all)
if not obs_valid.all() and na_action == "raise":
bad = np.where(~obs_valid)[0].tolist()
raise ValueError(f"Invalid observed inputs at indices {bad}.")
# ---- 2) prepare null matrices (optional) and keep mask from null variability
p_matrix = None
beta_matrix = None
keep_null = None
z_corr = None
if (p_values_ystar is not None) and (betas_ystar is not None):
p_matrix = np.asarray(p_values_ystar, dtype=np.float64)
beta_matrix = np.asarray(betas_ystar, dtype=np.float64)
if p_matrix.shape != beta_matrix.shape:
raise ValueError("`p_values_ystar` and `betas_ystar` must have identical shape.")
if p_matrix.ndim != 2:
raise ValueError("`p_values_ystar` and `betas_ystar` must be 2D (n_specs, n_draws).")
# Accept transposed input if obvious.
if p_matrix.shape[0] != n_specs and p_matrix.shape[1] == n_specs:
p_matrix = p_matrix.T
beta_matrix = beta_matrix.T
if p_matrix.shape[0] != n_specs:
raise ValueError("Null matrices must have n_specs rows matching observed p-values.")
if p_matrix.shape[1] > 1:
floor_min = np.finfo(float).tiny
clip_floor_eff = max(float(clip_floor), floor_min) if (clip_floor is not None and np.isfinite(clip_floor) and clip_floor > 0.0) else floor_min
clip_ceiling = 1.0 - 1e-16
p_clip_null = np.clip(p_matrix, clip_floor_eff, clip_ceiling)
if two_sided:
z_matrix = np.sign(beta_matrix) * norm.isf(p_clip_null / 2.0)
else:
z_matrix = norm.isf(p_clip_null)
# draws x specs
Z_null_all = z_matrix.T
z_std = np.std(Z_null_all, axis=0, ddof=1)
keep_null = np.isfinite(z_std) & (z_std > 1e-14)
if np.sum(keep_null) >= 2:
Zk = Z_null_all[:, keep_null]
R_hat = np.corrcoef(Zk, rowvar=False)
evals, evecs = np.linalg.eigh(R_hat)
evals[evals < 0.0] = 0.0
R_psd = (evecs * evals) @ evecs.T
R_psd = 0.5 * (R_psd + R_psd.T)
np.fill_diagonal(R_psd, 1.0)
z_corr = R_psd + 1e-12 * np.eye(R_psd.shape[0], dtype=R_psd.dtype)
# ---- 3) final keep mask
if keep_null is None:
keep = obs_valid.copy()
else:
keep = obs_valid & keep_null
if np.sum(keep) == 0:
if na_action == "raise":
raise ValueError("No valid tests available after applying observed/null filtering.")
return np.nan, np.nan
# ---- 4) prepare observed z
floor_min = np.finfo(float).tiny
if clip_floor is None or not np.isfinite(clip_floor) or clip_floor <= 0.0:
clip_floor_eff = floor_min
else:
clip_floor_eff = max(float(clip_floor), floor_min)
clip_ceiling = 1.0 - 1e-16
p_kept = p_obs_all[keep]
b_kept = b_obs_all[keep]
n_zeros = int(np.sum(p_kept == 0.0))
n_ones = int(np.sum(p_kept == 1.0))
if warn and (n_zeros or n_ones):
print(f"WARNING: clipping {n_zeros} p=0 and {n_ones} p=1 to "
f"[{clip_floor_eff:g}, {clip_ceiling:g}].")
p_kept = np.clip(p_kept, clip_floor_eff, clip_ceiling)
if two_sided:
z_obs = np.sign(b_kept) * norm.isf(p_kept / 2.0)
else:
z_obs = norm.isf(p_kept)
# Guard against residual inf/-inf due to extreme p
if not np.all(np.isfinite(z_obs)):
Z_MAX = 37.0
if warn:
print("WARNING: non-finite z encountered; clipping z to ±37.")
z_obs = np.clip(z_obs, -Z_MAX, Z_MAX)
# ---- 5) weights (aligned to kept specs)
if weights is None:
w = np.ones(int(np.sum(keep)), dtype=np.float64)
else:
w_all = np.asarray(weights, dtype=np.float64).reshape(-1)
if w_all.size != n_specs:
raise ValueError("`weights` must have same length as observed p-values.")
if not np.all(np.isfinite(w_all)):
raise ValueError("`weights` must be finite.")
w = w_all[keep]
if not np.any(w != 0.0):
raise ValueError("At least one kept weight must be nonzero.")
# ---- 6) denominator (mirrors _compute_inference behavior)
denom = None
if (z_corr is not None) and (keep_null is not None) and np.array_equal(keep, keep_null):
denom2 = float(w @ (z_corr @ w))
if np.isfinite(denom2) and denom2 > 0.0:
denom = float(np.sqrt(denom2))
if denom is None:
denom = float(np.sqrt(float(np.dot(w, w))))
Z_obs = float(np.dot(w, z_obs) / denom)
# ---- 7) p-value: null-calibrated if null draws exist; else asymptotic fallback
has_null = (
(p_matrix is not None) and (beta_matrix is not None) and
(p_matrix.size > 0) and (p_matrix.shape[1] >= 1) and
(not np.isnan(p_matrix).all())
)
if not has_null:
if two_sided:
p_val = float(2.0 * norm.sf(abs(Z_obs)))
else:
p_val = float(norm.sf(Z_obs))
return Z_obs, p_val
# Null z matrix aligned to kept specs
p_clip_null = np.clip(p_matrix, clip_floor_eff, clip_ceiling)
if two_sided:
z_null_matrix = np.sign(beta_matrix) * norm.isf(p_clip_null / 2.0)
else:
z_null_matrix = norm.isf(p_clip_null)
Z_null_specs = z_null_matrix.T[:, keep] # draws x kept_specs
Z_null = (Z_null_specs @ w) / denom
B = int(Z_null.shape[0])
p_val = float((1.0 + np.sum(np.abs(Z_null) >= abs(Z_obs))) / (B + 1.0))
return Z_obs, p_val
[docs]
class MergedResult(Protoresult):
"""
Combine and summarize results exclusively from one or more OLSResult runs.
Parameters
----------
y : str
Dependent variable name shared by all merged results.
specs : Sequence[Sequence[str]]
List of specifications; each inner sequence names the controls defining one spec.
estimates : array-like or pandas.DataFrame
Coefficient estimates for each spec (rows) and bootstrap draw (columns).
p_values : array-like or pandas.DataFrame
Corresponding p-values for each estimate.
r2_values : array-like or pandas.DataFrame
R² values for each spec and draw.
Attributes
----------
y_name : str
Name of the dependent variable.
specs_names : pandas.Series[frozenset]
Each entry is the set of control variables defining a spec.
estimates : pandas.DataFrame
Coefficient estimates by spec and draw.
p_values : pandas.DataFrame
P-values by spec and draw.
r2_values : pandas.DataFrame
R² values by spec and draw.
summary_df : pandas.DataFrame
Per-spec summary with median, min, max, and 95% confidence intervals.
"""
def __init__(
self,
*,
y: str,
specs: Sequence[Sequence[str]],
estimates: Union[np.ndarray, pd.DataFrame],
p_values: Union[np.ndarray, pd.DataFrame],
r2_values: Union[np.ndarray, pd.DataFrame],
) -> None:
super().__init__()
self.y_name = y
self.specs_names = pd.Series(specs)
self.estimates = pd.DataFrame(estimates)
self.p_values = pd.DataFrame(p_values)
self.r2_values = pd.DataFrame(r2_values)
self.summary_df = self._compute_summary()
self.summary_df['spec_name'] = self.specs_names
def _compute_summary(self) -> pd.DataFrame:
"""
Computes summary statistics based on coefficient estimates.
Returns:
pd.DataFrame: DataFrame containing median, min, max, and quantiles.
"""
data = self.estimates.copy()
out = pd.DataFrame()
out['median'] = data.median(axis=1)
out['max'] = data.max(axis=1)
out['min'] = data.min(axis=1)
out['ci_up'] = data.quantile(q=0.975, axis=1, interpolation='nearest')
out['ci_down'] = data.quantile(q=0.025, axis=1, interpolation='nearest')
return out
[docs]
def plot(
self,
loess: bool = True,
ci: float = 1,
specs: Optional[List[List[str]]] = None,
colormap: str = 'viridis',
figsize: Tuple[int, int] = (16, 14),
ext: str = 'pdf',
figpath: str = None,
highlights: bool = True,
project_name: str = None,
oddsratio: bool = False,
) -> plt.Figure:
"""
Plot specification results highlighting up to three specs.
Parameters
----------
loess : bool
Whether to apply LOESS smoothing to confidence intervals.
specs : list of list of str, optional
Specifications to highlight.
colormap : str
Matplotlib colormap name.
figsize : tuple
Figure size (width, height).
ext : str
File extension for saving.
figpath : str or Path, optional
Directory in which to save outputs; if None, uses current working dir.
project_name : str
Prefix for saved figure.
bighlights: bool
Whether to highlight specs
oddsratio bool, default=False
Whether to exponentiate the coefficients (e.g. for odds ratios).
Returns
-------
matplotlib.figure.Figure:
Plot showing the regression results.
"""
if specs is not None and len(specs) == 0:
specs = None
if specs is not None:
if not all(isinstance(spec, list) for spec in specs):
raise TypeError("'specs' must be a list of lists.")
if len(specs) > 3:
raise ValueError("The max number of specifications to highlight is 3")
if not all(frozenset(spec) in self.specs_names.to_list() for spec in specs):
raise TypeError("All specifications in 'specs' must be in the valid computed specifications.")
warnings.warn(
"MergedResult plotting is not supported yet. "
"Plot each constituent OLSResult separately.",
UserWarning
)
return None
[docs]
def summary(self, digits: int = 3) -> None:
"""
Print a compact summary for merged OLS-style results.
"""
print("==============================")
print("Merged Model Summary")
print("==============================")
print(f"Dependent variable: {self.y_name}")
print(f"Number of specifications: {len(self.specs_names)}")
print("==============================")
print("Coefficient Distribution")
print("==============================")
est = self.estimates
print(f"Median β: {est.stack().median():.{digits}f}")
print(f"Min β: {est.min().min():.{digits}f}")
print(f"Max β: {est.max().max():.{digits}f}")
if hasattr(self, "r2_values") and isinstance(self.r2_values, pd.DataFrame):
print(f"Median R²: {self.r2_values.stack().median():.{digits}f}")
print("==============================")
[docs]
def merge(
self,
result_obj: "OLSResult",
left_prefix: str,
right_prefix: str,
) -> "MergedResult":
"""
Merge the current OLSResult object with another, tagging each specification
with a prefix to indicate origin.
Parameters
----------
result_obj : OLSResult
Another OLSResult object to merge.
left_prefix : str
Prefix to tag the specifications from the current object.
right_prefix : str
Prefix to tag the specifications from the result_obj object.
Returns
-------
MergedResult
A new MergedResult object containing combined estimates and metadata.
Raises
------
TypeError
If result_obj is not an instance of OLSResult or prefixes are not strings.
ValueError
If the dependent variable names do not match between the two objects.
"""
if not isinstance(result_obj, OLSResult):
raise TypeError("'result_obj' must be an instance of OLSResult.")
if not isinstance(left_prefix, str) or not isinstance(right_prefix, str):
raise TypeError("'prefixes' must be of type 'str'.")
if self.y_name != result_obj.y_name:
raise ValueError('Dependent variable names must match.')
specs_original = [frozenset(list(s) + [left_prefix]) for s in self.specs_names]
specs_new = [frozenset(list(s) + [right_prefix]) for s in result_obj.specs_names]
y = self.y_name
specs = specs_original + specs_new
estimates = pd.concat([self.estimates, result_obj.estimates], ignore_index=True)
p_values = pd.concat([self.p_values, result_obj.p_values], ignore_index=True)
r2_values = pd.concat([self.r2_values, result_obj.r2_values], ignore_index=True)
return MergedResult(
y=y,
specs=specs,
estimates=estimates,
p_values=p_values,
r2_values=r2_values
)
[docs]
class OLSResult(Protoresult):
"""
Encapsulates the results of an OLSRobust run
Attributes
----------
y_name : str
Dependent variable name.
x_name : str
Main predictor name.
data : pd.DataFrame
Original DataFrame used for all fits.
specs_names : pd.Series[frozenset[str]]
Specification sets (which controls are included, etc.).
all_predictors : list[list[str]]
List of predictor+control sets for each specification.
controls : list[str]
Pool of all control variables considered.
draws : int
Number of bootstrap draws.
kfold : int
Number of folds for out-of-sample evaluation.
estimates : pd.DataFrame
Shape (n_specs, draws), bootstrap estimates of β₁.
p_values : pd.DataFrame
Same shape, bootstrap p-values for β₁.
estimates_ystar : pd.DataFrame
Bootstrap estimates under the null (for joint inference).
p_values_ystar : pd.DataFrame
Bootstrap p-values under the null.
r2_values : pd.DataFrame
Shape (n_specs, draws), bootstrapped R².
summary_df : pd.DataFrame
Per-spec summary (median, CI, info criteria, cross-val metric).
inference : dict[str, Any]
Aggregated inference statistics (proportions, Stouffer’s Z, etc.).
shap_return : tuple[np.ndarray, pd.DataFrame] | None
Optional SHAP values and the matrix they came from.
"""
@staticmethod
def _empirical_p_two_sided(null_samples: np.ndarray, observed: float) -> float:
t = np.asarray(null_samples, dtype=float)
t = t[np.isfinite(t)]
if t.size == 0:
return np.nan
n = t.size
p_hi = (np.sum(t >= observed) + 1) / (n + 1)
p_lo = (np.sum(t <= observed) + 1) / (n + 1)
return float(min(1.0, 2.0 * min(p_hi, p_lo)))
def __init__(
self,
*,
y: str,
x: str,
data: pd.DataFrame,
specs: list[frozenset[str]],
all_predictors: list[list[str]],
controls: list[str],
draws: int,
kfold: int,
estimates: np.ndarray | pd.DataFrame,
estimates_ystar: np.ndarray | pd.DataFrame,
all_b: list[np.ndarray],
all_p: list[np.ndarray],
p_values: np.ndarray | pd.DataFrame,
p_values_ystar: np.ndarray | pd.DataFrame,
r2_values: np.ndarray | pd.DataFrame,
r2i_array: list[float],
ll_array: list[float],
aic_array: list[float],
bic_array: list[float],
hqic_array: list[float],
ll_raw_array: Optional[list[float]] = None,
ll_null_array: Optional[list[float]] = None,
ll_gain_array: Optional[list[float]] = None,
ll_gain_per_obs_array: Optional[list[float]] = None,
nobs_array: Optional[list[int]] = None,
av_k_metric_array: list[float] | None = None,
model_name: str,
name_av_k_metric: str | None = None,
shap_return: Any = None,
) -> None:
"""
Initialize an OLSResult container.
Parameters
----------
y
Dependent variable name.
x
Main predictor name.
data
Original DataFrame for all model fits.
specs
A list of frozensets indicating which controls each spec includes.
all_predictors
For each spec, the full list of predictors (x + controls).
controls
Pool of all candidate controls.
draws
Number of bootstrap draws.
kfold
Number of folds for cross-validation.
estimates
Bootstrap coefficient estimates (shape: n_specs × draws).
estimates_ystar
Bootstrap estimates under null (same shape).
all_b
Raw (non-resampled) β₁, one per spec.
all_p
Raw (non-resampled) p-values, one per spec.
p_values
Bootstrap p-values for β₁.
p_values_ystar
Bootstrap p-values under null.
r2_values
Bootstrapped R² (n_specs × draws).
r2i_array
In-sample R² for each spec.
ll_array
Primary likelihood metric for each spec. For OLS this is the
null-relative log-likelihood gain per observation; for logistic it
is the raw full-model log-likelihood.
aic_array, bic_array, hqic_array
Information criteria per spec.
ll_raw_array, ll_null_array, ll_gain_array, ll_gain_per_obs_array
Optional detailed OLS likelihood diagnostics.
nobs_array
Optional number of observations used in each spec.
av_k_metric_array
Out-of-sample metric (e.g. CV R²) per spec.
model_name
Human-readable name of the model (“OLS Robust”).
name_av_k_metric
Label for the CV metric.
shap_return
Optional return of (shap_values, input_matrix).
"""
super().__init__()
self.y_name = y
self.x_name = x
self.data = data
self.specs_names = pd.Series(specs)
self.all_predictors = all_predictors
self.controls = controls
self.draws = draws
self.kfold = kfold
self.estimates = pd.DataFrame(estimates)
self.p_values = pd.DataFrame(p_values)
self.estimates_ystar = pd.DataFrame(estimates_ystar)
self.p_values_ystar = pd.DataFrame(p_values_ystar)
self.r2_values = pd.DataFrame(r2_values)
self.all_b = all_b
self.all_p = all_p
self.summary_df = self._compute_summary()
self.summary_df['r2'] = pd.Series(r2i_array)
self.summary_df['ll'] = pd.Series(ll_array)
self.summary_df['aic'] = pd.Series(aic_array)
self.summary_df['bic'] = pd.Series(bic_array)
self.summary_df['hqic'] = pd.Series(hqic_array)
if ll_raw_array is not None:
self.summary_df['ll_raw'] = pd.Series(ll_raw_array)
if ll_null_array is not None:
self.summary_df['ll_null'] = pd.Series(ll_null_array)
if ll_gain_array is not None:
self.summary_df['ll_gain'] = pd.Series(ll_gain_array)
if ll_gain_per_obs_array is not None:
self.summary_df['ll_gain_per_obs'] = pd.Series(ll_gain_per_obs_array)
if nobs_array is not None:
self.summary_df['nobs'] = pd.Series(nobs_array)
self._compute_inference()
self.summary_df['av_k_metric'] = pd.Series(av_k_metric_array)
self.summary_df['spec_name'] = self.specs_names
self.summary_df['y'] = self.y_name
self.model_name = model_name
self.name_av_k_metric = name_av_k_metric
self.shap_return = shap_return
[docs]
def save(self, filename: str) -> None:
"""
Pickle this OLSResult to disk.
Parameters
----------
filename
Destination path for the pickle file.
"""
with open(filename, 'wb') as f:
_pickle.dump(self, f, -1)
[docs]
@classmethod
def load(cls, filename: str) -> OLSResult:
"""
Loads an OLSResult object from a pickle file.
Parameters
----------
filename
Path to the pickle file.
Returns
-------
OLSResult
"""
with open(filename, 'rb') as f:
return _pickle.load(f)
def _compute_inference(self) -> None:
"""
Compute summary statistics of the bootstrap coefficient distribution.
For each model specification, returns the median, minimum, maximum,
and the 2.5%/97.5% quantile bounds of the estimated coefficient
across all bootstrap draws.
Returns
-------
summary_df : pandas.DataFrame
A DataFrame indexed by specification, with columns:
- median : float
Median coefficient across bootstrap draws.
- min : float
Minimum coefficient across bootstrap draws.
- max : float
Maximum coefficient across bootstrap draws.
- ci_down : float
2.5th percentile coefficient (lower 95% bound).
- ci_up : float
97.5th percentile coefficient (upper 95% bound).
"""
df_model_result = pd.DataFrame({
'betas': [b[0][0] for b in self.all_b],
'p_values': [p[0][0] for p in self.all_p],
})
inference = not self.estimates_ystar.isna().all().all()
df_model_result['positive_beta'] = df_model_result['betas'].apply(lambda x: 1 if x > 0 else 0)
df_model_result['negative_beta'] = df_model_result['betas'].apply(lambda x: 1 if x < 0 else 0)
df_model_result['significant'] = df_model_result['p_values'].apply(lambda x: 1 if x < 0.05 else 0)
self.inference = {}
self.inference['median_ns'] = df_model_result['betas'].median() # note: ns for 'no sampling'
self.inference['median'] = self.estimates.stack().median()
for ic in ['aic', 'bic', 'hqic']:
if max(len(t) for t in self.y_name) == 1:
ic_array = np.array(self.summary_df[ic].to_list())
all_b = [arr[0] for arr in self.all_b]
coef_mat = np.vstack(all_b)
delta = ic_array - ic_array.min()
w = np.exp(-0.5 * delta)
w /= w.sum()
beta_avg = w @ coef_mat
self.inference[ic + '_average'] = beta_avg[0]
else:
self.inference[ic + '_average'] = np.nan
obs_med = float(df_model_result['betas'].median())
if inference:
null_meds = self.estimates_ystar.median(axis=0).to_numpy(dtype=float)
null_meds = null_meds[np.isfinite(null_meds)]
if null_meds.size == 0:
self.inference['median_p'] = np.nan
else:
self.inference['median_p'] = float(np.mean(np.abs(null_meds) >= abs(obs_med)))
else:
self.inference['median_p'] = np.nan
# Estimate dependence under the NULL bootstrap (y_star), not the sampling bootstrap.
# We estimate correlation of null z's (diag=1) after dropping degenerate specs.
p_matrix = self.p_values_ystar.to_numpy(dtype=float) # (n_specs, n_draws)
beta_matrix = self.estimates_ystar.to_numpy(dtype=float) # (n_specs, n_draws)
R_hat = None
rho_estimate = None
z_corr = None
_keep_stouffer = None
self.inference['stouffer_keep_mask'] = None
if (p_matrix.ndim == 2) and (p_matrix.shape[1] > 1):
p_clip = np.clip(p_matrix, 1e-300, 1.0 - 1e-16)
z_matrix = np.sign(beta_matrix) * norm.isf(p_clip / 2.0) # (n_specs, n_draws)
Z_null = z_matrix.T # draws × specs
z_std = np.std(Z_null, axis=0, ddof=1)
keep = np.isfinite(z_std) & (z_std > 1e-14)
self.inference['stouffer_keep_mask'] = keep
if np.sum(keep) >= 2:
Zk = Z_null[:, keep] # draws × m_eff
R_hat = np.corrcoef(Zk, rowvar=False) # (m_eff, m_eff)
m_eff = R_hat.shape[0]
rho_estimate = float((np.sum(R_hat) - np.trace(R_hat)) / (m_eff * (m_eff - 1)))
# Project to PSD (eigenvalue clipping) and add tiny ridge for stability
evals, evecs = np.linalg.eigh(R_hat)
evals[evals < 0.0] = 0.0
R_psd = (evecs * evals) @ evecs.T
R_psd = 0.5 * (R_psd + R_psd.T)
np.fill_diagonal(R_psd, 1.0)
z_corr = R_psd + 1e-12 * np.eye(R_psd.shape[0], dtype=R_psd.dtype)
_keep_stouffer = keep
else:
R_hat = None
rho_estimate = None
z_corr = None
_keep_stouffer = None
self.inference['min_ns'] = df_model_result['betas'].min()
self.inference['min'] = self.estimates.min().min()
self.inference['max_ns'] = df_model_result['betas'].max()
self.inference['max'] = self.estimates.max().max()
self.inference['pos_ns'] = df_model_result['positive_beta'].sum()
self.inference['pos_prop_ns'] = df_model_result['positive_beta'].mean()
self.inference['pos'] = (self.estimates > 0.0).sum().sum()
self.inference['pos_prop'] = (self.estimates > 0.0).mean().mean()
self.inference['neg_ns'] = df_model_result['negative_beta'].sum()
self.inference['neg_prop_ns'] = df_model_result['negative_beta'].mean()
self.inference['neg'] = (self.estimates < 0.0).sum().sum()
self.inference['neg_prop'] = (self.estimates < 0.0).mean().mean()
# observed counts
obs_pos = int((df_model_result['betas'] > 0).sum())
obs_neg = int((df_model_result['betas'] < 0).sum())
# null counts per null draw (across specs)
null_pos = self.estimates_ystar.gt(0).sum(axis=0).to_numpy()
null_neg = self.estimates_ystar.lt(0).sum(axis=0).to_numpy()
self.inference['pos_p'] = self.__class__._empirical_p_two_sided(null_pos, obs_pos)
self.inference['neg_p'] = self.__class__._empirical_p_two_sided(null_neg, obs_neg)
self.inference['sig_ns'] = df_model_result['significant'].sum()
self.inference['sig_prop_ns'] = df_model_result['significant'].mean()
self.inference['sig'] = (self.p_values.stack() < 0.05).sum().sum()
self.inference['sig_prop'] = float((self.p_values.stack() < 0.05).mean())
alpha = 0.05
obs_sig = int((self.p_values.stack() < alpha).sum())
null_sig = self.p_values_ystar.lt(alpha).sum(axis=0).to_numpy()
self.inference['sig_p'] = self.__class__._empirical_p_two_sided(null_sig, obs_sig)
self.inference['pos_sig_ns'] = (df_model_result['positive_beta'] &
df_model_result['significant']).sum()
self.inference['pos_sig_prop_ns'] = (df_model_result['positive_beta'] &
df_model_result['significant']).mean()
self.inference['pos_sig'] = ((self.estimates > 0.0) &
(self.p_values < 0.05)).sum().sum()
self.inference['pos_sig_prop'] = ((self.estimates > 0.0) &
(self.p_values < 0.05)).mean().mean()
self.inference['neg_sig_ns'] = (df_model_result['negative_beta'] &
df_model_result['significant']).sum()
self.inference['neg_sig_prop_ns'] = (df_model_result['negative_beta'] &
df_model_result['significant']).mean()
self.inference['neg_sig'] = ((self.estimates < 0.0) &
(self.p_values < 0.05)).sum().sum()
self.inference['neg_sig_prop'] = ((self.estimates < 0.0) &
(self.p_values < 0.05)).mean().mean()
obs_pos_sig = int(((self.estimates > 0.0) & (self.p_values < alpha)).sum().sum())
obs_neg_sig = int(((self.estimates < 0.0) & (self.p_values < alpha)).sum().sum())
null_pos_sig = ((self.estimates_ystar.gt(0) & self.p_values_ystar.lt(alpha))
.sum(axis=0).to_numpy())
null_neg_sig = ((self.estimates_ystar.lt(0) & self.p_values_ystar.lt(alpha))
.sum(axis=0).to_numpy())
self.inference['pos_sig_p'] = self.__class__._empirical_p_two_sided(null_pos_sig, obs_pos_sig)
self.inference['neg_sig_p'] = self.__class__._empirical_p_two_sided(null_neg_sig, obs_neg_sig)
# ----------------------------
# Stouffer (NULL-calibrated)
# ----------------------------
# We compute:
# Z_obs = (w^T z_obs) / sqrt(w^T Sigma_hat w)
# and calibrate p via the null draws:
# p = (1 + #{ |Z_null| >= |Z_obs| }) / (B + 1)
#
# where z_null are computed from (p_values_ystar, estimates_ystar).
#
# Note: Sigma_hat is estimated from null z's (already in z_corr).
# If z_corr is unavailable, we fall back to independence denom.
# Observed per-spec p and beta (full-sample, not bootstrap)
p_obs_all = df_model_result['p_values'].to_numpy(dtype=float)
b_obs_all = df_model_result['betas'].to_numpy(dtype=float)
# Valid mask for observed quantities
obs_valid = np.isfinite(p_obs_all) & (p_obs_all >= 0.0) & (p_obs_all <= 1.0) & np.isfinite(b_obs_all)
# Use the same "keep" mask you computed from null z-variance if available,
# but intersect with observed validity.
keep_null = self.inference.get('stouffer_keep_mask', None)
if keep_null is None:
keep = obs_valid
else:
keep = obs_valid & keep_null
if np.sum(keep) == 0:
self.inference['Stouffers'] = (np.nan, np.nan)
self.inference['correlation_matrix'] = None
self.inference['Sigma_hat'] = None
self.inference['mean_correlation'] = None
else:
# ---- observed z (signed, two-sided) with clipping
p_clip_obs = np.clip(p_obs_all[keep], 1e-300, 1.0 - 1e-16)
z_obs = np.sign(b_obs_all[keep]) * norm.isf(p_clip_obs / 2.0)
# If no null draws (all-NaN), Stouffer cannot be null-calibrated
if (p_matrix.size == 0) or (p_matrix.shape[1] < 1) or np.isnan(p_matrix).all():
# Fallback: just report the Z statistic with asymptotic p (diagnostic only)
w = np.ones_like(z_obs, dtype=float)
denom = float(np.sqrt(np.dot(w, w)))
Z_obs = float(np.dot(w, z_obs) / denom)
p_two = float(2.0 * norm.sf(abs(Z_obs)))
self.inference['Stouffers'] = (Z_obs, p_two)
self.inference['correlation_matrix'] = None
self.inference['Sigma_hat'] = None
self.inference['mean_correlation'] = None
else:
# Build null z scores (signed) for all specs/draws
p_clip_null = np.clip(p_matrix, 1e-300, 1.0 - 1e-16)
z_matrix = np.sign(beta_matrix) * norm.isf(p_clip_null / 2.0) # (n_specs, n_draws)
# Keep only the selected specs (columns after transpose)
Z_null_specs = z_matrix.T[:, keep] # (n_draws, m_eff)
# Weights (currently uniform; if you later add weights, replace here)
w = np.ones(Z_null_specs.shape[1], dtype=float)
# Denominator:
# - if z_corr exists and matches kept specs, use it
# - else fall back to independence
denom = None
if z_corr is not None and _keep_stouffer is not None:
# _keep_stouffer corresponds to the mask used when building z_corr.
# We must ensure we are using the same kept specs subset.
# Here, we require keep == _keep_stouffer on the indices considered.
# If they differ (because obs_valid removed some), we fall back to independence.
if np.array_equal(keep, _keep_stouffer):
denom2 = float(w @ (z_corr @ w))
if np.isfinite(denom2) and denom2 > 0.0:
denom = float(np.sqrt(denom2))
if denom is None:
denom = float(np.sqrt(np.dot(w, w)))
# Combined observed statistic
Z_obs = float(np.dot(w, z_obs) / denom)
# Combined null statistics for calibration (vectorized)
Z_null = (Z_null_specs @ w) / denom
# Two-sided null-calibrated p-value with +1 correction
B = int(Z_null.shape[0])
p_two = float((1.0 + np.sum(np.abs(Z_null) >= abs(Z_obs))) / (B + 1.0))
self.inference['Stouffers'] = (Z_obs, p_two)
self.inference['correlation_matrix'] = R_hat
self.inference['Sigma_hat'] = z_corr
self.inference['mean_correlation'] = rho_estimate
[docs]
def summary(self, digits=3) -> None:
"""
Print a comprehensive textual summary of the fitted model.
"""
def print_separator(title=None):
print("=" * 30)
if title:
print(title)
print("=" * 30)
inference = not self.estimates_ystar.isna().all().all()
# Display basic model information
print_separator("1. Model Summary")
print(f"Model: {self.model_name}")
print("Inference Tests:", "Yes" if inference else "No")
if max(len(t) for t in self.y_name) > 1:
print(f"Dependent variable: {set().union(*self.y_name)}")
else:
print(f"Dependent variable: {self.y_name}")
print(f"Independent variable: {self.x_name}")
print(f"Number of possible controls: {len(self.controls)}")
print(f"Number of draws: {self.draws}")
print(f"Number of folds: {self.kfold}")
print(f"Number of specifications: {len(self.specs_names)}")
# Print model robustness metrics
print_separator("2. Model Robustness Metrics")
print('2.1 Inference Metrics')
print_separator()
# Medians / extrema
if not inference:
print(f"Median β (specs, no resampling): {self.inference['median_ns']:.{digits}f}")
else:
print(f"Median β (specs, no resampling): {self.inference['median_ns']:.{digits}f} "
f"(null-calibrated p: {self.inference['median_p']:.{digits}g})")
print(f"Median β (bootstraps × specs): {self.inference['median']:.{digits}f}")
print(f"Min β (specs, no resampling): {self.inference['min_ns']:.{digits}f}")
print(f"Min β (bootstraps × specs): {self.inference['min']:.{digits}f}")
print(f"Max β (specs, no resampling): {self.inference['max_ns']:.{digits}f}")
print(f"Max β (bootstraps × specs): {self.inference['max']:.{digits}f}")
# IC-weighted averages (fix NaN checks)
if not np.isnan(self.inference.get('aic_average', np.nan)):
print(f"AIC-weighted β (specs, no resampling): {self.inference['aic_average']:.{digits}f}")
if not np.isnan(self.inference.get('bic_average', np.nan)):
print(f"BIC-weighted β (specs, no resampling): {self.inference['bic_average']:.{digits}f}")
if not np.isnan(self.inference.get('hqic_average', np.nan)):
print(f"HQIC-weighted β (specs, no resampling): {self.inference['hqic_average']:.{digits}f}")
# Descriptive shares + null-calibrated tests
label_share = lambda s: f"{s:.{digits}f}"
label_p = lambda p: f"{p:.{digits}g}"
# Significant share
print(f"Share significant (specs, no resampling) [descriptive]: "
f"{label_share(self.inference['sig_prop_ns'])}")
print(f"Share significant (bootstraps × specs) [descriptive]: "
f"{label_share(self.inference['sig_prop'])}"
+ ("" if not inference else f" | null-calibrated p: {label_p(self.inference['sig_p'])}"))
# Positive share
print(f"Share β>0 (specs, no resampling) [descriptive]: "
f"{label_share(self.inference['pos_prop_ns'])}")
print(f"Share β>0 (bootstraps × specs) [descriptive]: "
f"{label_share(self.inference['pos_prop'])}"
+ ("" if not inference else f" | null-calibrated p: {label_p(self.inference['pos_p'])}"))
# Negative share
print(f"Share β<0 (specs, no resampling) [descriptive]: "
f"{label_share(self.inference['neg_prop_ns'])}")
print(f"Share β<0 (bootstraps × specs) [descriptive]: "
f"{label_share(self.inference['neg_prop'])}"
+ ("" if not inference else f" | null-calibrated p: {label_p(self.inference['neg_p'])}"))
# Positive & significant share
print(f"Share β>0 & significant (specs, no resampling) [descriptive]: "
f"{label_share(self.inference['pos_sig_prop_ns'])}")
print(f"Share β>0 & significant (bootstraps × specs) [descriptive]: "
f"{label_share(self.inference['pos_sig_prop'])}"
+ ("" if not inference else f" | null-calibrated p: {label_p(self.inference['pos_sig_p'])}"))
# Negative & significant share
print(f"Share β<0 & significant (specs, no resampling) [descriptive]: "
f"{label_share(self.inference['neg_sig_prop_ns'])}")
print(f"Share β<0 & significant (bootstraps × specs) [descriptive]: "
f"{label_share(self.inference['neg_sig_prop'])}"
+ ("" if not inference else f" | null-calibrated p: {label_p(self.inference['neg_sig_p'])}"))
# Stouffer’s
Z, p_two = self.inference['Stouffers']
print(f"Stouffer’s Z = {Z:.{digits}f} (two-sided p = {p_two:.{digits}g})")
print_separator()
# print in-sample metrics
if max(len(t) for t in self.y_name) == 1:
print('2.2 In-Sample Metrics (Full Sample)')
print_separator()
print(f"Min AIC: {round(self.summary_df['aic'].min(), digits)}, Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['aic'].idxmin()])}")
print(f"Min BIC: {round(self.summary_df['bic'].min(), digits)}, Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['bic'].idxmin()])}")
print(f"Min HQIC: {round(self.summary_df['hqic'].min(), digits)}, Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['hqic'].idxmin()])}")
if 'll_gain_per_obs' in self.summary_df.columns:
ll_col = 'll_gain_per_obs'
print(
"Max Relative Log-Likelihood Gain (per obs): "
f"{round(self.summary_df[ll_col].max(), digits)}, "
f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df[ll_col].idxmax()])}"
)
print(
"Min Relative Log-Likelihood Gain (per obs): "
f"{self.summary_df[ll_col].min():.{digits}f}, "
f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df[ll_col].idxmin()])}"
)
if 'll_raw' in self.summary_df.columns:
print(
f"Max Raw Log Likelihood: {round(self.summary_df['ll_raw'].max(), digits)}, "
f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['ll_raw'].idxmax()])}"
)
print(
f"Min Raw Log Likelihood: {self.summary_df['ll_raw'].min():.{digits}f}, "
f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['ll_raw'].idxmin()])}"
)
else:
print(f"Max Log Likelihood: {round(self.summary_df['ll'].max(), digits)}, Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['ll'].idxmax()])}")
print(
f"Min Log Likelihood: {self.summary_df['ll'].min():.{digits}f}, "
f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['ll'].idxmin()])}"
)
print(
f"Max {'Adj-' if 'OLS' in self.model_name else 'Pseudo-'}R²: "
f"{self.summary_df['r2'].max():.{digits}f}, "
f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['r2'].idxmax()])}"
)
print(
f"Min {'Adj-' if 'OLS' in self.model_name else 'Pseudo-'}R²: "
f"{self.summary_df['r2'].min():.{digits}f}, "
f"Specs: {list(self.summary_df['spec_name'].loc[self.summary_df['r2'].idxmin()])}"
)
print_separator()
if max(len(t) for t in self.y_name) == 1:
print(f'2.3 Out-Of-Sample Metrics ({self.name_av_k_metric} averaged across folds)')
else:
print(f'2.2 Out-Of-Sample Metrics ({self.name_av_k_metric} averaged across folds)')
print_separator()
oos_max_row = self.summary_df.loc[self.summary_df['av_k_metric'].idxmax(),]
print(f'Max Average: {round(oos_max_row["av_k_metric"], digits)}, Specs: {list(oos_max_row["spec_name"])} ')
oos_min_row = self.summary_df.loc[self.summary_df['av_k_metric'].idxmin(),]
print(f'Min Average: {round(oos_min_row["av_k_metric"], digits)}, Specs: {list(oos_min_row["spec_name"])} ')
print(f"Mean Average: {round(self.summary_df['av_k_metric'].mean(), digits)}")
print(f"Median Average: {round(self.summary_df['av_k_metric'].median(), digits)}")
[docs]
def plot(self,
loess: bool = True,
specs: Optional[List[List[str]]] = None,
ic: str = 'aic',
ci: float = 1,
colormap: str = 'viridis',
figsize: Tuple[int, int] = (12, 6),
ext: str = ' pdf',
figpath = None,
project_name: str = 'no_project_name',
highlights: bool = True,
oddsratio: bool = False
) -> plt.Figure:
"""
Plots the regression results using specified options.
Parameters
----------
loess : bool, default=True
Whether to add a LOESS smoothed trend line.
specs : list of list of str, optional
Up to three specific model specifications to highlight.
ic : {'bic', 'aic', 'hqic'}, default='aic'
Which information criterion to display.
ci: float, default=1
Confidence interval.
colormap : str, default='viridis'
Name of the matplotlib colormap for the plot.
figpath : str or Path, optional
Directory in which to save outputs; if None, uses current working dir.
figsize : tuple of int, default=(12, 6)
Figure width and height in inches.
ext : str, default='pdf'
File extension if saving the figure (unused if not saving).
project_name : str, default='no_project_name'
Project identifier used in saved filename (unused if not saving).
highlights bool, default = True
Whether to highlight individual plots.
oddsratio bool, default=False
Whether to exponentiate the coefficients (e.g. for odds ratios).
Returns
-------
fig : matplotlib.figure.Figure
The figure object containing the plot.
Raises
------
ValueError
If `ic` is not one of {'bic', 'aic', 'hqic'}.
or if more than three specs are given,
TypeError
If `specs` is provided but is not a list of lists of str,
or any spec is not in the computed specifications.
"""
valid_ic = ['bic', 'aic', 'hqic']
if ic not in valid_ic:
raise ValueError(
f"Unsupported information criterion: expected one of {valid_ic}, "
f"received: '{ic}'."
)
if specs is not None:
if not all(isinstance(l, list) for l in specs):
raise TypeError("'specs' must be a list of lists.")
if len(specs) > 3:
raise ValueError("The max number of specifications to highlight is 3")
if not all(frozenset(spec) in self.specs_names.to_list() for spec in specs):
raise TypeError("All specifications in 'spec' must be in the valid computed specifications.")
if ic not in valid_ic:
raise ValueError(f"'ic' must be one of the following: {valid_ic}")
from robustipy.figures import plot_results
return plot_results(results_object=self,
loess=loess,
specs=specs,
ic=ic,
ci=ci,
colormap=colormap,
figsize=figsize,
ext=ext,
figpath=figpath,
project_name=project_name,
highlights=highlights,
oddsratio=oddsratio)
def _compute_summary(self):
"""
Computes summary statistics based on coefficient estimates.
Returns:
pd.DataFrame: DataFrame containing summary statistics.
"""
data = self.estimates.copy()
out = pd.DataFrame()
out['median'] = data.median(axis=1)
out['max'] = data.max(axis=1)
out['min'] = data.min(axis=1)
out['ci_up'] = data.quantile(q=0.975, axis=1,
interpolation='nearest')
out['ci_down'] = data.quantile(q=0.025, axis=1,
interpolation='nearest')
return out
[docs]
def compute_bma(self) -> pd.DataFrame:
"""
Performs Bayesian Model Averaging (BMA) using BIC-implied priors.
Returns
-------
pd.DataFrame
DataFrame containing BMA results with control variable inclusion
probabilities and average coefficients.
"""
if isinstance(self.y_name, list) and len({str(y) for y in self.y_name}) > 1:
raise ValueError(
"BMA weights are only likelihood-comparable within a common outcome, "
"likelihood family, and estimation sample. Compute BMA separately "
"within comparable outcome sets."
)
likelihood_per_var = []
weighted_coefs = []
# Compute model likelihoods using BIC-implied log posterior
max_ll = np.max(-self.summary_df.bic / 2)
shifted_ll = (-self.summary_df.bic / 2) - max_ll
models_likelihood = np.exp(shifted_ll)
sum_likelihoods = np.nansum(models_likelihood)
# Flatten list of coefficient arrays and variable names
coefs = [[i[0] for i in x] for x in self.all_b]
coefs = [i for sl in coefs for i in sl]
var_names = [i for sl in self.all_predictors for i in sl]
coefs_df = pd.DataFrame({'coef': coefs, 'var_name': var_names})
# Compute weighted likelihood and average coefficients for each control variable
for ele in self.controls:
idx = [ele in spec for spec in self.specs_names]
likelihood_per_var.append(np.nansum(models_likelihood[idx]))
coefs = coefs_df[coefs_df.var_name == ele].coef.to_numpy()
likelihood = models_likelihood[idx]
weighted_coef = coefs * likelihood
weighted_coefs.append(np.nansum(weighted_coef))
# Normalize to get posterior inclusion probabilities and BMA-weighted coefficients
likelihood_per_var = np.asarray(likelihood_per_var, dtype=float)
weighted_coefs = np.asarray(weighted_coefs, dtype=float)
probs = likelihood_per_var / sum_likelihoods
final_coefs = weighted_coefs / sum_likelihoods
conditional_coefs = np.divide(
weighted_coefs,
likelihood_per_var,
out=np.full_like(weighted_coefs, np.nan, dtype=float),
where=likelihood_per_var != 0,
)
# Return summary DataFrame
summary_bma = pd.DataFrame({
"control_var": self.controls,
"probs": probs,
"average_coefs": final_coefs,
"conditional_average_coefs": conditional_coefs,
})
return summary_bma
[docs]
def merge(
self,
result_obj: "OLSResult",
left_prefix: str,
right_prefix: str
) -> "MergedResult":
"""
Merge this OLSResult with another, tagging each spec by prefix.
Parameters
----------
result_obj : OLSResult
Another result object with the same dependent variable.
left_prefix : str
Tag to append to this object's specifications.
right_prefix : str
Tag to append to the other object's specifications.
Returns
-------
merged : MergedResult
A new MergedResult containing all specs, estimates,
p_values, and r2_values from both.
Raises
------
TypeError
If `result_obj` is not an OLSResult, or prefixes are not strings.
ValueError
If the dependent variable names do not match.
"""
# Validate input types
if not isinstance(result_obj, OLSResult):
raise TypeError("'result_obj' must be an instance of OLSResult.")
if not isinstance(left_prefix, str) or not isinstance(right_prefix, str):
raise TypeError("'prefixes' must be of type 'str.'")
# Ensure dependent variables match before merging
if self.y_name != result_obj.y_name:
raise ValueError('Dependent variable names must match.')
# Apply prefixes to spec sets
specs_original = [frozenset(list(s) + [left_prefix]) for s in self.specs_names]
specs_new = [frozenset(list(s) + [right_prefix]) for s in result_obj.specs_names]
# Merge all relevant fields
y = self.y_name
specs = specs_original + specs_new
estimates = pd.concat([self.estimates, result_obj.estimates], ignore_index=True)
p_values = pd.concat([self.p_values, result_obj.p_values], ignore_index=True)
r2_values = pd.concat([self.r2_values, result_obj.r2_values], ignore_index=True)
# Return a new MergedResult object
return MergedResult(
y=y,
specs=specs,
estimates=estimates,
p_values=p_values,
r2_values=r2_values
)
[docs]
def save_to_csv(self, path: str) -> None:
"""Function to save summary dataframe to a csv"""
self.summary_df.to_csv(path)
[docs]
class OLSRobust(BaseRobust):
"""
Class for multi-variate regression using OLS
Parameters
----------
y : list[str]
Dependent variable column name(s). If multiple dependent variables are
supplied, OLSRobust constructs non-empty standardised outcome composites.
x : list[str]
Predictor column name(s) included in every specification. The first element
is treated as the reported focal estimand; any additional elements are fixed
predictors that do not vary across the control-subset space.
data : pandas.DataFrame
The full dataset containing `y`, `x`, and any controls.
model_name : str, default='OLS Robust'
A custom label for this model run, used in outputs and plots
Attributes
----------
results : OLSResult
Populated after calling `.fit()`, contains all estimates and diagnostics.
"""
def __init__(
self,
*,
y: List[str],
x: List[str],
data: pd.DataFrame,
model_name: str = 'OLS Robust'
) -> None:
super().__init__(y=y, x=x, data=data, model_name=model_name)
[docs]
def get_results(self) -> 'OLSResult':
"""
Return the OLSResult object once .fit() has been called.
Returns
-------
OLSResult
The result object encapsulating all analysis outputs.
"""
return self.results
[docs]
def sample_z_specs(self, controls, z_specs_sample_size):
"""
Generate a sample of z specifications by randomly selecting subsets of control variables.
This method creates a set of binary masks to determine which subsets of the given `controls`
should be included in each z specification. The number of subsets sampled is controlled by
`z_specs_sample_size`.
Parameters
----------
controls : list of str
A list of control variable names from which to build z specifications.
z_specs_sample_size : int
The number of z specifications to sample. Must be a positive integer.
Returns
-------
space_n_sample : int
The number of sampled z specifications.
z_specs_sample : list of tuple of str
A list where each element is a tuple of selected control variable names
representing one sampled z specification.
Notes
-----
- The method uses `sample_z_masks` to generate binary inclusion masks.
- If `self.seed` is defined, it is passed to `sample_z_masks` to ensure reproducibility.
- Each sampled mask determines a unique subset of `controls`.
"""
z_cols = controls
n_z = len(z_cols)
masks = sample_z_masks(
n_z = n_z,
n_masks = z_specs_sample_size,
seed = getattr(self, "seed", None)
)
z_specs_sample = [
tuple(z_cols[i] for i in range(n_z) if (m >> i) & 1)
for m in masks
]
space_n_sample = len(z_specs_sample)
return space_n_sample, z_specs_sample
[docs]
def fit(
self,
*,
controls: List[str],
group: Optional[str] = None,
draws: int = None,
kfold: int = None,
oos_metric: str = None,
n_cpu: Optional[int] = None,
seed: Optional[int] = None,
composite_sample: Optional[int] = None,
z_specs_sample_size: Optional[int] = None,
rescale_y: Optional[bool] = False,
rescale_x: Optional[bool] = False,
rescale_z: Optional[bool] = False,
compute_shap: bool = True,
threshold: int = 1000000
) -> 'OLSRobust':
"""
Fit the OLS models across the specification space and over bootstrap resamples.
This method explores a variety of control variable specifications by constructing
different combinations (z-specs) and optionally sampling from this space. It performs
bootstrap resampling and/or cross-validation depending on the arguments provided.
Parameters
----------
controls : list of str
Candidate control variables to include in model specifications.
group : str, optional
Column name for grouping fixed effects. If provided, outcomes are de-meaned by group.
draws : int, optional
Number of bootstrap resamples per specification. If None, bootstrapping is skipped.
kfold : int, optional
Number of folds for out-of-sample evaluation. Requires oos_metric to be specified.
oos_metric : {'pseudo-r2', 'rmse'}, optional
Metric to evaluate out-of-sample performance when kfold is set.
n_cpu : int, optional
Number of parallel processes to use. Defaults to all available CPUs minus one if None.
seed : int, optional
Random seed for reproducibility. Propagated to all random operations.
composite_sample : int, optional
Number of non-empty outcome-composite subsets to sample when multiple dependent
variables are supplied. If None, all non-empty outcome subsets are enumerated.
Ignored for a single dependent variable.
z_specs_sample_size : int, optional
Number of z specifications to randomly sample from the full set of possible combinations.
If None, the full specification space is used.
rescale_y : bool, default=False
If True, rescale the dependent variable to have mean 0 and standard deviation 1.
rescale_x : bool, default=False
If True, rescale the x variable(s) to have mean 0 and standard deviation 1.
rescale_z : bool, default=False
If True, rescale the z variable(s) to have mean 0 and standard deviation 1.
compute_shap : bool, default=True
If True, compute SHAP values for plotting. Set False for fit-only profiling.
threshold : int, default=1_000_000
If the total number of model fits (specs × draws × folds) exceeds this number, a warning is raised.
Returns
-------
self : OLSRobust
The fitted model instance. Results are stored in the .results attribute.
Notes
-----
- At least one of draws or kfold must be set to perform model fitting.
- z_specs_sample_size samples the covariate-subset space before fitting.
- composite_sample samples the outcome-composite space before fitting when len(y) > 1.
- This method may be computationally intensive; parallelisation is recommended via n_cpu.
"""
# Validate control variable input
if not isinstance(controls, list):
raise TypeError(f"'controls' must be a list. Received types: {type(controls).__name__}.")
# Validate z_specs_sample_size input
if z_specs_sample_size is not None:
if not isinstance(z_specs_sample_size, int) or z_specs_sample_size <= 0:
raise ValueError("'z_specs_sample_size' must be an integer greater than 0 if provided.")
# Optionally rescale y, x, and z variables
if rescale_y:
if len(self.y) > 1:
warnings.warn("Rescaling of the dependent variable already occurs with multiple y variables. Skipping additional rescaling.", UserWarning)
else:
self.data[self.y] = rescale(self.data[self.y])
if rescale_x:
for column in self.x:
self.data[column] = rescale(self.data[column])
if rescale_z:
for column in controls:
self.data[column] = rescale(self.data[column])
# Standardize and propagate user arguments
draws, kfold, oos_metric, n_cpu, seed = make_inquiry(
self.model_name, self.y, self.data, draws, kfold, oos_metric, n_cpu, seed
)
self.seed = seed
self.composite_sample = composite_sample
# Ensure constant term and prepare x, controls
if group:
# With group demeaning, an intercept is not identified. Drop any existing constant.
if "const" in self.data.columns:
self.data = self.data.drop(columns=["const"])
if "const" in self.x:
self.x = [c for c in self.x if c != "const"]
if "const" in controls:
controls = [c for c in controls if c != "const"]
warnings.warn(
"[OLSRobust] Group fixed effects requested: dropping constant term "
"before demeaning so t/p are identified.",
UserWarning
)
invariant = _find_group_invariant_columns(self.data, self.x + controls, group)
inv_x = [c for c in invariant if c in self.x]
inv_controls = [c for c in invariant if c in controls]
if inv_x or inv_controls:
warnings.warn(
"[OLSRobust] These variables are constant within each group and "
"are not identified under group demeaning. t/p will be NaN for them: "
f"x={inv_x!r}, controls={inv_controls!r}.",
UserWarning
)
else:
self.x, controls = _ensure_single_constant(self.data, self.y, self.x, controls)
# Handle multiple y case
if len(self.y) > 1:
self.multiple_y()
# Check that provided arguments are valid
self._validate_fit_args(
controls=controls,
group=group,
draws=draws,
kfold=kfold,
oos_metric=oos_metric,
n_cpu=n_cpu,
seed=self.seed,
valid_oos_metrics=['pseudo-r2', 'rmse'],
threshold=threshold
)
# Set global seed
np.random.seed(self.seed)
# Inform user of configuration
print(
f"OLSRobust is running with n_cpu={n_cpu}, draws={draws}, folds={kfold}, seed={seed}.\n"
f"We're evaluating our out-of-sample predictions with the {oos_metric} metric.\n"
f"The estimand of interest is {self.x[0]}. Let's begin the calculations..."
)
# Determine sample size and metric
sample_size = self.data.shape[0]
self.oos_metric_name = oos_metric
# Optionally sample from the specification space
if z_specs_sample_size:
space_n, z_specs = self.sample_z_specs(controls=controls, z_specs_sample_size=z_specs_sample_size)
else:
space_n = space_size(controls)
z_specs = all_subsets(controls)
z_specs = [tuple(spec) for spec in z_specs]
space_n = len(z_specs)
# Cache predictor slices by control-spec bitmask to avoid repeated
# DataFrame slicing in large specification spaces.
control_positions = {name: idx for idx, name in enumerate(controls)}
predictor_prefix = self.x + ([group] if group else [])
base_predictors_df = self.data[predictor_prefix + controls]
spec_predictor_cache: dict[int, pd.DataFrame] = {}
def _get_predictors_for_spec(spec_tuple: tuple[str, ...]) -> pd.DataFrame:
mask = _spec_bitmask(spec_tuple, control_positions)
cached = spec_predictor_cache.get(mask)
if cached is None:
cols = predictor_prefix + list(spec_tuple)
cached = base_predictors_df.loc[:, cols]
spec_predictor_cache[mask] = cached
return cached
# Count y-composite specifications (1 if single y)
n_y_comps = len(getattr(self, "y_specs", [None]))
# Warn if model fit will be too large
self._warn_if_large_draws(
draws=draws,
n_control_specs=space_n,
n_y_composites=n_y_comps,
threshold=threshold
)
notebook_draws_bar = _make_notebook_draws_bar(
total=space_n * draws * n_y_comps,
description="Bootstrap draws"
)
# Delegate to single or multiple y logic
if len(self.y) > 1:
# Initialize containers to collect results across all composite outcomes
list_b_array = []
list_p_array = []
list_b_array_ystar = []
list_p_array_ystar = []
list_r2_array = []
list_r2i_array = []
list_ll_array = []
list_ll_raw_array = []
list_ll_null_array = []
list_ll_gain_array = []
list_ll_gain_per_obs_array = []
list_nobs_array = []
list_aic_array = []
list_bic_array = []
list_hqic_array = []
list_av_k_metric_array = []
y_names = []
specs = []
all_predictors = []
b_all_list = []
p_all_list = []
# Loop over each composite y variable
for y, y_name in _progress_iter(
zip(self.y_composites, self.y_specs),
total=len(self.y_composites),
description="Composite Ys",
):
# Initialize arrays to store draws for this y-spec
b_array = np.empty([space_n, draws])
p_array = np.empty([space_n, draws])
b_array_ystar = np.empty([space_n, draws])
p_array_ystar = np.empty([space_n, draws])
r2_array = np.empty([space_n, draws])
r2i_array = np.empty([space_n])
ll_array = np.empty([space_n])
ll_raw_array = np.empty([space_n])
ll_null_array = np.empty([space_n])
ll_gain_array = np.empty([space_n])
ll_gain_per_obs_array = np.empty([space_n])
nobs_array = np.empty([space_n], dtype=int)
aic_array = np.empty([space_n])
bic_array = np.empty([space_n])
hqic_array = np.empty([space_n])
av_k_metric_array = np.empty([space_n])
# Loop over all z-specs (control combinations)
for index, spec in enumerate(z_specs):
spec_predictors = _get_predictors_for_spec(spec)
comb = pd.concat([y, spec_predictors], axis=1)
comb = comb.dropna()
comb = comb.reset_index(drop=True).copy()
# Demean by group if required
if group:
comb = group_demean(comb, group=group)
X_design = comb.drop(columns=comb.columns[0])
try:
self._check_colinearity(X_design)
except ValueError as e:
warnings.warn(
f"[OLSRobust] Spec {spec!r} may be unstable due to perfect collinearity:\n{e}\n"
"Proceeding with pseudoinverse; coefficients may not be uniquely identified.",
UserWarning
)
# Fit the full sample OLS mode
(
b_all, p_all, r2_i, ll_i,
aic_i, bic_i, hqic_i, av_k_metric_i,
ll_raw_i, ll_null_i, ll_gain_i, ll_gain_per_obs_i, nobs_i
) = self._full_sample_OLS(
comb,
kfold=kfold,
group=group,
oos_metric_name=self.oos_metric_name
)
# Calculate pseudo-outcome residuals for bootstrap
y_star = comb.iloc[:, [0]] - np.dot(comb.iloc[:, [1]], b_all[0][0])
bootstrap_data = _prepare_ols_bootstrap_data(comb, y_star)
if group:
n_predictors = bootstrap_data.drop(
columns=[bootstrap_data.columns[0], "y_star", group],
errors="ignore"
).shape[1]
min_rows_after_filter = max(5, n_predictors + 2)
bootstrap_arrays = None
else:
min_rows_after_filter = None
bootstrap_arrays = _prepare_non_group_ols_bootstrap_arrays(bootstrap_data)
seeds = np.random.randint(0, 2**31, size=draws, dtype=np.int64)
# Run bootstrap regressions in parallel
def _run_one_seed(seed_i: int):
if group is None:
assert bootstrap_arrays is not None
return _strap_non_group_OLS_arrays(
*bootstrap_arrays,
sample_size=sample_size,
seed=seed_i,
)
return self._strap_OLS(
bootstrap_data,
group,
sample_size,
seed_i,
None,
min_rows_after_filter=min_rows_after_filter,
)
bootstrap_out = _run_parallel_seed_batches(
seeds=seeds,
n_cpu=n_cpu,
run_one_seed=_run_one_seed,
draws_bar=notebook_draws_bar,
dispatch_mode="streaming" if group else "chunked",
)
b_list, p_list, r2_list, b_list_ystar, p_list_ystar = zip(*bootstrap_out)
# Collect and store results for this spec
y_names.append(y_name)
specs.append(frozenset(list(y_name) + list(spec)))
all_predictors.append(self.x + list(spec))
b_array[index, :] = b_list
p_array[index, :] = p_list
b_array_ystar[index, :] = b_list_ystar
p_array_ystar[index, :] = p_list_ystar
r2_array[index, :] = r2_list
r2i_array[index] = r2_i
ll_array[index] = ll_i
ll_raw_array[index] = ll_raw_i
ll_null_array[index] = ll_null_i
ll_gain_array[index] = ll_gain_i
ll_gain_per_obs_array[index] = ll_gain_per_obs_i
nobs_array[index] = nobs_i
aic_array[index] = aic_i
bic_array[index] = bic_i
hqic_array[index] = hqic_i
av_k_metric_array[index] = av_k_metric_i
b_all_list.append(b_all)
p_all_list.append(p_all)
# Store arrays across all y-specs
list_b_array.append(b_array)
list_p_array.append(p_array)
list_b_array_ystar.append(b_array_ystar)
list_p_array_ystar.append(p_array_ystar)
list_r2_array.append(r2_array)
list_r2i_array.append(r2i_array)
list_ll_array.append(ll_array)
list_ll_raw_array.append(ll_raw_array)
list_ll_null_array.append(ll_null_array)
list_ll_gain_array.append(ll_gain_array)
list_ll_gain_per_obs_array.append(ll_gain_per_obs_array)
list_nobs_array.append(nobs_array)
list_aic_array.append(aic_array)
list_bic_array.append(bic_array)
list_hqic_array.append(hqic_array)
list_av_k_metric_array.append(av_k_metric_array)
# Assemble final result object across all composite outcomes
results = OLSResult(
y=y_names,
x=self.x,
data=self.data,
specs=specs,
all_predictors=all_predictors,
controls=controls,
draws=draws,
kfold=kfold,
all_b=b_all_list,
all_p=p_all_list,
estimates=np.vstack(list_b_array),
p_values=np.vstack(list_p_array),
estimates_ystar=np.vstack(list_b_array_ystar),
p_values_ystar=np.vstack(list_p_array_ystar),
r2_values=np.vstack(list_r2_array),
r2i_array=np.hstack(list_r2i_array),
ll_array=np.hstack(list_ll_array),
ll_raw_array=np.hstack(list_ll_raw_array),
ll_null_array=np.hstack(list_ll_null_array),
ll_gain_array=np.hstack(list_ll_gain_array),
ll_gain_per_obs_array=np.hstack(list_ll_gain_per_obs_array),
nobs_array=np.hstack(list_nobs_array),
aic_array=np.hstack(list_aic_array),
bic_array=np.hstack(list_bic_array),
hqic_array=np.hstack(list_hqic_array),
av_k_metric_array=np.hstack(list_av_k_metric_array),
model_name=self.model_name,
name_av_k_metric=self.oos_metric_name,
shap_return=None
)
self.results = results
else:
# Initialize containers for storing results across z-specs
specs = []
all_predictors = []
b_all_list = []
p_all_list = []
# Preallocate arrays for bootstrap and model stats
b_array = np.empty([space_n, draws])
p_array = np.empty([space_n, draws])
b_array_ystar = np.empty([space_n, draws])
p_array_ystar = np.empty([space_n, draws])
r2_array = np.empty([space_n, draws])
r2i_array = np.empty([space_n])
ll_array = np.empty([space_n])
ll_raw_array = np.empty([space_n])
ll_null_array = np.empty([space_n])
ll_gain_array = np.empty([space_n])
ll_gain_per_obs_array = np.empty([space_n])
nobs_array = np.empty([space_n], dtype=int)
aic_array = np.empty([space_n])
bic_array = np.empty([space_n])
hqic_array = np.empty([space_n])
av_k_metric_array = np.empty([space_n])
shap_return = None
if compute_shap:
import shap
# Prepare SHAP dataset (includes y, x, z, and optional group)
if group:
SHAP_comb = self.data[self.y + self.x + [group] + controls]
SHAP_comb = group_demean(SHAP_comb, group=group)
else:
SHAP_comb = self.data[self.y + self.x + controls]
SHAP_comb = SHAP_comb.dropna()
SHAP_comb = SHAP_comb.reset_index(drop=True).copy()
# Fit linear model for SHAP values
x_train, x_test, y_train, _ = train_test_split(
SHAP_comb[self.x + controls].drop(columns=['const'], errors='ignore'),
SHAP_comb[self.y],
test_size=0.2,
random_state=self.seed
)
model = sklearn.linear_model.LinearRegression()
model.fit(x_train, y_train)
# Explain model predictions with SHAP
explainer = shap.LinearExplainer(model, x_train)
shap_return = [explainer.shap_values(x_test), x_test]
# Generate shared bootstrap seeds ONCE for all specifications
# This ensures all specs use the same bootstrap samples, creating proper correlation
shared_seeds = np.random.randint(0, 2 ** 32 - 1, size=draws, dtype=np.int64)
y_single_df = self.data[self.y]
# Loop over all z-specs (combinations of control variables)
for index, spec in _progress_iter(
enumerate(z_specs),
total=space_n,
description="Control specs",
):
spec_predictors = _get_predictors_for_spec(spec)
comb = pd.concat([y_single_df, spec_predictors], axis=1)
# Clean combined dataset
comb = comb.dropna()
comb = comb.reset_index(drop=True).copy()
# Demean by group if required
if group:
comb = group_demean(comb, group=group)
# Fit model on full sample
(
b_all, p_all, r2_i, ll_i,
aic_i, bic_i, hqic_i, av_k_metric_i,
ll_raw_i, ll_null_i, ll_gain_i, ll_gain_per_obs_i, nobs_i
) = self._full_sample_OLS(
comb,
kfold=kfold,
group=group,
oos_metric_name=self.oos_metric_name
)
# Calculate pseudo-outcome residuals for bootstrapping
y_star = comb.iloc[:, [0]] - np.dot(comb.iloc[:, [1]], b_all[0][0])
bootstrap_data = _prepare_ols_bootstrap_data(comb, y_star)
if group:
n_predictors = bootstrap_data.drop(
columns=[bootstrap_data.columns[0], "y_star", group],
errors="ignore"
).shape[1]
min_rows_after_filter = max(5, n_predictors + 2)
bootstrap_arrays = None
else:
min_rows_after_filter = None
bootstrap_arrays = _prepare_non_group_ols_bootstrap_arrays(bootstrap_data)
# Bootstrap OLS model draws in parallel using SHARED seeds
# All specs use the same seeds to create proper correlation structure
def _run_one_seed(seed_i: int):
if group is None:
assert bootstrap_arrays is not None
return _strap_non_group_OLS_arrays(
*bootstrap_arrays,
sample_size=sample_size,
seed=seed_i,
)
return self._strap_OLS(
bootstrap_data,
group,
sample_size,
seed_i,
None,
min_rows_after_filter=min_rows_after_filter,
)
bootstrap_out = _run_parallel_seed_batches(
seeds=shared_seeds,
n_cpu=n_cpu,
run_one_seed=_run_one_seed,
draws_bar=notebook_draws_bar,
dispatch_mode="streaming" if group else "chunked",
)
b_list, p_list, r2_list, b_list_ystar, p_list_ystar = zip(*bootstrap_out)
# Store results for current spec
specs.append(frozenset(spec))
all_predictors.append(self.x + list(spec))
b_array[index, :] = b_list
p_array[index, :] = p_list
b_array_ystar[index, :] = b_list_ystar
p_array_ystar[index, :] = p_list_ystar
r2_array[index, :] = r2_list
r2i_array[index] = r2_i
ll_array[index] = ll_i
ll_raw_array[index] = ll_raw_i
ll_null_array[index] = ll_null_i
ll_gain_array[index] = ll_gain_i
ll_gain_per_obs_array[index] = ll_gain_per_obs_i
nobs_array[index] = nobs_i
aic_array[index] = aic_i
bic_array[index] = bic_i
hqic_array[index] = hqic_i
av_k_metric_array[index] = av_k_metric_i
b_all_list.append(b_all)
p_all_list.append(p_all)
# Assemble final results object
results = OLSResult(y=self.y[0],
x=self.x[0],
data=self.data,
specs=specs,
all_predictors=all_predictors,
controls=controls,
draws=draws,
kfold=kfold,
all_b=b_all_list,
all_p=p_all_list,
estimates=b_array,
p_values=p_array,
estimates_ystar=b_array_ystar,
p_values_ystar=p_array_ystar,
r2_values=r2_array,
r2i_array=r2i_array,
ll_array=ll_array,
ll_raw_array=ll_raw_array,
ll_null_array=ll_null_array,
ll_gain_array=ll_gain_array,
ll_gain_per_obs_array=ll_gain_per_obs_array,
nobs_array=nobs_array,
aic_array=aic_array,
bic_array=bic_array,
hqic_array=hqic_array,
av_k_metric_array=av_k_metric_array,
model_name=self.model_name,
name_av_k_metric=self.oos_metric_name,
shap_return=shap_return)
self.results = results
if notebook_draws_bar is not None:
notebook_draws_bar.close()
def _predict(
self,
x_test: np.ndarray,
betas: np.ndarray
) -> np.ndarray:
"""
Predict the dependent variable based on the test data and coefficients.
Parameters
----------
x_test : array-like, shape=(n_obs, n_features)
Independent variables.
betas : array-like, shape=(n_features,)
Regression coefficients.
Returns
-------
y_pred : array
Predicted values for the dependent variable.
"""
return np.dot(x_test, betas)
def _full_sample_OLS(
self,
comb_var,
kfold,
group,
oos_metric_name
):
"""
Call stripped_ols() over the full data containing y, x, and controls.
Parameters
----------
comb_var : pd.DataFrame
Combined y, x, and controls dataset (cleaned).
kfold : int
Number of CV folds; if 0 or None, skips CV.
group : str or None
Grouping column for fixed effects (dropped before fit).
oos_metric_name : str
Out-of-sample metric ('pseudo-r2' or 'rmse')..
Returns
-------
b : np.ndarray
Full-sample coefficient estimates.
p : np.ndarray
Full-sample p-values.
r2 : float
In-sample R².
ll_metric : float
Scale-invariant null-relative log-likelihood gain per observation.
ll_raw : float
Raw Gaussian log-likelihood of the full model.
ll_null : float
Raw Gaussian log-likelihood of the intercept-only model.
ll_gain : float
Difference ll_raw - ll_null.
ll_gain_per_obs : float
ll_gain divided by nobs.
nobs : int
Number of observations used in this specification.
aic : float
Akaike Information Criterion.
bic : float
Bayesian Information Criterion.
hqic : float
Hannan-Quinn Information Criterion.
av_k_metric : float
Average out-of-sample metric across folds.
"""
# Separate response and predictors
y = comb_var.iloc[:, [0]]
x_temp = comb_var.drop(comb_var.columns[0], axis=1)
# Drop group column if provided
if group:
x = x_temp.drop(columns=group)
else:
x = x_temp
# Fit full sample OLS
out = simple_ols(y=y, x=x)
# Initialize cross-validated metric
av_k_metric = None
# Cross-validation (with or without grouping)
if kfold:
metric = []
if group:
n_groups = pd.Series(x_temp[group]).nunique(dropna=False)
if kfold > n_groups:
raise ValueError(
f"kfold={kfold} cannot exceed number of unique groups "
f"({n_groups}) for grouped OLS CV."
)
k_fold = GroupKFold(kfold)
for k, (train, test) in enumerate(k_fold.split(x, y, groups=x_temp[group])):
out_k = simple_ols(y=y.loc[train], x=x.loc[train])
y_pred = self._predict(x.loc[test], out_k['b'])
y_true = y.loc[test]
if oos_metric_name == 'rmse':
k_rmse = root_mean_squared_error(y_true, y_pred)
metric.append(k_rmse)
elif oos_metric_name == 'pseudo-r2':
k_r2 = pseudo_r2(y_true, y_pred, np.mean(y.loc[train]))
metric.append(k_r2)
else:
raise ValueError('No valid OOS metric provided.')
else:
k_fold = KFold(kfold)
for k, (train, test) in enumerate(k_fold.split(x, y)):
out_k = simple_ols(y=y.loc[train], x=x.loc[train])
y_pred = self._predict(x.loc[test], out_k['b'])
y_true = y.loc[test]
if oos_metric_name == 'rmse':
k_rmse = root_mean_squared_error(y_true, y_pred)
metric.append(k_rmse)
elif oos_metric_name == 'pseudo-r2':
k_r2 = pseudo_r2(y_true, y_pred, np.mean(y.loc[train]))
metric.append(k_r2)
else:
raise ValueError('No valid OOS metric provided.')
av_k_metric = np.mean(metric)
# Return model fit statistics and OOS metric
return (
out['b'],
out['p'],
out['r2'],
out['ll_gain_per_obs'],
out['aic'],
out['bic'],
out['hqic'],
av_k_metric,
out['ll'],
out['ll_null'],
out['ll_gain'],
out['ll_gain_per_obs'],
out['nobs'],
)
def _strap_OLS(
self,
comb_var: pd.DataFrame,
group: Optional[str],
sample_size: int,
seed: int,
y_star: Optional[np.ndarray],
group_bootstrap_lookup: Optional[tuple[np.ndarray, dict[Any, np.ndarray]]] = None,
min_rows_after_filter: Optional[int] = None,
) -> tuple:
"""
Call stripped_ols() over a random sample of the data containing y, x, and controls.
Parameters
----------
comb_var : pd.DataFrame
Combined y, x, and controls dataset.
group : str or None
Grouping column for fixed effects; sampling respects groups if provided.
sample_size : int
Number of observations in bootstrap sample.
seed : int
Random seed for resampling.
y_star : array-like, optional
Pseudo-outcome residuals for stratified bootstrap.
If None, ``comb_var`` is expected to already include a ``y_star``
column. This optimized path is output-equivalent to passing
``comb_var`` and ``y_star`` separately.
Returns
-------
beta : float
Bootstrap coefficient estimate for focal predictor.
p : float
Associated p-value.
r2 : float
In-sample R² for the bootstrap sample.
"""
if y_star is None:
temp_data = comb_var
else:
temp_data = _prepare_ols_bootstrap_data(comb_var, y_star)
if group is None:
# Sample randomly from full data (no group structure)
samp_df = temp_data.sample(n=sample_size, replace=True, random_state=seed)
y = samp_df.iloc[:, [0]]
y_star = samp_df.iloc[:, [-1]]
x = samp_df.drop(columns=[samp_df.columns[0], 'y_star'])
else:
if min_rows_after_filter is None:
n_predictors = temp_data.drop(
columns=[temp_data.columns[0], "y_star", group],
errors="ignore"
).shape[1]
min_rows_after_filter = max(5, n_predictors + 2)
# Cluster bootstrap with singleton guard.
sample_df, used_filtered, n_filtered, n_selected, attempts_used = _cluster_bootstrap_singleton_safe(
temp_data=temp_data,
group=group,
seed=seed,
target_rows=sample_size,
min_rows_after_filter=min_rows_after_filter,
group_lookup=group_bootstrap_lookup,
)
if not used_filtered:
warnings.warn(
f"Grouped bootstrap singleton filtering kept {n_filtered} rows (< {min_rows_after_filter}) after "
f"{attempts_used} attempt(s); falling back to unfiltered grouped sample ({n_selected} rows).",
UserWarning
)
# Drop grouping variable and extract y, y_star, and x
no_singleton = sample_df.drop(columns=[group])
y = no_singleton.iloc[:, [0]]
y_star = no_singleton.iloc[:, no_singleton.columns.get_loc('y_star')].to_frame()
x = no_singleton.drop(columns=[no_singleton.columns[0], 'y_star'])
# Fit standard OLS and y_star model
# When using group-demeaning (fixed effects), don't add const since intercept is absorbed
output = stripped_ols(y=y, x=x, add_const=(group is None))
output_ystar = stripped_ols(y=y_star, x=x, add_const=(group is None))
b = output['b']
p = output['p']
r2 = output['r2']
b_ystar = output_ystar['b']
p_ystar = output_ystar['p']
return b[0][0], p[0][0], r2, b_ystar[0][0], p_ystar[0][0]
[docs]
class LRobust(BaseRobust):
"""
A class to perform logistic regression analysis using statsmodels.Logit.
Parameters
----------
y : list[str]
Name of the dependent binary variable, supplied as a one-element list.
Multiple binary outcomes are not currently supported in a single LRobust fit.
x : list[str]
Predictor column name(s) included in every specification. The first element
is treated as the reported focal estimand; any additional elements are fixed
predictors that do not vary across the control-subset space.
data : pandas.DataFrame
The dataset containing `y`, `x`, and any optional controls.
model_name : str, default='Logistic Regression Robust'
Custom label for this model, used in outputs and plots.
Attributes
----------
results : OLSResult
Populated after calling `.fit()`. Contains all coefficient,
p-value, and metric outputs.
"""
def __init__(
self,
*,
y: List[str],
x: List[str],
data: pd.DataFrame,
model_name: str = 'Logistic Regression Robust'
) -> None:
"""
Initialize the logistic robust analysis.
Parameters
----------
y : str or list of str
Dependent variable name(s).
x : str or list of str
Predictor variable name(s).
data : pandas.DataFrame
Input dataset.
model_name : str, optional
Descriptive name for the model. Default 'Logistic Regression Robust'.
"""
super().__init__(y=y, x=x, data=data, model_name=model_name)
[docs]
def get_results(self) -> Any:
"""
Get the results of the logistic regression.
Returns
-------
results : OLSResult
Object containing the regression results.
"""
return self.results
[docs]
def multiple_y(self) -> None:
raise NotImplementedError("Not implemented yet")
def _full_sample(
self,
comb_var: pd.DataFrame,
kfold: int,
group: Optional[str],
oos_metric_name: str
) -> Tuple[np.ndarray, np.ndarray, float, float, float, float, float, Optional[float]]:
"""
Run logistic regression over the full dataset (with y, x, controls),
and optionally compute cross-validated out-of-sample metrics.
Parameters
----------
comb_var : pandas.DataFrame
DataFrame with columns [y, x, controls...].
kfold : int
Number of folds for cross-validation.
group : str or None
Grouping variable name (reserved for future use with fixed effects).
oos_metric_name : str
Out-of-sample metric to compute (if kfold > 1). Options:
'mcfaddens-r2', 'pseudo-r2', 'rmse', 'cross-entropy', or 'imv'.
Returns
-------
beta : ndarray
Coefficient estimates from full sample regression.
p : ndarray
P-value estimates from full sample regression.
r2 : float
In-sample pseudo-R² from full model.
ll : float
Log-likelihood.
aic : float
Akaike Information Criterion.
bic : float
Bayesian Information Criterion.
hqic : float
Hannan-Quinn Information Criterion.
av_k_metric : float or None
Average k-fold cross-validation metric, or None if no cross-validation.
"""
# TODO: Implement Fixed Effects Logistic Regression (FE)
# Split into response (y) and predictors (x). If `group` is provided,
# exclude it from the model matrix but use it for grouped CV splits.
y = comb_var.iloc[:, [0]]
x_temp = comb_var.drop(comb_var.columns[0], axis=1)
if group is not None:
if group not in x_temp.columns:
raise ValueError(
f"Grouping column '{group}' must be present in comb_var when group CV is requested."
)
groups = x_temp[group]
x = x_temp.drop(columns=[group])
else:
groups = None
x = x_temp
# Fit logistic regression on the full sample
out = logistic_regression_sm(y=y, x=x)
# Initialize output for cross-validation metric
av_k_metric = None
# Perform k-fold cross-validation if requested
if kfold:
if group is not None:
n_groups = pd.Series(groups).nunique(dropna=False)
if kfold > n_groups:
raise ValueError(
f"kfold={kfold} cannot exceed number of unique groups ({n_groups})."
)
k_fold = GroupKFold(n_splits=kfold)
split_iter = k_fold.split(x, y, groups=groups)
else:
y_vec = y.iloc[:, 0].to_numpy()
class_counts = pd.Series(y_vec).value_counts(dropna=False)
if class_counts.shape[0] < 2:
raise ValueError(
"Logistic cross-validation requires at least two outcome classes "
"after preprocessing."
)
min_class_count = int(class_counts.min())
if kfold > min_class_count:
raise ValueError(
f"kfold={kfold} cannot exceed the minority-class count "
f"({min_class_count}) for stratified logistic CV."
)
k_fold = StratifiedKFold(n_splits=kfold, shuffle=True, random_state=self.seed)
split_iter = k_fold.split(x, y_vec)
metric = []
for _, (train, test) in enumerate(split_iter):
# Fit model on training fold
out_k = logistic_regression_sm(y=y.iloc[train], x=x.iloc[train])
# Predict on test fold
y_pred = self._predict_LR(x.iloc[test], out_k['b'])
y_true = y.iloc[test]
# Evaluate prediction accuracy using selected OOS metric
if oos_metric_name == 'rmse':
k_rmse = root_mean_squared_error(y_true, y_pred)
metric.append(k_rmse)
elif oos_metric_name in {'mcfaddens-r2', 'mcfadden-r2'}:
k_r2 = mcfadden_r2(y_true, y_pred, np.mean(y.iloc[train]))
metric.append(k_r2)
elif oos_metric_name == 'pseudo-r2':
k_r2 = pseudo_r2(y_true, y_pred, np.mean(y.iloc[train]))
metric.append(k_r2)
elif oos_metric_name == 'cross-entropy':
k_cross_entropy = log_loss(y_true, y_pred)
metric.append(k_cross_entropy)
elif oos_metric_name == 'imv':
imv = calculate_imv_score(
y_true,
y_pred,
null_mean=float(np.mean(y.iloc[train])),
)
metric.append(imv)
else:
raise ValueError('No valid OOS metric provided.')
# Average metric across all folds
av_k_metric = np.mean(metric)
# Return model statistics and cross-validation result
return (
out['b'], # Coefficients
out['p'], # P-values
out['r2'], # In-sample pseudo-R²
out['ll'], # Log-likelihood
out['aic'], # AIC
out['bic'], # BIC
out['hqic'], # HQIC
av_k_metric # Cross-validation metric (if any)
)
def _predict_LR(
self,
x_test: pd.DataFrame,
betas: np.ndarray
) -> np.ndarray:
"""
Predict the dependent variable using the estimated coefficients.
Parameters
----------
x_test : pandas.DataFrame
Test set covariates (no constant column).
betas : ndarray
Fitted coefficients (including intercept at end).
Returns
-------
ndarray
Predicted probabilities for the positive class.
"""
return 1 / (1 + np.exp(-x_test.dot(betas)))
[docs]
def fit(
self,
*,
controls: List[str],
group: Optional[str] = None,
draws: int = None,
sample_size: Optional[int] = None,
kfold: int = None,
oos_metric: str = None,
n_cpu: Optional[int] = None,
seed: Optional[int] = None,
rescale_x: Optional[bool] = False,
rescale_y: Optional[bool] = False,
rescale_z: Optional[bool] = False,
compute_shap: bool = True,
threshold: int = 1000000
) -> 'LRobust':
"""
Fit the logistic regression models over the specification space and bootstrap samples.
Parameters
----------
controls : list of str
Names of optional control variables to include in every spec.
group : str, optional
Grouping variable used for grouped cross-validation and cluster
bootstrap resampling. Logistic fixed-effects demeaning is not
currently implemented.
draws : int, default=None
Number of bootstrap resamples per specification.
sample_size : int, optional
Number of observations per bootstrap draw; defaults to full dataset.
kfold : int, default=None
Folds for out-of-sample CV; set to 0 to disable.
oos_metric : default None.
Options: {'pseudo-r2', 'mcfaddens-r2', 'imv', 'rmse','cross-entropy'}.
Metric to compute on held-out folds.
n_cpu : int, optional
Number of parallel jobs; defaults to all available.
seed : int, optional
Random seed for reproducibility.
rescale_y : bool, default=False
Rescale the dependent variable.
rescale_x : bool, default=False
Rescale the x variable.
rescale_z : bool, default=False
Rescale the z variables.
compute_shap : bool, default=True
If True, compute SHAP values for plotting. Set False for fit-only profiling.
threshold : int, default=1000000
Warn if `draws * n_specs` exceeds this.
Returns
-------
self : LRobust
Self, with `.results` populated as an `OLSResult`.
"""
# Rescale dependent variable (y) if needed
if rescale_y is True:
if len(self.y) > 1:
warnings.warn(
"Rescaling of the dependent variable already occurs with multiple "
"y variables. Skipping additional rescaling.",
UserWarning
)
else:
self.data[self.y] = rescale(self.data[self.y])
# Rescale independent variables (x)
if rescale_x is True:
for column in self.x:
self.data[column] = rescale(self.data[column])
# Rescale control variables (z)
if rescale_z is True:
for column in controls:
self.data[column] = rescale(self.data[column])
# Prepare modeling setup (includes seed, CPU count, metric, etc.)
draws, kfold, oos_metric, n_cpu, seed = make_inquiry(
self.model_name,
self.y,
self.data,
draws,
kfold,
oos_metric,
n_cpu,
seed
)
self.seed = seed
self.data = self.data.copy()
if oos_metric == 'mcfadden-r2':
oos_metric = 'mcfaddens-r2'
# Logistic fixed-effects demeaning is not implemented. If `group` is
# provided, we still use it for grouped CV/bootstrap resampling, but do
# not demean y/x/z.
if group is not None:
warnings.warn(
"[LRobust] Logistic fixed-effects demeaning is unsupported; "
"`group` will be used for grouped CV/bootstrap resampling only (no demeaning).",
UserWarning
)
# Ensure a single constant term in the design matrix.
self.x, controls = _ensure_single_constant(self.data, self.y, self.x, controls)
# Validate modeling parameters
self._validate_fit_args(
controls=controls,
group=group,
draws=draws,
kfold=kfold,
oos_metric=oos_metric,
n_cpu=n_cpu,
seed=seed,
valid_oos_metrics=['pseudo-r2', 'mcfaddens-r2', 'rmse', 'cross-entropy', 'imv'],
threshold=threshold
)
# Set seed for reproducibility
np.random.seed(self.seed)
# Store the selected out-of-sample evaluation metric
self.oos_metric_name = oos_metric
# Log model configuration
print(
f"LRobust is running with n_cpu={n_cpu}, draws={draws}, "
f"folds={kfold}, seed={seed}.\n"
f"We're evaluating our out-of-sample predictions with the "
f"{oos_metric} metric.\n"
f"The target variable of interest is {self.x[0]}. "
f"Let's begin the calculations..."
)
# Default to using the full dataset size if sample_size is not provided
if sample_size is None:
sample_size = self.data.shape[0]
# Currently only supports one dependent variable for logistic regression
if len(self.y) > 1:
raise NotImplementedError("Not implemented yet for logistic regression") # TODO: Add multi-y logistic support
else:
z_specs = [tuple(spec) for spec in all_subsets(controls)]
space_n = len(z_specs)
control_positions = {name: idx for idx, name in enumerate(controls)}
predictor_prefix = self.x + ([group] if group else [])
base_predictors_df = self.data[predictor_prefix + controls]
y_single_df = self.data[self.y]
spec_predictor_cache: dict[int, pd.DataFrame] = {}
def _get_lr_predictors_for_spec(spec_tuple: tuple[str, ...]) -> pd.DataFrame:
mask = _spec_bitmask(spec_tuple, control_positions)
cached = spec_predictor_cache.get(mask)
if cached is None:
cols = predictor_prefix + list(spec_tuple)
cached = base_predictors_df.loc[:, cols]
spec_predictor_cache[mask] = cached
return cached
notebook_draws_bar = _make_notebook_draws_bar(
total=space_n * draws,
description="Bootstrap draws"
)
specs = []
all_predictors = []
b_all_list = []
p_all_list = []
# Initialize arrays to store model statistics across all control sets
b_array = np.empty([space_n, draws])
p_array = np.empty([space_n, draws])
b_array_ystar = np.empty([space_n, draws])
p_array_ystar = np.empty([space_n, draws])
r2_array = np.empty([space_n, draws])
r2i_array = np.empty([space_n])
ll_array = np.empty([space_n])
aic_array = np.empty([space_n])
bic_array = np.empty([space_n])
hqic_array = np.empty([space_n])
av_k_metric_array = np.empty([space_n])
shap_return = None
if compute_shap:
import shap
# Preprocess data for SHAP values. Do not demean for logistic models.
if group:
SHAP_comb = self.data[self.y + self.x + [group] + controls]
else:
SHAP_comb = self.data[self.y + self.x + controls]
SHAP_comb = SHAP_comb.dropna().reset_index(drop=True).copy()
# Split data for training and testing
x_train, x_test, y_train, _ = train_test_split(
SHAP_comb[self.x + controls].drop(columns=['const'], errors='ignore'),
SHAP_comb[self.y],
test_size=0.2,
random_state=self.seed
)
# Train logistic regression for SHAP explainability
model = sklearn.linear_model.LogisticRegression(penalty="l2", C=0.1)
model.fit(x_train, y_train.squeeze())
explainer = shap.LinearExplainer(model, x_train)
shap_return = [explainer.shap_values(x_test), x_test]
# Loop through all subsets of control variables
for index, spec in _progress_iter(
enumerate(z_specs),
total=space_n,
description="Control specs",
):
spec_predictors = _get_lr_predictors_for_spec(spec)
comb = pd.concat([y_single_df, spec_predictors], axis=1)
comb = comb.dropna().reset_index(drop=True).copy()
if group:
X_design = comb.drop(columns=[comb.columns[0], group], errors='ignore')
else:
X_design = comb.drop(columns=comb.columns[0]) # Drop y column
self._check_colinearity(X_design)
# Estimate full-sample model metrics
(b_all, p_all, r2_i, ll_i,
aic_i, bic_i, hqic_i,
av_k_metric_i) = self._full_sample(
comb,
kfold=kfold,
group=group,
oos_metric_name=self.oos_metric_name
)
# Bootstrap sampling (parallelized)
seeds = np.random.randint(0, 2**32 - 1, size=draws, dtype=np.int64)
group_bootstrap_lookup = (
_make_group_bootstrap_lookup(comb, group)
if group else None
)
if group:
n_predictors = comb.drop(
columns=[comb.columns[0], group],
errors="ignore"
).shape[1]
min_rows_after_filter = max(5, n_predictors + 2)
else:
min_rows_after_filter = None
def _run_one_seed(seed_i: int):
return self._strap_regression(
comb,
group,
sample_size,
seed_i,
group_bootstrap_lookup=group_bootstrap_lookup,
min_rows_after_filter=min_rows_after_filter,
)
bootstrap_out = _run_parallel_seed_batches(
seeds=seeds,
n_cpu=n_cpu,
run_one_seed=_run_one_seed,
draws_bar=notebook_draws_bar
)
b_list, p_list, r2_list = zip(*bootstrap_out)
# Save results
specs.append(frozenset(spec))
all_predictors.append(self.x + list(spec))
b_array[index, :] = b_list
p_array[index, :] = p_list
b_array_ystar[index, :] = np.nan * len(b_list)
p_array_ystar[index, :] = np.nan * len(p_list)
r2_array[index, :] = r2_list
r2i_array[index] = r2_i
ll_array[index] = ll_i
aic_array[index] = aic_i
bic_array[index] = bic_i
hqic_array[index] = hqic_i
av_k_metric_array[index] = av_k_metric_i
b_all_list.append(b_all)
p_all_list.append(p_all)
# Store all modeling results in an OLSResult object
self.results = OLSResult(
y=self.y[0],
x=self.x[0],
data=self.data,
specs=specs,
all_predictors=all_predictors,
controls=controls,
draws=draws,
kfold=kfold,
all_b=b_all_list,
all_p=p_all_list,
estimates=b_array,
p_values=p_array,
estimates_ystar=b_array_ystar,
p_values_ystar=p_array_ystar,
r2_values=r2_array,
r2i_array=r2i_array,
ll_array=ll_array,
aic_array=aic_array,
bic_array=bic_array,
hqic_array=hqic_array,
av_k_metric_array=av_k_metric_array,
model_name=self.model_name,
name_av_k_metric=self.oos_metric_name,
shap_return=shap_return
)
if notebook_draws_bar is not None:
notebook_draws_bar.close()
def _strap_regression(
self,
comb_var: pd.DataFrame,
group: Optional[str],
sample_size: int,
seed: int,
group_bootstrap_lookup: Optional[tuple[np.ndarray, dict[Any, np.ndarray]]] = None,
min_rows_after_filter: Optional[int] = None,
) -> Tuple[float, float, float]:
"""
Perform one bootstrap draw of logistic regression.
Parameters
----------
comb_var : pandas.DataFrame
DataFrame with `y` in first column and features in remaining columns.
group : str, optional
Grouping variable name used for cluster bootstrap resampling.
sample_size : int
Number of rows to sample with replacement.
seed : int
Random seed for sampling.
Returns
-------
beta : float
Estimated coefficient for the primary predictor.
p : float
Corresponding p-value.
r2 : float
In-sample pseudo-R² of this bootstrap sample.
"""
temp_data = comb_var
if group is None:
samp_df = temp_data.sample(n=sample_size, replace=True, random_state=seed)
y = samp_df.iloc[:, [0]]
x = samp_df.drop(samp_df.columns[0], axis=1)
else:
if min_rows_after_filter is None:
n_predictors = temp_data.drop(
columns=[temp_data.columns[0], group],
errors="ignore"
).shape[1]
min_rows_after_filter = max(5, n_predictors + 2)
# Cluster bootstrap with singleton guard.
sample_df, used_filtered, n_filtered, n_selected, attempts_used = _cluster_bootstrap_singleton_safe(
temp_data=temp_data,
group=group,
seed=seed,
target_rows=sample_size,
min_rows_after_filter=min_rows_after_filter,
group_lookup=group_bootstrap_lookup,
)
if not used_filtered:
warnings.warn(
f"Grouped bootstrap singleton filtering kept {n_filtered} rows (< {min_rows_after_filter}) after "
f"{attempts_used} attempt(s); falling back to unfiltered grouped sample ({n_selected} rows).",
UserWarning
)
no_singleton = sample_df.drop(columns=[group])
y = no_singleton.iloc[:, [0]]
x = no_singleton.drop(columns=no_singleton.columns[0])
output = logistic_regression_sm(y, x)
return output['b'][0][0], output['p'][0][0], output['r2']