Module pulearn.propensity

Propensity-estimation utilities for positive-unlabeled learning.

Sub-modules

pulearn.propensity.base

Shared utilities and base classes for PU propensity estimators.

pulearn.propensity.bootstrap

Bootstrap confidence intervals and instability warnings for c estimators.

pulearn.propensity.diagnostics

SCAR sanity-check helpers for propensity estimation workflows.

pulearn.propensity.estimators

Robust propensity estimators for SCAR-style PU workflows.

pulearn.propensity.sar

Experimental SAR propensity hooks and inverse-propensity weights.

Functions

def bootstrap_propensity_confidence_interval(estimator,
y,
*,
s_proba=None,
X=None,
n_resamples=200,
confidence_level=0.95,
random_state=None,
std_threshold=0.05,
cv_threshold=0.15,
fold_spread_threshold=0.1,
warn_on_instability=True)
Expand source code Browse git
def bootstrap_propensity_confidence_interval(
    estimator,
    y,
    *,
    s_proba=None,
    X=None,
    n_resamples=200,
    confidence_level=0.95,
    random_state=None,
    std_threshold=0.05,
    cv_threshold=0.15,
    fold_spread_threshold=0.1,
    warn_on_instability=True,
):
    """Estimate a propensity confidence interval with stratified bootstrap."""
    _validate_bootstrap_estimator(estimator)
    if n_resamples < 2:
        raise ValueError("n_resamples must be at least 2.")
    if not 0 < confidence_level < 1:
        raise ValueError("confidence_level must lie strictly in (0, 1).")
    if std_threshold < 0:
        raise ValueError("std_threshold must be non-negative.")
    if cv_threshold < 0:
        raise ValueError("cv_threshold must be non-negative.")
    if fold_spread_threshold < 0:
        raise ValueError("fold_spread_threshold must be non-negative.")
    if n_resamples < 30:
        warnings.warn(
            (
                "Bootstrap intervals with fewer than 30 resamples can be "
                "unstable."
            ),
            UserWarning,
            stacklevel=2,
        )

    labels = _normalize_propensity_labels(
        y,
        context="bootstrap {}".format(type(estimator).__name__),
    )
    if X is None and s_proba is None:
        raise ValueError("Bootstrap requires X or s_proba inputs.")
    X_arr = None
    if X is not None:
        X_arr = _validated_feature_matrix(
            X,
            labels,
            context="bootstrap {}".format(type(estimator).__name__),
        )
    s_proba_arr = None
    if s_proba is not None:
        s_proba_arr = _propensity_score_array(s_proba, y=labels)

    rng = check_random_state(random_state)
    bootstrap_estimates = []
    failures = 0
    for _ in range(n_resamples):
        sample_indices = _stratified_bootstrap_indices(labels, rng)
        bootstrap_estimator = clone(estimator)
        _seed_estimator_random_state(
            bootstrap_estimator,
            int(rng.randint(np.iinfo(np.int32).max)),
        )
        try:
            fitted = bootstrap_estimator.fit(
                labels[sample_indices],
                s_proba=(
                    None
                    if s_proba_arr is None
                    else s_proba_arr[sample_indices]
                ),
                X=None if X_arr is None else X_arr[sample_indices],
            )
        except ValueError:
            failures += 1
            continue
        result = getattr(fitted, "result_", None)
        if result is None:
            raise TypeError(
                "Bootstrap estimator {} must set result_ after fit().".format(
                    type(bootstrap_estimator).__name__
                )
            )
        bootstrap_estimates.append(
            _validate_bootstrap_result(result, bootstrap_estimator)
        )

    if not bootstrap_estimates:
        raise ValueError(
            "Bootstrap failed for every resample; could not estimate a "
            "confidence interval."
        )
    if failures:
        warnings.warn(
            (
                "Skipped {} bootstrap resamples that failed to fit cleanly."
            ).format(failures),
            UserWarning,
            stacklevel=2,
        )

    estimates = np.asarray(bootstrap_estimates, dtype=float)
    warning_flags = list(
        _stability_warning_flags(
            estimates,
            estimator=estimator,
            failures=failures,
            n_resamples=n_resamples,
            std_threshold=std_threshold,
            cv_threshold=cv_threshold,
            fold_spread_threshold=fold_spread_threshold,
        )
    )
    if warn_on_instability and warning_flags:
        warnings.warn(
            ("Propensity bootstrap for {} indicates instability: {}.").format(
                type(estimator).__name__, ", ".join(warning_flags)
            ),
            UserWarning,
            stacklevel=2,
        )

    alpha = 0.5 * (1.0 - confidence_level)
    lower, upper = np.quantile(estimates, [alpha, 1.0 - alpha])
    return PropensityConfidenceInterval(
        lower=float(lower),
        upper=float(upper),
        confidence_level=float(confidence_level),
        n_resamples=int(n_resamples),
        successful_resamples=int(estimates.shape[0]),
        random_state=_serialize_random_state(random_state),
        mean=float(np.mean(estimates)),
        std=float(np.std(estimates, ddof=0)),
        warning_flags=tuple(warning_flags),
    )

Estimate a propensity confidence interval with stratified bootstrap.

