Skip to content

feat(eval): pipeline diagnostic & retry hardening#84

Open
yilu331 wants to merge 6 commits into
mainfrom
feat/evaluations-plan-a-pipeline-fix
Open

feat(eval): pipeline diagnostic & retry hardening#84
yilu331 wants to merge 6 commits into
mainfrom
feat/evaluations-plan-a-pipeline-fix

Conversation

@yilu331
Copy link
Copy Markdown
Collaborator

@yilu331 yilu331 commented May 22, 2026

Plan A of the /evaluations redesign work. Makes the existing evaluation pipeline observable and self-healing — AND fixes a silent product bug discovered during live e2e verification.

Summary

  • EvalHealth singleton tracks scheduler ticks, per-reason skip counts, and rolling 24h producer-failure counts in one process-wide store.
  • Producer retries save_agent_success_evaluation_results with backoff (1s / 4s, 3 total attempts); records final failures into EvalHealth instead of silently dropping them.
  • Scheduler loop records its tick into EvalHealth on every iteration.
  • All five skip paths in run_group_evaluation bump matching SkipReason counters so the operator can see why sessions are dropping.
  • New /healthz/eval endpoint surfaces all three signals with a green/amber/red liveness derivation (>5min silent = amber, >30min = red).
  • Bug fix — agentic-path scheduler bypass. The agentic extraction backend hit return result before reaching the scheduler invocation in the classic path, so no AgentSuccessEvaluationResult records were ever produced for any org running with extraction_backend=agentic. This was the silent root cause of empty /evaluations tiles. Extracted into a shared _schedule_group_evaluation_if_needed helper called from both code paths.

Live verification

Discovered and fixed during local e2e testing — published 10 substantive sessions to a backend running this branch, observed:

  • Before the agentic fix: last_tick_monotonic: null, liveness: red, no scheduler logs.
  • After the fix: Scheduled group evaluation for key=...Firing group evaluationRunning group evaluation for session=... in logs; /healthz/eval reports liveness: green with populated tick.

Commits

  1. EvalHealth singleton for pipeline diagnostics
  2. scheduler records tick into EvalHealth
  3. bump SkipReason counters in each skip path
  4. retry save with backoff and record producer failures
  5. /healthz/eval exposes pipeline diagnostics
  6. schedule group evaluation from the agentic path too (discovered via e2e)

Test plan

  • Unit tests for EvalHealth counters, 24h decay window, tick recording
  • Skip-reason counter tests for run_group_evaluation
  • Save-retry tests: 3 attempts with backoff = [1s, 4s], success on attempt 2
  • /healthz/eval route tests for green/amber/red liveness derivation
  • Regression test for _schedule_group_evaluation_if_needed
  • Full submodule regression: 127 tests pass
  • Live e2e against running backend: scheduler fires through to runner; /healthz/eval reports green

Companion PR

ReflexioAI/reflexio-enterprise#100 bumps the submodule pointer here.

Operational follow-up (out of scope for Plan A)

During live e2e, the runner logged No agent_success_evaluation extractor configs found — the deployed config has agent_success_configs: null, so even when the scheduler fires correctly, the evaluator produces no records. This is a config gap, not a code bug, and is separate from Plan A.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 22, 2026

📝 Walkthrough

Walkthrough

This PR adds health instrumentation for the agent-success evaluation pipeline: a thread-safe EvalHealth singleton records skip reasons, producer failures (24h rolling), and scheduler ticks; a new /healthz/eval endpoint exposes status with a computed liveness; scheduler, runner, service, and generation scheduling integrate health recording.

Changes

Agent Success Evaluation Health Tracking

