fix(controlplane): stream COPY FROM STDIN bytes to remote workers #552

Merged
fuziontech merged 1 commit into main from fix/copy-from-stdin-remote-worker
May 7, 2026

@fuziontech
Member

TL;DR

COPY FROM STDIN from any client (Fivetran, dbt, psql \copy, etc.) is broken on the multitenant K8s control plane and has been since that backend shipped. Every COPY fails with:

IO Error: No files found that match the pattern "/tmp/duckgres-copy-*.csv"

This PR fixes it by streaming the CSV bytes through Flight DoPut to the worker pod instead of relying on a shared filesystem.

Root cause (kept in case you skipped #551)

server/conn.go handleCopyIn spooled CSV bytes into the control plane pod's local /tmp, then sent COPY <table> FROM '/tmp/<spool>' (...) to the executor. In standalone and process-backend topologies the worker shares that filesystem; in remote-worker mode the worker is a separate pod (CP only mounts /app/certs, workers have an independent /data emptyDir) so the path resolves to nothing on the worker.

server/flightclient/flight_executor.go:269 already flagged this — it explicitly errors out of ConnContext with "use batched INSERT for COPY FROM" — but handleCopyIn had no Flight-aware branch.

The fix

Add an optional capability sqlcore.CopyFromStdinExecutor that the wire handler delegates to when the executor implements it. The Flight executor does; standalone and process-backend executors don't (they keep the existing local-tempfile path unchanged).
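
A minimal sketch of the optional-capability pattern described above, using a Go type assertion. The `Executor` shape, the `CopyFromStdin` signature, and the two executor structs are assumptions made for illustration; they are not the definitions in server/sqlcore/interfaces.go.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// Executor is a stand-in for the base executor interface (hypothetical shape).
type Executor interface {
	Exec(sql string) error
}

// CopyFromStdinExecutor is the optional capability. This signature is an
// assumption for illustration, not the one in server/sqlcore/interfaces.go.
type CopyFromStdinExecutor interface {
	CopyFromStdin(copySQL string, src io.Reader) (rows int64, err error)
}

// localExecutor models standalone / process-backend executors: no capability.
type localExecutor struct{}

func (localExecutor) Exec(sql string) error { return nil }

// remoteExecutor models the Flight executor: it also implements the capability.
type remoteExecutor struct{}

func (remoteExecutor) Exec(sql string) error { return nil }

// CopyFromStdin counts newlines as a stand-in for running the real COPY.
func (remoteExecutor) CopyFromStdin(copySQL string, src io.Reader) (int64, error) {
	data, err := io.ReadAll(src)
	if err != nil {
		return 0, err
	}
	return int64(strings.Count(string(data), "\n")), nil
}

// handleCopyIn takes the streaming path only when the executor opts in.
func handleCopyIn(ex Executor, copySQL string, src io.Reader) (string, int64, error) {
	if streamer, ok := ex.(CopyFromStdinExecutor); ok {
		rows, err := streamer.CopyFromStdin(copySQL, src)
		return "remote-streaming", rows, err
	}
	// Existing behavior: spool to a local tempfile and run COPY (elided here).
	return "local-tempfile", 0, nil
}

// demo runs handleCopyIn over an in-memory CSV payload.
func demo(ex Executor, csv string) (string, int64) {
	path, rows, _ := handleCopyIn(ex, "COPY t FROM ...", strings.NewReader(csv))
	return path, rows
}

func main() {
	for _, ex := range []Executor{localExecutor{}, remoteExecutor{}} {
		path, rows := demo(ex, "1,a\n2,b\n")
		fmt.Println(path, rows)
	}
}
```

Because the check is a type assertion at the call site, executors that do not implement the interface need no changes at all.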

Wire layout (CP → worker, on a single Flight DoPut stream)

frame 0:   FlightDescriptor{
             Type = PATH,
             Path = ["duckgres-copy-from-stdin"],
             Cmd  = "COPY t (...) FROM '__DUCKGRES_COPY_PATH__' (FORMAT CSV, ...)"
           }
frame 1+:  DataBody = <1 MiB chunk of CSV bytes>
...
(client closes send)
server:    PutResult{ AppMetadata = DoPutUpdateResult{RecordCount = N} }
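
The chunking in the layout above could be sketched as follows. This is an illustration only: `streamChunks` and `frameSizes` are hypothetical names, and the `send` callback stands in for writing one FlightData frame to the DoPut stream.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

const chunkSize = 1 << 20 // 1 MiB, the frame size named in the wire layout

// streamChunks reads src in chunks of at most size bytes and hands each chunk
// to send. The slice passed to send is reused, so send must not retain it.
func streamChunks(src io.Reader, size int, send func([]byte) error) (int, error) {
	if size <= 0 {
		return 0, errors.New("chunk size must be positive")
	}
	buf := make([]byte, size)
	frames := 0
	for {
		// ReadFull fills buf unless the source ends first.
		n, err := io.ReadFull(src, buf)
		if n > 0 {
			if sendErr := send(buf[:n]); sendErr != nil {
				return frames, sendErr
			}
			frames++
		}
		switch {
		case err == nil:
			continue
		case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
			return frames, nil // source exhausted; last chunk may be short
		default:
			return frames, err
		}
	}
}

// frameSizes streams total zero bytes and reports the size of each frame.
func frameSizes(total, size int) []int {
	var sizes []int
	_, _ = streamChunks(bytes.NewReader(make([]byte, total)), size, func(b []byte) error {
		sizes = append(sizes, len(b))
		return nil
	})
	return sizes
}

func main() {
	fmt.Println(frameSizes(2*chunkSize+512, chunkSize)) // [1048576 1048576 512]
}
```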

The CP builds the COPY SQL with BuildDuckDBCopyFromSQL as before, except the path is the placeholder. The worker writes the streamed bytes to its own /tmp, replaces the placeholder with the worker-local path, then runs the COPY against session.Conn.
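
A rough sketch of the worker-side steps in that paragraph: spool the bytes to a local tempfile, then substitute the placeholder. The function names and the quote-escaping step are assumptions for illustration, not the code in duckdbservice/copy_from_stdin.go; the tempfile pattern follows the `duckgres-worker-copy-*.csv` name mentioned in the test plan.

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"os"
	"strings"
)

const copyPathPlaceholder = "__DUCKGRES_COPY_PATH__"

var errMissingPlaceholder = errors.New("COPY SQL is missing the path placeholder")

// resolveCopySQL swaps the placeholder for a worker-local path (hypothetical helper).
func resolveCopySQL(template, localPath string) (string, error) {
	if !strings.Contains(template, copyPathPlaceholder) {
		return "", errMissingPlaceholder
	}
	// Double any single quotes so the path stays a valid SQL string literal.
	escaped := strings.ReplaceAll(localPath, "'", "''")
	return strings.Replace(template, copyPathPlaceholder, escaped, 1), nil
}

// spoolToTemp writes the streamed bytes to a tempfile and returns its path.
// The caller is responsible for removing the file after the COPY runs.
func spoolToTemp(src io.Reader) (string, error) {
	f, err := os.CreateTemp("", "duckgres-worker-copy-*.csv")
	if err != nil {
		return "", err
	}
	if _, err := io.Copy(f, src); err != nil {
		f.Close()
		os.Remove(f.Name())
		return "", err
	}
	if err := f.Close(); err != nil {
		os.Remove(f.Name())
		return "", err
	}
	return f.Name(), nil
}

func main() {
	path, err := spoolToTemp(strings.NewReader("1,a\n2,b\n3,c\n"))
	if err != nil {
		fmt.Println("spool failed:", err)
		return
	}
	defer os.Remove(path)

	sql, err := resolveCopySQL("COPY t FROM '"+copyPathPlaceholder+"' (FORMAT CSV)", path)
	fmt.Println(sql, err)
}
```

Rejecting a template without the placeholder keeps the worker from ever running a COPY against a path chosen by the sender.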

Worker dispatch

customActionServer.DoPut peeks at the first FlightData frame and routes:

  • If the descriptor matches IsCopyFromStdinDescriptor → doCopyFromStdin handles the rest of the stream.
  • Otherwise → standard Flight SQL DoPut handler runs, with a small prebufferedDoPutServer adapter that replays the first frame so the standard handler sees the full stream.

This means existing CommandStatementUpdate traffic continues unchanged. The only new wire interaction is gated on a descriptor (Type = PATH, Path = ["duckgres-copy-from-stdin"]) that no other client produces.
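
The peek-and-replay dispatch can be illustrated with a simplified stream. In this sketch `frame`, `frameStream`, and `prebuffered` are hypothetical stand-ins for the Flight types and for prebufferedDoPutServer; only the descriptor path string comes from the PR.

```go
package main

import (
	"fmt"
	"io"
)

// frame is a simplified stand-in for a FlightData message (hypothetical type).
type frame struct {
	Path []string // descriptor path; only set on the first frame
	Body []byte
}

type frameStream interface {
	Recv() (*frame, error)
}

// sliceStream serves frames from memory, then io.EOF.
type sliceStream struct{ frames []*frame }

func (s *sliceStream) Recv() (*frame, error) {
	if len(s.frames) == 0 {
		return nil, io.EOF
	}
	f := s.frames[0]
	s.frames = s.frames[1:]
	return f, nil
}

// prebuffered replays one already-consumed frame before the rest of the stream.
type prebuffered struct {
	frameStream
	first *frame
}

func (p *prebuffered) Recv() (*frame, error) {
	if p.first != nil {
		f := p.first
		p.first = nil
		return f, nil
	}
	return p.frameStream.Recv()
}

func isCopyFromStdin(f *frame) bool {
	return f != nil && len(f.Path) == 1 && f.Path[0] == "duckgres-copy-from-stdin"
}

// countFrames drains a stream and reports how many frames it yielded.
func countFrames(s frameStream) int {
	n := 0
	for {
		if _, err := s.Recv(); err != nil {
			return n
		}
		n++
	}
}

// dispatch peeks at the first frame, picks a route, and lets the chosen
// handler (here just countFrames) consume the stream with the frame replayed.
func dispatch(s frameStream) (string, int) {
	first, err := s.Recv()
	if err != nil {
		return "error", 0
	}
	route := "standard"
	if isCopyFromStdin(first) {
		route = "copy-from-stdin"
	}
	return route, countFrames(&prebuffered{frameStream: s, first: first})
}

// demo builds an n-frame stream whose first frame carries the given path.
func demo(path string, n int) (string, int) {
	frames := make([]*frame, n)
	for i := range frames {
		frames[i] = &frame{Body: []byte("x")}
	}
	if n > 0 {
		frames[0].Path = []string{path}
	}
	return dispatch(&sliceStream{frames: frames})
}

func main() {
	fmt.Println(demo("duckgres-copy-from-stdin", 3))
	fmt.Println(demo("something-else", 3))
}
```

Either route sees all three frames, which is the property that lets the standard handler run unmodified behind the interceptor.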

CP wire reader

copyDataWireReader in server/conn.go adapts the wire MsgCopyData / MsgCopyDone / MsgCopyFail stream into an io.Reader. CopyDone surfaces as clean io.EOF; CopyFail and unexpected messages set sticky flags the caller checks after the streamer returns. Avoids any double-buffering on the CP — bytes go wire → io.Reader → DoPut frame → worker tempfile.
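
For readers unfamiliar with the adapter shape, here is a minimal sketch of turning a CopyData message stream into an io.Reader. The `wireMsg` type and the `next` callback are hypothetical; the type bytes ('d', 'c', 'f') are the PostgreSQL protocol's CopyData, CopyDone, and CopyFail. The sketch follows the fixed behavior described later in this thread, where CopyFail yields a sentinel error rather than io.EOF.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// wireMsg is a simplified frontend message (hypothetical type).
type wireMsg struct {
	Type byte
	Body []byte
}

var errCopyAborted = errors.New("copy aborted")

type copyDataReader struct {
	next       func() (wireMsg, error) // reads the next wire message
	pending    []byte                  // unread remainder of the last CopyData
	done       bool                    // CopyDone seen
	failed     bool                    // sticky: client sent CopyFail
	unexpected bool                    // sticky: client sent some other message
}

func (r *copyDataReader) Read(p []byte) (int, error) {
	for len(r.pending) == 0 {
		switch {
		case r.failed || r.unexpected:
			return 0, errCopyAborted
		case r.done:
			return 0, io.EOF
		}
		m, err := r.next()
		if err != nil {
			return 0, err
		}
		switch m.Type {
		case 'd':
			r.pending = m.Body
		case 'c':
			r.done = true
		case 'f':
			r.failed = true
		default:
			r.unexpected = true
		}
	}
	n := copy(p, r.pending)
	r.pending = r.pending[n:]
	return n, nil
}

// demo feeds one message per character of types; every 'd' carries "row\n".
func demo(types string) (string, bool) {
	i := 0
	r := &copyDataReader{next: func() (wireMsg, error) {
		if i >= len(types) {
			return wireMsg{}, io.ErrUnexpectedEOF
		}
		m := wireMsg{Type: types[i], Body: []byte("row\n")}
		i++
		return m, nil
	}}
	data, err := io.ReadAll(r)
	return string(data), errors.Is(err, errCopyAborted)
}

func main() {
	fmt.Println(demo("ddc")) // two rows, clean end
	fmt.Println(demo("df"))  // one row, then aborted
}
```

The caller can inspect the sticky flags afterwards to choose the client-facing error code, while the streamer only needs to distinguish io.EOF from any other error.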

Files

  • server/sqlcore/interfaces.go — new CopyFromStdinExecutor optional capability.
  • server/flightclient/copyfromstdin.go — CP-side (*FlightExecutor).CopyFromStdin + descriptor / placeholder constants.
  • duckdbservice/copy_from_stdin.go — worker-side handler doCopyFromStdin and IsCopyFromStdinDescriptor.
  • duckdbservice/service.go — DoPut interceptor on customActionServer and prebufferedDoPutServer adapter.
  • server/conn.go — handleCopyIn delegates to handleCopyInRemoteStreaming when the executor implements the capability; new copyDataWireReader.

Tests

  • server/flightclient/copyfromstdin_test.go — fake gRPC server, asserts CP sends correct descriptor (path, COPY SQL in Cmd), splits the payload into multiple frames for >1 MiB inputs, parses the DoPutUpdateResult.RecordCount from the response.
  • duckdbservice/copy_from_stdin_test.go:
    • IsCopyFromStdinDescriptor matcher coverage (nil, wrong type, wrong path, correct).
    • doCopyFromStdin end-to-end against an in-memory DuckDB session: ingests a 3-row CSV split across two FlightData frames, runs the COPY, asserts DoPutUpdateResult.RecordCount == 3 and that rows actually landed in the table.
    • Rejects a COPY SQL that's missing the path placeholder.
    • Rejects calls without a session token in metadata.

go test ./... and go test -tags kubernetes ./... are green (the two pre-existing controlplane/admin failures are missing-docker-compose on my laptop, unrelated).

Test plan (post-merge)

  • Deploy to staging MTCP, replay a Fivetran sync of posthog_data_import → billing_*. Confirm rows land in DuckLake-backed tables and the worker-side /tmp/duckgres-worker-copy-*.csv files come and go.
  • Spot-check a psql \copy ... FROM STDIN against a remote-worker session.
  • Confirm standalone and process-backend behavior is unchanged (same duckgres-copy-*.csv path, same logs).

Related

🤖 Generated with Claude Code

In remote-worker (multitenant K8s) mode the control plane and worker
live in different pods with no shared volume. The existing COPY FROM
STDIN handler spooled CSV bytes into the control plane's local /tmp
and then issued `COPY ... FROM '/tmp/<spool>'` to the executor; the
worker tried to open that path against its own filesystem and failed
with "IO Error: No files found that match the pattern". Every Fivetran
sync against MTCP has been failing on this path.

Add an optional sqlcore.CopyFromStdinExecutor capability the wire
handler dispatches to when the executor is remote. The Flight executor
implements it by:

  - Opening a Flight DoPut stream with a custom FlightDescriptor
    (PATH=["duckgres-copy-from-stdin"], Cmd=COPY-SQL-template).
  - Streaming CSV bytes in 1 MiB FlightData frames from the wire
    CopyData reader straight through to the worker (no double-spool
    on the CP side).
  - Reading the standard DoPutUpdateResult for rows-affected.

The worker dispatches the custom descriptor before the standard
Flight SQL router via a thin DoPut interceptor on customActionServer.
A buffered FlightData adapter replays the first frame so unmatched
streams still flow into the standard handler unchanged.

The COPY SQL template carries a `__DUCKGRES_COPY_PATH__` placeholder
that the worker substitutes with its own tempfile path before
executing — the rest of BuildDuckDBCopyFromSQL stays on the CP.

Tests:
  - flightclient: CP-side streamer sends descriptor + multi-frame
    payload, parses RecordCount.
  - duckdbservice: descriptor matcher; full doCopyFromStdin against
    an in-memory DuckDB session round-trips three rows; rejects
    missing placeholder; rejects missing session metadata.

Local executors (standalone / process backend) don't implement the
optional interface, so they keep using the existing local-tempfile
path unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
fuziontech force-pushed the fix/copy-from-stdin-remote-worker branch from 727c634 to 3a6e6e4 on May 7, 2026 22:00
@fuziontech
Member Author

Self-review pass found a correctness bug and a couple of cosmetic issues; pushed a fix-up:

Correctness — wire-level COPY cancellation could partially commit
The first version had copyDataWireReader.Read return io.EOF on MsgCopyFail. The streamer treated that as clean end-of-stream and called CloseSend, so the worker would happily run COPY on whatever bytes had already streamed in — silently committing a partially-cancelled load.

Fix: the reader now returns a sentinel errCopyAborted on CopyFail / unexpected wire messages. The streamer propagates the error and returns BEFORE CloseSend. Its deferred cancel() then cancels the gRPC stream, so the worker's stream.Recv returns codes.Canceled and the handler bails out before session.Conn.ExecContext. Sticky flags on the reader still drive the client-facing error code (57014 cancellation vs. 08P01 protocol error vs. 22P02 generic).
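
The control flow of that fix, reduced to a sketch. `fakeStream` and `abortingReader` are hypothetical stand-ins for the DoPut client stream and the wire reader; a real streamer would also read the PutResult before returning, which is omitted here.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"strings"
)

var errCopyAborted = errors.New("copy aborted by client")

// fakeStream records what the streamer did (stand-in for a DoPut client stream).
type fakeStream struct {
	ctx       context.Context
	sent      int
	closeSent bool
}

func (s *fakeStream) Send(b []byte) error { s.sent += len(b); return nil }
func (s *fakeStream) CloseSend() error    { s.closeSent = true; return nil }

// abortingReader yields errCopyAborted, as the wire reader does on CopyFail.
type abortingReader struct{}

func (abortingReader) Read([]byte) (int, error) { return 0, errCopyAborted }

// streamCopy forwards src to the stream. Only a clean io.EOF reaches
// CloseSend; any other error returns first, and the deferred cancel tears
// the stream down so the receiver never sees a clean end-of-stream.
func streamCopy(parent context.Context, src io.Reader) (*fakeStream, error) {
	ctx, cancel := context.WithCancel(parent)
	defer cancel()

	s := &fakeStream{ctx: ctx}
	buf := make([]byte, 8)
	for {
		n, err := src.Read(buf)
		if n > 0 {
			if sendErr := s.Send(buf[:n]); sendErr != nil {
				return s, sendErr
			}
		}
		if errors.Is(err, io.EOF) {
			break
		}
		if err != nil {
			return s, err // returns BEFORE CloseSend
		}
	}
	return s, s.CloseSend()
}

// demo reports whether CloseSend ran and whether the abort error surfaced.
func demo(abort bool) (closeSent bool, aborted bool) {
	var src io.Reader = strings.NewReader("1,a\n2,b\n")
	if abort {
		src = io.MultiReader(src, abortingReader{})
	}
	s, err := streamCopy(context.Background(), src)
	return s.closeSent, errors.Is(err, errCopyAborted)
}

func main() {
	fmt.Println(demo(false)) // true false
	fmt.Println(demo(true))  // false true
}
```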

Regression test — TestDoCopyFromStdinAbortsOnStreamCancellation simulates a cancelled gRPC stream on the worker after one chunk has been buffered to its tempfile. Asserts: handler returns codes.Aborted, table row count == 0, no PutResult sent.

Cosmetic — Removed the unused headerSent atomic.Bool and sync.Mutex from the test fake (it is single-goroutine) and used errors.Is(readErr, io.EOF) consistently in the streamer.

go test ./... and go build -tags kubernetes ./... clean. Staticcheck reports no issues from the new code (only the pre-existing worker_mgr.go warnings).

fuziontech merged commit 42246bd into main on May 7, 2026
22 checks passed
fuziontech deleted the fix/copy-from-stdin-remote-worker branch on May 7, 2026 22:21