Module pulearn.propensity
Propensity-estimation utilities for positive-unlabeled learning.
Sub-modules
pulearn.propensity.base-
Shared utilities and base classes for PU propensity estimators.
pulearn.propensity.bootstrap-
Bootstrap confidence intervals and instability warnings for c estimators.
pulearn.propensity.diagnostics-
SCAR sanity-check helpers for propensity estimation workflows.
pulearn.propensity.estimators-
Robust propensity estimators for SCAR-style PU workflows.
pulearn.propensity.sar-
Experimental SAR propensity hooks and inverse-propensity weights.
Functions
def bootstrap_propensity_confidence_interval(estimator,
y,
*,
s_proba=None,
X=None,
n_resamples=200,
confidence_level=0.95,
random_state=None,
std_threshold=0.05,
cv_threshold=0.15,
fold_spread_threshold=0.1,
warn_on_instability=True)-
Expand source code Browse git
def bootstrap_propensity_confidence_interval(
    estimator,
    y,
    *,
    s_proba=None,
    X=None,
    n_resamples=200,
    confidence_level=0.95,
    random_state=None,
    std_threshold=0.05,
    cv_threshold=0.15,
    fold_spread_threshold=0.1,
    warn_on_instability=True,
):
    """Estimate a propensity confidence interval with stratified bootstrap.

    Repeatedly refits a clone of ``estimator`` on stratified bootstrap
    resamples of ``(y, s_proba, X)`` and takes the percentile interval of
    the resulting ``c`` estimates.

    Raises ``ValueError`` for invalid arguments or when every resample
    fails to fit, and ``TypeError`` when a fitted clone does not expose
    ``result_``.
    """
    _validate_bootstrap_estimator(estimator)
    # Argument validation up front so failures are cheap and explicit.
    if n_resamples < 2:
        raise ValueError("n_resamples must be at least 2.")
    if not 0 < confidence_level < 1:
        raise ValueError("confidence_level must lie strictly in (0, 1).")
    if std_threshold < 0:
        raise ValueError("std_threshold must be non-negative.")
    if cv_threshold < 0:
        raise ValueError("cv_threshold must be non-negative.")
    if fold_spread_threshold < 0:
        raise ValueError("fold_spread_threshold must be non-negative.")
    if n_resamples < 30:
        warnings.warn(
            (
                "Bootstrap intervals with fewer than 30 resamples can be "
                "unstable."
            ),
            UserWarning,
            stacklevel=2,
        )
    labels = _normalize_propensity_labels(
        y,
        context="bootstrap {}".format(type(estimator).__name__),
    )
    if X is None and s_proba is None:
        raise ValueError("Bootstrap requires X or s_proba inputs.")
    # Validate whichever of X / s_proba was supplied; both may be given.
    X_arr = None
    if X is not None:
        X_arr = _validated_feature_matrix(
            X,
            labels,
            context="bootstrap {}".format(type(estimator).__name__),
        )
    s_proba_arr = None
    if s_proba is not None:
        s_proba_arr = _propensity_score_array(s_proba, y=labels)
    rng = check_random_state(random_state)
    bootstrap_estimates = []
    failures = 0
    for _ in range(n_resamples):
        # Stratified resampling preserves the positive/unlabeled balance.
        sample_indices = _stratified_bootstrap_indices(labels, rng)
        bootstrap_estimator = clone(estimator)
        # Reseed each clone so resamples are independent but reproducible.
        _seed_estimator_random_state(
            bootstrap_estimator,
            int(rng.randint(np.iinfo(np.int32).max)),
        )
        try:
            fitted = bootstrap_estimator.fit(
                labels[sample_indices],
                s_proba=(
                    None
                    if s_proba_arr is None
                    else s_proba_arr[sample_indices]
                ),
                X=None if X_arr is None else X_arr[sample_indices],
            )
        except ValueError:
            # A degenerate resample may be unfittable; count and skip it.
            failures += 1
            continue
        result = getattr(fitted, "result_", None)
        if result is None:
            raise TypeError(
                "Bootstrap estimator {} must set result_ "
                "after fit().".format(
                    type(bootstrap_estimator).__name__
                )
            )
        bootstrap_estimates.append(
            _validate_bootstrap_result(result, bootstrap_estimator)
        )
    if not bootstrap_estimates:
        raise ValueError(
            "Bootstrap failed for every resample; could not estimate a "
            "confidence interval."
        )
    if failures:
        warnings.warn(
            (
                "Skipped {} bootstrap resamples that failed to fit cleanly."
            ).format(failures),
            UserWarning,
            stacklevel=2,
        )
    estimates = np.asarray(bootstrap_estimates, dtype=float)
    warning_flags = list(
        _stability_warning_flags(
            estimates,
            estimator=estimator,
            failures=failures,
            n_resamples=n_resamples,
            std_threshold=std_threshold,
            cv_threshold=cv_threshold,
            fold_spread_threshold=fold_spread_threshold,
        )
    )
    if warn_on_instability and warning_flags:
        warnings.warn(
            ("Propensity bootstrap for {} indicates instability: {}.").format(
                type(estimator).__name__, ", ".join(warning_flags)
            ),
            UserWarning,
            stacklevel=2,
        )
    # Equal-tailed percentile interval of the bootstrap distribution.
    alpha = 0.5 * (1.0 - confidence_level)
    lower, upper = np.quantile(estimates, [alpha, 1.0 - alpha])
    return PropensityConfidenceInterval(
        lower=float(lower),
        upper=float(upper),
        confidence_level=float(confidence_level),
        n_resamples=int(n_resamples),
        successful_resamples=int(estimates.shape[0]),
        random_state=_serialize_random_state(random_state),
        mean=float(np.mean(estimates)),
        std=float(np.std(estimates, ddof=0)),
        warning_flags=tuple(warning_flags),
    )
