Skip to content

Commit 33037dc

Browse files
committed
address comments
1 parent a50c7c7 commit 33037dc

6 files changed

Lines changed: 450 additions & 24 deletions

File tree

Lines changed: 164 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,164 @@
1+
/**
2+
* @vitest-environment node
3+
*/
4+
5+
import { beforeEach, describe, expect, it, vi } from 'vitest'
6+
7+
// Every mock is created inside `vi.hoisted` so the `vi.mock` factories in this
// file can reference them.
const {
  mockBatchDeleteByWorkspaceAndTimestamp,
  mockDeleteFileMetadata,
  mockDeleteFiles,
  mockDeleteRowsById,
  mockIsUsingCloudStorage,
  mockLimit,
  mockOrderBy,
  mockPrepareChatCleanup,
  mockSelect,
  mockSelectRowsByIdChunks,
  mockTask,
  mockWhere,
} = vi.hoisted(() => {
  // Query-builder chain, assembled innermost-first:
  // select() -> from() -> [leftJoin() ->] where() -> [orderBy() ->] limit()
  const limitFn = vi.fn(async () => [] as Array<{ key: string }>)
  const orderByFn = vi.fn(() => ({ limit: limitFn }))
  const whereFn = vi.fn(() => ({ orderBy: orderByFn, limit: limitFn }))
  const fromFn = vi.fn(() => {
    // A fresh `leftJoin` stub is handed out on every `from()` call.
    const leftJoin = vi.fn(() => ({ where: whereFn }))
    return { where: whereFn, leftJoin }
  })
  const selectFn = vi.fn(() => ({ from: fromFn }))

  // Collaborators of the cleanup job, each resolving to a benign default.
  const batchDeleteFn = vi.fn(async () => ({ deleted: 0, failed: 0 }))
  const deleteFileMetadataFn = vi.fn(async () => true)
  const deleteFilesFn = vi.fn(async () => ({ deleted: 0, failed: [] as Array<{ key: string }> }))
  const deleteRowsByIdFn = vi.fn(async () => ({ deleted: 0, failed: 0 }))
  const isUsingCloudStorageFn = vi.fn(() => true)
  const prepareChatCleanupFn = vi.fn(async () => ({ execute: vi.fn(async () => undefined) }))
  const selectRowsByIdChunksFn = vi.fn(async () => [] as unknown[])
  // `task(config)` simply echoes its config back.
  const taskFn = vi.fn((config: unknown) => config)

  return {
    mockBatchDeleteByWorkspaceAndTimestamp: batchDeleteFn,
    mockDeleteFileMetadata: deleteFileMetadataFn,
    mockDeleteFiles: deleteFilesFn,
    mockDeleteRowsById: deleteRowsByIdFn,
    mockIsUsingCloudStorage: isUsingCloudStorageFn,
    mockLimit: limitFn,
    mockOrderBy: orderByFn,
    mockPrepareChatCleanup: prepareChatCleanupFn,
    mockSelect: selectFn,
    mockSelectRowsByIdChunks: selectRowsByIdChunksFn,
    mockTask: taskFn,
    mockWhere: whereFn,
  }
})
45+
46+
// Database client: only `select` is exposed, backed by the hoisted chain.
vi.mock('@sim/db', () => ({ db: { select: mockSelect } }))

// Schema tables are plain objects mapping each column name to a `col.<name>` marker.
vi.mock('@sim/db/schema', () => {
  const table = (cols: string[]): Record<string, string> => {
    const columns: Record<string, string> = {}
    for (const name of cols) {
      columns[name] = `col.${name}`
    }
    return columns
  }
  const wsFileCols = ['id', 'key', 'context', 'workspaceId', 'deletedAt', 'uploadedAt']
  const softCols = ['id', 'archivedAt', 'deletedAt', 'workspaceId']
  return {
    a2aAgent: table(softCols),
    copilotChats: table(['id', 'workflowId']),
    document: table(['id', 'storageKey', 'knowledgeBaseId']),
    knowledgeBase: table(softCols),
    mcpServers: table(softCols),
    memory: table(softCols),
    userTableDefinitions: table(softCols),
    workflow: table(softCols),
    workflowFolder: table(softCols),
    workflowMcpServer: table(softCols),
    workspaceFile: table(wsFileCols),
    workspaceFiles: table(wsFileCols),
  }
})

// Logger: every level is a silent spy.
vi.mock('@sim/logger', () => {
  const createLogger = vi.fn(() => ({ error: vi.fn(), info: vi.fn(), warn: vi.fn() }))
  return { createLogger }
})

