diff --git a/src/pyrecest/calibration/time_offset.py b/src/pyrecest/calibration/time_offset.py index 5c98273b7..a4b0ab84f 100644 --- a/src/pyrecest/calibration/time_offset.py +++ b/src/pyrecest/calibration/time_offset.py @@ -25,19 +25,8 @@ class TimeOffsetFitResult: metadata: Mapping[str, Any] = field(default_factory=dict) def summary(self) -> dict[str, Any]: - """Return a compact JSON-serializable summary.""" - - out: dict[str, Any] = { - "metric": self.metric, - "best_offset_s": self.best_offset_s, - "evaluated_offsets": int(len(self.offsets_s)), - } - best_index = _best_metric_index( - self.offsets_s, - self.metric_values, - self.counts, - self.best_offset_s, - ) + out: dict[str, Any] = {"metric": self.metric, "best_offset_s": self.best_offset_s, "evaluated_offsets": int(len(self.offsets_s))} + best_index = _best_metric_index(self.offsets_s, self.metric_values, self.counts, self.best_offset_s) if best_index is not None: out["best_metric_value"] = float(self.metric_values[best_index]) out["best_count"] = int(self.counts[best_index]) @@ -45,14 +34,7 @@ def summary(self) -> dict[str, Any]: return out -def _best_metric_index( - offsets_s: np.ndarray, - metric_values: np.ndarray, - counts: np.ndarray, - best_offset_s: float | None, -) -> int | None: - """Return the finite, non-empty row corresponding to ``best_offset_s``.""" - +def _best_metric_index(offsets_s: np.ndarray, metric_values: np.ndarray, counts: np.ndarray, best_offset_s: float | None) -> int | None: if best_offset_s is None: return None offsets = np.asarray(offsets_s, dtype=float).reshape(-1) @@ -64,16 +46,11 @@ def _best_metric_index( if not finite.any(): return None matching = finite & np.isclose(offsets, float(best_offset_s), rtol=0.0, atol=1e-12) - if matching.any(): - candidates = np.flatnonzero(matching) - return int(candidates[int(np.nanargmin(values[candidates]))]) - candidates = np.flatnonzero(finite) + candidates = np.flatnonzero(matching if matching.any() else finite) return int(candidates[int(np.nanargmin(values[candidates]))]) def _validate_error_metric(metric: Any) -> str: - """Return a normalized fit metric name with a deterministic validation error.""" - if not isinstance(metric, str): raise ValueError(_ERROR_METRIC_MESSAGE) normalized = metric.strip().lower() @@ -83,8 +60,6 @@ def _validate_error_metric(metric: Any) -> str: def _as_finite_float(value: Any, name: str) -> float: - """Return ``value`` as a finite scalar float.""" - arr = np.asarray(value) if arr.ndim != 0 or arr.dtype == np.bool_: raise ValueError(f"{name} must be a finite scalar") @@ -101,8 +76,6 @@ def _as_finite_float(value: Any, name: str) -> float: def _as_nonnegative_time_delta(value: Any, name: str) -> float: - """Return ``value`` as a nonnegative scalar time delta, allowing infinity.""" - arr = np.asarray(value) if arr.ndim != 0 or arr.dtype == np.bool_: raise ValueError(f"{name} must be nonnegative") @@ -119,14 +92,11 @@ def _as_nonnegative_time_delta(value: Any, name: str) -> float: def _as_real_numeric_array(value: Any, name: str) -> np.ndarray: - """Return ``value`` as a real float array without silent text/bool coercion.""" - arr = np.asarray(value) if arr.dtype == np.bool_ or arr.dtype.kind in "USbcMm": raise ValueError(f"{name} must contain real numeric values") if arr.dtype == object: - flat = arr.reshape(-1) - for item in flat: + for item in arr.reshape(-1): if isinstance(item, (bool, np.bool_, str, bytes, bytearray)): raise ValueError(f"{name} must contain real numeric values") try: @@ -136,8 +106,6 @@ def _as_real_numeric_array(value: Any, name: str) -> np.ndarray: def _as_summary_scalar(value: Any, name: str, *, allow_nan: bool = False) -> float: - """Return a sweep-summary scalar without silent text, bool, or complex coercion.""" - arr = np.asarray(value) if arr.ndim != 0 or arr.dtype == np.bool_ or arr.dtype.kind in "USbcMm": raise ValueError(f"{name} must be a real scalar") @@ -156,8 +124,6 @@ def _as_summary_scalar(value: Any, name: str, *, allow_nan: bool = False) -> flo def _as_nonnegative_summary_count(value: Any, name: str) -> float: - """Return a finite, nonnegative summary count.""" - result = _as_summary_scalar(value, name) if result < 0.0: raise ValueError(f"{name} must be nonnegative") @@ -165,8 +131,6 @@ def _as_nonnegative_summary_count(value: Any, name: str) -> float: def make_offset_grid(min_s: float, max_s: float, step_s: float) -> np.ndarray: - """Return an inclusive offset grid rounded to nanosecond precision.""" - min_s = _as_finite_float(min_s, "min_s") max_s = _as_finite_float(max_s, "max_s") step_s = _as_finite_float(step_s, "step_s") @@ -182,27 +146,16 @@ def make_offset_grid(min_s: float, max_s: float, step_s: float) -> np.ndarray: def apply_time_offset(times_s: np.ndarray, offset_s: float | None) -> np.ndarray: - """Return a copy of ``times_s`` shifted by ``offset_s`` seconds.""" - offset = 0.0 if offset_s is None else _as_finite_float(offset_s, "offset_s") return _as_real_numeric_array(times_s, "times_s") + offset def _validate_max_time_delta(max_time_delta_s: float | None) -> float | None: - if max_time_delta_s is None: - return None - return _as_nonnegative_time_delta(max_time_delta_s, "max_time_delta_s") - + return None if max_time_delta_s is None else _as_nonnegative_time_delta(max_time_delta_s, "max_time_delta_s") -def _finite_reference_rows( - reference_times_s: np.ndarray, - reference_values: np.ndarray | None = None, -) -> np.ndarray: - """Return a mask selecting reference rows that are usable for matching.""" - reference_times = _as_real_numeric_array( - reference_times_s, "reference_times_s" - ).reshape(-1) +def _finite_reference_rows(reference_times_s: np.ndarray, reference_values: np.ndarray | None = None) -> np.ndarray: + reference_times = _as_real_numeric_array(reference_times_s, "reference_times_s").reshape(-1) finite = np.isfinite(reference_times) if reference_values is not None: values = _as_real_numeric_array(reference_values, "reference_values") @@ -212,27 +165,16 @@ def _finite_reference_rows( return finite -def nearest_time_indices( - reference_times_s: np.ndarray, query_times_s: np.ndarray -) -> np.ndarray: - """Return original indices of nearest finite reference times for each query time. - - Non-finite query times have no nearest reference time and are marked as ``-1``. - """ - - reference = _as_real_numeric_array(reference_times_s, "reference_times_s").reshape( - -1 - ) +def nearest_time_indices(reference_times_s: np.ndarray, query_times_s: np.ndarray) -> np.ndarray: + reference = _as_real_numeric_array(reference_times_s, "reference_times_s").reshape(-1) query = _as_real_numeric_array(query_times_s, "query_times_s").reshape(-1) finite_reference = _finite_reference_rows(reference) if not finite_reference.any(): raise ValueError("reference_times_s must contain at least one finite value") - nearest = np.full(query.shape, -1, dtype=int) finite_query = np.isfinite(query) if not finite_query.any(): return nearest - original_indices = np.flatnonzero(finite_reference) reference = reference[finite_reference] order = np.argsort(reference) @@ -241,26 +183,14 @@ def nearest_time_indices( insertion = np.searchsorted(sorted_reference, finite_query_values) right = np.clip(insertion, 0, sorted_reference.size - 1) left = np.clip(insertion - 1, 0, sorted_reference.size - 1) - use_right = np.abs(sorted_reference[right] - finite_query_values) < np.abs( - sorted_reference[left] - finite_query_values - ) + use_right = np.abs(sorted_reference[right] - finite_query_values) < np.abs(sorted_reference[left] - finite_query_values) nearest[finite_query] = original_indices[order[np.where(use_right, right, left)]] return nearest -def interpolate_reference_values( - reference_times_s: np.ndarray, - reference_values: np.ndarray, - query_times_s: np.ndarray, - *, - max_time_delta_s: float | None = None, -) -> tuple[np.ndarray, np.ndarray]: - """Interpolate reference values at query times and return a validity mask.""" - +def interpolate_reference_values(reference_times_s: np.ndarray, reference_values: np.ndarray, query_times_s: np.ndarray, *, max_time_delta_s: float | None = None) -> tuple[np.ndarray, np.ndarray]: max_time_delta = _validate_max_time_delta(max_time_delta_s) - reference_times = _as_real_numeric_array( - reference_times_s, "reference_times_s" - ).reshape(-1) + reference_times = _as_real_numeric_array(reference_times_s, "reference_times_s").reshape(-1) reference_values = _as_real_numeric_array(reference_values, "reference_values") query_times = _as_real_numeric_array(query_times_s, "query_times_s").reshape(-1) if reference_values.ndim not in (1, 2): @@ -271,26 +201,14 @@ def interpolate_reference_values( raise ValueError("reference_times_s length must match reference_values rows") finite_reference = _finite_reference_rows(reference_times, reference_values) if np.count_nonzero(finite_reference) < 2: - raise ValueError( - "at least two finite reference rows are required for interpolation" - ) + raise ValueError("at least two finite reference rows are required for interpolation") reference_times = reference_times[finite_reference] reference_values = reference_values[finite_reference] order = np.argsort(reference_times) reference_times = reference_times[order] reference_values = reference_values[order] - - interpolated = np.column_stack( - [ - np.interp(query_times, reference_times, reference_values[:, dim]) - for dim in range(reference_values.shape[1]) - ] - ) - valid = ( - np.isfinite(query_times) - & (query_times >= reference_times[0]) - & (query_times <= reference_times[-1]) - ) + interpolated = np.column_stack([np.interp(query_times, reference_times, reference_values[:, dim]) for dim in range(reference_values.shape[1])]) + valid = np.isfinite(query_times) & (query_times >= reference_times[0]) & (query_times <= reference_times[-1]) if max_time_delta is not None: nearest = nearest_time_indices(reference_times, query_times) valid &= np.abs(reference_times[nearest] - query_times) <= max_time_delta @@ -298,119 +216,39 @@ def interpolate_reference_values( return interpolated, valid -def time_offset_error_summary( - measurement_times_s: np.ndarray, - measurement_values: np.ndarray, - reference_times_s: np.ndarray, - reference_values: np.ndarray, - offset_s: float, - *, - max_time_delta_s: float | None = None, -) -> dict[str, float]: - """Evaluate one candidate offset against a reference trajectory.""" - - measurement_values = _as_real_numeric_array( - measurement_values, "measurement_values" - ) +def time_offset_error_summary(measurement_times_s: np.ndarray, measurement_values: np.ndarray, reference_times_s: np.ndarray, reference_values: np.ndarray, offset_s: float, *, max_time_delta_s: float | None = None) -> dict[str, float]: + measurement_values = _as_real_numeric_array(measurement_values, "measurement_values") if measurement_values.ndim == 1: measurement_values = measurement_values.reshape(-1, 1) elif measurement_values.ndim != 2: raise ValueError("measurement_values must be one- or two-dimensional") query_times = apply_time_offset(measurement_times_s, offset_s) if query_times.size != measurement_values.shape[0]: - raise ValueError( - "measurement_times_s length must match measurement_values rows" - ) - reference_at_query, valid = interpolate_reference_values( - reference_times_s, - reference_values, - query_times, - max_time_delta_s=max_time_delta_s, - ) + raise ValueError("measurement_times_s length must match measurement_values rows") + reference_at_query, valid = interpolate_reference_values(reference_times_s, reference_values, query_times, max_time_delta_s=max_time_delta_s) if measurement_values.shape[1] != reference_at_query.shape[1]: - raise ValueError( - "measurement_values and reference_values must have the same value dimension" - ) + raise ValueError("measurement_values and reference_values must have the same value dimension") valid &= np.isfinite(measurement_values).all(axis=1) - errors = np.linalg.norm( - measurement_values[valid] - reference_at_query[valid], axis=1 - ) + errors = np.linalg.norm(measurement_values[valid] - reference_at_query[valid], axis=1) return _error_stats(float(offset_s), errors, total_count=len(measurement_values)) -def time_offset_sweep( - measurement_times_s: np.ndarray, - measurement_values: np.ndarray, - reference_times_s: np.ndarray, - reference_values: np.ndarray, - offsets_s: Iterable[float], - *, - max_time_delta_s: float | None = None, -) -> list[dict[str, float]]: - """Return error summaries for each candidate timestamp offset.""" - - return [ - time_offset_error_summary( - measurement_times_s, - measurement_values, - reference_times_s, - reference_values, - offset, - max_time_delta_s=max_time_delta_s, - ) - for offset in offsets_s - ] - - -def fit_time_offset( - measurement_times_s: np.ndarray, - measurement_values: np.ndarray, - reference_times_s: np.ndarray, - reference_values: np.ndarray, - offsets_s: Iterable[float], - *, - metric: str = "rmse", - max_time_delta_s: float | None = None, - metadata: Mapping[str, Any] | None = None, -) -> TimeOffsetFitResult: - """Fit a timestamp offset by minimizing a sweep metric.""" +def time_offset_sweep(measurement_times_s: np.ndarray, measurement_values: np.ndarray, reference_times_s: np.ndarray, reference_values: np.ndarray, offsets_s: Iterable[float], *, max_time_delta_s: float | None = None) -> list[dict[str, float]]: + return [time_offset_error_summary(measurement_times_s, measurement_values, reference_times_s, reference_values, offset, max_time_delta_s=max_time_delta_s) for offset in offsets_s] + +def fit_time_offset(measurement_times_s: np.ndarray, measurement_values: np.ndarray, reference_times_s: np.ndarray, reference_values: np.ndarray, offsets_s: Iterable[float], *, metric: str = "rmse", max_time_delta_s: float | None = None, metadata: Mapping[str, Any] | None = None) -> TimeOffsetFitResult: metric = _validate_error_metric(metric) - summaries = time_offset_sweep( - measurement_times_s, - measurement_values, - reference_times_s, - reference_values, - offsets_s, - max_time_delta_s=max_time_delta_s, - ) + summaries = time_offset_sweep(measurement_times_s, measurement_values, reference_times_s, reference_values, offsets_s, max_time_delta_s=max_time_delta_s) offsets = np.array([row["time_offset_s"] for row in summaries], dtype=float) values = np.array([row[metric] for row in summaries], dtype=float) counts = np.array([row.get("count", 0.0) for row in summaries], dtype=float) finite = np.isfinite(values) & (counts > 0) - best = ( - None - if not finite.any() - else float(offsets[np.where(finite)[0][int(np.nanargmin(values[finite]))]]) - ) - return TimeOffsetFitResult( - best_offset_s=best, - metric=metric, - offsets_s=offsets, - metric_values=values, - counts=counts.astype(int), - summaries=summaries, - metadata={} if metadata is None else dict(metadata), - ) - - -def aggregate_time_offset_sweeps( - sweeps: Iterable[Iterable[Mapping[str, float]]], - *, - metric: str = "rmse", -) -> list[dict[str, float]]: - """Aggregate same-offset sweeps with count weighting.""" + best = None if not finite.any() else float(offsets[np.where(finite)[0][int(np.nanargmin(values[finite]))]]) + return TimeOffsetFitResult(best_offset_s=best, metric=metric, offsets_s=offsets, metric_values=values, counts=counts.astype(int), summaries=summaries, metadata={} if metadata is None else dict(metadata)) + +def aggregate_time_offset_sweeps(sweeps: Iterable[Iterable[Mapping[str, float]]], *, metric: str = "rmse") -> list[dict[str, float]]: metric = _validate_error_metric(metric) by_offset: dict[float, list[Mapping[str, float]]] = {} for sweep in sweeps: @@ -419,33 +257,12 @@ def aggregate_time_offset_sweeps( by_offset.setdefault(offset, []).append(row) rows: list[dict[str, float]] = [] for offset, parts in sorted(by_offset.items()): - counts = np.array( - [ - _as_nonnegative_summary_count(part.get("count", 0.0), "count") - for part in parts - ], - dtype=float, - ) - total = float(np.sum(counts)) - row = {"time_offset_s": float(offset), "count": total} - for key in dict.fromkeys(("mean", "rmse", "p95", "max", metric)): - values = np.array( - [ - _as_summary_scalar(part.get(key, np.nan), str(key), allow_nan=True) - for part in parts - ], - dtype=float, - ) + counts = np.array([_as_nonnegative_summary_count(part.get("count", 0.0), "count") for part in parts], dtype=float) + row = {"time_offset_s": float(offset), "count": float(np.sum(counts))} + for key in dict.fromkeys(("mean", "std", "rmse", "p95", "max", metric)): + values = np.array([_as_summary_scalar(part.get(key, np.nan), str(key), allow_nan=True) for part in parts], dtype=float) if key == "std": - means = np.array( - [ - _as_summary_scalar( - part.get("mean", np.nan), "mean", allow_nan=True - ) - for part in parts - ], - dtype=float, - ) + means = np.array([_as_summary_scalar(part.get("mean", np.nan), "mean", allow_nan=True) for part in parts], dtype=float) row[key] = _aggregate_std_metric(values, means, counts) else: row[key] = _aggregate_summary_metric(key, values, counts) @@ -453,9 +270,7 @@ def aggregate_time_offset_sweeps( return rows -def _aggregate_summary_metric( - key: str, values: np.ndarray, counts: np.ndarray -) -> float: +def _aggregate_summary_metric(key: str, values: np.ndarray, counts: np.ndarray) -> float: valid = np.isfinite(values) & (counts > 0.0) if not valid.any(): return float("nan") @@ -466,59 +281,22 @@ def _aggregate_summary_metric( return float(np.average(values[valid], weights=counts[valid])) -def _aggregate_std_metric( - stds: np.ndarray, means: np.ndarray, counts: np.ndarray -) -> float: +def _aggregate_std_metric(stds: np.ndarray, means: np.ndarray, counts: np.ndarray) -> float: valid = np.isfinite(stds) & np.isfinite(means) & (counts > 0.0) if not valid.any(): return float("nan") weights = counts[valid] pooled_mean = float(np.average(means[valid], weights=weights)) - second_moment = float( - np.average(stds[valid] ** 2 + means[valid] ** 2, weights=weights) - ) - variance = max(0.0, second_moment - pooled_mean**2) - return float(np.sqrt(variance)) + second_moment = float(np.average(stds[valid] ** 2 + means[valid] ** 2, weights=weights)) + return float(np.sqrt(max(0.0, second_moment - pooled_mean**2))) -def _error_stats( - offset_s: float, errors: np.ndarray, *, total_count: int -) -> dict[str, float]: +def _error_stats(offset_s: float, errors: np.ndarray, *, total_count: int) -> dict[str, float]: errors = np.asarray(errors, dtype=float).reshape(-1) errors = errors[np.isfinite(errors)] if errors.size == 0: - return { - "time_offset_s": float(offset_s), - "count": 0.0, - "coverage": 0.0 if total_count else float("nan"), - "mean": float("nan"), - "std": float("nan"), - "rmse": float("nan"), - "p95": float("nan"), - "max": float("nan"), - } - return { - "time_offset_s": float(offset_s), - "count": float(errors.size), - "coverage": ( - float(errors.size / total_count) if total_count > 0 else float("nan") - ), - "mean": float(np.mean(errors)), - "std": float(np.std(errors)), - "rmse": float(np.sqrt(np.mean(errors**2))), - "p95": float(np.percentile(errors, 95)), - "max": float(np.max(errors)), - } - - -__all__ = [ - "TimeOffsetFitResult", - "aggregate_time_offset_sweeps", - "apply_time_offset", - "fit_time_offset", - "interpolate_reference_values", - "make_offset_grid", - "nearest_time_indices", - "time_offset_error_summary", - "time_offset_sweep", -] + return {"time_offset_s": float(offset_s), "count": 0.0, "coverage": 0.0 if total_count else float("nan"), "mean": float("nan"), "std": float("nan"), "rmse": float("nan"), "p95": float("nan"), "max": float("nan")} + return {"time_offset_s": float(offset_s), "count": float(errors.size), "coverage": float(errors.size / total_count) if total_count > 0 else float("nan"), "mean": float(np.mean(errors)), "std": float(np.std(errors)), "rmse": float(np.sqrt(np.mean(errors**2))), "p95": float(np.percentile(errors, 95)), "max": float(np.max(errors))} + + +__all__ = ["TimeOffsetFitResult", "aggregate_time_offset_sweeps", "apply_time_offset", "fit_time_offset", "interpolate_reference_values", "make_offset_grid", "nearest_time_indices", "time_offset_error_summary", "time_offset_sweep"]