def compute_inverse_propensity_weights(propensity_scores, *, clip_min=0.05, clip_max=1.0, normalize=False)-
Expand source code Browse git
def compute_inverse_propensity_weights( propensity_scores, *, clip_min=0.05, clip_max=1.0, normalize=False, ): """Compute inverse-propensity weights with clipping and validation.""" _warn_experimental(stacklevel=3) if clip_min <= 0 or clip_min > 1: raise ValueError("clip_min must lie in (0, 1].") if clip_max <= 0 or clip_max > 1: raise ValueError("clip_max must lie in (0, 1].") if clip_min > clip_max: raise ValueError("clip_min must not exceed clip_max.") scores = _validated_propensity_scores(propensity_scores) clipped_scores = np.clip(scores, clip_min, clip_max) weights = 1.0 / clipped_scores if normalize: weights = weights / np.mean(weights) return SarWeightResult( propensity_scores=scores, weights=weights.astype(float, copy=False), clip_min=float(clip_min), clip_max=float(clip_max), clipped_count=int(np.sum(clipped_scores != scores)), normalized=bool(normalize), effective_sample_size=_effective_sample_size(weights), metadata={ "min_propensity": float(np.min(scores)), "max_propensity": float(np.max(scores)), "mean_weight": float(np.mean(weights)), "max_weight": float(np.max(weights)), }, )Compute inverse-propensity weights with clipping and validation.
def predict_sar_propensity(propensity_model, X)-
Expand source code Browse git
def predict_sar_propensity(propensity_model, X): """Return validated propensity scores from a model object or callable.""" _warn_experimental(stacklevel=3) X_arr = _validated_model_matrix(X) if callable(propensity_model): scores = _validated_propensity_scores( propensity_model(X_arr), n_samples=X_arr.shape[0], ) elif hasattr(propensity_model, "predict_proba"): scores = _positive_class_scores(propensity_model, X_arr) else: raise TypeError( "propensity_model must be callable or implement predict_proba()." ) return scoresReturn validated propensity scores from a model object or callable.
def scar_sanity_check(y,
*,
s_proba,
X=None,
candidate_quantile=0.9,
cv=5,
random_state=None,
score_ks_threshold=0.3,
mean_smd_threshold=0.2,
max_smd_threshold=0.35,
auc_threshold=0.7,
min_candidate_samples=20,
warn_on_violation=True,
group_estimator=None)-
Expand source code Browse git
def scar_sanity_check(
    y,
    *,
    s_proba,
    X=None,
    candidate_quantile=0.9,
    cv=5,
    random_state=None,
    score_ks_threshold=0.3,
    mean_smd_threshold=0.2,
    max_smd_threshold=0.35,
    auc_threshold=0.7,
    min_candidate_samples=20,
    warn_on_violation=True,
    group_estimator=None,
):
    """Compare labeled positives to high-scoring unlabeled candidates.

    Selects unlabeled samples whose score reaches the
    ``candidate_quantile`` quantile of unlabeled scores and compares them
    to labeled positives via a KS statistic on scores and, when ``X`` is
    given, standardized mean differences and a cross-validated group
    membership AUC. Accumulated warning flags are deduplicated in the
    returned ``ScarSanityCheckResult``.
    """
    # Threshold validation first; all checks mirror the parameter docs.
    if candidate_quantile <= 0 or candidate_quantile >= 1:
        raise ValueError("candidate_quantile must lie strictly in (0, 1).")
    if cv < 2:
        raise ValueError("cv must be at least 2.")
    if score_ks_threshold < 0:
        raise ValueError("score_ks_threshold must be non-negative.")
    if mean_smd_threshold < 0:
        raise ValueError("mean_smd_threshold must be non-negative.")
    if max_smd_threshold < 0:
        raise ValueError("max_smd_threshold must be non-negative.")
    if not 0 < auc_threshold <= 1:
        raise ValueError("auc_threshold must lie in (0, 1].")
    if min_candidate_samples < 1:
        raise ValueError("min_candidate_samples must be at least 1.")
    labels = _normalize_propensity_labels(y, context="scar_sanity_check")
    scores = _propensity_score_array(s_proba, y=labels)
    positive_mask = labels == 1
    unlabeled_mask = labels == 0
    unlabeled_scores = scores[unlabeled_mask]
    if unlabeled_scores.size == 0:
        raise ValueError(
            "scar_sanity_check requires unlabeled samples in y_pu."
        )
    # Candidates: unlabeled samples scoring at/above the quantile cutoff.
    candidate_threshold = float(
        np.quantile(unlabeled_scores, candidate_quantile)
    )
    candidate_mask = unlabeled_mask & (scores >= candidate_threshold)
    # Compare against positives above the same cutoff; fall back to all
    # positives when none reach it.
    reference_positive_mask = positive_mask & (scores >= candidate_threshold)
    if not np.any(reference_positive_mask):
        reference_positive_mask = positive_mask
    positive_scores = scores[reference_positive_mask]
    candidate_scores = scores[candidate_mask]
    warning_flags = []
    if candidate_scores.shape[0] < min_candidate_samples:
        warning_flags.append("small_candidate_pool")
    score_ks = _ks_statistic(positive_scores, candidate_scores)
    if score_ks >= score_ks_threshold:
        warning_flags.append("score_shift")
    # Feature-based diagnostics are only available when X is supplied.
    mean_abs_smd = None
    max_abs_smd = None
    shifted_fraction = None
    group_auc = None
    metadata = {"candidate_quantile": float(candidate_quantile)}
    if X is not None:
        X_arr = _validated_feature_matrix(
            X,
            labels,
            context="scar_sanity_check",
        )
        if X_arr.shape[1] == 0:
            warning_flags.append("empty_feature_matrix")
        else:
            reference_positive_X = X_arr[reference_positive_mask]
            candidate_X = X_arr[candidate_mask]
            # Standardized mean differences across features quantify
            # covariate shift between the two groups.
            smd = _standardized_mean_differences(
                reference_positive_X,
                candidate_X,
            )
            abs_smd = np.abs(smd)
            mean_abs_smd = float(np.mean(abs_smd))
            max_abs_smd = float(np.max(abs_smd))
            shifted_fraction = float(np.mean(abs_smd >= mean_smd_threshold))
            metadata["top_shifted_features"] = _top_shifted_feature_indices(
                abs_smd
            )
            if mean_abs_smd >= mean_smd_threshold:
                warning_flags.append("high_mean_shift")
            if max_abs_smd >= max_smd_threshold:
                warning_flags.append("max_feature_shift")
            # A classifier that can tell the groups apart suggests the
            # labeled positives are not representative (SCAR violated).
            group_auc = _group_membership_auc(
                reference_positive_X,
                candidate_X,
                cv=cv,
                random_state=random_state,
                estimator=group_estimator,
            )
            if group_auc is None:
                warning_flags.append("insufficient_group_samples")
            elif group_auc >= auc_threshold:
                warning_flags.append("group_separable")
    result = ScarSanityCheckResult(
        candidate_threshold=candidate_threshold,
        n_labeled_positive=int(np.sum(positive_mask)),
        n_candidate_unlabeled=int(np.sum(candidate_mask)),
        candidate_fraction_unlabeled=float(
            np.mean(scores[unlabeled_mask] >= candidate_threshold)
        ),
        mean_positive_score=float(np.mean(positive_scores)),
        mean_candidate_score=float(np.mean(candidate_scores)),
        score_ks_statistic=float(score_ks),
        mean_abs_smd=mean_abs_smd,
        max_abs_smd=max_abs_smd,
        shifted_feature_fraction=shifted_fraction,
        group_membership_auc=group_auc,
        # dict.fromkeys deduplicates while preserving insertion order.
        warnings=tuple(dict.fromkeys(warning_flags)),
        metadata={
            **metadata,
            "n_reference_positive": int(np.sum(reference_positive_mask)),
        },
    )
    if warn_on_violation and result.violates_scar:
        warnings.warn(
            "SCAR sanity check indicates likely assumption drift: {}.".format(
                ", ".join(result.warnings)
            ),
            UserWarning,
            stacklevel=2,
        )
    return result
