diff --git a/lib/storage.ts b/lib/storage.ts index d3c51ee..2919590 100644 --- a/lib/storage.ts +++ b/lib/storage.ts @@ -17,6 +17,7 @@ import { DeleteObjectsCommand, GetObjectCommand, HeadBucketCommand, + HeadObjectCommand, ListObjectsV2Command, S3Client, } from '@aws-sdk/client-s3' @@ -46,6 +47,8 @@ export class ObjectNotFoundError extends Error { } } +export const PARTS_DELETE_GRACE_MS = 15 * 60 * 1000 + export class Storage { adapter private db @@ -250,16 +253,6 @@ export class Storage { }) .where('id', '=', storageLocation.id) .execute() - await this.db.transaction().execute(async (tx) => { - await tx - .updateTable('storage_locations') - .set({ - partsDeletedAt: Date.now(), - }) - .where('id', '=', storageLocation.id) - .execute() - await this.adapter.deleteFolder(`${storageLocation.folderName}/parts`) - }) }) .catch(async () => { await this.db @@ -278,14 +271,19 @@ export class Storage { this.pumpPartsToStreams(storageLocation, responseStream, mergerStream).catch((err) => { responseStream.destroy(err) mergerStream.destroy(err) - if (err instanceof ObjectNotFoundError) + if (err instanceof ObjectNotFoundError) { logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`) + void this.deleteStaleCacheEntry(cacheEntryId).catch((deleteErr) => { + logger.error(`Failed to delete stale cache entry ${cacheEntryId}`, deleteErr) + }) + } }) return responseStream } catch (err) { if (err instanceof ObjectNotFoundError) { logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`) + await this.deleteStaleCacheEntry(cacheEntryId) return } throw err @@ -294,8 +292,17 @@ export class Storage { private async ensurePartsExist(location: StorageLocation) { const partsFolder = `${location.folderName}/parts` - const actualPartCount = await this.adapter.countFilesInFolder(partsFolder) - if (actualPartCount < location.partCount) throw new ObjectNotFoundError(partsFolder) + const partExists = await Promise.all( + Array.from({ length: location.partCount }, (_, i) => + this.adapter.fileExists(`${partsFolder}/${i}`), + ), + ) + const missingPartIndex = partExists.findIndex((exists) => !exists) + if (missingPartIndex !== -1) throw new ObjectNotFoundError(`${partsFolder}/${missingPartIndex}`) + } + + private async deleteStaleCacheEntry(cacheEntryId: string) { + await this.db.deleteFrom('cache_entries').where('id', '=', cacheEntryId).execute() } private async downloadFromCacheEntryLocation(location: StorageLocation) { @@ -500,6 +507,7 @@ interface StorageAdapter { createDownloadStream(objectName: string): Promise uploadStream(objectName: string, stream: Readable): Promise deleteFolder(folderName: string): Promise + fileExists(objectName: string): Promise countFilesInFolder(folderName: string): Promise createDownloadUrl?(objectName: string): Promise clear(): Promise @@ -564,6 +572,26 @@ class S3Adapter implements StorageAdapter { } } + async fileExists(objectName: string) { + try { + await this.s3.send( + new HeadObjectCommand({ + Bucket: this.bucket, + Key: `${this.keyPrefix}/${objectName}`, + }), + ) + return true + } catch (err: any) { + if ( + err.name === 'NoSuchKey' || + err.name === 'NotFound' || + err.$metadata?.httpStatusCode === 404 + ) + return false + throw err + } + } + async deleteFolder(folderName: string) { return this.deleteByPrefix(`${this.keyPrefix}/${folderName}/`) } @@ -676,6 +704,16 @@ class FileSystemAdapter implements StorageAdapter { return createReadStream(filePath) } + async fileExists(objectName: string) { + try { + await fs.access(this.safePath(objectName)) + return true + } catch (err: any) { + if (err.code === 'ENOENT') return false + throw err + } + } + async deleteFolder(folderName: string) { await fs.rm(this.safePath(folderName), { recursive: true, @@ -744,6 +782,11 @@ class GcsAdapter implements StorageAdapter { return file.createReadStream() } + async fileExists(objectName: string) { + const [exists] = await this.bucket.file(`${this.keyPrefix}/${objectName}`).exists() + return exists + } + async deleteFolder(folderName: string) { await this.bucket.deleteFiles({ prefix: `${this.keyPrefix}/${folderName}/`, diff --git a/tasks/cleanup/parts.ts b/tasks/cleanup/parts.ts index f289346..cd867ff 100644 --- a/tasks/cleanup/parts.ts +++ b/tasks/cleanup/parts.ts @@ -1,6 +1,6 @@ import { getDatabase } from '~/lib/db' import { env } from '~/lib/env' -import { getStorage } from '~/lib/storage' +import { getStorage, PARTS_DELETE_GRACE_MS } from '~/lib/storage' const itemsPerPage = 10 @@ -17,10 +17,12 @@ export default defineTask({ let deletedCount = 0 let page = 0 + const deleteBefore = Date.now() - PARTS_DELETE_GRACE_MS while (true) { const storageLocations = await db .selectFrom('storage_locations') .where('mergedAt', 'is not', null) + .where('mergedAt', '<', deleteBefore) .where('partsDeletedAt', 'is', null) .select(['folderName', 'id', 'partCount']) .limit(itemsPerPage) diff --git a/tests/stale-cache.test.ts b/tests/stale-cache.test.ts index 8c6e813..2e8d57c 100644 --- a/tests/stale-cache.test.ts +++ b/tests/stale-cache.test.ts @@ -5,6 +5,7 @@ import path from 'node:path' import { restoreCache, saveCache } from '@actions/cache' import { SignJWT } from 'jose' import { afterAll, beforeAll, describe, expect, test } from 'vitest' +import { getDatabase } from '~/lib/db' import { Storage } from '~/lib/storage' import { TEST_TEMP_DIR } from './setup' @@ -29,6 +30,44 @@ describe('stale cache entry handling (missing storage objects)', () => { delete process.env.ACTIONS_RUNTIME_TOKEN }) + test( + 'keeps parts after merge so parallel part restores can finish', + { timeout: 30_000 }, + async () => { + const key = 'stale-merge-grace-key' + const contents = crypto.randomBytes(1024) + await fs.writeFile(testFilePath, contents) + await saveCache([testFilePath], key) + await fs.rm(testFilePath) + + const hitKey = await restoreCache([testFilePath], key) + expect(hitKey).toBe(key) + await fs.rm(testFilePath) + + // Wait for the background merge to flush before checking retained parts. + await new Promise((resolve) => setTimeout(resolve, 2000)) + + const db = await getDatabase() + const location = await db + .selectFrom('cache_entries') + .innerJoin('storage_locations', 'storage_locations.id', 'cache_entries.locationId') + .where('cache_entries.key', '=', key) + .select([ + 'storage_locations.folderName', + 'storage_locations.partCount', + 'storage_locations.mergedAt', + 'storage_locations.partsDeletedAt', + ]) + .executeTakeFirstOrThrow() + + expect(location.mergedAt).not.toBeNull() + expect(location.partsDeletedAt).toBeNull() + await expect(adapter.countFilesInFolder(`${location.folderName}/parts`)).resolves.toBe( + location.partCount, + ) + }, + ) + test( 'returns cache miss when parts are wiped before first download (unmerged entry)', { timeout: 30_000 }, @@ -45,6 +84,15 @@ describe('stale cache entry handling (missing storage objects)', () => { const missKey2 = await restoreCache([testFilePath], 'stale-fresh-key') expect(missKey2).toBeUndefined() + + const db = await getDatabase() + await expect( + db + .selectFrom('cache_entries') + .where('key', '=', 'stale-fresh-key') + .select('id') + .executeTakeFirst(), + ).resolves.toBeUndefined() }, ) @@ -71,6 +119,15 @@ describe('stale cache entry handling (missing storage objects)', () => { const missKey2 = await restoreCache([testFilePath], 'stale-merged-key') expect(missKey2).toBeUndefined() + + const db = await getDatabase() + await expect( + db + .selectFrom('cache_entries') + .where('key', '=', 'stale-merged-key') + .select('id') + .executeTakeFirst(), + ).resolves.toBeUndefined() }, ) })