def compute_inverse_propensity_weights(propensity_scores, *, clip_min=0.05, clip_max=1.0, normalize=False)
Expand source code Browse git
def compute_inverse_propensity_weights(
    propensity_scores,
    *,
    clip_min=0.05,
    clip_max=1.0,
    normalize=False,
):
    """Compute inverse-propensity weights with clipping and validation."""
    _warn_experimental(stacklevel=3)
    if clip_min <= 0 or clip_min > 1:
        raise ValueError("clip_min must lie in (0, 1].")
    if clip_max <= 0 or clip_max > 1:
        raise ValueError("clip_max must lie in (0, 1].")
    if clip_min > clip_max:
        raise ValueError("clip_min must not exceed clip_max.")

    scores = _validated_propensity_scores(propensity_scores)
    clipped_scores = np.clip(scores, clip_min, clip_max)
    weights = 1.0 / clipped_scores
    if normalize:
        weights = weights / np.mean(weights)

    return SarWeightResult(
        propensity_scores=scores,
        weights=weights.astype(float, copy=False),
        clip_min=float(clip_min),
        clip_max=float(clip_max),
        clipped_count=int(np.sum(clipped_scores != scores)),
        normalized=bool(normalize),
        effective_sample_size=_effective_sample_size(weights),
        metadata={
            "min_propensity": float(np.min(scores)),
            "max_propensity": float(np.max(scores)),
            "mean_weight": float(np.mean(weights)),
            "max_weight": float(np.max(weights)),
        },
    )

Compute inverse-propensity weights with clipping and validation.

def predict_sar_propensity(propensity_model, X)
Expand source code Browse git
def predict_sar_propensity(propensity_model, X):
    """Return validated propensity scores from a model object or callable."""
    _warn_experimental(stacklevel=3)
    X_arr = _validated_model_matrix(X)
    if callable(propensity_model):
        scores = _validated_propensity_scores(
            propensity_model(X_arr),
            n_samples=X_arr.shape[0],
        )
    elif hasattr(propensity_model, "predict_proba"):
        scores = _positive_class_scores(propensity_model, X_arr)
    else:
        raise TypeError(
            "propensity_model must be callable or implement predict_proba()."
        )
    return scores

Return validated propensity scores from a model object or callable.

def scar_sanity_check(y,
*,
s_proba,
X=None,
candidate_quantile=0.9,
cv=5,
random_state=None,
score_ks_threshold=0.3,
mean_smd_threshold=0.2,
max_smd_threshold=0.35,
auc_threshold=0.7,
min_candidate_samples=20,
warn_on_violation=True,
group_estimator=None)
Expand source code Browse git
def scar_sanity_check(
    y,
    *,
    s_proba,
    X=None,
    candidate_quantile=0.9,
    cv=5,
    random_state=None,
    score_ks_threshold=0.3,
    mean_smd_threshold=0.2,
    max_smd_threshold=0.35,
    auc_threshold=0.7,
    min_candidate_samples=20,
    warn_on_violation=True,
    group_estimator=None,
):
    """Compare labeled positives to high-scoring unlabeled candidates."""
    if candidate_quantile <= 0 or candidate_quantile >= 1:
        raise ValueError("candidate_quantile must lie strictly in (0, 1).")
    if cv < 2:
        raise ValueError("cv must be at least 2.")
    if score_ks_threshold < 0:
        raise ValueError("score_ks_threshold must be non-negative.")
    if mean_smd_threshold < 0:
        raise ValueError("mean_smd_threshold must be non-negative.")
    if max_smd_threshold < 0:
        raise ValueError("max_smd_threshold must be non-negative.")
    if not 0 < auc_threshold <= 1:
        raise ValueError("auc_threshold must lie in (0, 1].")
    if min_candidate_samples < 1:
        raise ValueError("min_candidate_samples must be at least 1.")

    labels = _normalize_propensity_labels(y, context="scar_sanity_check")
    scores = _propensity_score_array(s_proba, y=labels)
    positive_mask = labels == 1
    unlabeled_mask = labels == 0
    unlabeled_scores = scores[unlabeled_mask]
    if unlabeled_scores.size == 0:
        raise ValueError(
            "scar_sanity_check requires unlabeled samples in y_pu."
        )

    candidate_threshold = float(
        np.quantile(unlabeled_scores, candidate_quantile)
    )
    candidate_mask = unlabeled_mask & (scores >= candidate_threshold)

    reference_positive_mask = positive_mask & (scores >= candidate_threshold)
    if not np.any(reference_positive_mask):
        reference_positive_mask = positive_mask

    positive_scores = scores[reference_positive_mask]
    candidate_scores = scores[candidate_mask]
    warning_flags = []
    if candidate_scores.shape[0] < min_candidate_samples:
        warning_flags.append("small_candidate_pool")

    score_ks = _ks_statistic(positive_scores, candidate_scores)
    if score_ks >= score_ks_threshold:
        warning_flags.append("score_shift")

    mean_abs_smd = None
    max_abs_smd = None
    shifted_fraction = None
    group_auc = None
    metadata = {"candidate_quantile": float(candidate_quantile)}
    if X is not None:
        X_arr = _validated_feature_matrix(
            X,
            labels,
            context="scar_sanity_check",
        )
        if X_arr.shape[1] == 0:
            warning_flags.append("empty_feature_matrix")
        else:
            reference_positive_X = X_arr[reference_positive_mask]
            candidate_X = X_arr[candidate_mask]
            smd = _standardized_mean_differences(
                reference_positive_X,
                candidate_X,
            )
            abs_smd = np.abs(smd)
            mean_abs_smd = float(np.mean(abs_smd))
            max_abs_smd = float(np.max(abs_smd))
            shifted_fraction = float(np.mean(abs_smd >= mean_smd_threshold))
            metadata["top_shifted_features"] = _top_shifted_feature_indices(
                abs_smd
            )
            if mean_abs_smd >= mean_smd_threshold:
                warning_flags.append("high_mean_shift")
            if max_abs_smd >= max_smd_threshold:
                warning_flags.append("max_feature_shift")
            group_auc = _group_membership_auc(
                reference_positive_X,
                candidate_X,
                cv=cv,
                random_state=random_state,
                estimator=group_estimator,
            )
            if group_auc is None:
                warning_flags.append("insufficient_group_samples")
            elif group_auc >= auc_threshold:
                warning_flags.append("group_separable")

    result = ScarSanityCheckResult(
        candidate_threshold=candidate_threshold,
        n_labeled_positive=int(np.sum(positive_mask)),
        n_candidate_unlabeled=int(np.sum(candidate_mask)),
        candidate_fraction_unlabeled=float(
            np.mean(scores[unlabeled_mask] >= candidate_threshold)
        ),
        mean_positive_score=float(np.mean(positive_scores)),
        mean_candidate_score=float(np.mean(candidate_scores)),
        score_ks_statistic=float(score_ks),
        mean_abs_smd=mean_abs_smd,
        max_abs_smd=max_abs_smd,
        shifted_feature_fraction=shifted_fraction,
        group_membership_auc=group_auc,
        warnings=tuple(dict.fromkeys(warning_flags)),
        metadata={
            **metadata,
            "n_reference_positive": int(np.sum(reference_positive_mask)),
        },
    )
    if warn_on_violation and result.violates_scar:
        warnings.warn(
            "SCAR sanity check indicates likely assumption drift: {}.".format(
                ", ".join(result.warnings)
            ),
            UserWarning,
            stacklevel=2,
        )
    return result