Classes
class BasePropensityEstimator-
Expand source code Browse git
class BasePropensityEstimator(BaseEstimator):
    """Common fit/estimate contract for PU propensity estimators."""

    def fit(self, y, *, s_proba=None, X=None):
        """Fit the estimator and store the estimated propensity."""
        y_arr = _normalize_propensity_labels(y, context=self._fit_context())
        result = self._fit_propensity(y_arr, s_proba=s_proba, X=X)
        # _store_result validates the result and sets result_/c_/metadata_.
        self._store_result(result)
        return self

    def estimate(self, y=None, *, s_proba=None, X=None):
        """Return a propensity estimate.

        Fits first when inputs are given.
        """
        if y is not None:
            return self.fit(y, s_proba=s_proba, X=X).result_
        check_is_fitted(self, "result_")
        return self.result_

    def bootstrap(
        self,
        y,
        *,
        s_proba=None,
        X=None,
        n_resamples=200,
        confidence_level=0.95,
        random_state=None,
        std_threshold=0.05,
        cv_threshold=0.15,
        fold_spread_threshold=0.1,
        warn_on_instability=True,
    ):
        """Fit the estimator and attach a bootstrap confidence interval."""
        # Local import; presumably avoids a circular dependency with the
        # bootstrap module -- TODO confirm.
        from pulearn.propensity.bootstrap import (
            bootstrap_propensity_confidence_interval,
        )

        self.fit(y, s_proba=s_proba, X=X)
        interval = bootstrap_propensity_confidence_interval(
            self,
            y,
            s_proba=s_proba,
            X=X,
            n_resamples=n_resamples,
            confidence_level=confidence_level,
            random_state=random_state,
            std_threshold=std_threshold,
            cv_threshold=cv_threshold,
            fold_spread_threshold=fold_spread_threshold,
            warn_on_instability=warn_on_instability,
        )
        # Attach the interval to the frozen result (dataclasses.replace)
        # and expose it as a fitted attribute.
        self.result_ = replace(self.result_, confidence_interval=interval)
        self.confidence_interval_ = interval
        return self.result_

    def _fit_context(self):
        """Describe the active fit call for validation errors."""
        return "fit {}".format(type(self).__name__)

    def _fit_propensity(self, y, *, s_proba=None, X=None):
        # Subclasses implement the concrete estimation strategy and must
        # return a PropensityEstimateResult.
        raise NotImplementedError

    def _store_result(self, result):
        """Validate and persist a fitted propensity result."""
        if not isinstance(result, PropensityEstimateResult):
            raise TypeError(
                "_fit_propensity() must return PropensityEstimateResult, got "
                "{}.".format(type(result).__name__)
            )
        # c must be a finite probability in (0, 1].
        if not np.isfinite(result.c) or result.c <= 0 or result.c > 1:
            raise ValueError(
                "Estimated c must lie in (0, 1]. Got {:.6f}.".format(
                    float(result.c)
                )
            )
        self.result_ = result
        self.c_ = float(result.c)
        self.metadata_ = dict(result.metadata)
