Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 220 additions & 25 deletions apps/backend/src/__tests__/downloads-service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ import type { Release } from '../lib/release'
import { mkdtemp, rm, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'
import { afterEach, beforeEach, describe, expect, test } from 'bun:test'
import { afterEach, beforeEach, describe, expect, spyOn, test } from 'bun:test'
import { openDatabase } from '../database/connection'
import { FetchError } from '../lib/errors/FetchError'
import { DownloadsRepository } from '../modules/downloads/downloads.repository'
import { DownloadsService } from '../modules/downloads/downloads.service'
import { createTorrentStub } from '../modules/torznab/torrent'
Expand Down Expand Up @@ -31,6 +32,18 @@ afterEach(async () => {
await rm(tempDir, { recursive: true, force: true })
})

function downloadsConfig(overrides: Partial<Record<string, number>> = {}) {
return {
watchPath,
completedPath,
maxConcurrentDownloads: 2,
maxDownloadAttempts: 3,
retryBaseDelayMs: 0,
retryMaxDelayMs: 0,
...overrides,
}
}
Comment thread
roziscoding marked this conversation as resolved.

function fakePeer(overrides: Partial<Record<'getRelease' | 'downloadFile', any>> = {}) {
return {
id: 'peer-1',
Expand All @@ -49,9 +62,9 @@ function fakeDestination() {
return { isInitialized: true, canDestination: true, name: 'Radarr', triggerImport: async () => {} }
}

async function writeTorrent(filename = 'movie.torrent') {
async function writeTorrent(filename = 'movie.torrent', itemId = 'movie:1') {
const filePath = join(watchPath, filename)
await writeFile(filePath, createTorrentStub({ name: release.title, size: release.size, peerId: 'peer-1', itemId: 'movie:1' }))
await writeFile(filePath, createTorrentStub({ name: release.title, size: release.size, peerId: 'peer-1', itemId }))
return filePath
}

Expand All @@ -68,12 +81,11 @@ describe('DownloadsService download progress persistence', () => {
await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 })
},
})
const service = new DownloadsService({ completedPath }, [peer as any], [fakeDestination() as any], repository)
const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository)
const filePath = await writeTorrent()

await service.processTorrentFile(filePath, 'movie.torrent')

// The service forwards dest/part/size/torrentFilename into downloadFile.
expect(calls).toHaveLength(1)
expect(calls[0].destPath).toBe(join(completedPath, release.filename))
expect(calls[0].options.partPath).toBe(`${join(completedPath, release.filename)}.part`)
Expand All @@ -82,14 +94,9 @@ describe('DownloadsService download progress persistence', () => {

const downloads = repository.list()
expect(downloads).toHaveLength(1)
expect(downloads[0]?.torrentFilename).toBe('movie.torrent')
expect(downloads[0]?.filename).toBe(release.filename)
expect(downloads[0]?.destPath).toBe(join(completedPath, release.filename))
expect(downloads[0]?.partPath).toBe(`${join(completedPath, release.filename)}.part`)
expect(downloads[0]?.releaseSize).toBe(10)
expect(downloads[0]?.expectedBytes).toBe(10)
expect(downloads[0]?.downloadedBytes).toBe(10)
expect(downloads[0]?.status).toBe('import_queued')
expect(downloads[0]?.downloadedBytes).toBe(10)
expect(downloads[0]?.attempts).toBe(1)
handle.close()
})