Compare labeled positives to high-scoring unlabeled candidates.

Classes

class BasePropensityEstimator
Expand source code Browse git
class BasePropensityEstimator(BaseEstimator):
    """Common fit/estimate contract for PU propensity estimators."""

    def fit(self, y, *, s_proba=None, X=None):
        """Fit the estimator and store the estimated propensity."""
        y_arr = _normalize_propensity_labels(y, context=self._fit_context())
        result = self._fit_propensity(y_arr, s_proba=s_proba, X=X)
        self._store_result(result)
        return self

    def estimate(self, y=None, *, s_proba=None, X=None):
        """Return a propensity estimate.

        Fits first when inputs are given.

        """
        if y is not None:
            return self.fit(y, s_proba=s_proba, X=X).result_
        check_is_fitted(self, "result_")
        return self.result_

    def bootstrap(
        self,
        y,
        *,
        s_proba=None,
        X=None,
        n_resamples=200,
        confidence_level=0.95,
        random_state=None,
        std_threshold=0.05,
        cv_threshold=0.15,
        fold_spread_threshold=0.1,
        warn_on_instability=True,
    ):
        """Fit the estimator and attach a bootstrap confidence interval."""
        from pulearn.propensity.bootstrap import (
            bootstrap_propensity_confidence_interval,
        )

        self.fit(y, s_proba=s_proba, X=X)
        interval = bootstrap_propensity_confidence_interval(
            self,
            y,
            s_proba=s_proba,
            X=X,
            n_resamples=n_resamples,
            confidence_level=confidence_level,
            random_state=random_state,
            std_threshold=std_threshold,
            cv_threshold=cv_threshold,
            fold_spread_threshold=fold_spread_threshold,
            warn_on_instability=warn_on_instability,
        )
        self.result_ = replace(self.result_, confidence_interval=interval)
        self.confidence_interval_ = interval
        return self.result_

    def _fit_context(self):
        """Describe the active fit call for validation errors."""
        return "fit {}".format(type(self).__name__)

    def _fit_propensity(self, y, *, s_proba=None, X=None):
        raise NotImplementedError

    def _store_result(self, result):
        """Validate and persist a fitted propensity result."""
        if not isinstance(result, PropensityEstimateResult):
            raise TypeError(
                "_fit_propensity() must return PropensityEstimateResult, got "
                "{}.".format(type(result).__name__)
            )
        if not np.isfinite(result.c) or result.c <= 0 or result.c > 1:
            raise ValueError(
                "Estimated c must lie in (0, 1]. Got {:.6f}.".format(
                    float(result.c)
                )
            )
        self.result_ = result
        self.c_ = float(result.c)
        self.metadata_ = dict(result.metadata)

Common fit/estimate contract for PU propensity estimators.