Ancestors
- sklearn.base.BaseEstimator
- sklearn.utils._repr_html.base.ReprHTMLMixin
- sklearn.utils._repr_html.base._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
Subclasses
- CrossValidatedPropensityEstimator
- MeanPositivePropensityEstimator
- MedianPositivePropensityEstimator
- QuantilePositivePropensityEstimator
- TrimmedMeanPropensityEstimator
Methods
def bootstrap(self,
y,
*,
s_proba=None,
X=None,
n_resamples=200,
confidence_level=0.95,
random_state=None,
std_threshold=0.05,
cv_threshold=0.15,
fold_spread_threshold=0.1,
warn_on_instability=True)-
Expand source code Browse git
def bootstrap( self, y, *, s_proba=None, X=None, n_resamples=200, confidence_level=0.95, random_state=None, std_threshold=0.05, cv_threshold=0.15, fold_spread_threshold=0.1, warn_on_instability=True, ): """Fit the estimator and attach a bootstrap confidence interval.""" from pulearn.propensity.bootstrap import ( bootstrap_propensity_confidence_interval, ) self.fit(y, s_proba=s_proba, X=X) interval = bootstrap_propensity_confidence_interval( self, y, s_proba=s_proba, X=X, n_resamples=n_resamples, confidence_level=confidence_level, random_state=random_state, std_threshold=std_threshold, cv_threshold=cv_threshold, fold_spread_threshold=fold_spread_threshold, warn_on_instability=warn_on_instability, ) self.result_ = replace(self.result_, confidence_interval=interval) self.confidence_interval_ = interval return self.result_Fit the estimator and attach a bootstrap confidence interval.
def estimate(self, y=None, *, s_proba=None, X=None)-
Expand source code Browse git
def estimate(self, y=None, *, s_proba=None, X=None): """Return a propensity estimate. Fits first when inputs are given. """ if y is not None: return self.fit(y, s_proba=s_proba, X=X).result_ check_is_fitted(self, "result_") return self.result_Return a propensity estimate.
Fits first when inputs are given.
def fit(self, y, *, s_proba=None, X=None)-
Expand source code Browse git
def fit(self, y, *, s_proba=None, X=None): """Fit the estimator and store the estimated propensity.""" y_arr = _normalize_propensity_labels(y, context=self._fit_context()) result = self._fit_propensity(y_arr, s_proba=s_proba, X=X) self._store_result(result) return selfFit the estimator and store the estimated propensity.
def set_fit_request(self: BasePropensityEstimator,
*,
s_proba: bool | str | None = '$UNCHANGED$') ‑> BasePropensityEstimator-
Expand source code Browse git
def func(*args, **kw): """Updates the `_metadata_request` attribute of the consumer (`instance`) for the parameters provided as `**kw`. This docstring is overwritten below. See REQUESTER_DOC for expected functionality. """ if not _routing_enabled(): raise RuntimeError( "This method is only available when metadata routing is enabled." " You can enable it using" " sklearn.set_config(enable_metadata_routing=True)." ) if self.validate_keys and (set(kw) - set(self.keys)): raise TypeError( f"Unexpected args: {set(kw) - set(self.keys)} in {self.name}. " f"Accepted arguments are: {set(self.keys)}" ) # This makes it possible to use the decorated method as an unbound method, # for instance when monkeypatching. # https://github.com/scikit-learn/scikit-learn/issues/28632 if instance is None: _instance = args[0] args = args[1:] else: _instance = instance # Replicating python's behavior when positional args are given other than # `self`, and `self` is only allowed if this method is unbound. if args: raise TypeError( f"set_{self.name}_request() takes 0 positional argument but" f" {len(args)} were given" ) requests = _instance._get_metadata_request() method_metadata_request = getattr(requests, self.name) for prop, alias in kw.items(): if alias is not UNCHANGED: method_metadata_request.add_request(param=prop, alias=alias) _instance._metadata_request = requests return _instanceConfigure whether metadata should be requested to be passed to the
Configure whether metadata should be requested to be passed to the `fit` method.

Note that this method is only relevant when this estimator is used as a sub-estimator within a meta-estimator and metadata routing is enabled with `enable_metadata_routing=True` (see `sklearn.set_config`). Please check the scikit-learn User Guide on metadata routing for how the routing mechanism works.