vi.mock('@trigger.dev/sdk', () => ({ task: mockTask }))

// Query operators return tagged descriptors instead of real SQL fragments.
vi.mock('drizzle-orm', () => {
  const operator = (op: string) => vi.fn((...args: unknown[]) => ({ op, args }))
  return {
    and: operator('and'),
    asc: vi.fn((column: unknown) => ({ op: 'asc', column })),
    eq: operator('eq'),
    inArray: operator('inArray'),
    isNotNull: operator('isNotNull'),
    isNull: operator('isNull'),
    lt: operator('lt'),
    sql: vi.fn((strings: TemplateStringsArray, ...values: unknown[]) => ({ strings, values })),
  }
})

// Batch helpers are spies, except `chunkArray`, which gets a small working implementation.
vi.mock('@/lib/cleanup/batch-delete', () => {
  const chunkArray = (items: string[], size: number) => {
    const chunks: string[][] = []
    let start = 0
    while (start < items.length) {
      chunks.push(items.slice(start, start + size))
      start += size
    }
    return chunks
  }
  return {
    batchDeleteByWorkspaceAndTimestamp: mockBatchDeleteByWorkspaceAndTimestamp,
    chunkArray,
    deleteRowsById: mockDeleteRowsById,
    selectRowsByIdChunks: mockSelectRowsByIdChunks,
  }
})

vi.mock('@/lib/cleanup/chat-cleanup', () => ({ prepareChatCleanup: mockPrepareChatCleanup }))

vi.mock('@/lib/uploads', () => ({
  isUsingCloudStorage: mockIsUsingCloudStorage,
  StorageService: { deleteFiles: mockDeleteFiles },
}))

vi.mock('@/lib/uploads/server/metadata', () => ({ deleteFileMetadata: mockDeleteFileMetadata }))
107+
108+
import { runCleanupSoftDeletes } from '@/background/cleanup-soft-deletes'
109+
110+
/** Minimal job payload shared by the tests: one workspace on the `free` plan. */
const basePayload = {
  label: 'free/1',
  plan: 'free' as const,
  retentionHours: 720,
  workspaceIds: ['ws-1'],
}

describe('cleanup soft deletes — orphan KB binding sweep', () => {
  beforeEach(() => {
    // `clearAllMocks` only wipes call history. The mocks that individual tests
    // re-program are reset explicitly so a leftover `mockRejectedValue` or an
    // unconsumed `mockResolvedValueOnce` entry cannot leak into the next test.
    vi.clearAllMocks()
    mockLimit.mockReset()
    mockLimit.mockResolvedValue([])
    mockDeleteFileMetadata.mockReset()
    mockDeleteFileMetadata.mockResolvedValue(true)
    mockIsUsingCloudStorage.mockReturnValue(true)
  })

  it('soft-deletes abandoned KB bindings and removes their storage objects', async () => {
    // First select yields two orphans; the second yields none and ends the loop.
    mockLimit
      .mockResolvedValueOnce([{ key: 'kb/orphan-1' }, { key: 'kb/orphan-2' }])
      .mockResolvedValueOnce([])

    await runCleanupSoftDeletes(basePayload)

    expect(mockDeleteFiles).toHaveBeenCalledWith(['kb/orphan-1', 'kb/orphan-2'], 'knowledge-base')
    expect(mockDeleteFileMetadata).toHaveBeenCalledWith('kb/orphan-1')
    expect(mockDeleteFileMetadata).toHaveBeenCalledWith('kb/orphan-2')
    expect(mockDeleteFileMetadata).toHaveBeenCalledTimes(2)
  })

  it('still removes bindings but skips object deletion without cloud storage', async () => {
    mockIsUsingCloudStorage.mockReturnValue(false)
    mockLimit.mockResolvedValueOnce([{ key: 'kb/orphan-1' }]).mockResolvedValueOnce([])

    await runCleanupSoftDeletes(basePayload)

    expect(mockDeleteFiles).not.toHaveBeenCalled()
    expect(mockDeleteFileMetadata).toHaveBeenCalledWith('kb/orphan-1')
  })

  it('stops the batch loop when binding deletion makes no progress', async () => {
    // Every select returns the same row and every binding delete rejects.
    mockLimit.mockResolvedValue([{ key: 'kb/stuck' }])
    mockDeleteFileMetadata.mockRejectedValue(new Error('db down'))

    await runCleanupSoftDeletes(basePayload)

    // One batch attempted, then the no-progress guard breaks the loop.
    expect(mockDeleteFileMetadata).toHaveBeenCalledTimes(1)
  })

  it('does not run the sweep when there are no workspaces', async () => {
    await runCleanupSoftDeletes({ ...basePayload, workspaceIds: [] })

    expect(mockSelect).not.toHaveBeenCalled()
    expect(mockDeleteFiles).not.toHaveBeenCalled()
    expect(mockDeleteFileMetadata).not.toHaveBeenCalled()
  })
})