Ancestors

  • sklearn.base.BaseEstimator
  • sklearn.utils._repr_html.base.ReprHTMLMixin
  • sklearn.utils._repr_html.base._HTMLDocumentationLinkMixin
  • sklearn.utils._metadata_requests._MetadataRequester

Subclasses

Methods

def bootstrap(self,
y,
*,
s_proba=None,
X=None,
n_resamples=200,
confidence_level=0.95,
random_state=None,
std_threshold=0.05,
cv_threshold=0.15,
fold_spread_threshold=0.1,
warn_on_instability=True)
Expand source code Browse git
def bootstrap(
    self,
    y,
    *,
    s_proba=None,
    X=None,
    n_resamples=200,
    confidence_level=0.95,
    random_state=None,
    std_threshold=0.05,
    cv_threshold=0.15,
    fold_spread_threshold=0.1,
    warn_on_instability=True,
):
    """Fit the estimator and attach a bootstrap confidence interval."""
    from pulearn.propensity.bootstrap import (
        bootstrap_propensity_confidence_interval,
    )

    self.fit(y, s_proba=s_proba, X=X)
    interval = bootstrap_propensity_confidence_interval(
        self,
        y,
        s_proba=s_proba,
        X=X,
        n_resamples=n_resamples,
        confidence_level=confidence_level,
        random_state=random_state,
        std_threshold=std_threshold,
        cv_threshold=cv_threshold,
        fold_spread_threshold=fold_spread_threshold,
        warn_on_instability=warn_on_instability,
    )
    self.result_ = replace(self.result_, confidence_interval=interval)
    self.confidence_interval_ = interval
    return self.result_

Fit the estimator and attach a bootstrap confidence interval.

def estimate(self, y=None, *, s_proba=None, X=None)
Expand source code Browse git
def estimate(self, y=None, *, s_proba=None, X=None):
    """Return a propensity estimate.

    Fits first when inputs are given.

    """
    if y is not None:
        return self.fit(y, s_proba=s_proba, X=X).result_
    check_is_fitted(self, "result_")
    return self.result_

Return a propensity estimate.

Fits first when inputs are given.

def fit(self, y, *, s_proba=None, X=None)
Expand source code Browse git
def fit(self, y, *, s_proba=None, X=None):
    """Fit the estimator and store the estimated propensity."""
    y_arr = _normalize_propensity_labels(y, context=self._fit_context())
    result = self._fit_propensity(y_arr, s_proba=s_proba, X=X)
    self._store_result(result)
    return self

Fit the estimator and store the estimated propensity.

def set_fit_request(self: BasePropensityEstimator,
*,
s_proba: bool | str | None = '$UNCHANGED$') ‑> BasePropensityEstimator
Expand source code Browse git
def func(*args, **kw):
    """Updates the `_metadata_request` attribute of the consumer (`instance`)
    for the parameters provided as `**kw`.

    This docstring is overwritten below.
    See REQUESTER_DOC for expected functionality.
    """
    if not _routing_enabled():
        raise RuntimeError(
            "This method is only available when metadata routing is enabled."
            " You can enable it using"
            " sklearn.set_config(enable_metadata_routing=True)."
        )

    if self.validate_keys and (set(kw) - set(self.keys)):
        raise TypeError(
            f"Unexpected args: {set(kw) - set(self.keys)} in {self.name}. "
            f"Accepted arguments are: {set(self.keys)}"
        )

    # This makes it possible to use the decorated method as an unbound method,
    # for instance when monkeypatching.
    # https://github.com/scikit-learn/scikit-learn/issues/28632
    if instance is None:
        _instance = args[0]
        args = args[1:]
    else:
        _instance = instance

    # Replicating python's behavior when positional args are given other than
    # `self`, and `self` is only allowed if this method is unbound.
    if args:
        raise TypeError(
            f"set_{self.name}_request() takes 0 positional argument but"
            f" {len(args)} were given"
        )

    requests = _instance._get_metadata_request()
    method_metadata_request = getattr(requests, self.name)

    for prop, alias in kw.items():
        if alias is not UNCHANGED:
            method_metadata_request.add_request(param=prop, alias=alias)
    _instance._metadata_request = requests

    return _instance

Configure whether metadata should be requested to be passed to the fit method.

Note that this method is only relevant when this estimator is used as a sub-estimator within a :term:meta-estimator and metadata routing is enabled with enable_metadata_routing=True (see :func:sklearn.set_config). Please check the :ref:User Guide <metadata_routing> on how the routing mechanism works.

The options for each parameter are:

  • True: metadata is requested, and passed to fit if provided. The request is ignored if metadata is not provided.

  • False: metadata is not requested and the meta-estimator will not pass it to fit.

  • None: metadata is not requested, and the meta-estimator will raise an error if the user provides it.

  • str: metadata should be passed to the meta-estimator with this given alias instead of the original name.

The default (sklearn.utils.metadata_routing.UNCHANGED) retains the existing request. This allows you to change the request for some parameters and not others.

Added in version: 1.3

Parameters

s_proba : str, True, False, or None, default=sklearn.utils.metadata_routing.UNCHANGED
Metadata routing for s_proba parameter in fit.

Returns