The options for each parameter are:

- `True`: metadata is requested, and passed to `fit` if provided. The request is ignored if metadata is not provided.
- `False`: metadata is not requested and the meta-estimator will not pass it to `fit`.
- `None`: metadata is not requested, and the meta-estimator will raise an error if the user provides it.
- `str`: metadata should be passed to the meta-estimator with this given alias instead of the original name.

The default (`sklearn.utils.metadata_routing.UNCHANGED`) retains the existing request. This allows you to change the request for some parameters and not others.

Added in version 1.3.

Parameters

s_proba : str, True, False, or None, default=`sklearn.utils.metadata_routing.UNCHANGED`
    Metadata routing for the `s_proba` parameter in `fit`.

Returns

self : object
    The updated object.
-
class CrossValidatedPropensityEstimator (estimator, cv=5, shuffle=True, random_state=None)-
Expand source code Browse git
class CrossValidatedPropensityEstimator(BasePropensityEstimator):
    """Estimate c from out-of-fold scores on labeled positives."""

    def __init__(self, estimator, cv=5, shuffle=True, random_state=None):
        """Initialize the cross-validated propensity estimator."""
        self.estimator = estimator
        self.cv = cv
        self.shuffle = shuffle
        self.random_state = random_state

    def _fit_propensity(self, y, *, s_proba=None, X=None):
        # This estimator derives scores itself from (X, estimator), so
        # precomputed s_proba is explicitly rejected.
        if self.estimator is None:
            raise ValueError("estimator is required for CV-based c.")
        if self.cv < 2:
            raise ValueError("cv must be at least 2.")
        if s_proba is not None:
            raise ValueError(
                "CrossValidatedPropensityEstimator does not accept s_proba; "
                "pass X and a probabilistic estimator instead."
            )
        X_arr = _validated_feature_matrix(
            X,
            y,
            context="fit CrossValidatedPropensityEstimator",
        )
        positive_count = int(np.sum(y == 1))
        unlabeled_count = int(np.sum(y == 0))
        if unlabeled_count == 0:
            raise ValueError(
                "CrossValidatedPropensityEstimator requires unlabeled "
                "samples in addition to labeled positives."
            )
        # Stratified CV needs at least cv members of each class.
        if self.cv > min(positive_count, unlabeled_count):
            raise ValueError(
                "cv must not exceed the number of labeled positives or "
                "unlabeled samples."
            )
        splitter = StratifiedKFold(
            n_splits=self.cv,
            shuffle=self.shuffle,
            random_state=self.random_state if self.shuffle else None,
        )
        scores = np.zeros(y.shape[0], dtype=float)
        fold_estimates = []
        for fold_index, (train_idx, test_idx) in enumerate(
            splitter.split(X_arr, y),
            start=1,
        ):
            # Refit a clone per fold and collect out-of-fold scores.
            estimator = clone(self.estimator)
            estimator.fit(X_arr[train_idx], y[train_idx])
            fold_scores = _positive_class_scores(estimator, X_arr[test_idx])
            scores[test_idx] = fold_scores
            fold_positive = y[test_idx] == 1
            fold_estimates.append(
                _FoldEstimate(
                    fold=fold_index,
                    c=float(np.mean(fold_scores[fold_positive])),
                    n_labeled_positive=int(np.sum(fold_positive)),
                )
            )
        # c is aggregated from out-of-fold scores of labeled positives.
        positive_scores = scores[y == 1]
        result = _result_from_scores(
            positive_scores,
            y,
            method="cross_validated_positive",
            metadata={
                "aggregation": "mean",
                "cv": int(self.cv),
                "shuffle": bool(self.shuffle),
                "random_state": self.random_state,
                "estimator": type(self.estimator).__name__,
                "fold_estimates": [
                    fold_estimate.as_dict()
                    for fold_estimate in fold_estimates
                ],
            },
        )
        # Expose per-sample OOF scores and per-fold estimates for
        # downstream diagnostics.
        self.oof_scores_ = scores
        self.fold_estimates_ = tuple(fold_estimates)
        return result
