Skip to content

Commit 6993ae9

Browse files
fix(tables): stream large CSV imports from storage instead of buffering the whole file
1 parent db9cdc8 commit 6993ae9

4 files changed

Lines changed: 122 additions & 27 deletions

File tree

apps/sim/lib/table/import-runner.ts

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Readable } from 'node:stream'
1+
import { Transform } from 'node:stream'
22
import { createLogger } from '@sim/logger'
33
import { getErrorMessage } from '@sim/utils/errors'
44
import { generateId } from '@sim/utils/id'
@@ -26,7 +26,7 @@ import {
2626
setTableSchemaForImport,
2727
updateImportProgress,
2828
} from '@/lib/table/service'
29-
import { downloadFile } from '@/lib/uploads/core/storage-service'
29+
import { downloadFileStream, headObject } from '@/lib/uploads/core/storage-service'
3030
import { normalizeColumn } from '@/app/api/table/utils'
3131

3232
const logger = createLogger('TableImportRunner')
@@ -70,38 +70,31 @@ export async function runTableImport(payload: TableImportPayload): Promise<void>
7070
if (!loaded) throw new Error(`Import target table ${tableId} not found`)
7171
const table = loaded
7272

73-
const buffer = await downloadFile({ key: fileKey, context: 'workspace' })
73+
// Total byte size for the progress estimate — a cheap HEAD, no download. May be null on
74+
// the local dev provider, in which case the bar stays indeterminate (rows still show).
75+
const totalBytes = (await headObject(fileKey, 'workspace'))?.size ?? 0
7476

75-
// Delete only after the download succeeds — otherwise a failed download would wipe the
76-
// table with nothing to replace it with.
77-
if (mode === 'replace') await deleteAllTableRows(tableId)
77+
// Stream the file rather than buffering it — a ~1M-row import must never be held in memory.
78+
const source = await downloadFileStream({ key: fileKey, context: 'workspace' })
7879

79-
// Estimate total data rows by counting line breaks (minus the header) for a
80-
// determinate progress bar. It's an estimate — quoted newlines and blank lines
81-
// make it imprecise — so the client caps the bar below 100% until the terminal
82-
// `ready` event lands. Cheap: one O(bytes) pass over the already-buffered file.
83-
let newlineCount = 0
84-
for (let i = 0; i < buffer.length; i++) {
85-
if (buffer[i] === 0x0a) newlineCount++
86-
}
87-
const estimatedTotal = Math.max(0, newlineCount - 1)
80+
// Delete only after the stream opens (a missing object rejects above) — otherwise a failed
81+
// download would wipe the table with nothing to replace it with.
82+
if (mode === 'replace') await deleteAllTableRows(tableId)
8883

