diff --git a/apps/backend/drizzle/0003_high_makkari.sql b/apps/backend/drizzle/0003_high_makkari.sql new file mode 100644 index 0000000..aa1c825 --- /dev/null +++ b/apps/backend/drizzle/0003_high_makkari.sql @@ -0,0 +1,34 @@ +PRAGMA foreign_keys=OFF;--> statement-breakpoint +CREATE TABLE `__new_downloads` ( + `id` integer PRIMARY KEY AUTOINCREMENT NOT NULL, + `torrent_filename` text NOT NULL, + `peer_id` text NOT NULL, + `peer_name` text NOT NULL, + `item_id` text NOT NULL, + `filename` text NOT NULL, + `dest_path` text NOT NULL, + `part_path` text NOT NULL, + `release_size` integer NOT NULL, + `release_json` text NOT NULL, + `expected_bytes` integer, + `expected_bytes_source` text, + `expected_bytes_mismatch` integer DEFAULT false NOT NULL, + `downloaded_bytes` integer DEFAULT 0 NOT NULL, + `attempts` integer DEFAULT 0 NOT NULL, + `status` text NOT NULL, + `started_at` text NOT NULL, + `updated_at` text NOT NULL, + `completed_at` text, + `error` text, + `qb_category` text, + `qb_source_server` text, + CONSTRAINT "downloads_status_check" CHECK("__new_downloads"."status" in ('downloading', 'completed', 'failed', 'import_queued')), + CONSTRAINT "downloads_expected_bytes_source_check" CHECK("__new_downloads"."expected_bytes_source" is null or "__new_downloads"."expected_bytes_source" in ('content_length', 'release_size')) +); +--> statement-breakpoint +INSERT INTO `__new_downloads`("id", "torrent_filename", "peer_id", "peer_name", "item_id", "filename", "dest_path", "part_path", "release_size", "release_json", "expected_bytes", "expected_bytes_source", "expected_bytes_mismatch", "downloaded_bytes", "attempts", "status", "started_at", "updated_at", "completed_at", "error", "qb_category", "qb_source_server") SELECT "id", "torrent_filename", "peer_id", "peer_name", "item_id", "filename", "dest_path", "part_path", "release_size", "release_json", "expected_bytes", "expected_bytes_source", "expected_bytes_mismatch", "downloaded_bytes", "attempts", "status", "started_at", "updated_at", "completed_at", "error", "qb_category", "qb_source_server" FROM `downloads`;--> statement-breakpoint +DROP TABLE `downloads`;--> statement-breakpoint +ALTER TABLE `__new_downloads` RENAME TO `downloads`;--> statement-breakpoint +PRAGMA foreign_keys=ON;--> statement-breakpoint +CREATE INDEX `downloads_status_idx` ON `downloads` (`status`);--> statement-breakpoint +CREATE INDEX `downloads_updated_at_idx` ON `downloads` (`updated_at`); \ No newline at end of file diff --git a/apps/backend/drizzle/0004_sleepy_retro_girl.sql b/apps/backend/drizzle/0004_sleepy_retro_girl.sql new file mode 100644 index 0000000..753730c --- /dev/null +++ b/apps/backend/drizzle/0004_sleepy_retro_girl.sql @@ -0,0 +1,34 @@ +PRAGMA foreign_keys=OFF;--> statement-breakpoint +CREATE TABLE `__new_downloads` ( + `id` integer PRIMARY KEY AUTOINCREMENT NOT NULL, + `torrent_filename` text NOT NULL, + `peer_id` text NOT NULL, + `peer_name` text NOT NULL, + `item_id` text NOT NULL, + `filename` text NOT NULL, + `dest_path` text NOT NULL, + `part_path` text NOT NULL, + `release_size` integer NOT NULL, + `release_json` text NOT NULL, + `expected_bytes` integer, + `expected_bytes_source` text, + `expected_bytes_mismatch` integer DEFAULT false NOT NULL, + `downloaded_bytes` integer DEFAULT 0 NOT NULL, + `attempts` integer DEFAULT 0 NOT NULL, + `status` text NOT NULL, + `started_at` text NOT NULL, + `updated_at` text NOT NULL, + `completed_at` text, + `error` text, + `qb_category` text, + `qb_source_server` text, + CONSTRAINT "downloads_status_check" CHECK("__new_downloads"."status" in ('downloading', 'completed', 'failed', 'import_queued')), + CONSTRAINT "downloads_expected_bytes_source_check" CHECK("__new_downloads"."expected_bytes_source" is null or "__new_downloads"."expected_bytes_source" in ('content_length', 'content_range', 'release_size')) +); +--> statement-breakpoint +INSERT INTO `__new_downloads`("id", "torrent_filename", "peer_id", "peer_name", "item_id", "filename", "dest_path", "part_path", "release_size", "release_json", "expected_bytes", "expected_bytes_source", "expected_bytes_mismatch", "downloaded_bytes", "attempts", "status", "started_at", "updated_at", "completed_at", "error", "qb_category", "qb_source_server") SELECT "id", "torrent_filename", "peer_id", "peer_name", "item_id", "filename", "dest_path", "part_path", "release_size", "release_json", "expected_bytes", "expected_bytes_source", "expected_bytes_mismatch", "downloaded_bytes", "attempts", "status", "started_at", "updated_at", "completed_at", "error", "qb_category", "qb_source_server" FROM `downloads`;--> statement-breakpoint +DROP TABLE `downloads`;--> statement-breakpoint +ALTER TABLE `__new_downloads` RENAME TO `downloads`;--> statement-breakpoint +PRAGMA foreign_keys=ON;--> statement-breakpoint +CREATE INDEX `downloads_status_idx` ON `downloads` (`status`);--> statement-breakpoint +CREATE INDEX `downloads_updated_at_idx` ON `downloads` (`updated_at`); \ No newline at end of file diff --git a/apps/backend/drizzle/meta/0003_snapshot.json b/apps/backend/drizzle/meta/0003_snapshot.json new file mode 100644 index 0000000..976684c --- /dev/null +++ b/apps/backend/drizzle/meta/0003_snapshot.json @@ -0,0 +1,209 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "6c9640c3-d5d1-4bd2-82e3-bc067ea7f95f", + "prevId": "5fc3b7b0-f29c-475e-937f-e87648e8bf40", + "tables": { + "downloads": { + "name": "downloads", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "autoincrement": true + }, + "torrent_filename": { + "name": "torrent_filename", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "peer_id": { + "name": "peer_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "peer_name": { + "name": "peer_name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "item_id": { + "name": "item_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "filename": { + "name": "filename", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "dest_path": { + "name": "dest_path", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "part_path": { + "name": "part_path", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "release_size": { + "name": "release_size", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "release_json": { + "name": "release_json", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "expected_bytes": { + "name": "expected_bytes", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "expected_bytes_source": { + "name": "expected_bytes_source", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "expected_bytes_mismatch": { + "name": "expected_bytes_mismatch", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": false + }, + "downloaded_bytes": { + "name": "downloaded_bytes", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "attempts": { + "name": "attempts", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "started_at": { + "name": "started_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "completed_at": { + "name": "completed_at", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "qb_category": { + "name": "qb_category", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "qb_source_server": { + "name": "qb_source_server", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "downloads_status_idx": { + "name": "downloads_status_idx", + "columns": [ + "status" + ], + "isUnique": false + }, + "downloads_updated_at_idx": { + "name": "downloads_updated_at_idx", + "columns": [ + "updated_at" + ], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": { + "downloads_status_check": { + "name": "downloads_status_check", + "value": "\"downloads\".\"status\" in ('downloading', 'completed', 'failed', 'import_queued')" + }, + "downloads_expected_bytes_source_check": { + "name": "downloads_expected_bytes_source_check", + "value": "\"downloads\".\"expected_bytes_source\" is null or \"downloads\".\"expected_bytes_source\" in ('content_length', 'release_size')" + } + } + } + }, + "views": {}, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} diff --git a/apps/backend/drizzle/meta/0004_snapshot.json b/apps/backend/drizzle/meta/0004_snapshot.json new file mode 100644 index 0000000..ba893ba --- /dev/null +++ b/apps/backend/drizzle/meta/0004_snapshot.json @@ -0,0 +1,209 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "3692fbf7-4334-4a78-a0a0-4ad366a37cec", + "prevId": "6c9640c3-d5d1-4bd2-82e3-bc067ea7f95f", + "tables": { + "downloads": { + "name": "downloads", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "autoincrement": true + }, + "torrent_filename": { + "name": "torrent_filename", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "peer_id": { + "name": "peer_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "peer_name": { + "name": "peer_name", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "item_id": { + "name": "item_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "filename": { + "name": "filename", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "dest_path": { + "name": "dest_path", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "part_path": { + "name": "part_path", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "release_size": { + "name": "release_size", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "release_json": { + "name": "release_json", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "expected_bytes": { + "name": "expected_bytes", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "expected_bytes_source": { + "name": "expected_bytes_source", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "expected_bytes_mismatch": { + "name": "expected_bytes_mismatch", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": false + }, + "downloaded_bytes": { + "name": "downloaded_bytes", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "attempts": { + "name": "attempts", + "type": "integer", + "primaryKey": false, + "notNull": true, + "autoincrement": false, + "default": 0 + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "started_at": { + "name": "started_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "updated_at": { + "name": "updated_at", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "completed_at": { + "name": "completed_at", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "qb_category": { + "name": "qb_category", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "qb_source_server": { + "name": "qb_source_server", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "downloads_status_idx": { + "name": "downloads_status_idx", + "columns": [ + "status" + ], + "isUnique": false + }, + "downloads_updated_at_idx": { + "name": "downloads_updated_at_idx", + "columns": [ + "updated_at" + ], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": { + "downloads_status_check": { + "name": "downloads_status_check", + "value": "\"downloads\".\"status\" in ('downloading', 'completed', 'failed', 'import_queued')" + }, + "downloads_expected_bytes_source_check": { + "name": "downloads_expected_bytes_source_check", + "value": "\"downloads\".\"expected_bytes_source\" is null or \"downloads\".\"expected_bytes_source\" in ('content_length', 'content_range', 'release_size')" + } + } + } + }, + "views": {}, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} diff --git a/apps/backend/drizzle/meta/_journal.json b/apps/backend/drizzle/meta/_journal.json index 3908574..6c836c2 100644 --- a/apps/backend/drizzle/meta/_journal.json +++ b/apps/backend/drizzle/meta/_journal.json @@ -22,6 +22,20 @@ "when": 1780763364775, "tag": "0002_romantic_stryfe", "breakpoints": true + }, + { + "idx": 3, + "version": "6", + "when": 1780791754764, + "tag": "0003_high_makkari", + "breakpoints": true + }, + { + "idx": 4, + "version": "6", + "when": 1780819908039, + "tag": "0004_sleepy_retro_girl", + "breakpoints": true } ] } diff --git a/apps/backend/src/__tests__/config.test.ts b/apps/backend/src/__tests__/config.test.ts index d22d023..6db093c 100644 --- a/apps/backend/src/__tests__/config.test.ts +++ b/apps/backend/src/__tests__/config.test.ts @@ -287,6 +287,7 @@ describe('appConfig parsing', () => { maxDownloadAttempts: 5, retryBaseDelayMs: 1000, retryMaxDelayMs: 60_000, + idleTimeoutMs: 60_000, }) }) diff --git a/apps/backend/src/__tests__/database.test.ts b/apps/backend/src/__tests__/database.test.ts index f7d067e..5c8c8b5 100644 --- a/apps/backend/src/__tests__/database.test.ts +++ b/apps/backend/src/__tests__/database.test.ts @@ -125,6 +125,29 @@ describe('DownloadsRepository', () => { handle.close() }) + test('persists release_size as an expected-bytes source (CHECK accepts it)', async () => { + const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) + const repository = new DownloadsRepository(handle.db) + const created = repository.create({ + torrentFilename: 'movie.torrent', + peerId: 'peer-1', + peerName: 'Friend Jack', + itemId: 'movie:1', + filename: release.filename, + destPath: join(tempDir, release.filename), + partPath: join(tempDir, `${release.filename}.part`), + releaseSize: release.size, + release, + }) + + repository.setExpectedBytes(created.id, 123, 'release_size', false) + + const row = repository.get(created.id)! + expect(row.expectedBytes).toBe(123) + expect(row.expectedBytesSource).toBe('release_size') + handle.close() + }) + test('reconciles stale downloading rows using .part file size', async () => { const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') }) const repository = new DownloadsRepository(handle.db) diff --git a/apps/backend/src/__tests__/peer-download.test.ts b/apps/backend/src/__tests__/peer-download.test.ts index 487e1b8..baaad12 100644 --- a/apps/backend/src/__tests__/peer-download.test.ts +++ b/apps/backend/src/__tests__/peer-download.test.ts @@ -5,6 +5,8 @@ import { join } from 'node:path' import { afterAll, afterEach, beforeAll, describe, expect, spyOn, test } from 'bun:test' import { http, HttpResponse } from 'msw' import { setupServer } from 'msw/node' +import { IdleTimeoutError } from '../lib/errors/IdleTimeoutError' +import { UnknownSizeError } from '../lib/errors/UnknownSizeError' import { PeerConnector } from '../lib/servers/peer' const PEER_JACK_URL = 'http://download-peer.test:3000' @@ -134,7 +136,7 @@ describe('PeerConnector.downloadFile', () => { } }) - test('reports indeterminate expected bytes when Content-Length is missing or invalid', async () => { + test('falls back to releaseSize for expected bytes when Content-Length is missing or invalid', async () => { for (const contentLength of [null, 'not-a-number']) { server.resetHandlers() server.use( @@ -145,16 +147,17 @@ describe('PeerConnector.downloadFile', () => { ) const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) - const dir = await mkdtemp(join(tmpdir(), 'jack-peer-indeterminate-')) + const dir = await mkdtemp(join(tmpdir(), 'jack-peer-fallback-')) const events: unknown[] = [] try { await peer.downloadFile('remote1:movie:99', join(dir, 'Movie.mkv'), { + releaseSize: 2, onProgress: (event) => { events.push(event) }, }) - expect(events).toContainEqual({ type: 'headers', expectedBytes: null, expectedBytesSource: null, expectedBytesMismatch: false }) - expect(events).toContainEqual({ type: 'completed', downloadedBytes: 2, expectedBytes: null }) + expect(events).toContainEqual({ type: 'headers', expectedBytes: 2, expectedBytesSource: 'release_size', expectedBytesMismatch: false }) + expect(events).toContainEqual({ type: 'completed', downloadedBytes: 2, expectedBytes: 2 }) } finally { await rm(dir, { recursive: true, force: true }) @@ -162,6 +165,29 @@ describe('PeerConnector.downloadFile', () => { } }) + test('fails fast with UnknownSizeError when neither Content-Length nor releaseSize is known', async () => { + server.use( + http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, () => { + return new Response(streamOf([1, 2]), { headers: {} }) + }), + ) + + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-peer-nosize-')) + const destPath = join(dir, 'Movie.mkv') + const partPath = `${destPath}.part` + + try { + await expect(peer.downloadFile('remote1:movie:99', destPath, { partPath })).rejects.toThrow(UnknownSizeError) + // Fail-fast: nothing is written before the size is known. + expect(await Bun.file(destPath).exists()).toBe(false) + expect(await Bun.file(partPath).exists()).toBe(false) + } + finally { + await rm(dir, { recursive: true, force: true }) + } + }) + test('reports Content-Length mismatches against releaseSize', async () => { server.use( http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, () => { @@ -264,6 +290,69 @@ describe('PeerConnector.downloadFile', () => { await rm(dir, { recursive: true, force: true }) } }) + + test('aborts with IdleTimeoutError when the peer stops sending bytes', async () => { + // Use a fetch spy so the body stream reliably errors when the connector's + // idle abort fires (MSW's mock doesn't propagate the fetch signal mid-body). + // Real fetch rejects an in-flight read on abort, which this mirrors. + const fetchSpy = spyOn(globalThis, 'fetch').mockImplementation(async (_url, init?: RequestInit) => { + const signal = init?.signal + const body = new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1, 2])) + // No further chunks → the transfer stalls. Error the stream when the + // connector aborts (its idle timer), like real fetch would. + signal?.addEventListener('abort', () => controller.error(new DOMException('aborted', 'AbortError'))) + }, + }) + return new Response(body, { headers: { 'Content-Length': '5' } }) + }) + + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-peer-stall-')) + const destPath = join(dir, 'Movie.mkv') + const partPath = `${destPath}.part` + + try { + await expect(peer.downloadFile('remote1:movie:99', destPath, { partPath, releaseSize: 5, idleTimeoutMs: 50 })) + .rejects + .toThrow(IdleTimeoutError) + expect(await Bun.file(destPath).exists()).toBe(false) + expect(await Bun.file(partPath).exists()).toBe(true) // preserved for resume + } + finally { + fetchSpy.mockRestore() + await rm(dir, { recursive: true, force: true }) + } + }) + + test('does not abort a slow but active download (chunks within the idle window)', async () => { + server.use( + http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, () => { + return new Response(new ReadableStream({ + async start(controller) { + for (const b of [1, 2, 3, 4]) { + await Bun.sleep(20) + controller.enqueue(new Uint8Array([b])) + } + controller.close() + }, + }), { headers: { 'Content-Length': '4' } }) + }), + ) + + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-peer-slow-')) + const destPath = join(dir, 'Movie.mkv') + + try { + await peer.downloadFile('remote1:movie:99', destPath, { partPath: `${destPath}.part`, releaseSize: 4, idleTimeoutMs: 200 }) + expect(new Uint8Array(await Bun.file(destPath).arrayBuffer())).toEqual(new Uint8Array([1, 2, 3, 4])) + } + finally { + await rm(dir, { recursive: true, force: true }) + } + }) }) describe('PeerConnector.downloadFile resume', () => { @@ -295,6 +384,8 @@ describe('PeerConnector.downloadFile resume', () => { expect(seen.range).toBe('bytes=2-') expect(new Uint8Array(await Bun.file(destPath).arrayBuffer())).toEqual(new Uint8Array([0, 1, 2, 3, 4])) expect(events.some(e => e.type === 'restart')).toBe(false) + // On a 206 resume the size comes from Content-Range, not Content-Length. + expect(events).toContainEqual({ type: 'headers', expectedBytes: 5, expectedBytesSource: 'content_range', expectedBytesMismatch: false }) expect(events).toContainEqual({ type: 'completed', downloadedBytes: 5, expectedBytes: 5 }) } finally { @@ -360,7 +451,9 @@ describe('PeerConnector.downloadFile resume', () => { } }) - test('restarts when the peer returns 416 for the resume range', async () => { + test('restarts when the peer returns 416 for the resume range (releaseSize unknown)', async () => { + // releaseSize omitted so the pre-fetch oversize guard is skipped and the + // Range is actually sent — exercising the 416 restart path. server.use( http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, ({ request }) => { if (request.headers.get('Range')) @@ -378,7 +471,6 @@ describe('PeerConnector.downloadFile resume', () => { try { await peer.downloadFile('remote1:movie:99', destPath, { partPath, - releaseSize: 5, onProgress: (e) => { events.push(e) }, }) @@ -390,6 +482,71 @@ describe('PeerConnector.downloadFile resume', () => { } }) + test('discards and restarts when the .part is larger than releaseSize', async () => { + let rangeSent = false + server.use( + http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, ({ request }) => { + if (request.headers.get('Range')) + rangeSent = true + return new Response(streamOf([0, 1, 2, 3, 4]), { headers: { 'Content-Length': '5' } }) + }), + ) + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-resume-oversize-')) + const destPath = join(dir, 'Movie.mkv') + const partPath = `${destPath}.part` + await writeFile(partPath, new Uint8Array([0, 1, 2, 3, 4, 5, 6])) // 7 > releaseSize 5 + const events: PeerDownloadProgressEvent[] = [] + + try { + await peer.downloadFile('remote1:movie:99', destPath, { + partPath, + releaseSize: 5, + onProgress: (e) => { events.push(e) }, + }) + + expect(rangeSent).toBe(false) // discarded before requesting; fresh download + expect(events.some(e => e.type === 'restart' && e.reason === 'part_oversize')).toBe(true) + expect(new Uint8Array(await Bun.file(destPath).arrayBuffer())).toEqual(new Uint8Array([0, 1, 2, 3, 4])) + } + finally { + await rm(dir, { recursive: true, force: true }) + } + }) + + test('finalizes without re-downloading when the .part already equals releaseSize', async () => { + let fetched = false + server.use( + http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, () => { + fetched = true + return new Response(streamOf([0, 1, 2]), { headers: { 'Content-Length': '3' } }) + }), + ) + const peer = markInitialized(new PeerConnector({ url: PEER_JACK_URL, apiKey: 'peer-api-key', name: 'Friend Jack' })) + const dir = await mkdtemp(join(tmpdir(), 'jack-resume-exact-')) + const destPath = join(dir, 'Movie.mkv') + const partPath = `${destPath}.part` + await writeFile(partPath, new Uint8Array([7, 8, 9])) // 3 === releaseSize 3 + const events: PeerDownloadProgressEvent[] = [] + + try { + await peer.downloadFile('remote1:movie:99', destPath, { + partPath, + releaseSize: 3, + onProgress: (e) => { events.push(e) }, + }) + + expect(fetched).toBe(false) // no HTTP request at all + expect(await Bun.file(partPath).exists()).toBe(false) + expect(new Uint8Array(await Bun.file(destPath).arrayBuffer())).toEqual(new Uint8Array([7, 8, 9])) + expect(events).toContainEqual({ type: 'headers', expectedBytes: 3, expectedBytesSource: 'release_size', expectedBytesMismatch: false }) + expect(events).toContainEqual({ type: 'completed', downloadedBytes: 3, expectedBytes: 3 }) + } + finally { + await rm(dir, { recursive: true, force: true }) + } + }) + test('rejects non-ok resume responses without appending the response body', async () => { server.use( http.get(`${PEER_JACK_URL}/peer/items/:itemId/file`, () => diff --git a/apps/backend/src/__tests__/retry-policy.test.ts b/apps/backend/src/__tests__/retry-policy.test.ts new file mode 100644 index 0000000..949c2fb --- /dev/null +++ b/apps/backend/src/__tests__/retry-policy.test.ts @@ -0,0 +1,18 @@ +import { describe, expect, test } from 'bun:test' +import { IdleTimeoutError } from '../lib/errors/IdleTimeoutError' +import { IncompleteDownloadError } from '../lib/errors/IncompleteDownloadError' +import { isTransientDownloadError } from '../modules/downloads/retry-policy' + +describe('isTransientDownloadError', () => { + test('IdleTimeoutError is transient (retryable)', () => { + expect(isTransientDownloadError(new IdleTimeoutError('stalled'))).toBe(true) + }) + + test('IncompleteDownloadError is transient (retryable)', () => { + expect(isTransientDownloadError(new IncompleteDownloadError('short'))).toBe(true) + }) + + test('a plain AbortError (manual cancel, not idle) is not transient', () => { + expect(isTransientDownloadError(new DOMException('aborted', 'AbortError'))).toBe(false) + }) +}) diff --git a/apps/backend/src/database/schema.ts b/apps/backend/src/database/schema.ts index 2817cd1..62fd745 100644 --- a/apps/backend/src/database/schema.ts +++ b/apps/backend/src/database/schema.ts @@ -3,7 +3,7 @@ import { check, index, integer, sqliteTable, text } from 'drizzle-orm/sqlite-cor export const DOWNLOAD_STATUSES = ['downloading', 'completed', 'failed', 'import_queued'] as const export type DownloadStatus = typeof DOWNLOAD_STATUSES[number] -export type ExpectedBytesSource = 'content_length' +export type ExpectedBytesSource = 'content_length' | 'content_range' | 'release_size' export const downloads = sqliteTable('downloads', { id: integer('id').primaryKey({ autoIncrement: true }), @@ -33,7 +33,7 @@ export const downloads = sqliteTable('downloads', { qbSourceServer: text('qb_source_server'), }, t => [ check('downloads_status_check', sql`${t.status} in ('downloading', 'completed', 'failed', 'import_queued')`), - check('downloads_expected_bytes_source_check', sql`${t.expectedBytesSource} is null or ${t.expectedBytesSource} = 'content_length'`), + check('downloads_expected_bytes_source_check', sql`${t.expectedBytesSource} is null or ${t.expectedBytesSource} in ('content_length', 'content_range', 'release_size')`), index('downloads_status_idx').on(t.status), index('downloads_updated_at_idx').on(t.updatedAt), ]) diff --git a/apps/backend/src/lib/config.ts b/apps/backend/src/lib/config.ts index a496d21..2075614 100644 --- a/apps/backend/src/lib/config.ts +++ b/apps/backend/src/lib/config.ts @@ -139,6 +139,9 @@ export const DownloadsConfig = z.object({ maxDownloadAttempts: z.number().int().min(1).default(5), retryBaseDelayMs: z.number().int().min(0).default(1000), retryMaxDelayMs: z.number().int().min(0).default(60_000), + // Abort a peer download if no bytes arrive for this long (inactivity timeout). + // Resets on every received chunk; replaces the old whole-request deadline. + idleTimeoutMs: z.number().int().min(1000).default(60_000), }) export type DownloadsConfig = z.infer diff --git a/apps/backend/src/lib/errors/IdleTimeoutError.ts b/apps/backend/src/lib/errors/IdleTimeoutError.ts new file mode 100644 index 0000000..73dea5f --- /dev/null +++ b/apps/backend/src/lib/errors/IdleTimeoutError.ts @@ -0,0 +1,12 @@ +import { AppError } from './AppError' + +/** + * A peer download received no bytes for longer than the idle timeout and was + * aborted. The `.part` is preserved, so this is retryable: the next attempt + * resumes from where it stalled. + */ +export class IdleTimeoutError extends AppError { + constructor(message: string, cause?: unknown) { + super(message, 'IDLE_TIMEOUT', { cause }) + } +} diff --git a/apps/backend/src/lib/errors/UnknownSizeError.ts b/apps/backend/src/lib/errors/UnknownSizeError.ts new file mode 100644 index 0000000..a3ecb0e --- /dev/null +++ b/apps/backend/src/lib/errors/UnknownSizeError.ts @@ -0,0 +1,12 @@ +import { AppError } from './AppError' + +/** + * The download has no known expected size (no Content-Length / Content-Range and + * no releaseSize), so completeness can't be verified. Fail-fast and permanent — + * retrying won't make a size appear. + */ +export class UnknownSizeError extends AppError { + constructor(message: string, cause?: unknown) { + super(message, 'UNKNOWN_SIZE', { cause }) + } +} diff --git a/apps/backend/src/lib/servers/peer.ts b/apps/backend/src/lib/servers/peer.ts index fcb9e5d..901b036 100644 --- a/apps/backend/src/lib/servers/peer.ts +++ b/apps/backend/src/lib/servers/peer.ts @@ -4,7 +4,9 @@ import z from 'zod' import { logger } from '../../logger' import { requireInitialization } from '../decorators/require-initialization' import { FetchError } from '../errors/FetchError' +import { IdleTimeoutError } from '../errors/IdleTimeoutError' import { IncompleteDownloadError } from '../errors/IncompleteDownloadError' +import { UnknownSizeError } from '../errors/UnknownSizeError' import { normalizeImdbId, Release } from '../release' import { withSpan } from '../tracing' import { ServerConnector } from './base' @@ -16,13 +18,13 @@ const DOWNLOAD_PROGRESS_BYTES = 64 * 1024 * 1024 const CONTENT_RANGE_PATTERN = /^bytes (\d+)-(\d+)\/(\d+)$/ export type PeerDownloadProgressEvent - = | { type: 'headers', expectedBytes: number | null, expectedBytesSource: 'content_length' | null, expectedBytesMismatch: boolean } + = | { type: 'headers', expectedBytes: number | null, expectedBytesSource: 'content_length' | 'content_range' | 'release_size' | null, expectedBytesMismatch: boolean } | { type: 'progress', downloadedBytes: number, expectedBytes: number | null } - | { type: 'restart', reason: 'range_ignored' | 'content_range_mismatch' | 'range_not_satisfiable', discardedBytes: number } + | { type: 'restart', reason: 'range_ignored' | 'content_range_mismatch' | 'range_not_satisfiable' | 'part_oversize', discardedBytes: number } | { type: 'completed', downloadedBytes: number, expectedBytes: number | null } export interface PeerDownloadOptions { - timeoutMs?: number + idleTimeoutMs?: number torrentFilename?: string partPath?: string releaseSize?: number @@ -166,22 +168,56 @@ export class PeerConnector extends ServerConnector { 'item.id': id, 'torrent.filename': options.torrentFilename, }, async (span) => { - const timeoutMs = options.timeoutMs ?? 30 * 60 * 1000 + const idleTimeoutMs = options.idleTimeoutMs ?? 60_000 const torrentFilename = options.torrentFilename const url = new URL(`/peer/items/${encodeURIComponent(id)}/file`, this.url) const partPath = options.partPath ?? `${destPath}.part` const baseHeaders = { ...this.headers, 'X-Api-Key': this.apiKey } - span.setAttributes({ 'http.request.timeout_ms': timeoutMs, 'url.path': url.pathname }) + span.setAttributes({ 'http.request.idle_timeout_ms': idleTimeoutMs, 'url.path': url.pathname }) + + // Idle (inactivity) timeout, armed ONLY around network waits (fetch + each + // read) and cleared before local file/progress work, so slow disk I/O never + // trips it. The abort carries a sentinel reason so only it — not a later real + // error — is reclassified as a retryable IdleTimeoutError. + const controller = new AbortController() + const IDLE_ABORT_REASON = 'jack:idle-timeout' + let idleTimer: ReturnType | undefined + const clearIdle = () => { + if (idleTimer) { + clearTimeout(idleTimer) + idleTimer = undefined + } + } + const armIdle = () => { + clearIdle() + idleTimer = setTimeout(() => controller.abort(IDLE_ABORT_REASON), idleTimeoutMs) + idleTimer.unref?.() + } + const isIdleAbort = () => controller.signal.aborted && controller.signal.reason === IDLE_ABORT_REASON + const idleTimeout = () => new IdleTimeoutError(`Peer download stalled: no data received for ${idleTimeoutMs}ms`) const partFile = Bun.file(partPath) let existingBytes = await partFile.exists() ? partFile.size : 0 - const doFetch = (withRange: boolean) => fetch(url, { - headers: withRange ? { ...baseHeaders, Range: `bytes=${existingBytes}-` } : baseHeaders, - signal: AbortSignal.timeout(timeoutMs), - }) + const doFetch = async (withRange: boolean): Promise => { + armIdle() + try { + return await fetch(url, { + headers: withRange ? { ...baseHeaders, Range: `bytes=${existingBytes}-` } : baseHeaders, + signal: controller.signal, + }) + } + catch (err) { + if (isIdleAbort()) + throw idleTimeout() + throw err + } + finally { + clearIdle() + } + } - const emitRestart = async (reason: 'range_ignored' | 'content_range_mismatch' | 'range_not_satisfiable', discardedBytes: number) => { + const emitRestart = async (reason: 'range_ignored' | 'content_range_mismatch' | 'range_not_satisfiable' | 'part_oversize', discardedBytes: number) => { logger.warn({ id, torrentFilename, partPath, discardedBytes, reason, peer: this.name }, 'Resume validation failed; restarting download from byte 0') try { await options.onProgress?.({ type: 'restart', reason, discardedBytes }) @@ -203,6 +239,26 @@ export class PeerConnector extends ServerConnector { return doFetch(false) } + // Resume sanity vs the known release size (available before the request): + // a .part larger than the whole file is corrupt → discard; a .part already + // equal to the file is complete → finalize without re-downloading. + if (existingBytes > 0 && options.releaseSize != null) { + if (existingBytes > options.releaseSize) { + await unlink(partPath).catch(() => {}) + await emitRestart('part_oversize', existingBytes) + existingBytes = 0 + } + else if (existingBytes === options.releaseSize) { + await rename(partPath, destPath) + span.setAttribute('download.downloaded_bytes', existingBytes) + // Emit headers too so the service persists expectedBytes/source (the + // fast path otherwise skips the headers event). + await options.onProgress?.({ type: 'headers', expectedBytes: options.releaseSize, expectedBytesSource: 'release_size', expectedBytesMismatch: false }) + await options.onProgress?.({ type: 'completed', downloadedBytes: existingBytes, expectedBytes: options.releaseSize }) + return + } + } + let response = await doFetch(existingBytes > 0) span.setAttribute('http.response.status_code', response.status) @@ -251,32 +307,44 @@ export class PeerConnector extends ServerConnector { if (!response.body) throw new Error('Peer returned a file response without a body') - // Total file size: from Content-Range on a resume (206), else Content-Length. - const expectedBytes = resuming + // Expected total size: the transfer header (Content-Range total on resume, + // else Content-Length), falling back to the *arr release size. Fail-fast if + // none is known — we never import a file we can't size-check. + const transferSize = resuming ? parseContentRange(response.headers.get('Content-Range'))?.total ?? null : parseContentLength(response.headers) - const expectedBytesMismatch = expectedBytes != null && options.releaseSize != null && expectedBytes !== options.releaseSize - if (expectedBytes != null) - span.setAttribute('download.expected_bytes', expectedBytes) + const expectedBytes = transferSize ?? options.releaseSize ?? null + // Source = where the TRANSFER advertised the size: Content-Range on a resume + // (206), else Content-Length; 'release_size' means it came only from *arr + // metadata (the peer advertised no size). + const expectedBytesSource: 'content_length' | 'content_range' | 'release_size' | null + = transferSize != null ? (resuming ? 'content_range' : 'content_length') : (expectedBytes != null ? 'release_size' : null) + const expectedBytesMismatch = transferSize != null && options.releaseSize != null && transferSize !== options.releaseSize span.setAttributes({ 'download.resuming': resuming, 'download.resume_from_bytes': existingBytes, - 'download.expected_bytes_source': expectedBytes == null ? 'unknown' : 'content_length', + 'download.expected_bytes_source': expectedBytesSource ?? 'unknown', 'download.expected_bytes_mismatch': expectedBytesMismatch, }) + if (expectedBytes == null) { + void response.body.cancel().catch(() => {}) + throw new UnknownSizeError(`Cannot verify download for item ${id}: no Content-Length/Content-Range and no release size`) + } + span.setAttribute('download.expected_bytes', expectedBytes) + if (expectedBytesMismatch) { - logger.warn({ id, torrentFilename, releaseSize: options.releaseSize, expectedBytes, peer: this.name }, 'Peer file total size differs from release metadata size') + logger.warn({ id, torrentFilename, releaseSize: options.releaseSize, expectedBytes: transferSize, peer: this.name }, 'Peer file total size differs from release metadata size') } await options.onProgress?.({ type: 'headers', expectedBytes, - expectedBytesSource: expectedBytes == null ? null : 'content_length', + expectedBytesSource, expectedBytesMismatch, }) - if (expectedBytes != null && expectedBytes > MAX_DOWNLOAD_BYTES) + if (expectedBytes > MAX_DOWNLOAD_BYTES) throw new Error(`File too large: ${expectedBytes} bytes exceeds ${MAX_DOWNLOAD_BYTES} byte limit`) const handle = await open(partPath, resuming ? 'a' : 'w') @@ -302,7 +370,19 @@ export class PeerConnector extends ServerConnector { try { while (true) { - const { done, value } = await reader.read() + // Arm the idle timer only for the network read; clear it immediately + // after so disk writes / progress callbacks don't count as "idle". + armIdle() + let done: boolean + let value: Uint8Array | undefined + try { + const result = await reader.read() + done = result.done + value = result.value + } + finally { + clearIdle() + } if (done) break if (!value) @@ -327,10 +407,13 @@ export class PeerConnector extends ServerConnector { await handle.datasync().catch(() => {}) await closeHandle() - reader.releaseLock() - if (expectedBytes != null && downloadedBytes !== expectedBytes) + // Release the lock only after the completeness check passes, so the catch + // block's cancel+release runs against a still-locked reader on the + // IncompleteDownloadError path (which is the one that gets retried). + if (downloadedBytes !== expectedBytes) throw new IncompleteDownloadError(`Incomplete file download: got ${downloadedBytes} bytes, expected ${expectedBytes}`) + reader.releaseLock() await rename(partPath, destPath) span.setAttribute('download.downloaded_bytes', downloadedBytes) @@ -344,13 +427,15 @@ export class PeerConnector extends ServerConnector { } catch (err) { await closeHandle() + // Cancel the reader so the remote stream is torn down (not left to GC), + // then release. Leave the .part in place so the next attempt resumes. + await reader.cancel().catch(() => {}) try { reader.releaseLock() } catch {} - // Leave the .part file in place so the next attempt can resume from the - // durable bytes written so far. The atomic rename above means an - // incomplete download never reaches `destPath`. + if (isIdleAbort()) + throw idleTimeout() throw err } }) diff --git a/apps/backend/src/modules/downloads/downloads.service.ts b/apps/backend/src/modules/downloads/downloads.service.ts index 9827dc9..8f74cb0 100644 --- a/apps/backend/src/modules/downloads/downloads.service.ts +++ b/apps/backend/src/modules/downloads/downloads.service.ts @@ -211,6 +211,7 @@ export class DownloadsService { torrentFilename: record.torrentFilename, partPath: record.partPath, releaseSize: record.releaseSize, + idleTimeoutMs: this.config.idleTimeoutMs, onProgress, }) }, { diff --git a/apps/backend/src/modules/downloads/retry-policy.ts b/apps/backend/src/modules/downloads/retry-policy.ts index 71c786e..bd247cb 100644 --- a/apps/backend/src/modules/downloads/retry-policy.ts +++ b/apps/backend/src/modules/downloads/retry-policy.ts @@ -1,4 +1,5 @@ import { FetchError } from '../../lib/errors/FetchError' +import { IdleTimeoutError } from '../../lib/errors/IdleTimeoutError' import { IncompleteDownloadError } from '../../lib/errors/IncompleteDownloadError' /** @@ -9,6 +10,8 @@ import { IncompleteDownloadError } from '../../lib/errors/IncompleteDownloadErro export function isTransientDownloadError(error: unknown): boolean { if (error instanceof IncompleteDownloadError) return true + if (error instanceof IdleTimeoutError) + return true if (error instanceof FetchError) { const status = error.response?.status ?? error.extras.status if (status == null)