self : object
The updated object.
class CrossValidatedPropensityEstimator (estimator, cv=5, shuffle=True, random_state=None)
Expand source code Browse git
class CrossValidatedPropensityEstimator(BasePropensityEstimator):
    """Estimate c from out-of-fold scores on labeled positives."""

    def __init__(self, estimator, cv=5, shuffle=True, random_state=None):
        """Initialize the cross-validated propensity estimator."""
        self.estimator = estimator
        self.cv = cv
        self.shuffle = shuffle
        self.random_state = random_state

    def _fit_propensity(self, y, *, s_proba=None, X=None):
        if self.estimator is None:
            raise ValueError("estimator is required for CV-based c.")
        if self.cv < 2:
            raise ValueError("cv must be at least 2.")
        if s_proba is not None:
            raise ValueError(
                "CrossValidatedPropensityEstimator does not accept s_proba; "
                "pass X and a probabilistic estimator instead."
            )

        X_arr = _validated_feature_matrix(
            X,
            y,
            context="fit CrossValidatedPropensityEstimator",
        )
        positive_count = int(np.sum(y == 1))
        unlabeled_count = int(np.sum(y == 0))
        if unlabeled_count == 0:
            raise ValueError(
                "CrossValidatedPropensityEstimator requires unlabeled "
                "samples in addition to labeled positives."
            )
        if self.cv > min(positive_count, unlabeled_count):
            raise ValueError(
                "cv must not exceed the number of labeled positives or "
                "unlabeled samples."
            )

        splitter = StratifiedKFold(
            n_splits=self.cv,
            shuffle=self.shuffle,
            random_state=self.random_state if self.shuffle else None,
        )
        scores = np.zeros(y.shape[0], dtype=float)
        fold_estimates = []
        for fold_index, (train_idx, test_idx) in enumerate(
            splitter.split(X_arr, y),
            start=1,
        ):
            estimator = clone(self.estimator)
            estimator.fit(X_arr[train_idx], y[train_idx])
            fold_scores = _positive_class_scores(estimator, X_arr[test_idx])
            scores[test_idx] = fold_scores
            fold_positive = y[test_idx] == 1
            fold_estimates.append(
                _FoldEstimate(
                    fold=fold_index,
                    c=float(np.mean(fold_scores[fold_positive])),
                    n_labeled_positive=int(np.sum(fold_positive)),
                )
            )

        positive_scores = scores[y == 1]
        result = _result_from_scores(
            positive_scores,
            y,
            method="cross_validated_positive",
            metadata={
                "aggregation": "mean",
                "cv": int(self.cv),
                "shuffle": bool(self.shuffle),
                "random_state": self.random_state,
                "estimator": type(self.estimator).__name__,
                "fold_estimates": [
                    fold_estimate.as_dict() for fold_estimate in fold_estimates
                ],
            },
        )
        self.oof_scores_ = scores
        self.fold_estimates_ = tuple(fold_estimates)
        return result

Estimate c from out-of-fold scores on labeled positives.

Initialize the cross-validated propensity estimator.

Ancestors

  • BasePropensityEstimator
  • sklearn.base.BaseEstimator
  • sklearn.utils._repr_html.base.ReprHTMLMixin
  • sklearn.utils._repr_html.base._HTMLDocumentationLinkMixin
  • sklearn.utils._metadata_requests._MetadataRequester

Inherited members

class ExperimentalSarHook (propensity_model)
Expand source code Browse git
class ExperimentalSarHook:
    """Minimal wrapper around a prefit SAR propensity model."""

    def __init__(self, propensity_model):
        """Store a prefit propensity model for experimental SAR weighting."""
        self.propensity_model = propensity_model

    def predict_propensity(self, X):
        """Return validated selection probabilities for `X`."""
        return predict_sar_propensity(self.propensity_model, X)

    def inverse_propensity_weights(
        self,
        X,
        *,
        clip_min=0.05,
        clip_max=1.0,
        normalize=False,
    ):
        """Compute inverse-propensity weights from the wrapped model."""
        _warn_experimental(stacklevel=3)
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                message=_EXPERIMENTAL_MESSAGE,
                category=UserWarning,
            )
            result = compute_inverse_propensity_weights(
                self.predict_propensity(X),
                clip_min=clip_min,
                clip_max=clip_max,
                normalize=normalize,
            )
        return SarWeightResult(
            propensity_scores=result.propensity_scores,
            weights=result.weights,
            clip_min=result.clip_min,
            clip_max=result.clip_max,
            clipped_count=result.clipped_count,
            normalized=result.normalized,
            effective_sample_size=result.effective_sample_size,
            metadata={
                **result.metadata,
                "propensity_model": type(self.propensity_model).__name__,
            },
        )

Minimal wrapper around a prefit SAR propensity model.

Store a prefit propensity model for experimental SAR weighting.

Methods

