diff --git a/changelog.d/imputer-weights.fixed.md b/changelog.d/imputer-weights.fixed.md new file mode 100644 index 0000000..8d03f0f --- /dev/null +++ b/changelog.d/imputer-weights.fixed.md @@ -0,0 +1 @@ +Fixed `Imputer.fit(weight_col=...)` silently discarding weights (#4). Previously, weights were used only as bootstrap-resample probabilities over `X_train`, with the resampled data then fed unweighted into the underlying estimator; effective sample size shrank, rare donors were dropped, and variance was inflated relative to the correct weighted estimator. The base `Imputer.fit` now threads `sample_weight` through to each learner's native weighted-fit API: `RandomForestQuantileRegressor.fit(sample_weight=...)`, `sm.WLS` (instead of `sm.OLS`), `LogisticRegression.fit(sample_weight=...)`, `RandomForestClassifier.fit(sample_weight=...)`, and StatMatch's `NND.hotdeck` via `weight.don`. Models that do not support weighted fit (`QuantReg`, `MDN`) now raise `NotImplementedError` rather than silently ignoring weights. NaN weights are now rejected explicitly (previously `(weights <= 0).any()` returned `False` on NaN and let the NaN propagate into `.sample()` probabilities). diff --git a/microimpute/models/imputer.py b/microimpute/models/imputer.py index 14016fd..ec563a8 100644 --- a/microimpute/models/imputer.py +++ b/microimpute/models/imputer.py @@ -271,9 +271,21 @@ def fit( weights = X_train[weight_col] elif weight_col is not None and isinstance(weight_col, np.ndarray): weights = pd.Series(weight_col, index=X_train.index) + elif weight_col is not None and isinstance(weight_col, pd.Series): + weights = weight_col.reindex(X_train.index) - if weights is not None and (weights <= 0).any(): - raise ValueError("Weights must be positive") + if weights is not None: + # Check for NaN AND non-positive values together. Previously only + # (weights <= 0).any() was checked, which returns False for NaN + # weights — those then propagated into .sample() as NaN + # probabilities or corrupted sample_weight passed to learners. + weights_arr = np.asarray(weights, dtype=float) + invalid_mask = np.isnan(weights_arr) | (weights_arr <= 0) + if invalid_mask.any(): + raise ValueError( + "Weights must be positive and finite; found " + f"{int(invalid_mask.sum())} non-positive or NaN weight(s)" + ) # Identify target types BEFORE preprocessing self.identify_target_types(X_train, imputed_variables, not_numeric_categorical) @@ -284,21 +296,28 @@ def fit( ) ) - if weights is not None: - weights_normalized = weights / weights.sum() - X_train = X_train.sample( - n=len(X_train), - replace=True, - weights=weights_normalized, - random_state=self.seed, - ).reset_index(drop=True) - # Save predictors and imputed variables self.predictors = predictors self.imputed_variables = imputed_variables self.imputed_vars_dummy_info = imputed_vars_dummy_info self.original_predictors = original_predictors + # Pass sample_weight through to the subclass so it can use each + # learner's native weighted-fit API (QRF, OLS→WLS, logistic, RFC all + # support sample_weight). This replaces the previous bootstrap + # resample, which silently discarded weights for the underlying + # estimator and inflated variance / shrank effective sample size. + sample_weight = None + if weights is not None: + sample_weight = np.asarray(weights_arr, dtype=float) + # Reindex if preprocess_data_types changed the row ordering + # (it currently does not, but guard against future drift). + if len(sample_weight) != len(X_train): + raise RuntimeError( + "Internal error: sample_weight length no longer matches " + "X_train after preprocessing" + ) + # Defer actual training to subclass with all parameters fitted_model = self._fit( X_train, @@ -309,6 +328,7 @@ def fit( boolean_targets=self.boolean_targets, numeric_targets=self.numeric_targets, constant_targets=self.constant_targets, + sample_weight=sample_weight, **kwargs, ) return fitted_model diff --git a/microimpute/models/matching.py b/microimpute/models/matching.py index 86b2d6e..07952d5 100644 --- a/microimpute/models/matching.py +++ b/microimpute/models/matching.py @@ -449,6 +449,7 @@ def _fit( numeric_targets: Optional[List[str]] = None, constant_targets: Optional[Dict[str, Dict]] = None, tune_hyperparameters: bool = False, + sample_weight: Optional[np.ndarray] = None, **matching_kwargs: Any, ) -> MatchingResults: """Fit the matching model by storing the donor data and variable names. @@ -457,6 +458,11 @@ def _fit( X_train: DataFrame containing the donor data. predictors: List of column names to use as predictors. imputed_variables: List of column names to impute. + sample_weight: Optional per-row sample weights for the donor + dataset. When provided, weights are passed to R StatMatch's + ``NND.hotdeck`` via ``weight.don`` so that donor records are + matched in proportion to their survey weights rather than + uniformly. matching_kwargs: Additional keyword arguments for hyperparameter tuning of the matching function. @@ -468,6 +474,13 @@ def _fit( """ try: self.donor_data = X_train.copy() + if sample_weight is not None: + # Attach donor weights to the matching hyperparameters so + # they're forwarded into the StatMatch R call (weight.don). + matching_kwargs = { + **matching_kwargs, + "donor_sample_weight": np.asarray(sample_weight, dtype=float), + } if tune_hyperparameters: self.logger.info("Tuning hyperparameters for the matching model") diff --git a/microimpute/models/mdn.py b/microimpute/models/mdn.py index aac265e..5a518a3 100644 --- a/microimpute/models/mdn.py +++ b/microimpute/models/mdn.py @@ -926,6 +926,7 @@ def _fit( numeric_targets: Optional[List[str]] = None, constant_targets: Optional[Dict[str, Dict]] = None, tune_hyperparameters: bool = False, + sample_weight: Optional[np.ndarray] = None, **kwargs: Any, ) -> Union[MDNResults, Tuple[MDNResults, Dict[str, Any]]]: """Fit the MDN model to the training data. @@ -940,12 +941,22 @@ def _fit( numeric_targets: List of numeric target names. constant_targets: Dict of constant target info. tune_hyperparameters: If True, tune hyperparameters before fitting. + sample_weight: Optional per-row sample weights. The underlying + pytorch_tabular MDN implementation does not accept sample + weights; when provided, the model raises + ``NotImplementedError`` so callers do not silently get an + unweighted fit. **kwargs: Additional parameters. Returns: MDNResults instance with fitted models. If tune_hyperparameters=True, returns (MDNResults, best_params). """ + if sample_weight is not None: + raise NotImplementedError( + "MDN does not yet support sample weights. Use QRF, OLS, or " + "Matching for weighted imputation." + ) try: best_params = None diff --git a/microimpute/models/ols.py b/microimpute/models/ols.py index 8595450..b9bf632 100644 --- a/microimpute/models/ols.py +++ b/microimpute/models/ols.py @@ -31,6 +31,7 @@ def fit( y: pd.Series, var_type: str, categories: List = None, + sample_weight: Optional[np.ndarray] = None, **lr_kwargs: Any, ) -> None: """Fit logistic regression for categorical/boolean target. @@ -71,7 +72,10 @@ def fit( } self.classifier = LogisticRegression(**classifier_params) - self.classifier.fit(X, y_encoded) + fit_kwargs = {} + if sample_weight is not None: + fit_kwargs["sample_weight"] = np.asarray(sample_weight, dtype=float) + self.classifier.fit(X, y_encoded, **fit_kwargs) def predict( self, @@ -137,11 +141,26 @@ def __init__(self, seed: int, logger): self.model = None self.output_column = None - def fit(self, X: pd.DataFrame, y: pd.Series, **kwargs) -> None: - """Fit OLS model.""" + def fit( + self, + X: pd.DataFrame, + y: pd.Series, + sample_weight: Optional[np.ndarray] = None, + **kwargs, + ) -> None: + """Fit OLS (or WLS when sample_weight is provided). + + When ``sample_weight`` is provided, uses ``statsmodels.api.WLS`` to + perform a genuine weighted least-squares fit rather than ignoring + the weights. + """ self.output_column = y.name X_with_const = sm.add_constant(X) - self.model = sm.OLS(y, X_with_const).fit() + if sample_weight is not None: + weights = np.asarray(sample_weight, dtype=float) + self.model = sm.WLS(y, X_with_const, weights=weights).fit() + else: + self.model = sm.OLS(y, X_with_const).fit() self.scale = self.model.scale def predict(self, X: pd.DataFrame) -> np.ndarray: @@ -431,6 +450,7 @@ def _fit( boolean_targets: Optional[Dict[str, Dict]] = None, numeric_targets: Optional[List[str]] = None, constant_targets: Optional[Dict[str, Dict]] = None, + sample_weight: Optional[np.ndarray] = None, **kwargs: Any, ) -> OLSResults: """Fit the OLS model to the training data. @@ -439,6 +459,9 @@ def _fit( X_train: DataFrame containing the training data. predictors: List of column names to use as predictors. imputed_variables: List of column names to impute. + sample_weight: Optional per-row sample weights, threaded through + to ``sm.WLS`` (for numeric targets) or + ``LogisticRegression.fit`` (for categorical/boolean). Returns: The fitted model instance. @@ -476,6 +499,7 @@ def _fit( Y, var_type=categorical_targets[variable]["type"], categories=categorical_targets[variable].get("categories"), + sample_weight=sample_weight, **kwargs, ) self.logger.info( @@ -484,14 +508,22 @@ def _fit( elif variable in (boolean_targets or {}): # Use logistic regression for boolean targets model = _LogisticRegressionModel(seed=self.seed, logger=self.logger) - model.fit(X_train[predictors], Y, var_type="boolean", **kwargs) + model.fit( + X_train[predictors], + Y, + var_type="boolean", + sample_weight=sample_weight, + **kwargs, + ) self.logger.info( f"Logistic regression fitted for boolean variable {variable}" ) else: # Use OLS for numeric targets model = _OLSModel(seed=self.seed, logger=self.logger) - model.fit(X_train[predictors], Y, **kwargs) + model.fit( + X_train[predictors], Y, sample_weight=sample_weight, **kwargs + ) self.logger.info( f"OLS regression fitted for numeric variable {variable}" ) diff --git a/microimpute/models/qrf.py b/microimpute/models/qrf.py index 3f46c5f..6a602bf 100644 --- a/microimpute/models/qrf.py +++ b/microimpute/models/qrf.py @@ -57,6 +57,7 @@ def fit( y: pd.Series, var_type: str, categories: List = None, + sample_weight: Optional[np.ndarray] = None, **rf_kwargs: Any, ) -> None: """Fit classifier for categorical/boolean target. @@ -95,7 +96,10 @@ def fit( } self.classifier = RandomForestClassifier(**classifier_params) - self.classifier.fit(X, y_encoded) + fit_kwargs = {} + if sample_weight is not None: + fit_kwargs["sample_weight"] = np.asarray(sample_weight, dtype=float) + self.classifier.fit(X, y_encoded, **fit_kwargs) def predict(self, X: pd.DataFrame, return_probs: bool = False) -> pd.Series: """Predict classes or probabilities.""" @@ -148,24 +152,44 @@ def __init__(self, seed: int, logger): self.qrf = None self.output_column = None - def fit(self, X: pd.DataFrame, y: pd.Series, **qrf_kwargs: Any) -> None: + def fit( + self, + X: pd.DataFrame, + y: pd.Series, + sample_weight: Optional[np.ndarray] = None, + **qrf_kwargs: Any, + ) -> None: """Fit the QRF model. Note: Assumes X is already preprocessed with categorical encoding handled by the base Imputer class. + + Args: + X: Predictor DataFrame (preprocessed). + y: Target Series. + sample_weight: Optional per-row sample weights, passed directly to + the underlying ``RandomForestQuantileRegressor.fit`` so each + row contributes to the weighted-survey estimator rather than + being treated as a bootstrap-resample probability. """ self.output_column = y.name - # Remove random_state from kwargs if present, since we set it explicitly + # Remove random_state / sample_weight from kwargs if present, since + # we set them explicitly below. qrf_kwargs_filtered = { - k: v for k, v in qrf_kwargs.items() if k != "random_state" + k: v + for k, v in qrf_kwargs.items() + if k not in ("random_state", "sample_weight") } # Create and fit model self.qrf = RandomForestQuantileRegressor( random_state=self.seed, **qrf_kwargs_filtered ) - self.qrf.fit(X, y.values.ravel()) + fit_kwargs = {} + if sample_weight is not None: + fit_kwargs["sample_weight"] = np.asarray(sample_weight, dtype=float) + self.qrf.fit(X, y.values.ravel(), **fit_kwargs) def predict( self, @@ -608,6 +632,11 @@ def _fit_model( categorical_targets = getattr(self, "categorical_targets", {}) boolean_targets = getattr(self, "boolean_targets", {}) + # sample_weight is threaded via kwargs from the base Imputer.fit, + # bypassing the nested qrf/rfc structure so both classifier and + # regressor paths see the same per-row weights. + sample_weight = kwargs.pop("sample_weight", None) + # Extract appropriate parameters based on model type # Handle nested structure from hyperparameter tuning if isinstance(model, _RandomForestClassifierModel): @@ -627,10 +656,17 @@ def _fit_model( y, var_type=categorical_targets[variable]["type"], categories=categorical_targets[variable].get("categories"), + sample_weight=sample_weight, **model_params, ) elif variable in boolean_targets: - model.fit(X, y, var_type="boolean", **model_params) + model.fit( + X, + y, + var_type="boolean", + sample_weight=sample_weight, + **model_params, + ) else: # Use QRF params if they exist in a nested structure if "qrf" in kwargs: @@ -643,7 +679,7 @@ def _fit_model( model_params = kwargs # Regular QRF fit - model.fit(X, y, **model_params) + model.fit(X, y, sample_weight=sample_weight, **model_params) def _get_memory_usage_info(self) -> str: """Get formatted memory usage information.""" @@ -665,6 +701,7 @@ def _fit( numeric_targets: Optional[List[str]] = None, constant_targets: Optional[Dict[str, Dict]] = None, tune_hyperparameters: bool = False, + sample_weight: Optional[np.ndarray] = None, **qrf_kwargs: Any, ) -> QRFResults: """Fit the QRF model to the training data. @@ -673,6 +710,9 @@ def _fit( X_train: DataFrame containing the training data. predictors: List of column names to use as predictors. imputed_variables: List of column names to impute. + sample_weight: Optional per-row sample weights threaded through + to ``RandomForestQuantileRegressor.fit`` / + ``RandomForestClassifier.fit``. **qrf_kwargs: Additional keyword arguments to pass to QRF. Returns: @@ -691,10 +731,15 @@ def _fit( f"Subsampling training data from " f"{len(X_train)} to {self.max_train_samples} rows" ) - X_train = X_train.sample( - n=self.max_train_samples, - random_state=self.seed, - ).reset_index(drop=True) + # Sample by positional index so sample_weight stays aligned + # with X_train after reset_index. + rng = np.random.default_rng(self.seed) + sel = rng.choice( + len(X_train), size=self.max_train_samples, replace=False + ) + if sample_weight is not None: + sample_weight = np.asarray(sample_weight, dtype=float)[sel] + X_train = X_train.iloc[sel].reset_index(drop=True) # Store target type information early for hyperparameter tuning self.categorical_targets = categorical_targets or {} @@ -737,6 +782,7 @@ def _fit( batch_variables, qrf_kwargs, constant_targets, + sample_weight=sample_weight, ) # Memory cleanup after each batch @@ -795,6 +841,7 @@ def _fit( X_train[encoded_predictors], X_train[variable], variable, + sample_weight=sample_weight, **qrf_kwargs, ) @@ -892,6 +939,7 @@ def _fit( batch_variables, qrf_kwargs, constant_targets, + sample_weight=sample_weight, ) # Memory cleanup after each batch @@ -951,6 +999,7 @@ def _fit( X_train[encoded_predictors], X_train[variable], variable, + sample_weight=sample_weight, **qrf_kwargs, ) @@ -1028,6 +1077,7 @@ def _fit_variable_batch( batch_variables: List[str], qrf_kwargs: Dict[str, Any], constant_targets: Optional[Dict[str, Dict]] = None, + sample_weight: Optional[np.ndarray] = None, ) -> None: """Fit models for a batch of variables. @@ -1076,6 +1126,7 @@ def _fit_variable_batch( X_train[current_predictors], X_train[variable], variable, + sample_weight=sample_weight, **qrf_kwargs, ) diff --git a/microimpute/models/quantreg.py b/microimpute/models/quantreg.py index a808303..0cca15d 100644 --- a/microimpute/models/quantreg.py +++ b/microimpute/models/quantreg.py @@ -282,6 +282,7 @@ def _fit( numeric_targets: Optional[List[str]] = None, constant_targets: Optional[Dict[str, Dict]] = None, quantiles: Optional[List[float]] = None, + sample_weight: Optional[np.ndarray] = None, ) -> QuantRegResults: """Fit the Quantile Regression model to the training data. @@ -290,14 +291,24 @@ def _fit( predictors: List of column names to use as predictors. imputed_variables: List of column names to impute. quantiles: List of quantiles to fit models for. + sample_weight: Optional per-row sample weights. statsmodels' + ``QuantReg`` does not support weights directly; when weights + are provided the model raises ``NotImplementedError`` so + callers do not silently get an unweighted fit. Returns: The fitted model instance. Raises: ValueError: If any quantile is outside the [0, 1] range. + NotImplementedError: If sample_weight is provided. RuntimeError: If model fitting fails. """ + if sample_weight is not None: + raise NotImplementedError( + "QuantReg does not support sample weights. Use QRF or OLS " + "for weighted imputation." + ) # Check for unsupported categorical targets if categorical_targets: unsupported = list(categorical_targets.keys()) diff --git a/microimpute/utils/statmatch_hotdeck.py b/microimpute/utils/statmatch_hotdeck.py index 2d87742..94b509b 100644 --- a/microimpute/utils/statmatch_hotdeck.py +++ b/microimpute/utils/statmatch_hotdeck.py @@ -106,12 +106,26 @@ def nnd_hotdeck_using_rpy2( r_match = ro.StrVector(matching_variables) r_z = ro.StrVector(z_variables) - if matching_kwargs: + # Extract optional donor sample weights (threaded from Imputer.fit + # when weight_col was supplied). StatMatch accepts these via the + # ``weight.don`` R argument; we pop it from matching_kwargs so that + # other kwargs pass through unchanged. + r_kwargs = dict(matching_kwargs) + donor_sample_weight = r_kwargs.pop("donor_sample_weight", None) + if donor_sample_weight is not None: + with localconverter( + default_converter + pandas2ri.converter + numpy2ri.converter + ): + r_kwargs["weight_don"] = ro.FloatVector( + np.asarray(donor_sample_weight, dtype=float) + ) + + if r_kwargs: out_NND = StatMatch.NND_hotdeck( data_rec=r_receiver, data_don=r_donor, match_vars=r_match, - **matching_kwargs, + **r_kwargs, ) else: out_NND = StatMatch.NND_hotdeck( diff --git a/tests/test_models/test_imputers.py b/tests/test_models/test_imputers.py index 9e43b49..bd7016f 100644 --- a/tests/test_models/test_imputers.py +++ b/tests/test_models/test_imputers.py @@ -522,16 +522,31 @@ def test_weighted_training( model = model_class() - if model_class.__name__ == "QuantReg": - fitted = model.fit( - X_train, - predictors, - imputed_variables, - weight_col="wgt", - quantiles=QUANTILES, - ) - else: - fitted = model.fit(X_train, predictors, imputed_variables, weight_col="wgt") + # QuantReg and MDN don't support sample weights — they should raise + # NotImplementedError rather than silently dropping weights. + if model_class.__name__ in ("QuantReg", "MDN"): + with pytest.raises( + (NotImplementedError, RuntimeError), + match="does not.*support.*weights|does not support sample weights", + ): + if model_class.__name__ == "QuantReg": + model.fit( + X_train, + predictors, + imputed_variables, + weight_col="wgt", + quantiles=QUANTILES, + ) + else: + model.fit( + X_train, + predictors, + imputed_variables, + weight_col="wgt", + ) + return + + fitted = model.fit(X_train, predictors, imputed_variables, weight_col="wgt") assert fitted is not None @@ -545,6 +560,74 @@ def test_weighted_training( assert not predictions[0.5].isna().any().any() +def test_imputer_rejects_nan_weights(diabetes_data: pd.DataFrame) -> None: + """Regression test for the NaN-weight silent-corruption bug (#4): the + imputer must raise a clear error when weights contain NaN values, + rather than letting NaN propagate through .sample() or sample_weight. + """ + X_train, _ = preprocess_data(diabetes_data) + X_train["wgt"] = 1.0 + X_train.loc[X_train.index[0], "wgt"] = float("nan") + + model = OLS() + with pytest.raises(ValueError, match="positive and finite|NaN"): + model.fit(X_train, ["age", "sex"], ["bmi"], weight_col="wgt") + + +def test_imputer_rejects_zero_weights(diabetes_data: pd.DataFrame) -> None: + """Regression test for the non-positive-weight bug (#4): weights of 0 + or negative values must raise a clear error.""" + X_train, _ = preprocess_data(diabetes_data) + X_train["wgt"] = 1.0 + X_train.loc[X_train.index[0], "wgt"] = 0.0 + + model = OLS() + with pytest.raises(ValueError, match="positive and finite|positive"): + model.fit(X_train, ["age", "sex"], ["bmi"], weight_col="wgt") + + +def test_weighted_fit_differs_from_unweighted( + diabetes_data: pd.DataFrame, +) -> None: + """Regression test for the weight-discard bug (#4): a truly weighted fit + must produce different parameter estimates than an unweighted fit on + an asymmetric dataset. Previously weights were used as bootstrap + resample probabilities and not passed to the underlying estimator, so + parameter estimates converged to the unweighted solution in + expectation.""" + np.random.seed(0) + n = 300 + + # Asymmetric weights: first half gets weight 1, second half gets weight + # 50. An unweighted OLS fit ignores this; a true WLS fit does not. + x = np.linspace(-2, 2, n) + # Introduce a slope shift in the second half so the WLS coefficient + # should skew toward it. + y = np.where(np.arange(n) < n // 2, 1.0 * x, 5.0 * x) + np.random.randn(n) * 0.2 + weights = np.where(np.arange(n) < n // 2, 1.0, 50.0) + + data = pd.DataFrame({"x": x, "y": y, "wgt": weights}) + + unweighted_ols = OLS() + unweighted_fit = unweighted_ols.fit(data, ["x"], ["y"]) + unweighted_pred = unweighted_fit.predict(data[["x"]].head(20), quantiles=[0.5])[ + 0.5 + ]["y"].values + + weighted_ols = OLS() + weighted_fit = weighted_ols.fit(data, ["x"], ["y"], weight_col="wgt") + weighted_pred = weighted_fit.predict(data[["x"]].head(20), quantiles=[0.5])[0.5][ + "y" + ].values + + # Weighted predictions should differ substantially from unweighted + # ones when a large weight block has a different slope. + assert not np.allclose(unweighted_pred, weighted_pred, atol=0.05), ( + "Weighted OLS fit should differ from unweighted fit on asymmetric " + "data; previously weights were silently discarded" + ) + + # === Quantile-Specific Tests === @@ -644,9 +727,18 @@ def test_missing_predictors_in_test(model_class: Type[Imputer]) -> None: # === Reproducibility Tests === +_REPRODUCIBILITY_MODELS = [OLS, QuantReg, QRF] +try: + from microimpute.models.matching import Matching as _Matching_for_repro + + _REPRODUCIBILITY_MODELS.append(_Matching_for_repro) +except ImportError: + pass + + @pytest.mark.parametrize( "model_class", - [OLS, QuantReg, QRF, Matching], + _REPRODUCIBILITY_MODELS, ids=lambda cls: cls.__name__, ) def test_reproducibility(model_class: Type[Imputer], simple_data: pd.DataFrame) -> None: