fix(sync-service): don't suspend a consumer mid-transaction (#4501) #4503
Codecov Report
✅ All modified and coverable lines are covered by tests.
Additional details and impacted files
@@ Coverage Diff @@
## main #4503 +/- ##
===========================================
+ Coverage 32.48% 56.45% +23.97%
===========================================
Files 216 358 +142
Lines 18368 39081 +20713
Branches 6478 10976 +4498
===========================================
+ Hits 5967 22064 +16097
- Misses 12369 16946 +4577
- Partials 32 71 +39
Flags with carried forward coverage won't be shown.
Claude Code Review
Summary
This PR fixes the production crash
What's Working Well
Issues Found
Critical (Must Fix)
None.
Important (Should Fix)
Defense in depth: the
File:
In iteration 1 I flagged this but left open whether a consumer crash re-emits the begin (which would make it unreachable). I have now traced it, and it does not:
This PR correctly closes the most common trigger (idle-timeout suspend), and that is a legitimate, well-targeted fix for #4501. But the underlying fragility is pre-existing and remains for any non-suspend mid-txn death (transient storage error, supervisor restart, etc.). It is reasonable to keep this PR narrow and address the class separately, but it should not be lost; consider a follow-up that adds a defensive
Suggestions (Nice to Have)
Issue Conformance
Directly addresses #4501. The linked issue is a Sentry crash report with a clear stacktrace at
Previous Review Status
Iteration 1 → 2:
The functional fix itself is unchanged and remains correct; this is a ✅ to merge for resolving #4501, with the defense-in-depth item recommended as a follow-up.
Review iteration: 2 | 2026-06-08
A consumer could suspend on its idle timeout while holding a pending_txn for an in-flight multi-fragment transaction, dropping that state. When a later fragment of the transaction arrived, a fresh consumer received a has_begin?: false fragment with pending_txn=nil and crashed in process_txn_fragment/2 (KeyError on :consider_flushed?).

Guard consumer_can_suspend?/1 on is_nil(pending_txn) so the consumer hibernates instead and suspends only once the transaction completes.

Fixes #4501

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
e5903b3 to 16b7617
## Summary

Adds a `total_processing_time` attribute to the `pg_txn.replication_client.transaction_received` span, set on the commit fragment. It records the **wall-clock time taken to process all fragments of a single transaction**: from when the begin was received to when the commit fragment finishes processing.

Today our spans only measure per-fragment *processing* time (~ms). They can't tell us how long a transaction's fragments are smeared across in wall-clock terms, which is the quantity that determines whether a shape consumer can idle past its suspend threshold mid-transaction (see #4501 / #4503).

Unlike `receive_lag`, which is anchored on the Postgres commit timestamp and measures end-to-end delivery lag (from when Postgres committed the transaction to when Electric finished processing it), `total_processing_time` is anchored entirely within Electric: it spans receipt of the begin fragment to completion of the commit fragment.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Oleksii Sholik <oleksii@sholik.dev>
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
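
The measurement described above could be sketched roughly as follows. This is an illustration only, not the replication client's actual code: the module name `TxnTimingSketch`, the function `handle_fragment/3`, the fragment map shape, and the millisecond unit are all assumptions made for the example. Only `System.monotonic_time/1` is a real standard-library call.

```elixir
defmodule TxnTimingSketch do
  # Hypothetical fragment shape: %{has_begin?: boolean, has_commit?: boolean}.
  # `started_at` is the monotonic timestamp recorded at the begin fragment,
  # or nil when no transaction is in flight.
  #
  # Returns {new_started_at, span_attributes}. The caller is assumed to invoke
  # this after it has finished processing the fragment, so that `now` marks
  # the end of processing.
  def handle_fragment(started_at, fragment, now \\ System.monotonic_time(:millisecond)) do
    # Anchor the measurement on receipt of the begin fragment.
    started_at = if fragment.has_begin?, do: now, else: started_at

    cond do
      # Commit fragment of a tracked transaction: emit the attribute and reset.
      fragment.has_commit? and is_integer(started_at) ->
        {nil, %{total_processing_time: now - started_at}}

      # Commit without a recorded begin: nothing to measure, just reset.
      fragment.has_commit? ->
        {nil, %{}}

      # Begin or middle fragment: keep the anchor, emit nothing yet.
      true ->
        {started_at, %{}}
    end
  end
end
```

A monotonic clock is used in the sketch because the quantity is a duration inside one process; unlike `receive_lag`, it never needs to be compared against a Postgres timestamp.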
Summary

Fixes #4501: `KeyError: key :consider_flushed? not found in: nil` in `Electric.Shapes.Consumer.process_txn_fragment/2`.

A shape consumer could suspend (terminate to reclaim memory) on its idle timeout while still holding a `pending_txn` for an in-flight multi-fragment transaction. The producer's `EventRouter` tracks "this shape already saw the begin for the current xid" keyed by `shape_handle`, independently of consumer liveness, so when a later fragment of that transaction arrived, `ConsumerRegistry` started a fresh consumer and delivered a `has_begin?: false` fragment to it. The fresh consumer has `pending_txn: nil`, so `process_txn_fragment/2` dereferenced `nil` at `txn.consider_flushed?`.

Fix

`consumer_can_suspend?/1` now also requires `is_nil(state.pending_txn)`. A consumer that is mid-transaction hibernates instead of suspending, and only suspends once the transaction completes and it goes idle again.

🤖 Generated with Claude Code
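
A minimal sketch of the guard, assuming a heavily simplified consumer state. The module name `SuspendGuardSketch`, the `on_idle_timeout/1` helper, and the `:suspend` / `:hibernate` return atoms are hypothetical and exist only for this illustration; the real `consumer_can_suspend?/1` checks other pre-existing conditions that are omitted here.

```elixir
defmodule SuspendGuardSketch do
  # Hypothetical, simplified state: the real consumer state has more fields.
  defstruct pending_txn: nil

  # The condition added by the fix: a consumer holding an in-flight
  # transaction must not suspend, because suspending drops pending_txn.
  # (Other suspend conditions are left out of this sketch.)
  def consumer_can_suspend?(%__MODULE__{} = state) do
    is_nil(state.pending_txn)
  end

  # Hypothetical decision taken when the idle timeout fires: suspend only
  # when no transaction is pending, otherwise hibernate and keep the state.
  def on_idle_timeout(%__MODULE__{} = state) do
    if consumer_can_suspend?(state), do: :suspend, else: :hibernate
  end
end
```

With this shape, a consumer that has seen the begin of a transaction but not its commit keeps `pending_txn` alive across the idle timeout, so a later `has_begin?: false` fragment still finds the state it expects.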