From 8478876be6221a61299375be65dccfbbaa15d42a Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Mon, 6 Apr 2026 15:28:02 +0000 Subject: [PATCH 1/6] test & docs issue 0 test flip --- .github/workflows/ci.yml | 2 + docs/issues/PROTOCOL_OUT_OF_SYNC.md | 435 ++++++++++++++++++ integration/common.sh | 4 +- integration/prepared_statements_full/Gemfile | 8 + .../prepared_statements_full/Gemfile.lock | 276 +++++++++++ integration/prepared_statements_full/dev.sh | 12 + .../protocol_out_of_sync_spec.rb | 56 +++ .../prepared_statements_full/rspec_helper.rb | 106 +++++ integration/prepared_statements_full/run.sh | 11 + .../prepared_statements_full/users.toml | 16 + integration/ruby/lb_spec.rb | 2 +- integration/ruby/rspec_helper.rb | 68 ++- pgdog/src/backend/protocol/state.rs | 95 +++- pgdog/src/backend/server.rs | 217 +++++++-- 14 files changed, 1253 insertions(+), 55 deletions(-) create mode 100644 docs/issues/PROTOCOL_OUT_OF_SYNC.md create mode 100644 integration/prepared_statements_full/Gemfile create mode 100644 integration/prepared_statements_full/Gemfile.lock create mode 100755 integration/prepared_statements_full/dev.sh create mode 100644 integration/prepared_statements_full/protocol_out_of_sync_spec.rb create mode 100644 integration/prepared_statements_full/rspec_helper.rb create mode 100755 integration/prepared_statements_full/run.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8779fef34..e463ad6a5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -158,6 +158,8 @@ jobs: run: bash integration/js/pg_tests/run.sh - name: Ruby run: bash integration/ruby/run.sh + - name: Prepared statements (full) + run: bash integration/prepared_statements_full/run.sh - name: Java run: bash integration/java/run.sh - name: Mirror diff --git a/docs/issues/PROTOCOL_OUT_OF_SYNC.md b/docs/issues/PROTOCOL_OUT_OF_SYNC.md new file mode 100644 index 000000000..c94994419 --- /dev/null +++ 
b/docs/issues/PROTOCOL_OUT_OF_SYNC.md @@ -0,0 +1,435 @@ +# ProtocolOutOfSync — Known Root Causes + +`Error::ProtocolOutOfSync` fires when `ProtocolState`'s expected-response queue diverges from what +the backend actually sends. The catch site is `server.rs:394`; the connection is permanently marked +`State::Error` and discarded from the pool. + +**Queue mechanics** (`pgdog/src/backend/protocol/state.rs`). `handle()` pushes one `ExecutionItem` +per anticipated response before forwarding any client message. As server bytes arrive, `forward()` +calls `action(code)` which pops the queue front and checks the match. Two conditions raise +`ProtocolOutOfSync`: + +- **Empty queue** (`state.rs:168`) — a tracked message arrives but nothing was expected. +- **Ignore mismatch** (`state.rs:188–191`) — queue front is an `Ignore` slot but the server sent a + different code. + +--- + +## ✅ Issue 0 — `extended` flag never resets; `SET`-batch error poisons connection + +**Severity:** High — triggered by normal operation when `query_parser = "on"` and any parameterised +query has run on the connection. + +**Commit:** `54c9d3a942bdac55873f17d2af74ef1dfedb3f4a` + +**Location:** `pgdog/src/backend/protocol/state.rs`, `action()`, Error and ReadyForQuery arms. + +### Description + +When `query_parser = "on"`, pgdog caches every `SET` issued by a client so it can replay them +on whichever backend connection services the next request. The replay calls `execute_batch`, +which queues one `Code(RFQ)` per command — two cached `SET`s produce `[Code(RFQ), Code(RFQ)]`. + +Two independent defects caused `ProtocolOutOfSync` when a cached `SET` produced a server Error: + +**Defect 1 — `extended` one-way latch.** `ProtocolState.extended` was set on the first +parameterised (`Parse`/`Bind`) query and never cleared. 
The Error arm unconditionally set +`out_of_sync = true` when `extended == true`, permanently poisoning any connection that had +ever used extended protocol — even for unrelated simple-query errors. + +**Defect 2 — destructive queue clear.** The old Error arm (`pop_back + clear + push_back(RFQ)`) +was designed for a single-query queue. With two `Code(RFQ)` slots it collapsed to one: +the first `Z` consumed the only remaining slot, the second SET's `CommandComplete` hit empty +→ `ProtocolOutOfSync` before the client saw the actual SET error. + +Both defects are independent. Defect 2 fires on any two-command batch regardless of `extended`; +Defect 1 additionally poisons the connection via `out_of_sync = true`. + +### Conditions required + +All four must hold simultaneously: + +1. `query_parser = "on"` — pgdog tracks `SET` commands and replays them via `execute_batch` + when syncing session state onto a checked-out connection. +2. Two or more `SET` commands in the cached session state, at least one with an invalid value + (e.g. `SET lock_timeout TO sdf`) — produces two `Code(RFQ)` slots in the queue and causes + PostgreSQL to return an Error on the first replayed `SET`. +3. A prior extended-protocol query (`Parse + Bind + Execute + Sync`) on the same connection + — sets `extended = true`, arming the `out_of_sync = true` branch on any subsequent error + (Defect 1). +4. Any subsequent query after the failed `execute_batch` — hits a connection whose + `out_of_sync` is `true`, causing `PG::ConnectionBad` at the client. 
+ +### Reproduction + +```ruby +# integration/ruby/pg_spec.rb +it 'out of sync' do + conn = PG.connect(..., application_name: '') + conn.exec_params 'SELECT $1', [1] # sets extended = true + conn.exec "SELECT 1" + conn.exec "SET lock_timeout TO sdfs" # invalid; query_parser will replay this + conn.exec "SET statement_timeout TO '1s'" + expect { conn.exec 'SELECT 1' }.to raise_error(/invalid value for parameter/) +end +``` + +```sh +cd integration/ruby && bundle exec rspec pg_spec.rb -e 'out of sync' +``` + +### Tests + +**State-machine unit test (`state.rs`, no backend needed)** + +- **`test_pipelined_simple_query_error_keeps_next_query_response`** — queues two `Code(RFQ)` slots, + fires `action('E')`, asserts the second slot survives, walks `Z`/`C`/`Z` to completion. + +**Server integration tests (`server.rs`, require PostgreSQL)** + +- **`test_execute_batch_simple_query_error_then_success`** — `execute_batch(["syntax error", "SELECT 1"])` + returns `ExecutionError` and leaves `server.done() == true`. +- **`test_out_of_sync_regression`** — exact production sequence: `Parse + Bind + Execute + Sync`, + drain five responses, then `execute_batch` with invalid + valid `SET`; asserts `ExecutionError` + and `server.done() == true`. + +```sh +cargo test -p pgdog test_execute_batch_simple_query_error_then_success +cargo test -p pgdog test_out_of_sync_regression +``` + +### Fix + +Two changes in `state.rs` (commit `54c9d3a9`): + +**Defect 2:** A `!self.extended` fast-path in the Error arm returns `Action::Forward` immediately +when the queue front is `Code(RFQ)`, leaving all slots untouched. For the extended path, +a `rfq_pos` scan drains only items before the first `Code(RFQ)` boundary — never consuming +slots belonging to subsequent queries. + +**Defect 1:** After every `ReadyForQuery`, `extended` is recomputed from remaining queue items +(`self.queue.iter().any(ExecutionItem::extended)`). 
When the pipeline drains, `extended` resets +to `false`, breaking the one-way latch. The `ExecutionItem::extended()` helper in `state.rs` +enables this scan. +--- + +## Issue 1 — Failed `Prepare` orphans the EXECUTE ReadyForQuery + +**Severity:** High — triggered by normal server behaviour; no client misbehaviour required. + +**Location:** `pgdog/src/backend/prepared_statements.rs`, `ProtocolMessage::Prepare` arm; +`pgdog/src/backend/protocol/state.rs`, Error handler. + +### Description + +When pgdog injects a `PREPARE` to rewrite a simple-query `EXECUTE` and that `PREPARE` fails on the +server, the Error handler incorrectly clears the queue. The subsequent `ReadyForQuery` from the now- +orphaned `EXECUTE` hits an empty queue and raises `ProtocolOutOfSync`. + +### Code path + +The simple-query rewriter turns `EXECUTE stmt_name(args)` into two prepended messages, each handled +independently by `handle()`. After both calls the queue is: + +``` +[Ignore(ExecutionCompleted), Ignore(ReadyForQuery), Code(ReadyForQuery)] + ↑────────── handle(Prepare) ──────────↑ ↑── handle(Query) ──↑ +``` + +If the injected `PREPARE` fails on the server: + +| Step | Server sends | Error handler action | Queue after | +|---|---|---|---| +| 1 | `Error` for PREPARE | simple-query path (`!extended`): `pop_front` loop drains `Ignore(C)` and `Ignore(Z)`; stops at `Code(RFQ)` | `[Code(RFQ)]` | +| 2 | `ReadyForQuery` for PREPARE | `pop_front` → `Code(RFQ)` consumed | **empty** | +| 3 | `Error` for EXECUTE (statement absent) | simple-query path: queue empty; loop is a no-op | **empty** | +| 4 | `ReadyForQuery` for EXECUTE | `pop_front` on empty → **ProtocolOutOfSync** | — | + +The simple-query Error handler (`state.rs:157–167`, the `!self.extended` arm) pops items from the +front of the queue until it reaches a `Code(ReadyForQuery)`. When the injected PREPARE fails this +drains both `Ignore` slots, leaving `Code(ReadyForQuery)` as the sole item. 
The subsequent `Z` for +the PREPARE consumes that slot, emptying the queue. The EXECUTE sub-request's `Error` then arrives +against an empty queue — the pop-front loop is a no-op — and its `ReadyForQuery` fires +`ProtocolOutOfSync`. + +Under high concurrency this becomes near-deterministic: the pool fast-path (`Guard::drop` → +`checkin` → `put`) hands a connection directly to a waiting client with no healthcheck, no idle +time, and no opportunity to drain the kernel socket buffer. The next query on that client consumes +the stale EXECUTE `Error + ReadyForQuery`, producing `ProtocolOutOfSync`. + +### Reproduction + +1. Connect to pgdog with session or transaction pooling. +2. Issue a simple-query `EXECUTE` for a statement that will fail to prepare (schema mismatch, syntax + error, or stale local cache with a duplicate name). +3. Issue any subsequent query on the same connection. +4. The second query fails with `PG::ConnectionBad` / `protocol is out of sync`. + +```sh +cd integration/prepared_statements_full && bash run.sh +``` + +### Tests + +Both tests live in `integration/prepared_statements_full/protocol_out_of_sync_spec.rb`. + +| Test | Pool mode | What it guards against | +|---|---|---| +| 1 | session | Orphaned EXECUTE RFQ hits empty queue on the very next query → `PG::ConnectionBad` | +| 2 | transaction | Stale EXECUTE `Error` served to next borrower (pool_size=1) → `PG::InvalidSqlStatementName` | + +- **Test 1 — Session mode.** + Session-pooled user (`pgdog_session`) pinned to one backend. After the failed prepare-inject, + `SELECT 1` is issued on the same connection. The orphaned EXECUTE `ReadyForQuery` hits an empty + queue, raising `ProtocolOutOfSync`. Client sees `PG::ConnectionBad`. + +- **Test 2 — Transaction mode (pool_size=1).** + `pgdog_tx_single` (transaction mode, one backend). After the EXECUTE error the connection returns + to the pool: the queue is empty and `out_of_sync` is false, so it appears clean. 
With only one + backend the stale EXECUTE `Error + ReadyForQuery` remain in the TCP buffer. The next query + (`SELECT 1`) reads the stale `Error` as its response → `PG::InvalidSqlStatementName`. + +### Fix + +Fix the Error handler in `state.rs:154–159`. When the failed message is part of a pgdog-injected +compound request, the handler must preserve the `Code(ReadyForQuery)` for the outer client-visible +request — not just the PREPARE's trailing slot. Concretely: the handler needs to recognise that +`Ignore` items at the back of the queue belong to a sub-request that is still in-flight, and must +keep the outer `Code(ReadyForQuery)` accordingly. + +The TCP-peek approach (`FIONREAD` / `MSG_PEEK` at checkin) is a valid defensive catch-all but adds a +syscall on every checkin and does not fix the root cause. + +--- + +## Issue 2 — Double `action()` call in `forward()` for server CopyDone + +**Severity:** Medium — requires the client to omit a trailing `Sync`. + +**Location:** `pgdog/src/backend/prepared_statements.rs`, `forward()`, lines ~198 and ~237. + +### Description + +`forward()` calls `state.action(code)` unconditionally at line 198, then a second time inside the +`'c'` (CopyDone) match arm at line 237. When no `Code(ReadyForQuery)` backstop is present in the +queue, the second call hits an empty queue and raises `ProtocolOutOfSync`. + +### Code path + +Normal path (safe): `Code(ReadyForQuery)` is always in the queue. `action('Z')` pushes it back rather +than consuming it, making the double call idempotent. + +Unsafe path — client sends `Parse + Bind + Execute + Flush` (no `Sync`). `handle()` builds: + +``` +[Code(ParseComplete), Code(BindComplete), Code(ExecutionCompleted)] +``` + +No `Code(ReadyForQuery)` is added. 
When the server responds with CopyDone: + +``` +First action('c'): pops Code(ExecutionCompleted) — consumed +Second action('c'): empty queue → ProtocolOutOfSync +``` + +### Reproduction + +Not triggerable via the `pg` gem or any libpq-based driver — libpq always appends `Sync` after +`Execute`. Requires sending raw protocol messages directly. + +```sh +cargo test -p pgdog test_copydone_double_action_oos_without_sync +``` + +### Tests + +**State-machine unit tests (`state.rs`, no backend needed)** + +- **`test_copydone_double_action_safe_with_rfq_backstop`** — queue `[Code(Copy), Code(ReadyForQuery)]`; + two `action('c')` calls both succeed; RFQ slot is pushed back and survives. +- **`test_copydone_double_action_oos_without_rfq_backstop`** — queue `[Code(ExecutionCompleted)]`; + second `action('c')` returns `Err(ProtocolOutOfSync)`. + +**Server-level tests (`server.rs`, require PostgreSQL)** + +- **`test_copydone_double_action_oos_without_sync`** — `Parse + Bind + Execute + Flush` + (no Sync); reads ParseComplete, BindComplete, CopyOutResponse, CopyData ×2, then asserts + `ProtocolOutOfSync` on CopyDone. +- **`test_copydone_double_action_safe_with_sync`** — same pipeline with `Sync`; full sequence + completes without error; asserts `server.done()`. + +```sh +cargo test -p pgdog test_copydone_double_action +``` + +### Fix + +Remove the second `action()` call in the `'c'` arm of `forward()`, or guarantee that a +`Code(ReadyForQuery)` backstop is always in the queue before the CopyDone path is reached. Either +way, the invariant must be made explicit in code comments. + +--- + +## Issue 3 — Stale ReadyForQuery hits an `Ignore(ParseComplete)` slot + +**Severity:** Low — practically unreachable in normal operation. + +**Location:** `pgdog/src/backend/protocol/state.rs`, Ignore arm. 
+ +### Description + +If a `ReadyForQuery` byte from a prior request cycle remains unread in the TCP receive buffer when +the next request starts, `action('Z')` fires while the queue front is `Ignore(ParseComplete)`. The +Ignore arm requires an exact code match; `ReadyForQuery != ParseComplete` → `ProtocolOutOfSync`. + +### Code path + +pgdog injects a Parse for a missing statement; queue front: + +``` +[Ignore(ParseComplete), Code(BindComplete), ...] +``` + +Stale `ReadyForQuery` arrives before `ParseComplete`: + +``` +action('Z'): generic pop → Ignore(ParseComplete) + → ReadyForQuery != ParseComplete → ProtocolOutOfSync +``` + +### Reproduction + +Not reproducible through normal pool operation. The `done()` guard chain prevents pool reclaim while +any `Ignore` item is present: + +- `ProtocolState::done()` = `is_empty() && !out_of_sync` → `false` while any `Ignore` slot exists. +- `PreparedStatements::done()` adds a second gate blocking reclaim while an injected Parse is in flight. +- `Pool::maybe_check_in()` discards errored connections before `can_check_in()` is evaluated. + +The precondition requires a concurrent-access bug that bypasses the pool guard, or direct TCP stream +injection. + +### Tests + +State-machine unit tests in `state.rs` cover the `action()` mismatch directly. A server-level +integration test is not practical; the precondition cannot be reached through normal sequential +protocol flow. + +```sh +cargo test -p pgdog test_stale_rfq +``` + +### Fix + +No code change required. The existing `done()` guard chain already prevents the precondition from +arising. If it were somehow reached, the resulting `ProtocolOutOfSync` would discard the connection +before reuse, bounding the blast radius to a single request. + +--- + +## Issue 4 — `extended` flag is permanently set and never resets + +**Severity:** Low-medium — affects connection-lifecycle semantics and silently changes Error handler +behaviour for all subsequent requests on a connection. 
+ +**Location:** `pgdog/src/backend/protocol/state.rs`, `add()` / `add_ignore()`; `state.rs:151–153`, +Error handler. + +### Description + +`ProtocolState.extended` is set to `true` the first time any parameterised query runs on a connection +and is never reset. The Error handler checks this flag to set `out_of_sync = true`; because the flag +is permanent, every error on that connection — including plain simple-query errors — sets +`out_of_sync = true` spuriously. + +### Code path + +`add()` and `add_ignore()` set the flag whenever `ParseComplete ('1')` or `BindComplete ('2')` is +enqueued: + +```rust +self.extended = self.extended || code.extended(); +``` + +The Error handler (`state.rs:151–153`): + +```rust +ExecutionCode::Error => { + if self.extended { + self.out_of_sync = true; // fires on every error, forever + } + // ... +} +``` + +There is no reset path. + +### Consequences + +- `done()` stays `false` one extra round-trip (until RFQ clears `out_of_sync`) on simple-query + errors for connections that have ever served a parameterised query. Harmless in practice today, but + more conservative than necessary. +- Future changes to the Error handler that add `extended`-specific behaviour will silently apply to + all long-lived connections, not just those currently mid-pipeline. +- `extended` reads as "has this connection *ever* been in extended-protocol mode", not "is this + connection *currently* in extended-protocol mode" — a semantic mismatch that will mislead future + readers. + +### Reproduction + +1. Connect to pgdog. +2. Execute a parameterised query (any `$1` placeholder) — sets `extended = true`. +3. Execute `SELECT 1/0` (simple query). +4. Observe `server.out_of_sync() == true` immediately after the `'E'` response, before RFQ arrives. + Expected: `false`. 
+ +```sh +cargo test -p pgdog test_extended_flag_never_resets +``` + +### Tests + +**`test_extended_flag_never_resets_spurious_out_of_sync`** in `server.rs` (requires PostgreSQL) — +three phases on one connection: + +1. *Baseline* — fresh connection (`extended = false`); `SELECT 1/0`; asserts `out_of_sync == false`. +2. *Trigger* — `Parse + Bind + Execute + Sync` for `SELECT $1::int`; permanently sets `extended = true`. +3. *Regression* — `SELECT 1/0` twice more; asserts `out_of_sync == true` after each `'E'`. Each + `ReadyForQuery` resets `out_of_sync` to `false` but leaves `extended` unchanged. + +```sh +cargo test -p pgdog test_extended_flag_never_resets +``` + +### Fix + +Reset `extended` to `false` at the same point `out_of_sync` resets — when `ReadyForQuery` is +processed and the queue is fully drained: + +```rust +ExecutionCode::ReadyForQuery => { + self.out_of_sync = false; + if self.is_empty() { + self.extended = false; // pipeline complete; reset for next request + } +} +``` + +Resetting only when `is_empty()` is safe: pipelined requests still in the queue keep `extended = true` +until the entire pipeline finishes. + +A post-fix test should verify: (a) phase 3 above now produces `out_of_sync == false`, and (b) an +intermediate `ReadyForQuery` inside a pipelined extended request does not prematurely reset `extended`. + +--- + +## Common thread + +All four issues share the same underlying fragility: the `ProtocolState` queue and the actual server +response stream diverge whenever an error or unexpected message interrupts a multi-message sub-request +injected transparently by pgdog. The Error handler was written for a single client-visible request and +does not account for the compound structures the prepared-statement rewriter produces. + +Issue 4 is a secondary consequence: `extended` was added as a guard for the Error handler but was +attached to the connection rather than the current pipeline, so it outlives the requests it was meant +to describe. 
diff --git a/integration/common.sh b/integration/common.sh index 812acc69f..d6a474d7d 100644 --- a/integration/common.sh +++ b/integration/common.sh @@ -23,8 +23,8 @@ function run_pgdog() { local config_file="${COMMON_DIR}/pgdog.config" if [ -z "${binary}" ]; then # Testing in release is faster and mirrors production. - cargo build --release - binary="target/release/pgdog" + cargo build + binary="target/debug/pgdog" fi if [ -f "${pid_file}" ]; then local existing_pid=$(cat "${pid_file}") diff --git a/integration/prepared_statements_full/Gemfile b/integration/prepared_statements_full/Gemfile new file mode 100644 index 000000000..6f147c82f --- /dev/null +++ b/integration/prepared_statements_full/Gemfile @@ -0,0 +1,8 @@ +# frozen_string_literal: true + +source 'https://rubygems.org' +gem 'pg' +gem 'rails' +gem 'rspec', '~> 3.4' +gem 'rubocop' +gem 'toxiproxy' diff --git a/integration/prepared_statements_full/Gemfile.lock b/integration/prepared_statements_full/Gemfile.lock new file mode 100644 index 000000000..a9e17f4d2 --- /dev/null +++ b/integration/prepared_statements_full/Gemfile.lock @@ -0,0 +1,276 @@ +GEM + remote: https://rubygems.org/ + specs: + action_text-trix (2.1.18) + railties + actioncable (8.1.3) + actionpack (= 8.1.3) + activesupport (= 8.1.3) + nio4r (~> 2.0) + websocket-driver (>= 0.6.1) + zeitwerk (~> 2.6) + actionmailbox (8.1.3) + actionpack (= 8.1.3) + activejob (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + mail (>= 2.8.0) + actionmailer (8.1.3) + actionpack (= 8.1.3) + actionview (= 8.1.3) + activejob (= 8.1.3) + activesupport (= 8.1.3) + mail (>= 2.8.0) + rails-dom-testing (~> 2.2) + actionpack (8.1.3) + actionview (= 8.1.3) + activesupport (= 8.1.3) + nokogiri (>= 1.8.5) + rack (>= 2.2.4) + rack-session (>= 1.0.1) + rack-test (>= 0.6.3) + rails-dom-testing (~> 2.2) + rails-html-sanitizer (~> 1.6) + useragent (~> 0.16) + actiontext (8.1.3) + action_text-trix (~> 2.1.15) + actionpack (= 8.1.3) + 
activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + globalid (>= 0.6.0) + nokogiri (>= 1.8.5) + actionview (8.1.3) + activesupport (= 8.1.3) + builder (~> 3.1) + erubi (~> 1.11) + rails-dom-testing (~> 2.2) + rails-html-sanitizer (~> 1.6) + activejob (8.1.3) + activesupport (= 8.1.3) + globalid (>= 0.3.6) + activemodel (8.1.3) + activesupport (= 8.1.3) + activerecord (8.1.3) + activemodel (= 8.1.3) + activesupport (= 8.1.3) + timeout (>= 0.4.0) + activestorage (8.1.3) + actionpack (= 8.1.3) + activejob (= 8.1.3) + activerecord (= 8.1.3) + activesupport (= 8.1.3) + marcel (~> 1.0) + activesupport (8.1.3) + base64 + bigdecimal + concurrent-ruby (~> 1.0, >= 1.3.1) + connection_pool (>= 2.2.5) + drb + i18n (>= 1.6, < 2) + json + logger (>= 1.4.2) + minitest (>= 5.1) + securerandom (>= 0.3) + tzinfo (~> 2.0, >= 2.0.5) + uri (>= 0.13.1) + ast (2.4.3) + base64 (0.3.0) + bigdecimal (4.1.1) + builder (3.3.0) + concurrent-ruby (1.3.6) + connection_pool (3.0.2) + crass (1.0.6) + date (3.5.1) + diff-lcs (1.6.2) + drb (2.2.3) + erb (6.0.2) + erubi (1.13.1) + globalid (1.3.0) + activesupport (>= 6.1) + i18n (1.14.8) + concurrent-ruby (~> 1.0) + io-console (0.8.2) + irb (1.17.0) + pp (>= 0.6.0) + prism (>= 1.3.0) + rdoc (>= 4.0.0) + reline (>= 0.4.2) + json (2.19.3) + language_server-protocol (3.17.0.5) + lint_roller (1.1.0) + logger (1.7.0) + loofah (2.25.1) + crass (~> 1.0.2) + nokogiri (>= 1.12.0) + mail (2.9.0) + logger + mini_mime (>= 0.1.1) + net-imap + net-pop + net-smtp + marcel (1.1.0) + mini_mime (1.1.5) + minitest (6.0.3) + drb (~> 2.0) + prism (~> 1.5) + net-imap (0.6.3) + date + net-protocol + net-pop (0.1.2) + net-protocol + net-protocol (0.2.2) + timeout + net-smtp (0.5.1) + net-protocol + nio4r (2.7.5) + nokogiri (1.19.2-aarch64-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-aarch64-linux-musl) + racc (~> 1.4) + nokogiri (1.19.2-arm-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-arm-linux-musl) + racc (~> 1.4) + nokogiri 
(1.19.2-arm64-darwin) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-darwin) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-linux-gnu) + racc (~> 1.4) + nokogiri (1.19.2-x86_64-linux-musl) + racc (~> 1.4) + parallel (1.28.0) + parser (3.3.11.1) + ast (~> 2.4.1) + racc + pg (1.6.3) + pg (1.6.3-aarch64-linux) + pg (1.6.3-aarch64-linux-musl) + pg (1.6.3-arm64-darwin) + pg (1.6.3-x86_64-darwin) + pg (1.6.3-x86_64-linux) + pg (1.6.3-x86_64-linux-musl) + pp (0.6.3) + prettyprint + prettyprint (0.2.0) + prism (1.9.0) + psych (5.3.1) + date + stringio + racc (1.8.1) + rack (3.2.6) + rack-session (2.1.1) + base64 (>= 0.1.0) + rack (>= 3.0.0) + rack-test (2.2.0) + rack (>= 1.3) + rackup (2.3.1) + rack (>= 3) + rails (8.1.3) + actioncable (= 8.1.3) + actionmailbox (= 8.1.3) + actionmailer (= 8.1.3) + actionpack (= 8.1.3) + actiontext (= 8.1.3) + actionview (= 8.1.3) + activejob (= 8.1.3) + activemodel (= 8.1.3) + activerecord (= 8.1.3) + activestorage (= 8.1.3) + activesupport (= 8.1.3) + bundler (>= 1.15.0) + railties (= 8.1.3) + rails-dom-testing (2.3.0) + activesupport (>= 5.0.0) + minitest + nokogiri (>= 1.6) + rails-html-sanitizer (1.7.0) + loofah (~> 2.25) + nokogiri (>= 1.15.7, != 1.16.7, != 1.16.6, != 1.16.5, != 1.16.4, != 1.16.3, != 1.16.2, != 1.16.1, != 1.16.0.rc1, != 1.16.0) + railties (8.1.3) + actionpack (= 8.1.3) + activesupport (= 8.1.3) + irb (~> 1.13) + rackup (>= 1.0.0) + rake (>= 12.2) + thor (~> 1.0, >= 1.2.2) + tsort (>= 0.2) + zeitwerk (~> 2.6) + rainbow (3.1.1) + rake (13.3.1) + rdoc (7.2.0) + erb + psych (>= 4.0.0) + tsort + regexp_parser (2.12.0) + reline (0.6.3) + io-console (~> 0.5) + rspec (3.13.2) + rspec-core (~> 3.13.0) + rspec-expectations (~> 3.13.0) + rspec-mocks (~> 3.13.0) + rspec-core (3.13.6) + rspec-support (~> 3.13.0) + rspec-expectations (3.13.5) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-mocks (3.13.8) + diff-lcs (>= 1.2.0, < 2.0) + rspec-support (~> 3.13.0) + rspec-support (3.13.7) + rubocop (1.86.0) + json (~> 2.3) + 
language_server-protocol (~> 3.17.0.2) + lint_roller (~> 1.1.0) + parallel (~> 1.10) + parser (>= 3.3.0.2) + rainbow (>= 2.2.2, < 4.0) + regexp_parser (>= 2.9.3, < 3.0) + rubocop-ast (>= 1.49.0, < 2.0) + ruby-progressbar (~> 1.7) + unicode-display_width (>= 2.4.0, < 4.0) + rubocop-ast (1.49.1) + parser (>= 3.3.7.2) + prism (~> 1.7) + ruby-progressbar (1.13.0) + securerandom (0.4.1) + stringio (3.2.0) + thor (1.5.0) + timeout (0.6.1) + toxiproxy (2.0.2) + tsort (0.2.0) + tzinfo (2.0.6) + concurrent-ruby (~> 1.0) + unicode-display_width (3.2.0) + unicode-emoji (~> 4.1) + unicode-emoji (4.2.0) + uri (1.1.1) + useragent (0.16.11) + websocket-driver (0.8.0) + base64 + websocket-extensions (>= 0.1.0) + websocket-extensions (0.1.5) + zeitwerk (2.7.5) + +PLATFORMS + aarch64-linux + aarch64-linux-gnu + aarch64-linux-musl + arm-linux-gnu + arm-linux-musl + arm64-darwin + x86_64-darwin + x86_64-linux-gnu + x86_64-linux-musl + +DEPENDENCIES + pg + rails + rspec (~> 3.4) + rubocop + toxiproxy + +BUNDLED WITH + 2.7.2 diff --git a/integration/prepared_statements_full/dev.sh b/integration/prepared_statements_full/dev.sh new file mode 100755 index 000000000..f36274590 --- /dev/null +++ b/integration/prepared_statements_full/dev.sh @@ -0,0 +1,12 @@ +#!/bin/bash +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) + +pushd ${SCRIPT_DIR} + +export GEM_HOME=~/.gem +mkdir -p ${GEM_HOME} +bundle install +bundle exec rspec *_spec.rb + +popd diff --git a/integration/prepared_statements_full/protocol_out_of_sync_spec.rb b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb new file mode 100644 index 000000000..11e3a889a --- /dev/null +++ b/integration/prepared_statements_full/protocol_out_of_sync_spec.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require_relative 'rspec_helper' + +# Triggers the failed-prepare / orphaned-EXECUTE scenario. +# +# When pgdog rewrites a simple-query EXECUTE it injects [Prepare, Query]. 
+# If the injected PREPARE fails, the outer EXECUTE sub-request (Error + +# ReadyForQuery) must be consumed internally — nothing stale left on the wire. +def trigger_prepare_inject_failure(conn, statement_name:) + # PREPARE fails — pgdog caches the statement name despite the error. + expect { conn.exec "PREPARE #{statement_name} AS SELECT 1 FROM __pgdog_nonexistent_table__" } + .to raise_error(PG::Error, /__pgdog_nonexistent_table__/) + + # EXECUTE triggers [Prepare, Query] injection; the re-injected PREPARE fails + # again. pgdog must drain the orphaned EXECUTE E+Z internally and surface + # only the application-visible error to the caller. + expect { conn.exec "EXECUTE #{statement_name}" } + .to raise_error(PG::Error, /__pgdog_nonexistent_table__/) +end + +describe 'protocol out of sync regressions' do + after do + ensure_done + end + + # Session mode: a failed prepare-inject must not leave stale bytes on the + # wire. The connection stays usable for the next query. + it 'connection remains usable after failed prepare-inject in session mode' do + conn = connect_pgdog(user: 'pgdog_session') + begin + trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_session') + + result = conn.exec 'SELECT 1 AS alive' + expect(result.first['alive']).to eq('1') + ensure + conn.close unless conn.finished? + end + end + + # Transaction mode (pool_size=1): the backend connection is returned to the + # pool after each query. With a single backend any stale bytes not drained + # internally are visible to the very next borrower. The connection must be + # clean so the next query succeeds. + it 'connection remains usable after failed prepare-inject in transaction mode' do + conn = connect_pgdog(user: 'pgdog_tx_single') + begin + trigger_prepare_inject_failure(conn, statement_name: 'pgdog_prepare_inject_tx') + + result = conn.exec 'SELECT 1 AS alive' + expect(result.first['alive']).to eq('1') + ensure + conn.close unless conn.finished? 
+ end + end +end diff --git a/integration/prepared_statements_full/rspec_helper.rb b/integration/prepared_statements_full/rspec_helper.rb new file mode 100644 index 000000000..782cd1146 --- /dev/null +++ b/integration/prepared_statements_full/rspec_helper.rb @@ -0,0 +1,106 @@ +# frozen_string_literal: true + +require 'active_record' +require 'rspec' +require 'pg' +require 'toxiproxy' + +def admin + PG.connect('postgres://admin:pgdog@127.0.0.1:6432/admin') +end + +def admin_exec(sql) + conn = admin + conn.exec sql +ensure + conn&.close +end + +def failover + PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/failover') +end + +def admin_stats(database, column = nil) + conn = admin + stats = conn.exec 'SHOW STATS' + conn.close + stats = stats.select { |item| item['database'] == database } + return stats.map { |item| item[column].to_i } unless column.nil? + + stats +end + +def ensure_done + deadline = Time.now + 2 + pools = [] + clients = [] + servers = [] + pg_clients = [] + current_client_id = nil + + loop do + conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') + begin + pools = conn.exec 'SHOW POOLS' + current_client_id = conn.backend_pid + clients = conn.exec 'SHOW CLIENTS' + servers = conn.exec 'SHOW SERVERS' + ensure + conn.close + end + + pg_conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') + begin + pg_clients = pg_conn.exec 'SELECT state FROM pg_stat_activity'\ + " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ + " AND backend_type = 'client backend'"\ + " AND query NOT LIKE '%pg_stat_activity%'" + ensure + pg_conn.close + end + + pools_ready = pools.all? do |pool| + pool['sv_active'] == '0' && pool['cl_waiting'] == '0' && pool['out_of_sync'] == '0' + end + clients_ready = clients.all? 
do |client| + client['id'].to_i == current_client_id || client['state'] == 'idle' + end + servers_ready = servers + .select { |server| server['application_name'] != 'PgDog Pub/Sub Listener' } + .all? { |server| server['state'] == 'idle' } + pg_clients_ready = pg_clients.all? { |client| client['state'] == 'idle' } + + break if pools_ready && clients_ready && servers_ready && pg_clients_ready + break if Time.now >= deadline + + sleep 0.05 + end + + pools.each do |pool| + expect(pool['sv_active']).to eq('0') + expect(pool['cl_waiting']).to eq('0') + expect(pool['out_of_sync']).to eq('0') + end + + clients.each do |client| + next if client['id'].to_i == current_client_id + expect(client['state']).to eq('idle') + end + + servers + .select do |server| + server['application_name'] != 'PgDog Pub/Sub Listener' + end + .each do |server| + expect(server['state']).to eq('idle') + end + + pg_clients.each do |client| + expect(client['state']).to eq('idle') + end +end + + +def connect_pgdog(user: 'pgdog') + PG.connect(dbname: 'pgdog', user:, password: 'pgdog', port: 6432, host: '127.0.0.1') +end \ No newline at end of file diff --git a/integration/prepared_statements_full/run.sh b/integration/prepared_statements_full/run.sh new file mode 100755 index 000000000..7efdf24d1 --- /dev/null +++ b/integration/prepared_statements_full/run.sh @@ -0,0 +1,11 @@ +#!/bin/bash +set -e +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +source ${SCRIPT_DIR}/../common.sh + +run_pgdog "integration/prepared_statements_full" +wait_for_pgdog + +bash ${SCRIPT_DIR}/dev.sh + +stop_pgdog diff --git a/integration/prepared_statements_full/users.toml b/integration/prepared_statements_full/users.toml index 9a8205f04..a97596a57 100644 --- a/integration/prepared_statements_full/users.toml +++ b/integration/prepared_statements_full/users.toml @@ -2,3 +2,19 @@ database = "pgdog" name = "pgdog" password = "pgdog" + + +[[users]] +name = "pgdog_session" +database = "pgdog" +password = 
"pgdog" +server_user = "pgdog" +pooler_mode = "session" + +[[users]] +name = "pgdog_tx_single" +database = "pgdog" +password = "pgdog" +server_user = "pgdog" +pooler_mode = "transaction" +pool_size = 1 \ No newline at end of file diff --git a/integration/ruby/lb_spec.rb b/integration/ruby/lb_spec.rb index a3a6c3c39..061d614eb 100644 --- a/integration/ruby/lb_spec.rb +++ b/integration/ruby/lb_spec.rb @@ -13,7 +13,7 @@ it 'distributes traffic evenly' do conn = failover # Reset stats and bans - admin.exec "RECONNECT" + admin_exec 'RECONNECT' before = admin_stats('failover') 250.times do diff --git a/integration/ruby/rspec_helper.rb b/integration/ruby/rspec_helper.rb index 5eb8317ae..ac55b4acf 100644 --- a/integration/ruby/rspec_helper.rb +++ b/integration/ruby/rspec_helper.rb @@ -9,6 +9,13 @@ def admin PG.connect('postgres://admin:pgdog@127.0.0.1:6432/admin') end +def admin_exec(sql) + conn = admin + conn.exec sql +ensure + conn&.close +end + def failover PG.connect('postgres://pgdog:pgdog@127.0.0.1:6432/failover') end @@ -24,20 +31,62 @@ def admin_stats(database, column = nil) end def ensure_done - conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') - pools = conn.exec 'SHOW POOLS' + deadline = Time.now + 2 + pools = [] + clients = [] + servers = [] + pg_clients = [] + current_client_id = nil + + loop do + conn = PG.connect(dbname: 'admin', user: 'admin', password: 'pgdog', port: 6432, host: '127.0.0.1') + begin + pools = conn.exec 'SHOW POOLS' + current_client_id = conn.backend_pid + clients = conn.exec 'SHOW CLIENTS' + servers = conn.exec 'SHOW SERVERS' + ensure + conn.close + end + + pg_conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') + begin + pg_clients = pg_conn.exec 'SELECT state FROM pg_stat_activity'\ + " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ + " AND backend_type = 'client backend'"\ + " AND query NOT LIKE '%pg_stat_activity%'" + ensure + 
pg_conn.close + end + + pools_ready = pools.all? do |pool| + pool['sv_active'] == '0' && pool['cl_waiting'] == '0' && pool['out_of_sync'] == '0' + end + clients_ready = clients.all? do |client| + client['id'].to_i == current_client_id || client['state'] == 'idle' + end + servers_ready = servers + .select { |server| server['application_name'] != 'PgDog Pub/Sub Listener' } + .all? { |server| server['state'] == 'idle' } + pg_clients_ready = pg_clients.all? { |client| client['state'] == 'idle' } + + break if pools_ready && clients_ready && servers_ready && pg_clients_ready + break if Time.now >= deadline + + sleep 0.05 + end + pools.each do |pool| expect(pool['sv_active']).to eq('0') expect(pool['cl_waiting']).to eq('0') expect(pool['out_of_sync']).to eq('0') end - current_client_id = conn.backend_pid - clients = conn.exec 'SHOW CLIENTS' + clients.each do |client| next if client['id'].to_i == current_client_id expect(client['state']).to eq('idle') end - servers = conn.exec 'SHOW SERVERS' + servers .select do |server| server['application_name'] != 'PgDog Pub/Sub Listener' @@ -46,12 +95,7 @@ def ensure_done expect(server['state']).to eq('idle') end - conn = PG.connect(dbname: 'pgdog', user: 'pgdog', password: 'pgdog', port: 5432, host: '127.0.0.1') - clients = conn.exec 'SELECT state FROM pg_stat_activity'\ - " WHERE datname IN ('pgdog', 'shard_0', 'shard_1')"\ - " AND backend_type = 'client backend'"\ - " AND query NOT LIKE '%pg_stat_activity%'" - clients.each do |client| + pg_clients.each do |client| expect(client['state']).to eq('idle') end -end +end \ No newline at end of file diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index a93b89a99..8cef58d14 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -234,11 +234,6 @@ impl ProtocolState { &self.queue } - #[cfg(test)] - pub(crate) fn queue_mut(&mut self) -> &mut VecDeque { - &mut self.queue - } - pub(crate) fn done(&self) -> bool { 
self.is_empty() && !self.out_of_sync } @@ -890,4 +885,94 @@ mod test { assert_eq!(state.action('Z').unwrap(), Action::Forward); assert!(state.is_empty()); } + + // Double action('c') for server CopyDone + + // Safe path: Code(ReadyForQuery) backstop makes the double action('c') call idempotent. + #[test] + fn test_copydone_double_action_safe_with_rfq_backstop() { + let mut state = ProtocolState::default(); + // 1. Queue: CopyOut slot + RFQ backstop (from Sync). + state.add('G'); // CopyOut + state.add('Z'); // ReadyForQuery backstop + + // 2. First action('c'): pops CopyOut; RFQ backstop untouched. + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert_eq!(state.len(), 1); + + // 3. Second action('c'): sees RFQ at front; pushes it back (idempotent). + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert_eq!(state.len(), 1); // RFQ still present for the server's ReadyForQuery + } + + // Failure path: no Code(ReadyForQuery) backstop — second action('c') hits empty queue. + #[test] + fn test_copydone_double_action_oos_without_rfq_backstop() { + let mut state = ProtocolState::default(); + // 1. Queue: Execute + Flush (no Sync) — no RFQ backstop. + state.add('C'); // ExecutionCompleted + + // 2. First action('c'): pops ExecutionCompleted; queue empty. + assert_eq!(state.action('c').unwrap(), Action::Forward); + assert!(state.is_empty()); + + // 3. Second action('c'): empty queue → ProtocolOutOfSync. + assert!(state.action('c').is_err()); + } + + // Stale RFQ arrives before injected ParseComplete — Ignore arm rejects the mismatch. + #[test] + fn test_stale_rfq_hits_ignore_parsecomplete() { + let mut state = ProtocolState::default(); + // 1. pgdog injects Parse; queue: [Ignore(ParseComplete), BindComplete, CommandComplete, RFQ]. 
+ state.add_ignore('1'); // ParseComplete — injected + state.add('2'); // BindComplete + state.add('C'); // CommandComplete + state.add('Z'); // ReadyForQuery + + // Stale RFQ from prior cycle arrives before ParseComplete. + // ReadyForQuery != ParseComplete → ProtocolOutOfSync. + assert!( + state.action('Z').is_err(), + "stale RFQ against Ignore(ParseComplete) must produce ProtocolOutOfSync" + ); + } + + // Variant: stale RFQ hits Ignore(BindComplete) — same mismatch for any Ignore slot. + #[test] + fn test_stale_rfq_hits_ignore_bindcomplete() { + let mut state = ProtocolState::default(); + // Both Parse and Bind are injected (Describe path). + state.add_ignore('1'); // ParseComplete — injected + state.add_ignore('2'); // BindComplete — injected + state.add('T'); // RowDescription + state.add('C'); // CommandComplete + state.add('Z'); // ReadyForQuery + + // ParseComplete arrives normally and is swallowed. + assert_eq!(state.action('1').unwrap(), Action::Ignore); + + // Queue front is now Ignore(BindComplete). + // A stale RFQ arrives before BindComplete → ProtocolOutOfSync. + assert!( + state.action('Z').is_err(), + "stale RFQ against Ignore(BindComplete) must produce ProtocolOutOfSync" + ); + } + + // Happy path: injected ParseComplete arrives in order — silently ignored, rest forwarded. 
+ #[test] + fn test_injected_parse_happy_path() { + let mut state = ProtocolState::default(); + state.add_ignore('1'); // ParseComplete — injected, swallowed + state.add('2'); // BindComplete + state.add('C'); // CommandComplete + state.add('Z'); // ReadyForQuery + + assert_eq!(state.action('1').unwrap(), Action::Ignore); // swallowed + assert_eq!(state.action('2').unwrap(), Action::Forward); // forwarded + assert_eq!(state.action('C').unwrap(), Action::Forward); // forwarded + assert_eq!(state.action('Z').unwrap(), Action::Forward); // forwarded + assert!(state.is_empty()); + } } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 5b2c7e09c..daf2722e8 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -1092,7 +1092,6 @@ impl Drop for Server { } } -// Used for testing. #[cfg(test)] pub mod test { use bytes::{BufMut, BytesMut}; @@ -1517,7 +1516,7 @@ pub mod test { let (new, name) = global.write().insert(&parse); assert!(new); let parse = parse.rename(&name); - assert_eq!(parse.name(), "__pgdog_1"); + assert!(parse.name().starts_with("__pgdog_")); let mut server = test_server().await; @@ -1526,7 +1525,7 @@ pub mod test { .send( &vec![ ProtocolMessage::from(Bind::new_params( - "__pgdog_1", + &name, &[Parameter { len: 1, data: "1".as_bytes().into(), @@ -2720,38 +2719,6 @@ pub mod test { ); } - #[tokio::test] - async fn test_protocol_out_of_sync_sets_error_state() { - let mut server = test_server().await; - - server - .send(&vec![Query::new("SELECT 1").into()].into()) - .await - .unwrap(); - - for c in ['T', 'D'] { - let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), c); - } - - // simulate an unlikely, but existent out-of-sync state - server - .prepared_statements_mut() - .state_mut() - .queue_mut() - .clear(); - - let res = server.read().await; - assert!( - matches!(res, Err(Error::ProtocolOutOfSync)), - "protocol should be out of sync" - ); - assert!( - server.stats().get_state() == State::Error, - 
"state should be Error after detecting desync" - ) - } - #[tokio::test] async fn test_reset_clears_client_params() { let mut server = test_server().await; @@ -3588,4 +3555,184 @@ pub mod test { assert!(server.force_close()); assert_eq!(server.stats().get_state(), State::ForceClose); } + + // Failed injected PREPARE leaves EXECUTE ReadyForQuery unmatched — Error handler empties the queue. + #[tokio::test] + async fn test_prepare_execute_inject_failure_orphans_execute_rfq() { + let mut server = test_server().await; + + // 1. Send [Prepare, Query] as the rewriter injects for EXECUTE. + server + .send( + &vec![ + ProtocolMessage::Prepare { + name: "__pgdog_prepare_inject_test".to_string(), + statement: "SELECT 1 FROM __pgdog_nonexistent_table__".to_string(), + }, + ProtocolMessage::Query(Query::new("EXECUTE __pgdog_prepare_inject_test()")), + ] + .into(), + ) + .await + .unwrap(); + + // 2. PREPARE 'E' forwarded; 'Z' consumes re-added Code(RFQ) — queue empty. + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); // 'E' PREPARE error + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); // 'Z' PREPARE RFQ — queue now empty + + // 3. EXECUTE 'E' forwarded (no-op on empty queue). + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); // 'E' EXECUTE error + + // 4. BUG: EXECUTE 'Z' hits empty queue → ProtocolOutOfSync (fix: assert 'Z' + done()). + let err = server.read().await.unwrap_err(); + assert!( + matches!(err, Error::ProtocolOutOfSync), + "expected ProtocolOutOfSync; got {:?}", + err, + ); + } + + // Extended Execute + Flush (no Sync): no RFQ backstop — double action('c') raises ProtocolOutOfSync. + #[tokio::test] + async fn test_copydone_double_action_oos_without_sync() { + let mut server = test_server().await; + + // 1. Parse + Bind + Execute + Flush (not Sync); no RFQ backstop in queue. 
+ server + .send( + &vec![ + ProtocolMessage::Parse(Parse::new_anonymous("COPY (VALUES (1),(2)) TO STDOUT")), + ProtocolMessage::Bind(Bind::new_params("", &[])), + ProtocolMessage::Execute(Execute::new()), + // Flush (not Sync): prompts PostgreSQL to send buffered responses. + // handle() maps this to Other, adding nothing to the queue. + Flush.into(), + ] + .into(), + ) + .await + .unwrap(); + + // 2. ParseComplete, BindComplete, CopyOutResponse, CopyData x2 arrive normally. + assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'H'); // CopyOutResponse + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 1 + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 2 + // 3. BUG: CopyDone — first action() pops ExecutionCompleted; second hits empty queue. + assert!( + matches!(server.read().await.unwrap_err(), Error::ProtocolOutOfSync), + "expected ProtocolOutOfSync" + ); + } + + // Safe path: Sync adds Code(RFQ) backstop — double action('c') is idempotent. + #[tokio::test] + async fn test_copydone_double_action_safe_with_sync() { + let mut server = test_server().await; + + // 1. Parse + Bind + Execute + Sync; RFQ backstop added to queue. + server + .send( + &vec![ + ProtocolMessage::Parse(Parse::new_anonymous("COPY (VALUES (1),(2)) TO STDOUT")), + ProtocolMessage::Bind(Bind::new_params("", &[])), + ProtocolMessage::Execute(Execute::new()), + ProtocolMessage::Sync(Sync), + ] + .into(), + ) + .await + .unwrap(); + + // 2. Full response sequence — CopyDone is safe with RFQ backstop. 
+ assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'H'); // CopyOutResponse + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 1 + assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 2 + assert_eq!(server.read().await.unwrap().code(), 'c'); // CopyDone -- safe with RFQ backstop + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + assert!( + server.done(), + "server must be done after full response sequence" + ); + } + + // extended=true sticks after any parameterised query; Error handler sets out_of_sync on every subsequent error. + #[tokio::test] + async fn test_extended_flag_never_resets_spurious_out_of_sync() { + use crate::net::bind::Parameter; + + let mut server = test_server().await; + + // 1. Baseline: extended=false; simple-query error must not set out_of_sync. + server + .send(&vec![Query::new("SELECT 1/0").into()].into()) + .await + .unwrap(); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); + assert!(!server.out_of_sync()); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); + assert!(server.done()); + + // 2. Parameterised query sets extended=true permanently. + let bind = Bind::new_params_codes( + "", + &[Parameter { + len: 1, + data: "1".as_bytes().into(), + }], + &[Format::Text], + ); + server + .send( + &vec![ + ProtocolMessage::from(Parse::new_anonymous("SELECT $1::int")), + ProtocolMessage::from(bind), + ProtocolMessage::from(Execute::new()), + ProtocolMessage::from(Sync), + ] + .into(), + ) + .await + .unwrap(); + + for c in ['1', '2', 'D', 'C', 'Z'] { + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), c); + } + assert!(server.done()); + + // 3. 
Same error on same connection: extended stuck → out_of_sync=true spuriously. + server + .send(&vec![Query::new("SELECT 1/0").into()].into()) + .await + .unwrap(); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); + assert!(server.out_of_sync()); // spurious: extended=true even for plain simple query + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); // RFQ clears out_of_sync but leaves extended stuck + assert!(!server.out_of_sync()); + assert!(server.done()); + + // 4. Confirm extended remains stuck across RFQ resets. + server + .send(&vec![Query::new("SELECT 1/0").into()].into()) + .await + .unwrap(); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'E'); + assert!(server.out_of_sync()); + let msg = server.read().await.unwrap(); + assert_eq!(msg.code(), 'Z'); + assert!(server.done()); + } } From b96163a1cdee4116b71093fc9b00e3c14d455284 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Tue, 14 Apr 2026 21:54:33 +0000 Subject: [PATCH 2/6] add test for pipelining mode --- docs/issues/PROTOCOL_OUT_OF_SYNC.md | 79 ++++++++++++++-- integration/ruby/protocol_out_of_sync_spec.rb | 57 +++++++++++ pgdog/src/backend/protocol/state.rs | 94 +++++++++++++++++++ pgdog/src/backend/server.rs | 57 +++++++++++ 4 files changed, 279 insertions(+), 8 deletions(-) create mode 100644 integration/ruby/protocol_out_of_sync_spec.rb diff --git a/docs/issues/PROTOCOL_OUT_OF_SYNC.md b/docs/issues/PROTOCOL_OUT_OF_SYNC.md index c94994419..5bb8672f7 100644 --- a/docs/issues/PROTOCOL_OUT_OF_SYNC.md +++ b/docs/issues/PROTOCOL_OUT_OF_SYNC.md @@ -423,13 +423,76 @@ intermediate `ReadyForQuery` inside a pipelined extended request does not premat --- -## Common thread +## Issue 5 — Error in first pipelined request clears subsequent requests' queue entries -All four issues share the same underlying fragility: the `ProtocolState` queue and the actual server -response stream diverge whenever an error or 
unexpected message interrupts a multi-message sub-request -injected transparently by pgdog. The Error handler was written for a single client-visible request and -does not account for the compound structures the prepared-statement rewriter produces. +**Severity:** Low — not reproducible through production code paths; included for completeness and as a guard against future refactoring. + +**Location:** `pgdog/src/backend/protocol/state.rs`, Error handler (`action()`, extended branch). + +### Description + +When a client pipelines multiple extended-query sequences — each terminated by its own `Sync` — and the first sequence errors, the Error handler calls `queue.clear()` on the entire queue. This destroys the pending entries for all subsequent sequences, causing `ProtocolOutOfSync` when their backend responses arrive. + +### Code path + +Suppose three sequences share one `ProtocolState` queue: + +``` +[1,2,C,Z, 1,2,C,Z, 1,2,C,Z] + ^─seq1─^ ^─seq2─^ ^─seq3─^ +``` + +After seq1 consumes ParseComplete (`1`) and BindComplete (`2`), the queue is: + +``` +[C, Z, 1, 2, C, Z, 1, 2, C, Z] +``` + +Seq1 then errors: + +| Step | Action | Queue after | +|---|---|---| +| Error arm fires | `pop_back()` → seq3's `Z`; `clear()` removes `[C,Z,1,2,C,Z,1,2,C]`; `push_back(Z)` | `[Z]` | +| seq1 ReadyForQuery arrives | pops `Z` normally | **empty** | +| seq2 ParseComplete arrives | `pop_front()` on empty queue | **ProtocolOutOfSync** | + +### Why it does not affect production + +`ClientRequest::spliced()` in `pgdog/src/frontend/client_request.rs` splits every multi-Execute pipeline into sub-requests at `Execute` boundaries, placing each `Sync` in its own standalone sub-request. Sub-requests are processed sequentially: each one is sent and fully drained before the next one is enqueued. The `ProtocolState` queue therefore only ever holds the entries for a single sync group at a time, so `queue.clear()` on error only ever sees that one group's entries. 
+ +This property is a load-bearing invariant. If `spliced()` is ever changed — for example to batch multiple sync groups into one send — this bug will surface immediately. + +### Reproduction + +Not reproducible through normal pool operation. Reproduced by loading all three sync groups into one `ProtocolState` directly: + +```sh +cargo test -p pgdog test_pipeline_single_queue_error_only_clears_failing_sync_group +cargo test -p pgdog test_pipelined_multiple_syncs_first_fails +``` + +Both tests currently **fail** with `ProtocolOutOfSync`. + +### Tests + +| Test | Level | Status | What it covers | +|---|---|---|---| +| `test_pipeline_single_queue_error_clears_subsequent_sync_groups` | unit (`state.rs`) | passes | documents current (broken) behaviour: seq2 entries are gone | +| `test_pipeline_single_queue_error_only_clears_failing_sync_group` | unit (`state.rs`) | **fails** | specifies correct behaviour: seq2 and seq3 must survive | +| `test_pipelined_multiple_syncs_first_fails` | integration (`server.rs`) | **fails** | end-to-end reproduction against a real backend | + +### Fix + +The Error arm must clear only the failing sync group's entries — from the current queue head up to, but not including, its own `ReadyForQuery`, which must stay queued to match the `ReadyForQuery` the server sends after the error — leaving everything beyond that boundary intact: + +```rust +// Remove entries up to, but not including, this sync group's ReadyForQuery. +let rfq = ExecutionItem::Code(ExecutionCode::ReadyForQuery); +while self.queue.front().is_some_and(|item| *item != rfq) { + self.queue.pop_front(); +} +// The retained ReadyForQuery is consumed when the server's 'Z' arrives. +``` + +The current `pop_back()` / `clear()` / `push_back()` pattern was written assuming one sync group per queue. Replacing it with a forward scan to the first `ReadyForQuery` boundary makes the handler correct for both the single-group and multi-group cases. -Issue 4 is a secondary consequence: `extended` was added as a guard for the Error handler but was -attached to the connection rather than the current pipeline, so it outlives the requests it was meant -to describe.
diff --git a/integration/ruby/protocol_out_of_sync_spec.rb b/integration/ruby/protocol_out_of_sync_spec.rb new file mode 100644 index 000000000..c4b787f04 --- /dev/null +++ b/integration/ruby/protocol_out_of_sync_spec.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true + +require_relative 'rspec_helper' + +def connect(dbname = 'pgdog', user = 'pgdog') + PG.connect(dbname: dbname, user: user, password: 'pgdog', port: 6432, host: '127.0.0.1', application_name: '') +end + +describe 'protocol out of sync' do + after do + ensure_done + end + + # In pipeline mode, a failed first query must not prevent subsequent queries + # from executing. Seq2 and seq3 must return rows even when seq1 errors. + it 'extended query pipeline: error in seq1 does not drop seq2 and seq3' do + conn = connect + + conn.enter_pipeline_mode + + # Seq1: will fail — division by zero + conn.send_query_params 'SELECT 1/0', [] + conn.pipeline_sync + # Seq2: must succeed + conn.send_query_params 'SELECT $1::integer AS val', [2] + conn.pipeline_sync + # Seq3: must succeed + conn.send_query_params 'SELECT $1::integer AS val', [3] + conn.pipeline_sync + + # Seq1: fails — consume the error result, then the sync boundary. + begin + conn.get_result + rescue PG::Error => e + expect(e.message).to include('division by zero') + end + conn.get_result # nil — end of command + conn.get_result # PipelineSync — sync boundary + + # Seq2: must return a real row — if aborted instead, the request was dropped. + r2 = conn.get_result + expect(r2.result_status).to eq(PG::PGRES_TUPLES_OK) + expect(r2[0]['val']).to eq('2') + conn.get_result # end of command + conn.get_result # PipelineSync + + # Seq3: must return a real row. 
+ r3 = conn.get_result + expect(r3.result_status).to eq(PG::PGRES_TUPLES_OK) + expect(r3[0]['val']).to eq('3') + conn.get_result # end of command + conn.get_result # PipelineSync + + conn.exit_pipeline_mode + conn.close + end +end diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index 8cef58d14..6bc18f67f 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -975,4 +975,98 @@ mod test { assert_eq!(state.action('Z').unwrap(), Action::Forward); // forwarded assert!(state.is_empty()); } + + // ========================================= + // Pipeline multi-sync tests + // ========================================= + + // In pipeline mode, each request runs in isolation; a failure in one must not affect the others. + #[test] + fn test_pipeline_multi_sync_error_in_seq1_does_not_affect_seq2_seq3() { + // Setup: seq1 expects ParseComplete, BindComplete, CommandComplete, ReadyForQuery. + let mut seq1 = ProtocolState::default(); + seq1.add('1'); // ParseComplete + seq1.add('2'); // BindComplete + seq1.add('C'); // CommandComplete (won't arrive — execute errors) + seq1.add('Z'); // ReadyForQuery + + // Seq1: fails — seq2 and seq3 must still respond. + assert_eq!(seq1.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(seq1.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(seq1.action('E').unwrap(), Action::Forward); // ErrorResponse + assert!(seq1.out_of_sync); + assert_eq!(seq1.len(), 1); + assert_eq!(seq1.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(!seq1.out_of_sync); + assert!(seq1.is_empty()); + + // Seq2: independent state; seq1's error must not affect it. 
+ let mut seq2 = ProtocolState::default(); + seq2.add('1'); + seq2.add('2'); + seq2.add('C'); + seq2.add('Z'); + + assert_eq!(seq2.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(seq2.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(seq2.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(seq2.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(seq2.is_empty()); + + // Seq3: independent state; seq1's error must not affect it. + let mut seq3 = ProtocolState::default(); + seq3.add('1'); + seq3.add('2'); + seq3.add('C'); + seq3.add('Z'); + + assert_eq!(seq3.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(seq3.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(seq3.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(seq3.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(seq3.is_empty()); + } + + // In pipeline mode, a failed request must not prevent subsequent pipelined + // requests from being processed. Seq1 errors; seq2 and seq3 must still + // receive their responses. + // + // Currently FAILS: when seq1 errors, seq2 and seq3 receive errors instead + // of their expected responses. + #[test] + fn test_pipeline_single_queue_error_only_clears_failing_sync_group() { + let mut state = ProtocolState::default(); + // Setup: all three sync groups loaded into a single shared queue. + state.add('1'); + state.add('2'); + state.add('C'); + state.add('Z'); // seq1 + state.add('1'); + state.add('2'); + state.add('C'); + state.add('Z'); // seq2 + state.add('1'); + state.add('2'); + state.add('C'); + state.add('Z'); // seq3 + + // Seq1: fails — seq2 and seq3 must still be reachable. 
+ assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('E').unwrap(), Action::Forward); // error — must not clear seq2/seq3 + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + + // Seq2: must process normally. + assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + + // Seq3: must process normally. + assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(state.is_empty()); + } } diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index daf2722e8..943c87621 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -3144,6 +3144,63 @@ pub mod test { assert!(!server.needs_drain()); } + // In pipeline mode, a failed request must not prevent subsequent pipelined + // requests from being processed. All three sequences are sent in one batch; + // seq1 fails, seq2 and seq3 must still return their rows. + // + // Currently FAILS: the error in seq1 causes seq2 and seq3 to receive + // ProtocolOutOfSync instead of their expected responses. + #[tokio::test] + async fn test_pipelined_multiple_syncs_first_fails() { + let mut server = test_server().await; + + // Three pipelined sequences sent in one batch. + server + .send( + &vec![ + // Seq1 — will fail. + Parse::new_anonymous("SELECT 1/0").into(), + Bind::default().into(), + Execute::new().into(), + Sync.into(), + // Seq2 — must succeed. 
+ Parse::new_anonymous("SELECT 2").into(), + Bind::default().into(), + Execute::new().into(), + Sync.into(), + // Seq3 — must succeed. + Parse::new_anonymous("SELECT 3").into(), + Bind::default().into(), + Execute::new().into(), + Sync.into(), + ] + .into(), + ) + .await + .unwrap(); + + // Seq1: fails — seq2 and seq3 must still respond. + assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), 'E'); // ErrorResponse + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + + // Seq2: must return data despite seq1 erroring. + assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'D'); // DataRow + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + + // Seq3: must return data. 
+ assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'D'); // DataRow + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + assert!(server.done()); + assert!(!server.has_more_messages()); + } + #[tokio::test] async fn test_empty_query_extended() { let mut server = test_server().await; From 43e0ffd46874db3b6baec03cba2941de496f1a4d Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Sat, 18 Apr 2026 19:42:36 +0000 Subject: [PATCH 3/6] add tests for extended --- docs/issues/PROTOCOL_OUT_OF_SYNC.md | 128 ++++++++++-------- integration/ruby/protocol_out_of_sync_spec.rb | 16 +++ pgdog/src/backend/protocol/state.rs | 70 ++++++++++ pgdog/src/backend/server.rs | 111 ++++++--------- 4 files changed, 195 insertions(+), 130 deletions(-) diff --git a/docs/issues/PROTOCOL_OUT_OF_SYNC.md b/docs/issues/PROTOCOL_OUT_OF_SYNC.md index 5bb8672f7..2708aa778 100644 --- a/docs/issues/PROTOCOL_OUT_OF_SYNC.md +++ b/docs/issues/PROTOCOL_OUT_OF_SYNC.md @@ -327,99 +327,114 @@ before reuse, bounding the blast radius to a single request. --- -## Issue 4 — `extended` flag is permanently set and never resets +## Issue 4 — `extended` flag set at enqueue time, not at processing time -**Severity:** Low-medium — affects connection-lifecycle semantics and silently changes Error handler -behaviour for all subsequent requests on a connection. +**Severity:** Low in production — not triggerable through current code paths. Dangerous if triggered: +a write-query executed on the backend may never be confirmed to the client. -**Location:** `pgdog/src/backend/protocol/state.rs`, `add()` / `add_ignore()`; `state.rs:151–153`, -Error handler. 
+**Location:** `pgdog/src/backend/protocol/state.rs`, `add()` / `add_ignore()` and the +`ReadyForQuery` recalculation in `action()`. ### Description -`ProtocolState.extended` is set to `true` the first time any parameterised query runs on a connection -and is never reset. The Error handler checks this flag to set `out_of_sync = true`; because the flag -is permanent, every error on that connection — including plain simple-query errors — sets -`out_of_sync = true` spuriously. +Two related bugs in how `extended` is maintained: -### Code path - -`add()` and `add_ignore()` set the flag whenever `ParseComplete ('1')` or `BindComplete ('2')` is -enqueued: +**Bug A — `add()` sets the flag too early.** +`add()` and `add_ignore()` update `extended` the moment any `ParseComplete` or `BindComplete` is +enqueued, regardless of where in the queue that item sits: ```rust self.extended = self.extended || code.extended(); ``` -The Error handler (`state.rs:151–153`): +If a simple query occupies the head of the queue and an extended query is enqueued behind it, +`extended` flips to `true` before any message has been processed. When the simple query then errors, +the Error handler sees `extended == true` and takes the extended path: sets `out_of_sync = true` and +clears the queue — destroying the extended query's pending entries. + +**Bug B — `ReadyForQuery` recalculation scans the entire remaining queue.** +After consuming a `ReadyForQuery`, the flag is recalculated as: ```rust -ExecutionCode::Error => { - if self.extended { - self.out_of_sync = true; // fires on every error, forever - } - // ... -} +self.extended = self.queue.iter().any(ExecutionItem::extended); ``` -There is no reset path. +This scans all remaining entries, not just those belonging to the next sub-request. If two simple +queries precede an extended query, after the first simple query's `ReadyForQuery` is consumed the +scan finds the extended items two hops away and sets `extended = true`. 
The second simple query's +error then incorrectly takes the extended path. + +### Why it does not affect production -### Consequences +`ClientRequest::spliced()` never places a simple query and an extended query in the same +`ProtocolState` queue. Each sub-request — whether simple or extended — gets its own fresh state. +The flag therefore always starts at `false` for a simple sub-request and becomes `true` only for +sub-requests that actually contain `Parse`/`Bind` messages. -- `done()` stays `false` one extra round-trip (until RFQ clears `out_of_sync`) on simple-query - errors for connections that have ever served a parameterised query. Harmless in practice today, but - more conservative than necessary. -- Future changes to the Error handler that add `extended`-specific behaviour will silently apply to - all long-lived connections, not just those currently mid-pipeline. -- `extended` reads as "has this connection *ever* been in extended-protocol mode", not "is this - connection *currently* in extended-protocol mode" — a semantic mismatch that will mislead future - readers. +### Why it is dangerous if triggered + +If a simple query and an extended query share one queue and the simple query errors: +1. The extended query's pending entries are destroyed. +2. pgdog forwards the extended query's wire messages to the backend before reading any responses. +3. When the backend's response for the extended query arrives, pgdog hits `ProtocolOutOfSync` and + discards the connection. +4. For a write query (`INSERT`/`UPDATE`/`DELETE`) the statement executed on the backend but the + client receives no confirmation — silent data inconsistency. ### Reproduction -1. Connect to pgdog. -2. Execute a parameterised query (any `$1` placeholder) — sets `extended = true`. -3. Execute `SELECT 1/0` (simple query). -4. Observe `server.out_of_sync() == true` immediately after the `'E'` response, before RFQ arrives. - Expected: `false`. +Not reproducible through normal pool operation. 
Reproduced at the `server.rs` level by bypassing +the splicing layer: ```sh -cargo test -p pgdog test_extended_flag_never_resets +cargo test -p pgdog test_simple_query_error_before_queued_extended_request_does_not_set_out_of_sync +cargo test -p pgdog test_simple_query_error_after_rfq_before_extended_does_not_set_out_of_sync +cargo test -p pgdog test_simple_query_error_before_extended_query_in_same_batch ``` -### Tests +The first two unit tests fail at `assert!(!state.out_of_sync)`. The server test fails with +`ProtocolOutOfSync` when the extended query's `ParseComplete` arrives against an empty queue. -**`test_extended_flag_never_resets_spurious_out_of_sync`** in `server.rs` (requires PostgreSQL) — -three phases on one connection: +The integration test `extended query succeeds after preceding simple query error` in +`integration/ruby/protocol_out_of_sync_spec.rb` currently **passes** — pgdog's splicing prevents +the bug from reaching production — and serves as a regression guard. -1. *Baseline* — fresh connection (`extended = false`); `SELECT 1/0`; asserts `out_of_sync == false`. -2. *Trigger* — `Parse + Bind + Execute + Sync` for `SELECT $1::int`; permanently sets `extended = true`. -3. *Regression* — `SELECT 1/0` twice more; asserts `out_of_sync == true` after each `'E'`. Each - `ReadyForQuery` resets `out_of_sync` to `false` but leaves `extended` unchanged. 
+### Tests -```sh -cargo test -p pgdog test_extended_flag_never_resets -``` +| Test | Level | Status | What it covers | +|---|---|---|---| +| `test_simple_query_error_before_queued_extended_request_does_not_set_out_of_sync` | unit (`state.rs`) | **fails** | Bug A: flag set by future enqueue poisons simple error path | +| `test_simple_query_error_after_rfq_before_extended_does_not_set_out_of_sync` | unit (`state.rs`) | **fails** | Bug B: full-queue scan sets flag for wrong sub-request | +| `test_simple_query_error_before_extended_query_in_same_batch` | server (`server.rs`) | **fails** | end-to-end: extended query lost after simple error in same batch | +| `extended query succeeds after preceding simple query error` | integration (Ruby) | passes | regression guard via normal pgdog path | ### Fix -Reset `extended` to `false` at the same point `out_of_sync` resets — when `ReadyForQuery` is -processed and the queue is fully drained: +**Bug A:** Remove the `extended` update from `add()` and `add_ignore()`. The flag must not be set +at enqueue time. Instead, set it in `action()` when a `ParseComplete` or `BindComplete` is +actually consumed from the queue front: ```rust -ExecutionCode::ReadyForQuery => { - self.out_of_sync = false; - if self.is_empty() { - self.extended = false; // pipeline complete; reset for next request +ExecutionItem::Code(in_queue_code) => { + if in_queue_code.extended() { + self.extended = true; } + // ... } ``` -Resetting only when `is_empty()` is safe: pipelined requests still in the queue keep `extended = true` -until the entire pipeline finishes. +**Bug B:** The `ReadyForQuery` recalculation must scan only to the next `ReadyForQuery` boundary, +not the entire remaining queue: -A post-fix test should verify: (a) phase 3 above now produces `out_of_sync == false`, and (b) an -intermediate `ReadyForQuery` inside a pipelined extended request does not prematurely reset `extended`. 
+```rust +self.extended = self.queue + .iter() + .take_while(|item| *item != &ExecutionItem::Code(ExecutionCode::ReadyForQuery)) + .any(ExecutionItem::extended); +``` + +This correctly reflects whether the **next sub-request** uses extended protocol, without being +contaminated by sub-requests further down the queue. --- @@ -495,4 +510,3 @@ while let Some(item) = self.queue.pop_front() { ``` The current `pop_back()` / `clear()` / `push_back()` pattern was written assuming one sync group per queue. Replacing it with a forward scan to the first `ReadyForQuery` boundary makes the handler correct for both the single-group and multi-group cases. - diff --git a/integration/ruby/protocol_out_of_sync_spec.rb b/integration/ruby/protocol_out_of_sync_spec.rb index c4b787f04..fda5f853a 100644 --- a/integration/ruby/protocol_out_of_sync_spec.rb +++ b/integration/ruby/protocol_out_of_sync_spec.rb @@ -11,6 +11,22 @@ def connect(dbname = 'pgdog', user = 'pgdog') ensure_done end + # A simple query that errors must not prevent a subsequent extended query + # from executing. Both are sent as separate requests; pgdog must process + # each independently. + it 'extended query succeeds after preceding simple query error' do + conn = connect + + # Simple query that errors. + expect { conn.exec 'SELECT 1/0' }.to raise_error(PG::Error, /division by zero/) + + # Extended query must succeed despite the preceding error. + res = conn.exec_params 'SELECT $1::integer AS val', [42] + expect(res[0]['val']).to eq('42') + + conn.close + end + # In pipeline mode, a failed first query must not prevent subsequent queries # from executing. Seq2 and seq3 must return rows even when seq1 errors. 
it 'extended query pipeline: error in seq1 does not drop seq2 and seq3' do diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index 6bc18f67f..32301436e 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -512,6 +512,76 @@ mod test { assert_eq!(state.action('Z').unwrap(), Action::Forward); } + // A simple query that errors must not set out_of_sync, even when extended- + // protocol requests are queued after it. add() currently sets extended=true + // as soon as any ParseComplete is enqueued, causing the error arm to take + // the extended path for the preceding simple query. + // + // Currently FAILS: extended=true from queuing the future extended request + // causes out_of_sync to be set on the simple-query error. + #[test] + fn test_simple_query_error_before_queued_extended_request_does_not_set_out_of_sync() { + // Setup: simple query followed by an extended query in the same queue. + let mut state = ProtocolState::default(); + state.add('C'); // CommandComplete (simple query, won't arrive) + state.add('Z'); // ReadyForQuery (simple query) + state.add('1'); // ParseComplete (extended query) + state.add('2'); // BindComplete (extended query) + state.add('C'); // CommandComplete (extended query) + state.add('Z'); // ReadyForQuery (extended query) + + // Simple query errors — must take the simple-query path. + assert_eq!(state.action('E').unwrap(), Action::Forward); + // out_of_sync must not be set: this was a simple-query error. + assert!(!state.out_of_sync); + // Simple query's ReadyForQuery remains; extended entries intact. + assert_eq!(state.action('Z').unwrap(), Action::Forward); // simple query RFQ + // Extended query is still processable. 
+ assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(state.is_empty()); + } + + // The extended flag recalculation at ReadyForQuery scans the entire remaining + // queue, so a simple query that sits before an extended query inherits + // extended=true after the preceding ReadyForQuery is consumed. + // + // Currently FAILS: after the first simple query's RFQ, extended is set to + // true because the scan finds ParseComplete/BindComplete further in the queue. + // The second simple query's error then incorrectly takes the extended path. + #[test] + fn test_simple_query_error_after_rfq_before_extended_does_not_set_out_of_sync() { + // Setup: two simple queries, then an extended query. + let mut state = ProtocolState::default(); + state.add('C'); // CommandComplete (simple query 1) + state.add('Z'); // ReadyForQuery (simple query 1) + state.add('C'); // CommandComplete (simple query 2, won't arrive) + state.add('Z'); // ReadyForQuery (simple query 2) + state.add('1'); // ParseComplete (extended query) + state.add('2'); // BindComplete (extended query) + state.add('C'); // CommandComplete (extended query) + state.add('Z'); // ReadyForQuery (extended query) + + // Simple query 1 runs normally. + assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + + // Simple query 2 errors — must still take the simple-query path. + assert_eq!(state.action('E').unwrap(), Action::Forward); + // out_of_sync must not be set: this was a simple-query error. + assert!(!state.out_of_sync); + // Simple query 2's ReadyForQuery remains; extended entries intact. 
+ assert_eq!(state.action('Z').unwrap(), Action::Forward); // simple query 2 RFQ + // Extended query is still processable. + assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete + assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete + assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete + assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert!(state.is_empty()); + } + #[test] fn test_extended_error_in_pipeline() { let mut state = ProtocolState::default(); diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index 943c87621..a0293fd90 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -1350,6 +1350,44 @@ pub mod test { assert!(server.done()); } + // A simple query that errors must not prevent a subsequent extended query + // from executing when both are sent in the same batch. + // + // Currently FAILS: queuing the extended items sets extended=true before the + // simple query is processed, so the simple query error incorrectly clears + // the extended query's pending entries. + #[tokio::test] + async fn test_simple_query_error_before_extended_query_in_same_batch() { + let mut server = test_server().await; + + // Setup: simple query that errors, immediately followed by an extended query. + server + .send( + &vec![ + Query::new("SELECT 1/0").into(), + Parse::new_anonymous("SELECT 1").into(), + Bind::default().into(), + Execute::new().into(), + Sync.into(), + ] + .into(), + ) + .await + .unwrap(); + + // Simple query: errors, then ReadyForQuery. + assert_eq!(server.read().await.unwrap().code(), 'E'); // ErrorResponse + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + + // Extended query must still return its results. 
+ assert_eq!(server.read().await.unwrap().code(), '1'); // ParseComplete + assert_eq!(server.read().await.unwrap().code(), '2'); // BindComplete + assert_eq!(server.read().await.unwrap().code(), 'D'); // DataRow + assert_eq!(server.read().await.unwrap().code(), 'C'); // CommandComplete + assert_eq!(server.read().await.unwrap().code(), 'Z'); // ReadyForQuery + assert!(server.done()); + } + #[tokio::test] async fn test_execute_batch_simple_query_error_then_success() { let mut server = test_server().await; @@ -3719,77 +3757,4 @@ pub mod test { "server must be done after full response sequence" ); } - - // extended=true sticks after any parameterised query; Error handler sets out_of_sync on every subsequent error. - #[tokio::test] - async fn test_extended_flag_never_resets_spurious_out_of_sync() { - use crate::net::bind::Parameter; - - let mut server = test_server().await; - - // 1. Baseline: extended=false; simple-query error must not set out_of_sync. - server - .send(&vec![Query::new("SELECT 1/0").into()].into()) - .await - .unwrap(); - let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), 'E'); - assert!(!server.out_of_sync()); - let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), 'Z'); - assert!(server.done()); - - // 2. Parameterised query sets extended=true permanently. - let bind = Bind::new_params_codes( - "", - &[Parameter { - len: 1, - data: "1".as_bytes().into(), - }], - &[Format::Text], - ); - server - .send( - &vec![ - ProtocolMessage::from(Parse::new_anonymous("SELECT $1::int")), - ProtocolMessage::from(bind), - ProtocolMessage::from(Execute::new()), - ProtocolMessage::from(Sync), - ] - .into(), - ) - .await - .unwrap(); - - for c in ['1', '2', 'D', 'C', 'Z'] { - let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), c); - } - assert!(server.done()); - - // 3. Same error on same connection: extended stuck → out_of_sync=true spuriously. 
- server - .send(&vec![Query::new("SELECT 1/0").into()].into()) - .await - .unwrap(); - let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), 'E'); - assert!(server.out_of_sync()); // spurious: extended=true even for plain simple query - let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), 'Z'); // RFQ clears out_of_sync but leaves extended stuck - assert!(!server.out_of_sync()); - assert!(server.done()); - - // 4. Confirm extended remains stuck across RFQ resets. - server - .send(&vec![Query::new("SELECT 1/0").into()].into()) - .await - .unwrap(); - let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), 'E'); - assert!(server.out_of_sync()); - let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), 'Z'); - assert!(server.done()); - } } From 0f19d70b188f67397287144fb1ab10b1e7d3a6f9 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Mon, 6 Apr 2026 22:19:03 +0000 Subject: [PATCH 4/6] fix issue 1 --- docs/issues/PROTOCOL_OUT_OF_SYNC.md | 28 ++++--- pgdog/src/backend/protocol/state.rs | 125 ++++++++++++++++++++++++---- pgdog/src/backend/server.rs | 38 ++++----- 3 files changed, 142 insertions(+), 49 deletions(-) diff --git a/docs/issues/PROTOCOL_OUT_OF_SYNC.md b/docs/issues/PROTOCOL_OUT_OF_SYNC.md index 2708aa778..fb56db339 100644 --- a/docs/issues/PROTOCOL_OUT_OF_SYNC.md +++ b/docs/issues/PROTOCOL_OUT_OF_SYNC.md @@ -113,7 +113,7 @@ to `false`, breaking the one-way latch. The `ExecutionItem::extended()` helper i enables this scan. --- -## Issue 1 — Failed `Prepare` orphans the EXECUTE ReadyForQuery +## ✅ Issue 1 — Failed `Prepare` orphans the EXECUTE ReadyForQuery **Severity:** High — triggered by normal server behaviour; no client misbehaviour required. @@ -157,7 +157,7 @@ Under high concurrency this becomes near-deterministic: the pool fast-path (`Gua time, and no opportunity to drain the kernel socket buffer. 
The next query on that client consumes the stale EXECUTE `Error + ReadyForQuery`, producing `ProtocolOutOfSync`. -### Reproduction +### Reproduction (historical) 1. Connect to pgdog with session or transaction pooling. 2. Issue a simple-query `EXECUTE` for a statement that will fail to prepare (schema mismatch, syntax @@ -191,18 +191,18 @@ Both tests live in `integration/prepared_statements_full/protocol_out_of_sync_sp ### Fix -Fix the Error handler in `state.rs:154–159`. When the failed message is part of a pgdog-injected -compound request, the handler must preserve the `Code(ReadyForQuery)` for the outer client-visible -request — not just the PREPARE's trailing slot. Concretely: the handler needs to recognise that -`Ignore` items at the back of the queue belong to a sub-request that is still in-flight, and must -keep the outer `Code(ReadyForQuery)` accordingly. +Error handler in `state.rs`, `ExecutionCode::Error` arm. See inline comments for full detail. -The TCP-peek approach (`FIONREAD` / `MSG_PEEK` at checkin) is a valid defensive catch-all but adds a -syscall on every checkin and does not fix the root cause. +On error, find the first `Code(ReadyForQuery)` in the queue (the client's RFQ boundary), drain +everything before it, count the `Ignore(RFQ)` slots in the drained portion, and prepend one +`[Ignore(RFQ), Ignore(Error)]` pair per slot. A separate fast-path at the top of the arm handles +the case where the queue front is already `Ignore(Error)` — subsequent errors from the same +injected sub-request — by popping and returning `Action::Ignore` directly. +See also: `test_injected_prepare_error_full_lifecycle` in `state.rs`. --- -## Issue 2 — Double `action()` call in `forward()` for server CopyDone +## 🔴 Issue 2 — Double `action()` call in `forward()` for server CopyDone **Severity:** Medium — requires the client to omit a trailing `Sync`. @@ -270,7 +270,7 @@ way, the invariant must be made explicit in code comments. 
--- -## Issue 3 — Stale ReadyForQuery hits an `Ignore(ParseComplete)` slot +## 🔴 Issue 3 — Stale ReadyForQuery hits an `Ignore(ParseComplete)` slot **Severity:** Low — practically unreachable in normal operation. @@ -509,4 +509,8 @@ while let Some(item) = self.queue.pop_front() { } ``` -The current `pop_back()` / `clear()` / `push_back()` pattern was written assuming one sync group per queue. Replacing it with a forward scan to the first `ReadyForQuery` boundary makes the handler correct for both the single-group and multi-group cases. +Resetting only when `is_empty()` is safe: pipelined requests still in the queue keep `extended = true` +until the entire pipeline finishes. + +A post-fix test should verify: (a) phase 3 above now produces `out_of_sync == false`, and (b) an +intermediate `ReadyForQuery` inside a pipelined extended request does not prematurely reset `extended`. diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index 32301436e..7aa320d8a 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -1,3 +1,5 @@ +use tracing::error; + use crate::{ net::{Message, Protocol}, stats::memory::MemoryUsage, @@ -154,29 +156,62 @@ impl ProtocolState { match code { ExecutionCode::Untracked => return Ok(Action::Forward), ExecutionCode::Error => { - if !self.extended { - // A simple-query error only aborts the current simple query. - // Keep any later pipelined simple query RFQs queued. - while !self.queue.is_empty() - && self.queue.front() - != Some(&ExecutionItem::Code(ExecutionCode::ReadyForQuery)) - { - self.queue.pop_front(); - } - return Ok(Action::Forward); + if matches!( + self.queue.front(), + Some(ExecutionItem::Ignore(ExecutionCode::Error)) + ) { + // We ignore errors only for the pgdog-injected sub-request. + // In that case the first error is already processed and + // sent to the client, for the remaining expected errors + // we've added ignores for errors and RFQ. 
+ // The error is ignored but is still logged by the [backend::server] module + self.queue.pop_front(); + return Ok(Action::Ignore); } - // Remove everything from the execution queue. - // The connection is out of sync until client re-syncs it. + // This is the first (and client-visible) error in the chain. It is forwarded + // so the client receives exactly one Error+RFQ for their request. + + // For extended-protocol pipelines also mark out-of-sync so the connection + // is not reused until the client re-syncs. if self.extended { self.out_of_sync = true; } - let last = self.queue.pop_back(); - self.queue.clear(); - if let Some(ExecutionItem::Code(ExecutionCode::ReadyForQuery)) = last { + + // find the first position for RFQ code to effectively + // separate the pgdog-injected sub-request from the remaining queries + let Some(rfq_pos) = self + .queue + .iter() + .position(|i| matches!(i, ExecutionItem::Code(ExecutionCode::ReadyForQuery))) + else { + self.queue.clear(); + return Ok(Action::Forward); + }; + + // broken_queue - pgdog-injected sub-request part that contains multiple requests + // that will not be executed properly anyway, since we've got an error previously + let broken_queue = self.queue.drain(..rfq_pos); + + // Count how many queries are expected to finish in the pgdog-injected sub-request + // The current use case is only the Prepare + Execute messages from the [backend::server] + // And in case the prepare fails the execute will fail as well. + // WARN: That is not the most reliable solution in case the injected set of queries + // is extended, but it should work for now. 
+ let count_ignores = broken_queue + .filter(|i| matches!(i, ExecutionItem::Ignore(ExecutionCode::ReadyForQuery))) + .count(); + + // For every message that we expect to run, add an ignore for one error and one RFQ + // For prepare it'll be one iteration that will create the queue + // [Ignore(RFQ), Ignore(Error), Code(RFQ)] + for _ in 0..count_ignores { + self.queue + .push_front(ExecutionItem::Ignore(ExecutionCode::Error)); self.queue - .push_back(ExecutionItem::Code(ExecutionCode::ReadyForQuery)); + .push_front(ExecutionItem::Ignore(ExecutionCode::ReadyForQuery)); } + return Ok(Action::Forward); } @@ -185,7 +220,10 @@ impl ProtocolState { } _ => (), }; - let in_queue = self.queue.pop_front().ok_or(Error::ProtocolOutOfSync)?; + let in_queue = self.queue.pop_front().ok_or_else(|| { + error!("Unexpected action {code:?}: queue is empty"); + Error::ProtocolOutOfSync + })?; let action = match in_queue { // The queue is waiting for the server to send ReadyForQuery, // but it sent something else. That means the execution pipeline @@ -205,6 +243,8 @@ impl ProtocolState { if code == in_queue { Ok(Action::Ignore) } else { + error!(?self, "Unexpected action {code:?}: expected: {in_queue:?}"); + Err(Error::ProtocolOutOfSync) } } @@ -1046,6 +1086,57 @@ mod test { assert!(state.is_empty()); } + // Replicates the full lifecycle of an injected PREPARE that errors: + // + // Client sends: PREPARE foo AS ... (simple-query style) + // EXECUTE (via Query) + // + // pgdog injects ahead of the client's Query: + // add_ignore('C') — CommandComplete from PREPARE + // add_ignore('Z') — RFQ from PREPARE + // Then the client's Query adds: + // add('Z') — the client-visible RFQ + // + // Queue before first error: [Ignore(C), Ignore(Z), Code(Z)] + // + // Server responds to PREPARE with an error: + // 'E' → error branch fires: drain [Ignore(C), Ignore(Z)], count 1 Ignore(RFQ), + // push_front loop produces [Ignore(RFQ), Ignore(Error), Code(Z)]. 
+ // Action::Forward — client receives this error. + // 'Z' → matches Ignore(RFQ) → Action::Ignore (PREPARE's RFQ suppressed) + // + // Server responds to EXECUTE (which fails because PREPARE never succeeded): + // 'E' → fast-path: front is Ignore(Error) → pop → Action::Ignore (suppressed) + // 'Z' → Code(Z) → Action::Forward — client receives the closing RFQ + #[test] + fn test_injected_prepare_error_full_lifecycle() { + let mut state = ProtocolState::default(); + + // --- setup: replicate what prepared_statements.rs does --- + // ProtocolMessage::Prepare injects: + state.add_ignore('C'); // Ignore(CommandComplete) — PREPARE response + state.add_ignore('Z'); // Ignore(RFQ) — PREPARE response + // ProtocolMessage::Query (client EXECUTE) adds: + state.add('Z'); // Code(RFQ) — client-visible + + // --- server sends Error for PREPARE --- + // Error branch: drains [Ignore(C), Ignore(Z)], finds 1 Ignore(Z), + // rebuilds queue as [Ignore(RFQ), Ignore(Error), Code(Z)]. + assert_eq!(state.action('E').unwrap(), Action::Forward); + + // --- server sends RFQ for PREPARE (now suppressed) --- + assert_eq!(state.action('Z').unwrap(), Action::Ignore); + + // --- server sends Error for EXECUTE (prepare never succeeded) --- + // Fast-path: Ignore(Error) is at front → pop and ignore. + assert_eq!(state.action('E').unwrap(), Action::Ignore); + + // --- server sends RFQ for EXECUTE --- + // Code(Z) is at front → forwarded to client. 
+ assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!(state.is_empty()); + } + // ========================================= // Pipeline multi-sync tests // ========================================= diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index a0293fd90..ea1a126e7 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -435,6 +435,13 @@ impl Server { Ok(forward) => { if forward { break message; + } else if message.code() == 'E' { + // we got an error that will not be forwarded to the client, + // but it can still be useful for tracing + error!( + "Ignore error from stream: {:?}", + ErrorResponse::from_bytes(message.payload()) + ); } } Err(err) => { @@ -1997,14 +2004,17 @@ pub mod test { let mut prep = PreparedStatements::new(); let mut parse = Parse::named("test", "SELECT 1::bigint"); + prep.insert_anyway(&mut parse); - assert_eq!(parse.name(), "__pgdog_1"); + + let name = parse.name().to_owned(); + assert!(name.starts_with("__pgdog_")); server .send( &vec![ProtocolMessage::from(Query::new(format!( "PREPARE {} AS {}", - parse.name(), + name, parse.query() )))] .into(), @@ -2017,10 +2027,10 @@ pub mod test { } assert!(server.sync_prepared()); server.sync_prepared_statements().await.unwrap(); - assert!(server.prepared_statements.contains("__pgdog_1")); + assert!(server.prepared_statements.contains(&name)); - let describe = Describe::new_statement("__pgdog_1"); - let bind = Bind::new_statement("__pgdog_1"); + let describe = Describe::new_statement(&name); + let bind = Bind::new_statement(&name); let execute = Execute::new(); server .send( @@ -2040,7 +2050,7 @@ pub mod test { assert_eq!(c, msg.code()); } - let parse = Parse::named("__pgdog_1", "SELECT 2::bigint"); + let parse = Parse::named(&name, "SELECT 2::bigint"); let describe = describe.clone(); server @@ -2054,7 +2064,7 @@ pub mod test { } server - .send(&vec![ProtocolMessage::from(Query::new("EXECUTE __pgdog_1"))].into()) + 
.send(&vec![ProtocolMessage::from(Query::new(format!("EXECUTE {name}")))].into()) .await .unwrap(); for c in ['T', 'D', 'C', 'Z'] { @@ -3675,19 +3685,7 @@ pub mod test { let msg = server.read().await.unwrap(); assert_eq!(msg.code(), 'E'); // 'E' PREPARE error let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), 'Z'); // 'Z' PREPARE RFQ — queue now empty - - // 3. EXECUTE 'E' forwarded (no-op on empty queue). - let msg = server.read().await.unwrap(); - assert_eq!(msg.code(), 'E'); // 'E' EXECUTE error - - // 4. BUG: EXECUTE 'Z' hits empty queue → ProtocolOutOfSync (fix: assert 'Z' + done()). - let err = server.read().await.unwrap_err(); - assert!( - matches!(err, Error::ProtocolOutOfSync), - "expected ProtocolOutOfSync; got {:?}", - err, - ); + assert_eq!(msg.code(), 'Z'); // 'Z' RFQ — queue now empty } // Extended Execute + Flush (no Sync): no RFQ backstop — double action('c') raises ProtocolOutOfSync. From 0bdb32845163a21b8b32de9bc0ad775152571b6d Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Tue, 7 Apr 2026 21:08:39 +0000 Subject: [PATCH 5/6] fix issue 2,3,4,5 --- docs/issues/PROTOCOL_OUT_OF_SYNC.md | 186 +++++++---------------- pgdog/src/backend/prepared_statements.rs | 4 +- pgdog/src/backend/protocol/state.rs | 165 +++++--------------- pgdog/src/backend/server.rs | 15 +- 4 files changed, 108 insertions(+), 262 deletions(-) diff --git a/docs/issues/PROTOCOL_OUT_OF_SYNC.md b/docs/issues/PROTOCOL_OUT_OF_SYNC.md index fb56db339..42cf0ae46 100644 --- a/docs/issues/PROTOCOL_OUT_OF_SYNC.md +++ b/docs/issues/PROTOCOL_OUT_OF_SYNC.md @@ -202,22 +202,22 @@ injected sub-request — by popping and returning `Action::Ignore` directly. See also: `test_injected_prepare_error_full_lifecycle` in `state.rs`. --- -## 🔴 Issue 2 — Double `action()` call in `forward()` for server CopyDone +## ✅ Issue 2 — Double `action()` call in `forward()` for server CopyDone **Severity:** Medium — requires the client to omit a trailing `Sync`. 
-**Location:** `pgdog/src/backend/prepared_statements.rs`, `forward()`, lines ~198 and ~237. +**Location:** `pgdog/src/backend/prepared_statements.rs`, `forward()`. ### Description -`forward()` calls `state.action(code)` unconditionally at line 198, then a second time inside the -`'c'` (CopyDone) match arm at line 237. When no `Code(ReadyForQuery)` backstop is present in the -queue, the second call hits an empty queue and raises `ProtocolOutOfSync`. +`forward()` called `state.action(code)` unconditionally near the top of the function, then called +it a second time inside the `'c'` (CopyDone) match arm. Without a `Code(ReadyForQuery)` backstop in +the queue the second call hit an empty queue and raised `ProtocolOutOfSync`. ### Code path -Normal path (safe): `Code(ReadyForQuery)` is always in the queue. `action('Z')` pushes it back rather -than consuming it, making the double call idempotent. +Normal path (safe): `Code(ReadyForQuery)` is always in the queue. `action('Z')` pushes it back +rather than consuming it, making the double call idempotent. Unsafe path — client sends `Parse + Bind + Execute + Flush` (no `Sync`). `handle()` builds: @@ -225,20 +225,20 @@ Unsafe path — client sends `Parse + Bind + Execute + Flush` (no `Sync`). `hand [Code(ParseComplete), Code(BindComplete), Code(ExecutionCompleted)] ``` -No `Code(ReadyForQuery)` is added. When the server responds with CopyDone: +No `Code(ReadyForQuery)` is added. When the server responded with CopyDone: ``` First action('c'): pops Code(ExecutionCompleted) — consumed Second action('c'): empty queue → ProtocolOutOfSync ``` -### Reproduction +### Reproduction (historical) Not triggerable via the `pg` gem or any libpq-based driver — libpq always appends `Sync` after -`Execute`. Requires sending raw protocol messages directly. +`Execute`. Required sending raw protocol messages directly. 
```sh -cargo test -p pgdog test_copy_out_done_double_action_out_of_sync_without_sync +cargo test -p pgdog --lib -- test_copydone_double_action_oos_without_rfq_backstop ``` ### Tests @@ -247,87 +247,32 @@ cargo test -p pgdog test_copy_out_done_double_action_out_of_sync_without_sync - **`test_copydone_double_action_safe_with_rfq_backstop`** — queue `[Code(Copy), Code(ReadyForQuery)]`; two `action('c')` calls both succeed; RFQ slot is pushed back and survives. -- **`test_copydone_double_action_oos_without_rfq_backstop`** — queue `[Code(ExecutionCompleted)]`; - second `action('c')` returns `Err(ProtocolOutOfSync)`. +- **`test_copydone_double_action_oos_without_rfq_backstop`** — documents the raw state-machine + invariant: calling `action('c')` twice with no RFQ backstop still causes `ProtocolOutOfSync` + directly on the state machine. `forward()` no longer makes this second call; this path is + unreachable through normal protocol flow. Test is retained to pin the underlying invariant. **Server-level tests (`server.rs`, require PostgreSQL)** -- **`test_copydone_double_action_oos_without_sync`** — `Parse + Bind + Execute + Flush` - (no Sync); reads ParseComplete, BindComplete, CopyOutResponse, CopyData ×2, then asserts - `ProtocolOutOfSync` on CopyDone. +- **`test_copydone_single_action_without_sync`** — `Parse + Bind + Execute + Flush` (no Sync); + reads ParseComplete, BindComplete, CopyOutResponse, CopyData ×2, then asserts CopyDone is + forwarded successfully. The trailing CommandComplete then hits an empty queue (no RFQ backstop) + and raises `ProtocolOutOfSync` — that is the correct remaining behavior with no `Sync`. - **`test_copydone_double_action_safe_with_sync`** — same pipeline with `Sync`; full sequence completes without error; asserts `server.done()`. 
```sh -cargo test -p pgdog test_copydone_double_action +cargo test -p pgdog --lib -- test_copydone_double_action +cargo test -p pgdog -- test_copydone ``` ### Fix -Remove the second `action()` call in the `'c'` arm of `forward()`, or guarantee that a -`Code(ReadyForQuery)` backstop is always in the queue before the CopyDone path is reached. Either -way, the invariant must be made explicit in code comments. - ---- - -## 🔴 Issue 3 — Stale ReadyForQuery hits an `Ignore(ParseComplete)` slot - -**Severity:** Low — practically unreachable in normal operation. - -**Location:** `pgdog/src/backend/protocol/state.rs`, Ignore arm. +Removed the redundant `self.state.action(code)?` from the `'c'` arm in `forward()`. The call at +the top of the function already advances the state machine for CopyDone; the arm body is now empty. -### Description - -If a `ReadyForQuery` byte from a prior request cycle remains unread in the TCP receive buffer when -the next request starts, `action('Z')` fires while the queue front is `Ignore(ParseComplete)`. The -Ignore arm requires an exact code match; `ReadyForQuery != ParseComplete` → `ProtocolOutOfSync`. - -### Code path -pgdog injects a Parse for a missing statement; queue front: - -``` -[Ignore(ParseComplete), Code(BindComplete), ...] -``` - -Stale `ReadyForQuery` arrives before `ParseComplete`: - -``` -action('Z'): generic pop → Ignore(ParseComplete) - → ReadyForQuery != ParseComplete → ProtocolOutOfSync -``` - -### Reproduction - -Not reproducible through normal pool operation. The `done()` guard chain prevents pool reclaim while -any `Ignore` item is present: - -- `ProtocolState::done()` = `is_empty() && !out_of_sync` → `false` while any `Ignore` slot exists. -- `PreparedStatements::done()` adds a second gate blocking reclaim while an injected Parse is in flight. -- `Pool::maybe_check_in()` discards errored connections before `can_check_in()` is evaluated. 
- -The precondition requires a concurrent-access bug that bypasses the pool guard, or direct TCP stream -injection. - -### Tests - -State-machine unit tests in `state.rs` cover the `action()` mismatch directly. A server-level -integration test is not practical; the precondition cannot be reached through normal sequential -protocol flow. - -```sh -cargo test -p pgdog test_stale_rfq -``` - -### Fix - -No code change required. The existing `done()` guard chain already prevents the precondition from -arising. If it were somehow reached, the resulting `ProtocolOutOfSync` would discard the connection -before reuse, bounding the blast radius to a single request. - ---- - -## Issue 4 — `extended` flag set at enqueue time, not at processing time +## ✅ Issue 4 — `extended` flag set at enqueue time, not at processing time **Severity:** Low in production — not triggerable through current code paths. Dangerous if triggered: a write-query executed on the backend may never be confirmed to the client. @@ -381,7 +326,7 @@ If a simple query and an extended query share one queue and the simple query err 4. For a write query (`INSERT`/`UPDATE`/`DELETE`) the statement executed on the backend but the client receives no confirmation — silent data inconsistency. -### Reproduction +### Reproduction (historical) Not reproducible through normal pool operation. Reproduced at the `server.rs` level by bypassing the splicing layer: @@ -392,53 +337,44 @@ cargo test -p pgdog test_simple_query_error_after_rfq_before_extended_does_not_s cargo test -p pgdog test_simple_query_error_before_extended_query_in_same_batch ``` -The first two unit tests fail at `assert!(!state.out_of_sync)`. The server test fails with -`ProtocolOutOfSync` when the extended query's `ParseComplete` arrives against an empty queue. 
- The integration test `extended query succeeds after preceding simple query error` in -`integration/ruby/protocol_out_of_sync_spec.rb` currently **passes** — pgdog's splicing prevents +`integration/ruby/protocol_out_of_sync_spec.rb` passes — pgdog's splicing prevents the bug from reaching production — and serves as a regression guard. ### Tests | Test | Level | Status | What it covers | |---|---|---|---| -| `test_simple_query_error_before_queued_extended_request_does_not_set_out_of_sync` | unit (`state.rs`) | **fails** | Bug A: flag set by future enqueue poisons simple error path | -| `test_simple_query_error_after_rfq_before_extended_does_not_set_out_of_sync` | unit (`state.rs`) | **fails** | Bug B: full-queue scan sets flag for wrong sub-request | -| `test_simple_query_error_before_extended_query_in_same_batch` | server (`server.rs`) | **fails** | end-to-end: extended query lost after simple error in same batch | +| `test_simple_query_error_before_queued_extended_request_does_not_set_out_of_sync` | unit (`state.rs`) | **passes** | Bug A: flag set by future enqueue poisons simple error path | +| `test_simple_query_error_after_rfq_before_extended_does_not_set_out_of_sync` | unit (`state.rs`) | **passes** | Bug B: full-queue scan sets flag for wrong sub-request | +| `test_simple_query_error_before_extended_query_in_same_batch` | server (`server.rs`) | **passes** | end-to-end: extended query survives simple error in same batch | | `extended query succeeds after preceding simple query error` | integration (Ruby) | passes | regression guard via normal pgdog path | +```sh +cargo test -p pgdog --lib -- test_simple_query_error_before_queued_extended_request_does_not_set_out_of_sync +cargo test -p pgdog --lib -- test_simple_query_error_after_rfq_before_extended_does_not_set_out_of_sync +``` + ### Fix -**Bug A:** Remove the `extended` update from `add()` and `add_ignore()`. The flag must not be set -at enqueue time. 
Instead, set it in `action()` when a `ParseComplete` or `BindComplete` is -actually consumed from the queue front: +The `extended` field was removed entirely from `ProtocolState`. Both bugs stemmed from maintaining +it as a cached flag — Bug A by setting it too early, Bug B by recomputing it too broadly. +Removing it eliminates both defects at the root. -```rust -ExecutionItem::Code(in_queue_code) => { - if in_queue_code.extended() { - self.extended = true; - } - // ... -} -``` +`out_of_sync` is now set unconditionally for all errors (simple or extended) and cleared +unconditionally on every `ReadyForQuery`. The distinction is irrelevant: after any error the +connection must wait for the peer's RFQ before being reused, and that RFQ always arrives as the +next terminal message in the same sub-request. -**Bug B:** The `ReadyForQuery` recalculation must scan only to the next `ReadyForQuery` boundary, -not the entire remaining queue: +For both simple and extended errors the sequence is: -```rust -self.extended = self.queue - .iter() - .take_while(|item| *item != &ExecutionItem::Code(ExecutionCode::ReadyForQuery)) - .any(ExecutionItem::extended); ``` - -This correctly reflects whether the **next sub-request** uses extended protocol, without being -contaminated by sub-requests further down the queue. +Error → out_of_sync = true +ReadyForQuery → out_of_sync = false (connection clean; extended entries beyond this boundary intact) +``` --- - -## Issue 5 — Error in first pipelined request clears subsequent requests' queue entries +## ✅ Issue 5 — Error in first pipelined request clears subsequent requests' queue entries **Severity:** Low — not reproducible through production code paths; included for completeness and as a guard against future refactoring. @@ -477,7 +413,7 @@ Seq1 then errors: This property is a load-bearing invariant. If `spliced()` is ever changed — for example to batch multiple sync groups into one send — this bug will surface immediately. 
-### Reproduction +### Reproduction (historical) Not reproducible through normal pool operation. Reproduced by loading all three sync groups into one `ProtocolState` directly: @@ -486,31 +422,21 @@ cargo test -p pgdog test_pipeline_single_queue_error_only_clears_failing_sync_gr cargo test -p pgdog test_pipelined_multiple_syncs_first_fails ``` -Both tests currently **fail** with `ProtocolOutOfSync`. +Both tests now **pass**. ### Tests | Test | Level | Status | What it covers | |---|---|---|---| -| `test_pipeline_single_queue_error_clears_subsequent_sync_groups` | unit (`state.rs`) | passes | documents current (broken) behaviour: seq2 entries are gone | -| `test_pipeline_single_queue_error_only_clears_failing_sync_group` | unit (`state.rs`) | **fails** | specifies correct behaviour: seq2 and seq3 must survive | -| `test_pipelined_multiple_syncs_first_fails` | integration (`server.rs`) | **fails** | end-to-end reproduction against a real backend | +| `test_pipeline_multi_sync_error_in_seq1_does_not_affect_seq2_seq3` | unit (`state.rs`) | **passes** | error in seq1 leaves seq2 and seq3 intact | +| `test_pipeline_single_queue_error_only_clears_failing_sync_group` | unit (`state.rs`) | **passes** | only the failing sync group's entries are drained | +| `test_pipelined_multiple_syncs_first_fails` | server (`server.rs`) | **passes** | end-to-end: seq2 completes after seq1 errors | ### Fix -The Error arm must clear only the failing sync group's entries — from the current queue head up to and including its own `ReadyForQuery` — leaving everything beyond that boundary intact: - -```rust -// Remove entries up to and including this sync group's ReadyForQuery. -while let Some(item) = self.queue.pop_front() { - if item == ExecutionItem::Code(ExecutionCode::ReadyForQuery) { - break; - } -} -``` - -Resetting only when `is_empty()` is safe: pipelined requests still in the queue keep `extended = true` -until the entire pipeline finishes. 
+The Error arm drains only the failing sync group's entries — from the current queue head up to and +including its own `ReadyForQuery` boundary — using `drain(..rfq_pos)`, leaving everything beyond +intact. A fallback `queue.clear()` applies only when no `Code(ReadyForQuery)` exists in the queue +at all (nothing to preserve in that case). -A post-fix test should verify: (a) phase 3 above now produces `out_of_sync == false`, and (b) an -intermediate `ReadyForQuery` inside a pipelined extended request does not prematurely reset `extended`. +The old `pop_back + clear + push_back(RFQ)` pattern that caused the bug was removed. \ No newline at end of file diff --git a/pgdog/src/backend/prepared_statements.rs b/pgdog/src/backend/prepared_statements.rs index 76ad8480a..93d90c38d 100644 --- a/pgdog/src/backend/prepared_statements.rs +++ b/pgdog/src/backend/prepared_statements.rs @@ -231,9 +231,7 @@ impl PreparedStatements { } // Backend told us the copy is done. - 'c' => { - self.state.action(code)?; - } + 'c' => {} _ => (), } diff --git a/pgdog/src/backend/protocol/state.rs b/pgdog/src/backend/protocol/state.rs index 7aa320d8a..e47f47206 100644 --- a/pgdog/src/backend/protocol/state.rs +++ b/pgdog/src/backend/protocol/state.rs @@ -34,12 +34,6 @@ impl MemoryUsage for ExecutionCode { } } -impl ExecutionCode { - fn extended(&self) -> bool { - matches!(self, Self::ParseComplete | Self::BindComplete) - } -} - impl From for ExecutionCode { fn from(value: char) -> Self { match value { @@ -69,29 +63,17 @@ impl MemoryUsage for ExecutionItem { } } -impl ExecutionItem { - fn extended(&self) -> bool { - match self { - Self::Code(code) | Self::Ignore(code) => code.extended(), - } - } -} - #[derive(Debug, Clone, Default)] pub struct ProtocolState { queue: VecDeque, simulated: VecDeque, - extended: bool, out_of_sync: bool, } impl MemoryUsage for ProtocolState { #[inline] fn memory_usage(&self) -> usize { - self.queue.memory_usage() - + self.simulated.memory_usage() - + 
self.extended.memory_usage() - + self.out_of_sync.memory_usage() + self.queue.memory_usage() + self.simulated.memory_usage() + self.out_of_sync.memory_usage() } } @@ -104,7 +86,6 @@ impl ProtocolState { /// pub(crate) fn add_ignore(&mut self, code: impl Into) { let code = code.into(); - self.extended = self.extended || code.extended(); self.queue.push_back(ExecutionItem::Ignore(code)); } @@ -112,7 +93,6 @@ impl ProtocolState { /// to be returned by the server. pub(crate) fn add(&mut self, code: impl Into) { let code = code.into(); - self.extended = self.extended || code.extended(); self.queue.push_back(ExecutionItem::Code(code)) } @@ -172,11 +152,9 @@ impl ProtocolState { // This is the first (and client-visible) error in the chain. It is forwarded // so the client receives exactly one Error+RFQ for their request. - // For extended-protocol pipelines also mark out-of-sync so the connection - // is not reused until the client re-syncs. - if self.extended { - self.out_of_sync = true; - } + // Mark the state out-of-sync so the connection is not reused until the client re-syncs. + // For a simple query the flag is cleared again as soon as the following ReadyForQuery arrives. + self.out_of_sync = true; // find the first position for RFQ code to effectively // separate the pgdog-injected sub-request from the remaining queries @@ -224,7 +202,7 @@ impl ProtocolState { error!("Unexpected action {code:?}: queue is empty"); Error::ProtocolOutOfSync })?; - let action = match in_queue { + match in_queue { // The queue is waiting for the server to send ReadyForQuery, // but it sent something else. That means the execution pipeline // isn't done. We are not tracking every single message, so this is expected. 
@@ -248,13 +226,7 @@ impl ProtocolState { Err(Error::ProtocolOutOfSync) } } - }?; - - if code == ExecutionCode::ReadyForQuery { - self.extended = self.queue.iter().any(ExecutionItem::extended); } - - Ok(action) } pub(crate) fn in_copy_mode(&self) -> bool { @@ -287,7 +259,7 @@ impl ProtocolState { !self.out_of_sync } - /// Check if the protocol is out of sync due to an error in extended protocol. + /// Check if the protocol is out of sync due to an error. pub(crate) fn out_of_sync(&self) -> bool { self.out_of_sync } @@ -395,7 +367,6 @@ mod test { assert_eq!(state.action('C').unwrap(), Action::Forward); assert_eq!(state.action('Z').unwrap(), Action::Forward); assert!(state.is_empty()); - assert!(!state.extended); } #[test] @@ -542,41 +513,32 @@ mod test { #[test] fn test_simple_query_error_no_out_of_sync() { let mut state = ProtocolState::default(); - // Simple query error should NOT set out_of_sync + // Simple query error sets out_of_sync temporarily; it is cleared by the next RFQ. state.add('C'); // CommandComplete (expected but won't arrive) state.add('Z'); // ReadyForQuery - assert!(!state.extended); assert_eq!(state.action('E').unwrap(), Action::Forward); - assert!(!state.out_of_sync); // Simple query doesn't set out_of_sync + assert!(state.out_of_sync); // set on error, cleared on RFQ assert_eq!(state.action('Z').unwrap(), Action::Forward); + assert!(!state.out_of_sync); } - // A simple query that errors must not set out_of_sync, even when extended- - // protocol requests are queued after it. add() currently sets extended=true - // as soon as any ParseComplete is enqueued, causing the error arm to take - // the extended path for the preceding simple query. - // - // Currently FAILS: extended=true from queuing the future extended request - // causes out_of_sync to be set on the simple-query error. + // A simple-query error sets out_of_sync temporarily; the following RFQ clears it. + // Extended-query entries queued after the simple query are unaffected. 
#[test] fn test_simple_query_error_before_queued_extended_request_does_not_set_out_of_sync() { - // Setup: simple query followed by an extended query in the same queue. let mut state = ProtocolState::default(); - state.add('C'); // CommandComplete (simple query, won't arrive) - state.add('Z'); // ReadyForQuery (simple query) - state.add('1'); // ParseComplete (extended query) - state.add('2'); // BindComplete (extended query) + state.add('C'); // CommandComplete (simple query) + state.add('Z'); // ReadyForQuery (simple query) + state.add('1'); // ParseComplete (extended query) + state.add('2'); // BindComplete (extended query) state.add('C'); // CommandComplete (extended query) - state.add('Z'); // ReadyForQuery (extended query) + state.add('Z'); // ReadyForQuery (extended query) - // Simple query errors — must take the simple-query path. - assert_eq!(state.action('E').unwrap(), Action::Forward); - // out_of_sync must not be set: this was a simple-query error. - assert!(!state.out_of_sync); - // Simple query's ReadyForQuery remains; extended entries intact. + assert_eq!(state.action('E').unwrap(), Action::Forward); // error forwarded + assert!(state.out_of_sync); // set on error assert_eq!(state.action('Z').unwrap(), Action::Forward); // simple query RFQ - // Extended query is still processable. + assert!(!state.out_of_sync); // cleared by RFQ; extended query intact assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete @@ -584,37 +546,27 @@ mod test { assert!(state.is_empty()); } - // The extended flag recalculation at ReadyForQuery scans the entire remaining - // queue, so a simple query that sits before an extended query inherits - // extended=true after the preceding ReadyForQuery is consumed. 
- // - // Currently FAILS: after the first simple query's RFQ, extended is set to - // true because the scan finds ParseComplete/BindComplete further in the queue. - // The second simple query's error then incorrectly takes the extended path. + // out_of_sync is cleared by each sub-request's own RFQ, so a simple-query error + // between two RFQ boundaries does not bleed into subsequent extended-query entries. #[test] fn test_simple_query_error_after_rfq_before_extended_does_not_set_out_of_sync() { - // Setup: two simple queries, then an extended query. let mut state = ProtocolState::default(); state.add('C'); // CommandComplete (simple query 1) - state.add('Z'); // ReadyForQuery (simple query 1) - state.add('C'); // CommandComplete (simple query 2, won't arrive) - state.add('Z'); // ReadyForQuery (simple query 2) - state.add('1'); // ParseComplete (extended query) - state.add('2'); // BindComplete (extended query) + state.add('Z'); // ReadyForQuery (simple query 1) + state.add('C'); // CommandComplete (simple query 2) + state.add('Z'); // ReadyForQuery (simple query 2) + state.add('1'); // ParseComplete (extended query) + state.add('2'); // BindComplete (extended query) state.add('C'); // CommandComplete (extended query) - state.add('Z'); // ReadyForQuery (extended query) + state.add('Z'); // ReadyForQuery (extended query) - // Simple query 1 runs normally. - assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete - assert_eq!(state.action('Z').unwrap(), Action::Forward); // ReadyForQuery + assert_eq!(state.action('C').unwrap(), Action::Forward); // simple query 1 OK + assert_eq!(state.action('Z').unwrap(), Action::Forward); - // Simple query 2 errors — must still take the simple-query path. - assert_eq!(state.action('E').unwrap(), Action::Forward); - // out_of_sync must not be set: this was a simple-query error. - assert!(!state.out_of_sync); - // Simple query 2's ReadyForQuery remains; extended entries intact. 
+ assert_eq!(state.action('E').unwrap(), Action::Forward); // simple query 2 errors + assert!(state.out_of_sync); // set on error assert_eq!(state.action('Z').unwrap(), Action::Forward); // simple query 2 RFQ - // Extended query is still processable. + assert!(!state.out_of_sync); // cleared; extended query intact assert_eq!(state.action('1').unwrap(), Action::Forward); // ParseComplete assert_eq!(state.action('2').unwrap(), Action::Forward); // BindComplete assert_eq!(state.action('C').unwrap(), Action::Forward); // CommandComplete @@ -926,7 +878,6 @@ mod test { state.add('Z'); // ReadyForQuery assert_eq!(state.action('1').unwrap(), Action::Forward); - assert!(state.extended); // Now marked as extended assert_eq!(state.action('2').unwrap(), Action::Forward); assert_eq!(state.action('C').unwrap(), Action::Forward); assert_eq!(state.action('Z').unwrap(), Action::Forward); @@ -1015,61 +966,25 @@ mod test { assert_eq!(state.len(), 1); // RFQ still present for the server's ReadyForQuery } - // Failure path: no Code(ReadyForQuery) backstop — second action('c') hits empty queue. + // Documents raw state-machine behavior: calling action('c') twice with no RFQ backstop + // causes ProtocolOutOfSync. forward() was the only caller that did this; the second call + // has been removed from the 'c' arm in prepared_statements.rs, making this path unreachable + // through normal protocol flow. The test is kept to pin the underlying invariant. #[test] fn test_copydone_double_action_oos_without_rfq_backstop() { let mut state = ProtocolState::default(); - // 1. Queue: Execute + Flush (no Sync) — no RFQ backstop. + // Queue: Execute + Flush (no Sync) — no RFQ backstop. state.add('C'); // ExecutionCompleted - // 2. First action('c'): pops ExecutionCompleted; queue empty. + // First action('c'): pops ExecutionCompleted; queue empty. assert_eq!(state.action('c').unwrap(), Action::Forward); assert!(state.is_empty()); - // 3. Second action('c'): empty queue → ProtocolOutOfSync. 
+ // Second action('c') directly: empty queue → ProtocolOutOfSync. + // This is the raw state machine. forward() no longer makes this second call. assert!(state.action('c').is_err()); } - // Stale RFQ arrives before injected ParseComplete — Ignore arm rejects the mismatch. - #[test] - fn test_stale_rfq_hits_ignore_parsecomplete() { - let mut state = ProtocolState::default(); - // 1. pgdog injects Parse; queue: [Ignore(ParseComplete), BindComplete, CommandComplete, RFQ]. - state.add_ignore('1'); // ParseComplete — injected - state.add('2'); // BindComplete - state.add('C'); // CommandComplete - state.add('Z'); // ReadyForQuery - - // Stale RFQ from prior cycle arrives before ParseComplete. - // ReadyForQuery != ParseComplete → ProtocolOutOfSync. - assert!( - state.action('Z').is_err(), - "stale RFQ against Ignore(ParseComplete) must produce ProtocolOutOfSync" - ); - } - - // Variant: stale RFQ hits Ignore(BindComplete) — same mismatch for any Ignore slot. - #[test] - fn test_stale_rfq_hits_ignore_bindcomplete() { - let mut state = ProtocolState::default(); - // Both Parse and Bind are injected (Describe path). - state.add_ignore('1'); // ParseComplete — injected - state.add_ignore('2'); // BindComplete — injected - state.add('T'); // RowDescription - state.add('C'); // CommandComplete - state.add('Z'); // ReadyForQuery - - // ParseComplete arrives normally and is swallowed. - assert_eq!(state.action('1').unwrap(), Action::Ignore); - - // Queue front is now Ignore(BindComplete). - // A stale RFQ arrives before BindComplete → ProtocolOutOfSync. - assert!( - state.action('Z').is_err(), - "stale RFQ against Ignore(BindComplete) must produce ProtocolOutOfSync" - ); - } - // Happy path: injected ParseComplete arrives in order — silently ignored, rest forwarded. 
#[test] fn test_injected_parse_happy_path() { diff --git a/pgdog/src/backend/server.rs b/pgdog/src/backend/server.rs index ea1a126e7..16c00307a 100644 --- a/pgdog/src/backend/server.rs +++ b/pgdog/src/backend/server.rs @@ -3688,9 +3688,12 @@ pub mod test { assert_eq!(msg.code(), 'Z'); // 'Z' RFQ — queue now empty } - // Extended Execute + Flush (no Sync): no RFQ backstop — double action('c') raises ProtocolOutOfSync. + // Extended Execute + Flush (no Sync): single action('c') now succeeds. + // CopyDone is forwarded to client; the trailing CommandComplete then hits an empty + // queue (no RFQ backstop, no Sync) and raises ProtocolOutOfSync. + // This is distinct from the former double-action bug, which fired on CopyDone itself. #[tokio::test] - async fn test_copydone_double_action_oos_without_sync() { + async fn test_copydone_single_action_without_sync() { let mut server = test_server().await; // 1. Parse + Bind + Execute + Flush (not Sync); no RFQ backstop in queue. @@ -3715,10 +3718,14 @@ pub mod test { assert_eq!(server.read().await.unwrap().code(), 'H'); // CopyOutResponse assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 1 assert_eq!(server.read().await.unwrap().code(), 'd'); // CopyData row 2 - // 3. BUG: CopyDone — first action() pops ExecutionCompleted; second hits empty queue. + + // 3. CopyDone — fixed: single action() pops ExecutionCompleted; no second call. + assert_eq!(server.read().await.unwrap().code(), 'c'); // CopyDone forwarded + + // 4. CommandComplete hits empty queue (no RFQ backstop without Sync). 
assert!( matches!(server.read().await.unwrap_err(), Error::ProtocolOutOfSync), - "expected ProtocolOutOfSync" + "expected ProtocolOutOfSync on CommandComplete with empty queue" ); } From f1b068faf3f7c5904410492c4eba3366be72a102 Mon Sep 17 00:00:00 2001 From: meskill <8974488+meskill@users.noreply.github.com> Date: Wed, 8 Apr 2026 18:37:48 +0000 Subject: [PATCH 6/6] cleanup dev changes --- integration/common.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integration/common.sh b/integration/common.sh index d6a474d7d..812acc69f 100644 --- a/integration/common.sh +++ b/integration/common.sh @@ -23,8 +23,8 @@ function run_pgdog() { local config_file="${COMMON_DIR}/pgdog.config" if [ -z "${binary}" ]; then # Testing in release is faster and mirrors production. - cargo build - binary="target/debug/pgdog" + cargo build --release + binary="target/release/pgdog" fi if [ -f "${pid_file}" ]; then local existing_pid=$(cat "${pid_file}")