diff --git a/README.md b/README.md index 5931b1c..2767109 100644 --- a/README.md +++ b/README.md @@ -233,7 +233,7 @@ window.onload = connectWebSocket ```
@@ -243,6 +243,15 @@ curl --location 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump' \
+For very large databases the export can exceed the 30 second request limit. Bind an R2 bucket as `DATABASE_DUMP_BUCKET` (see `wrangler.toml`) and pass `?location=r2` to stream the dump into an R2 object named `dump_YYYYMMDD-HHMMSS.sql` instead of returning it in the response. The request returns immediately with the object key while the upload finishes in the background. Provide an optional `&callback=
+
+curl --location 'https://starbasedb.YOUR-ID-HERE.workers.dev/export/dump?location=r2&callback=https://example.com/notify' \
+--header 'Authorization: Bearer ABC123'
+
+
+
diff --git a/src/export/dump.test.ts b/src/export/dump.test.ts
index ca65b43..375b970 100644
--- a/src/export/dump.test.ts
+++ b/src/export/dump.test.ts
@@ -1,5 +1,10 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'
-import { dumpDatabaseRoute } from './dump'
+import {
+ dumpDatabaseRoute,
+ generateDumpChunks,
+ escapeSqlValue,
+ formatDumpTimestamp,
+} from './dump'
import { executeOperation } from '.'
import { createResponse } from '../utils'
import type { DataSource } from '../types'
@@ -24,6 +29,7 @@ let mockConfig: StarbaseDBConfiguration
beforeEach(() => {
vi.clearAllMocks()
+ vi.unstubAllGlobals()
mockDataSource = {
source: 'external',
@@ -141,5 +147,200 @@ describe('Database Dump Module', () => {
expect(response.status).toBe(500)
const jsonResponse: { error: string } = await response.json()
expect(jsonResponse.error).toBe('Failed to create database dump')
+ consoleErrorMock.mockRestore()
+ })
+})
+
+describe('escapeSqlValue', () => {
+ it('should render null and undefined as NULL', () => {
+ expect(escapeSqlValue(null)).toBe('NULL')
+ expect(escapeSqlValue(undefined)).toBe('NULL')
+ })
+
+ it('should render numbers and booleans without quotes', () => {
+ expect(escapeSqlValue(42)).toBe('42')
+ expect(escapeSqlValue(3.14)).toBe('3.14')
+ expect(escapeSqlValue(true)).toBe('1')
+ expect(escapeSqlValue(false)).toBe('0')
+ })
+
+ it('should render binary BLOB values as hex literals', () => {
+ const blob = new Uint8Array([0x00, 0x0f, 0xff])
+ expect(escapeSqlValue(blob)).toBe("X'000fff'")
+ })
+
+ it('should escape embedded single quotes in strings', () => {
+ expect(escapeSqlValue("O'Brien")).toBe("'O''Brien'")
+ })
+})
+
+describe('formatDumpTimestamp', () => {
+ it('should format a date as YYYYMMDD-HHMMSS in UTC', () => {
+ expect(formatDumpTimestamp(new Date('2024-01-01T17:00:00Z'))).toBe(
+ '20240101-170000'
+ )
+ })
+
+ it('should zero-pad single digit components', () => {
+ expect(formatDumpTimestamp(new Date('2024-03-05T07:08:09Z'))).toBe(
+ '20240305-070809'
+ )
+ })
+})
+
+describe('generateDumpChunks pagination', () => {
+ it('should page through table data without loading it all at once', async () => {
+ vi.mocked(executeOperation)
+ .mockResolvedValueOnce([{ sql: 'CREATE TABLE t (id INTEGER);' }])
+ .mockResolvedValueOnce([{ id: 1 }, { id: 2 }])
+ .mockResolvedValueOnce([{ id: 3 }])
+
+ const chunks: string[] = []
+ for await (const chunk of generateDumpChunks(
+ ['t'],
+ mockDataSource,
+ mockConfig,
+ 2
+ )) {
+ chunks.push(chunk)
+ }
+
+ const dump = chunks.join('')
+ expect(dump).toContain('INSERT INTO t VALUES (1);')
+ expect(dump).toContain('INSERT INTO t VALUES (2);')
+ expect(dump).toContain('INSERT INTO t VALUES (3);')
+
+ // schema query + two paged data queries (offset 0, then offset 2)
+ const calls = vi.mocked(executeOperation).mock.calls
+ expect(calls).toHaveLength(3)
+ expect(calls[1][0][0].sql).toContain('OFFSET 0')
+ expect(calls[2][0][0].sql).toContain('OFFSET 2')
+ })
+})
+
+describe('Database Dump R2 offload', () => {
+ function createMockUpload() {
+ return {
+ uploadPart: vi.fn((partNumber: number) =>
+ Promise.resolve({ partNumber, etag: `etag-${partNumber}` })
+ ),
+ complete: vi.fn().mockResolvedValue({}),
+ abort: vi.fn().mockResolvedValue(undefined),
+ }
+ }
+
+ it('should stream the dump into an R2 multipart object', async () => {
+ const upload = createMockUpload()
+ const bucket = {
+ createMultipartUpload: vi.fn().mockResolvedValue(upload),
+ }
+ mockDataSource.dumpBucket = bucket as any
+
+ vi.mocked(executeOperation)
+ .mockResolvedValueOnce([{ name: 'logs' }])
+ .mockResolvedValueOnce([{ sql: 'CREATE TABLE logs (id INTEGER);' }])
+ .mockResolvedValueOnce([{ id: 1 }])
+
+ const request = new Request(
+ 'https://example.com/export/dump?location=r2'
+ )
+ const response = await dumpDatabaseRoute(
+ mockDataSource,
+ mockConfig,
+ request
+ )
+
+ expect(response.status).toBe(200)
+ const body: { result: { key: string; status: string } } =
+ await response.json()
+ expect(body.result.key).toMatch(/^dump_\d{8}-\d{6}\.sql$/)
+ expect(body.result.status).toBe('completed')
+
+ expect(bucket.createMultipartUpload).toHaveBeenCalledWith(
+ body.result.key
+ )
+ expect(upload.uploadPart).toHaveBeenCalledTimes(1)
+ expect(upload.complete).toHaveBeenCalledWith([
+ { partNumber: 1, etag: 'etag-1' },
+ ])
+ })
+
+ it('should notify the callback URL once the upload completes', async () => {
+ const upload = createMockUpload()
+ const bucket = {
+ createMultipartUpload: vi.fn().mockResolvedValue(upload),
+ }
+ mockDataSource.dumpBucket = bucket as any
+
+ const fetchMock = vi.fn().mockResolvedValue(new Response('ok'))
+ vi.stubGlobal('fetch', fetchMock)
+
+ vi.mocked(executeOperation)
+ .mockResolvedValueOnce([{ name: 'logs' }])
+ .mockResolvedValueOnce([{ sql: 'CREATE TABLE logs (id INTEGER);' }])
+ .mockResolvedValueOnce([])
+
+ const request = new Request(
+ 'https://example.com/export/dump?location=r2&callback=https://hooks.example.com/done'
+ )
+ await dumpDatabaseRoute(mockDataSource, mockConfig, request)
+
+ expect(fetchMock).toHaveBeenCalledTimes(1)
+ const [calledUrl, calledInit] = fetchMock.mock.calls[0]
+ expect(calledUrl).toBe('https://hooks.example.com/done')
+ expect(calledInit.method).toBe('POST')
+ expect(JSON.parse(calledInit.body).status).toBe('completed')
+ })
+
+ it('should return 202 and run in the background when an execution context is present', async () => {
+ const upload = createMockUpload()
+ const bucket = {
+ createMultipartUpload: vi.fn().mockResolvedValue(upload),
+ }
+ const pending: Promise[] = []
+ mockDataSource.dumpBucket = bucket as any
+ mockDataSource.executionContext = {
+ waitUntil: vi.fn((p: Promise) => pending.push(p)),
+ } as any
+
+ vi.mocked(executeOperation)
+ .mockResolvedValueOnce([{ name: 'logs' }])
+ .mockResolvedValueOnce([{ sql: 'CREATE TABLE logs (id INTEGER);' }])
+ .mockResolvedValueOnce([])
+
+ const request = new Request(
+ 'https://example.com/export/dump?location=r2'
+ )
+ const response = await dumpDatabaseRoute(
+ mockDataSource,
+ mockConfig,
+ request
+ )
+
+ expect(response.status).toBe(202)
+ expect(
+ mockDataSource.executionContext!.waitUntil
+ ).toHaveBeenCalledTimes(1)
+
+ // Let the backgrounded upload settle so assertions are deterministic.
+ await Promise.all(pending)
+ expect(upload.complete).toHaveBeenCalledTimes(1)
+ })
+
+ it('should return 400 when R2 offload is requested without a bucket binding', async () => {
+ vi.mocked(executeOperation).mockResolvedValueOnce([])
+
+ const request = new Request(
+ 'https://example.com/export/dump?location=r2'
+ )
+ const response = await dumpDatabaseRoute(
+ mockDataSource,
+ mockConfig,
+ request
+ )
+
+ expect(response.status).toBe(400)
+ const body: { error: string } = await response.json()
+ expect(body.error).toContain('DATABASE_DUMP_BUCKET')
})
})
diff --git a/src/export/dump.ts b/src/export/dump.ts
index 91a2e89..c94d614 100644
--- a/src/export/dump.ts
+++ b/src/export/dump.ts
@@ -3,67 +3,314 @@ import { StarbaseDBConfiguration } from '../handler'
import { DataSource } from '../types'
import { createResponse } from '../utils'
-export async function dumpDatabaseRoute(
+// SQLite file header written at the top of every dump.
+const SQLITE_HEADER = 'SQLite format 3\0'
+
+// Number of rows fetched per page. Paging keeps the amount of data held in
+// memory bounded regardless of how large the underlying table is, so even a
+// multi-gigabyte table can be dumped without loading it all at once.
+const DUMP_PAGE_SIZE = 1000
+
+// R2 requires every part of a multipart upload (except the final one) to be at
+// least 5 MiB. We buffer encoded chunks until they cross this threshold before
+// flushing a part, which lets us stream an arbitrarily large dump into a single
+// R2 object without ever holding the whole file in memory.
+const R2_MIN_PART_SIZE = 5 * 1024 * 1024
+
+/**
+ * Format a timestamp into the `YYYYMMDD-HHMMSS` form used for dump filenames,
+ * e.g. `2024-01-01 17:00:00` UTC becomes `20240101-170000`.
+ */
+export function formatDumpTimestamp(date: Date): string {
+ const pad = (value: number) => String(value).padStart(2, '0')
+ const year = date.getUTCFullYear()
+ const month = pad(date.getUTCMonth() + 1)
+ const day = pad(date.getUTCDate())
+ const hours = pad(date.getUTCHours())
+ const minutes = pad(date.getUTCMinutes())
+ const seconds = pad(date.getUTCSeconds())
+ return `${year}${month}${day}-${hours}${minutes}${seconds}`
+}
+
+/**
+ * Convert a single column value into its SQL literal representation. Handles the
+ * boundary cases the naive implementation skipped: NULL/undefined, booleans and
+ * binary BLOB values, in addition to escaping single quotes inside strings.
+ */
+export function escapeSqlValue(value: unknown): string {
+ if (value === null || value === undefined) {
+ return 'NULL'
+ }
+
+ if (typeof value === 'number' || typeof value === 'bigint') {
+ return String(value)
+ }
+
+ if (typeof value === 'boolean') {
+ return value ? '1' : '0'
+ }
+
+ if (value instanceof ArrayBuffer || ArrayBuffer.isView(value)) {
+ const bytes =
+ value instanceof ArrayBuffer
+ ? new Uint8Array(value)
+ : new Uint8Array(
+ value.buffer,
+ value.byteOffset,
+ value.byteLength
+ )
+ let hex = ''
+ for (const byte of bytes) {
+ hex += byte.toString(16).padStart(2, '0')
+ }
+ return `X'${hex}'`
+ }
+
+ return `'${String(value).replace(/'/g, "''")}'`
+}
+
+/**
+ * Stream the database dump as a sequence of string chunks. The full database is
+ * never materialised in memory at once: table data is paged with LIMIT/OFFSET so
+ * each iteration only holds a single page of rows.
+ */
+export async function* generateDumpChunks(
+ tables: string[],
dataSource: DataSource,
- config: StarbaseDBConfiguration
-): Promise {
- try {
- // Get all table names
- const tablesResult = await executeOperation(
- [{ sql: "SELECT name FROM sqlite_master WHERE type='table';" }],
+ config: StarbaseDBConfiguration,
+ pageSize: number = DUMP_PAGE_SIZE
+): AsyncGenerator {
+ yield SQLITE_HEADER
+
+ for (const table of tables) {
+ // Get table schema
+ const schemaResult = await executeOperation(
+ [
+ {
+ sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name='${table}';`,
+ },
+ ],
dataSource,
config
)
- const tables = tablesResult.map((row: any) => row.name)
- let dumpContent = 'SQLite format 3\0' // SQLite file header
+ if (schemaResult.length) {
+ const schema = schemaResult[0].sql
+ yield `\n-- Table: ${table}\n${schema};\n\n`
+ }
- // Iterate through all tables
- for (const table of tables) {
- // Get table schema
- const schemaResult = await executeOperation(
+ // Page through the table data so a large table never has to be loaded
+ // into memory in its entirety.
+ let offset = 0
+ while (true) {
+ const rows = await executeOperation(
[
{
- sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name='${table}';`,
+ sql: `SELECT * FROM "${table}" LIMIT ${pageSize} OFFSET ${offset};`,
},
],
dataSource,
config
)
- if (schemaResult.length) {
- const schema = schemaResult[0].sql
- dumpContent += `\n-- Table: ${table}\n${schema};\n\n`
+ if (!rows.length) {
+ break
+ }
+
+ let chunk = ''
+ for (const row of rows) {
+ const values = Object.values(row).map(escapeSqlValue)
+ chunk += `INSERT INTO ${table} VALUES (${values.join(', ')});\n`
+ }
+ yield chunk
+
+ // A short page means we have reached the end of the table.
+ if (rows.length < pageSize) {
+ break
+ }
+ offset += pageSize
+ }
+
+ yield '\n'
+ }
+}
+
+/**
+ * Fetch the list of user table names from the database.
+ */
+async function getTableNames(
+ dataSource: DataSource,
+ config: StarbaseDBConfiguration
+): Promise {
+ const tablesResult = await executeOperation(
+ [{ sql: "SELECT name FROM sqlite_master WHERE type='table';" }],
+ dataSource,
+ config
+ )
+
+ return tablesResult.map((row: any) => row.name)
+}
+
+/**
+ * Upload a database dump to R2 using a multipart upload. Encoded chunks are
+ * buffered until they exceed R2's minimum part size and then flushed as a part,
+ * allowing a dump of any size to be written without buffering the whole file.
+ * When a `callbackUrl` is provided it is notified once the upload completes.
+ */
+export async function uploadDumpToR2(
+ bucket: R2Bucket,
+ key: string,
+ tables: string[],
+ dataSource: DataSource,
+ config: StarbaseDBConfiguration,
+ callbackUrl?: string
+): Promise {
+ const encoder = new TextEncoder()
+ const upload = await bucket.createMultipartUpload(key)
+ const parts: R2UploadedPart[] = []
+ let pending: Uint8Array[] = []
+ let pendingBytes = 0
+
+ const flush = async () => {
+ if (pendingBytes === 0) {
+ return
+ }
+
+ const part = new Uint8Array(pendingBytes)
+ let offset = 0
+ for (const buffer of pending) {
+ part.set(buffer, offset)
+ offset += buffer.length
+ }
+
+ const uploaded = await upload.uploadPart(parts.length + 1, part)
+ parts.push(uploaded)
+ pending = []
+ pendingBytes = 0
+ }
+
+ try {
+ for await (const chunk of generateDumpChunks(
+ tables,
+ dataSource,
+ config
+ )) {
+ const encoded = encoder.encode(chunk)
+ pending.push(encoded)
+ pendingBytes += encoded.length
+
+ if (pendingBytes >= R2_MIN_PART_SIZE) {
+ await flush()
+ }
+ }
+
+ // Flush whatever remains as the final part (which may be < 5 MiB).
+ await flush()
+ await upload.complete(parts)
+ } catch (error) {
+ // Abort the multipart upload so we never leave a partial object behind.
+ try {
+ await upload.abort()
+ } catch (abortError) {
+ console.error('Failed to abort R2 multipart upload:', abortError)
+ }
+ throw error
+ }
+
+ if (callbackUrl) {
+ try {
+ await fetch(callbackUrl, {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ key, status: 'completed' }),
+ })
+ } catch (error) {
+ console.error('Failed to notify dump callback URL:', error)
+ }
+ }
+}
+
+export async function dumpDatabaseRoute(
+ dataSource: DataSource,
+ config: StarbaseDBConfiguration,
+ request?: Request
+): Promise {
+ try {
+ const url = request ? new URL(request.url) : undefined
+ const wantsR2 =
+ url?.searchParams.get('location') === 'r2' ||
+ url?.searchParams.get('r2') === 'true'
+ const callbackUrl = url?.searchParams.get('callback') ?? undefined
+
+ // Resolve the table list up-front so connection/auth errors surface as a
+ // normal error response before we start streaming a body.
+ const tables = await getTableNames(dataSource, config)
+
+ // When the caller asks to offload to R2 we stream the dump into an R2
+ // object instead of the response body. This removes the 30 second
+ // request limit from the equation for very large databases: the upload
+ // continues in the background and the optional callback URL notifies the
+ // caller once the file is ready.
+ if (wantsR2) {
+ const bucket = dataSource.dumpBucket
+ if (!bucket) {
+ return createResponse(
+ undefined,
+ 'R2 bucket binding (DATABASE_DUMP_BUCKET) is required to offload a dump.',
+ 400
+ )
}
- // Get table data
- const dataResult = await executeOperation(
- [{ sql: `SELECT * FROM ${table};` }],
+ const key = `dump_${formatDumpTimestamp(new Date())}.sql`
+ const work = uploadDumpToR2(
+ bucket,
+ key,
+ tables,
dataSource,
- config
+ config,
+ callbackUrl
)
- for (const row of dataResult) {
- const values = Object.values(row).map((value) =>
- typeof value === 'string'
- ? `'${value.replace(/'/g, "''")}'`
- : value
+ const ctx = dataSource.executionContext
+ if (ctx?.waitUntil) {
+ ctx.waitUntil(work)
+ return createResponse(
+ { key, status: 'accepted' },
+ undefined,
+ 202
)
- dumpContent += `INSERT INTO ${table} VALUES (${values.join(', ')});\n`
}
- dumpContent += '\n'
+ await work
+ return createResponse({ key, status: 'completed' }, undefined, 200)
}
- // Create a Blob from the dump content
- const blob = new Blob([dumpContent], { type: 'application/x-sqlite3' })
+ // Default behaviour: stream the dump straight back to the caller. The
+ // response is produced incrementally so the database is never buffered
+ // in memory in its entirety.
+ const encoder = new TextEncoder()
+ const iterator = generateDumpChunks(tables, dataSource, config)
+ const stream = new ReadableStream({
+ async pull(controller) {
+ try {
+ const { value, done } = await iterator.next()
+ if (done) {
+ controller.close()
+ } else {
+ controller.enqueue(encoder.encode(value))
+ }
+ } catch (error) {
+ controller.error(error)
+ }
+ },
+ })
const headers = new Headers({
'Content-Type': 'application/x-sqlite3',
'Content-Disposition': 'attachment; filename="database_dump.sql"',
})
- return new Response(blob, { headers })
+ return new Response(stream, { headers })
} catch (error: any) {
console.error('Database Dump Error:', error)
return createResponse(undefined, 'Failed to create database dump', 500)
diff --git a/src/handler.ts b/src/handler.ts
index 3fa0085..ec0590a 100644
--- a/src/handler.ts
+++ b/src/handler.ts
@@ -120,8 +120,12 @@ export class StarbaseDB {
}
if (this.getFeature('export')) {
- this.app.get('/export/dump', this.isInternalSource, async () => {
- return dumpDatabaseRoute(this.dataSource, this.config)
+ this.app.get('/export/dump', this.isInternalSource, async (c) => {
+ return dumpDatabaseRoute(
+ this.dataSource,
+ this.config,
+ c.req.raw
+ )
})
this.app.get(
diff --git a/src/index.ts b/src/index.ts
index 4d08932..b8a39e7 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -56,6 +56,9 @@ export interface Env {
HYPERDRIVE: Hyperdrive
+ // Optional R2 bucket binding used to offload large database dumps.
+ DATABASE_DUMP_BUCKET?: R2Bucket
+
// ## DO NOT REMOVE: TEMPLATE INTERFACE ##
}
@@ -184,6 +187,10 @@ export default {
}
}
+ if (env.DATABASE_DUMP_BUCKET) {
+ dataSource.dumpBucket = env.DATABASE_DUMP_BUCKET
+ }
+
const config: StarbaseDBConfiguration = {
outerbaseApiKey: env.OUTERBASE_API_KEY,
role,
diff --git a/src/types.ts b/src/types.ts
index b0a59f4..f6afb3d 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -64,6 +64,8 @@ export type DataSource = {
cacheTTL?: number
registry?: StarbasePluginRegistry
executionContext?: ExecutionContext
+ // Optional R2 bucket used to offload large database dumps.
+ dumpBucket?: R2Bucket
}
export enum RegionLocationHint {
diff --git a/wrangler.toml b/wrangler.toml
index 395c4ac..33ca02e 100644
--- a/wrangler.toml
+++ b/wrangler.toml
@@ -78,3 +78,13 @@ AUTH_JWKS_ENDPOINT = ""
# [[hyperdrive]]
# binding = "HYPERDRIVE"
# id = ""
+
+# Optional R2 bucket used to offload large database dumps. When bound, calling
+# `/export/dump?location=r2` streams the dump into an R2 object named
+# `dump_YYYYMMDD-HHMMSS.sql` instead of returning it in the response, which
+# removes the 30 second request limit for very large databases. Provide an
+# optional `&callback=` to be notified when the upload completes.
+# Docs: https://developers.cloudflare.com/r2/buckets/
+# [[r2_buckets]]
+# binding = "DATABASE_DUMP_BUCKET"
+# bucket_name = "starbasedb-dumps"