def inverse_propensity_weights(self, X, *, clip_min=0.05, clip_max=1.0, normalize=False)
Expand source code Browse git
def inverse_propensity_weights(
    self,
    X,
    *,
    clip_min=0.05,
    clip_max=1.0,
    normalize=False,
):
    """Compute inverse-propensity weights from the wrapped model."""
    _warn_experimental(stacklevel=3)
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore",
            message=_EXPERIMENTAL_MESSAGE,
            category=UserWarning,
        )
        result = compute_inverse_propensity_weights(
            self.predict_propensity(X),
            clip_min=clip_min,
            clip_max=clip_max,
            normalize=normalize,
        )
    return SarWeightResult(
        propensity_scores=result.propensity_scores,
        weights=result.weights,
        clip_min=result.clip_min,
        clip_max=result.clip_max,
        clipped_count=result.clipped_count,
        normalized=result.normalized,
        effective_sample_size=result.effective_sample_size,
        metadata={
            **result.metadata,
            "propensity_model": type(self.propensity_model).__name__,
        },
    )

Compute inverse-propensity weights from the wrapped model.

def predict_propensity(self, X)
Expand source code Browse git
def predict_propensity(self, X):
    """Return validated selection probabilities for `X`."""
    return predict_sar_propensity(self.propensity_model, X)

Return validated selection probabilities for X.

class MeanPositivePropensityEstimator
Expand source code Browse git
class MeanPositivePropensityEstimator(BasePropensityEstimator):
    """Estimate c as the mean score among labeled positives."""

    def _fit_propensity(self, y, *, s_proba=None, X=None):
        positive_scores = _positive_propensity_scores(y, s_proba=s_proba)
        return _result_from_scores(
            positive_scores,
            y,
            method="mean_positive",
            metadata={"aggregation": "mean"},
        )

Estimate c as the mean score among labeled positives.

Ancestors

  • BasePropensityEstimator
  • sklearn.base.BaseEstimator
  • sklearn.utils._repr_html.base.ReprHTMLMixin
  • sklearn.utils._repr_html.base._HTMLDocumentationLinkMixin
  • sklearn.utils._metadata_requests._MetadataRequester

Inherited members

class MedianPositivePropensityEstimator
Expand source code Browse git
class MedianPositivePropensityEstimator(BasePropensityEstimator):
    """Estimate c as the median score among labeled positives."""

    def _fit_propensity(self, y, *, s_proba=None, X=None):
        positive_scores = _positive_propensity_scores(y, s_proba=s_proba)
        return _result_from_scalar(
            float(np.median(positive_scores)),
            positive_scores,
            y,
            method="median_positive",
            metadata={"aggregation": "median"},
        )

Estimate c as the median score among labeled positives.

Ancestors

  • BasePropensityEstimator
  • sklearn.base.BaseEstimator
  • sklearn.utils._repr_html.base.ReprHTMLMixin
  • sklearn.utils._repr_html.base._HTMLDocumentationLinkMixin
  • sklearn.utils._metadata_requests._MetadataRequester

Inherited members

class PropensityConfidenceInterval (lower: float,
upper: float,
confidence_level: float,
n_resamples: int,
successful_resamples: int,
random_state: int | None,
mean: float,
std: float,
warning_flags: tuple[str, ...] = <factory>)
Expand source code Browse git
@dataclass(frozen=True)
class PropensityConfidenceInterval:
    """Bootstrap confidence interval for a propensity estimate."""

    lower: float
    upper: float
    confidence_level: float
    n_resamples: int
    successful_resamples: int
    random_state: int | None
    mean: float
    std: float
    warning_flags: tuple[str, ...] = field(default_factory=tuple)

    def as_dict(self):
        """Return a machine-readable interval summary."""
        return {
            "lower": self.lower,
            "upper": self.upper,
            "confidence_level": self.confidence_level,
            "n_resamples": self.n_resamples,
            "successful_resamples": self.successful_resamples,
            "random_state": self.random_state,
            "mean": self.mean,
            "std": self.std,
            "warning_flags": list(self.warning_flags),
        }

Bootstrap confidence interval for a propensity estimate.

Instance variables

var confidence_level : float

The type of the None singleton.

var lower : float

The type of the None singleton.

var mean : float

The type of the None singleton.

var n_resamples : int

The type of the None singleton.

var random_state : int | None

The type of the None singleton.

var std : float

The type of the None singleton.

var successful_resamples : int

The type of the None singleton.

var upper : float

The type of the None singleton.

var warning_flags : tuple[str, ...]

The type of the None singleton.

Methods

def as_dict(self)
Expand source code Browse git
def as_dict(self):
    """Return a machine-readable interval summary."""
    return {
        "lower": self.lower,
        "upper": self.upper,
        "confidence_level": self.confidence_level,
        "n_resamples": self.n_resamples,
        "successful_resamples": self.successful_resamples,
        "random_state": self.random_state,
        "mean": self.mean,
        "std": self.std,
        "warning_flags": list(self.warning_flags),
    }

Return a machine-readable interval summary.

class PropensityEstimateResult (c: float,
method: str,
n_samples: int,
n_labeled_positive: int,
metadata: dict[str, object] = <factory>,
confidence_interval: object | None = None)
Expand source code Browse git
@dataclass(frozen=True)
class PropensityEstimateResult:
    """Container for propensity-estimation outputs."""

    c: float
    method: str
    n_samples: int
    n_labeled_positive: int
    metadata: dict[str, object] = field(default_factory=dict)
    confidence_interval: object | None = None

    def as_dict(self):
        """Return a machine-readable representation of the result."""
        return {
            "c": self.c,
            "method": self.method,
            "n_samples": self.n_samples,
            "n_labeled_positive": self.n_labeled_positive,
            "metadata": dict(self.metadata),
            "confidence_interval": (
                None
                if self.confidence_interval is None
                else self.confidence_interval.as_dict()
            ),
        }