Initialize the cross-validated propensity estimator.
Ancestors
- BasePropensityEstimator
- sklearn.base.BaseEstimator
- sklearn.utils._repr_html.base.ReprHTMLMixin
- sklearn.utils._repr_html.base._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
Inherited members
class ExperimentalSarHook (propensity_model)-
Expand source code Browse git
class ExperimentalSarHook: """Minimal wrapper around a prefit SAR propensity model.""" def __init__(self, propensity_model): """Store a prefit propensity model for experimental SAR weighting.""" self.propensity_model = propensity_model def predict_propensity(self, X): """Return validated selection probabilities for `X`.""" return predict_sar_propensity(self.propensity_model, X) def inverse_propensity_weights( self, X, *, clip_min=0.05, clip_max=1.0, normalize=False, ): """Compute inverse-propensity weights from the wrapped model.""" _warn_experimental(stacklevel=3) with warnings.catch_warnings(): warnings.filterwarnings( "ignore", message=_EXPERIMENTAL_MESSAGE, category=UserWarning, ) result = compute_inverse_propensity_weights( self.predict_propensity(X), clip_min=clip_min, clip_max=clip_max, normalize=normalize, ) return SarWeightResult( propensity_scores=result.propensity_scores, weights=result.weights, clip_min=result.clip_min, clip_max=result.clip_max, clipped_count=result.clipped_count, normalized=result.normalized, effective_sample_size=result.effective_sample_size, metadata={ **result.metadata, "propensity_model": type(self.propensity_model).__name__, }, )Minimal wrapper around a prefit SAR propensity model.
Store a prefit propensity model for experimental SAR weighting.
Methods
def inverse_propensity_weights(self, X, *, clip_min=0.05, clip_max=1.0, normalize=False)-
Expand source code Browse git
def inverse_propensity_weights( self, X, *, clip_min=0.05, clip_max=1.0, normalize=False, ): """Compute inverse-propensity weights from the wrapped model.""" _warn_experimental(stacklevel=3) with warnings.catch_warnings(): warnings.filterwarnings( "ignore", message=_EXPERIMENTAL_MESSAGE, category=UserWarning, ) result = compute_inverse_propensity_weights( self.predict_propensity(X), clip_min=clip_min, clip_max=clip_max, normalize=normalize, ) return SarWeightResult( propensity_scores=result.propensity_scores, weights=result.weights, clip_min=result.clip_min, clip_max=result.clip_max, clipped_count=result.clipped_count, normalized=result.normalized, effective_sample_size=result.effective_sample_size, metadata={ **result.metadata, "propensity_model": type(self.propensity_model).__name__, }, )Compute inverse-propensity weights from the wrapped model.
def predict_propensity(self, X)-
Expand source code Browse git
def predict_propensity(self, X): """Return validated selection probabilities for `X`.""" return predict_sar_propensity(self.propensity_model, X)Return validated selection probabilities for
X.
class MeanPositivePropensityEstimator-
Expand source code Browse git
class MeanPositivePropensityEstimator(BasePropensityEstimator): """Estimate c as the mean score among labeled positives.""" def _fit_propensity(self, y, *, s_proba=None, X=None): positive_scores = _positive_propensity_scores(y, s_proba=s_proba) return _result_from_scores( positive_scores, y, method="mean_positive", metadata={"aggregation": "mean"}, )Estimate c as the mean score among labeled positives.
Ancestors
- BasePropensityEstimator
- sklearn.base.BaseEstimator
- sklearn.utils._repr_html.base.ReprHTMLMixin
- sklearn.utils._repr_html.base._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
Inherited members
class MedianPositivePropensityEstimator-
Expand source code Browse git
class MedianPositivePropensityEstimator(BasePropensityEstimator): """Estimate c as the median score among labeled positives.""" def _fit_propensity(self, y, *, s_proba=None, X=None): positive_scores = _positive_propensity_scores(y, s_proba=s_proba) return _result_from_scalar( float(np.median(positive_scores)), positive_scores, y, method="median_positive", metadata={"aggregation": "median"}, )Estimate c as the median score among labeled positives.
Ancestors
- BasePropensityEstimator
- sklearn.base.BaseEstimator
- sklearn.utils._repr_html.base.ReprHTMLMixin
- sklearn.utils._repr_html.base._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
Inherited members
class PropensityConfidenceInterval (lower: float,
upper: float,
confidence_level: float,
n_resamples: int,
successful_resamples: int,
random_state: int | None,
mean: float,
std: float,
warning_flags: tuple[str, ...] = <factory>)-
Expand source code Browse git
@dataclass(frozen=True) class PropensityConfidenceInterval: """Bootstrap confidence interval for a propensity estimate.""" lower: float upper: float confidence_level: float n_resamples: int successful_resamples: int random_state: int | None mean: float std: float warning_flags: tuple[str, ...] = field(default_factory=tuple) def as_dict(self): """Return a machine-readable interval summary.""" return { "lower": self.lower, "upper": self.upper, "confidence_level": self.confidence_level, "n_resamples": self.n_resamples, "successful_resamples": self.successful_resamples, "random_state": self.random_state, "mean": self.mean, "std": self.std, "warning_flags": list(self.warning_flags), }Bootstrap confidence interval for a propensity estimate.
Instance variables

var confidence_level : float-
Target coverage level of the interval, strictly between 0 and 1.
var lower : float-
Lower bound of the bootstrap confidence interval.
var mean : float-
Mean of the bootstrap estimates.
var n_resamples : int-
Number of bootstrap resamples requested.
var random_state : int | None-
Serialized random seed used for resampling, if any.
var std : float-
Standard deviation of the bootstrap estimates.
var successful_resamples : int-
Number of resamples that fitted successfully.
var upper : float-
Upper bound of the bootstrap confidence interval.
var warning_flags : tuple[str, ...]-
Instability warning flags raised during bootstrapping.
Methods
def as_dict(self)-
Expand source code Browse git
def as_dict(self): """Return a machine-readable interval summary.""" return { "lower": self.lower, "upper": self.upper, "confidence_level": self.confidence_level, "n_resamples": self.n_resamples, "successful_resamples": self.successful_resamples, "random_state": self.random_state, "mean": self.mean, "std": self.std, "warning_flags": list(self.warning_flags), }Return a machine-readable interval summary.
class PropensityEstimateResult (c: float,
method: str,
n_samples: int,
n_labeled_positive: int,
metadata: dict[str, object] = <factory>,
confidence_interval: object | None = None)-
Expand source code Browse git
@dataclass(frozen=True) class PropensityEstimateResult: """Container for propensity-estimation outputs.""" c: float method: str n_samples: int n_labeled_positive: int metadata: dict[str, object] = field(default_factory=dict) confidence_interval: object | None = None def as_dict(self): """Return a machine-readable representation of the result.""" return { "c": self.c, "method": self.method, "n_samples": self.n_samples, "n_labeled_positive": self.n_labeled_positive, "metadata": dict(self.metadata), "confidence_interval": ( None if self.confidence_interval is None else self.confidence_interval.as_dict() ), }Container for propensity-estimation outputs.
Instance variables

