From e77e9a5c38c0eb3437d0ba0984adf258479f0aa1 Mon Sep 17 00:00:00 2001 From: Roz Date: Sat, 6 Jun 2026 02:51:24 +0200 Subject: [PATCH 1/3] feat: add retry, semaphore, and download concurrency/retry config Add a generic retry() helper (bounded attempts, exponential backoff with full jitter, optional Retry-After override, injectable sleep/random) and a download retry classifier (transient: network/timeout/5xx/429/incomplete stream; permanent: non-429 4xx and others). Add a FIFO async Semaphore. Extend DownloadsConfig with maxConcurrentDownloads and retry knobs (all defaulted so existing configs keep parsing). Primitives are wired into DownloadsService in a later change. Refs #13. --- apps/backend/src/__tests__/config.test.ts | 18 +++ .../src/__tests__/downloads-api.test.ts | 6 +- .../backend/src/__tests__/integration.test.ts | 6 +- apps/backend/src/__tests__/retry.test.ts | 134 ++++++++++++++++++ apps/backend/src/__tests__/semaphore.test.ts | 45 ++++++ apps/backend/src/lib/config.ts | 7 + apps/backend/src/lib/retry.ts | 43 ++++++ apps/backend/src/lib/semaphore.ts | 39 +++++ .../src/modules/downloads/retry-policy.ts | 45 ++++++ examples/config.jsonc | 9 +- 10 files changed, 345 insertions(+), 7 deletions(-) create mode 100644 apps/backend/src/__tests__/retry.test.ts create mode 100644 apps/backend/src/__tests__/semaphore.test.ts create mode 100644 apps/backend/src/lib/retry.ts create mode 100644 apps/backend/src/lib/semaphore.ts create mode 100644 apps/backend/src/modules/downloads/retry-policy.ts diff --git a/apps/backend/src/__tests__/config.test.ts b/apps/backend/src/__tests__/config.test.ts index 6e37718..a8faa35 100644 --- a/apps/backend/src/__tests__/config.test.ts +++ b/apps/backend/src/__tests__/config.test.ts @@ -277,4 +277,22 @@ describe('appConfig parsing', () => { }) expect(result.success).toBe(false) }) + + test('defaults the downloads hardening knobs', () => { + const parsed = AppConfig.parse({ + downloads: { watchPath: '/w', completedPath: '/c' }, + }) + expect(parsed.downloads).toMatchObject({ + maxConcurrentDownloads: 3, + maxDownloadAttempts: 5, + retryBaseDelayMs: 1000, + retryMaxDelayMs: 60_000, + }) + }) + + test('respects an explicit maxConcurrentDownloads and rejects non-positive values', () => { + const parsed = AppConfig.parse({ downloads: { watchPath: '/w', completedPath: '/c', maxConcurrentDownloads: 8 } }) + expect(parsed.downloads?.maxConcurrentDownloads).toBe(8) + expect(AppConfig.safeParse({ downloads: { watchPath: '/w', completedPath: '/c', maxConcurrentDownloads: 0 } }).success).toBe(false) + }) }) diff --git a/apps/backend/src/__tests__/downloads-api.test.ts b/apps/backend/src/__tests__/downloads-api.test.ts index 09d987c..4df87ff 100644 --- a/apps/backend/src/__tests__/downloads-api.test.ts +++ b/apps/backend/src/__tests__/downloads-api.test.ts @@ -1,4 +1,3 @@ -import type { AppConfig } from '../lib/config' import type { Envs } from '../lib/envs' import type { Release } from '../lib/release' import { mkdtemp, rm } from 'node:fs/promises' @@ -7,6 +6,7 @@ import { join } from 'node:path' import { afterEach, beforeEach, describe, expect, test } from 'bun:test' import { getApp } from '../app' import { openDatabase } from '../database/connection' +import { AppConfig } from '../lib/config' import { DownloadsRepository } from '../modules/downloads/downloads.repository' const envs: Envs = { @@ -20,12 +20,12 @@ const envs: Envs = { NODE_ENV: 'test', } -const config: AppConfig = { +const config = AppConfig.parse({ jack: { baseUrl: 'http://localhost:3000', apiKey: 'test-api-key' }, downloads: { watchPath: '/tmp/watch', completedPath: '/tmp/completed' }, servers: [], peers: [], -} +}) const release: Release = { id: 'remote:movie:1', diff --git a/apps/backend/src/__tests__/integration.test.ts b/apps/backend/src/__tests__/integration.test.ts index c8edc95..98c7153 100644 --- a/apps/backend/src/__tests__/integration.test.ts +++ b/apps/backend/src/__tests__/integration.test.ts @@ -1,4 +1,3 @@ -import type { AppConfig } from '../lib/config' import type { Envs } from '../lib/envs' import type { Release } from '../lib/release' import { Database } from 'bun:sqlite' @@ -9,6 +8,7 @@ import { setupServer } from 'msw/node' import { getApp } from '../app' import { runMigrations } from '../database/connection' import * as schema from '../database/schema' +import { AppConfig } from '../lib/config' import { RadarrServerConnector } from '../lib/servers/arr/radarr' import { PeerConnector } from '../lib/servers/peer' import { DownloadsRepository } from '../modules/downloads/downloads.repository' @@ -101,12 +101,12 @@ afterEach(() => { }) afterAll(() => server.close()) -const config: AppConfig = { +const config = AppConfig.parse({ jack: { baseUrl: 'http://localhost:3000', apiKey: 'test-api-key' }, downloads: { watchPath: '/tmp/jack-test-watch', completedPath: '/tmp/jack-test-completed' }, servers: [], peers: [], -} +}) const envs: Envs = { APP_CONFIG_PATH: '/data/config.json', diff --git a/apps/backend/src/__tests__/retry.test.ts b/apps/backend/src/__tests__/retry.test.ts new file mode 100644 index 0000000..ca528c3 --- /dev/null +++ b/apps/backend/src/__tests__/retry.test.ts @@ -0,0 +1,134 @@ +import { describe, expect, test } from 'bun:test' +import { FetchError } from '../lib/errors/FetchError' +import { IncompleteDownloadError } from '../lib/errors/IncompleteDownloadError' +import { retry } from '../lib/retry' +import { downloadRetryAfterMs, isTransientDownloadError } from '../modules/downloads/retry-policy' + +async function noSleep() {} + +describe('retry', () => { + test('returns immediately on first success', async () => { + let calls = 0 + const result = await retry(async () => { + calls++ + return 'ok' + }, { + maxAttempts: 3, + baseDelayMs: 1, + maxDelayMs: 10, + isRetryable: () => true, + sleep: noSleep, + }) + expect(result).toBe('ok') + expect(calls).toBe(1) + }) + + test('retries a retryable failure then succeeds', async () => { + let calls = 0 + const result = await retry(async () => { + calls++ + if (calls < 2) + throw new Error('boom') + return 'ok' + }, { maxAttempts: 3, baseDelayMs: 1, maxDelayMs: 10, isRetryable: () => true, sleep: noSleep, random: () => 1 }) + expect(result).toBe('ok') + expect(calls).toBe(2) + }) + + test('does not retry a non-retryable error', async () => { + let calls = 0 + await expect(retry(async () => { + calls++ + throw new Error('nope') + }, { + maxAttempts: 3, + baseDelayMs: 1, + maxDelayMs: 10, + isRetryable: () => false, + sleep: noSleep, + })).rejects.toThrow('nope') + expect(calls).toBe(1) + }) + + test('throws the last error after exhausting attempts', async () => { + let calls = 0 + await expect(retry(async () => { + calls++ + throw new Error(`fail-${calls}`) + }, { + maxAttempts: 3, + baseDelayMs: 1, + maxDelayMs: 10, + isRetryable: () => true, + sleep: noSleep, + random: () => 1, + })).rejects.toThrow('fail-3') + expect(calls).toBe(3) + }) + + test('uses retryAfterMs over the jittered backoff', async () => { + const delays: number[] = [] + let calls = 0 + await retry(async () => { + calls++ + if (calls < 2) + throw new Error('429') + return 'ok' + }, { + maxAttempts: 3, + baseDelayMs: 1, + maxDelayMs: 10_000, + isRetryable: () => true, + retryAfterMs: () => 2000, + sleep: async (ms) => { + delays.push(ms) + }, + random: () => 1, + }) + expect(delays).toEqual([2000]) + }) +}) + +describe('isTransientDownloadError', () => { + function fetchError(status: number, headers?: Record) { + return new FetchError('x', new Response(null, { status, headers })) + } + + test('treats 5xx and 429 as transient', () => { + expect(isTransientDownloadError(fetchError(500))).toBe(true) + expect(isTransientDownloadError(fetchError(503))).toBe(true) + expect(isTransientDownloadError(fetchError(429))).toBe(true) + }) + + test('treats non-429 4xx as permanent', () => { + expect(isTransientDownloadError(fetchError(400))).toBe(false) + expect(isTransientDownloadError(fetchError(404))).toBe(false) + }) + + test('treats timeouts and network TypeErrors as transient', () => { + const timeout = new Error('timed out') + timeout.name = 'TimeoutError' + expect(isTransientDownloadError(timeout)).toBe(true) + expect(isTransientDownloadError(new TypeError('fetch failed'))).toBe(true) + }) + + test('treats an incomplete download as transient', () => { + expect(isTransientDownloadError(new IncompleteDownloadError('got 3 bytes, expected 5'))).toBe(true) + }) + + test('treats other plain errors as permanent', () => { + expect(isTransientDownloadError(new Error('Unsafe release filename from peer'))).toBe(false) + }) +}) + +describe('downloadRetryAfterMs', () => { + test('parses a seconds Retry-After header', () => { + const err = new FetchError('x', new Response(null, { status: 429, headers: { 'Retry-After': '2' } })) + expect(downloadRetryAfterMs(err)).toBe(2000) + }) + + test('returns null without a header or for non-FetchErrors', () => { + expect(downloadRetryAfterMs(new FetchError('x', new Response(null, { status: 429 })))).toBeNull() + expect(downloadRetryAfterMs(new Error('x'))).toBeNull() + }) +}) diff --git a/apps/backend/src/__tests__/semaphore.test.ts b/apps/backend/src/__tests__/semaphore.test.ts new file mode 100644 index 0000000..8c7c461 --- /dev/null +++ b/apps/backend/src/__tests__/semaphore.test.ts @@ -0,0 +1,45 @@ +import { describe, expect, test } from 'bun:test' +import { Semaphore } from '../lib/semaphore' + +describe('Semaphore', () => { + test('limits concurrency to the permit count', async () => { + const sem = new Semaphore(2) + let active = 0 + let maxActive = 0 + const task = () => sem.run(async () => { + active++ + maxActive = Math.max(maxActive, active) + await Bun.sleep(10) + active-- + }) + await Promise.all(Array.from({ length: 6 }, task)) + expect(maxActive).toBe(2) + }) + + test('hands permits to waiters in FIFO order', async () => { + const sem = new Semaphore(1) + const order: number[] = [] + await sem.acquire() + const waiters = [1, 2, 3].map(n => sem.acquire().then(() => { + order.push(n) + })) + sem.release() + sem.release() + sem.release() + await Promise.all(waiters) + expect(order).toEqual([1, 2, 3]) + }) + + test('releases the permit when the task throws', async () => { + const sem = new Semaphore(1) + await expect(sem.run(async () => { + throw new Error('boom') + })).rejects.toThrow('boom') + const ran = await sem.run(async () => 'next') + expect(ran).toBe('next') + }) + + test('rejects a non-positive permit count', () => { + expect(() => new Semaphore(0)).toThrow() + }) +}) diff --git a/apps/backend/src/lib/config.ts b/apps/backend/src/lib/config.ts index 6257d61..4129b38 100644 --- a/apps/backend/src/lib/config.ts +++ b/apps/backend/src/lib/config.ts @@ -133,6 +133,13 @@ export type JackConfig = z.infer export const DownloadsConfig = z.object({ watchPath: z.string().min(1), completedPath: z.string().min(1), + // Max peer file downloads running at once (an async semaphore guards the + // expensive download step). Defaults keep existing configs working. + maxConcurrentDownloads: z.number().int().min(1).default(3), + // Bounded retries for transient failures, with exponential backoff + jitter. + maxDownloadAttempts: z.number().int().min(1).default(5), + retryBaseDelayMs: z.number().int().min(0).default(1000), + retryMaxDelayMs: z.number().int().min(0).default(60_000), }) export type DownloadsConfig = z.infer diff --git a/apps/backend/src/lib/retry.ts b/apps/backend/src/lib/retry.ts new file mode 100644 index 0000000..2259fde --- /dev/null +++ b/apps/backend/src/lib/retry.ts @@ -0,0 +1,43 @@ +export interface RetryOptions { + /** Total attempts, including the first. */ + maxAttempts: number + baseDelayMs: number + maxDelayMs: number + /** Whether a thrown error should be retried. */ + isRetryable: (error: unknown) => boolean + /** Optional explicit delay (e.g. from a Retry-After header), capped at maxDelayMs. */ + retryAfterMs?: (error: unknown) => number | null + onRetry?: (info: { attempt: number, delayMs: number, error: unknown }) => void + /** Injectable for tests (defaults to setTimeout). */ + sleep?: (ms: number) => Promise + /** Injectable for tests (defaults to Math.random). */ + random?: () => number +} + +const defaultSleep = (ms: number) => new Promise(resolve => setTimeout(resolve, ms)) + +export async function retry(fn: (attempt: number) => Promise, options: RetryOptions): Promise { + const { maxAttempts, baseDelayMs, maxDelayMs, isRetryable, retryAfterMs, onRetry } = options + const sleep = options.sleep ?? defaultSleep + const random = options.random ?? Math.random + + let lastError: unknown + for (let attempt = 1; attempt <= maxAttempts; attempt++) { + try { + return await fn(attempt) + } + catch (error) { + lastError = error + if (attempt >= maxAttempts || !isRetryable(error)) + throw error + + const explicit = retryAfterMs?.(error) ?? null + // Full jitter (AWS): pick uniformly in [0, exponential backoff]. + const backoff = Math.min(maxDelayMs, baseDelayMs * 2 ** (attempt - 1)) + const delayMs = explicit != null ? Math.min(explicit, maxDelayMs) : random() * backoff + onRetry?.({ attempt, delayMs, error }) + await sleep(delayMs) + } + } + throw lastError +} diff --git a/apps/backend/src/lib/semaphore.ts b/apps/backend/src/lib/semaphore.ts new file mode 100644 index 0000000..7f41f4c --- /dev/null +++ b/apps/backend/src/lib/semaphore.ts @@ -0,0 +1,39 @@ +/** A counting semaphore with FIFO fairness for limiting concurrent async work. */ +export class Semaphore { + private available: number + private readonly waiters: Array<() => void> = [] + + constructor(permits: number) { + if (!Number.isInteger(permits) || permits < 1) + throw new Error(`Semaphore permits must be a positive integer, got ${permits}`) + this.available = permits + } + + async acquire(): Promise { + if (this.available > 0) { + this.available-- + return + } + await new Promise(resolve => this.waiters.push(resolve)) + } + + release(): void { + const next = this.waiters.shift() + if (next) { + // Hand the permit straight to the next waiter (keeps `available` at 0). + next() + return + } + this.available++ + } + + async run(fn: () => Promise): Promise { + await this.acquire() + try { + return await fn() + } + finally { + this.release() + } + } +} diff --git a/apps/backend/src/modules/downloads/retry-policy.ts b/apps/backend/src/modules/downloads/retry-policy.ts new file mode 100644 index 0000000..b159597 --- /dev/null +++ b/apps/backend/src/modules/downloads/retry-policy.ts @@ -0,0 +1,45 @@ +import { FetchError } from '../../lib/errors/FetchError' +import { IncompleteDownloadError } from '../../lib/errors/IncompleteDownloadError' + +/** + * Transient = worth retrying: network failures, timeouts, HTTP 5xx, 429, and a + * truncated stream (the .part is preserved, so a retry resumes). Non-429 4xx and + * any other error (unsafe filename, file-too-large) are permanent (not retried). + */ +export function isTransientDownloadError(error: unknown): boolean { + if (error instanceof IncompleteDownloadError) + return true + if (error instanceof FetchError) { + const status = error.response?.status ?? error.extras.status + if (status == null) + return true + if (status === 429) + return true + return status >= 500 && status <= 599 + } + if (error instanceof Error) { + // AbortSignal.timeout rejects with a TimeoutError; manual aborts with AbortError. + if (error.name === 'TimeoutError' || error.name === 'AbortError') + return true + // A failed fetch (DNS, connection refused, reset) rejects with a TypeError. + if (error instanceof TypeError) + return true + } + return false +} + +/** Milliseconds to wait per a 429 `Retry-After` header (seconds or HTTP-date), or null. */ +export function downloadRetryAfterMs(error: unknown): number | null { + if (!(error instanceof FetchError)) + return null + const header = error.response?.headers?.get('Retry-After') + if (!header) + return null + const seconds = Number(header) + if (Number.isFinite(seconds)) + return Math.max(0, seconds * 1000) + const dateMs = Date.parse(header) + if (!Number.isNaN(dateMs)) + return Math.max(0, dateMs - Date.now()) + return null +} diff --git a/examples/config.jsonc b/examples/config.jsonc index f78bb97..cf832c1 100644 --- a/examples/config.jsonc +++ b/examples/config.jsonc @@ -25,7 +25,14 @@ // and moves finished downloads. Paths are *inside the container*. "downloads": { "watchPath": "/data/torrents/watch", - "completedPath": "/data/torrents/completed" + "completedPath": "/data/torrents/completed", + // Optional download hardening knobs — values below are the defaults, so you + // can delete any line to keep the default. (Note the comma after + // "completedPath" above is required once any field follows it.) + "maxConcurrentDownloads": 3, + "maxDownloadAttempts": 5, + "retryBaseDelayMs": 1000, + "retryMaxDelayMs": 60000 }, // Your Radarr/Sonarr servers. apiKey is the *arr API key: exactly 32 hex From 7c8844e1829a1d839999c3d582bac8abe92ad3d1 Mon Sep 17 00:00:00 2001 From: Roz <3948961+roziscoding@users.noreply.github.com> Date: Sat, 6 Jun 2026 11:48:55 +0200 Subject: [PATCH 2/3] feat: track download attempts and expose stale rows for re-drive (#13 4/5) (#19) * feat: track download attempts and expose stale rows for re-drive Add an attempts column to the downloads table (additive migration) and repository methods: incrementAttempts, markResumeReset (reset downloadedBytes and record the resume-from-zero transition), and listStaleDownloads (returns stale downloading rows without mutating them, for active startup re-enqueue). reconcileStaleDownloads is kept as the fallback for when downloads is unconfigured. Refs #13. * feat: bound, retry, and resume peer downloads end-to-end (#13 5/5) (#20) * feat: bound, retry, and resume peer downloads end-to-end Rewire DownloadsService around a shared Semaphore (maxConcurrentDownloads), a retry loop (bounded backoff+jitter, attempts tracked, transient vs permanent classification, Retry-After honored), and resume: the restart progress event persists via markResumeReset, and an active/reenqueued dedupe prevents duplicate rows. On startup, index.ts re-drives stale downloading rows with resumeStaleDownloads() before the watcher scans, falling back to reconcileStaleDownloads() when downloads is unconfigured. Closes #13. * fix: harden startup re-enqueue dedupe (review feedback) - Dedupe stale downloading rows by destPath before re-driving: only one row per destination is resumable (they share the same .part), so mark the superseded duplicates failed instead of letting the second silently early-return in runDownload and stay stuck in downloading. - Release the reenqueued claim on successful resume (stub already unlinked, so no scan race) so a later legitimate re-drop of the same torrent filename is not silently skipped for the rest of the process. Refs #13. --- .../drizzle/0001_tearful_the_fallen.sql | 1 + apps/backend/drizzle/meta/0001_snapshot.json | 195 ++++++++++++++ apps/backend/drizzle/meta/_journal.json | 7 + apps/backend/src/__tests__/database.test.ts | 61 +++++ .../src/__tests__/downloads-service.test.ts | 245 ++++++++++++++++-- apps/backend/src/database/schema.ts | 1 + apps/backend/src/index.ts | 18 +- .../modules/downloads/downloads.repository.ts | 25 +- .../modules/downloads/downloads.service.ts | 214 +++++++++++---- 9 files changed, 690 insertions(+), 77 deletions(-) create mode 100644 apps/backend/drizzle/0001_tearful_the_fallen.sql create mode 100644 apps/backend/drizzle/meta/0001_snapshot.json diff --git a/apps/backend/drizzle/0001_tearful_the_fallen.sql b/apps/backend/drizzle/0001_tearful_the_fallen.sql new file mode 100644 index 0000000..f59c3a9 --- /dev/null +++ b/apps/backend/drizzle/0001_tearful_the_fallen.sql @@ -0,0 +1 @@ +ALTER TABLE `downloads` ADD `attempts` integer DEFAULT 0 NOT NULL; \ No newline at end of file diff --git a/apps/backend/drizzle/meta/0001_snapshot.json b/apps/backend/drizzle/meta/0001_snapshot.json new file mode 100644 index 0000000..892f474 --- /dev/null +++ b/apps/backend/drizzle/meta/0001_snapshot.json @@ -0,0 +1,195 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "0e170273-951e-4b46-b8be-8d77d6188596", + "prevId": "7511a5bb-12b1-4836-8099-498542b3d20d", + "tables": { + "downloads": { + "name": "downloads", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "autoincrement": true + }, + "torrent_filename": { + "name": "torrent_filename", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "peer_id": { + "name": "peer_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "peer_name": { + "name": "peer_name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "item_id": { + "name": "item_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "filename": { + "name": "filename", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "dest_path": { + "name": "dest_path", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "part_path": { + "name": "part_path", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "release_size": { + "name": "release_size", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "release_json": { + "name": "release_json", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "expected_bytes": { + "name": "expected_bytes", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "expected_bytes_source": { + "name": "expected_bytes_source", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "expected_bytes_mismatch": { + "name": "expected_bytes_mismatch", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": false + }, + "downloaded_bytes": { + "name": "downloaded_bytes", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "attempts": { + "name": "attempts", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "started_at": { + "name": "started_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "completed_at": { + "name": "completed_at", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "downloads_status_idx": { + "name": "downloads_status_idx", + "columns": [ + "status" + ], + "isUnique": false + }, + "downloads_updated_at_idx": { + "name": "downloads_updated_at_idx", + "columns": [ + "updated_at" + ], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": { + "downloads_status_check": { + "name": "downloads_status_check", + "value": "\"downloads\".\"status\" in ('downloading', 'completed', 'failed', 'import_queued')" + }, + "downloads_expected_bytes_source_check": { + "name": "downloads_expected_bytes_source_check", + "value": "\"downloads\".\"expected_bytes_source\" is null or \"downloads\".\"expected_bytes_source\" = 'content_length'" + } + } + } + }, + "views": {}, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} diff --git a/apps/backend/drizzle/meta/_journal.json b/apps/backend/drizzle/meta/_journal.json index 35efc42..643afd8 100644 --- a/apps/backend/drizzle/meta/_journal.json +++ b/apps/backend/drizzle/meta/_journal.json @@ -8,6 +8,13 @@ "when": 1780589033291, "tag": "0000_mysterious_vin_gonzales", "breakpoints": true + }, + { + "idx": 1, + "version": "6", + "when": 1780707145431, + "tag": "0001_tearful_the_fallen", + "breakpoints": true } ] } diff --git a/apps/backend/src/__tests__/database.test.ts b/apps/backend/src/__tests__/database.test.ts index 82c618a..31058e3 100644 --- a/apps/backend/src/__tests__/database.test.ts +++ b/apps/backend/src/__tests__/database.test.ts @@ -151,4 +151,65 @@ describe('DownloadsRepository', () => { expect(stale.error).toContain('stale') handle.close() }) + + test('increments attempts and records a resume reset', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const created = repository.create({ + torrentFilename: 'movie.torrent', + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: release.filename, + destPath: join(tempDir, release.filename), + partPath: join(tempDir, `${release.filename}.part`), + releaseSize: release.size, + release, + }) + + expect(repository.incrementAttempts(created.id)).toBe(1) + expect(repository.incrementAttempts(created.id)).toBe(2) + repository.updateProgress(created.id, 40) + repository.markResumeReset(created.id) + + const row = repository.get(created.id)! + expect(row.attempts).toBe(2) + expect(row.downloadedBytes).toBe(0) + expect(row.status).toBe('downloading') + expect(row.error).toContain('resume validation failed') + handle.close() + }) + + test('lists stale downloading rows without mutating them', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const a = repository.create({ + torrentFilename: 'a.torrent', + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: 'A.mkv', + destPath: join(tempDir, 'A.mkv'), + partPath: join(tempDir, 'A.mkv.part'), + releaseSize: 10, + release, + }) + const b = repository.create({ + torrentFilename: 'b.torrent', + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:2', + filename: 'B.mkv', + destPath: join(tempDir, 'B.mkv'), + partPath: join(tempDir, 'B.mkv.part'), + releaseSize: 10, + release, + }) + repository.markCompleted(b.id, 10) + + const stale = repository.listStaleDownloads() + expect(stale.map(r => r.id)).toEqual([a.id]) + expect(repository.get(a.id)?.status).toBe('downloading') + handle.close() + }) }) diff --git a/apps/backend/src/__tests__/downloads-service.test.ts b/apps/backend/src/__tests__/downloads-service.test.ts index ff12025..9e333ee 100644 --- a/apps/backend/src/__tests__/downloads-service.test.ts +++ b/apps/backend/src/__tests__/downloads-service.test.ts @@ -2,8 +2,9 @@ import type { Release } from '../lib/release' import { mkdtemp, rm, writeFile } from 'node:fs/promises' import { tmpdir } from 'node:os' import { join } from 'node:path' -import { afterEach, beforeEach, describe, expect, test } from 'bun:test' +import { afterEach, beforeEach, describe, expect, spyOn, test } from 'bun:test' import { openDatabase } from '../database/connection' +import { FetchError } from '../lib/errors/FetchError' import { DownloadsRepository } from '../modules/downloads/downloads.repository' import { DownloadsService } from '../modules/downloads/downloads.service' import { createTorrentStub } from '../modules/torznab/torrent' @@ -31,6 +32,18 @@ afterEach(async () => { await rm(tempDir, { recursive: true, force: true }) }) +function downloadsConfig(overrides: Partial> = {}) { + return { + watchPath, + completedPath, + maxConcurrentDownloads: 2, + maxDownloadAttempts: 3, + retryBaseDelayMs: 0, + retryMaxDelayMs: 0, + ...overrides, + } +} + function fakePeer(overrides: Partial> = {}) { return { id: 'peer-1', @@ -49,9 +62,9 @@ function fakeDestination() { return { isInitialized: true, canDestination: true, name: 'Radarr', triggerImport: async () => {} } } -async function writeTorrent(filename = 'movie.torrent') { +async function writeTorrent(filename = 'movie.torrent', itemId = 'movie:1') { const filePath = join(watchPath, filename) - await writeFile(filePath, createTorrentStub({ name: release.title, size: release.size, peerId: 'peer-1', itemId: 'movie:1' })) + await writeFile(filePath, createTorrentStub({ name: release.title, size: release.size, peerId: 'peer-1', itemId })) return filePath } @@ -68,12 +81,11 @@ describe('DownloadsService download progress persistence', () => { await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) }, }) - const service = new DownloadsService({ completedPath }, [peer as any], [fakeDestination() as any], repository) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) const filePath = await writeTorrent() await service.processTorrentFile(filePath, 'movie.torrent') - // The service forwards dest/part/size/torrentFilename into downloadFile. expect(calls).toHaveLength(1) expect(calls[0].destPath).toBe(join(completedPath, release.filename)) expect(calls[0].options.partPath).toBe(`${join(completedPath, release.filename)}.part`) @@ -82,14 +94,9 @@ describe('DownloadsService download progress persistence', () => { const downloads = repository.list() expect(downloads).toHaveLength(1) - expect(downloads[0]?.torrentFilename).toBe('movie.torrent') - expect(downloads[0]?.filename).toBe(release.filename) - expect(downloads[0]?.destPath).toBe(join(completedPath, release.filename)) - expect(downloads[0]?.partPath).toBe(`${join(completedPath, release.filename)}.part`) - expect(downloads[0]?.releaseSize).toBe(10) - expect(downloads[0]?.expectedBytes).toBe(10) - expect(downloads[0]?.downloadedBytes).toBe(10) expect(downloads[0]?.status).toBe('import_queued') + expect(downloads[0]?.downloadedBytes).toBe(10) + expect(downloads[0]?.attempts).toBe(1) handle.close() }) @@ -99,7 +106,7 @@ describe('DownloadsService download progress persistence', () => { const peer = fakePeer({ getRelease: async () => { throw new Error('metadata failed') } }) - const service = new DownloadsService({ completedPath }, [peer as any], [], repository) + const service = new DownloadsService(downloadsConfig(), [peer as any], [], repository) const filePath = await writeTorrent() await service.processTorrentFile(filePath, 'movie.torrent') @@ -108,7 +115,7 @@ describe('DownloadsService download progress persistence', () => { handle.close() }) - test('rejects a peer release with a path-traversal filename and does not write outside completedPath', async () => { + test('rejects a peer release with a path-traversal filename', async () => { const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) const repository = new DownloadsRepository(handle.db) const writtenPaths: string[] = [] @@ -118,36 +125,224 @@ describe('DownloadsService download progress persistence', () => { writtenPaths.push(destPath) }, }) - const service = new DownloadsService({ completedPath }, [peer as any], [fakeDestination() as any], repository) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) const filePath = await writeTorrent() await service.processTorrentFile(filePath, 'movie.torrent') - // The unsafe name must never reach downloadFile / be written to disk. expect(writtenPaths).toHaveLength(0) - const evilOutside = join(tempDir, 'evil.mkv') - expect(await Bun.file(evilOutside).exists()).toBe(false) - expect(await Bun.file(`${evilOutside}.part`).exists()).toBe(false) - expect(repository.list()).toHaveLength(0) handle.close() }) - test('marks an existing row failed when download fails after metadata resolves', async () => { + test('marks an existing row failed when a permanent download error occurs', async () => { const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) const repository = new DownloadsRepository(handle.db) + let calls = 0 const peer = fakePeer({ downloadFile: async () => { - throw new Error('download failed') + calls++ + throw new FetchError('not found', new Response(null, { status: 404 })) } }) - const service = new DownloadsService({ completedPath }, [peer as any], [], repository) + const service = new DownloadsService(downloadsConfig(), [peer as any], [], repository) const filePath = await writeTorrent() await service.processTorrentFile(filePath, 'movie.torrent') + expect(calls).toBe(1) // 404 is permanent — no retry const downloads = repository.list() - expect(downloads).toHaveLength(1) expect(downloads[0]?.status).toBe('failed') - expect(downloads[0]?.error).toContain('download failed') + handle.close() + }) + + test('retries a transient failure then succeeds', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + let calls = 0 + const peer = fakePeer({ + downloadFile: async (_itemId: string, _destPath: string, options: any) => { + calls++ + if (calls === 1) + throw new FetchError('busy', new Response(null, { status: 503 })) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + const filePath = await writeTorrent() + + await service.processTorrentFile(filePath, 'movie.torrent') + + expect(calls).toBe(2) + const downloads = repository.list() + expect(downloads[0]?.status).toBe('import_queued') + expect(downloads[0]?.attempts).toBe(2) + handle.close() + }) + + test('persists a resume reset from a restart event', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + // Spy, because a realistic download ends with a `completed` event whose + // markCompleted() clears the error markResumeReset() set — so asserting the + // final row state cannot prove the reset ran. + const resetSpy = spyOn(repository, 'markResumeReset') + const peer = fakePeer({ + downloadFile: async (_itemId: string, _destPath: string, options: any) => { + await options.onProgress({ type: 'headers', expectedBytes: 10, expectedBytesSource: 'content_length', expectedBytesMismatch: false }) + await options.onProgress({ type: 'restart', reason: 'range_ignored', discardedBytes: 4 }) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + const filePath = await writeTorrent() + + await service.processTorrentFile(filePath, 'movie.torrent') + + expect(resetSpy).toHaveBeenCalledTimes(1) + expect(repository.list()[0]?.status).toBe('import_queued') + resetSpy.mockRestore() + handle.close() + }) + + test('limits concurrent downloads to maxConcurrentDownloads', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + let active = 0 + let maxActive = 0 + const peer = { + id: 'peer-1', + name: 'Friend Jack', + url: 'http://peer.test', + // Distinct filename per item so each maps to a distinct destPath. + getRelease: async (itemId: string) => ({ ...release, id: `remote:${itemId}`, filename: `${itemId.replace(':', '_')}.mkv` }), + downloadFile: async (_itemId: string, _destPath: string, options: any) => { + active++ + maxActive = Math.max(maxActive, active) + await Bun.sleep(20) + active-- + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + } + const service = new DownloadsService(downloadsConfig({ maxConcurrentDownloads: 1 }), [peer as any], [fakeDestination() as any], repository) + const a = await writeTorrent('a.torrent', 'movie:1') + const b = await writeTorrent('b.torrent', 'movie:2') + + await Promise.all([ + service.processTorrentFile(a, 'a.torrent'), + service.processTorrentFile(b, 'b.torrent'), + ]) + + expect(maxActive).toBe(1) + expect(repository.list().filter(d => d.status === 'import_queued')).toHaveLength(2) + handle.close() + }) + + test('resumeStaleDownloads re-drives a stale downloading row to import_queued', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const calls: string[] = [] + const peer = fakePeer({ + downloadFile: async (_itemId: string, destPath: string, options: any) => { + calls.push(destPath) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + // Seed a stale `downloading` row, as if Jack crashed mid-download. + repository.create({ + torrentFilename: 'movie.torrent', + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: release.filename, + destPath: join(completedPath, release.filename), + partPath: `${join(completedPath, release.filename)}.part`, + releaseSize: release.size, + release, + }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + + const resumed = await service.resumeStaleDownloads() + // resumeStaleDownloads fires in the background; wait for the row to settle. + for (let i = 0; i < 50 && repository.list()[0]?.status !== 'import_queued'; i++) + await Bun.sleep(10) + + expect(resumed).toBe(1) + expect(calls).toEqual([join(completedPath, release.filename)]) + expect(repository.list()[0]?.status).toBe('import_queued') + handle.close() + }) + + test('marks superseded duplicate stale rows (same destPath) failed and re-drives only one', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const calls: string[] = [] + const peer = fakePeer({ + downloadFile: async (_itemId: string, destPath: string, options: any) => { + calls.push(destPath) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + const destPath = join(completedPath, release.filename) + const base = { + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: release.filename, + destPath, + partPath: `${destPath}.part`, + releaseSize: release.size, + release, + } + repository.create({ ...base, torrentFilename: 'first.torrent' }) + repository.create({ ...base, torrentFilename: 'second.torrent' }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + + const resumed = await service.resumeStaleDownloads() + for (let i = 0; i < 50 && !repository.list().some(d => d.status === 'import_queued'); i++) + await Bun.sleep(10) + + expect(resumed).toBe(1) + expect(calls).toEqual([destPath]) // only one of the two same-destPath rows is re-driven + const rows = repository.list() + expect(rows.filter(d => d.status === 'import_queued')).toHaveLength(1) + expect(rows.find(d => d.status === 'failed')?.error).toContain('superseded') + handle.close() + }) + + test('releases the re-enqueue claim after a successful resume so the filename can be processed again', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const calls: string[] = [] + const peer = fakePeer({ + downloadFile: async (itemId: string, _destPath: string, options: any) => { + calls.push(itemId) + await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 }) + }, + }) + repository.create({ + torrentFilename: 'movie.torrent', + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: release.filename, + destPath: join(completedPath, release.filename), + partPath: `${join(completedPath, release.filename)}.part`, + releaseSize: release.size, + release, + }) + const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository) + + await service.resumeStaleDownloads() + for (let i = 0; i < 50 && repository.list()[0]?.status !== 'import_queued'; i++) + await Bun.sleep(10) + expect(calls).toHaveLength(1) + + // A later legitimate re-drop of the same torrent filename must NOT be skipped + // by a stale re-enqueue claim once the resume has completed. + const filePath = await writeTorrent('movie.torrent') + await service.processTorrentFile(filePath, 'movie.torrent') + + expect(calls).toHaveLength(2) + expect(repository.list().filter(d => d.status === 'import_queued')).toHaveLength(2) handle.close() }) }) diff --git a/apps/backend/src/database/schema.ts b/apps/backend/src/database/schema.ts index 3168888..a957bbb 100644 --- a/apps/backend/src/database/schema.ts +++ b/apps/backend/src/database/schema.ts @@ -20,6 +20,7 @@ export const downloads = sqliteTable('downloads', { expectedBytesSource: text('expected_bytes_source').$type(), expectedBytesMismatch: integer('expected_bytes_mismatch', { mode: 'boolean' }).notNull().default(false), downloadedBytes: integer('downloaded_bytes').notNull().default(0), + attempts: integer('attempts').notNull().default(0), status: text('status').$type().notNull(), startedAt: text('started_at').notNull(), updatedAt: text('updated_at').notNull(), diff --git a/apps/backend/src/index.ts b/apps/backend/src/index.ts index db0c1e5..dc15b1a 100644 --- a/apps/backend/src/index.ts +++ b/apps/backend/src/index.ts @@ -31,10 +31,6 @@ const destinations = connectors.servers.filter(s => s.canDestination) const database = await openDatabase({ appConfigPath: envs.APP_CONFIG_PATH }) const downloadsRepository = new DownloadsRepository(database.db) -const reconciledDownloads = await downloadsRepository.reconcileStaleDownloads() - -if (reconciledDownloads > 0) - logger.warn({ downloads: reconciledDownloads, databasePath: database.path }, 'Reconciled stale downloads from previous Jack run') const app = getApp(envs, config, connectors, { downloadsRepository }) const server = Bun.serve({ @@ -100,13 +96,25 @@ if (config.jack) { } } -// Start blackhole watcher +// Start blackhole watcher (and re-drive interrupted downloads from a prior run) let blackholeWatcher: BlackholeWatcher | null = null if (config.downloads) { const downloadsService = new DownloadsService(config.downloads, connectors.peers, destinations, downloadsRepository) + // Active re-enqueue: resume stale `downloading` rows in place before the + // watcher scans, so the leftover .torrent stubs are not re-processed as new rows. + const resumed = await downloadsService.resumeStaleDownloads() + if (resumed > 0) + logger.warn({ downloads: resumed, databasePath: database.path }, 'Re-enqueued interrupted downloads from previous Jack run') + blackholeWatcher = new BlackholeWatcher(config.downloads, downloadsService) await blackholeWatcher.start() } +else { + // No downloads config means stale rows cannot be resumed — mark them failed. + const failed = await downloadsRepository.reconcileStaleDownloads() + if (failed > 0) + logger.warn({ downloads: failed, databasePath: database.path }, 'Marked stale downloads failed (no downloads config to resume them)') +} process.on('SIGINT', async () => { logger.info('SIGINT received, exiting') diff --git a/apps/backend/src/modules/downloads/downloads.repository.ts b/apps/backend/src/modules/downloads/downloads.repository.ts index 2e066af..67eaf9a 100644 --- a/apps/backend/src/modules/downloads/downloads.repository.ts +++ b/apps/backend/src/modules/downloads/downloads.repository.ts @@ -1,7 +1,7 @@ import type { AppDatabase } from '../../database/connection' import type { DownloadRow, DownloadStatus, ExpectedBytesSource, NewDownloadRow } from '../../database/schema' import type { Release } from '../../lib/release' -import { desc, eq } from 'drizzle-orm' +import { desc, eq, sql } from 'drizzle-orm' import { downloads } from '../../database/schema' export interface DownloadRecord { @@ -19,6 +19,7 @@ export interface DownloadRecord { expectedBytesSource: ExpectedBytesSource | null expectedBytesMismatch: boolean downloadedBytes: number + attempts: number status: DownloadStatus startedAt: string updatedAt: string @@ -58,6 +59,7 @@ function toRecord(row: DownloadRow): DownloadRecord { expectedBytesSource: row.expectedBytesSource ?? null, expectedBytesMismatch: row.expectedBytesMismatch, downloadedBytes: row.downloadedBytes, + attempts: row.attempts, status: row.status, startedAt: row.startedAt, updatedAt: row.updatedAt, @@ -138,6 +140,27 @@ export class DownloadsRepository { .run() } + incrementAttempts(id: number): number { + const row = this.db.update(downloads) + .set({ attempts: sql`${downloads.attempts} + 1`, updatedAt: nowIso() }) + .where(eq(downloads.id, id)) + .returning() + .get() + return row?.attempts ?? 0 + } + + markResumeReset(id: number): void { + this.db.update(downloads) + .set({ downloadedBytes: 0, error: 'resume validation failed; restarted from byte 0', updatedAt: nowIso() }) + .where(eq(downloads.id, id)) + .run() + } + + /** Stale `downloading` rows from a prior run, returned for active re-drive (no mutation). */ + listStaleDownloads(): DownloadRecord[] { + return this.db.select().from(downloads).where(eq(downloads.status, 'downloading')).all().map(toRecord) + } + async reconcileStaleDownloads(): Promise { const staleRows = this.db.select().from(downloads).where(eq(downloads.status, 'downloading')).all() diff --git a/apps/backend/src/modules/downloads/downloads.service.ts b/apps/backend/src/modules/downloads/downloads.service.ts index 1a0932a..39fdfa6 100644 --- a/apps/backend/src/modules/downloads/downloads.service.ts +++ b/apps/backend/src/modules/downloads/downloads.service.ts @@ -1,25 +1,50 @@ import type { AppConfig } from '../../lib/config' import type { ArrServerConnector } from '../../lib/servers/arr/base' import type { PeerConnector, PeerDownloadProgressEvent } from '../../lib/servers/peer' -import type { DownloadsRepository } from './downloads.repository' +import type { DownloadRecord, DownloadsRepository } from './downloads.repository' import { Buffer } from 'node:buffer' import { unlink } from 'node:fs/promises' import { basename, join } from 'node:path' +import { retry } from '../../lib/retry' +import { Semaphore } from '../../lib/semaphore' import { withSpan } from '../../lib/tracing' import { logger } from '../../logger' import { parseTorrentStub } from '../torznab/torrent' +import { downloadRetryAfterMs, isTransientDownloadError } from './retry-policy' + +type DownloadsServiceConfig = NonNullable export class DownloadsService { + private readonly semaphore: Semaphore + // Dest paths with a download in flight — guards two concurrent live drops that + // resolve to the same destination (no duplicate rows / writers). + private readonly active = new Set() + // Torrent filenames owned by the startup re-enqueue. Their leftover stubs are + // skipped by the watcher's initial scan for the rest of the run, so a re-drive + // that fails fast cannot be re-processed into a duplicate row. + private readonly reenqueued = new Set() + constructor( - private readonly config: Pick, 'completedPath'>, + private readonly config: DownloadsServiceConfig, private readonly peers: PeerConnector[], private readonly destinations: ArrServerConnector[], private readonly downloadsRepository?: DownloadsRepository, - ) {} + ) { + this.semaphore = new Semaphore(config.maxConcurrentDownloads) + } async processTorrentFile(filePath: string, filename: string) { try { await withSpan('blackhole.process_torrent', { 'torrent.filename': filename }, async (span) => { + // The startup re-enqueue owns this stub — it is being (or will be) + // re-driven from the persisted row. Skip it so we never create a + // duplicate row, even if that re-drive already failed and cleared `active`. + if (this.reenqueued.has(filename)) { + span.setAttribute('torrent.reenqueued', true) + logger.debug({ torrentFilename: filename }, 'Stub owned by startup re-enqueue; skipping watcher processing') + return + } + const file = Bun.file(filePath) if (!await file.exists()) { span.setAttribute('torrent.exists', false) @@ -54,8 +79,6 @@ export class DownloadsService { // `release.filename` is peer-controlled and only validated as a string. // Force it to a plain basename inside `completedPath` so a value like // `../../evil.mkv` or an absolute path cannot escape the directory. - // Reject (rather than silently rewrite) anything that is not already a - // plain filename, so a malicious peer cannot smuggle in path separators. const safeName = basename(release.filename) const isSafeName = safeName.length > 0 && safeName !== '.' && safeName !== '..' && !safeName.includes('/') && !safeName.includes('\\') @@ -68,7 +91,12 @@ export class DownloadsService { const partPath = `${destPath}.part` span.setAttributes({ 'release.filename': safeName, 'release.size': release.size }) - const download = this.downloadsRepository?.create({ + if (this.active.has(destPath)) { + logger.debug({ torrentFilename: filename, destPath }, 'A download for this destination is already active; skipping duplicate') + return + } + + const created = this.downloadsRepository?.create({ torrentFilename: filename, peerId, peerName: peer.name ?? peer.url, @@ -80,56 +108,150 @@ export class DownloadsService { release, }) - const onProgress = async (event: PeerDownloadProgressEvent) => { - if (!download) - return + const record: DownloadRecord = created ?? { + id: -1, + torrentFilename: filename, + peerId, + peerName: peer.name ?? peer.url, + itemId, + filename: safeName, + destPath, + partPath, + releaseSize: release.size, + release, + expectedBytes: null, + expectedBytesSource: null, + expectedBytesMismatch: false, + downloadedBytes: 0, + attempts: 0, + status: 'downloading', + startedAt: '', + updatedAt: '', + completedAt: null, + error: null, + } - if (event.type === 'headers') { - this.downloadsRepository?.setExpectedBytes(download.id, event.expectedBytes, event.expectedBytesSource, event.expectedBytesMismatch) - return - } + await this.runDownload(record) + }) + } + catch (err) { + const message = err instanceof Error ? err.message : String(err) + logger.error({ torrentFilename: filename, filename, error: message }, 'Failed to process torrent') + } + } - if (event.type === 'progress') { - this.downloadsRepository?.updateProgress(download.id, event.downloadedBytes) - return - } + /** Re-drive stale `downloading` rows from a prior run, resuming from their .part files. */ + async resumeStaleDownloads(): Promise { + const repo = this.downloadsRepository + if (!repo) + return 0 + // Dedupe by destPath: a prior run could leave more than one stale row for the + // same destination (they share the same .part), but only one can be resumed. + // Re-driving two would make the second silently early-return in runDownload + // and stay stuck in `downloading`, so mark the superseded ones failed instead. + const seen = new Set() + const resumable: DownloadRecord[] = [] + for (const record of repo.listStaleDownloads()) { + if (seen.has(record.destPath)) { + repo.markFailed(record.id, 'superseded by another resumable download for the same destination') + continue + } + seen.add(record.destPath) + resumable.push(record) + } + // Claim every resumable stub up-front (synchronously, before the watcher + // starts) so the initial scan skips them regardless of re-drive timing/outcome. + for (const record of resumable) + this.reenqueued.add(record.torrentFilename) + for (const record of resumable) { + // Fire-and-forget: the semaphore caps concurrency, and the stub is already + // claimed in `reenqueued` so the watcher won't duplicate it. + void this.runDownload(record).catch((err) => { + const message = err instanceof Error ? err.message : String(err) + logger.error({ torrentFilename: record.torrentFilename, error: message }, 'Failed to resume stale download') + }) + } + if (resumable.length > 0) + logger.info({ downloads: resumable.length }, 'Re-enqueued interrupted downloads') + return resumable.length + } - if (event.type === 'restart') { - // Resume validation failed; the download restarts from byte 0. - // Persisting this is handled in a later phase. - return - } + private async runDownload(record: DownloadRecord): Promise { + if (this.active.has(record.destPath)) + return + this.active.add(record.destPath) + try { + await this.semaphore.run(() => this.downloadWithRetry(record)) + } + finally { + this.active.delete(record.destPath) + } + } - this.downloadsRepository?.markCompleted(download.id, event.downloadedBytes) - } + private async downloadWithRetry(record: DownloadRecord): Promise { + const repo = this.downloadsRepository + const peer = this.peers.find(p => p.id === record.peerId) + if (!peer) { + repo?.markFailed(record.id, `Peer ${record.peerId} not found`) + logger.error({ torrentFilename: record.torrentFilename, peerId: record.peerId }, 'Cannot run download: peer not found') + return + } - // Everything after the row is created is wrapped so any failure - // (download, stub unlink, or import trigger) marks the row failed - // instead of leaving it stuck in `completed`/`downloading`. - // `import_queued` means: the file downloaded AND triggerImport was - // attempted (best-effort per destination — see triggerImport below). - try { - await peer.downloadFile(itemId, destPath, { torrentFilename: filename, partPath, releaseSize: release.size, onProgress }) - await unlink(filePath) - await this.triggerImport(filename) - - if (download) - this.downloadsRepository?.markImportQueued(download.id) - } - catch (err) { - if (download) { - const message = err instanceof Error ? err.message : String(err) - this.downloadsRepository?.markFailed(download.id, message) - } - throw err - } + const onProgress = async (event: PeerDownloadProgressEvent) => { + if (event.type === 'headers') { + repo?.setExpectedBytes(record.id, event.expectedBytes, event.expectedBytesSource, event.expectedBytesMismatch) + return + } + if (event.type === 'progress') { + repo?.updateProgress(record.id, event.downloadedBytes) + return + } + if (event.type === 'restart') { + repo?.markResumeReset(record.id) + return + } + repo?.markCompleted(record.id, event.downloadedBytes) + } + + const stubPath = join(this.config.watchPath, record.torrentFilename) - logger.info({ torrentFilename: filename, filename: safeName }, 'Download complete, triggered import') + try { + await retry(async () => { + repo?.incrementAttempts(record.id) + await peer.downloadFile(record.itemId, record.destPath, { + torrentFilename: record.torrentFilename, + partPath: record.partPath, + releaseSize: record.releaseSize, + onProgress, + }) + }, { + maxAttempts: this.config.maxDownloadAttempts, + baseDelayMs: this.config.retryBaseDelayMs, + maxDelayMs: this.config.retryMaxDelayMs, + isRetryable: isTransientDownloadError, + retryAfterMs: downloadRetryAfterMs, + onRetry: ({ attempt, delayMs, error }) => { + const message = error instanceof Error ? error.message : String(error) + logger.warn({ torrentFilename: record.torrentFilename, attempt, delayMs, error: message }, 'Retrying peer download after transient failure') + }, }) + + await unlink(stubPath).catch(() => {}) + await this.triggerImport(record.torrentFilename) + repo?.markImportQueued(record.id) + // Release the startup-re-enqueue claim now that the stub is gone, so a + // later legitimate re-drop of the same filename isn't silently skipped. + // Only on success: a failed re-drive keeps its stub, so it stays claimed + // (and is re-driven on the next restart) to avoid in-session hammering. + this.reenqueued.delete(record.torrentFilename) + logger.info({ torrentFilename: record.torrentFilename, filename: record.filename }, 'Download complete, triggered import') } catch (err) { const message = err instanceof Error ? err.message : String(err) - logger.error({ torrentFilename: filename, filename, error: message }, 'Failed to process torrent') + // The .part file is preserved by downloadFile on failure, so a later + // restart re-enqueue can resume from it. + repo?.markFailed(record.id, message) + logger.error({ torrentFilename: record.torrentFilename, filename: record.filename, error: message }, 'Download failed') } } From 3498c1d93c3648265035a0779b4161af9220574c Mon Sep 17 00:00:00 2001 From: Roz Date: Sat, 6 Jun 2026 11:59:41 +0200 Subject: [PATCH 3/3] fix: address retry review feedback --- apps/backend/src/__tests__/retry.test.ts | 49 ++++++++++++++++++- apps/backend/src/lib/retry.ts | 8 +-- .../src/modules/downloads/retry-policy.ts | 4 +- 3 files changed, 54 insertions(+), 7 deletions(-) diff --git a/apps/backend/src/__tests__/retry.test.ts b/apps/backend/src/__tests__/retry.test.ts index ca528c3..ceb4c7f 100644 --- a/apps/backend/src/__tests__/retry.test.ts +++ b/apps/backend/src/__tests__/retry.test.ts @@ -1,4 +1,4 @@ -import { describe, expect, test } from 'bun:test' +import { describe, expect, spyOn, test } from 'bun:test' import { FetchError } from '../lib/errors/FetchError' import { IncompleteDownloadError } from '../lib/errors/IncompleteDownloadError' import { retry } from '../lib/retry' @@ -87,6 +87,21 @@ describe('retry', () => { }) expect(delays).toEqual([2000]) }) + + test('rejects invalid maxAttempts without calling the operation', async () => { + let calls = 0 + await expect(retry(async () => { + calls++ + return 'ok' + }, { + maxAttempts: 0, + baseDelayMs: 1, + maxDelayMs: 10, + isRetryable: () => true, + sleep: noSleep, + })).rejects.toThrow('maxAttempts must be at least 1') + expect(calls).toBe(0) + }) }) describe('isTransientDownloadError', () => { @@ -112,6 +127,12 @@ describe('isTransientDownloadError', () => { expect(isTransientDownloadError(new TypeError('fetch failed'))).toBe(true) }) + test('treats manual aborts as permanent', () => { + const abort = new Error('shutting down') + abort.name = 'AbortError' + expect(isTransientDownloadError(abort)).toBe(false) + }) + test('treats an incomplete download as transient', () => { expect(isTransientDownloadError(new IncompleteDownloadError('got 3 bytes, expected 5'))).toBe(true) }) @@ -127,6 +148,32 @@ describe('downloadRetryAfterMs', () => { expect(downloadRetryAfterMs(err)).toBe(2000) }) + test('parses an HTTP-date Retry-After header', () => { + const now = Date.UTC(2098, 11, 31, 23, 59, 58) + const nowSpy = spyOn(Date, 'now').mockReturnValue(now) + const err = new FetchError('x', new Response(null, { status: 429, headers: { 'Retry-After': 'Thu, 01 Jan 2099 00:00:00 GMT' } })) + + try { + expect(downloadRetryAfterMs(err)).toBe(2000) + } + finally { + nowSpy.mockRestore() + } + }) + + test('floors past HTTP-date Retry-After headers at zero', () => { + const now = Date.UTC(2099, 0, 1, 0, 0, 0) + const nowSpy = spyOn(Date, 'now').mockReturnValue(now) + const err = new FetchError('x', new Response(null, { status: 429, headers: { 'Retry-After': 'Wed, 31 Dec 2098 23:59:59 GMT' } })) + + try { + expect(downloadRetryAfterMs(err)).toBe(0) + } + finally { + nowSpy.mockRestore() + } + }) + test('returns null without a header or for non-FetchErrors', () => { expect(downloadRetryAfterMs(new FetchError('x', new Response(null, { status: 429 })))).toBeNull() expect(downloadRetryAfterMs(new Error('x'))).toBeNull() diff --git a/apps/backend/src/lib/retry.ts b/apps/backend/src/lib/retry.ts index 2259fde..59c01f3 100644 --- a/apps/backend/src/lib/retry.ts +++ b/apps/backend/src/lib/retry.ts @@ -21,13 +21,14 @@ export async function retry(fn: (attempt: number) => Promise, options: Ret const sleep = options.sleep ?? defaultSleep const random = options.random ?? Math.random - let lastError: unknown - for (let attempt = 1; attempt <= maxAttempts; attempt++) { + if (!Number.isInteger(maxAttempts) || maxAttempts < 1) + throw new RangeError('maxAttempts must be at least 1') + + for (let attempt = 1; ; attempt++) { try { return await fn(attempt) } catch (error) { - lastError = error if (attempt >= maxAttempts || !isRetryable(error)) throw error @@ -39,5 +40,4 @@ export async function retry(fn: (attempt: number) => Promise, options: Ret await sleep(delayMs) } } - throw lastError } diff --git a/apps/backend/src/modules/downloads/retry-policy.ts b/apps/backend/src/modules/downloads/retry-policy.ts index b159597..71c786e 100644 --- a/apps/backend/src/modules/downloads/retry-policy.ts +++ b/apps/backend/src/modules/downloads/retry-policy.ts @@ -18,8 +18,8 @@ export function isTransientDownloadError(error: unknown): boolean { return status >= 500 && status <= 599 } if (error instanceof Error) { - // AbortSignal.timeout rejects with a TimeoutError; manual aborts with AbortError. - if (error.name === 'TimeoutError' || error.name === 'AbortError') + // AbortSignal.timeout rejects with a TimeoutError; manual aborts are permanent. + if (error.name === 'TimeoutError') return true // A failed fetch (DNS, connection refused, reset) rejects with a TypeError. if (error instanceof TypeError)