Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 56 additions & 13 deletions lib/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
DeleteObjectsCommand,
GetObjectCommand,
HeadBucketCommand,
HeadObjectCommand,
ListObjectsV2Command,
S3Client,
} from '@aws-sdk/client-s3'
Expand Down Expand Up @@ -46,6 +47,8 @@ export class ObjectNotFoundError extends Error {
}
}

export const PARTS_DELETE_GRACE_MS = 15 * 60 * 1000

export class Storage {
adapter
private db
Expand Down Expand Up @@ -250,16 +253,6 @@ export class Storage {
})
.where('id', '=', storageLocation.id)
.execute()
await this.db.transaction().execute(async (tx) => {
await tx
.updateTable('storage_locations')
.set({
partsDeletedAt: Date.now(),
})
.where('id', '=', storageLocation.id)
.execute()
await this.adapter.deleteFolder(`${storageLocation.folderName}/parts`)
})
})
.catch(async () => {
await this.db
Expand All @@ -278,14 +271,19 @@ export class Storage {
this.pumpPartsToStreams(storageLocation, responseStream, mergerStream).catch((err) => {
responseStream.destroy(err)
mergerStream.destroy(err)
if (err instanceof ObjectNotFoundError)
if (err instanceof ObjectNotFoundError) {
logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`)
void this.deleteStaleCacheEntry(cacheEntryId).catch((deleteErr) => {
logger.error(`Failed to delete stale cache entry ${cacheEntryId}`, deleteErr)
})
}
})

return responseStream
} catch (err) {
if (err instanceof ObjectNotFoundError) {
logger.warn(`Stale cache entry ${cacheEntryId}: ${err.message}`)
await this.deleteStaleCacheEntry(cacheEntryId)
return
}
throw err
Expand All @@ -294,8 +292,17 @@ export class Storage {

private async ensurePartsExist(location: StorageLocation) {
const partsFolder = `${location.folderName}/parts`
const actualPartCount = await this.adapter.countFilesInFolder(partsFolder)
if (actualPartCount < location.partCount) throw new ObjectNotFoundError(partsFolder)
const partExists = await Promise.all(
Array.from({ length: location.partCount }, (_, i) =>
this.adapter.fileExists(`${partsFolder}/${i}`),
),
)
const missingPartIndex = partExists.findIndex((exists) => !exists)
if (missingPartIndex !== -1) throw new ObjectNotFoundError(`${partsFolder}/${missingPartIndex}`)
}

private async deleteStaleCacheEntry(cacheEntryId: string) {
await this.db.deleteFrom('cache_entries').where('id', '=', cacheEntryId).execute()
}

private async downloadFromCacheEntryLocation(location: StorageLocation) {
Expand Down Expand Up @@ -500,6 +507,7 @@ interface StorageAdapter {
createDownloadStream(objectName: string): Promise<Readable>
uploadStream(objectName: string, stream: Readable): Promise<void>
deleteFolder(folderName: string): Promise<void>
fileExists(objectName: string): Promise<boolean>
countFilesInFolder(folderName: string): Promise<number>
createDownloadUrl?(objectName: string): Promise<string>
clear(): Promise<void>
Expand Down Expand Up @@ -564,6 +572,26 @@ class S3Adapter implements StorageAdapter {
}
}

async fileExists(objectName: string) {
try {
await this.s3.send(
new HeadObjectCommand({
Bucket: this.bucket,
Key: `${this.keyPrefix}/${objectName}`,
}),
)
return true
} catch (err: any) {
if (
err.name === 'NoSuchKey' ||
err.name === 'NotFound' ||
err.$metadata?.httpStatusCode === 404
)
return false
throw err
}
}

async deleteFolder(folderName: string) {
return this.deleteByPrefix(`${this.keyPrefix}/${folderName}/`)
}
Expand Down Expand Up @@ -676,6 +704,16 @@ class FileSystemAdapter implements StorageAdapter {
return createReadStream(filePath)
}

async fileExists(objectName: string) {
try {
await fs.access(this.safePath(objectName))
return true
} catch (err: any) {
if (err.code === 'ENOENT') return false
throw err
}
}

async deleteFolder(folderName: string) {
await fs.rm(this.safePath(folderName), {
recursive: true,
Expand Down Expand Up @@ -744,6 +782,11 @@ class GcsAdapter implements StorageAdapter {
return file.createReadStream()
}

async fileExists(objectName: string) {
const [exists] = await this.bucket.file(`${this.keyPrefix}/${objectName}`).exists()
return exists
}

async deleteFolder(folderName: string) {
await this.bucket.deleteFiles({
prefix: `${this.keyPrefix}/${folderName}/`,
Expand Down
4 changes: 3 additions & 1 deletion tasks/cleanup/parts.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { getDatabase } from '~/lib/db'
import { env } from '~/lib/env'
import { getStorage } from '~/lib/storage'
import { getStorage, PARTS_DELETE_GRACE_MS } from '~/lib/storage'

const itemsPerPage = 10

Expand All @@ -17,10 +17,12 @@ export default defineTask({

let deletedCount = 0
let page = 0
const deleteBefore = Date.now() - PARTS_DELETE_GRACE_MS
while (true) {
const storageLocations = await db
.selectFrom('storage_locations')
.where('mergedAt', 'is not', null)
.where('mergedAt', '<', deleteBefore)
.where('partsDeletedAt', 'is', null)
.select(['folderName', 'id', 'partCount'])
.limit(itemsPerPage)
Expand Down
57 changes: 57 additions & 0 deletions tests/stale-cache.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import path from 'node:path'
import { restoreCache, saveCache } from '@actions/cache'
import { SignJWT } from 'jose'
import { afterAll, beforeAll, describe, expect, test } from 'vitest'
import { getDatabase } from '~/lib/db'
import { Storage } from '~/lib/storage'
import { TEST_TEMP_DIR } from './setup'

Expand All @@ -29,6 +30,44 @@ describe('stale cache entry handling (missing storage objects)', () => {
delete process.env.ACTIONS_RUNTIME_TOKEN
})

test(
'keeps parts after merge so parallel part restores can finish',
{ timeout: 30_000 },
async () => {
const key = 'stale-merge-grace-key'
const contents = crypto.randomBytes(1024)
await fs.writeFile(testFilePath, contents)
await saveCache([testFilePath], key)
await fs.rm(testFilePath)

const hitKey = await restoreCache([testFilePath], key)
expect(hitKey).toBe(key)
await fs.rm(testFilePath)

// Wait for the background merge to flush before checking retained parts.
await new Promise((resolve) => setTimeout(resolve, 2000))

const db = await getDatabase()
const location = await db
.selectFrom('cache_entries')
.innerJoin('storage_locations', 'storage_locations.id', 'cache_entries.locationId')
.where('cache_entries.key', '=', key)
.select([
'storage_locations.folderName',
'storage_locations.partCount',
'storage_locations.mergedAt',
'storage_locations.partsDeletedAt',
])
.executeTakeFirstOrThrow()

expect(location.mergedAt).not.toBeNull()
expect(location.partsDeletedAt).toBeNull()
await expect(adapter.countFilesInFolder(`${location.folderName}/parts`)).resolves.toBe(
location.partCount,
)
},
)

test(
'returns cache miss when parts are wiped before first download (unmerged entry)',
{ timeout: 30_000 },
Expand All @@ -45,6 +84,15 @@ describe('stale cache entry handling (missing storage objects)', () => {

const missKey2 = await restoreCache([testFilePath], 'stale-fresh-key')
expect(missKey2).toBeUndefined()

const db = await getDatabase()
await expect(
db
.selectFrom('cache_entries')
.where('key', '=', 'stale-fresh-key')
.select('id')
.executeTakeFirst(),
).resolves.toBeUndefined()
},
)

Expand All @@ -71,6 +119,15 @@ describe('stale cache entry handling (missing storage objects)', () => {

const missKey2 = await restoreCache([testFilePath], 'stale-merged-key')
expect(missKey2).toBeUndefined()

const db = await getDatabase()
await expect(
db
.selectFrom('cache_entries')
.where('key', '=', 'stale-merged-key')
.select('id')
.executeTakeFirst(),
).resolves.toBeUndefined()
},
)
})