89-
// Publish the estimated total up front so the client shows a determinate bar at 0%
90-
// immediately, instead of "0 rows and counting" until the first batch lands.
91-
void appendTableEvent({
92-
kind: 'import',
93-
tableId,
94-
importId,
95-
status: 'importing',
96-
progress: 0,
97-
total: estimatedTotal,
84+
// Count bytes as they flow so the row total can be extrapolated from byte progress.
85+
let bytesRead = 0
86+
const byteCounter = new Transform({
87+
transform(chunk: Buffer, _enc, cb) {
88+
bytesRead += chunk.length
89+
cb(null, chunk)
90+
},
9891
})
9992

10093
const parser = createCsvParser(delimiter)
10194
// `.pipe` doesn't forward source errors; forward so the iterator throws.
102-
const source = Readable.from(buffer)
10395
source.on('error', (err) => parser.destroy(err))
104-
source.pipe(parser)
96+
byteCounter.on('error', (err) => parser.destroy(err))
97+
source.pipe(byteCounter).pipe(parser)
10598

10699
let schema: TableSchema | null = null
107100
let headerToColumn: Map<string, string> | null = null
@@ -173,9 +166,19 @@ export async function runTableImport(payload: TableImportPayload): Promise<void>
173166
{ ...table, schema },
174167
requestId
175168
)
176-
if (inserted - lastReported >= PROGRESS_INTERVAL_ROWS) {
169+
// Emit after the first batch lands, then every interval, so the bar appears early.
170+
if (
171+
inserted - lastReported >= PROGRESS_INTERVAL_ROWS ||
172+
(lastReported === 0 && inserted > 0)
173+
) {
177174
lastReported = inserted
178175
await updateImportProgress(tableId, inserted)
176+
// Extrapolate the total from rows-per-byte observed so far; self-refines as it runs.
177+
// `Math.max(inserted, …)` keeps it monotonic; omit when the byte size is unknown.
178+
const estimatedTotal =
179+
totalBytes > 0 && bytesRead > 0
180+
? Math.max(inserted, Math.round((inserted / bytesRead) * totalBytes))
181+
: undefined
179182
void appendTableEvent({
180183
kind: 'import',
181184
tableId,

apps/sim/lib/uploads/core/storage-service.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { Readable } from 'node:stream'
12
import { randomBytes } from 'crypto'
23
import { createLogger } from '@sim/logger'
34
import { getErrorMessage } from '@sim/utils/errors'
@@ -222,6 +223,34 @@ export async function downloadFile(options: DownloadFileOptions): Promise<Buffer
222223
return readFile(filePath)
223224
}
224225

226+
/**
 * Stream a file out of the configured storage provider without buffering it in memory.
 * The caller MUST fully consume or `destroy()` the returned stream. Used by the large-CSV
 * import worker so a multi-hundred-MB file is never held resident.
 *
 * On the local filesystem provider the file is opened before this function resolves, so a
 * missing or unreadable file rejects the returned promise instead of surfacing later as an
 * asynchronous `'error'` event on a stream nobody is listening to yet.
 *
 * @param options.key - Storage key of the object to stream.
 * @param options.context - Storage context used to resolve the provider configuration.
 * @returns A readable stream over the object's bytes.
 * @throws When the provider download fails, or when the local file cannot be opened.
 */
export async function downloadFileStream(options: {
  key: string
  context: StorageContext
}): Promise<Readable> {
  const { key, context } = options
  const config = getStorageConfig(context)

  if (USE_BLOB_STORAGE) {
    const { downloadFromBlobStream } = await import('@/lib/uploads/providers/blob/client')
    return downloadFromBlobStream(key, createBlobConfig(config))
  }

  if (USE_S3_STORAGE) {
    const { downloadFromS3Stream } = await import('@/lib/uploads/providers/s3/client')
    return downloadFromS3Stream(key, createS3Config(config))
  }

  const { open } = await import('fs/promises')
  const { join } = await import('path')
  const { UPLOAD_DIR_SERVER } = await import('./setup.server')

  // `fs.createReadStream(path)` returns immediately and reports ENOENT/EACCES only through a
  // later 'error' event. Opening the handle first turns those failures into a rejection that
  // callers can act on before doing destructive work (e.g. clearing a table for a replace).
  const handle = await open(join(UPLOAD_DIR_SERVER, sanitizeFileKey(key)), 'r')

  // The stream owns the handle: with the default `autoClose`, it is closed on end or destroy.
  return handle.createReadStream()
}
253+
225254
/**
226255
* Delete a file from the configured storage provider
227256
*/

apps/sim/lib/uploads/providers/blob/client.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { Readable } from 'node:stream'
12
import type { BlobServiceClient as BlobServiceClientType } from '@azure/storage-blob'
23
import { createLogger } from '@sim/logger'
34
import { generateId } from '@sim/utils/id'
@@ -341,6 +342,49 @@ export async function downloadFromBlob(
341342
return downloaded
342343
}
343344

345+
/**
 * Open a streaming download of a blob so its contents are never buffered in memory.
 * Ownership of the stream passes to the caller, who MUST either read it to the end or
 * `destroy()` it. Used by the large-CSV import worker.
 *
 * @param key - Name of the blob inside the container.
 * @param customConfig - Optional explicit connection settings; when omitted, the shared
 *   service client and the default container from `BLOB_CONFIG` are used.
 * @returns The blob's body as a readable stream.
 * @throws When `customConfig` carries neither a connection string nor an account
 *   name/key pair, or when the download response has no readable body.
 */
export async function downloadFromBlobStream(
  key: string,
  customConfig?: BlobConfig
): Promise<Readable> {
  const azure = await import('@azure/storage-blob')

  // Resolve the service client: explicit config wins, otherwise fall back to the shared one.
  const resolveServiceClient = async (): Promise<BlobServiceClientType> => {
    if (!customConfig) {
      return getBlobServiceClient()
    }
    if (customConfig.connectionString) {
      return azure.BlobServiceClient.fromConnectionString(customConfig.connectionString)
    }
    if (customConfig.accountName && customConfig.accountKey) {
      const sharedKey = new azure.StorageSharedKeyCredential(
        customConfig.accountName,
        customConfig.accountKey
      )
      return new azure.BlobServiceClient(
        `https://${customConfig.accountName}.blob.core.windows.net`,
        sharedKey
      )
    }
    throw new Error('Invalid custom blob configuration')
  }

  const serviceClient = await resolveServiceClient()
  const container = customConfig ? customConfig.containerName : BLOB_CONFIG.containerName

  const response = await serviceClient
    .getContainerClient(container)
    .getBlockBlobClient(key)
    .download()

  const body = response.readableStreamBody
  if (!body) {
    throw new Error('Failed to get readable stream from blob download')
  }
  return body as Readable
}
387+
344388
/**
345389
* Check whether a blob exists (and return its size when it does).
346390
* Returns null when the blob is missing.

apps/sim/lib/uploads/providers/s3/client.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { Readable } from 'node:stream'
12
import {
23
AbortMultipartUploadCommand,
34
CompleteMultipartUploadCommand,
@@ -221,6 +222,24 @@ export async function downloadFromS3(
221222
})
222223
}
223224

225+
/**
 * Open a streaming download of an S3 object so its contents are never buffered in memory.
 * Ownership of the stream passes to the caller, who MUST either read it to the end or
 * `destroy()` it. Used by the large-CSV import worker so a 1M-row file is never resident
 * in memory.
 *
 * @param key - Object key inside the bucket.
 * @param customConfig - Optional bucket/region override; defaults to `S3_CONFIG`.
 * @returns The object's body as a readable stream.
 * @throws When the response carries no body.
 */
export async function downloadFromS3Stream(
  key: string,
  customConfig?: S3Config
): Promise<Readable> {
  const target = customConfig ?? { bucket: S3_CONFIG.bucket, region: S3_CONFIG.region }
  const getObject = new GetObjectCommand({ Bucket: target.bucket, Key: key })

  const { Body: body } = await getS3Client().send(getObject)
  if (body) {
    return body as Readable
  }
  throw new Error(`S3 object has no body: ${key}`)
}
242+
224243
/**
225244
* Check whether an object exists in S3 (and return its size when it does).
226245
* Returns null when the object is missing.

0 commit comments

Comments
 (0)