Layer / File(s) Summary
Health Singleton Foundation
reflexio/server/services/agent_success_evaluation/_eval_health.py, tests/server/services/agent_success_evaluation/test_eval_health.py
SkipReason enum and thread-safe EvalHealth track skip counts by reason, producer failures in a rolling 24-hour window, and scheduler tick monotonic timestamps; exposes module-level singleton and proxy functions; unit tests verify aggregation, windowing, and tick updates.
Health API Endpoint with Liveness Computation
reflexio/server/api_endpoints/health_api.py, tests/server/api_endpoints/test_health_api_eval.py
Adds GET /healthz/eval returning evaluation health status augmented with liveness (green/amber/red) computed from last_tick_monotonic using configurable thresholds; integration tests exercise liveness transitions.
Scheduler Tick Recording
reflexio/server/services/agent_success_evaluation/delayed_group_evaluator.py, tests/server/services/agent_success_evaluation/test_scheduler_tick.py
Scheduler loop records a tick via _eval_health.record_tick() each iteration to indicate aliveness; test asserts tick recorded within scheduler execution.
Runner Skip Reason Tracking
reflexio/server/services/agent_success_evaluation/group_evaluation_runner.py, tests/server/services/agent_success_evaluation/test_eligibility_logging.py
Runner records explicit skip reasons (ALREADY_EVALUATED, NO_REQUESTS, NOT_YET_COMPLETE, NO_INTERACTIONS, NO_DATA_MODELS) for early exits; tests validate counters increment on those paths.
Service Retry Backoff and Producer Failure Tracking
reflexio/server/services/agent_success_evaluation/agent_success_evaluation_service.py, tests/server/services/agent_success_evaluation/test_save_retry.py
Result persistence now retries with backoff delays (1s, 4s). On total failure marks last_run_save_failed and records a producer failure in EvalHealth; tests cover exhausted retries and success-on-retry scenarios.
GenerationService Scheduling Helper
reflexio/server/services/generation_service.py, tests/server/services/test_generation_service_scheduling.py
Introduces _schedule_group_evaluation_if_needed, consolidating session gating and scheduler callback creation; GenerationService.run now calls this helper in both agentic and classic paths; tests assert scheduling behavior and key formation.

Sequence Diagram

sequenceDiagram
  participant Scheduler as Scheduler Loop
  participant Runner as Evaluation Runner
  participant Service as Result Service
  participant Health as EvalHealth Singleton
  participant API as /healthz/eval Endpoint
  
  Scheduler->>Health: record_tick(monotonic_ts)
  Note over Health: last_tick_monotonic updated
  
  Runner->>Runner: evaluate session
  alt session already evaluated
    Runner->>Health: record_skip(ALREADY_EVALUATED)
  else no requests
    Runner->>Health: record_skip(NO_REQUESTS)
  end
  
  Service->>Service: persist results
  alt save succeeds
    Service->>Health: (no producer failure)
  else save fails after retries
    Service->>Health: record_producer_failure()
  end
  
  API->>Health: get_status()
  Health-->>API: status with skip_counts, failures, last_tick
  API->>API: compute liveness from tick age
  API-->>Client: return {status + liveness}
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

  • ReflexioAI/reflexio#60: Earlier PR that introduced or extended /healthz endpoint wiring in health_api.py, related to health endpoint changes here.

Poem

🐰 I counted ticks in the scheduler light,
Skips and failures logged through the night,
When beats stay fresh, the liveness is green,
If silence grows long, amber or red is seen,
A hop, a log, the pipeline hums right.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 69.05% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'feat(eval): pipeline diagnostic & retry hardening' accurately reflects the main changes: adding EvalHealth for diagnostics and implementing retry logic with backoff for result persistence.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch feat/evaluations-plan-a-pipeline-fix

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
reflexio/server/services/agent_success_evaluation/delayed_group_evaluator.py (1)

94-111: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Bound wait duration so liveness ticks don’t go stale while scheduler is idle.

The loop records a tick only per iteration, but Line 100 can block indefinitely and Line 109 can block for very long delays. That causes false amber/red liveness in /healthz/eval even when the scheduler thread is healthy.

