diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8779fef34..e463ad6a5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -158,6 +158,8 @@ jobs: run: bash integration/js/pg_tests/run.sh - name: Ruby run: bash integration/ruby/run.sh + - name: Prepared statements (full) + run: bash integration/prepared_statements_full/run.sh - name: Java run: bash integration/java/run.sh - name: Mirror diff --git a/docs/issues/PROTOCOL_OUT_OF_SYNC.md b/docs/issues/PROTOCOL_OUT_OF_SYNC.md new file mode 100644 index 000000000..42cf0ae46 --- /dev/null +++ b/docs/issues/PROTOCOL_OUT_OF_SYNC.md @@ -0,0 +1,442 @@ +# ProtocolOutOfSync — Known Root Causes + +`Error::ProtocolOutOfSync` fires when `ProtocolState`'s expected-response queue diverges from what +the backend actually sends. The catch site is `server.rs:394`; the connection is permanently marked +`State::Error` and discarded from the pool. + +**Queue mechanics** (`pgdog/src/backend/protocol/state.rs`). `handle()` pushes one `ExecutionItem` +per anticipated response before forwarding any client message. As server bytes arrive, `forward()` +calls `action(code)` which pops the queue front and checks the match. Two conditions raise +`ProtocolOutOfSync`: + +- **Empty queue** (`state.rs:168`) — a tracked message arrives but nothing was expected. +- **Ignore mismatch** (`state.rs:188–191`) — queue front is an `Ignore` slot but the server sent a + different code. + +--- + +## ✅ Issue 0 — `extended` flag never resets; `SET`-batch error poisons connection + +**Severity:** High — triggered by normal operation when `query_parser = "on"` and any parameterised +query has run on the connection. + +**Commit:** `54c9d3a942bdac55873f17d2af74ef1dfedb3f4a` + +**Location:** `pgdog/src/backend/protocol/state.rs`, `action()`, Error and ReadyForQuery arms. 
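The pop-and-match discipline described under **Queue mechanics** above can be modeled in a few lines
of Rust. This is a simplified sketch, not the actual `state.rs` code: the real `ExecutionItem`
carries more variants, and a mismatched `Code` slot is handled by separate logic that this model
simply forwards.

```rust
use std::collections::VecDeque;

// Simplified stand-ins for `ExecutionItem`; codes are single-byte message tags.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Item {
    Code(char),   // expected response, forwarded to the client
    Ignore(char), // response to an injected message, swallowed by pgdog
}

#[derive(Debug, PartialEq)]
enum Outcome {
    Forward,
    Swallow,
    OutOfSync,
}

// Pops the queue front and checks the match, mirroring `action(code)`.
fn action(queue: &mut VecDeque<Item>, code: char) -> Outcome {
    match queue.pop_front() {
        // Empty queue: a tracked message arrived but nothing was expected.
        None => Outcome::OutOfSync,
        // Ignore mismatch: the front is an Ignore slot for a different code.
        Some(Item::Ignore(c)) if c != code => Outcome::OutOfSync,
        Some(Item::Ignore(_)) => Outcome::Swallow,
        // Mismatched Code slots are glossed over in this sketch.
        Some(Item::Code(_)) => Outcome::Forward,
    }
}
```

Both failure conditions listed above map directly onto the `None` arm and the guarded `Ignore` arm.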
+ +### Description + +When `query_parser = "on"`, pgdog caches every `SET` issued by a client so it can replay them +on whichever backend connection services the next request. The replay calls `execute_batch`, +which queues one `Code(RFQ)` per command — two cached `SET`s produce `[Code(RFQ), Code(RFQ)]`. + +Two independent defects caused `ProtocolOutOfSync` when a cached `SET` produced a server Error: + +**Defect 1 — `extended` one-way latch.** `ProtocolState.extended` was set on the first +parameterised (`Parse`/`Bind`) query and never cleared. The Error arm unconditionally set +`out_of_sync = true` when `extended == true`, permanently poisoning any connection that had +ever used extended protocol — even for unrelated simple-query errors. + +**Defect 2 — destructive queue clear.** The old Error arm (`pop_back + clear + push_back(RFQ)`) +was designed for a single-query queue. With two `Code(RFQ)` slots it collapsed to one: +the first `Z` consumed the only remaining slot, the second SET's `CommandComplete` hit empty +→ `ProtocolOutOfSync` before the client saw the actual SET error. + +Both defects are independent. Defect 2 fires on any two-command batch regardless of `extended`; +Defect 1 additionally poisons the connection via `out_of_sync = true`. + +### Conditions required + +All four must hold simultaneously: + +1. `query_parser = "on"` — pgdog tracks `SET` commands and replays them via `execute_batch` + when syncing session state onto a checked-out connection. +2. Two or more `SET` commands in the cached session state, at least one with an invalid value + (e.g. `SET lock_timeout TO sdf`) — produces two `Code(RFQ)` slots in the queue and causes + PostgreSQL to return an Error on the first replayed `SET`. +3. A prior extended-protocol query (`Parse + Bind + Execute + Sync`) on the same connection + — sets `extended = true`, arming the `out_of_sync = true` branch on any subsequent error + (Defect 1). +4. 
Any subsequent query after the failed `execute_batch` — hits a connection whose + `out_of_sync` is `true`, causing `PG::ConnectionBad` at the client. + +### Reproduction + +```ruby +# integration/ruby/pg_spec.rb +it 'out of sync' do + conn = PG.connect(..., application_name: '') + conn.exec_params 'SELECT $1', [1] # sets extended = true + conn.exec "SELECT 1" + conn.exec "SET lock_timeout TO sdfs" # invalid; query_parser will replay this + conn.exec "SET statement_timeout TO '1s'" + expect { conn.exec 'SELECT 1' }.to raise_error(/invalid value for parameter/) +end +``` + +```sh +cd integration/ruby && bundle exec rspec pg_spec.rb -e 'out of sync' +``` + +### Tests + +**State-machine unit test (`state.rs`, no backend needed)** + +- **`test_pipelined_simple_query_error_keeps_next_query_response`** — queues two `Code(RFQ)` slots, + fires `action('E')`, asserts the second slot survives, walks `Z`/`C`/`Z` to completion. + +**Server integration tests (`server.rs`, require PostgreSQL)** + +- **`test_execute_batch_simple_query_error_then_success`** — `execute_batch(["syntax error", "SELECT 1"])` + returns `ExecutionError` and leaves `server.done() == true`. +- **`test_out_of_sync_regression`** — exact production sequence: `Parse + Bind + Execute + Sync`, + drain five responses, then `execute_batch` with invalid + valid `SET`; asserts `ExecutionError` + and `server.done() == true`. + +```sh +cargo test -p pgdog test_execute_batch_simple_query_error_then_success +cargo test -p pgdog test_out_of_sync_regression +``` + +### Fix + +Two changes in `state.rs` (commit `54c9d3a9`): + +**Defect 2:** A `!self.extended` fast-path in the Error arm returns `Action::Forward` immediately +when the queue front is `Code(RFQ)`, leaving all slots untouched. For the extended path, +a `rfq_pos` scan drains only items before the first `Code(RFQ)` boundary — never consuming +slots belonging to subsequent queries. 
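The boundary scan can be sketched as follows (an illustrative model, not the actual `state.rs`
code; item names are simplified):

```rust
use std::collections::VecDeque;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Item {
    Code(char),
    Ignore(char),
}

const RFQ: char = 'Z';

/// On an error, drain only the failing query's pending slots: everything
/// strictly before the first `Code(RFQ)` boundary. The boundary slot and all
/// slots beyond it belong to this query's terminator and to subsequent
/// queries, and must survive.
fn drain_to_rfq_boundary(queue: &mut VecDeque<Item>) {
    if let Some(rfq_pos) = queue.iter().position(|i| *i == Item::Code(RFQ)) {
        queue.drain(..rfq_pos);
    } else {
        // No boundary anywhere: nothing worth preserving.
        queue.clear();
    }
}
```

For the two-`SET` batch, the queue front is already `Code(RFQ)`, so `rfq_pos == 0` and nothing is
drained, which is what the `!self.extended` fast-path achieves without the scan.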
+ +**Defect 1:** After every `ReadyForQuery`, `extended` is recomputed from remaining queue items +(`self.queue.iter().any(ExecutionItem::extended)`). When the pipeline drains, `extended` resets +to `false`, breaking the one-way latch. The `ExecutionItem::extended()` helper in `state.rs` +enables this scan. +--- + +## ✅ Issue 1 — Failed `Prepare` orphans the EXECUTE ReadyForQuery + +**Severity:** High — triggered by normal server behaviour; no client misbehaviour required. + +**Location:** `pgdog/src/backend/prepared_statements.rs`, `ProtocolMessage::Prepare` arm; +`pgdog/src/backend/protocol/state.rs`, Error handler. + +### Description + +When pgdog injects a `PREPARE` to rewrite a simple-query `EXECUTE` and that `PREPARE` fails on the +server, the Error handler incorrectly clears the queue. The subsequent `ReadyForQuery` from the now- +orphaned `EXECUTE` hits an empty queue and raises `ProtocolOutOfSync`. + +### Code path + +The simple-query rewriter turns `EXECUTE stmt_name(args)` into two prepended messages, each handled +independently by `handle()`. After both calls the queue is: + +``` +[Ignore(ExecutionCompleted), Ignore(ReadyForQuery), Code(ReadyForQuery)] + ↑────────── handle(Prepare) ──────────↑ ↑── handle(Query) ──↑ +``` + +If the injected `PREPARE` fails on the server: + +| Step | Server sends | Error handler action | Queue after | +|---|---|---|---| +| 1 | `Error` for PREPARE | simple-query path (`!extended`): `pop_front` loop drains `Ignore(C)` and `Ignore(Z)`; stops at `Code(RFQ)` | `[Code(RFQ)]` | +| 2 | `ReadyForQuery` for PREPARE | `pop_front` → `Code(RFQ)` consumed | **empty** | +| 3 | `Error` for EXECUTE (statement absent) | simple-query path: queue empty; loop is a no-op | **empty** | +| 4 | `ReadyForQuery` for EXECUTE | `pop_front` on empty → **ProtocolOutOfSync** | — | + +The simple-query Error handler (`state.rs:157–167`, the `!self.extended` arm) pops items from the +front of the queue until it reaches a `Code(ReadyForQuery)`. 
When the injected PREPARE fails this +drains both `Ignore` slots, leaving `Code(ReadyForQuery)` as the sole item. The subsequent `Z` for +the PREPARE consumes that slot, emptying the queue. The EXECUTE sub-request's `Error` then arrives +against an empty queue — the pop-front loop is a no-op — and its `ReadyForQuery` fires +`ProtocolOutOfSync`. + +Under high concurrency this becomes near-deterministic: the pool fast-path (`Guard::drop` → +`checkin` → `put`) hands a connection directly to a waiting client with no healthcheck, no idle +time, and no opportunity to drain the kernel socket buffer. The next query on that client consumes +the stale EXECUTE `Error + ReadyForQuery`, producing `ProtocolOutOfSync`. + +### Reproduction (historical) + +1. Connect to pgdog with session or transaction pooling. +2. Issue a simple-query `EXECUTE` for a statement that will fail to prepare (schema mismatch, syntax + error, or stale local cache with a duplicate name). +3. Issue any subsequent query on the same connection. +4. The second query fails with `PG::ConnectionBad` / `protocol is out of sync`. + +```sh +cd integration/prepared_statements_full && bash run.sh +``` + +### Tests + +Both tests live in `integration/prepared_statements_full/protocol_out_of_sync_spec.rb`. + +| Test | Pool mode | What it guards against | +|---|---|---| +| 1 | session | Orphaned EXECUTE RFQ hits empty queue on the very next query → `PG::ConnectionBad` | +| 2 | transaction | Stale EXECUTE `Error` served to next borrower (pool_size=1) → `PG::InvalidSqlStatementName` | + +- **Test 1 — Session mode.** + Session-pooled user (`pgdog_session`) pinned to one backend. After the failed prepare-inject, + `SELECT 1` is issued on the same connection. The orphaned EXECUTE `ReadyForQuery` hits an empty + queue, raising `ProtocolOutOfSync`. Client sees `PG::ConnectionBad`. + +- **Test 2 — Transaction mode (pool_size=1).** + `pgdog_tx_single` (transaction mode, one backend). 
After the EXECUTE error the connection returns + to the pool: the queue is empty and `out_of_sync` is false, so it appears clean. With only one + backend the stale EXECUTE `Error + ReadyForQuery` remain in the TCP buffer. The next query + (`SELECT 1`) reads the stale `Error` as its response → `PG::InvalidSqlStatementName`. + +### Fix + +Error handler in `state.rs`, `ExecutionCode::Error` arm. See inline comments for full detail. + +On error, find the first `Code(ReadyForQuery)` in the queue (the client's RFQ boundary), drain +everything before it, count the `Ignore(RFQ)` slots in the drained portion, and prepend one +`[Ignore(RFQ), Ignore(Error)]` pair per slot. A separate fast-path at the top of the arm handles +the case where the queue front is already `Ignore(Error)` — subsequent errors from the same +injected sub-request — by popping and returning `Action::Ignore` directly. + +See also: `test_injected_prepare_error_full_lifecycle` in `state.rs`. +--- + +## ✅ Issue 2 — Double `action()` call in `forward()` for server CopyDone + +**Severity:** Medium — requires the client to omit a trailing `Sync`. + +**Location:** `pgdog/src/backend/prepared_statements.rs`, `forward()`. + +### Description + +`forward()` called `state.action(code)` unconditionally near the top of the function, then called +it a second time inside the `'c'` (CopyDone) match arm. Without a `Code(ReadyForQuery)` backstop in +the queue the second call hit an empty queue and raised `ProtocolOutOfSync`. + +### Code path + +Normal path (safe): `Code(ReadyForQuery)` is always in the queue. `action('Z')` pushes it back +rather than consuming it, making the double call idempotent. + +Unsafe path — client sends `Parse + Bind + Execute + Flush` (no `Sync`). `handle()` builds: + +``` +[Code(ParseComplete), Code(BindComplete), Code(ExecutionCompleted)] +``` + +No `Code(ReadyForQuery)` is added. 
When the server responded with CopyDone: + +``` +First action('c'): pops Code(ExecutionCompleted) — consumed +Second action('c'): empty queue → ProtocolOutOfSync +``` + +### Reproduction (historical) + +Not triggerable via the `pg` gem or any libpq-based driver — libpq always appends `Sync` after +`Execute`. Required sending raw protocol messages directly. + +```sh +cargo test -p pgdog --lib -- test_copydone_double_action_oos_without_rfq_backstop +``` + +### Tests + +**State-machine unit tests (`state.rs`, no backend needed)** + +- **`test_copydone_double_action_safe_with_rfq_backstop`** — queue `[Code(Copy), Code(ReadyForQuery)]`; + two `action('c')` calls both succeed; RFQ slot is pushed back and survives. +- **`test_copydone_double_action_oos_without_rfq_backstop`** — documents the raw state-machine + invariant: calling `action('c')` twice with no RFQ backstop still causes `ProtocolOutOfSync` + directly on the state machine. `forward()` no longer makes this second call; this path is + unreachable through normal protocol flow. Test is retained to pin the underlying invariant. + +**Server-level tests (`server.rs`, require PostgreSQL)** + +- **`test_copydone_single_action_without_sync`** — `Parse + Bind + Execute + Flush` (no Sync); + reads ParseComplete, BindComplete, CopyOutResponse, CopyData ×2, then asserts CopyDone is + forwarded successfully. The trailing CommandComplete then hits an empty queue (no RFQ backstop) + and raises `ProtocolOutOfSync` — that is the correct remaining behavior with no `Sync`. +- **`test_copydone_double_action_safe_with_sync`** — same pipeline with `Sync`; full sequence + completes without error; asserts `server.done()`. + +```sh +cargo test -p pgdog --lib -- test_copydone_double_action +cargo test -p pgdog -- test_copydone +``` + +### Fix + +Removed the redundant `self.state.action(code)?` from the `'c'` arm in `forward()`. 
The call at +the top of the function already advances the state machine for CopyDone; the arm body is now empty. + + +## ✅ Issue 4 — `extended` flag set at enqueue time, not at processing time + +**Severity:** Low in production — not triggerable through current code paths. Dangerous if triggered: +a write-query executed on the backend may never be confirmed to the client. + +**Location:** `pgdog/src/backend/protocol/state.rs`, `add()` / `add_ignore()` and the +`ReadyForQuery` recalculation in `action()`. + +### Description + +Two related bugs in how `extended` is maintained: + +**Bug A — `add()` sets the flag too early.** +`add()` and `add_ignore()` update `extended` the moment any `ParseComplete` or `BindComplete` is +enqueued, regardless of where in the queue that item sits: + +```rust +self.extended = self.extended || code.extended(); +``` + +If a simple query occupies the head of the queue and an extended query is enqueued behind it, +`extended` flips to `true` before any message has been processed. When the simple query then errors, +the Error handler sees `extended == true` and takes the extended path: sets `out_of_sync = true` and +clears the queue — destroying the extended query's pending entries. + +**Bug B — `ReadyForQuery` recalculation scans the entire remaining queue.** +After consuming a `ReadyForQuery`, the flag is recalculated as: + +```rust +self.extended = self.queue.iter().any(ExecutionItem::extended); +``` + +This scans all remaining entries, not just those belonging to the next sub-request. If two simple +queries precede an extended query, after the first simple query's `ReadyForQuery` is consumed the +scan finds the extended items two hops away and sets `extended = true`. The second simple query's +error then incorrectly takes the extended path. + +### Why it does not affect production + +`ClientRequest::spliced()` never places a simple query and an extended query in the same +`ProtocolState` queue. 
Each sub-request — whether simple or extended — gets its own fresh state. +The flag therefore always starts at `false` for a simple sub-request and becomes `true` only for +sub-requests that actually contain `Parse`/`Bind` messages. + +### Why it is dangerous if triggered + +If a simple query and an extended query share one queue and the simple query errors: +1. The extended query's pending entries are destroyed. +2. pgdog forwards the extended query's wire messages to the backend before reading any responses. +3. When the backend's response for the extended query arrives, pgdog hits `ProtocolOutOfSync` and + discards the connection. +4. For a write query (`INSERT`/`UPDATE`/`DELETE`) the statement executed on the backend but the + client receives no confirmation — silent data inconsistency. + +### Reproduction (historical) + +Not reproducible through normal pool operation. Reproduced at the `server.rs` level by bypassing +the splicing layer: + +```sh +cargo test -p pgdog test_simple_query_error_before_queued_extended_request_does_not_set_out_of_sync +cargo test -p pgdog test_simple_query_error_after_rfq_before_extended_does_not_set_out_of_sync +cargo test -p pgdog test_simple_query_error_before_extended_query_in_same_batch +``` + +The integration test `extended query succeeds after preceding simple query error` in +`integration/ruby/protocol_out_of_sync_spec.rb` passes — pgdog's splicing prevents +the bug from reaching production — and serves as a regression guard. 
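Bug B's over-broad scan is easy to demonstrate on a toy queue (a sketch mirroring the
`ExecutionItem::extended()` helper; variant names are simplified):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Item {
    ParseComplete,
    BindComplete,
    CommandComplete,
    ReadyForQuery,
}

impl Item {
    // Mirrors `ExecutionItem::extended()`: only Parse/Bind completions mark
    // an entry as belonging to an extended-protocol query.
    fn extended(&self) -> bool {
        matches!(self, Item::ParseComplete | Item::BindComplete)
    }
}

/// The buggy recalculation: after consuming a ReadyForQuery it scans *all*
/// remaining entries, not just those of the next sub-request.
fn recalculate_extended(queue: &[Item]) -> bool {
    queue.iter().any(|item| item.extended())
}
```

With `[CommandComplete, ReadyForQuery, ParseComplete, …]` remaining, the next sub-request is a
simple query, yet the scan reaches the extended entries beyond its `ReadyForQuery` boundary and
reports `true`.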
+ +### Tests + +| Test | Level | Status | What it covers | +|---|---|---|---| +| `test_simple_query_error_before_queued_extended_request_does_not_set_out_of_sync` | unit (`state.rs`) | **passes** | Bug A: flag set by future enqueue poisons simple error path | +| `test_simple_query_error_after_rfq_before_extended_does_not_set_out_of_sync` | unit (`state.rs`) | **passes** | Bug B: full-queue scan sets flag for wrong sub-request | +| `test_simple_query_error_before_extended_query_in_same_batch` | server (`server.rs`) | **passes** | end-to-end: extended query survives simple error in same batch | +| `extended query succeeds after preceding simple query error` | integration (Ruby) | passes | regression guard via normal pgdog path | + +```sh +cargo test -p pgdog --lib -- test_simple_query_error_before_queued_extended_request_does_not_set_out_of_sync +cargo test -p pgdog --lib -- test_simple_query_error_after_rfq_before_extended_does_not_set_out_of_sync +``` + +### Fix + +The `extended` field was removed entirely from `ProtocolState`. Both bugs stemmed from maintaining +it as a cached flag — Bug A by setting it too early, Bug B by recomputing it too broadly. +Removing it eliminates both defects at the root. + +`out_of_sync` is now set unconditionally for all errors (simple or extended) and cleared +unconditionally on every `ReadyForQuery`. The distinction is irrelevant: after any error the +connection must wait for the peer's RFQ before being reused, and that RFQ always arrives as the +next terminal message in the same sub-request. + +For both simple and extended errors the sequence is: + +``` +Error → out_of_sync = true +ReadyForQuery → out_of_sync = false (connection clean; extended entries beyond this boundary intact) +``` + +--- +## ✅ Issue 5 — Error in first pipelined request clears subsequent requests' queue entries + +**Severity:** Low — not reproducible through production code paths; included for completeness and as a guard against future refactoring. 
+ +**Location:** `pgdog/src/backend/protocol/state.rs`, Error handler (`action()`, extended branch). + +### Description + +When a client pipelines multiple extended-query sequences — each terminated by its own `Sync` — and the first sequence errors, the Error handler calls `queue.clear()` on the entire queue. This destroys the pending entries for all subsequent sequences, causing `ProtocolOutOfSync` when their backend responses arrive. + +### Code path + +Suppose three sequences share one `ProtocolState` queue: + +``` +[1,2,C,Z, 1,2,C,Z, 1,2,C,Z] + ^─seq1─^ ^─seq2─^ ^─seq3─^ +``` + +After seq1 consumes ParseComplete (`1`) and BindComplete (`2`), the queue is: + +``` +[C, Z, 1, 2, C, Z, 1, 2, C, Z] +``` + +Seq1 then errors: + +| Step | Action | Queue after | +|---|---|---| +| Error arm fires | `pop_back()` → seq3's `Z`; `clear()` removes `[C,Z,1,2,C,Z,1,2,C]`; `push_back(Z)` | `[Z]` | +| seq1 ReadyForQuery arrives | pops `Z` normally | **empty** | +| seq2 ParseComplete arrives | `pop_front()` on empty queue | **ProtocolOutOfSync** | + +### Why it does not affect production + +`ClientRequest::spliced()` in `pgdog/src/frontend/client_request.rs` splits every multi-Execute pipeline into sub-requests at `Execute` boundaries, placing each `Sync` in its own standalone sub-request. Sub-requests are processed sequentially: each one is sent and fully drained before the next one is enqueued. The `ProtocolState` queue therefore only ever holds the entries for a single sync group at a time, so `queue.clear()` on error only ever sees that one group's entries. + +This property is a load-bearing invariant. If `spliced()` is ever changed — for example to batch multiple sync groups into one send — this bug will surface immediately. + +### Reproduction (historical) + +Not reproducible through normal pool operation. 
Reproduced by loading all three sync groups into one `ProtocolState` directly:

```sh
cargo test -p pgdog test_pipeline_single_queue_error_only_clears_failing_sync_group
cargo test -p pgdog test_pipelined_multiple_syncs_first_fails
```

Both tests now **pass**.

### Tests

| Test | Level | Status | What it covers |
|---|---|---|---|
| `test_pipeline_multi_sync_error_in_seq1_does_not_affect_seq2_seq3` | unit (`state.rs`) | **passes** | error in seq1 leaves seq2 and seq3 intact |
| `test_pipeline_single_queue_error_only_clears_failing_sync_group` | unit (`state.rs`) | **passes** | only the failing sync group's entries are drained |
| `test_pipelined_multiple_syncs_first_fails` | server (`server.rs`) | **passes** | end-to-end: seq2 completes after seq1 errors |

### Fix

The Error arm drains only the failing sync group's pending entries — everything from the current
queue head up to, but not including, its own `Code(ReadyForQuery)` boundary — using
`drain(..rfq_pos)`. The boundary slot itself survives so the failing group's arriving `Z` can
consume it normally, and everything beyond it is left intact. A fallback `queue.clear()` applies
only when no `Code(ReadyForQuery)` exists in the queue at all (nothing to preserve in that case).

The old `pop_back + clear + push_back(RFQ)` pattern that caused the bug was removed.
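The fixed behaviour can be walked end-to-end on a toy queue, using single characters in place of
`ExecutionItem`s (`1`/`2`/`C`/`Z` abbreviate ParseComplete, BindComplete, CommandComplete,
ReadyForQuery; an illustrative model only, not the real `state.rs` code):

```rust
use std::collections::VecDeque;

/// Toy Error arm: drop only the failing sync group's pending entries
/// (everything before the first 'Z' boundary); clear the whole queue only
/// when no boundary exists at all.
fn error_arm(queue: &mut VecDeque<char>) {
    match queue.iter().position(|c| *c == 'Z') {
        Some(rfq_pos) => {
            queue.drain(..rfq_pos);
        }
        None => queue.clear(),
    }
}
```

Walking the three-sequence pipeline from the Code path section: after seq1 has consumed `1` and
`2`, the error drops only seq1's pending `C`, seq1's `Z` consumes its own boundary, and seq2/seq3
remain fully intact.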
\ No newline at end of file diff --git a/integration/prepared_statements_full/Gemfile b/integration/prepared_statements_full/Gemfile new file mode 100644 index 000000000..6f147c82f --- /dev/null +++ b/integration/prepared_statements_full/Gemfile @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +source 'https://rubygems.org' +gem 'pg' +gem 'rails' +gem 'rspec', '~> 3.4' +gem 'rubocop' +gem 'toxiproxy' diff --git a/integration/prepared_statements_full/Gemfile.lock b/integration/prepared_statements_full/Gemfile.lock new file mode 100644 index 000000000..a9e17f4d2 --- /dev/null +++ b/integration/prepared_statements_full/Gemfile.lock @@ -0,0 +1,276 @@ +GEM + remote: https://rubygems.org/ + specs: + action_text-trix (2.1.18) + railties + actioncable (8.1.3) + actionpack (= 8.1.3) + activesupport (= 8.1.3) + nio4r (~> 2.0) + websocket-driver (>= 0.6.1) + zeitwerk (~> 2.6) + actionmailbox (8.1.3) + actionpack (= 8.1.3) + activejob (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + mail (>= 2.8.0) + actionmailer (8.1.3) + actionpack (= 8.1.3) + actionview (= 8.1.3) + activejob (= 8.1.3) + activesupport (= 8.1.3) + mail (>= 2.8.0) + rails-dom-testing (~> 2.2) + actionpack (8.1.3) + actionview (= 8.1.3) + activesupport (= 8.1.3) + nokogiri (>= 1.8.5) + rack (>= 2.2.4) + rack-session (>= 1.0.1) + rack-test (>= 0.6.3) + rails-dom-testing (~> 2.2) + rails-html-sanitizer (~> 1.6) + useragent (~> 0.16) + actiontext (8.1.3) + action_text-trix (~> 2.1.15) + actionpack (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + globalid (>= 0.6.0) + nokogiri (>= 1.8.5) + actionview (8.1.3) + activesupport (= 8.1.3) + builder (~> 3.1) + erubi (~> 1.11) + rails-dom-testing (~> 2.2) + rails-html-sanitizer (~> 1.6) + activejob (8.1.3) + activesupport (= 8.1.3) + globalid (>= 0.3.6) + activemodel (8.1.3) + activesupport (= 8.1.3) + activerecord (8.1.3) + activemodel (= 8.1.3) + activesupport (= 8.1.3) + timeout (>= 0.4.0) 
+ activestorage (8.1.3) + actionpack (= 8.1.3) + activejob (= 8.1.3) + activerecord (= 8.1.3) + activesupport (= 8.1.3) + marcel (~> 1.0) + activesupport (8.1.3) + base64 + bigdecimal + concurrent-ruby (~> 1.0, >= 1.3.1) + connection_pool (>= 2.2.5) + drb + i18n (>= 1.6, < 2) + json + logger (>= 1.4.2) + minitest (>= 5.1) + securerandom (>= 0.3) + tzinfo (~> 2.0, >= 2.0.5) + uri (>= 0.13.1) + ast (2.4.3) + base64 (0.3.0) + bigdecimal (4.1.1) + builder (3.3.0) + concurrent-ruby (1.3.6) + connection_pool (3.0.2) + crass (1.0.6) + date (3.5.1) + diff-lcs (1.6.2) + drb (2.2.3) + erb (6.0.2) + erubi (1.13.1) + globalid (1.3.0) + activesupport (>= 6.1) + i18n (1.14.8) + concurrent-ruby (~> 1.0) + io-console (0.8.2) + irb (1.17.0) + pp (>= 0.6.0) + prism (>= 1.3.0) + rdoc (>= 4.0.0) + reline (>= 0.4.2) + json (2.19.3) + language_server-protocol (3.17.0.5) + lint_roller (1.1.0) + logger (1.7.0) + loofah (2.25.1) + crass (~> 1.0.2) + nokogiri (>= 1.12.0) + mail (2.9.0) + logger + mini_mime (>= 0.1.1) + net-imap + net-pop + net-smtp + marcel (1.1.0) + mini_mime (1.1.5) + minitest (6.0.3) + drb (~> 2.0) + prism (~> 1.5) + net-imap (0.6.3) + date + net-protocol + net-pop (0.1.2) + net-protocol + net-protocol (0.2.2) + timeout + net-smtp (0.5.1) + net-protocol + nio4r (2.7.5) + nokogiri (1.19.2-aarch64-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-aarch64-linux-musl) + racc (~> 1.4) + nokogiri (1.19.2-arm-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-arm-linux-musl) + racc (~> 1.4) + nokogiri (1.19.2-arm64-darwin) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-darwin) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-linux-musl) + racc (~> 1.4) + parallel (1.28.0) + parser (3.3.11.1) + ast (~> 2.4.1) + racc + pg (1.6.3) + pg (1.6.3-aarch64-linux) + pg (1.6.3-aarch64-linux-musl) + pg (1.6.3-arm64-darwin) + pg (1.6.3-x86_64-darwin) + pg (1.6.3-x86_64-linux) + pg (1.6.3-x86_64-linux-musl) + pp (0.6.3) + prettyprint + prettyprint (0.2.0) + 
prism (1.9.0) + psych (5.3.1) + date + stringio + racc (1.8.1) + rack (3.2.6) + rack-session (2.1.1) + base64 (>= 0.1.0) + rack (>= 3.0.0) + rack-test (2.2.0) + rack (>= 1.3) + rackup (2.3.1) + rack (>= 3) + rails (8.1.3) + actioncable (= 8.1.3) + actionmailbox (= 8.1.3) + actionmailer (= 8.1.3) + actionpack (= 8.1.3) + actiontext (= 8.1.3) + actionview (= 8.1.3) + activejob (= 8.1.3) + activemodel (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + bundler (>= 1.15.0) + railties (= 8.1.3) + rails-dom-testing (2.3.0) + activesupport (>= 5.0.0) + minitest + nokogiri (>= 1.6) + rails-html-sanitizer (1.7.0) + loofah (~> 2.25) + nokogiri (>= 1.15.7, != 1.16.7, != 1.16.6, != 1.16.5, != 1.16.4, != 1.16.3, != 1.16.2, != 1.16.1, != 1.16.0.rc1, != 1.16.0) + railties (8.1.3) + actionpack (= 8.1.3) + activesupport (= 8.1.3) + irb (~> 1.13) + rackup (>= 1.0.0) + rake (>= 12.2) + thor (~> 1.0, >= 1.2.2) + tsort (>= 0.2) + zeitwerk (~> 2.6) + rainbow (3.1.1) + rake (13.3.1) + rdoc (7.2.0) + erb + psych (>= 4.0.0) + tsort + regexp_parser (2.12.0) + reline (0.6.3) + io-console (~> 0.5) + rspec (3.13.2) + rspec-core (~> 3.13.0) + rspec-expectations (~> 3.13.0) + rspec-mocks (~> 3.13.0) + rspec-core (3.13.6) + rspec-support (~> 3.13.0) + rspec-expectations (3.13.5) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-mocks (3.13.8) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-support (3.13.7) + rubocop (1.86.0) + json (~> 2.3) + language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.1.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 2.9.3, < 3.0) + rubocop-ast (>= 1.49.0, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 4.0) + rubocop-ast (1.49.1) + parser (>= 3.3.7.2) + prism (~> 1.7) + ruby-progressbar (1.13.0) + securerandom (0.4.1) + stringio (3.2.0) + thor (1.5.0) + timeout (0.6.1) + toxiproxy (2.0.2) + tsort (0.2.0) + tzinfo (2.0.6) + 
concurrent-ruby (~> 1.0) + unicode-display_width (3.2.0) + unicode-emoji (~> 4.1) + unicode-emoji (4.2.0) + uri (1.1.1) + useragent (0.16.11) + websocket-driver (0.8.0) + base64 + websocket-extensions (>= 0.1.0) + websocket-extensions (0.1.5) + zeitwerk (2.7.5) + +PLATFORMS + aarch64-linux + aarch64-linux-gnu + aarch64-linux-musl + arm-linux-gnu + arm-linux-musl + arm64-darwin + x86_64-darwin + x86_64-linux-gnu + x86_64-linux-musl + +DEPENDENCIES + pg + rails + rspec (~> 3.4) + rubocop + toxiproxy + +BUNDLED WITH + 2.7.2 diff --git a/integration/prepared_statements_full/dev.sh b/integration/prepared_statements_full/dev.sh new file mode 100755 index 000000000..f36274590 --- /dev/null +++ b/integration/prepared_statements_full/dev.sh @@ -0,0 +1,12 @@ +#!/bin/bash +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +pushd ${SCRIPT_DIR} + +export GEM_HOME=~/.gem +mkdir -p ${GEM_HOME} +bundle install +bundle exec rspec *_spec.rb + +popd diff --git a/integration/prepared_statements_full/protocol_out_of_sync_spec.rb b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb new file mode 100644 index 000000000..11e3a889a --- /dev/null +++ b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require_relative 'rspec_helper' + +# Triggers the failed-prepare / orphaned-EXECUTE scenario. +# +# When pgdog rewrites a simple-query EXECUTE it injects [Prepare, Query]. +# If the injected PREPARE fails, the outer EXECUTE sub-request (Error + +# ReadyForQuery) must be consumed internally — nothing stale left on the wire. +def trigger_prepare_inject_failure(conn, statement_name:) + # PREPARE fails — pgdog caches the statement name despite the error. 
+ expect { conn.exec "PREPARE #{statement_name} AS SELECT 1 FROM __pgdog_nonexistent_table__" } + .to raise_error(PG::Error, /__pgdog_nonexistent_table__/) + + # EXECUTE triggers [Prepare, Query] injection; the re-injected PREPARE fails + # again. pgdog must drain the orphaned EXECUTE E+Z internally and surface + # only the application-visible error to the caller. + expect { conn.exec "EXECUTE #{statement_name}" } + .to raise_error(PG::Error, /__pgdog_nonexistent_table__/) +end + +describe 'protocol out of sync regressions' do + after do + ensure_done + end + + # Session mode: a failed prepare-inject must not leave stale bytes on the + # wire. The connection stays usable for the next query. + it 'connection remains usable after failed prepare-inject in session mode' do + conn = connect_pgdog(user: 'pgdog_session') + begin + trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_session') + + result = conn.exec 'SELECT 1 AS alive' + expect(result.first['alive']).to eq('1') + ensure + conn.close unless conn.finished? + end + end + + # Transaction mode (pool_size=1): the backend connection is returned to the + # pool after each query. With a single backend any stale bytes not drained + # internally are visible to the very next borrower. The connection must be + # clean so the next query succeeds. + it 'connection remains usable after failed prepare-inject in transaction mode' do + conn = connect_pgdog(user: 'pgdog_tx_single') + begin + trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_tx') + + result = conn.exec 'SELECT 1 AS alive' + expect(result.first['alive']).to eq('1') + ensure + conn.close unless conn.finished? 
+ end + end +end diff --git a/integration/prepared_statements_full/rspec_helper.rb b/integration/prepared_statements_full/rspec_helper.rb new file mode 100644 index 000000000..782cd1146 --- /dev/null +++ b/integration/prepared_statements_full/rspec_helper.rb @@ -0,0 +1,106 @@ +# frozen_string_literal: true + +require 'active_record' +require 'rspec' +require 'pg' +require 'toxiproxy' + +def admin + PG.connect('postgres://admin:pgdog@127.0.0.1:6432/admin') +end + +def admin_exec(sql) + conn = admin + conn.exec sql +ensure + conn&.close +end + +def failover + PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/failover') +end + +def admin_stats(database, column = nil) + conn = admin + stats = conn.exec 'SHOW STATS' + conn.close + stats = stats.select { |item| item['database'] == database } + return stats.map { |item| item[column].to_i } unless column.nil? + + stats +end + +def ensure_done + deadline = Time.now + 2 + pools = [] + clients = [] + servers = [] + pg_clients = [] + current_client_id = nil + + loop do + conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') + begin + pools = conn.exec 'SHOW POOLS' + current_client_id = conn.backend_pid + clients = conn.exec 'SHOW CLIENTS' + servers = conn.exec 'SHOW SERVERS' + ensure + conn.close + end + + pg_conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') + begin + pg_clients = pg_conn.exec 'SELECT state FROM pg_stat_activity'\ + " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ + " AND backend_type = 'client backend'"\ + " AND query NOT LIKE '%pg_stat_activity%'" + ensure + pg_conn.close + end + + pools_ready = pools.all? do |pool| + pool['sv_active'] == '0' && pool['cl_waiting'] == '0' && pool['out_of_sync'] == '0' + end + clients_ready = clients.all? 
do |client| + client['id'].to_i == current_client_id || client['state'] == 'idle' + end + servers_ready = servers + .select { |server| server['application_name'] != 'PgDog Pub/Sub Listener' } + .all? { |server| server['state'] == 'idle' } + pg_clients_ready = pg_clients.all? { |client| client['state'] == 'idle' } + + break if pools_ready && clients_ready && servers_ready && pg_clients_ready + break if Time.now >= deadline + + sleep 0.05 + end + + pools.each do |pool| + expect(pool['sv_active']).to eq('0') + expect(pool['cl_waiting']).to eq('0') + expect(pool['out_of_sync']).to eq('0') + end + + clients.each do |client| + next if client['id'].to_i == current_client_id + expect(client['state']).to eq('idle') + end + + servers + .select do |server| + server['application_name'] != 'PgDog Pub/Sub Listener' + end + .each do |server| + expect(server['state']).to eq('idle') + end + + pg_clients.each do |client| + expect(client['state']).to eq('idle') + end +end + + +def connect_pgdog(user: 'pgdog') + PG.connect(dbname: 'pgdog', user:, password: 'pgdog', port: 6432, host: '127.0.0.1') +end \ No newline at end of file diff --git a/integration/prepared_statements_full/run.sh b/integration/prepared_statements_full/run.sh new file mode 100755 index 000000000..7efdf24d1 --- /dev/null +++ b/integration/prepared_statements_full/run.sh @@ -0,0 +1,11 @@ +#!/bin/bash +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source ${SCRIPT_DIR}/../common.sh + +run_pgdog "integration/prepared_statements_full" +wait_for_pgdog + +bash ${SCRIPT_DIR}/dev.sh + +stop_pgdog diff --git a/integration/prepared_statements_full/users.toml b/integration/prepared_statements_full/users.toml index 9a8205f04..a97596a57 100644 --- a/integration/prepared_statements_full/users.toml +++ b/integration/prepared_statements_full/users.toml @@ -2,3 +2,19 @@ database = "pgdog" name = "pgdog" password = "pgdog" + + +[[users]] +name = "pgdog_session" +database = "pgdog" +password = 
"pgdog" +server_user = "pgdog" +pooler_mode = "session" + +[[users]] +name = "pgdog_tx_single" +database = "pgdog" +password = "pgdog" +server_user = "pgdog" +pooler_mode = "transaction" +pool_size = 1 \ No newline at end of file diff --git a/integration/ruby/lb_spec.rb b/integration/ruby/lb_spec.rb index a3a6c3c39..061d614eb 100644 --- a/integration/ruby/lb_spec.rb +++ b/integration/ruby/lb_spec.rb @@ -13,7 +13,7 @@ it 'distributes traffic evenly' do conn = failover # Reset stats and bans - admin.exec "RECONNECT" + admin_exec 'RECONNECT' before = admin_stats('failover') 250.times do diff --git a/integration/ruby/protocol_out_of_sync_spec.rb b/integration/ruby/protocol_out_of_sync_spec.rb new file mode 100644 index 000000000..fda5f853a --- /dev/null +++ b/integration/ruby/protocol_out_of_sync_spec.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +require_relative 'rspec_helper' + +def connect(dbname = 'pgdog', user = 'pgdog') + PG.connect(dbname: dbname, user: user, password: 'pgdog', port: 6432, host: '127.0.0.1', application_name: '') +end + +describe 'protocol out of sync' do + after do + ensure_done + end + + # A simple query that errors must not prevent a subsequent extended query + # from executing. Both are sent as separate requests; pgdog must process + # each independently. + it 'extended query succeeds after preceding simple query error' do + conn = connect + + # Simple query that errors. + expect { conn.exec 'SELECT 1/0' }.to raise_error(PG::Error, /division by zero/) + + # Extended query must succeed despite the preceding error. + res = conn.exec_params 'SELECT $1::integer AS val', [42] + expect(res[0]['val']).to eq('42') + + conn.close + end + + # In pipeline mode, a failed first query must not prevent subsequent queries + # from executing. Seq2 and seq3 must return rows even when seq1 errors. 
+ it 'extended query pipeline: error in seq1 does not drop seq2 and seq3' do + conn = connect + + conn.enter_pipeline_mode + + # Seq1: will fail — division by zero + conn.send_query_params 'SELECT 1/0', [] + conn.pipeline_sync + # Seq2: must succeed + conn.send_query_params 'SELECT $1::integer AS val', [2] + conn.pipeline_sync + # Seq3: must succeed + conn.send_query_params 'SELECT $1::integer AS val', [3] + conn.pipeline_sync + + # Seq1: fails — consume the error result, then the sync boundary. + begin + conn.get_result + rescue PG::Error => e + expect(e.message).to include('division by zero') + end + conn.get_result # nil — end of command + conn.get_result # PipelineSync — sync boundary + + # Seq2: must return a real row — if aborted instead, the request was dropped. + r2 = conn.get_result + expect(r2.result_status).to eq(PG::PGRES_TUPLES_OK) + expect(r2[0]['val']).to eq('2') + conn.get_result # end of command + conn.get_result # PipelineSync + + # Seq3: must return a real row. + r3 = conn.get_result + expect(r3.result_status).to eq(PG::PGRES_TUPLES_OK) + expect(r3[0]['val']).to eq('3') + conn.get_result # end of command + conn.get_result # PipelineSync + + conn.exit_pipeline_mode + conn.close + end +end diff --git a/integration/ruby/rspec_helper.rb b/integration/ruby/rspec_helper.rb index 5eb8317ae..ac55b4acf 100644 --- a/integration/ruby/rspec_helper.rb +++ b/integration/ruby/rspec_helper.rb @@ -9,6 +9,13 @@ def admin PG.connect('postgres://admin:pgdog@127.0.0.1:6432/admin') end +def admin_exec(sql) + conn = admin + conn.exec sql +ensure + conn&.close +end + def failover PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/failover') end @@ -24,20 +31,62 @@ def admin_stats(database, column = nil) end def ensure_done - conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') - pools = conn.exec 'SHOW POOLS' + deadline = Time.now + 2 + pools = [] + clients = [] + servers = [] + pg_clients = [] + current_client_id = nil + + 
loop do + conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') + begin + pools = conn.exec 'SHOW POOLS' + current_client_id = conn.backend_pid + clients = conn.exec 'SHOW CLIENTS' + servers = conn.exec 'SHOW SERVERS' + ensure + conn.close + end + + pg_conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') + begin + pg_clients = pg_conn.exec 'SELECT state FROM pg_stat_activity'\ + " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ + " AND backend_type = 'client backend'"\ + " AND query NOT LIKE '%pg_stat_activity%'" + ensure + pg_conn.close + end + + pools_ready = pools.all? do |pool| + pool['sv_active'] == '0' && pool['cl_waiting'] == '0' && pool['out_of_sync'] == '0' + end + clients_ready = clients.all? do |client| + client['id'].to_i == current_client_id || client['state'] == 'idle' + end + servers_ready = servers + .select { |server| server['application_name'] != 'PgDog Pub/Sub Listener' } + .all? { |server| server['state'] == 'idle' } + pg_clients_ready = pg_clients.all? 
{ |client| client['state'] == 'idle' } + + break if pools_ready && clients_ready && servers_ready && pg_clients_ready + break if Time.now >= deadline + + sleep 0.05 + end + pools.each do |pool| expect(pool['sv_active']).to eq('0') expect(pool['cl_waiting']).to eq('0') expect(pool['out_of_sync']).to eq('0') end - current_client_id = conn.backend_pid - clients = conn.exec 'SHOW CLIENTS' + clients.each do |client| next if client['id'].to_i == current_client_id expect(client['state']).to eq('idle') end - servers = conn.exec 'SHOW SERVERS' + servers .select do |server| server['application_name'] != 'PgDog Pub/Sub Listener' @@ -46,12 +95,7 @@ def ensure_done expect(server['state']).to eq('idle') end - conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') - clients = conn.exec 'SELECT state FROM pg_stat_activity'\ - " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ - " AND backend_type = 'client backend'"\ - " AND query NOT LIKE '%pg_stat_activity%'" - clients.each do |client| + pg_clients.each do |client| expect(client['state']).to eq('idle') end -end +end \ No newline at end of file diff --git a/pgdog/src/backend/prepared_statements.rs b/pgdog/src/backend/prepared_statements.rs index 76ad8480a..93d90c38d 100644 --- a/pgdog/src/backend/prepared_statements.rs +++ b/pgdog/src/backend/prepared_statements.rs @@ -231,9 +231,7 @@ impl PreparedStatements { } // Backend told us the copy is done. 
- 'c' => { - self.state.action(code)?; - } + 'c' => {} _ => (), } diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index a93b89a99..e47f47206 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -1,3 +1,5 @@ +use tracing::error; + use crate::{ net::{Message, Protocol}, stats::memory::MemoryUsage, @@ -32,12 +34,6 @@ impl MemoryUsage for ExecutionCode { } } -impl ExecutionCode { - fn extended(&self) -> bool { - matches!(self, Self::ParseComplete | Self::BindComplete) - } -} - impl From<char> for ExecutionCode { fn from(value: char) -> Self { match value { @@ -67,29 +63,17 @@ impl MemoryUsage for ExecutionItem { } } -impl ExecutionItem { - fn extended(&self) -> bool { - match self { - Self::Code(code) | Self::Ignore(code) => code.extended(), - } - } -} - #[derive(Debug, Clone, Default)] pub struct ProtocolState { queue: VecDeque<ExecutionItem>, simulated: VecDeque<Message>, - extended: bool, out_of_sync: bool, } impl MemoryUsage for ProtocolState { #[inline] fn memory_usage(&self) -> usize { - self.queue.memory_usage() - + self.simulated.memory_usage() - + self.extended.memory_usage() - + self.out_of_sync.memory_usage() + self.queue.memory_usage() + self.simulated.memory_usage() + self.out_of_sync.memory_usage() } } @@ -102,7 +86,6 @@ impl ProtocolState { /// pub(crate) fn add_ignore(&mut self, code: impl Into<ExecutionCode>) { let code = code.into(); - self.extended = self.extended || code.extended(); self.queue.push_back(ExecutionItem::Ignore(code)); } @@ -110,7 +93,6 @@ impl ProtocolState { /// to be returned by the server. pub(crate) fn add(&mut self, code: impl Into<ExecutionCode>) { let code = code.into(); - self.extended = self.extended || code.extended(); self.queue.push_back(ExecutionItem::Code(code)) } @@ -154,29 +136,60 @@ impl ProtocolState { match code { ExecutionCode::Untracked => return Ok(Action::Forward), ExecutionCode::Error => { - if !self.extended { - // A simple-query error only aborts the current simple query.
- // Keep any later pipelined simple query RFQs queued. - while !self.queue.is_empty() - && self.queue.front() - != Some(&ExecutionItem::Code(ExecutionCode::ReadyForQuery)) - { - self.queue.pop_front(); - } - return Ok(Action::Forward); + if matches!( + self.queue.front(), + Some(ExecutionItem::Ignore(ExecutionCode::Error)) + ) { + // We ignore errors only for the pgdog-injected sub-request. + // In that case the first error has already been processed and + // sent to the client; for the remaining expected errors + // we've added ignores for the errors and RFQs. + // The error is ignored here but is still logged by the [backend::server] module. + self.queue.pop_front(); + return Ok(Action::Ignore); } - // Remove everything from the execution queue. - // The connection is out of sync until client re-syncs it. - if self.extended { - self.out_of_sync = true; - } - let last = self.queue.pop_back(); - self.queue.clear(); - if let Some(ExecutionItem::Code(ExecutionCode::ReadyForQuery)) = last { + // This is the first (and client-visible) error in the chain. It is forwarded + // so the client receives exactly one Error+RFQ for their request. + + // Mark the state out-of-sync so the connection is not reused until the client re-syncs.
+ // For a simple query this happens immediately after the RFQ is received. + self.out_of_sync = true; + + // Find the position of the first RFQ code to + // separate the pgdog-injected sub-request from the remaining queries. + let Some(rfq_pos) = self + .queue + .iter() + .position(|i| matches!(i, ExecutionItem::Code(ExecutionCode::ReadyForQuery))) + else { + self.queue.clear(); + return Ok(Action::Forward); + }; + + // broken_queue: the pgdog-injected sub-request portion. Its requests + // won't execute properly anyway, since an error has already occurred. + let broken_queue = self.queue.drain(..rfq_pos); + + // Count how many queries were expected to finish in the pgdog-injected sub-request. + // The current use case is only the Prepare + Execute messages from [backend::server], + // and if the prepare fails, the execute will fail as well. + // WARN: this is not the most reliable solution if the injected set of + // queries grows, but it works for now.
+ let count_ignores = broken_queue + .filter(|i| matches!(i, ExecutionItem::Ignore(ExecutionCode::ReadyForQuery))) + .count(); + + // For every message that was expected to run, add an ignore for one error and one RFQ. + // For prepare it'll be one iteration, producing the queue + // [Ignore(RFQ), Ignore(Error), Code(RFQ)] + for _ in 0..count_ignores { + self.queue + .push_front(ExecutionItem::Ignore(ExecutionCode::Error)); + self.queue - .push_back(ExecutionItem::Code(ExecutionCode::ReadyForQuery)); + .push_front(ExecutionItem::Ignore(ExecutionCode::ReadyForQuery)); + } + return Ok(Action::Forward); } @@ -185,8 +198,11 @@ impl ProtocolState { } _ => (), }; - let in_queue = self.queue.pop_front().ok_or(Error::ProtocolOutOfSync)?; - let action = match in_queue { + let in_queue = self.queue.pop_front().ok_or_else(|| { + error!("Unexpected action {code:?}: queue is empty"); + Error::ProtocolOutOfSync + })?; + match in_queue { // The queue is waiting for the server to send ReadyForQuery, // but it sent something else. That means the execution pipeline // isn't done. We are not tracking every single message, so this is expected. @@ -205,16 +221,12 @@ impl ProtocolState { if code == in_queue { Ok(Action::Ignore) } else { + error!(?self, "Unexpected action {code:?}: expected: {in_queue:?}"); + + Err(Error::ProtocolOutOfSync) } } - }?; - - if code == ExecutionCode::ReadyForQuery { - self.extended = self.queue.iter().any(ExecutionItem::extended); } - - Ok(action) } pub(crate) fn in_copy_mode(&self) -> bool { @@ -234,11 +246,6 @@ impl ProtocolState { &self.queue } - #[cfg(test)] - pub(crate) fn queue_mut(&mut self) -> &mut VecDeque<ExecutionItem> { - &mut self.queue - } - pub(crate) fn done(&self) -> bool { self.is_empty() && !self.out_of_sync } @@ -252,7 +259,7 @@ impl ProtocolState { !self.out_of_sync } - /// Check if the protocol is out of sync due to an error in extended protocol. + /// Check if the protocol is out of sync due to an error.
pub(crate) fn out_of_sync(&self) -> bool { self.out_of_sync } @@ -360,7 +367,6 @@ mod test { assert_eq!(state.action('C').unwrap(), Action::Forward); assert_eq!(state.action('Z').unwrap(), Action::Forward); assert!(state.is_empty()); - assert!(!state.extended); } #[test] @@ -507,14 +513,65 @@ mod test { #[test] fn test_simple_query_error_no_out_of_sync() { let mut state = ProtocolState::default(); - // Simple query error should NOT set out_of_sync + // Simple query error sets out_of_sync temporarily; it is cleared by the next RFQ. state.add('C'); // CommandComplete (expected but won't arrive) state.add('Z'); // ReadyForQuery - assert!(!state.extended); assert_eq!(state.action('E').unwrap(), Action::Forward); - assert!(!state.out_of_sync); // Simple query doesn't set out_of_sync + assert!(state.out_of_sync); // set on error, cleared on RFQ + assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!(!state.out_of_sync); + } + + // A simple-query error sets out_of_sync temporarily; the following RFQ clears it. + // Extended-query entries queued after the simple query are unaffected. 
+ #[test] + fn test_simple_query_error_before_queued_extended_request_does_not_set_out_of_sync() { + let mut state = ProtocolState::default(); + state.add('C'); // CommandComplete (simple query) + state.add('Z'); // ReadyForQuery (simple query) + state.add('1'); // ParseComplete (extended query) + state.add('2'); // BindComplete (extended query) + state.add('C'); // CommandComplete (extended query) + state.add('Z'); // ReadyForQuery (extended query) + + assert_eq!(state.action('E').unwrap(), Action::Forward); // error forwarded + assert!(state.out_of_sync); // set on error + assert_eq!(state.action('Z').unwrap(), Action::Forward); // simple query RFQ + assert!(!state.out_of_sync); // cleared by RFQ; extended query intact + assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(state.is_empty()); + } + + // out_of_sync is cleared by each sub-request's own RFQ, so a simple-query error + // between two RFQ boundaries does not bleed into subsequent extended-query entries. 
+ #[test] + fn test_simple_query_error_after_rfq_before_extended_does_not_set_out_of_sync() { + let mut state = ProtocolState::default(); + state.add('C'); // CommandComplete (simple query 1) + state.add('Z'); // ReadyForQuery (simple query 1) + state.add('C'); // CommandComplete (simple query 2) + state.add('Z'); // ReadyForQuery (simple query 2) + state.add('1'); // ParseComplete (extended query) + state.add('2'); // BindComplete (extended query) + state.add('C'); // CommandComplete (extended query) + state.add('Z'); // ReadyForQuery (extended query) + + assert_eq!(state.action('C').unwrap(), Action::Forward); // simple query 1 OK assert_eq!(state.action('Z').unwrap(), Action::Forward); + + assert_eq!(state.action('E').unwrap(), Action::Forward); // simple query 2 errors + assert!(state.out_of_sync); // set on error + assert_eq!(state.action('Z').unwrap(), Action::Forward); // simple query 2 RFQ + assert!(!state.out_of_sync); // cleared; extended query intact + assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(state.is_empty()); } #[test] @@ -821,7 +878,6 @@ mod test { state.add('Z'); // ReadyForQuery assert_eq!(state.action('1').unwrap(), Action::Forward); - assert!(state.extended); // Now marked as extended assert_eq!(state.action('2').unwrap(), Action::Forward); assert_eq!(state.action('C').unwrap(), Action::Forward); assert_eq!(state.action('Z').unwrap(), Action::Forward); @@ -890,4 +946,203 @@ mod test { assert_eq!(state.action('Z').unwrap(), Action::Forward); assert!(state.is_empty()); } + + // Double action('c') for server CopyDone + + // Safe path: Code(ReadyForQuery) backstop makes the double action('c') call idempotent. 
+ #[test] + fn test_copydone_double_action_safe_with_rfq_backstop() { + let mut state = ProtocolState::default(); + // 1. Queue: CopyOut slot + RFQ backstop (from Sync). + state.add('G'); // CopyOut + state.add('Z'); // ReadyForQuery backstop + + // 2. First action('c'): pops CopyOut; RFQ backstop untouched. + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert_eq!(state.len(), 1); + + // 3. Second action('c'): sees RFQ at front; pushes it back (idempotent). + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert_eq!(state.len(), 1); // RFQ still present for the server's ReadyForQuery + } + + // Documents raw state-machine behavior: calling action('c') twice with no RFQ backstop + // causes ProtocolOutOfSync. forward() was the only caller that did this; the second call + // has been removed from the 'c' arm in prepared_statements.rs, making this path unreachable + // through normal protocol flow. The test is kept to pin the underlying invariant. + #[test] + fn test_copydone_double_action_oos_without_rfq_backstop() { + let mut state = ProtocolState::default(); + // Queue: Execute + Flush (no Sync) — no RFQ backstop. + state.add('C'); // ExecutionCompleted + + // First action('c'): pops ExecutionCompleted; queue empty. + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert!(state.is_empty()); + + // Second action('c') directly: empty queue → ProtocolOutOfSync. + // This is the raw state machine. forward() no longer makes this second call. + assert!(state.action('c').is_err()); + } + + // Happy path: injected ParseComplete arrives in order — silently ignored, rest forwarded. 
+ #[test] + fn test_injected_parse_happy_path() { + let mut state = ProtocolState::default(); + state.add_ignore('1'); // ParseComplete — injected, swallowed + state.add('2'); // BindComplete + state.add('C'); // CommandComplete + state.add('Z'); // ReadyForQuery + + assert_eq!(state.action('1').unwrap(), Action::Ignore); // swallowed + assert_eq!(state.action('2').unwrap(), Action::Forward); // forwarded + assert_eq!(state.action('C').unwrap(), Action::Forward); // forwarded + assert_eq!(state.action('Z').unwrap(), Action::Forward); // forwarded + assert!(state.is_empty()); + } + + // Replicates the full lifecycle of an injected PREPARE that errors: + // + // Client sends: PREPARE foo AS ... (simple-query style) + // EXECUTE (via Query) + // + // pgdog injects ahead of the client's Query: + // add_ignore('C') — CommandComplete from PREPARE + // add_ignore('Z') — RFQ from PREPARE + // Then the client's Query adds: + // add('Z') — the client-visible RFQ + // + // Queue before first error: [Ignore(C), Ignore(Z), Code(Z)] + // + // Server responds to PREPARE with an error: + // 'E' → error branch fires: drain [Ignore(C), Ignore(Z)], count 1 Ignore(RFQ), + // push_front loop produces [Ignore(RFQ), Ignore(Error), Code(Z)]. + // Action::Forward — client receives this error. 
+ // 'Z' → matches Ignore(RFQ) → Action::Ignore (PREPARE's RFQ suppressed) + // + // Server responds to EXECUTE (which fails because PREPARE never succeeded): + // 'E' → fast-path: front is Ignore(Error) → pop → Action::Ignore (suppressed) + // 'Z' → Code(Z) → Action::Forward — client receives the closing RFQ + #[test] + fn test_injected_prepare_error_full_lifecycle() { + let mut state = ProtocolState::default(); + + // --- setup: replicate what prepared_statements.rs does --- + // ProtocolMessage::Prepare injects: + state.add_ignore('C'); // Ignore(CommandComplete) — PREPARE response + state.add_ignore('Z'); // Ignore(RFQ) — PREPARE response + // ProtocolMessage::Query (client EXECUTE) adds: + state.add('Z'); // Code(RFQ) — client-visible + + // --- server sends Error for PREPARE --- + // Error branch: drains [Ignore(C), Ignore(Z)], finds 1 Ignore(Z), + // rebuilds queue as [Ignore(RFQ), Ignore(Error), Code(Z)]. + assert_eq!(state.action('E').unwrap(), Action::Forward); + + // --- server sends RFQ for PREPARE (now suppressed) --- + assert_eq!(state.action('Z').unwrap(), Action::Ignore); + + // --- server sends Error for EXECUTE (prepare never succeeded) --- + // Fast-path: Ignore(Error) is at front → pop and ignore. + assert_eq!(state.action('E').unwrap(), Action::Ignore); + + // --- server sends RFQ for EXECUTE --- + // Code(Z) is at front → forwarded to client. + assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!(state.is_empty()); + } + + // ========================================= + // Pipeline multi-sync tests + // ========================================= + + // In pipeline mode, each request runs in isolation; a failure in one must not affect the others. + #[test] + fn test_pipeline_multi_sync_error_in_seq1_does_not_affect_seq2_seq3() { + // Setup: seq1 expects ParseComplete, BindComplete, CommandComplete, ReadyForQuery. 
+ let mut seq1 = ProtocolState::default(); + seq1.add('1'); // ParseComplete + seq1.add('2'); // BindComplete + seq1.add('C'); // CommandComplete (won't arrive — execute errors) + seq1.add('Z'); // ReadyForQuery + + // Seq1: fails — seq2 and seq3 must still respond. + assert_eq!(seq1.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(seq1.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(seq1.action('E').unwrap(), Action::Forward); // ErrorResponse + assert!(seq1.out_of_sync); + assert_eq!(seq1.len(), 1); + assert_eq!(seq1.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(!seq1.out_of_sync); + assert!(seq1.is_empty()); + + // Seq2: independent state; seq1's error must not affect it. + let mut seq2 = ProtocolState::default(); + seq2.add('1'); + seq2.add('2'); + seq2.add('C'); + seq2.add('Z'); + + assert_eq!(seq2.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(seq2.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(seq2.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(seq2.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(seq2.is_empty()); + + // Seq3: independent state; seq1's error must not affect it. + let mut seq3 = ProtocolState::default(); + seq3.add('1'); + seq3.add('2'); + seq3.add('C'); + seq3.add('Z'); + + assert_eq!(seq3.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(seq3.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(seq3.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(seq3.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(seq3.is_empty()); + } + + // In pipeline mode, a failed request must not prevent subsequent pipelined + // requests from being processed. Seq1 errors; seq2 and seq3 must still + // receive their responses. + // + // Before the Error-arm fix, seq1's error cleared the shared queue, so seq2 + // and seq3 received errors instead of their expected responses.
+ #[test] + fn test_pipeline_single_queue_error_only_clears_failing_sync_group() { + let mut state = ProtocolState::default(); + // Setup: all three sync groups loaded into a single shared queue. + state.add('1'); + state.add('2'); + state.add('C'); + state.add('Z'); // seq1 + state.add('1'); + state.add('2'); + state.add('C'); + state.add('Z'); // seq2 + state.add('1'); + state.add('2'); + state.add('C'); + state.add('Z'); // seq3 + + // Seq1: fails — seq2 and seq3 must still be reachable. + assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('E').unwrap(), Action::Forward); // error — must not clear seq2/seq3 + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + + // Seq2: must process normally. + assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + + // Seq3: must process normally. 
+ assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(state.is_empty()); + } } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 5b2c7e09c..16c00307a 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -435,6 +435,13 @@ impl Server { Ok(forward) => { if forward { break message; + } else if message.code() == 'E' { + // We got an error that will not be forwarded to the client, + // but it is still useful for tracing. + error!( + "Ignore error from stream: {:?}", + ErrorResponse::from_bytes(message.payload()) + ); } } Err(err) => { @@ -1092,7 +1099,6 @@ impl Drop for Server { } } -// Used for testing. #[cfg(test)] pub mod test { use bytes::{BufMut, BytesMut}; @@ -1351,6 +1357,44 @@ pub mod test { assert!(server.done()); } + // A simple query that errors must not prevent a subsequent extended query + // from executing when both are sent in the same batch. + // + // Before the fix, queuing the extended items set extended=true before the + // simple query was processed, so the simple-query error incorrectly cleared + // the extended query's pending entries. + #[tokio::test] + async fn test_simple_query_error_before_extended_query_in_same_batch() { + let mut server = test_server().await; + + // Setup: simple query that errors, immediately followed by an extended query. + server + .send( + &vec![ + Query::new("SELECT 1/0").into(), + Parse::new_anonymous("SELECT 1").into(), + Bind::default().into(), + Execute::new().into(), + Sync.into(), + ] + .into(), + ) + .await + .unwrap(); + + // Simple query: errors, then ReadyForQuery.
+ assert_eq!(server.read().await.unwrap().code(), 'E'); // ErrorResponse + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + + // Extended query must still return its results. + assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'D'); // DataRow + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + assert!(server.done()); + } + #[tokio::test] async fn test_execute_batch_simple_query_error_then_success() { let mut server = test_server().await; @@ -1517,7 +1561,7 @@ pub mod test { let (new, name) = global.write().insert(&parse); assert!(new); let parse = parse.rename(&name); - assert_eq!(parse.name(), "__pgdog_1"); + assert!(parse.name().starts_with("__pgdog_")); let mut server = test_server().await; @@ -1526,7 +1570,7 @@ pub mod test { .send( &vec![ ProtocolMessage::from(Bind::new_params( - "__pgdog_1", + &name, &[Parameter { len: 1, data: "1".as_bytes().into(), @@ -1960,14 +2004,17 @@ pub mod test { let mut prep = PreparedStatements::new(); let mut parse = Parse::named("test", "SELECT 1::bigint"); + prep.insert_anyway(&mut parse); - assert_eq!(parse.name(), "__pgdog_1"); + + let name = parse.name().to_owned(); + assert!(name.starts_with("__pgdog_")); server .send( &vec![ProtocolMessage::from(Query::new(format!( "PREPARE {} AS {}", - parse.name(), + name, parse.query() )))] .into(), @@ -1980,10 +2027,10 @@ pub mod test { } assert!(server.sync_prepared()); server.sync_prepared_statements().await.unwrap(); - assert!(server.prepared_statements.contains("__pgdog_1")); + assert!(server.prepared_statements.contains(&name)); - let describe = Describe::new_statement("__pgdog_1"); - let bind = Bind::new_statement("__pgdog_1"); + let describe = Describe::new_statement(&name); + let bind = Bind::new_statement(&name); let 
execute = Execute::new(); server .send( @@ -2003,7 +2050,7 @@ pub mod test { assert_eq!(c, msg.code()); } - let parse = Parse::named("__pgdog_1", "SELECT 2::bigint"); + let parse = Parse::named(&name, "SELECT 2::bigint"); let describe = describe.clone(); server @@ -2017,7 +2064,7 @@ pub mod test { } server - .send(&vec![ProtocolMessage::from(Query::new("EXECUTE __pgdog_1"))].into()) + .send(&vec![ProtocolMessage::from(Query::new(format!("EXECUTE {name}")))].into()) .await .unwrap(); for c in ['T', 'D', 'C', 'Z'] { @@ -2720,38 +2767,6 @@ pub mod test { ); } - #[tokio::test] - async fn test_protocol_out_of_sync_sets_error_state() { - let mut server = test_server().await; - - server - .send(&vec![Query::new("SELECT 1").into()].into()) - .await - .unwrap(); - - for c in ['T', 'D'] { - let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), c); - } - - // simulate an unlikely, but existent out-of-sync state - server - .prepared_statements_mut() - .state_mut() - .queue_mut() - .clear(); - - let res = server.read().await; - assert!( - matches!(res, Err(Error::ProtocolOutOfSync)), - "protocol should be out of sync" - ); - assert!( - server.stats().get_state() == State::Error, - "state should be Error after detecting desync" - ) - } - #[tokio::test] async fn test_reset_clears_client_params() { let mut server = test_server().await; @@ -3177,6 +3192,63 @@ pub mod test { assert!(!server.needs_drain()); } + // In pipeline mode, a failed request must not prevent subsequent pipelined + // requests from being processed. All three sequences are sent in one batch; + // seq1 fails, seq2 and seq3 must still return their rows. + // + // Before the fix, the error in seq1 caused seq2 and seq3 to receive + // ProtocolOutOfSync instead of their expected responses. + #[tokio::test] + async fn test_pipelined_multiple_syncs_first_fails() { + let mut server = test_server().await; + + // Three pipelined sequences sent in one batch. + server + .send( + &vec![ + // Seq1 — will fail.
+ Parse::new_anonymous("SELECT 1/0").into(), + Bind::default().into(), + Execute::new().into(), + Sync.into(), + // Seq2 — must succeed. + Parse::new_anonymous("SELECT 2").into(), + Bind::default().into(), + Execute::new().into(), + Sync.into(), + // Seq3 — must succeed. + Parse::new_anonymous("SELECT 3").into(), + Bind::default().into(), + Execute::new().into(), + Sync.into(), + ] + .into(), + ) + .await + .unwrap(); + + // Seq1: fails — seq2 and seq3 must still respond. + assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), 'E'); // ErrorResponse + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + + // Seq2: must return data despite seq1 erroring. + assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'D'); // DataRow + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + + // Seq3: must return data. + assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'D'); // DataRow + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + assert!(server.done()); + assert!(!server.has_more_messages()); + } + #[tokio::test] async fn test_empty_query_extended() { let mut server = test_server().await; @@ -3588,4 +3660,106 @@ pub mod test { assert!(server.force_close()); assert_eq!(server.stats().get_state(), State::ForceClose); } + + // Failed injected PREPARE leaves EXECUTE ReadyForQuery unmatched — Error handler empties the queue. 
+ #[tokio::test] + async fn test_prepare_execute_inject_failure_orphans_execute_rfq() { + let mut server = test_server().await; + + // 1. Send [Prepare, Query] as the rewriter injects for EXECUTE. + server + .send( + &vec![ + ProtocolMessage::Prepare { + name: "__pgdog_prepare_inject_test".to_string(), + statement: "SELECT 1 FROM __pgdog_nonexistent_table__".to_string(), + }, + ProtocolMessage::Query(Query::new("EXECUTE __pgdog_prepare_inject_test()")), + ] + .into(), + ) + .await + .unwrap(); + + // 2. PREPARE 'E' forwarded; 'Z' consumes re-added Code(RFQ) — queue empty. + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); // 'E' PREPARE error + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); // 'Z' RFQ — queue now empty + } + + // Extended Execute + Flush (no Sync): single action('c') now succeeds. + // CopyDone is forwarded to client; the trailing CommandComplete then hits an empty + // queue (no RFQ backstop, no Sync) and raises ProtocolOutOfSync. + // This is distinct from the former double-action bug, which fired on CopyDone itself. + #[tokio::test] + async fn test_copydone_single_action_without_sync() { + let mut server = test_server().await; + + // 1. Parse + Bind + Execute + Flush (not Sync); no RFQ backstop in queue. + server + .send( + &vec![ + ProtocolMessage::Parse(Parse::new_anonymous("COPY (VALUES (1),(2)) TO STDOUT")), + ProtocolMessage::Bind(Bind::new_params("", &[])), + ProtocolMessage::Execute(Execute::new()), + // Flush (not Sync): prompts PostgreSQL to send buffered responses. + // handle() maps this to Other, adding nothing to the queue. + Flush.into(), + ] + .into(), + ) + .await + .unwrap(); + + // 2. ParseComplete, BindComplete, CopyOutResponse, CopyData x2 arrive normally. 
+ assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'H'); // CopyOutResponse + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 1 + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 2 + + // 3. CopyDone — fixed: single action() pops ExecutionCompleted; no second call. + assert_eq!(server.read().await.unwrap().code(), 'c'); // CopyDone forwarded + + // 4. CommandComplete hits empty queue (no RFQ backstop without Sync). + assert!( + matches!(server.read().await.unwrap_err(), Error::ProtocolOutOfSync), + "expected ProtocolOutOfSync on CommandComplete with empty queue" + ); + } + + // Safe path: Sync adds Code(RFQ) backstop — double action('c') is idempotent. + #[tokio::test] + async fn test_copydone_double_action_safe_with_sync() { + let mut server = test_server().await; + + // 1. Parse + Bind + Execute + Sync; RFQ backstop added to queue. + server + .send( + &vec![ + ProtocolMessage::Parse(Parse::new_anonymous("COPY (VALUES (1),(2)) TO STDOUT")), + ProtocolMessage::Bind(Bind::new_params("", &[])), + ProtocolMessage::Execute(Execute::new()), + ProtocolMessage::Sync(Sync), + ] + .into(), + ) + .await + .unwrap(); + + // 2. Full response sequence — CopyDone is safe with RFQ backstop. 
+ assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'H'); // CopyOutResponse + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 1 + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 2 + assert_eq!(server.read().await.unwrap().code(), 'c'); // CopyDone -- safe with RFQ backstop + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + assert!( + server.done(), + "server must be done after full response sequence" + ); + } }