Expand All @@ -99,7 +106,7 @@ describe('DownloadsService download progress persistence', () => {
const peer = fakePeer({ getRelease: async () => {
throw new Error('metadata failed')
} })
const service = new DownloadsService({ completedPath }, [peer as any], [], repository)
const service = new DownloadsService(downloadsConfig(), [peer as any], [], repository)
const filePath = await writeTorrent()

await service.processTorrentFile(filePath, 'movie.torrent')
Expand All @@ -108,7 +115,7 @@ describe('DownloadsService download progress persistence', () => {
handle.close()
})

test('rejects a peer release with a path-traversal filename and does not write outside completedPath', async () => {
test('rejects a peer release with a path-traversal filename', async () => {
const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') })
const repository = new DownloadsRepository(handle.db)
const writtenPaths: string[] = []
Expand All @@ -118,36 +125,224 @@ describe('DownloadsService download progress persistence', () => {
writtenPaths.push(destPath)
},
})
const service = new DownloadsService({ completedPath }, [peer as any], [fakeDestination() as any], repository)
const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository)
const filePath = await writeTorrent()

await service.processTorrentFile(filePath, 'movie.torrent')

// The unsafe name must never reach downloadFile / be written to disk.
expect(writtenPaths).toHaveLength(0)
const evilOutside = join(tempDir, 'evil.mkv')
expect(await Bun.file(evilOutside).exists()).toBe(false)
expect(await Bun.file(`${evilOutside}.part`).exists()).toBe(false)

expect(repository.list()).toHaveLength(0)
handle.close()
})

test('marks an existing row failed when download fails after metadata resolves', async () => {
test('marks an existing row failed when a permanent download error occurs', async () => {
const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') })
const repository = new DownloadsRepository(handle.db)
let calls = 0
const peer = fakePeer({ downloadFile: async () => {
throw new Error('download failed')
calls++
throw new FetchError('not found', new Response(null, { status: 404 }))
} })
const service = new DownloadsService({ completedPath }, [peer as any], [], repository)
const service = new DownloadsService(downloadsConfig(), [peer as any], [], repository)
const filePath = await writeTorrent()

await service.processTorrentFile(filePath, 'movie.torrent')

expect(calls).toBe(1) // 404 is permanent — no retry
const downloads = repository.list()
expect(downloads).toHaveLength(1)
expect(downloads[0]?.status).toBe('failed')
expect(downloads[0]?.error).toContain('download failed')
handle.close()
})

test('retries a transient failure then succeeds', async () => {
const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') })
const repository = new DownloadsRepository(handle.db)
let calls = 0
const peer = fakePeer({
downloadFile: async (_itemId: string, _destPath: string, options: any) => {
calls++
if (calls === 1)
throw new FetchError('busy', new Response(null, { status: 503 }))
await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 })
},
})
const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository)
const filePath = await writeTorrent()

await service.processTorrentFile(filePath, 'movie.torrent')

expect(calls).toBe(2)
const downloads = repository.list()
expect(downloads[0]?.status).toBe('import_queued')
expect(downloads[0]?.attempts).toBe(2)
handle.close()
})

test('persists a resume reset from a restart event', async () => {
const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') })
const repository = new DownloadsRepository(handle.db)
// Spy, because a realistic download ends with a `completed` event whose
// markCompleted() clears the error markResumeReset() set — so asserting the
// final row state cannot prove the reset ran.
const resetSpy = spyOn(repository, 'markResumeReset')
const peer = fakePeer({
downloadFile: async (_itemId: string, _destPath: string, options: any) => {
await options.onProgress({ type: 'headers', expectedBytes: 10, expectedBytesSource: 'content_length', expectedBytesMismatch: false })
await options.onProgress({ type: 'restart', reason: 'range_ignored', discardedBytes: 4 })
await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 })
},
})
const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository)
const filePath = await writeTorrent()

await service.processTorrentFile(filePath, 'movie.torrent')

expect(resetSpy).toHaveBeenCalledTimes(1)
expect(repository.list()[0]?.status).toBe('import_queued')
resetSpy.mockRestore()
handle.close()
})

test('limits concurrent downloads to maxConcurrentDownloads', async () => {
const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') })
const repository = new DownloadsRepository(handle.db)
let active = 0
let maxActive = 0
const peer = {
id: 'peer-1',
name: 'Friend Jack',
url: 'http://peer.test',
// Distinct filename per item so each maps to a distinct destPath.
getRelease: async (itemId: string) => ({ ...release, id: `remote:${itemId}`, filename: `${itemId.replace(':', '_')}.mkv` }),
downloadFile: async (_itemId: string, _destPath: string, options: any) => {
active++
maxActive = Math.max(maxActive, active)
await Bun.sleep(20)
active--
await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 })
},
}
const service = new DownloadsService(downloadsConfig({ maxConcurrentDownloads: 1 }), [peer as any], [fakeDestination() as any], repository)
const a = await writeTorrent('a.torrent', 'movie:1')
const b = await writeTorrent('b.torrent', 'movie:2')

await Promise.all([
service.processTorrentFile(a, 'a.torrent'),
service.processTorrentFile(b, 'b.torrent'),
])

expect(maxActive).toBe(1)
expect(repository.list().filter(d => d.status === 'import_queued')).toHaveLength(2)
handle.close()
})

test('resumeStaleDownloads re-drives a stale downloading row to import_queued', async () => {
const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') })
const repository = new DownloadsRepository(handle.db)
const calls: string[] = []
const peer = fakePeer({
downloadFile: async (_itemId: string, destPath: string, options: any) => {
calls.push(destPath)
await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 })
},
})
// Seed a stale `downloading` row, as if Jack crashed mid-download.
repository.create({
torrentFilename: 'movie.torrent',
peerId: 'peer-1',
peerName: 'Friend Jack',
itemId: 'movie:1',
filename: release.filename,
destPath: join(completedPath, release.filename),
partPath: `${join(completedPath, release.filename)}.part`,
releaseSize: release.size,
release,
})
const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository)

const resumed = await service.resumeStaleDownloads()
// resumeStaleDownloads fires in the background; wait for the row to settle.
for (let i = 0; i < 50 && repository.list()[0]?.status !== 'import_queued'; i++)
await Bun.sleep(10)

expect(resumed).toBe(1)
expect(calls).toEqual([join(completedPath, release.filename)])
expect(repository.list()[0]?.status).toBe('import_queued')
handle.close()
})

test('marks superseded duplicate stale rows (same destPath) failed and re-drives only one', async () => {
const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') })
const repository = new DownloadsRepository(handle.db)
const calls: string[] = []
const peer = fakePeer({
downloadFile: async (_itemId: string, destPath: string, options: any) => {
calls.push(destPath)
await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 })
},
})
const destPath = join(completedPath, release.filename)
const base = {
peerId: 'peer-1',
peerName: 'Friend Jack',
itemId: 'movie:1',
filename: release.filename,
destPath,
partPath: `${destPath}.part`,
releaseSize: release.size,
release,
}
repository.create({ ...base, torrentFilename: 'first.torrent' })
repository.create({ ...base, torrentFilename: 'second.torrent' })
const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository)

const resumed = await service.resumeStaleDownloads()
for (let i = 0; i < 50 && !repository.list().some(d => d.status === 'import_queued'); i++)
await Bun.sleep(10)

expect(resumed).toBe(1)
expect(calls).toEqual([destPath]) // only one of the two same-destPath rows is re-driven
const rows = repository.list()
expect(rows.filter(d => d.status === 'import_queued')).toHaveLength(1)
expect(rows.find(d => d.status === 'failed')?.error).toContain('superseded')
handle.close()
})

test('releases the re-enqueue claim after a successful resume so the filename can be processed again', async () => {
const handle = await openDatabase({ appConfigPath: join(tempDir, 'config.jsonc') })
const repository = new DownloadsRepository(handle.db)
const calls: string[] = []
const peer = fakePeer({
downloadFile: async (itemId: string, _destPath: string, options: any) => {
calls.push(itemId)
await options.onProgress({ type: 'completed', downloadedBytes: 10, expectedBytes: 10 })
},
})
repository.create({
torrentFilename: 'movie.torrent',
peerId: 'peer-1',
peerName: 'Friend Jack',
itemId: 'movie:1',
filename: release.filename,
destPath: join(completedPath, release.filename),
partPath: `${join(completedPath, release.filename)}.part`,
releaseSize: release.size,
release,
})
const service = new DownloadsService(downloadsConfig(), [peer as any], [fakeDestination() as any], repository)

await service.resumeStaleDownloads()
for (let i = 0; i < 50 && repository.list()[0]?.status !== 'import_queued'; i++)
await Bun.sleep(10)
expect(calls).toHaveLength(1)

// A later legitimate re-drop of the same torrent filename must NOT be skipped
// by a stale re-enqueue claim once the resume has completed.
const filePath = await writeTorrent('movie.torrent')
await service.processTorrentFile(filePath, 'movie.torrent')

expect(calls).toHaveLength(2)
expect(repository.list().filter(d => d.status === 'import_queued')).toHaveLength(2)
handle.close()
})
})
18 changes: 13 additions & 5 deletions apps/backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,6 @@ const destinations = connectors.servers.filter(s => s.canDestination)

const database = await openDatabase({ appConfigPath: envs.APP_CONFIG_PATH })
const downloadsRepository = new DownloadsRepository(database.db)
const reconciledDownloads = await downloadsRepository.reconcileStaleDownloads()

if (reconciledDownloads > 0)
logger.warn({ downloads: reconciledDownloads, databasePath: database.path }, 'Reconciled stale downloads from previous Jack run')

const app = getApp(envs, config, connectors, { downloadsRepository })
const server = Bun.serve({
Expand Down Expand Up @@ -100,13 +96,25 @@ if (config.jack) {
}
}

// Start blackhole watcher
// Start blackhole watcher (and re-drive interrupted downloads from a prior run)
let blackholeWatcher: BlackholeWatcher | null = null
if (config.downloads) {
const downloadsService = new DownloadsService(config.downloads, connectors.peers, destinations, downloadsRepository)
// Active re-enqueue: resume stale `downloading` rows in place before the
// watcher scans, so the leftover .torrent stubs are not re-processed as new rows.
const resumed = await downloadsService.resumeStaleDownloads()
if (resumed > 0)
logger.warn({ downloads: resumed, databasePath: database.path }, 'Re-enqueued interrupted downloads from previous Jack run')

blackholeWatcher = new BlackholeWatcher(config.downloads, downloadsService)
await blackholeWatcher.start()
}
else {
// No downloads config means stale rows cannot be resumed — mark them failed.
const failed = await downloadsRepository.reconcileStaleDownloads()
if (failed > 0)
logger.warn({ downloads: failed, databasePath: database.path }, 'Marked stale downloads failed (no downloads config to resume them)')
}

process.on('SIGINT', async () => {
logger.info('SIGINT received, exiting')
Expand Down
Loading
Loading