Container for propensity-estimation outputs.

Instance variables

var c : float

The type of the None singleton.

var confidence_interval : object | None

The type of the None singleton.

var metadata : dict[str, object]

The type of the None singleton.

var method : str

The type of the None singleton.

var n_labeled_positive : int

The type of the None singleton.

var n_samples : int

The type of the None singleton.

Methods

def as_dict(self)
Expand source code Browse git
def as_dict(self):
    """Return a machine-readable representation of the result."""
    return {
        "c": self.c,
        "method": self.method,
        "n_samples": self.n_samples,
        "n_labeled_positive": self.n_labeled_positive,
        "metadata": dict(self.metadata),
        "confidence_interval": (
            None
            if self.confidence_interval is None
            else self.confidence_interval.as_dict()
        ),
    }

Return a machine-readable representation of the result.

class QuantilePositivePropensityEstimator (quantile=0.25)
Expand source code Browse git
class QuantilePositivePropensityEstimator(BasePropensityEstimator):
    """Estimate c with a configurable quantile of positive scores."""

    def __init__(self, quantile=0.25):
        """Initialize the quantile-based estimator."""
        self.quantile = quantile

    def _fit_propensity(self, y, *, s_proba=None, X=None):
        if self.quantile <= 0 or self.quantile > 1:
            raise ValueError("quantile must lie in (0, 1].")
        positive_scores = _positive_propensity_scores(y, s_proba=s_proba)
        c_hat = float(np.quantile(positive_scores, self.quantile))
        return _result_from_scalar(
            c_hat,
            positive_scores,
            y,
            method="quantile_positive",
            metadata={
                "aggregation": "quantile",
                "quantile": float(self.quantile),
            },
        )

Estimate c with a configurable quantile of positive scores.

Initialize the quantile-based estimator.

Ancestors

  • BasePropensityEstimator
  • sklearn.base.BaseEstimator
  • sklearn.utils._repr_html.base.ReprHTMLMixin
  • sklearn.utils._repr_html.base._HTMLDocumentationLinkMixin
  • sklearn.utils._metadata_requests._MetadataRequester

Inherited members

class SarWeightResult (propensity_scores: np.ndarray,
weights: np.ndarray,
clip_min: float,
clip_max: float,
clipped_count: int,
normalized: bool,
effective_sample_size: float,
metadata: dict[str, object] = <factory>)
Expand source code Browse git
@dataclass(frozen=True)
class SarWeightResult:
    """Inverse-propensity weights derived from experimental SAR hooks."""

    propensity_scores: np.ndarray
    weights: np.ndarray
    clip_min: float
    clip_max: float
    clipped_count: int
    normalized: bool
    effective_sample_size: float
    metadata: dict[str, object] = field(default_factory=dict)

    def as_dict(self):
        """Return a machine-readable summary of the weight computation."""
        return {
            "clip_min": self.clip_min,
            "clip_max": self.clip_max,
            "clipped_count": self.clipped_count,
            "normalized": self.normalized,
            "effective_sample_size": self.effective_sample_size,
            "metadata": dict(self.metadata),
        }

Inverse-propensity weights derived from experimental SAR hooks.

Instance variables

var clip_max : float

The type of the None singleton.

var clip_min : float

The type of the None singleton.

var clipped_count : int

The type of the None singleton.

var effective_sample_size : float

The type of the None singleton.

var metadata : dict[str, object]

The type of the None singleton.

var normalized : bool

The type of the None singleton.

var propensity_scores : numpy.ndarray

The type of the None singleton.

var weights : numpy.ndarray

The type of the None singleton.

Methods

def as_dict(self)
Expand source code Browse git
def as_dict(self):
    """Return a machine-readable summary of the weight computation."""
    return {
        "clip_min": self.clip_min,
        "clip_max": self.clip_max,
        "clipped_count": self.clipped_count,
        "normalized": self.normalized,
        "effective_sample_size": self.effective_sample_size,
        "metadata": dict(self.metadata),
    }

Return a machine-readable summary of the weight computation.