apps/sim/background/cleanup-soft-deletes.ts

Lines changed: 98 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@ import { db } from '@sim/db'
22
import {
33
a2aAgent,
44
copilotChats,
5+
document,
56
knowledgeBase,
67
mcpServers,
78
memory,
@@ -14,19 +15,31 @@ import {
1415
} from '@sim/db/schema'
1516
import { createLogger } from '@sim/logger'
1617
import { task } from '@trigger.dev/sdk'
17-
import { and, inArray, isNotNull, lt } from 'drizzle-orm'
18+
import { and, asc, eq, inArray, isNotNull, isNull, lt, sql } from 'drizzle-orm'
1819
import type { CleanupJobPayload } from '@/lib/billing/cleanup-dispatcher'
1920
import {
2021
batchDeleteByWorkspaceAndTimestamp,
22+
chunkArray,
2123
deleteRowsById,
2224
selectRowsByIdChunks,
2325
} from '@/lib/cleanup/batch-delete'
2426
import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
2527
import type { StorageContext } from '@/lib/uploads'
2628
import { isUsingCloudStorage, StorageService } from '@/lib/uploads'
29+
import { deleteFileMetadata } from '@/lib/uploads/server/metadata'
2730

2831
const logger = createLogger('CleanupSoftDeletes')
2932

33+
/** Maximum number of orphaned binding keys selected (and processed) per query. */
const KB_ORPHAN_BINDING_BATCH_SIZE = 500

/** Upper bound on the number of keys attempted for one workspace chunk in a single run. */
const KB_ORPHAN_BINDING_TOTAL_LIMIT = 5_000

/**
 * Age, in hours, an unreferenced KB binding must reach before it is swept.
 * Chosen to be comfortably longer than any presign → upload → document-insert
 * flow, so an upload that is still in flight is never treated as abandoned.
 */
const KB_ORPHAN_BINDING_GRACE_HOURS = 24 * 7

/** Number of workspace ids passed to a single orphan-binding query. */
const KB_ORPHAN_BINDING_WORKSPACE_CHUNK = 50
42+
3043
interface WorkspaceFileScope {
3144
/** Rows from `workspace_file` (singular, legacy workspace-context only). */
3245
legacyRows: Array<{ id: string; key: string }>
@@ -162,6 +175,87 @@ const CLEANUP_TARGETS = [
162175
},
163176
] as const
164177

178+
/**
 * Sweep abandoned knowledge-base ownership bindings. The presigned upload flow
 * writes a `workspace_files` binding when it hands out an upload URL, before the
 * object is stored and before any document is created. If the upload is never
 * completed, that binding is orphaned — no `document.storageKey` ever references
 * its key. Such bindings are inert (read access requires a live document, and
 * the move re-point only follows referenced keys), but they accumulate, so we
 * drop the best-effort object and soft-delete the binding once they are older
 * than the grace window.
 *
 * Workspaces are processed in chunks of `KB_ORPHAN_BINDING_WORKSPACE_CHUNK`.
 * Each chunk is swept in batches of at most `KB_ORPHAN_BINDING_BATCH_SIZE` keys
 * and attempts at most `KB_ORPHAN_BINDING_TOTAL_LIMIT` keys per run.
 *
 * @param workspaceIds - Workspaces whose bindings are swept; an empty list is a no-op.
 * @param label - Prefix used in every log line emitted by the sweep.
 * @returns `total` keys selected, `deleted` bindings whose `deleteFileMetadata`
 *   call resolved, and `failed` — the combined count of object deletions and
 *   binding deletions that failed.
 */
async function cleanupOrphanedKnowledgeBaseBindings(
  workspaceIds: string[],
  label: string
): Promise<{ total: number; deleted: number; failed: number }> {
  const stats = { total: 0, deleted: 0, failed: 0 }
  if (workspaceIds.length === 0) return stats

  // Only bindings uploaded before this instant are old enough to be swept.
  const orphanCutoff = new Date(Date.now() - KB_ORPHAN_BINDING_GRACE_HOURS * 60 * 60 * 1000)

  for (const chunkIds of chunkArray(workspaceIds, KB_ORPHAN_BINDING_WORKSPACE_CHUNK)) {
    // The attempt budget is tracked per workspace chunk.
    let attempted = 0
    while (attempted < KB_ORPHAN_BINDING_TOTAL_LIMIT) {
      const limit = Math.min(
        KB_ORPHAN_BINDING_BATCH_SIZE,
        KB_ORPHAN_BINDING_TOTAL_LIMIT - attempted
      )
      // Live knowledge-base bindings past the grace window whose key no document references.
      const rows = await db
        .select({ key: workspaceFiles.key })
        .from(workspaceFiles)
        .where(
          and(
            inArray(workspaceFiles.workspaceId, chunkIds),
            eq(workspaceFiles.context, 'knowledge-base'),
            isNull(workspaceFiles.deletedAt),
            lt(workspaceFiles.uploadedAt, orphanCutoff),
            sql`NOT EXISTS (
              SELECT 1 FROM ${document} AS doc
              WHERE doc.storage_key = ${workspaceFiles.key}
            )`
          )
        )
        .orderBy(asc(workspaceFiles.uploadedAt), asc(workspaceFiles.key))
        .limit(limit)

      if (rows.length === 0) break

      const keys = rows.map((row) => row.key)
      stats.total += keys.length
      attempted += keys.length

      if (isUsingCloudStorage()) {
        // Object removal is best-effort: a rejected call must not abort the rest
        // of the cleanup job, so it is logged and counted instead of rethrown.
        try {
          const result = await StorageService.deleteFiles(keys, 'knowledge-base')
          stats.failed += result.failed.length
          for (const { key, error } of result.failed) {
            logger.error(`[${label}] Failed to delete orphan KB object ${key}:`, { error })
          }
        } catch (error) {
          stats.failed += keys.length
          logger.error(`[${label}] Failed to delete ${keys.length} orphan KB objects:`, { error })
        }
      }

      // Bindings are removed one key at a time so a single failure does not
      // prevent the remaining keys in the batch from being processed.
      let deletedThisBatch = 0
      for (const key of keys) {
        try {
          await deleteFileMetadata(key)
          deletedThisBatch++
        } catch (error) {
          stats.failed++
          logger.error(`[${label}] Failed to delete orphan KB binding ${key}:`, { error })
        }
      }
      stats.deleted += deletedThisBatch

      // No progress (every delete failed) — stop rather than reselect the same rows.
      if (deletedThisBatch === 0) break
    }
  }

  logger.info(
    `[${label}/kb_orphan_bindings] Complete: ${stats.deleted}/${stats.total} bindings cleaned, ${stats.failed} failed`
  )
  return stats
}
258+
165259
export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise<void> {
166260
const startTime = Date.now()
167261
const { workspaceIds, retentionHours, label } = payload
@@ -256,8 +350,10 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
256350
totalDeleted += result.deleted
257351
}
258352

353+
const orphanBindingStats = await cleanupOrphanedKnowledgeBaseBindings(workspaceIds, label)
354+
259355
logger.info(
260-
`[${label}] Complete: ${totalDeleted} rows deleted, ${fileStats.filesDeleted} files cleaned`
356+
`[${label}] Complete: ${totalDeleted} rows deleted, ${fileStats.filesDeleted} files cleaned, ${orphanBindingStats.deleted} orphan KB bindings cleaned`
261357
)
262358

263359
// Clean up copilot backend + chat storage files after DB rows are gone

apps/sim/lib/knowledge/connectors/sync-engine.ts

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -974,6 +974,7 @@ async function addDocument(
974974
const storageKey = extractStorageKey(urlPath)
975975
if (storageKey && storageKey !== urlPath) {
976976
await deleteFile({ key: storageKey, context: 'knowledge-base' }).catch(() => undefined)
977+
await deleteFileMetadata(storageKey).catch(() => undefined)
977978
}
978979
throw error
979980
}
@@ -1069,6 +1070,7 @@ async function updateDocument(
10691070
const storageKey = extractStorageKey(urlPath)
10701071
if (storageKey && storageKey !== urlPath) {
10711072
await deleteFile({ key: storageKey, context: 'knowledge-base' }).catch(() => undefined)
1073+
await deleteFileMetadata(storageKey).catch(() => undefined)
10721074
}
10731075
throw error
10741076
}

0 commit comments

Comments
 (0)