var c : float-
Estimated labeling propensity, in (0, 1].
var confidence_interval : object | None-
Attached bootstrap confidence interval, if one was computed.
var metadata : dict[str, object]-
Method-specific details of the estimate.
var method : str-
Name of the estimation method that produced this result.
var n_labeled_positive : int-
Number of labeled positive samples seen during fitting.
var n_samples : int-
Total number of samples seen during fitting.
Methods
def as_dict(self)-
Expand source code Browse git
def as_dict(self): """Return a machine-readable representation of the result.""" return { "c": self.c, "method": self.method, "n_samples": self.n_samples, "n_labeled_positive": self.n_labeled_positive, "metadata": dict(self.metadata), "confidence_interval": ( None if self.confidence_interval is None else self.confidence_interval.as_dict() ), }Return a machine-readable representation of the result.
class QuantilePositivePropensityEstimator (quantile=0.25)-
Expand source code Browse git
class QuantilePositivePropensityEstimator(BasePropensityEstimator): """Estimate c with a configurable quantile of positive scores.""" def __init__(self, quantile=0.25): """Initialize the quantile-based estimator.""" self.quantile = quantile def _fit_propensity(self, y, *, s_proba=None, X=None): if self.quantile <= 0 or self.quantile > 1: raise ValueError("quantile must lie in (0, 1].") positive_scores = _positive_propensity_scores(y, s_proba=s_proba) c_hat = float(np.quantile(positive_scores, self.quantile)) return _result_from_scalar( c_hat, positive_scores, y, method="quantile_positive", metadata={ "aggregation": "quantile", "quantile": float(self.quantile), }, )Estimate c with a configurable quantile of positive scores.
Initialize the quantile-based estimator.
Ancestors
- BasePropensityEstimator
- sklearn.base.BaseEstimator
- sklearn.utils._repr_html.base.ReprHTMLMixin
- sklearn.utils._repr_html.base._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
Inherited members
class SarWeightResult (propensity_scores: np.ndarray,
weights: np.ndarray,
clip_min: float,
clip_max: float,
clipped_count: int,
normalized: bool,
effective_sample_size: float,
metadata: dict[str, object] = <factory>)-
Expand source code Browse git
@dataclass(frozen=True) class SarWeightResult: """Inverse-propensity weights derived from experimental SAR hooks.""" propensity_scores: np.ndarray weights: np.ndarray clip_min: float clip_max: float clipped_count: int normalized: bool effective_sample_size: float metadata: dict[str, object] = field(default_factory=dict) def as_dict(self): """Return a machine-readable summary of the weight computation.""" return { "clip_min": self.clip_min, "clip_max": self.clip_max, "clipped_count": self.clipped_count, "normalized": self.normalized, "effective_sample_size": self.effective_sample_size, "metadata": dict(self.metadata), }Inverse-propensity weights derived from experimental SAR hooks.
Instance variables