💡 Proposed fix (heartbeat-bounded waits)
 GROUP_EVALUATION_DELAY_SECONDS = 600  # 10 minutes
+_TICK_HEARTBEAT_SECONDS = 30
@@
                 if next_fire_time is None:
                     # Nothing scheduled, wait for a wake signal
-                    self._wake_event.wait()
+                    self._wake_event.wait(timeout=_TICK_HEARTBEAT_SECONDS)
                     self._wake_event.clear()
                     continue
@@
                 if wait_seconds > 0:
                     # Wait until the next fire time or a wake signal
-                    self._wake_event.wait(timeout=wait_seconds)
+                    self._wake_event.wait(
+                        timeout=min(wait_seconds, _TICK_HEARTBEAT_SECONDS)
+                    )
                     self._wake_event.clear()
                     continue
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@reflexio/server/services/agent_success_evaluation/delayed_group_evaluator.py`
around lines 94 - 111, The scheduler's waits can block indefinitely and prevent
_eval_health.record_tick() from running; update the wait calls that currently
call self._wake_event.wait() (the branch when next_fire_time is None and the
branch when wait_seconds > 0) to use a bounded timeout (e.g., HEARTBEAT_SEC) so
the thread wakes periodically to call _eval_health.record_tick(). Concretely,
introduce a heartbeat constant and replace the unconditional wait() with
wait(timeout=HEARTBEAT_SEC) in the "no next_fire_time" branch, and replace
wait(timeout=wait_seconds) with wait(timeout=min(wait_seconds, HEARTBEAT_SEC))
in the scheduled-wait branch, keeping the existing self._wake_event.clear()
logic and using the same mutex/_heap access points.
🧹 Nitpick comments (2)
tests/server/services/agent_success_evaluation/test_save_retry.py (1)

25-25: ⚡ Quick win

Use a dedicated reset method instead of re-invoking __init__.

Calling _HEALTH.__init__() directly to reset singleton state is fragile—if EvalHealth.__init__ later requires arguments or changes signature, this breaks silently. Consider adding a reset() method to EvalHealth (or a test fixture) that explicitly clears state.

Suggested approach in `_eval_health.py`
# In EvalHealth class
def reset(self) -> None:
    """Reset all counters (test-only)."""
    with self._lock:
        self._skip_counts = {r: 0 for r in SkipReason}
        self._producer_failures: list[datetime] = []
        self._last_tick: datetime | None = None

Then in tests:

-    _eval_health._HEALTH.__init__()
+    _eval_health._HEALTH.reset()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/server/services/agent_success_evaluation/test_save_retry.py` at line
25, The test resets singleton state by calling _HEALTH.__init__(), which is
fragile; add a dedicated reset method on the EvalHealth class (e.g.,
EvalHealth.reset(self)) that acquires the existing _lock and clears internal
state (_skip_counts reset over SkipReason, _producer_failures emptied,
_last_tick set to None), then change the test to call _HEALTH.reset() instead of
_HEALTH.__init__(); reference EvalHealth, _HEALTH, reset, _lock, _skip_counts,
SkipReason, _producer_failures, and _last_tick when making the change.
reflexio/server/services/agent_success_evaluation/agent_success_evaluation_service.py (1)

159-159: 💤 Low value

Consider narrowing the exception type.

