From 4a2949d4002177de2c0c9833198a9980eb83445f Mon Sep 17 00:00:00 2001 From: Roz Date: Sat, 6 Jun 2026 02:46:13 +0200 Subject: [PATCH 1/5] feat: resume interrupted peer downloads via HTTP Range downloadFile now detects an existing .part file and sends Range: bytes=-, validating the peer's 206 + Content-Range against the persisted expected size before appending. On 200 (range ignored), a Content-Range mismatch, or 416 it discards the stale .part and restarts from byte 0, emitting a restart progress event. The write path uses a node:fs FileHandle (append/write) with datasync at checkpoints, and the .part is preserved on error so the next attempt can resume. A truncated stream throws a retryable IncompleteDownloadError. Refs #13. --- .../src/__tests__/peer-download.test.ts | 169 +++++++++++++++++- .../src/lib/errors/IncompleteDownloadError.ts | 11 ++ apps/backend/src/lib/servers/peer.ts | 157 +++++++++++----- .../modules/downloads/downloads.service.ts | 6 + 4 files changed, 301 insertions(+), 42 deletions(-) create mode 100644 apps/backend/src/lib/errors/IncompleteDownloadError.ts diff --git a/apps/backend/src/__tests__/peer-download.test.ts b/apps/backend/src/__tests__/peer-download.test.ts index 015d15e..9ac1261 100644 --- a/apps/backend/src/__tests__/peer-download.test.ts +++ b/apps/backend/src/__tests__/peer-download.test.ts @@ -1,4 +1,5 @@ -import { mkdtemp, rm } from 'node:fs/promises' +import type { PeerDownloadProgressEvent } from '../lib/servers/peer' +import { mkdtemp, rm, writeFile } from 'node:fs/promises' import { tmpdir } from 'node:os' import { join } from 'node:path' import { afterAll, afterEach, beforeAll, describe, expect, test } from 'bun:test' @@ -207,3 +208,169 @@ describe('PeerConnector.downloadFile', () => { } }) }) + +describe('PeerConnector.downloadFile resume', () => { + test('resumes from an existing .part via a Range request and appends', async () => { + const seen: { range: string | null } = { range: null } + server.use( + http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, ({ request }) => { + seen.range = request.headers.get('Range') + return new Response(streamOf([2, 3, 4]), { + status: 206, + headers: { 'Content-Length': '3', 'Content-Range': 'bytes 2-4/5' }, + }) + }), + ) + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-resume-')) + const destPath = join(dir, 'Movie.mkv') + const partPath = `${destPath}.part` + await writeFile(partPath, new Uint8Array([0, 1])) + const events: PeerDownloadProgressEvent[] = [] + + try { + await peer.downloadFile('remote1:movie:99', destPath, { + partPath, + releaseSize: 5, + onProgress: (e) => { events.push(e) }, + }) + + expect(seen.range).toBe('bytes=2-') + expect(new Uint8Array(await Bun.file(destPath).arrayBuffer())).toEqual(new Uint8Array([0, 1, 2, 3, 4])) + expect(events.some(e => e.type === 'restart')).toBe(false) + expect(events).toContainEqual({ type: 'completed', downloadedBytes: 5, expectedBytes: 5 }) + } + finally { + await rm(dir, { recursive: true, force: true }) + } + }) + + test('restarts from byte 0 when the peer ignores Range and returns 200', async () => { + server.use( + http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, () => + new Response(streamOf([0, 1, 2, 3, 4]), { headers: { 'Content-Length': '5' } })), + ) + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-resume-ignored-')) + const destPath = join(dir, 'Movie.mkv') + const partPath = `${destPath}.part` + await writeFile(partPath, new Uint8Array([9, 9])) + const events: PeerDownloadProgressEvent[] = [] + + try { + await peer.downloadFile('remote1:movie:99', destPath, { + partPath, + releaseSize: 5, + onProgress: (e) => { events.push(e) }, + }) + + expect(new Uint8Array(await Bun.file(destPath).arrayBuffer())).toEqual(new Uint8Array([0, 1, 2, 3, 4])) + expect(events.some(e => e.type === 'restart' && e.reason === 'range_ignored')).toBe(true) + } + finally { + await rm(dir, { recursive: true, force: true }) + } + }) + + test('restarts when the 206 Content-Range total does not match releaseSize', async () => { + server.use( + http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, ({ request }) => { + if (request.headers.get('Range')) { + return new Response(streamOf([2, 3]), { status: 206, headers: { 'Content-Length': '2', 'Content-Range': 'bytes 2-3/4' } }) + } + return new Response(streamOf([0, 1, 2, 3, 4]), { headers: { 'Content-Length': '5' } }) + }), + ) + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-resume-mismatch-')) + const destPath = join(dir, 'Movie.mkv') + const partPath = `${destPath}.part` + await writeFile(partPath, new Uint8Array([0, 1])) + const events: PeerDownloadProgressEvent[] = [] + + try { + await peer.downloadFile('remote1:movie:99', destPath, { + partPath, + releaseSize: 5, + onProgress: (e) => { events.push(e) }, + }) + + expect(new Uint8Array(await Bun.file(destPath).arrayBuffer())).toEqual(new Uint8Array([0, 1, 2, 3, 4])) + expect(events.some(e => e.type === 'restart' && e.reason === 'content_range_mismatch')).toBe(true) + } + finally { + await rm(dir, { recursive: true, force: true }) + } + }) + + test('restarts when the peer returns 416 for the resume range', async () => { + server.use( + http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, ({ request }) => { + if (request.headers.get('Range')) + return new Response(null, { status: 416, headers: { 'Content-Range': 'bytes */5' } }) + return new Response(streamOf([0, 1, 2, 3, 4]), { headers: { 'Content-Length': '5' } }) + }), + ) + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-resume-416-')) + const destPath = join(dir, 'Movie.mkv') + const partPath = `${destPath}.part` + await writeFile(partPath, new Uint8Array([0, 1, 2, 3, 4, 5])) + const events: PeerDownloadProgressEvent[] = [] + + try { + await peer.downloadFile('remote1:movie:99', destPath, { + partPath, + releaseSize: 5, + onProgress: (e) => { events.push(e) }, + }) + + expect(new Uint8Array(await Bun.file(destPath).arrayBuffer())).toEqual(new Uint8Array([0, 1, 2, 3, 4])) + expect(events.some(e => e.type === 'restart' && e.reason === 'range_not_satisfiable')).toBe(true) + } + finally { + await rm(dir, { recursive: true, force: true }) + } + }) + + test('preserves the .part file when a download fails mid-stream', async () => { + server.use( + http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, () => + // Declares 5 bytes but only delivers 3 → "Incomplete file download". + new Response(streamOf([0, 1, 2]), { headers: { 'Content-Length': '5' } })), + ) + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-resume-preserve-')) + const destPath = join(dir, 'Movie.mkv') + const partPath = `${destPath}.part` + + try { + await expect(peer.downloadFile('remote1:movie:99', destPath, { partPath, releaseSize: 5 })).rejects.toThrow('Incomplete') + expect(await Bun.file(partPath).exists()).toBe(true) + expect(Bun.file(partPath).size).toBe(3) + expect(await Bun.file(destPath).exists()).toBe(false) + } + finally { + await rm(dir, { recursive: true, force: true }) + } + }) + + test('rejects a 206 returned for a non-range (fresh) request', async () => { + server.use( + http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, () => + // No .part exists, so no Range is sent — a 206 here is untrustworthy. + new Response(streamOf([2, 3, 4]), { status: 206, headers: { 'Content-Length': '3', 'Content-Range': 'bytes 2-4/5' } })), + ) + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-resume-206-fresh-')) + const destPath = join(dir, 'Movie.mkv') + + try { + await expect(peer.downloadFile('remote1:movie:99', destPath, { partPath: `${destPath}.part`, releaseSize: 5 })).rejects.toThrow('206') + expect(await Bun.file(destPath).exists()).toBe(false) + } + finally { + await rm(dir, { recursive: true, force: true }) + } + }) +}) diff --git a/apps/backend/src/lib/errors/IncompleteDownloadError.ts b/apps/backend/src/lib/errors/IncompleteDownloadError.ts new file mode 100644 index 0000000..fbba323 --- /dev/null +++ b/apps/backend/src/lib/errors/IncompleteDownloadError.ts @@ -0,0 +1,11 @@ +import { AppError } from './AppError' + +/** + * The peer's stream ended before the expected number of bytes arrived. The + * `.part` file is preserved, so this is retryable: the next attempt resumes. + */ +export class IncompleteDownloadError extends AppError { + constructor(message: string, cause?: unknown) { + super(message, 'INCOMPLETE_DOWNLOAD', { cause }) + } +} diff --git a/apps/backend/src/lib/servers/peer.ts b/apps/backend/src/lib/servers/peer.ts index 0099494..592fda3 100644 --- a/apps/backend/src/lib/servers/peer.ts +++ b/apps/backend/src/lib/servers/peer.ts @@ -1,9 +1,10 @@ import type { ConnectorHeadersConfig } from '../config' -import { rename, unlink } from 'node:fs/promises' +import { open, rename, unlink } from 'node:fs/promises' import z from 'zod' import { logger } from '../../logger' import { requireInitialization } from '../decorators/require-initialization' import { FetchError } from '../errors/FetchError' +import { IncompleteDownloadError } from '../errors/IncompleteDownloadError' import { normalizeImdbId, Release } from '../release' import { withSpan } from '../tracing' import { ServerConnector } from './base' @@ -12,10 +13,12 @@ const PeerSearchResponse = z.object({ items: z.array(Release) }) const MAX_DOWNLOAD_BYTES = 100 * 1024 * 1024 * 1024 // 100GB const DOWNLOAD_PROGRESS_INTERVAL_MS = 10_000 const DOWNLOAD_PROGRESS_BYTES = 64 * 1024 * 1024 +const CONTENT_RANGE_PATTERN = /^bytes (\d+)-(\d+)\/(\d+)$/ export type PeerDownloadProgressEvent = | { type: 'headers', expectedBytes: number | null, expectedBytesSource: 'content_length' | null, expectedBytesMismatch: boolean } | { type: 'progress', downloadedBytes: number, expectedBytes: number | null } + | { type: 'restart', reason: 'range_ignored' | 'content_range_mismatch' | 'range_not_satisfiable', discardedBytes: number } | { type: 'completed', downloadedBytes: number, expectedBytes: number | null } export interface PeerDownloadOptions { @@ -38,6 +41,20 @@ function parseContentLength(headers: Headers): number | null { return parsed } +function parseContentRange(value: string | null): { start: number, end: number, total: number } | null { + if (!value) + return null + const match = CONTENT_RANGE_PATTERN.exec(value.trim()) + if (!match) + return null + const start = Number(match[1]) + const end = Number(match[2]) + const total = Number(match[3]) + if (![start, end, total].every(Number.isSafeInteger)) + return null + return { start, end, total } +} + /** * A connector to another jack instance (a "peer"). Sources only: we fan out * searches to it over the /peer API and stream files it serves. It speaks in @@ -153,41 +170,100 @@ export class PeerConnector extends ServerConnector { const torrentFilename = options.torrentFilename const url = new URL(`/peer/items/${encodeURIComponent(id)}/file`, this.url) const partPath = options.partPath ?? `${destPath}.part` - span.setAttributes({ - 'http.request.timeout_ms': timeoutMs, - 'url.path': url.pathname, - }) + const baseHeaders = { ...this.headers, 'X-Api-Key': this.apiKey } + span.setAttributes({ 'http.request.timeout_ms': timeoutMs, 'url.path': url.pathname }) - const response = await fetch(url, { - headers: { ...this.headers, 'X-Api-Key': this.apiKey }, + const partFile = Bun.file(partPath) + let existingBytes = await partFile.exists() ? partFile.size : 0 + + const doFetch = (withRange: boolean) => fetch(url, { + headers: withRange ? { ...baseHeaders, Range: `bytes=${existingBytes}-` } : baseHeaders, signal: AbortSignal.timeout(timeoutMs), }) + const emitRestart = async (reason: 'range_ignored' | 'content_range_mismatch' | 'range_not_satisfiable', discardedBytes: number) => { + logger.warn({ id, torrentFilename, partPath, discardedBytes, reason, peer: this.name }, 'Resume validation failed; restarting download from byte 0') + try { + await options.onProgress?.({ type: 'restart', reason, discardedBytes }) + } + catch (err) { + const message = err instanceof Error ? err.message : String(err) + logger.error({ id, torrentFilename, reason, error: message }, 'Restart progress callback failed') + } + } + + const restartFresh = async (response: Response, reason: 'content_range_mismatch' | 'range_not_satisfiable', discardedBytes: number): Promise => { + // Discard the unwanted partial response. Don't await the cancel: under + // some fetch/stream implementations a closed body's cancel() never + // settles, which would stall the restart. + void response.body?.cancel().catch(() => {}) + await unlink(partPath).catch(() => {}) + existingBytes = 0 + await emitRestart(reason, discardedBytes) + return doFetch(false) + } + + let response = await doFetch(existingBytes > 0) span.setAttribute('http.response.status_code', response.status) - if (!response.ok) { - throw new FetchError(`Failed to download file from peer: ${response.statusText}`, response) + if (existingBytes > 0) { + if (response.status === 206) { + const cr = parseContentRange(response.headers.get('Content-Range')) + const valid = cr != null && cr.start === existingBytes + && (options.releaseSize == null || cr.total === options.releaseSize) + if (!valid) { + response = await restartFresh(response, 'content_range_mismatch', existingBytes) + span.setAttribute('http.response.status_code', response.status) + } + } + else if (response.status === 416) { + response = await restartFresh(response, 'range_not_satisfiable', existingBytes) + span.setAttribute('http.response.status_code', response.status) + } + else if (response.ok) { + // Peer ignored the Range header and is streaming the whole file from + // byte 0. Discard the stale .part and use this response as-is. + const discarded = existingBytes + await unlink(partPath).catch(() => {}) + existingBytes = 0 + await emitRestart('range_ignored', discarded) + } } - if (!response.body) { - throw new Error('Peer returned a file response without a body') + const resuming = existingBytes > 0 + + if (resuming) { + // We only reach here with a 206 that was already validated above. + } + else { + // Fresh download: require a clean 200. A 206 here is untrustworthy — we + // did not send a satisfiable Range, so a partial body must not be + // treated as the whole file (it would rename a truncated file into place). + if (!response.ok) + throw new FetchError(`Failed to download file from peer: ${response.statusText}`, response) + if (response.status === 206) + throw new Error('Peer returned 206 Partial Content for a non-range request') } - const expectedBytes = parseContentLength(response.headers) + if (!response.body) + throw new Error('Peer returned a file response without a body') + + // Total file size: from Content-Range on a resume (206), else Content-Length. + const expectedBytes = resuming + ? parseContentRange(response.headers.get('Content-Range'))?.total ?? null + : parseContentLength(response.headers) const expectedBytesMismatch = expectedBytes != null && options.releaseSize != null && expectedBytes !== options.releaseSize if (expectedBytes != null) span.setAttribute('download.expected_bytes', expectedBytes) - span.setAttribute('download.expected_bytes_source', expectedBytes == null ? 'unknown' : 'content_length') - span.setAttribute('download.expected_bytes_mismatch', expectedBytesMismatch) + span.setAttributes({ + 'download.resuming': resuming, + 'download.resume_from_bytes': existingBytes, + 'download.expected_bytes_source': expectedBytes == null ? 'unknown' : 'content_length', + 'download.expected_bytes_mismatch': expectedBytesMismatch, + }) if (expectedBytesMismatch) { - logger.warn({ - id, - torrentFilename, - releaseSize: options.releaseSize, - expectedBytes, - peer: this.name, - }, 'Peer file Content-Length differs from release metadata size') + logger.warn({ id, torrentFilename, releaseSize: options.releaseSize, expectedBytes, peer: this.name }, 'Peer file total size differs from release metadata size') } await options.onProgress?.({ @@ -201,17 +277,17 @@ export class PeerConnector extends ServerConnector { throw new Error(`File too large: ${expectedBytes} bytes exceeds ${MAX_DOWNLOAD_BYTES} byte limit`) const reader = response.body.getReader() - const writer = Bun.file(partPath).writer() - let downloadedBytes = 0 + const handle = await open(partPath, resuming ? 'a' : 'w') + let downloadedBytes = existingBytes let lastLoggedAt = Date.now() - let lastLoggedBytes = 0 - let writerEnded = false + let lastLoggedBytes = downloadedBytes + let handleClosed = false - const endWriter = () => { - if (writerEnded) + const closeHandle = async () => { + if (handleClosed) return - writer.end() - writerEnded = true + handleClosed = true + await handle.close().catch(() => {}) } try { @@ -223,16 +299,15 @@ export class PeerConnector extends ServerConnector { continue downloadedBytes += value.byteLength - if (downloadedBytes > MAX_DOWNLOAD_BYTES) { + if (downloadedBytes > MAX_DOWNLOAD_BYTES) throw new Error(`File too large: downloaded ${downloadedBytes} bytes exceeds ${MAX_DOWNLOAD_BYTES} byte limit`) - } - writer.write(value) + await handle.write(value) const now = Date.now() const shouldLogProgress = downloadedBytes - lastLoggedBytes >= DOWNLOAD_PROGRESS_BYTES || now - lastLoggedAt >= DOWNLOAD_PROGRESS_INTERVAL_MS - if (lastLoggedBytes === 0 || shouldLogProgress) { - await writer.flush() + if (lastLoggedBytes === existingBytes || shouldLogProgress) { + await handle.datasync().catch(() => {}) logger.debug({ id, torrentFilename, destPath, partPath, downloadedBytes, expectedBytes, peer: this.name }, 'Download progress from peer') await options.onProgress?.({ type: 'progress', downloadedBytes, expectedBytes }) lastLoggedAt = now @@ -240,11 +315,12 @@ export class PeerConnector extends ServerConnector { } } - endWriter() + await handle.datasync().catch(() => {}) + await closeHandle() reader.releaseLock() if (expectedBytes != null && downloadedBytes !== expectedBytes) - throw new Error(`Incomplete file download: got ${downloadedBytes} bytes, expected ${expectedBytes}`) + throw new IncompleteDownloadError(`Incomplete file download: got ${downloadedBytes} bytes, expected ${expectedBytes}`) await rename(partPath, destPath) span.setAttribute('download.downloaded_bytes', downloadedBytes) @@ -257,15 +333,14 @@ export class PeerConnector extends ServerConnector { } } catch (err) { - try { - endWriter() - } - catch {} + await closeHandle() try { reader.releaseLock() } catch {} - await unlink(partPath).catch(() => {}) + // Leave the .part file in place so the next attempt can resume from the + // durable bytes written so far. The atomic rename above means an + // incomplete download never reaches `destPath`. throw err } }) diff --git a/apps/backend/src/modules/downloads/downloads.service.ts b/apps/backend/src/modules/downloads/downloads.service.ts index e60ae81..1a0932a 100644 --- a/apps/backend/src/modules/downloads/downloads.service.ts +++ b/apps/backend/src/modules/downloads/downloads.service.ts @@ -94,6 +94,12 @@ export class DownloadsService { return } + if (event.type === 'restart') { + // Resume validation failed; the download restarts from byte 0. + // Persisting this is handled in a later phase. + return + } + this.downloadsRepository?.markCompleted(download.id, event.downloadedBytes) } From ce79179ad13541a6c30203483139e56174e1a418 Mon Sep 17 00:00:00 2001 From: Roz <3948961+roziscoding@users.noreply.github.com> Date: Sat, 6 Jun 2026 12:29:10 +0200 Subject: [PATCH 2/5] feat: retry, semaphore, and download concurrency/retry config (#13 3/5) (#18) * feat: add retry, semaphore, and download concurrency/retry config Add a generic retry() helper (bounded attempts, exponential backoff with full jitter, optional Retry-After override, injectable sleep/random) and a download retry classifier (transient: network/timeout/5xx/429/incomplete stream; permanent: non-429 4xx and others). Add a FIFO async Semaphore. Extend DownloadsConfig with maxConcurrentDownloads and retry knobs (all defaulted so existing configs keep parsing). Primitives are wired into DownloadsService in a later change. Refs #13. * feat: track download attempts and expose stale rows for re-drive (#13 4/5) (#19) * feat: track download attempts and expose stale rows for re-drive Add an attempts column to the downloads table (additive migration) and repository methods: incrementAttempts, markResumeReset (reset downloadedBytes and record the resume-from-zero transition), and listStaleDownloads (returns stale downloading rows without mutating them, for active startup re-enqueue). reconcileStaleDownloads is kept as the fallback for when downloads is unconfigured. Refs #13. * feat: bound, retry, and resume peer downloads end-to-end (#13 5/5) (#20) * feat: bound, retry, and resume peer downloads end-to-end Rewire DownloadsService around a shared Semaphore (maxConcurrentDownloads), a retry loop (bounded backoff+jitter, attempts tracked, transient vs permanent classification, Retry-After honored), and resume: the restart progress event persists via markResumeReset, and an active/reenqueued dedupe prevents duplicate rows. On startup, index.ts re-drives stale downloading rows with resumeStaleDownloads() before the watcher scans, falling back to reconcileStaleDownloads() when downloads is unconfigured. Closes #13. * fix: harden startup re-enqueue dedupe (review feedback) - Dedupe stale downloading rows by destPath before re-driving: only one row per destination is resumable (they share the same .part), so mark the superseded duplicates failed instead of letting the second silently early-return in runDownload and stay stuck in downloading. - Release the reenqueued claim on successful resume (stub already unlinked, so no scan race) so a later legitimate re-drop of the same torrent filename is not silently skipped for the rest of the process. Refs #13. * fix: address retry review feedback --- .../drizzle/0001_tearful_the_fallen.sql | 1 + apps/backend/drizzle/meta/0001_snapshot.json | 195 ++++++++++++++ apps/backend/drizzle/meta/_journal.json | 7 + apps/backend/src/__tests__/config.test.ts | 18 ++ apps/backend/src/__tests__/database.test.ts | 61 +++++ .../src/__tests__/downloads-api.test.ts | 6 +- .../src/__tests__/downloads-service.test.ts | 245 ++++++++++++++++-- .../backend/src/__tests__/integration.test.ts | 6 +- apps/backend/src/__tests__/retry.test.ts | 181 +++++++++++++ apps/backend/src/__tests__/semaphore.test.ts | 45 ++++ apps/backend/src/database/schema.ts | 1 + apps/backend/src/index.ts | 18 +- apps/backend/src/lib/config.ts | 7 + apps/backend/src/lib/retry.ts | 43 +++ apps/backend/src/lib/semaphore.ts | 39 +++ .../modules/downloads/downloads.repository.ts | 25 +- .../modules/downloads/downloads.service.ts | 214 +++++++++++---- .../src/modules/downloads/retry-policy.ts | 45 ++++ examples/config.jsonc | 9 +- 19 files changed, 1082 insertions(+), 84 deletions(-) create mode 100644 apps/backend/drizzle/0001_tearful_the_fallen.sql create mode 100644 apps/backend/drizzle/meta/0001_snapshot.json create mode 100644 apps/backend/src/__tests__/retry.test.ts create mode 100644 apps/backend/src/__tests__/semaphore.test.ts create mode 100644 apps/backend/src/lib/retry.ts create mode 100644 apps/backend/src/lib/semaphore.ts create mode 100644 apps/backend/src/modules/downloads/retry-policy.ts diff --git a/apps/backend/drizzle/0001_tearful_the_fallen.sql b/apps/backend/drizzle/0001_tearful_the_fallen.sql new file mode 100644 index 0000000..f59c3a9 --- /dev/null +++ b/apps/backend/drizzle/0001_tearful_the_fallen.sql @@ -0,0 +1 @@ +ALTER TABLE `downloads` ADD `attempts` integer DEFAULT 0 NOT NULL; \ No newline at end of file diff --git a/apps/backend/drizzle/meta/0001_snapshot.json b/apps/backend/drizzle/meta/0001_snapshot.json new file mode 100644 index 0000000..892f474 --- /dev/null +++ b/apps/backend/drizzle/meta/0001_snapshot.json @@ -0,0 +1,195 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "0e170273-951e-4b46-b8be-8d77d6188596", + "prevId": "7511a5bb-12b1-4836-8099-498542b3d20d", + "tables": { + "downloads": { + "name": "downloads", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "autoincrement": true + }, + "torrent_filename": { + "name": "torrent_filename", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "peer_id": { + "name": "peer_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "peer_name": { + "name": "peer_name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "item_id": { + "name": "item_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "filename": { + "name": "filename", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "dest_path": { + "name": "dest_path", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "part_path": { + "name": "part_path", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "release_size": { + "name": "release_size", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "release_json": { + "name": "release_json", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "expected_bytes": { + "name": "expected_bytes", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "expected_bytes_source": { + "name": "expected_bytes_source", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "expected_bytes_mismatch": { + "name": "expected_bytes_mismatch", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": false + }, + "downloaded_bytes": { + "name": "downloaded_bytes", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "attempts": { + "name": "attempts", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "started_at": { + "name": "started_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "completed_at": { + "name": "completed_at", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "downloads_status_idx": { + "name": "downloads_status_idx", + "columns": [ + "status" + ], + "isUnique": false + }, + "downloads_updated_at_idx": { + "name": "downloads_updated_at_idx", + "columns": [ + "updated_at" + ], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": { + "downloads_status_check": { + "name": "downloads_status_check", + "value": "\"downloads\".\"status\" in ('downloading', 'completed', 'failed', 'import_queued')" + }, + "downloads_expected_bytes_source_check": { + "name": "downloads_expected_bytes_source_check", + "value": "\"downloads\".\"expected_bytes_source\" is null or \"downloads\".\"expected_bytes_source\" = 'content_length'" + } + } + } + }, + "views": {}, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} diff --git a/apps/backend/drizzle/meta/_journal.json b/apps/backend/drizzle/meta/_journal.json index 35efc42..643afd8 100644 --- a/apps/backend/drizzle/meta/_journal.json +++ b/apps/backend/drizzle/meta/_journal.json @@ -8,6 +8,13 @@ "when": 1780589033291, "tag": "0000_mysterious_vin_gonzales", "breakpoints": true + }, + { + "idx": 1, + "version": "6", + "when": 1780707145431, + "tag": "0001_tearful_the_fallen", + "breakpoints": true } ] } diff --git a/apps/backend/src/__tests__/config.test.ts b/apps/backend/src/__tests__/config.test.ts index 6e37718..a8faa35 100644 --- a/apps/backend/src/__tests__/config.test.ts +++ b/apps/backend/src/__tests__/config.test.ts @@ -277,4 +277,22 @@ describe('appConfig parsing', () => { }) expect(result.success).toBe(false) }) + + test('defaults the downloads hardening knobs', () => { + const parsed = AppConfig.parse({ + downloads: { watchPath: '/w', completedPath: '/c' }, + }) + expect(parsed.downloads).toMatchObject({ + maxConcurrentDownloads: 3, + maxDownloadAttempts: 5, + retryBaseDelayMs: 1000, + retryMaxDelayMs: 60_000, + }) + }) + + test('respects an explicit maxConcurrentDownloads and rejects non-positive values', () => { + const parsed = AppConfig.parse({ downloads: { watchPath: '/w', completedPath: '/c', maxConcurrentDownloads: 8 } }) + expect(parsed.downloads?.maxConcurrentDownloads).toBe(8) + expect(AppConfig.safeParse({ downloads: { watchPath: '/w', completedPath: '/c', maxConcurrentDownloads: 0 } }).success).toBe(false) + }) }) diff --git a/apps/backend/src/__tests__/database.test.ts b/apps/backend/src/__tests__/database.test.ts index 82c618a..31058e3 100644 --- a/apps/backend/src/__tests__/database.test.ts +++ b/apps/backend/src/__tests__/database.test.ts @@ -151,4 +151,65 @@ describe('DownloadsRepository', () => { expect(stale.error).toContain('stale') handle.close() }) + + test('increments attempts and records a resume reset', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const created = repository.create({ + torrentFilename: 'movie.torrent', + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: release.filename, + destPath: join(tempDir, release.filename), + partPath: join(tempDir, `${release.filename}.part`), + releaseSize: release.size, + release, + }) + + expect(repository.incrementAttempts(created.id)).toBe(1) + expect(repository.incrementAttempts(created.id)).toBe(2) + repository.updateProgress(created.id, 40) + repository.markResumeReset(created.id) + + const row = repository.get(created.id)! + expect(row.attempts).toBe(2) + expect(row.downloadedBytes).toBe(0) + expect(row.status).toBe('downloading') + expect(row.error).toContain('resume validation failed') + handle.close() + }) + + test('lists stale downloading rows without mutating them', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const a = repository.create({ + torrentFilename: 'a.torrent', + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: 'A.mkv', + destPath: join(tempDir, 'A.mkv'), + partPath: join(tempDir, 'A.mkv.part'), + releaseSize: 10, + release, + }) + const b = repository.create({ + torrentFilename: 'b.torrent', + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:2', + filename: 'B.mkv', + destPath: join(tempDir, 'B.mkv'), + partPath: join(tempDir, 'B.mkv.part'), + releaseSize: 10, + release, + }) + repository.markCompleted(b.id, 10) + + const stale = repository.listStaleDownloads() + expect(stale.map(r => r.id)).toEqual([a.id]) + expect(repository.get(a.id)?.status).toBe('downloading') + handle.close() + }) }) diff --git a/apps/backend/src/__tests__/downloads-api.test.ts b/apps/backend/src/__tests__/downloads-api.test.ts index 09d987c..4df87ff 100644 --- a/apps/backend/src/__tests__/downloads-api.test.ts +++ b/apps/backend/src/__tests__/downloads-api.test.ts @@ -1,4 +1,3 @@ -import type { AppConfig } from '../lib/config' import type { Envs } from '../lib/envs' import type { Release } from '../lib/release' import { mkdtemp, rm } from 'node:fs/promises' @@ -7,6 +6,7 @@ import { join } from 'node:path' import { afterEach, beforeEach, describe, expect, test } from 'bun:test' import { getApp } from '../app' import { openDatabase } from '../database/connection' +import { AppConfig } from '../lib/config' import { DownloadsRepository } from '../modules/downloads/downloads.repository' const envs: Envs = { @@ -20,12 +20,12 @@ const envs: Envs = { NODE_ENV: 'test', } -const config: AppConfig = { +const config = AppConfig.parse({ jack: { baseUrl: 'http://localhost:3000', apiKey: 'test-api-key' }, downloads: { watchPath: '/tmp/watch', completedPath: '/tmp/completed' }, servers: [], peers: [], -} +}) const release: Release = { id: 'remote:movie:1', diff --git a/apps/backend/src/__tests__/downloads-service.test.ts b/apps/backend/src/__tests__/downloads-service.test.ts index ff12025..9e333ee 100644 --- a/apps/backend/src/__tests__/downloads-service.test.ts +++ b/apps/backend/src/__tests__/downloads-service.test.ts @@ -2,8 +2,9 @@ import type { Release } from '../lib/release' import { mkdtemp, rm, writeFile } from 'node:fs/promises' import { tmpdir } from 'node:os' import { join } from 'node:path' -import { afterEach, beforeEach, describe, expect, test } from 'bun:test' +import { afterEach, beforeEach, describe, expect, spyOn, test } from 'bun:test' import { openDatabase } from '../database/connection' +import { FetchError } from '../lib/errors/FetchError' import { DownloadsRepository } from '../modules/downloads/downloads.repository' import { DownloadsService } from '../modules/downloads/downloads.service' import { createTorrentStub } from '../modules/torznab/torrent' @@ -31,6 +32,18 @@ afterEach(async () => { await rm(tempDir, { recursive: true, force: true }) }) +function downloadsConfig(overrides: Partial> = {}) { + return { + watchPath, + completedPath, + maxConcurrentDownloads: 2, + maxDownloadAttempts: 3, + retryBaseDelayMs: 0, + retryMaxDelayMs: 0, + ...overrides, + } +} + function fakePeer(overrides: Partial> = {}) { return { id: 'peer-1', @@ -49,9 +62,9 @@ function fakeDestination() { return { isInitialized: true, canDestination: true, name: 'Radarr', triggerImport: async () => {} } } -async function writeTorrent(filename = 'movie.torrent') { +async function writeTorrent(filename = 'movie.torrent', itemId = 'movie:1') { const filePath = join(watchPath, filename) - await writeFile(filePath, createTorrentStub({ name: release.title, size: release.size, peerId: 'peer-1', itemId: 'movie:1' })) + await writeFile(filePath, createTorrentStub({ name: release.title, size: release.size, peerId: 'peer-1', itemId })) return filePath } @@ -68,12 +81,11 @@ describe('DownloadsService download progress persistence', () => { await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) }, }) - const service = new DownloadsService({ completedPath }, [peer as any], [fakeDestination() as any], repository) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) const filePath = await writeTorrent() await service.processTorrentFile(filePath, 'movie.torrent') - // The service forwards dest/part/size/torrentFilename into downloadFile. expect(calls).toHaveLength(1) expect(calls[0].destPath).toBe(join(completedPath, release.filename)) expect(calls[0].options.partPath).toBe(`${join(completedPath, release.filename)}.part`) @@ -82,14 +94,9 @@ describe('DownloadsService download progress persistence', () => { const downloads = repository.list() expect(downloads).toHaveLength(1) - expect(downloads[0]?.torrentFilename).toBe('movie.torrent') - expect(downloads[0]?.filename).toBe(release.filename) - expect(downloads[0]?.destPath).toBe(join(completedPath, release.filename)) - expect(downloads[0]?.partPath).toBe(`${join(completedPath, release.filename)}.part`) - expect(downloads[0]?.releaseSize).toBe(10) - expect(downloads[0]?.expectedBytes).toBe(10) - expect(downloads[0]?.downloadedBytes).toBe(10) expect(downloads[0]?.status).toBe('import_queued') + expect(downloads[0]?.downloadedBytes).toBe(10) + expect(downloads[0]?.attempts).toBe(1) handle.close() }) @@ -99,7 +106,7 @@ describe('DownloadsService download progress persistence', () => { const peer = fakePeer({ getRelease: async () => { throw new Error('metadata failed') } }) - const service = new DownloadsService({ completedPath }, [peer as any], [], repository) + const service = new DownloadsService(downloadsConfig(), [peer as any], [], repository) const filePath = await writeTorrent() await service.processTorrentFile(filePath, 'movie.torrent') @@ -108,7 +115,7 @@ describe('DownloadsService download progress persistence', () => { handle.close() }) - test('rejects a peer release with a path-traversal filename and does not write outside completedPath', async () => { + test('rejects a peer release with a path-traversal filename', async () => { const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) const repository = new DownloadsRepository(handle.db) const writtenPaths: string[] = [] @@ -118,36 +125,224 @@ describe('DownloadsService download progress persistence', () => { writtenPaths.push(destPath) }, }) - const service = new DownloadsService({ completedPath }, [peer as any], [fakeDestination() as any], repository) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) const filePath = await writeTorrent() await service.processTorrentFile(filePath, 'movie.torrent') - // The unsafe name must never reach downloadFile / be written to disk. expect(writtenPaths).toHaveLength(0) - const evilOutside = join(tempDir, 'evil.mkv') - expect(await Bun.file(evilOutside).exists()).toBe(false) - expect(await Bun.file(`${evilOutside}.part`).exists()).toBe(false) - expect(repository.list()).toHaveLength(0) handle.close() }) - test('marks an existing row failed when download fails after metadata resolves', async () => { + test('marks an existing row failed when a permanent download error occurs', async () => { const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) const repository = new DownloadsRepository(handle.db) + let calls = 0 const peer = fakePeer({ downloadFile: async () => { - throw new Error('download failed') + calls++ + throw new FetchError('not found', new Response(null, { status: 404 })) } }) - const service = new DownloadsService({ completedPath }, [peer as any], [], repository) + const service = new DownloadsService(downloadsConfig(), [peer as any], [], repository) const filePath = await writeTorrent() await service.processTorrentFile(filePath, 'movie.torrent') + expect(calls).toBe(1) // 404 is permanent — no retry const downloads = repository.list() - expect(downloads).toHaveLength(1) expect(downloads[0]?.status).toBe('failed') - expect(downloads[0]?.error).toContain('download failed') + handle.close() + }) + + test('retries a transient failure then succeeds', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + let calls = 0 + const peer = fakePeer({ + downloadFile: async (_itemId: string, _destPath: string, options: any) => { + calls++ + if (calls === 1) + throw new FetchError('busy', new Response(null, { status: 503 })) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + const filePath = await writeTorrent() + + await service.processTorrentFile(filePath, 'movie.torrent') + + expect(calls).toBe(2) + const downloads = repository.list() + expect(downloads[0]?.status).toBe('import_queued') + expect(downloads[0]?.attempts).toBe(2) + handle.close() + }) + + test('persists a resume reset from a restart event', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + // Spy, because a realistic download ends with a `completed` event whose + // markCompleted() clears the error markResumeReset() set — so asserting the + // final row state cannot prove the reset ran. + const resetSpy = spyOn(repository, 'markResumeReset') + const peer = fakePeer({ + downloadFile: async (_itemId: string, _destPath: string, options: any) => { + await options.onProgress({ type: 'headers', expectedBytes: 10, expectedBytesSource: 'content_length', expectedBytesMismatch: false }) + await options.onProgress({ type: 'restart', reason: 'range_ignored', discardedBytes: 4 }) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + const filePath = await writeTorrent() + + await service.processTorrentFile(filePath, 'movie.torrent') + + expect(resetSpy).toHaveBeenCalledTimes(1) + expect(repository.list()[0]?.status).toBe('import_queued') + resetSpy.mockRestore() + handle.close() + }) + + test('limits concurrent downloads to maxConcurrentDownloads', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + let active = 0 + let maxActive = 0 + const peer = { + id: 'peer-1', + name: 'Friend Jack', + url: 'http://peer.test', + // Distinct filename per item so each maps to a distinct destPath. + getRelease: async (itemId: string) => ({ ...release, id: `remote:${itemId}`, filename: `${itemId.replace(':', '_')}.mkv` }), + downloadFile: async (_itemId: string, _destPath: string, options: any) => { + active++ + maxActive = Math.max(maxActive, active) + await Bun.sleep(20) + active-- + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + } + const service = new DownloadsService(downloadsConfig({ maxConcurrentDownloads: 1 }), [peer as any], [fakeDestination() as any], repository) + const a = await writeTorrent('a.torrent', 'movie:1') + const b = await writeTorrent('b.torrent', 'movie:2') + + await Promise.all([ + service.processTorrentFile(a, 'a.torrent'), + service.processTorrentFile(b, 'b.torrent'), + ]) + + expect(maxActive).toBe(1) + expect(repository.list().filter(d => d.status === 'import_queued')).toHaveLength(2) + handle.close() + }) + + test('resumeStaleDownloads re-drives a stale downloading row to import_queued', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const calls: string[] = [] + const peer = fakePeer({ + downloadFile: async (_itemId: string, destPath: string, options: any) => { + calls.push(destPath) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + // Seed a stale `downloading` row, as if Jack crashed mid-download. + repository.create({ + torrentFilename: 'movie.torrent', + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: release.filename, + destPath: join(completedPath, release.filename), + partPath: `${join(completedPath, release.filename)}.part`, + releaseSize: release.size, + release, + }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + + const resumed = await service.resumeStaleDownloads() + // resumeStaleDownloads fires in the background; wait for the row to settle. + for (let i = 0; i < 50 && repository.list()[0]?.status !== 'import_queued'; i++) + await Bun.sleep(10) + + expect(resumed).toBe(1) + expect(calls).toEqual([join(completedPath, release.filename)]) + expect(repository.list()[0]?.status).toBe('import_queued') + handle.close() + }) + + test('marks superseded duplicate stale rows (same destPath) failed and re-drives only one', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const calls: string[] = [] + const peer = fakePeer({ + downloadFile: async (_itemId: string, destPath: string, options: any) => { + calls.push(destPath) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + const destPath = join(completedPath, release.filename) + const base = { + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: release.filename, + destPath, + partPath: `${destPath}.part`, + releaseSize: release.size, + release, + } + repository.create({ ...base, torrentFilename: 'first.torrent' }) + repository.create({ ...base, torrentFilename: 'second.torrent' }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + + const resumed = await service.resumeStaleDownloads() + for (let i = 0; i < 50 && !repository.list().some(d => d.status === 'import_queued'); i++) + await Bun.sleep(10) + + expect(resumed).toBe(1) + expect(calls).toEqual([destPath]) // only one of the two same-destPath rows is re-driven + const rows = repository.list() + expect(rows.filter(d => d.status === 'import_queued')).toHaveLength(1) + expect(rows.find(d => d.status === 'failed')?.error).toContain('superseded') + handle.close() + }) + + test('releases the re-enqueue claim after a successful resume so the filename can be processed again', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const calls: string[] = [] + const peer = fakePeer({ + downloadFile: async (itemId: string, _destPath: string, options: any) => { + calls.push(itemId) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + repository.create({ + torrentFilename: 'movie.torrent', + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: release.filename, + destPath: join(completedPath, release.filename), + partPath: `${join(completedPath, release.filename)}.part`, + releaseSize: release.size, + release, + }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + + await service.resumeStaleDownloads() + for (let i = 0; i < 50 && repository.list()[0]?.status !== 'import_queued'; i++) + await Bun.sleep(10) + expect(calls).toHaveLength(1) + + // A later legitimate re-drop of the same torrent filename must NOT be skipped + // by a stale re-enqueue claim once the resume has completed. + const filePath = await writeTorrent('movie.torrent') + await service.processTorrentFile(filePath, 'movie.torrent') + + expect(calls).toHaveLength(2) + expect(repository.list().filter(d => d.status === 'import_queued')).toHaveLength(2) handle.close() }) }) diff --git a/apps/backend/src/__tests__/integration.test.ts b/apps/backend/src/__tests__/integration.test.ts index c8edc95..98c7153 100644 --- a/apps/backend/src/__tests__/integration.test.ts +++ b/apps/backend/src/__tests__/integration.test.ts @@ -1,4 +1,3 @@ -import type { AppConfig } from '../lib/config' import type { Envs } from '../lib/envs' import type { Release } from '../lib/release' import { Database } from 'bun:sqlite' @@ -9,6 +8,7 @@ import { setupServer } from 'msw/node' import { getApp } from '../app' import { runMigrations } from '../database/connection' import * as schema from '../database/schema' +import { AppConfig } from '../lib/config' import { RadarrServerConnector } from '../lib/servers/arr/radarr' import { PeerConnector } from '../lib/servers/peer' import { DownloadsRepository } from '../modules/downloads/downloads.repository' @@ -101,12 +101,12 @@ afterEach(() => { }) afterAll(() => server.close()) -const config: AppConfig = { +const config = AppConfig.parse({ jack: { baseUrl: 'http://localhost:3000', apiKey: 'test-api-key' }, downloads: { watchPath: '/tmp/jack-test-watch', completedPath: '/tmp/jack-test-completed' }, servers: [], peers: [], -} +}) const envs: Envs = { APP_CONFIG_PATH: '/data/config.json', diff --git a/apps/backend/src/__tests__/retry.test.ts b/apps/backend/src/__tests__/retry.test.ts new file mode 100644 index 0000000..ceb4c7f --- /dev/null +++ b/apps/backend/src/__tests__/retry.test.ts @@ -0,0 +1,181 @@ +import { describe, expect, spyOn, test } from 'bun:test' +import { FetchError } from '../lib/errors/FetchError' +import { IncompleteDownloadError } from '../lib/errors/IncompleteDownloadError' +import { retry } from '../lib/retry' +import { downloadRetryAfterMs, isTransientDownloadError } from '../modules/downloads/retry-policy' + +async function noSleep() {} + +describe('retry', () => { + test('returns immediately on first success', async () => { + let calls = 0 + const result = await retry(async () => { + calls++ + return 'ok' + }, { + maxAttempts: 3, + baseDelayMs: 1, + maxDelayMs: 10, + isRetryable: () => true, + sleep: noSleep, + }) + expect(result).toBe('ok') + expect(calls).toBe(1) + }) + + test('retries a retryable failure then succeeds', async () => { + let calls = 0 + const result = await retry(async () => { + calls++ + if (calls < 2) + throw new Error('boom') + return 'ok' + }, { maxAttempts: 3, baseDelayMs: 1, maxDelayMs: 10, isRetryable: () => true, sleep: noSleep, random: () => 1 }) + expect(result).toBe('ok') + expect(calls).toBe(2) + }) + + test('does not retry a non-retryable error', async () => { + let calls = 0 + await expect(retry(async () => { + calls++ + throw new Error('nope') + }, { + maxAttempts: 3, + baseDelayMs: 1, + maxDelayMs: 10, + isRetryable: () => false, + sleep: noSleep, + })).rejects.toThrow('nope') + expect(calls).toBe(1) + }) + + test('throws the last error after exhausting attempts', async () => { + let calls = 0 + await expect(retry(async () => { + calls++ + throw new Error(`fail-${calls}`) + }, { + maxAttempts: 3, + baseDelayMs: 1, + maxDelayMs: 10, + isRetryable: () => true, + sleep: noSleep, + random: () => 1, + })).rejects.toThrow('fail-3') + expect(calls).toBe(3) + }) + + test('uses retryAfterMs over the jittered backoff', async () => { + const delays: number[] = [] + let calls = 0 + await retry(async () => { + calls++ + if (calls < 2) + throw new Error('429') + return 'ok' + }, { + maxAttempts: 3, + baseDelayMs: 1, + maxDelayMs: 10_000, + isRetryable: () => true, + retryAfterMs: () => 2000, + sleep: async (ms) => { + delays.push(ms) + }, + random: () => 1, + }) + expect(delays).toEqual([2000]) + }) + + test('rejects invalid maxAttempts without calling the operation', async () => { + let calls = 0 + await expect(retry(async () => { + calls++ + return 'ok' + }, { + maxAttempts: 0, + baseDelayMs: 1, + maxDelayMs: 10, + isRetryable: () => true, + sleep: noSleep, + })).rejects.toThrow('maxAttempts must be at least 1') + expect(calls).toBe(0) + }) +}) + +describe('isTransientDownloadError', () => { + function fetchError(status: number, headers?: Record) { + return new FetchError('x', new Response(null, { status, headers })) + } + + test('treats 5xx and 429 as transient', () => { + expect(isTransientDownloadError(fetchError(500))).toBe(true) + expect(isTransientDownloadError(fetchError(503))).toBe(true) + expect(isTransientDownloadError(fetchError(429))).toBe(true) + }) + + test('treats non-429 4xx as permanent', () => { + expect(isTransientDownloadError(fetchError(400))).toBe(false) + expect(isTransientDownloadError(fetchError(404))).toBe(false) + }) + + test('treats timeouts and network TypeErrors as transient', () => { + const timeout = new Error('timed out') + timeout.name = 'TimeoutError' + expect(isTransientDownloadError(timeout)).toBe(true) + expect(isTransientDownloadError(new TypeError('fetch failed'))).toBe(true) + }) + + test('treats manual aborts as permanent', () => { + const abort = new Error('shutting down') + abort.name = 'AbortError' + expect(isTransientDownloadError(abort)).toBe(false) + }) + + test('treats an incomplete download as transient', () => { + expect(isTransientDownloadError(new IncompleteDownloadError('got 3 bytes, expected 5'))).toBe(true) + }) + + test('treats other plain errors as permanent', () => { + expect(isTransientDownloadError(new Error('Unsafe release filename from peer'))).toBe(false) + }) +}) + +describe('downloadRetryAfterMs', () => { + test('parses a seconds Retry-After header', () => { + const err = new FetchError('x', new Response(null, { status: 429, headers: { 'Retry-After': '2' } })) + expect(downloadRetryAfterMs(err)).toBe(2000) + }) + + test('parses an HTTP-date Retry-After header', () => { + const now = Date.UTC(2098, 11, 31, 23, 59, 58) + const nowSpy = spyOn(Date, 'now').mockReturnValue(now) + const err = new FetchError('x', new Response(null, { status: 429, headers: { 'Retry-After': 'Thu, 01 Jan 2099 00:00:00 GMT' } })) + + try { + expect(downloadRetryAfterMs(err)).toBe(2000) + } + finally { + nowSpy.mockRestore() + } + }) + + test('floors past HTTP-date Retry-After headers at zero', () => { + const now = Date.UTC(2099, 0, 1, 0, 0, 0) + const nowSpy = spyOn(Date, 'now').mockReturnValue(now) + const err = new FetchError('x', new Response(null, { status: 429, headers: { 'Retry-After': 'Wed, 31 Dec 2098 23:59:59 GMT' } })) + + try { + expect(downloadRetryAfterMs(err)).toBe(0) + } + finally { + nowSpy.mockRestore() + } + }) + + test('returns null without a header or for non-FetchErrors', () => { + expect(downloadRetryAfterMs(new FetchError('x', new Response(null, { status: 429 })))).toBeNull() + expect(downloadRetryAfterMs(new Error('x'))).toBeNull() + }) +}) diff --git a/apps/backend/src/__tests__/semaphore.test.ts b/apps/backend/src/__tests__/semaphore.test.ts new file mode 100644 index 0000000..8c7c461 --- /dev/null +++ b/apps/backend/src/__tests__/semaphore.test.ts @@ -0,0 +1,45 @@ +import { describe, expect, test } from 'bun:test' +import { Semaphore } from '../lib/semaphore' + +describe('Semaphore', () => { + test('limits concurrency to the permit count', async () => { + const sem = new Semaphore(2) + let active = 0 + let maxActive = 0 + const task = () => sem.run(async () => { + active++ + maxActive = Math.max(maxActive, active) + await Bun.sleep(10) + active-- + }) + await Promise.all(Array.from({ length: 6 }, task)) + expect(maxActive).toBe(2) + }) + + test('hands permits to waiters in FIFO order', async () => { + const sem = new Semaphore(1) + const order: number[] = [] + await sem.acquire() + const waiters = [1, 2, 3].map(n => sem.acquire().then(() => { + order.push(n) + })) + sem.release() + sem.release() + sem.release() + await Promise.all(waiters) + expect(order).toEqual([1, 2, 3]) + }) + + test('releases the permit when the task throws', async () => { + const sem = new Semaphore(1) + await expect(sem.run(async () => { + throw new Error('boom') + })).rejects.toThrow('boom') + const ran = await sem.run(async () => 'next') + expect(ran).toBe('next') + }) + + test('rejects a non-positive permit count', () => { + expect(() => new Semaphore(0)).toThrow() + }) +}) diff --git a/apps/backend/src/database/schema.ts b/apps/backend/src/database/schema.ts index 3168888..a957bbb 100644 --- a/apps/backend/src/database/schema.ts +++ b/apps/backend/src/database/schema.ts @@ -20,6 +20,7 @@ export const downloads = sqliteTable('downloads', { expectedBytesSource: text('expected_bytes_source').$type(), expectedBytesMismatch: integer('expected_bytes_mismatch', { mode: 'boolean' }).notNull().default(false), downloadedBytes: integer('downloaded_bytes').notNull().default(0), + attempts: integer('attempts').notNull().default(0), status: text('status').$type().notNull(), startedAt: text('started_at').notNull(), updatedAt: text('updated_at').notNull(), diff --git a/apps/backend/src/index.ts b/apps/backend/src/index.ts index db0c1e5..dc15b1a 100644 --- a/apps/backend/src/index.ts +++ b/apps/backend/src/index.ts @@ -31,10 +31,6 @@ const destinations = connectors.servers.filter(s => s.canDestination) const database = await openDatabase({ appConfigPath: envs.APP_CONFIG_PATH }) const downloadsRepository = new DownloadsRepository(database.db) -const reconciledDownloads = await downloadsRepository.reconcileStaleDownloads() - -if (reconciledDownloads > 0) - logger.warn({ downloads: reconciledDownloads, databasePath: database.path }, 'Reconciled stale downloads from previous Jack run') const app = getApp(envs, config, connectors, { downloadsRepository }) const server = Bun.serve({ @@ -100,13 +96,25 @@ if (config.jack) { } } -// Start blackhole watcher +// Start blackhole watcher (and re-drive interrupted downloads from a prior run) let blackholeWatcher: BlackholeWatcher | null = null if (config.downloads) { const downloadsService = new DownloadsService(config.downloads, connectors.peers, destinations, downloadsRepository) + // Active re-enqueue: resume stale `downloading` rows in place before the + // watcher scans, so the leftover .torrent stubs are not re-processed as new rows. + const resumed = await downloadsService.resumeStaleDownloads() + if (resumed > 0) + logger.warn({ downloads: resumed, databasePath: database.path }, 'Re-enqueued interrupted downloads from previous Jack run') + blackholeWatcher = new BlackholeWatcher(config.downloads, downloadsService) await blackholeWatcher.start() } +else { + // No downloads config means stale rows cannot be resumed — mark them failed. + const failed = await downloadsRepository.reconcileStaleDownloads() + if (failed > 0) + logger.warn({ downloads: failed, databasePath: database.path }, 'Marked stale downloads failed (no downloads config to resume them)') +} process.on('SIGINT', async () => { logger.info('SIGINT received, exiting') diff --git a/apps/backend/src/lib/config.ts b/apps/backend/src/lib/config.ts index 6257d61..4129b38 100644 --- a/apps/backend/src/lib/config.ts +++ b/apps/backend/src/lib/config.ts @@ -133,6 +133,13 @@ export type JackConfig = z.infer export const DownloadsConfig = z.object({ watchPath: z.string().min(1), completedPath: z.string().min(1), + // Max peer file downloads running at once (an async semaphore guards the + // expensive download step). Defaults keep existing configs working. + maxConcurrentDownloads: z.number().int().min(1).default(3), + // Bounded retries for transient failures, with exponential backoff + jitter. + maxDownloadAttempts: z.number().int().min(1).default(5), + retryBaseDelayMs: z.number().int().min(0).default(1000), + retryMaxDelayMs: z.number().int().min(0).default(60_000), }) export type DownloadsConfig = z.infer diff --git a/apps/backend/src/lib/retry.ts b/apps/backend/src/lib/retry.ts new file mode 100644 index 0000000..59c01f3 --- /dev/null +++ b/apps/backend/src/lib/retry.ts @@ -0,0 +1,43 @@ +export interface RetryOptions { + /** Total attempts, including the first. */ + maxAttempts: number + baseDelayMs: number + maxDelayMs: number + /** Whether a thrown error should be retried. */ + isRetryable: (error: unknown) => boolean + /** Optional explicit delay (e.g. from a Retry-After header), capped at maxDelayMs. */ + retryAfterMs?: (error: unknown) => number | null + onRetry?: (info: { attempt: number, delayMs: number, error: unknown }) => void + /** Injectable for tests (defaults to setTimeout). */ + sleep?: (ms: number) => Promise + /** Injectable for tests (defaults to Math.random). */ + random?: () => number +} + +const defaultSleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)) + +export async function retry(fn: (attempt: number) => Promise, options: RetryOptions): Promise { + const { maxAttempts, baseDelayMs, maxDelayMs, isRetryable, retryAfterMs, onRetry } = options + const sleep = options.sleep ?? defaultSleep + const random = options.random ?? Math.random + + if (!Number.isInteger(maxAttempts) || maxAttempts < 1) + throw new RangeError('maxAttempts must be at least 1') + + for (let attempt = 1; ; attempt++) { + try { + return await fn(attempt) + } + catch (error) { + if (attempt >= maxAttempts || !isRetryable(error)) + throw error + + const explicit = retryAfterMs?.(error) ?? null + // Full jitter (AWS): pick uniformly in [0, exponential backoff]. + const backoff = Math.min(maxDelayMs, baseDelayMs * 2 ** (attempt - 1)) + const delayMs = explicit != null ? Math.min(explicit, maxDelayMs) : random() * backoff + onRetry?.({ attempt, delayMs, error }) + await sleep(delayMs) + } + } +} diff --git a/apps/backend/src/lib/semaphore.ts b/apps/backend/src/lib/semaphore.ts new file mode 100644 index 0000000..7f41f4c --- /dev/null +++ b/apps/backend/src/lib/semaphore.ts @@ -0,0 +1,39 @@ +/** A counting semaphore with FIFO fairness for limiting concurrent async work. */ +export class Semaphore { + private available: number + private readonly waiters: Array<() => void> = [] + + constructor(permits: number) { + if (!Number.isInteger(permits) || permits < 1) + throw new Error(`Semaphore permits must be a positive integer, got ${permits}`) + this.available = permits + } + + async acquire(): Promise { + if (this.available > 0) { + this.available-- + return + } + await new Promise(resolve => this.waiters.push(resolve)) + } + + release(): void { + const next = this.waiters.shift() + if (next) { + // Hand the permit straight to the next waiter (keeps `available` at 0). + next() + return + } + this.available++ + } + + async run(fn: () => Promise): Promise { + await this.acquire() + try { + return await fn() + } + finally { + this.release() + } + } +} diff --git a/apps/backend/src/modules/downloads/downloads.repository.ts b/apps/backend/src/modules/downloads/downloads.repository.ts index 2e066af..67eaf9a 100644 --- a/apps/backend/src/modules/downloads/downloads.repository.ts +++ b/apps/backend/src/modules/downloads/downloads.repository.ts @@ -1,7 +1,7 @@ import type { AppDatabase } from '../../database/connection' import type { DownloadRow, DownloadStatus, ExpectedBytesSource, NewDownloadRow } from '../../database/schema' import type { Release } from '../../lib/release' -import { desc, eq } from 'drizzle-orm' +import { desc, eq, sql } from 'drizzle-orm' import { downloads } from '../../database/schema' export interface DownloadRecord { @@ -19,6 +19,7 @@ export interface DownloadRecord { expectedBytesSource: ExpectedBytesSource | null expectedBytesMismatch: boolean downloadedBytes: number + attempts: number status: DownloadStatus startedAt: string updatedAt: string @@ -58,6 +59,7 @@ function toRecord(row: DownloadRow): DownloadRecord { expectedBytesSource: row.expectedBytesSource ?? null, expectedBytesMismatch: row.expectedBytesMismatch, downloadedBytes: row.downloadedBytes, + attempts: row.attempts, status: row.status, startedAt: row.startedAt, updatedAt: row.updatedAt, @@ -138,6 +140,27 @@ export class DownloadsRepository { .run() } + incrementAttempts(id: number): number { + const row = this.db.update(downloads) + .set({ attempts: sql`${downloads.attempts} + 1`, updatedAt: nowIso() }) + .where(eq(downloads.id, id)) + .returning() + .get() + return row?.attempts ?? 0 + } + + markResumeReset(id: number): void { + this.db.update(downloads) + .set({ downloadedBytes: 0, error: 'resume validation failed; restarted from byte 0', updatedAt: nowIso() }) + .where(eq(downloads.id, id)) + .run() + } + + /** Stale `downloading` rows from a prior run, returned for active re-drive (no mutation). */ + listStaleDownloads(): DownloadRecord[] { + return this.db.select().from(downloads).where(eq(downloads.status, 'downloading')).all().map(toRecord) + } + async reconcileStaleDownloads(): Promise { const staleRows = this.db.select().from(downloads).where(eq(downloads.status, 'downloading')).all() diff --git a/apps/backend/src/modules/downloads/downloads.service.ts b/apps/backend/src/modules/downloads/downloads.service.ts index 1a0932a..39fdfa6 100644 --- a/apps/backend/src/modules/downloads/downloads.service.ts +++ b/apps/backend/src/modules/downloads/downloads.service.ts @@ -1,25 +1,50 @@ import type { AppConfig } from '../../lib/config' import type { ArrServerConnector } from '../../lib/servers/arr/base' import type { PeerConnector, PeerDownloadProgressEvent } from '../../lib/servers/peer' -import type { DownloadsRepository } from './downloads.repository' +import type { DownloadRecord, DownloadsRepository } from './downloads.repository' import { Buffer } from 'node:buffer' import { unlink } from 'node:fs/promises' import { basename, join } from 'node:path' +import { retry } from '../../lib/retry' +import { Semaphore } from '../../lib/semaphore' import { withSpan } from '../../lib/tracing' import { logger } from '../../logger' import { parseTorrentStub } from '../torznab/torrent' +import { downloadRetryAfterMs, isTransientDownloadError } from './retry-policy' + +type DownloadsServiceConfig = NonNullable export class DownloadsService { + private readonly semaphore: Semaphore + // Dest paths with a download in flight — guards two concurrent live drops that + // resolve to the same destination (no duplicate rows / writers). + private readonly active = new Set() + // Torrent filenames owned by the startup re-enqueue. Their leftover stubs are + // skipped by the watcher's initial scan for the rest of the run, so a re-drive + // that fails fast cannot be re-processed into a duplicate row. + private readonly reenqueued = new Set() + constructor( - private readonly config: Pick, 'completedPath'>, + private readonly config: DownloadsServiceConfig, private readonly peers: PeerConnector[], private readonly destinations: ArrServerConnector[], private readonly downloadsRepository?: DownloadsRepository, - ) {} + ) { + this.semaphore = new Semaphore(config.maxConcurrentDownloads) + } async processTorrentFile(filePath: string, filename: string) { try { await withSpan('blackhole.process_torrent', { 'torrent.filename': filename }, async (span) => { + // The startup re-enqueue owns this stub — it is being (or will be) + // re-driven from the persisted row. Skip it so we never create a + // duplicate row, even if that re-drive already failed and cleared `active`. + if (this.reenqueued.has(filename)) { + span.setAttribute('torrent.reenqueued', true) + logger.debug({ torrentFilename: filename }, 'Stub owned by startup re-enqueue; skipping watcher processing') + return + } + const file = Bun.file(filePath) if (!await file.exists()) { span.setAttribute('torrent.exists', false) @@ -54,8 +79,6 @@ export class DownloadsService { // `release.filename` is peer-controlled and only validated as a string. // Force it to a plain basename inside `completedPath` so a value like // `../../evil.mkv` or an absolute path cannot escape the directory. - // Reject (rather than silently rewrite) anything that is not already a - // plain filename, so a malicious peer cannot smuggle in path separators. const safeName = basename(release.filename) const isSafeName = safeName.length > 0 && safeName !== '.' && safeName !== '..' && !safeName.includes('/') && !safeName.includes('\\') @@ -68,7 +91,12 @@ export class DownloadsService { const partPath = `${destPath}.part` span.setAttributes({ 'release.filename': safeName, 'release.size': release.size }) - const download = this.downloadsRepository?.create({ + if (this.active.has(destPath)) { + logger.debug({ torrentFilename: filename, destPath }, 'A download for this destination is already active; skipping duplicate') + return + } + + const created = this.downloadsRepository?.create({ torrentFilename: filename, peerId, peerName: peer.name ?? peer.url, @@ -80,56 +108,150 @@ export class DownloadsService { release, }) - const onProgress = async (event: PeerDownloadProgressEvent) => { - if (!download) - return + const record: DownloadRecord = created ?? { + id: -1, + torrentFilename: filename, + peerId, + peerName: peer.name ?? peer.url, + itemId, + filename: safeName, + destPath, + partPath, + releaseSize: release.size, + release, + expectedBytes: null, + expectedBytesSource: null, + expectedBytesMismatch: false, + downloadedBytes: 0, + attempts: 0, + status: 'downloading', + startedAt: '', + updatedAt: '', + completedAt: null, + error: null, + } - if (event.type === 'headers') { - this.downloadsRepository?.setExpectedBytes(download.id, event.expectedBytes, event.expectedBytesSource, event.expectedBytesMismatch) - return - } + await this.runDownload(record) + }) + } + catch (err) { + const message = err instanceof Error ? err.message : String(err) + logger.error({ torrentFilename: filename, filename, error: message }, 'Failed to process torrent') + } + } - if (event.type === 'progress') { - this.downloadsRepository?.updateProgress(download.id, event.downloadedBytes) - return - } + /** Re-drive stale `downloading` rows from a prior run, resuming from their .part files. */ + async resumeStaleDownloads(): Promise { + const repo = this.downloadsRepository + if (!repo) + return 0 + // Dedupe by destPath: a prior run could leave more than one stale row for the + // same destination (they share the same .part), but only one can be resumed. + // Re-driving two would make the second silently early-return in runDownload + // and stay stuck in `downloading`, so mark the superseded ones failed instead. + const seen = new Set() + const resumable: DownloadRecord[] = [] + for (const record of repo.listStaleDownloads()) { + if (seen.has(record.destPath)) { + repo.markFailed(record.id, 'superseded by another resumable download for the same destination') + continue + } + seen.add(record.destPath) + resumable.push(record) + } + // Claim every resumable stub up-front (synchronously, before the watcher + // starts) so the initial scan skips them regardless of re-drive timing/outcome. + for (const record of resumable) + this.reenqueued.add(record.torrentFilename) + for (const record of resumable) { + // Fire-and-forget: the semaphore caps concurrency, and the stub is already + // claimed in `reenqueued` so the watcher won't duplicate it. + void this.runDownload(record).catch((err) => { + const message = err instanceof Error ? err.message : String(err) + logger.error({ torrentFilename: record.torrentFilename, error: message }, 'Failed to resume stale download') + }) + } + if (resumable.length > 0) + logger.info({ downloads: resumable.length }, 'Re-enqueued interrupted downloads') + return resumable.length + } - if (event.type === 'restart') { - // Resume validation failed; the download restarts from byte 0. - // Persisting this is handled in a later phase. - return - } + private async runDownload(record: DownloadRecord): Promise { + if (this.active.has(record.destPath)) + return + this.active.add(record.destPath) + try { + await this.semaphore.run(() => this.downloadWithRetry(record)) + } + finally { + this.active.delete(record.destPath) + } + } - this.downloadsRepository?.markCompleted(download.id, event.downloadedBytes) - } + private async downloadWithRetry(record: DownloadRecord): Promise { + const repo = this.downloadsRepository + const peer = this.peers.find(p => p.id === record.peerId) + if (!peer) { + repo?.markFailed(record.id, `Peer ${record.peerId} not found`) + logger.error({ torrentFilename: record.torrentFilename, peerId: record.peerId }, 'Cannot run download: peer not found') + return + } - // Everything after the row is created is wrapped so any failure - // (download, stub unlink, or import trigger) marks the row failed - // instead of leaving it stuck in `completed`/`downloading`. - // `import_queued` means: the file downloaded AND triggerImport was - // attempted (best-effort per destination — see triggerImport below). - try { - await peer.downloadFile(itemId, destPath, { torrentFilename: filename, partPath, releaseSize: release.size, onProgress }) - await unlink(filePath) - await this.triggerImport(filename) - - if (download) - this.downloadsRepository?.markImportQueued(download.id) - } - catch (err) { - if (download) { - const message = err instanceof Error ? err.message : String(err) - this.downloadsRepository?.markFailed(download.id, message) - } - throw err - } + const onProgress = async (event: PeerDownloadProgressEvent) => { + if (event.type === 'headers') { + repo?.setExpectedBytes(record.id, event.expectedBytes, event.expectedBytesSource, event.expectedBytesMismatch) + return + } + if (event.type === 'progress') { + repo?.updateProgress(record.id, event.downloadedBytes) + return + } + if (event.type === 'restart') { + repo?.markResumeReset(record.id) + return + } + repo?.markCompleted(record.id, event.downloadedBytes) + } + + const stubPath = join(this.config.watchPath, record.torrentFilename) - logger.info({ torrentFilename: filename, filename: safeName }, 'Download complete, triggered import') + try { + await retry(async () => { + repo?.incrementAttempts(record.id) + await peer.downloadFile(record.itemId, record.destPath, { + torrentFilename: record.torrentFilename, + partPath: record.partPath, + releaseSize: record.releaseSize, + onProgress, + }) + }, { + maxAttempts: this.config.maxDownloadAttempts, + baseDelayMs: this.config.retryBaseDelayMs, + maxDelayMs: this.config.retryMaxDelayMs, + isRetryable: isTransientDownloadError, + retryAfterMs: downloadRetryAfterMs, + onRetry: ({ attempt, delayMs, error }) => { + const message = error instanceof Error ? error.message : String(error) + logger.warn({ torrentFilename: record.torrentFilename, attempt, delayMs, error: message }, 'Retrying peer download after transient failure') + }, }) + + await unlink(stubPath).catch(() => {}) + await this.triggerImport(record.torrentFilename) + repo?.markImportQueued(record.id) + // Release the startup-re-enqueue claim now that the stub is gone, so a + // later legitimate re-drop of the same filename isn't silently skipped. + // Only on success: a failed re-drive keeps its stub, so it stays claimed + // (and is re-driven on the next restart) to avoid in-session hammering. + this.reenqueued.delete(record.torrentFilename) + logger.info({ torrentFilename: record.torrentFilename, filename: record.filename }, 'Download complete, triggered import') } catch (err) { const message = err instanceof Error ? err.message : String(err) - logger.error({ torrentFilename: filename, filename, error: message }, 'Failed to process torrent') + // The .part file is preserved by downloadFile on failure, so a later + // restart re-enqueue can resume from it. + repo?.markFailed(record.id, message) + logger.error({ torrentFilename: record.torrentFilename, filename: record.filename, error: message }, 'Download failed') } } diff --git a/apps/backend/src/modules/downloads/retry-policy.ts b/apps/backend/src/modules/downloads/retry-policy.ts new file mode 100644 index 0000000..71c786e --- /dev/null +++ b/apps/backend/src/modules/downloads/retry-policy.ts @@ -0,0 +1,45 @@ +import { FetchError } from '../../lib/errors/FetchError' +import { IncompleteDownloadError } from '../../lib/errors/IncompleteDownloadError' + +/** + * Transient = worth retrying: network failures, timeouts, HTTP 5xx, 429, and a + * truncated stream (the .part is preserved, so a retry resumes). Non-429 4xx and + * any other error (unsafe filename, file-too-large) are permanent (not retried). + */ +export function isTransientDownloadError(error: unknown): boolean { + if (error instanceof IncompleteDownloadError) + return true + if (error instanceof FetchError) { + const status = error.response?.status ?? error.extras.status + if (status == null) + return true + if (status === 429) + return true + return status >= 500 && status <= 599 + } + if (error instanceof Error) { + // AbortSignal.timeout rejects with a TimeoutError; manual aborts are permanent. + if (error.name === 'TimeoutError') + return true + // A failed fetch (DNS, connection refused, reset) rejects with a TypeError. + if (error instanceof TypeError) + return true + } + return false +} + +/** Milliseconds to wait per a 429 `Retry-After` header (seconds or HTTP-date), or null. */ +export function downloadRetryAfterMs(error: unknown): number | null { + if (!(error instanceof FetchError)) + return null + const header = error.response?.headers?.get('Retry-After') + if (!header) + return null + const seconds = Number(header) + if (Number.isFinite(seconds)) + return Math.max(0, seconds * 1000) + const dateMs = Date.parse(header) + if (!Number.isNaN(dateMs)) + return Math.max(0, dateMs - Date.now()) + return null +} diff --git a/examples/config.jsonc b/examples/config.jsonc index f78bb97..cf832c1 100644 --- a/examples/config.jsonc +++ b/examples/config.jsonc @@ -25,7 +25,14 @@ // and moves finished downloads. Paths are *inside the container*. "downloads": { "watchPath": "/data/torrents/watch", - "completedPath": "/data/torrents/completed" + "completedPath": "/data/torrents/completed", + // Optional download hardening knobs — values below are the defaults, so you + // can delete any line to keep the default. (Note the comma after + // "completedPath" above is required once any field follows it.) + "maxConcurrentDownloads": 3, + "maxDownloadAttempts": 5, + "retryBaseDelayMs": 1000, + "retryMaxDelayMs": 60000 }, // Your Radarr/Sonarr servers. apiKey is the *arr API key: exactly 32 hex From de2370c1d5f258f16ebbdcb5e753f0e493e3b8f2 Mon Sep 17 00:00:00 2001 From: Roz Date: Sat, 6 Jun 2026 12:32:39 +0200 Subject: [PATCH 3/5] fix: guard non-ok resume responses --- .../src/__tests__/peer-download.test.ts | 22 +++++++++++++++++++ apps/backend/src/lib/servers/peer.ts | 3 +++ 2 files changed, 25 insertions(+) diff --git a/apps/backend/src/__tests__/peer-download.test.ts b/apps/backend/src/__tests__/peer-download.test.ts index 9ac1261..2211dbf 100644 --- a/apps/backend/src/__tests__/peer-download.test.ts +++ b/apps/backend/src/__tests__/peer-download.test.ts @@ -333,6 +333,28 @@ describe('PeerConnector.downloadFile resume', () => { } }) + test('rejects non-ok resume responses without appending the response body', async () => { + server.use( + http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, () => + new Response(streamOf([9, 9]), { status: 500, statusText: 'Server Error', headers: { 'Content-Length': '2' } })), + ) + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-resume-non-ok-')) + const destPath = join(dir, 'Movie.mkv') + const partPath = `${destPath}.part` + await writeFile(partPath, new Uint8Array([0, 1])) + + try { + await expect(peer.downloadFile('remote1:movie:99', destPath, { partPath, releaseSize: 5 })).rejects.toThrow('Failed to resume download from peer') + expect(await Bun.file(partPath).exists()).toBe(true) + expect(new Uint8Array(await Bun.file(partPath).arrayBuffer())).toEqual(new Uint8Array([0, 1])) + expect(await Bun.file(destPath).exists()).toBe(false) + } + finally { + await rm(dir, { recursive: true, force: true }) + } + }) + test('preserves the .part file when a download fails mid-stream', async () => { server.use( http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, () => diff --git a/apps/backend/src/lib/servers/peer.ts b/apps/backend/src/lib/servers/peer.ts index 592fda3..70961ae 100644 --- a/apps/backend/src/lib/servers/peer.ts +++ b/apps/backend/src/lib/servers/peer.ts @@ -220,6 +220,9 @@ export class PeerConnector extends ServerConnector { response = await restartFresh(response, 'range_not_satisfiable', existingBytes) span.setAttribute('http.response.status_code', response.status) } + else if (!response.ok) { + throw new FetchError(`Failed to resume download from peer: ${response.statusText}`, response) + } else if (response.ok) { // Peer ignored the Range header and is streaming the whole file from // byte 0. Discard the stale .part and use this response as-is. From ef1708350fef0f65bf88e385ce747b9d00bc7e45 Mon Sep 17 00:00:00 2001 From: Roz Date: Sat, 6 Jun 2026 12:41:51 +0200 Subject: [PATCH 4/5] fix: avoid leaked peer download reader lock --- .../src/__tests__/peer-download.test.ts | 26 ++++++++++++++++++- apps/backend/src/lib/servers/peer.ts | 2 +- 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/apps/backend/src/__tests__/peer-download.test.ts b/apps/backend/src/__tests__/peer-download.test.ts index 2211dbf..9505e90 100644 --- a/apps/backend/src/__tests__/peer-download.test.ts +++ b/apps/backend/src/__tests__/peer-download.test.ts @@ -2,7 +2,7 @@ import type { PeerDownloadProgressEvent } from '../lib/servers/peer' import { mkdtemp, rm, writeFile } from 'node:fs/promises' import { tmpdir } from 'node:os' import { join } from 'node:path' -import { afterAll, afterEach, beforeAll, describe, expect, test } from 'bun:test' +import { afterAll, afterEach, beforeAll, describe, expect, spyOn, test } from 'bun:test' import { http, HttpResponse } from 'msw' import { setupServer } from 'msw/node' import { PeerConnector } from '../lib/servers/peer' @@ -207,6 +207,30 @@ describe('PeerConnector.downloadFile', () => { await rm(dir, { recursive: true, force: true }) } }) + + test('does not leave the response body locked when opening the .part file fails', async () => { + let body: ReadableStream | null = null + const fetchSpy = spyOn(globalThis, 'fetch').mockImplementation(async () => { + const response = new Response(streamOf([1, 2, 3]), { headers: { 'Content-Length': '3' } }) + body = response.body + return response + }, + ) + + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-peer-open-fails-')) + const destPath = join(dir, 'missing-parent', 'Movie.mkv') + + try { + await expect(peer.downloadFile('remote1:movie:99', destPath, { partPath: `${destPath}.part`, releaseSize: 3 })).rejects.toThrow() + expect(body).not.toBeNull() + expect(body?.locked).toBe(false) + } + finally { + fetchSpy.mockRestore() + await rm(dir, { recursive: true, force: true }) + } + }) }) describe('PeerConnector.downloadFile resume', () => { diff --git a/apps/backend/src/lib/servers/peer.ts b/apps/backend/src/lib/servers/peer.ts index 70961ae..b985952 100644 --- a/apps/backend/src/lib/servers/peer.ts +++ b/apps/backend/src/lib/servers/peer.ts @@ -279,8 +279,8 @@ export class PeerConnector extends ServerConnector { if (expectedBytes != null && expectedBytes > MAX_DOWNLOAD_BYTES) throw new Error(`File too large: ${expectedBytes} bytes exceeds ${MAX_DOWNLOAD_BYTES} byte limit`) - const reader = response.body.getReader() const handle = await open(partPath, resuming ? 'a' : 'w') + const reader = response.body.getReader() let downloadedBytes = existingBytes let lastLoggedAt = Date.now() let lastLoggedBytes = downloadedBytes From 47daec3836f8d50b28e8d4dd79286ade739bcd0b Mon Sep 17 00:00:00 2001 From: Roz Date: Sat, 6 Jun 2026 12:55:56 +0200 Subject: [PATCH 5/5] fix: close peer download handle on reader failure --- .../src/__tests__/peer-download.test.ts | 35 ++++++++++++++++++- apps/backend/src/lib/servers/peer.ts | 9 ++++- 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/apps/backend/src/__tests__/peer-download.test.ts b/apps/backend/src/__tests__/peer-download.test.ts index 9505e90..487e1b8 100644 --- a/apps/backend/src/__tests__/peer-download.test.ts +++ b/apps/backend/src/__tests__/peer-download.test.ts @@ -1,5 +1,5 @@ import type { PeerDownloadProgressEvent } from '../lib/servers/peer' -import { mkdtemp, rm, writeFile } from 'node:fs/promises' +import { mkdtemp, readdir, rm, writeFile } from 'node:fs/promises' import { tmpdir } from 'node:os' import { join } from 'node:path' import { afterAll, afterEach, beforeAll, describe, expect, spyOn, test } from 'bun:test' @@ -43,6 +43,12 @@ async function waitFor(predicate: () => Promise) { throw new Error('Timed out waiting for condition') } +async function openFileDescriptorCount() { + if (process.platform !== 'linux') + return null + return (await readdir('/proc/self/fd')).length +} + describe('PeerConnector.downloadFile', () => { test('streams to a .part file before renaming to the completed file', async () => { const firstChunk = new Uint8Array([1, 2, 3]) @@ -231,6 +237,33 @@ describe('PeerConnector.downloadFile', () => { await rm(dir, { recursive: true, force: true }) } }) + + test('closes the .part file handle when getting the response reader fails', async () => { + const before = await openFileDescriptorCount() + if (before == null) + return + + const fetchSpy = spyOn(globalThis, 'fetch').mockImplementation(async () => { + return new Response(streamOf([1, 2, 3]), { headers: { 'Content-Length': '3' } }) + }) + const getReaderSpy = spyOn(ReadableStream.prototype, 'getReader').mockImplementation(() => { + throw new Error('reader failed') + }) + + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-peer-reader-fails-')) + const destPath = join(dir, 'Movie.mkv') + + try { + await expect(peer.downloadFile('remote1:movie:99', destPath, { partPath: `${destPath}.part`, releaseSize: 3 })).rejects.toThrow('reader failed') + expect(await openFileDescriptorCount()).toBeLessThanOrEqual(before) + } + finally { + getReaderSpy.mockRestore() + fetchSpy.mockRestore() + await rm(dir, { recursive: true, force: true }) + } + }) }) describe('PeerConnector.downloadFile resume', () => { diff --git a/apps/backend/src/lib/servers/peer.ts b/apps/backend/src/lib/servers/peer.ts index b985952..fcb9e5d 100644 --- a/apps/backend/src/lib/servers/peer.ts +++ b/apps/backend/src/lib/servers/peer.ts @@ -280,7 +280,14 @@ export class PeerConnector extends ServerConnector { throw new Error(`File too large: ${expectedBytes} bytes exceeds ${MAX_DOWNLOAD_BYTES} byte limit`) const handle = await open(partPath, resuming ? 'a' : 'w') - const reader = response.body.getReader() + let reader: ReadableStreamDefaultReader + try { + reader = response.body.getReader() + } + catch (err) { + await handle.close().catch(() => {}) + throw err + } let downloadedBytes = existingBytes let lastLoggedAt = Date.now() let lastLoggedBytes = downloadedBytes