var clip_max : float-
Upper clipping bound applied to the propensity scores.
var clip_min : float-
Lower clipping bound applied to the propensity scores.
var clipped_count : int-
Number of scores altered by clipping.
var effective_sample_size : float-
Effective sample size implied by the weights.
var metadata : dict[str, object]-
Summary statistics of the scores and weights.
var normalized : bool-
Whether the weights were rescaled to mean one.
var propensity_scores : numpy.ndarray-
Validated (unclipped) propensity scores.
var weights : numpy.ndarray-
Inverse-propensity weights computed from the clipped scores.
Methods
def as_dict(self)-
Expand source code Browse git
def as_dict(self): """Return a machine-readable summary of the weight computation.""" return { "clip_min": self.clip_min, "clip_max": self.clip_max, "clipped_count": self.clipped_count, "normalized": self.normalized, "effective_sample_size": self.effective_sample_size, "metadata": dict(self.metadata), }Return a machine-readable summary of the weight computation.
class ScarSanityCheckResult (candidate_threshold: float,
n_labeled_positive: int,
n_candidate_unlabeled: int,
candidate_fraction_unlabeled: float,
mean_positive_score: float,
mean_candidate_score: float,
score_ks_statistic: float,
mean_abs_smd: float | None,
max_abs_smd: float | None,
shifted_feature_fraction: float | None,
group_membership_auc: float | None,
warnings: tuple[str, ...],
metadata: dict[str, object] = <factory>)-
Expand source code Browse git
@dataclass(frozen=True) class ScarSanityCheckResult: """Summary statistics for a SCAR sanity-check run.""" candidate_threshold: float n_labeled_positive: int n_candidate_unlabeled: int candidate_fraction_unlabeled: float mean_positive_score: float mean_candidate_score: float score_ks_statistic: float mean_abs_smd: float | None max_abs_smd: float | None shifted_feature_fraction: float | None group_membership_auc: float | None warnings: tuple[str, ...] metadata: dict[str, object] = field(default_factory=dict) @property def violates_scar(self): """Return whether drift-related warnings indicate SCAR mismatch.""" return any( warning_flag in _SCAR_VIOLATION_FLAGS for warning_flag in self.warnings ) def as_dict(self): """Return a machine-readable representation of the result.""" return { "candidate_threshold": self.candidate_threshold, "n_labeled_positive": self.n_labeled_positive, "n_candidate_unlabeled": self.n_candidate_unlabeled, "candidate_fraction_unlabeled": self.candidate_fraction_unlabeled, "mean_positive_score": self.mean_positive_score, "mean_candidate_score": self.mean_candidate_score, "score_ks_statistic": self.score_ks_statistic, "mean_abs_smd": self.mean_abs_smd, "max_abs_smd": self.max_abs_smd, "shifted_feature_fraction": self.shifted_feature_fraction, "group_membership_auc": self.group_membership_auc, "warnings": list(self.warnings), "metadata": dict(self.metadata), }Summary statistics for a SCAR sanity-check run.
Instance variables
var candidate_fraction_unlabeled : float-
Fraction of unlabeled examples flagged as positive candidates.
var candidate_threshold : float-
Score threshold used to select candidate positives among unlabeled examples.
var group_membership_auc : float | None-
AUC separating labeled positives from candidates, if computed.
var max_abs_smd : float | None-
Maximum absolute standardized mean difference across features, if computed.
var mean_abs_smd : float | None-
Mean absolute standardized mean difference across features, if computed.
var mean_candidate_score : float-
Mean score of the candidate unlabeled examples.
var mean_positive_score : float-
Mean score of the labeled positive examples.
var metadata : dict[str, object]-
Additional machine-readable details about the sanity-check run.
var n_candidate_unlabeled : int-
Number of unlabeled examples selected as candidates.
var n_labeled_positive : int-
Number of labeled positive examples.
var score_ks_statistic : float-
Kolmogorov-Smirnov statistic comparing positive and candidate score distributions.
var shifted_feature_fraction : float | None-
Fraction of features flagged as shifted, if computed.
prop violates_scar-
Expand source code Browse git
@property
def violates_scar(self):
    """Return whether drift-related warnings indicate SCAR mismatch."""
    # Short-circuit on the first warning flag that is a known violation
    # marker (module-level _SCAR_VIOLATION_FLAGS).
    for warning_flag in self.warnings:
        if warning_flag in _SCAR_VIOLATION_FLAGS:
            return True
    return False
var warnings : tuple[str, ...]-
Warning flags raised during the sanity check.
Methods
def as_dict(self)-
Expand source code Browse git
def as_dict(self):
    """Return a machine-readable representation of the result."""
    # Warnings become a list and metadata is copied, so the returned
    # mapping is freely mutable by callers.
    summary = dict(
        candidate_threshold=self.candidate_threshold,
        n_labeled_positive=self.n_labeled_positive,
        n_candidate_unlabeled=self.n_candidate_unlabeled,
        candidate_fraction_unlabeled=self.candidate_fraction_unlabeled,
        mean_positive_score=self.mean_positive_score,
        mean_candidate_score=self.mean_candidate_score,
        score_ks_statistic=self.score_ks_statistic,
        mean_abs_smd=self.mean_abs_smd,
        max_abs_smd=self.max_abs_smd,
        shifted_feature_fraction=self.shifted_feature_fraction,
        group_membership_auc=self.group_membership_auc,
        warnings=list(self.warnings),
        metadata=dict(self.metadata),
    )
    return summary
class TrimmedMeanPropensityEstimator (trim_fraction=0.1)-
Expand source code Browse git
class TrimmedMeanPropensityEstimator(BasePropensityEstimator):
    """Estimate c with a trimmed mean over labeled-positive scores."""

    def __init__(self, trim_fraction=0.1):
        """Initialize the trimmed-mean estimator."""
        # Stored unvalidated per scikit-learn convention; validated at fit.
        self.trim_fraction = trim_fraction

    def _fit_propensity(self, y, *, s_proba=None, X=None):
        if self.trim_fraction >= 0.5 or self.trim_fraction < 0:
            raise ValueError("trim_fraction must lie in [0, 0.5).")
        scores = np.sort(_positive_propensity_scores(y, s_proba=s_proba))
        n_positive = scores.shape[0]
        # Truncation equals floor for this non-negative product.
        trim_count = int(n_positive * self.trim_fraction)
        # With trim_count == 0 the slice keeps every score; since
        # trim_fraction < 0.5 the slice is never over-trimmed.
        kept = scores[trim_count : n_positive - trim_count]
        return _result_from_scores(
            kept,
            y,
            method="trimmed_mean_positive",
            metadata={
                "aggregation": "trimmed_mean",
                "trim_fraction": float(self.trim_fraction),
                "trim_count_per_side": int(trim_count),
            },
        )
Initialize the trimmed-mean estimator.
Ancestors
- BasePropensityEstimator
- sklearn.base.BaseEstimator
- sklearn.utils._repr_html.base.ReprHTMLMixin
- sklearn.utils._repr_html.base._HTMLDocumentationLinkMixin
- sklearn.utils._metadata_requests._MetadataRequester
Inherited members