The bare Exception catch is intentional for retry resilience, but if the storage layer throws specific exceptions (e.g., StorageError, IOError), catching those explicitly would prevent accidentally swallowing unrelated errors like KeyboardInterrupt subclasses or programming bugs.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@reflexio/server/services/agent_success_evaluation/agent_success_evaluation_service.py`
at line 159, The bare "except Exception as e" in the agent success evaluation
storage/retry block should be narrowed: catch the specific storage-related
exceptions (e.g., StorageError, IOError, OSError) raised by the storage layer
instead of Exception, or re-raise system/interrupt exceptions
(KeyboardInterrupt, SystemExit) if caught; update the except clause in the
try/except around the storage call in AgentSuccessEvaluationService (the block
that currently reads "except Exception as e") to list the storage exceptions
explicitly and/or add an "except Exception as e: if isinstance(e,
(KeyboardInterrupt, SystemExit)): raise; else: handle/retry" guard so unrelated
errors are not swallowed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Outside diff comments:
In
`@reflexio/server/services/agent_success_evaluation/delayed_group_evaluator.py`:
- Around line 94-111: The scheduler's waits can block indefinitely and prevent
_eval_health.record_tick() from running; update the wait calls that currently
call self._wake_event.wait() (the branch when next_fire_time is None and the
branch when wait_seconds > 0) to use a bounded timeout (e.g., HEARTBEAT_SEC) so
the thread wakes periodically to call _eval_health.record_tick(). Concretely,
introduce a heartbeat constant and replace the unconditional wait() with
wait(timeout=HEARTBEAT_SEC) in the "no next_fire_time" branch, and replace
wait(timeout=wait_seconds) with wait(timeout=min(wait_seconds, HEARTBEAT_SEC))
in the scheduled-wait branch, keeping the existing self._wake_event.clear()
logic and using the same mutex/_heap access points.

---

Nitpick comments:
In
`@reflexio/server/services/agent_success_evaluation/agent_success_evaluation_service.py`:
- Line 159: The bare "except Exception as e" in the agent success evaluation
storage/retry block should be narrowed: catch the specific storage-related
exceptions (e.g., StorageError, IOError, OSError) raised by the storage layer
instead of Exception, or re-raise system/interrupt exceptions
(KeyboardInterrupt, SystemExit) if caught; update the except clause in the
try/except around the storage call in AgentSuccessEvaluationService (the block
that currently reads "except Exception as e") to list the storage exceptions
explicitly and/or add an "except Exception as e: if isinstance(e,
(KeyboardInterrupt, SystemExit)): raise; else: handle/retry" guard so unrelated
errors are not swallowed.

In `@tests/server/services/agent_success_evaluation/test_save_retry.py`:
- Line 25: The test resets singleton state by calling _HEALTH.__init__(), which
is fragile; add a dedicated reset method on the EvalHealth class (e.g.,
EvalHealth.reset(self)) that acquires the existing _lock and clears internal
state (_skip_counts reset over SkipReason, _producer_failures emptied,
_last_tick set to None), then change the test to call _HEALTH.reset() instead of
_HEALTH.__init__(); reference EvalHealth, _HEALTH, reset, _lock, _skip_counts,
SkipReason, _producer_failures, and _last_tick when making the change.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: 319ef730-e4f2-41da-92c5-467129dac742

📥 Commits

Reviewing files that changed from the base of the PR and between cf82d81 and 0a7404a.

📒 Files selected for processing (11)
  • reflexio/server/api_endpoints/health_api.py
  • reflexio/server/services/agent_success_evaluation/_eval_health.py
  • reflexio/server/services/agent_success_evaluation/agent_success_evaluation_service.py
  • reflexio/server/services/agent_success_evaluation/delayed_group_evaluator.py
  • reflexio/server/services/agent_success_evaluation/group_evaluation_runner.py
  • tests/server/api_endpoints/test_health_api_eval.py
  • tests/server/services/agent_success_evaluation/__init__.py
  • tests/server/services/agent_success_evaluation/test_eligibility_logging.py
  • tests/server/services/agent_success_evaluation/test_eval_health.py
  • tests/server/services/agent_success_evaluation/test_save_retry.py
  • tests/server/services/agent_success_evaluation/test_scheduler_tick.py

The agentic extraction backend hit `return result` before reaching the
scheduler invocation in the classic path, so no AgentSuccessEvaluationResult
records were ever produced for any org running with extraction_backend=agentic.
This was the silent root cause of empty /evaluations tiles observed by
operators.

Extracts the scheduler invocation into a `_schedule_group_evaluation_if_needed`
helper and calls it from BOTH the agentic and classic code paths. Regression
test verifies the helper schedules with key=(org_id, user_id, session_id) when
the session_id is set, and short-circuits cleanly when it is not.

Verified live: published 10 sessions against a backend with the fix, observed
"Scheduled group evaluation for key=..." followed by "Firing group evaluation"
and "Running group evaluation for session=..." in the logs. /healthz/eval
transitioned from liveness=red to green with last_tick_monotonic populated.
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
tests/server/services/test_generation_service_scheduling.py (1)

70-75: ⚡ Quick win

Assert callback behavior, not just callability.

The current assertion allows regressions where the callback is callable but wired with wrong captured args. Execute the callback and assert run_group_evaluation receives the expected values.

Proposed test hardening
 def test_schedules_with_correct_key_when_session_id_present(
     service: GenerationService, monkeypatch: pytest.MonkeyPatch
 ) -> None:
@@
     scheduler.schedule.assert_called_once()
     call_args = scheduler.schedule.call_args
     key = call_args[0][0] if call_args[0] else call_args.kwargs.get("key")
     callback = call_args[0][1] if len(call_args[0]) > 1 else call_args.kwargs.get("callback")
     assert key == ("org_test", "user_test", "sess_42")
     assert callable(callback)
+
+    captured: dict[str, object] = {}
+
+    def fake_run_group_evaluation(
+        *,
+        org_id: str,
+        user_id: str,
+        session_id: str,
+        agent_version: str,
+        source: str | None,
+        request_context: object,
+        llm_client: object,
+    ) -> None:
+        captured.update(
+            {
+                "org_id": org_id,
+                "user_id": user_id,
+                "session_id": session_id,
+                "agent_version": agent_version,
+                "source": source,
+                "request_context": request_context,
+                "llm_client": llm_client,
+            }
+        )
+
+    monkeypatch.setattr(
+        "reflexio.server.services.generation_service.run_group_evaluation",
+        fake_run_group_evaluation,
+    )
+
+    callback()
+    assert captured == {
+        "org_id": "org_test",
+        "user_id": "user_test",
+        "session_id": "sess_42",
+        "agent_version": "v_test",
+        "source": "ide",
+        "request_context": service.request_context,
+        "llm_client": service.client,
+    }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/server/services/test_generation_service_scheduling.py` around lines 70
- 75, The test currently only asserts the scheduled callback is callable; change
it to execute the captured callback and assert it triggers run_group_evaluation
with the expected arguments: after obtaining call_args, key and callback (from
scheduler.schedule.call_args), call callback() (or await it if it’s async) and
then assert run_group_evaluation.assert_called_once_with("org_test",
"user_test", "sess_42", any_other_expected_params) (or use assert_called_once()
plus assert call args match the tuple), ensuring you reference
scheduler.schedule, call_args, key, callback and run_group_evaluation when
adding the execution and verification.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@tests/server/services/test_generation_service_scheduling.py`:
- Around line 70-75: The test currently only asserts the scheduled callback is
callable; change it to execute the captured callback and assert it triggers
run_group_evaluation with the expected arguments: after obtaining call_args, key
and callback (from scheduler.schedule.call_args), call callback() (or await it
if it’s async) and then assert
run_group_evaluation.assert_called_once_with("org_test", "user_test", "sess_42",
any_other_expected_params) (or use assert_called_once() plus assert call args
match the tuple), ensuring you reference scheduler.schedule, call_args, key,
callback and run_group_evaluation when adding the execution and verification.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro Plus

Run ID: 527d0444-914d-4502-a8d4-a7ae408499de

📥 Commits

Reviewing files that changed from the base of the PR and between 0a7404a and d4b3958.

📒 Files selected for processing (2)
  • reflexio/server/services/generation_service.py
  • tests/server/services/test_generation_service_scheduling.py

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant