diff --git a/lib/storage.ts b/lib/storage.ts index 18e51a3..1e0839d 100644 --- a/lib/storage.ts +++ b/lib/storage.ts @@ -30,6 +30,14 @@ import { match } from 'ts-pattern' import { getDatabase } from './db' import { env } from './env' import { generateNumberId } from './helpers' +import { logger } from './logger' + +export class ObjectNotFoundError extends Error { + constructor(objectName: string) { + super(`Object not found in storage: ${objectName}`) + this.name = 'ObjectNotFoundError' + } +} export class Storage { adapter @@ -208,22 +216,24 @@ export class Storage { .where('id', '=', storageLocation.id) .execute() - if (storageLocation.mergedAt || storageLocation.mergeStartedAt) - return this.downloadFromCacheEntryLocation(storageLocation) + try { + if (storageLocation.mergedAt || storageLocation.mergeStartedAt) + return await this.downloadFromCacheEntryLocation(storageLocation) - await this.db - .updateTable('storage_locations') - .set({ - mergeStartedAt: Date.now(), - }) - .where('id', '=', storageLocation.id) - .execute() + await this.ensurePartsExist(storageLocation) - const responseStream = new PassThrough() - const mergerStream = new PassThrough() + await this.db + .updateTable('storage_locations') + .set({ + mergeStartedAt: Date.now(), + }) + .where('id', '=', storageLocation.id) + .execute() - try { - const promise = this.adapter + const responseStream = new PassThrough() + const mergerStream = new PassThrough() + + const mergePromise = this.adapter .uploadStream(`${storageLocation.folderName}/merged`, mergerStream) .then(async () => { await this.db @@ -255,31 +265,36 @@ export class Storage { .execute() mergerStream.destroy() }) - this.mergeStreamPromises.add(promise) - promise.finally(() => this.mergeStreamPromises.delete(promise)) + this.mergeStreamPromises.add(mergePromise) + mergePromise.finally(() => this.mergeStreamPromises.delete(mergePromise)) + + this.pumpPartsToStreams(storageLocation, responseStream, mergerStream).catch((err) => { + responseStream.destroy(err) + mergerStream.destroy(err) + if (err instanceof ObjectNotFoundError) + logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`) + }) + + return responseStream } catch (err) { - await this.db - .updateTable('storage_locations') - .set({ - mergedAt: null, - mergeStartedAt: null, - }) - .where('id', '=', storageLocation.id) - .execute() + if (err instanceof ObjectNotFoundError) { + logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`) + return + } throw err } + } - this.pumpPartsToStreams(storageLocation, responseStream, mergerStream).catch((err) => { - responseStream.destroy(err) - mergerStream.destroy(err) - }) - - return responseStream + private async ensurePartsExist(location: StorageLocation) { + const partsFolder = `${location.folderName}/parts` + const actualPartCount = await this.adapter.countFilesInFolder(partsFolder) + if (actualPartCount < location.partCount) throw new ObjectNotFoundError(partsFolder) } private async downloadFromCacheEntryLocation(location: StorageLocation) { if (location.mergedAt) return this.adapter.createDownloadStream(`${location.folderName}/merged`) + await this.ensurePartsExist(location) return Readable.from(this.streamParts(location)) } @@ -522,15 +537,20 @@ class S3Adapter implements StorageAdapter { } async createDownloadStream(objectName: string) { - const response = await this.s3.send( - new GetObjectCommand({ - Bucket: this.bucket, - Key: `${this.keyPrefix}/${objectName}`, - }), - ) - if (!response.Body) throw new Error('No body in S3 get object response') + try { + const response = await this.s3.send( + new GetObjectCommand({ + Bucket: this.bucket, + Key: `${this.keyPrefix}/${objectName}`, + }), + ) + if (!response.Body) throw new Error('No body in S3 get object response') - return response.Body as Readable + return response.Body as Readable + } catch (err: any) { + if (err.name === 'NoSuchKey') throw new ObjectNotFoundError(objectName) + throw err + } } async deleteFolder(folderName: string) { @@ -629,7 +649,13 @@ class FileSystemAdapter implements StorageAdapter { } async createDownloadStream(objectName: string) { - return createReadStream(path.join(this.rootFolder, objectName)) + const filePath = path.join(this.rootFolder, objectName) + try { + await fs.access(filePath) + } catch { + throw new ObjectNotFoundError(objectName) + } + return createReadStream(filePath) } async deleteFolder(folderName: string) { @@ -656,11 +682,15 @@ class FileSystemAdapter implements StorageAdapter { } async countFilesInFolder(folderName: string) { - const dir = await fs.readdir(path.join(this.rootFolder, folderName), { - withFileTypes: true, - }) - - return dir.filter((item) => item.isFile()).length + try { + const dir = await fs.readdir(path.join(this.rootFolder, folderName), { + withFileTypes: true, + }) + return dir.filter((item) => item.isFile()).length + } catch (err: any) { + if (err.code === 'ENOENT') return 0 + throw err + } } } @@ -690,7 +720,10 @@ class GcsAdapter implements StorageAdapter { } async createDownloadStream(objectName: string) { - return this.bucket.file(`${this.keyPrefix}/${objectName}`).createReadStream() + const file = this.bucket.file(`${this.keyPrefix}/${objectName}`) + const [exists] = await file.exists() + if (!exists) throw new ObjectNotFoundError(objectName) + return file.createReadStream() } async deleteFolder(folderName: string) { diff --git a/tests/stale-cache.test.ts b/tests/stale-cache.test.ts new file mode 100644 index 0000000..8c6e813 --- /dev/null +++ b/tests/stale-cache.test.ts @@ -0,0 +1,76 @@ +import crypto from 'node:crypto' +import fs from 'node:fs/promises' +import path from 'node:path' + +import { restoreCache, saveCache } from '@actions/cache' +import { SignJWT } from 'jose' +import { afterAll, beforeAll, describe, expect, test } from 'vitest' +import { Storage } from '~/lib/storage' +import { TEST_TEMP_DIR } from './setup' + +const testFilePath = path.join(TEST_TEMP_DIR, 'test-stale.bin') + +describe('stale cache entry handling (missing storage objects)', () => { + let adapter: Awaited> + + beforeAll(async () => { + process.env.ACTIONS_CACHE_SERVICE_V2 = 'true' + process.env.ACTIONS_RUNTIME_TOKEN = await new SignJWT({ + ac: JSON.stringify([{ Scope: 'refs/heads/main', Permission: 3 }]), + repository_id: '123', + }) + .setProtectedHeader({ alg: 'HS256' }) + .sign(crypto.createSecretKey('mock-secret-key', 'ascii')) + + adapter = await Storage.getAdapterFromEnv() + }) + afterAll(() => { + delete process.env.ACTIONS_CACHE_SERVICE_V2 + delete process.env.ACTIONS_RUNTIME_TOKEN + }) + + test( + 'returns cache miss when parts are wiped before first download (unmerged entry)', + { timeout: 30_000 }, + async () => { + const contents = crypto.randomBytes(1024) + await fs.writeFile(testFilePath, contents) + await saveCache([testFilePath], 'stale-fresh-key') + await fs.rm(testFilePath) + + await adapter.clear() + + const missKey = await restoreCache([testFilePath], 'stale-fresh-key') + expect(missKey).toBeUndefined() + + const missKey2 = await restoreCache([testFilePath], 'stale-fresh-key') + expect(missKey2).toBeUndefined() + }, + ) + + test( + 'returns cache miss when the merged blob is wiped after merge completes', + { timeout: 30_000 }, + async () => { + const contents = crypto.randomBytes(1024) + await fs.writeFile(testFilePath, contents) + await saveCache([testFilePath], 'stale-merged-key') + await fs.rm(testFilePath) + + const hitKey = await restoreCache([testFilePath], 'stale-merged-key') + expect(hitKey).toBe('stale-merged-key') + await fs.rm(testFilePath) + + // Wait for the background merge to flush before wiping storage. + await new Promise((resolve) => setTimeout(resolve, 2000)) + + await adapter.clear() + + const missKey = await restoreCache([testFilePath], 'stale-merged-key') + expect(missKey).toBeUndefined() + + const missKey2 = await restoreCache([testFilePath], 'stale-merged-key') + expect(missKey2).toBeUndefined() + }, + ) +})