class ScarSanityCheckResult (candidate_threshold: float,
n_labeled_positive: int,
n_candidate_unlabeled: int,
candidate_fraction_unlabeled: float,
mean_positive_score: float,
mean_candidate_score: float,
score_ks_statistic: float,
mean_abs_smd: float | None,
max_abs_smd: float | None,
shifted_feature_fraction: float | None,
group_membership_auc: float | None,
warnings: tuple[str, ...],
metadata: dict[str, object] = <factory>)
Expand source code Browse git
@dataclass(frozen=True)
class ScarSanityCheckResult:
    """Summary statistics for a SCAR sanity-check run."""

    candidate_threshold: float
    n_labeled_positive: int
    n_candidate_unlabeled: int
    candidate_fraction_unlabeled: float
    mean_positive_score: float
    mean_candidate_score: float
    score_ks_statistic: float
    mean_abs_smd: float | None
    max_abs_smd: float | None
    shifted_feature_fraction: float | None
    group_membership_auc: float | None
    warnings: tuple[str, ...]
    metadata: dict[str, object] = field(default_factory=dict)

    @property
    def violates_scar(self):
        """Return whether drift-related warnings indicate SCAR mismatch."""
        return any(
            warning_flag in _SCAR_VIOLATION_FLAGS
            for warning_flag in self.warnings
        )

    def as_dict(self):
        """Return a machine-readable representation of the result."""
        return {
            "candidate_threshold": self.candidate_threshold,
            "n_labeled_positive": self.n_labeled_positive,
            "n_candidate_unlabeled": self.n_candidate_unlabeled,
            "candidate_fraction_unlabeled": self.candidate_fraction_unlabeled,
            "mean_positive_score": self.mean_positive_score,
            "mean_candidate_score": self.mean_candidate_score,
            "score_ks_statistic": self.score_ks_statistic,
            "mean_abs_smd": self.mean_abs_smd,
            "max_abs_smd": self.max_abs_smd,
            "shifted_feature_fraction": self.shifted_feature_fraction,
            "group_membership_auc": self.group_membership_auc,
            "warnings": list(self.warnings),
            "metadata": dict(self.metadata),
        }

Summary statistics for a SCAR sanity-check run.

Instance variables

var candidate_fraction_unlabeled : float

The type of the None singleton.

var candidate_threshold : float

The type of the None singleton.

var group_membership_auc : float | None

The type of the None singleton.

var max_abs_smd : float | None

The type of the None singleton.

var mean_abs_smd : float | None

The type of the None singleton.

var mean_candidate_score : float

The type of the None singleton.

var mean_positive_score : float

The type of the None singleton.

var metadata : dict[str, object]

The type of the None singleton.

var n_candidate_unlabeled : int

The type of the None singleton.

var n_labeled_positive : int

The type of the None singleton.

var score_ks_statistic : float

The type of the None singleton.

var shifted_feature_fraction : float | None

The type of the None singleton.

prop violates_scar
Expand source code Browse git
@property
def violates_scar(self):
    """Return whether drift-related warnings indicate SCAR mismatch."""
    return any(
        warning_flag in _SCAR_VIOLATION_FLAGS
        for warning_flag in self.warnings
    )

Return whether drift-related warnings indicate SCAR mismatch.

var warnings : tuple[str, ...]

The type of the None singleton.

Methods

def as_dict(self)
Expand source code Browse git
def as_dict(self):
    """Return a machine-readable representation of the result."""
    return {
        "candidate_threshold": self.candidate_threshold,
        "n_labeled_positive": self.n_labeled_positive,
        "n_candidate_unlabeled": self.n_candidate_unlabeled,
        "candidate_fraction_unlabeled": self.candidate_fraction_unlabeled,
        "mean_positive_score": self.mean_positive_score,
        "mean_candidate_score": self.mean_candidate_score,
        "score_ks_statistic": self.score_ks_statistic,
        "mean_abs_smd": self.mean_abs_smd,
        "max_abs_smd": self.max_abs_smd,
        "shifted_feature_fraction": self.shifted_feature_fraction,
        "group_membership_auc": self.group_membership_auc,
        "warnings": list(self.warnings),
        "metadata": dict(self.metadata),
    }

Return a machine-readable representation of the result.

class TrimmedMeanPropensityEstimator (trim_fraction=0.1)
Expand source code Browse git
class TrimmedMeanPropensityEstimator(BasePropensityEstimator):
    """Estimate c with a trimmed mean over labeled-positive scores."""

    def __init__(self, trim_fraction=0.1):
        """Initialize the trimmed-mean estimator."""
        self.trim_fraction = trim_fraction

    def _fit_propensity(self, y, *, s_proba=None, X=None):
        if self.trim_fraction < 0 or self.trim_fraction >= 0.5:
            raise ValueError("trim_fraction must lie in [0, 0.5).")
        positive_scores = np.sort(
            _positive_propensity_scores(y, s_proba=s_proba)
        )
        n_positive = positive_scores.shape[0]
        trim_count = int(np.floor(n_positive * self.trim_fraction))
        if trim_count:
            trimmed_scores = positive_scores[trim_count:-trim_count]
        else:
            trimmed_scores = positive_scores
        return _result_from_scores(
            trimmed_scores,
            y,
            method="trimmed_mean_positive",
            metadata={
                "aggregation": "trimmed_mean",
                "trim_fraction": float(self.trim_fraction),
                "trim_count_per_side": int(trim_count),
            },
        )

Estimate c with a trimmed mean over labeled-positive scores.

Initialize the trimmed-mean estimator.

Ancestors

  • BasePropensityEstimator
  • sklearn.base.BaseEstimator
  • sklearn.utils._repr_html.base.ReprHTMLMixin
  • sklearn.utils._repr_html.base._HTMLDocumentationLinkMixin
  • sklearn.utils._metadata_requests._MetadataRequester

Inherited members