12 changes: 12 additions & 0 deletions docs/pages/apis/client.mdx
@@ -56,6 +56,18 @@ const client = new Client()
await client.connect()
```

## client.pipelining

`client.pipelining: boolean`

When set to `true`, queries are sent to the server immediately without waiting for previous queries to complete. Defaults to `false`. See [Pipelining](/features/pipelining) for details and examples.

```js
const client = new Client()
await client.connect()
client.pipelining = true
```

## client.query

### QueryConfig
5 changes: 5 additions & 0 deletions docs/pages/apis/pool.mdx
@@ -69,6 +69,11 @@ type Config = {
// If the function throws or returns a promise that rejects, the client is destroyed
// and the error is returned to the caller requesting the connection.
onConnect?: (client: Client) => void | Promise<void>

// When set to true, enables query pipelining on every client the pool creates.
// Pipelined clients send queries to the server without waiting for previous responses.
// Default is false. See /features/pipelining for details.
pipelining?: boolean
}
```

1 change: 1 addition & 0 deletions docs/pages/features/_meta.js
@@ -1,6 +1,7 @@
export default {
connecting: 'Connecting',
queries: 'Queries',
pipelining: 'Pipelining',
pooling: 'Pooling',
transactions: 'Transactions',
types: 'Data Types',
132 changes: 132 additions & 0 deletions docs/pages/features/pipelining.mdx
@@ -0,0 +1,132 @@
---
title: Pipelining
---

import { Alert } from '/components/alert.tsx'

## What is pipelining?

By default, node-postgres waits for each query to complete before sending the next one, so every query pays a full network round-trip of latency. **Query pipelining** sends multiple queries to the server without waiting for responses, and the server processes them in order. Each query still gets its own result (or error), but you avoid the idle time between them.

```
sequential (default) pipelined
───────────────────── ─────────────────────
client ──Parse──▶ server client ──Parse──▶ server
client ◀──Ready── server ──Parse──▶
client ──Parse──▶ server ──Parse──▶
client ◀──Ready── server client ◀──Ready── server
client ──Parse──▶ server client ◀──Ready── server
client ◀──Ready── server client ◀──Ready── server
```

In benchmarks, pipelining typically delivers **2-3x throughput** for batches of simple queries on a local connection, with larger gains over higher-latency links.

## Enabling pipelining

Pipelining is opt-in. Set `client.pipelining = true` after connecting:

```js
import { Client } from 'pg'

const client = new Client()
await client.connect()
client.pipelining = true

const [r1, r2, r3] = await Promise.all([
client.query('SELECT 1 AS num'),
client.query('SELECT 2 AS num'),
client.query('SELECT 3 AS num'),
])

console.log(r1.rows[0].num, r2.rows[0].num, r3.rows[0].num) // 1 2 3

await client.end()
```

All query types work with pipelining: plain text, parameterized, and named prepared statements.
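
The three forms can be mixed freely in a single batch. A minimal sketch (the `runBatch` helper is illustrative, not part of pg, and assumes a connected client with `client.pipelining = true`):

```js
// Illustrative only: the three query forms, mixed in one pipelined batch.
const batch = [
  { text: 'SELECT 1 AS n' },                                   // plain text
  { text: 'SELECT $1::int AS n', values: [2] },                // parameterized
  { name: 'get-n', text: 'SELECT $1::int AS n', values: [3] }, // named prepared
]

// Assumes client.pipelining = true: all three queries hit the wire
// before the first response arrives.
async function runBatch(client) {
  return Promise.all(batch.map((q) => client.query(q)))
}
```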

## Pipelining with a pool

Pass `pipelining: true` in the pool config to enable it on every client the pool creates:

```js
import { Pool } from 'pg'

const pool = new Pool({ pipelining: true })

const client = await pool.connect()
// client.pipelining is already true

const [users, orders] = await Promise.all([
client.query('SELECT * FROM users WHERE id = $1', [1]),
client.query('SELECT * FROM orders WHERE user_id = $1', [1]),
])

client.release()
```

<Alert>
<div>
<code>pool.query()</code> checks out a client for a single query and releases it immediately, so pipelining has no effect there. Use <code>pool.connect()</code> to check out a client and send multiple queries on it.
</div>
</Alert>
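
A small helper can make the check-out/release dance less error-prone. The `pipelineAll` function below is a sketch, not part of pg:

```js
// Sketch (not part of pg): check out one client, pipeline a batch of
// queries on it, and always release the client, even on failure.
async function pipelineAll(pool, queries) {
  const client = await pool.connect()
  try {
    return await Promise.all(queries.map((q) => client.query(q)))
  } finally {
    client.release()
  }
}
```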

## Error isolation

Each pipelined query gets its own error boundary. A failing query in the middle of a batch does not break the other queries:

```js
const results = await Promise.allSettled([
client.query('SELECT 1 AS num'),
client.query('SELECT INVALID SYNTAX'),
client.query('SELECT 3 AS num'),
])

console.log(results[0].status) // 'fulfilled'
console.log(results[1].status) // 'rejected'
console.log(results[2].status) // 'fulfilled'
```

This works because node-postgres sends a `Sync` message after each query, which is how PostgreSQL delimits error boundaries in the extended query protocol.

## Named prepared statements

Named prepared statements work with pipelining. When two pipelined queries share the same statement name, node-postgres sends `Parse` only once and reuses the prepared statement for subsequent queries:

```js
const queries = Array.from({ length: 100 }, (_, i) => ({
name: 'get-user',
text: 'SELECT * FROM users WHERE id = $1',
values: [i],
}))

const results = await Promise.all(queries.map(q => client.query(q)))
```

## Graceful shutdown

Calling `client.end()` while pipelined queries are in flight will wait for all of them to complete before closing the connection:

```js
client.pipelining = true

const p1 = client.query('SELECT 1')
const p2 = client.query('SELECT 2')
const endPromise = client.end()

// Both queries will resolve normally
const [r1, r2] = await Promise.all([p1, p2])
await endPromise
```

## When to use pipelining

Pipelining is most useful when you have multiple **independent** queries that don't depend on each other's results. Common use cases:

- Fetching data from multiple tables in parallel for a page load
- Inserting or updating multiple rows simultaneously
- Running a batch of analytics queries
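
The page-load case might look like this (a sketch: the `users`, `orders`, and `notifications` tables are hypothetical, and the client is assumed to have pipelining enabled):

```js
// Sketch: three independent page-load queries pipelined on one client.
// Table names are hypothetical; assumes client.pipelining = true.
async function loadDashboard(client, userId) {
  const [user, orders, unread] = await Promise.all([
    client.query('SELECT * FROM users WHERE id = $1', [userId]),
    client.query('SELECT * FROM orders WHERE user_id = $1', [userId]),
    client.query('SELECT count(*) AS n FROM notifications WHERE user_id = $1 AND read = false', [userId]),
  ])
  return { user: user.rows[0], orders: orders.rows, unread: unread.rows[0].n }
}
```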

<Alert>
  <div>
    Do not use pipelining inside a transaction if you need to read the result of one query before issuing the next. Pipelined queries are all sent before any responses arrive, so you cannot branch on intermediate results. For dependent queries within a transaction, use sequential <code>await</code> calls instead.
  </div>
</Alert>
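
When a later statement must branch on an earlier result, sequential awaits are the right shape. A sketch (the `documents` table and the ownership logic are hypothetical):

```js
// Sketch of dependent queries in a transaction: each statement waits for
// the previous result. Table and column names are hypothetical.
async function transferOwnership(client, docId, newOwner) {
  await client.query('BEGIN')
  try {
    const { rows } = await client.query('SELECT owner FROM documents WHERE id = $1 FOR UPDATE', [docId])
    if (rows.length === 0) throw new Error('document not found') // branch on the first result
    await client.query('UPDATE documents SET owner = $1 WHERE id = $2', [newOwner, docId])
    await client.query('COMMIT')
  } catch (err) {
    await client.query('ROLLBACK')
    throw err
  }
}
```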
3 changes: 3 additions & 0 deletions packages/pg-pool/index.js
@@ -239,6 +239,9 @@ class Pool extends EventEmitter {

newClient(pendingItem) {
const client = new this.Client(this.options)
if (this.options.pipelining) {
client.pipelining = true
}
this._clients.push(client)
const idleListener = makeIdleListener(this, client)

27 changes: 27 additions & 0 deletions packages/pg-pool/test/index.js
@@ -203,6 +203,33 @@ describe('pool', function () {
})
})

it('enables pipelining on clients when configured', async function () {
const pool = new Pool({ pipelining: true })
const client = await pool.connect()
expect(client.pipelining).to.be(true)

const [r1, r2, r3] = await Promise.all([
client.query('SELECT 1 AS num'),
client.query('SELECT 2 AS num'),
client.query('SELECT 3 AS num'),
])

expect(r1.rows[0].num).to.eql(1)
expect(r2.rows[0].num).to.eql(2)
expect(r3.rows[0].num).to.eql(3)

client.release()
return pool.end()
})

it('does not enable pipelining by default', async function () {
const pool = new Pool()
const client = await pool.connect()
expect(client.pipelining).to.be(false)
client.release()
return pool.end()
})

it('recovers from query errors', function () {
const pool = new Pool()

102 changes: 102 additions & 0 deletions packages/pg/bench-pipelining.js
@@ -0,0 +1,102 @@
'use strict'
const pg = require('./lib')

// Queries to benchmark
const SIMPLE = { text: 'SELECT 1' }
const PARAM = {
text: 'SELECT $1::int AS n',
values: [42],
}
const NAMED = {
name: 'bench-named',
text: 'SELECT $1::int AS n',
values: [42],
}

async function bench(label, fn, seconds) {
// warmup
for (let i = 0; i < 100; i++) await fn()

const deadline = Date.now() + seconds * 1000
let count = 0
while (Date.now() < deadline) {
await fn()
count++
}
const qps = (count / seconds).toFixed(0)
console.log(` ${label}: ${qps} qps (${count} queries in ${seconds}s)`)
return count / seconds
}

async function benchPipelined(label, makeQueries, batchSize, seconds) {
const client = new pg.Client()
await client.connect()
client.pipelining = true

// warmup
for (let i = 0; i < 10; i++) {
await Promise.all(makeQueries(batchSize).map((q) => client.query(q)))
}

const deadline = Date.now() + seconds * 1000
let count = 0
while (Date.now() < deadline) {
const queries = makeQueries(batchSize)
await Promise.all(queries.map((q) => client.query(q)))
count += batchSize
}
const qps = (count / seconds).toFixed(0)
console.log(` ${label} (batch=${batchSize}): ${qps} qps`)

await client.end()
return count / seconds
}

async function runSerial(label, query, seconds) {
const client = new pg.Client()
await client.connect()

const qps = await bench(label, () => client.query(query), seconds)
await client.end()
return qps
}

async function run() {
const SECONDS = 5
const BATCH = 10

console.log('\n=== Serial (no pipelining) ===')
const serialSimple = await runSerial('simple SELECT 1', SIMPLE, SECONDS)
const serialParam = await runSerial('parameterized', PARAM, SECONDS)
const serialNamed = await runSerial('named prepared', NAMED, SECONDS)

console.log('\n=== Pipelined ===')
const pipedSimple = await benchPipelined(
'simple SELECT 1',
(n) => Array.from({ length: n }, () => ({ text: 'SELECT 1' })),
BATCH,
SECONDS
)
const pipedParam = await benchPipelined(
'parameterized',
(n) => Array.from({ length: n }, () => ({ text: 'SELECT $1::int AS n', values: [42] })),
BATCH,
SECONDS
)
const pipedNamed = await benchPipelined(
'named prepared',
(n) =>
Array.from({ length: n }, (_, i) => ({ name: `bench-named-${i}`, text: 'SELECT $1::int AS n', values: [42] })),
BATCH,
SECONDS
)

console.log('\n=== Speedup ===')
console.log(` simple: ${(pipedSimple / serialSimple).toFixed(2)}x`)
console.log(` parameterized: ${(pipedParam / serialParam).toFixed(2)}x`)
console.log(` named: ${(pipedNamed / serialNamed).toFixed(2)}x`)
}

run().catch((e) => {
console.error(e)
process.exit(1)
})