Skip to content

Commit 4967305

Browse files
improvement(logs): object storage backed tracespans (#4787)
* improvement(logs): obj storage backed tracespans * fix storage write context * fix tests * address comments * address comments * chore(db): remove migration 0219 to regenerate after staging merge Drops the 0219_robust_shard SQL, its snapshot, and the journal entry so the trace-spans/cost schema migration can be regenerated on top of the latest staging migration chain (avoids a number collision with staging's migrations). Co-authored-by: Cursor <cursoragent@cursor.com> * improvement(billing): accurate per-member usage via shared ledger helper Per-member/per-user usage in the org-member routes now adds the usage_log ledger to the currentPeriodCost baseline (which is no longer incremented), via a shared getOrgMemberLedgerByUser helper to avoid repeating the subscription→period→ledger lookup across the admin and member-facing routes. Co-authored-by: Cursor <cursoragent@cursor.com> * regen migrations * update migration * address comments * more code cleanup * incorrect type cast --------- Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent c95aa87 commit 4967305

61 files changed

Lines changed: 20011 additions & 940 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apps/sim/app/api/billing/route.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,6 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
140140
members: rawBillingData.members.map((m) => ({
141141
...m,
142142
joinedAt: m.joinedAt.toISOString(),
143-
lastActive: m.lastActive?.toISOString() || null,
144143
})),
145144
}
146145

apps/sim/app/api/knowledge/utils.test.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,12 @@ describe('Knowledge Utils', () => {
259259
{}
260260
)
261261

262-
expect(dbOps.order).toEqual(['insert', 'updateDoc'])
262+
// Embeddings are inserted first, then the document counter update. A
263+
// usage_log billing insert (recordUsage) may trail after updateDoc and is
264+
// irrelevant to this ordering invariant, so assert position rather than
265+
// exact array equality.
266+
expect(dbOps.order[0]).toBe('insert')
267+
expect(dbOps.order.indexOf('updateDoc')).toBeGreaterThan(0)
263268

264269
expect(dbOps.updatePayloads[0]).toMatchObject({
265270
processingStatus: 'completed',

apps/sim/app/api/logs/execution/[executionId]/route.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { executionIdParamsSchema } from '@/lib/api/contracts/logs'
1313
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
1414
import { generateRequestId } from '@/lib/core/utils/request'
1515
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
16+
import { materializeExecutionData } from '@/lib/logs/execution/trace-store'
1617
import type { TraceSpan, WorkflowExecutionLog } from '@/lib/logs/types'
1718

1819
const logger = createLogger('LogsByExecutionIdAPI')
@@ -39,13 +40,14 @@ export const GET = withRouteHandler(
3940
.select({
4041
id: workflowExecutionLogs.id,
4142
workflowId: workflowExecutionLogs.workflowId,
43+
workspaceId: workflowExecutionLogs.workspaceId,
4244
executionId: workflowExecutionLogs.executionId,
4345
stateSnapshotId: workflowExecutionLogs.stateSnapshotId,
4446
trigger: workflowExecutionLogs.trigger,
4547
startedAt: workflowExecutionLogs.startedAt,
4648
endedAt: workflowExecutionLogs.endedAt,
4749
totalDurationMs: workflowExecutionLogs.totalDurationMs,
48-
cost: workflowExecutionLogs.cost,
50+
costTotal: workflowExecutionLogs.costTotal,
4951
executionData: workflowExecutionLogs.executionData,
5052
})
5153
.from(workflowExecutionLogs)
@@ -119,7 +121,14 @@ export const GET = withRouteHandler(
119121
return NextResponse.json({ error: 'Workflow state snapshot not found' }, { status: 404 })
120122
}
121123

122-
const executionData = workflowLog.executionData as WorkflowExecutionLog['executionData']
124+
const executionData = (await materializeExecutionData(
125+
workflowLog.executionData as Record<string, unknown> | null,
126+
{
127+
workspaceId: workflowLog.workspaceId,
128+
workflowId: workflowLog.workflowId,
129+
executionId: workflowLog.executionId,
130+
}
131+
)) as WorkflowExecutionLog['executionData']
123132
const traceSpans = (executionData?.traceSpans as TraceSpan[]) || []
124133
const childSnapshotIds = new Set<string>()
125134
const collectSnapshotIds = (spans: TraceSpan[]) => {
@@ -163,7 +172,7 @@ export const GET = withRouteHandler(
163172
startedAt: workflowLog.startedAt.toISOString(),
164173
endedAt: workflowLog.endedAt?.toISOString(),
165174
totalDurationMs: workflowLog.totalDurationMs,
166-
cost: workflowLog.cost || null,
175+
cost: workflowLog.costTotal != null ? { total: Number(workflowLog.costTotal) } : null,
167176
},
168177
}
169178

apps/sim/app/api/logs/export/route.ts

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import { createLogger } from '@sim/logger'
44
import { and, desc, eq, sql } from 'drizzle-orm'
55
import { type NextRequest, NextResponse } from 'next/server'
66
import { getSession } from '@/lib/auth'
7+
import { MATERIALIZE_CONCURRENCY, mapWithConcurrency } from '@/lib/core/utils/concurrency'
78
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
9+
import { materializeExecutionData } from '@/lib/logs/execution/trace-store'
810
import { buildFilterConditions, LogFilterParamsSchema } from '@/lib/logs/filters'
911
import { expandFolderIdsWithDescendants } from '@/lib/logs/folder-expansion'
1012

@@ -41,7 +43,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
4143
startedAt: workflowExecutionLogs.startedAt,
4244
endedAt: workflowExecutionLogs.endedAt,
4345
totalDurationMs: workflowExecutionLogs.totalDurationMs,
44-
cost: workflowExecutionLogs.cost,
46+
costTotal: workflowExecutionLogs.costTotal,
4547
executionData: workflowExecutionLogs.executionData,
4648
workflowName: sql<string>`COALESCE(${workflow.name}, 'Deleted Workflow')`,
4749
}
@@ -96,32 +98,55 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
9698

9799
if (!rows.length) break
98100

99-
for (const r of rows as any[]) {
101+
// Heavy execution data may live in object storage; materialize per
102+
// row with bounded concurrency so a 1000-row page doesn't fan out
103+
// into 1000 simultaneous reads.
104+
const materialized = await mapWithConcurrency(
105+
rows as any[],
106+
MATERIALIZE_CONCURRENCY,
107+
(r) =>
108+
materializeExecutionData(r.executionData as Record<string, unknown> | null, {
109+
workspaceId: params.workspaceId,
110+
workflowId: r.workflowId,
111+
executionId: r.executionId,
112+
})
113+
)
114+
115+
for (let j = 0; j < rows.length; j++) {
116+
const r = rows[j] as any
117+
const ed = materialized[j] as Record<string, any>
118+
// A single malformed/unserializable row must not abort the whole CSV
119+
// stream — derive the message/trace columns defensively and fall back
120+
// to empty on error so the row's metadata still exports.
100121
let message = ''
101-
let traces: any = null
122+
let tracesJson = ''
102123
try {
103-
const ed = (r as any).executionData
104124
if (ed) {
105125
if (ed.finalOutput)
106126
message =
107127
typeof ed.finalOutput === 'string'
108128
? ed.finalOutput
109129
: JSON.stringify(ed.finalOutput)
110130
if (ed.message) message = ed.message
111-
if (ed.traceSpans) traces = ed.traceSpans
131+
if (ed.traceSpans) tracesJson = JSON.stringify(ed.traceSpans)
112132
}
113-
} catch {}
133+
} catch (rowError) {
134+
logger.warn('Skipping unserializable execution data for export row', {
135+
executionId: r.executionId,
136+
error: rowError instanceof Error ? rowError.message : String(rowError),
137+
})
138+
}
114139
const line = [
115140
escapeCsv(r.startedAt?.toISOString?.() || r.startedAt),
116141
escapeCsv(r.level),
117142
escapeCsv(r.workflowName),
118143
escapeCsv(r.trigger),
119144
escapeCsv(r.totalDurationMs ?? ''),
120-
escapeCsv(r.cost?.total ?? r.cost?.value?.total ?? ''),
145+
escapeCsv(r.costTotal ?? ''),
121146
escapeCsv(r.workflowId ?? ''),
122147
escapeCsv(r.executionId ?? ''),
123148
escapeCsv(message),
124-
escapeCsv(traces ? JSON.stringify(traces) : ''),
149+
escapeCsv(tracesJson),
125150
].join(',')
126151
controller.enqueue(encoder.encode(`${line}\n`))
127152
}

apps/sim/app/api/logs/route.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import { listLogsContract, type WorkflowLogSummary } from '@/lib/api/contracts/l
3131
import { parseRequest } from '@/lib/api/server'
3232
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
3333
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
34+
import { jobCostTotal } from '@/lib/logs/fetch-log-detail'
3435
import { buildFilterConditions } from '@/lib/logs/filters'
3536
import { expandFolderIdsWithDescendants } from '@/lib/logs/folder-expansion'
3637

@@ -81,7 +82,8 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
8182
case 'duration':
8283
return sql`${workflowExecutionLogs.totalDurationMs}`
8384
case 'cost':
84-
return sql`(${workflowExecutionLogs.cost}->>'total')::numeric`
85+
// Indexed projection of the usage_log ledger (dollars); no live aggregation.
86+
return sql`${workflowExecutionLogs.costTotal}`
8587
case 'status':
8688
return sql`${workflowExecutionLogs.status}`
8789
default:
@@ -201,7 +203,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
201203
startedAt: workflowExecutionLogs.startedAt,
202204
endedAt: workflowExecutionLogs.endedAt,
203205
totalDurationMs: workflowExecutionLogs.totalDurationMs,
204-
cost: workflowExecutionLogs.cost,
206+
costTotal: workflowExecutionLogs.costTotal,
205207
createdAt: workflowExecutionLogs.createdAt,
206208
workflowName: workflow.name,
207209
workflowDescription: workflow.description,
@@ -379,7 +381,9 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
379381
}
380382
: null,
381383
jobTitle: null,
382-
cost: (log.cost as WorkflowLogSummary['cost']) ?? null,
384+
// List cost is the cost_total projection (faithful ledger sum). Null until
385+
// completion (running) or until the one-time legacy backfill populates it.
386+
cost: log.costTotal != null ? { total: Number(log.costTotal) } : null,
383387
pauseSummary: {
384388
status: log.pausedStatus ?? null,
385389
total: totalPauseCount,
@@ -405,7 +409,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
405409
createdAt: log.startedAt.toISOString(),
406410
workflow: null,
407411
jobTitle: log.jobTitle ?? null,
408-
cost: (log.cost as WorkflowLogSummary['cost']) ?? null,
412+
cost: jobCostTotal(log.cost),
409413
pauseSummary: { status: null, total: 0, resumed: 0 },
410414
hasPendingPause: false,
411415
}

apps/sim/app/api/organizations/[id]/members/[memberId]/route.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
import { parseRequest } from '@/lib/api/server'
1212
import { getSession } from '@/lib/auth'
1313
import { setActiveOrganizationForCurrentSession } from '@/lib/auth/active-organization'
14+
import { getOrgMemberLedgerByUser } from '@/lib/billing/core/organization'
1415
import { getUserUsageData } from '@/lib/billing/core/usage'
1516
import {
1617
removeExternalUserFromOrganizationWorkspaces,
@@ -101,10 +102,25 @@ export const GET = withRouteHandler(
101102
const computed = await getUserUsageData(memberId)
102103

103104
if (usageData.length > 0) {
105+
// currentPeriodCost is only a baseline; add this member's attributed
106+
// usage_log for the period. (getUserUsageData returns the org POOL for
107+
// org-scoped members, so it can't supply the per-member figure.)
108+
const memberLedger =
109+
(
110+
await getOrgMemberLedgerByUser(
111+
organizationId,
112+
computed.billingPeriodStart && computed.billingPeriodEnd
113+
? { start: computed.billingPeriodStart, end: computed.billingPeriodEnd }
114+
: null
115+
)
116+
).get(memberId) ?? 0
104117
memberData = {
105118
...memberData,
106119
usage: {
107120
...usageData[0],
121+
currentPeriodCost: (
122+
Number(usageData[0].currentPeriodCost ?? 0) + memberLedger
123+
).toString(),
108124
billingPeriodStart: computed.billingPeriodStart,
109125
billingPeriodEnd: computed.billingPeriodEnd,
110126
},

apps/sim/app/api/organizations/[id]/members/route.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import {
1717
} from '@/lib/api/contracts/organization'
1818
import { getValidationErrorMessage, parseRequest } from '@/lib/api/server'
1919
import { getSession } from '@/lib/auth'
20+
import { getOrgMemberLedgerByUser } from '@/lib/billing/core/organization'
2021
import { ENTITLED_SUBSCRIPTION_STATUSES } from '@/lib/billing/subscriptions/utils'
2122
import { validateSeatAvailability } from '@/lib/billing/validation/seat-management'
2223
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
@@ -139,8 +140,21 @@ export const GET = withRouteHandler(
139140
const billingPeriodStart = orgSub?.periodStart ?? null
140141
const billingPeriodEnd = orgSub?.periodEnd ?? null
141142

143+
// currentPeriodCost is only a baseline; add each member's attributed
144+
// usage_log for the period (batched, one query) so the roster shows real
145+
// usage rather than the frozen baseline.
146+
const usageByUser = await getOrgMemberLedgerByUser(
147+
organizationId,
148+
billingPeriodStart && billingPeriodEnd
149+
? { start: billingPeriodStart, end: billingPeriodEnd }
150+
: null
151+
)
152+
142153
const membersWithUsage = base.map((row) => ({
143154
...row,
155+
currentPeriodCost: (
156+
Number(row.currentPeriodCost ?? 0) + (usageByUser.get(row.userId) ?? 0)
157+
).toString(),
144158
billingPeriodStart,
145159
billingPeriodEnd,
146160
}))

apps/sim/app/api/v1/admin/organizations/[id]/members/[memberId]/route.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import {
3535
adminV1UpdateOrganizationMemberContract,
3636
} from '@/lib/api/contracts/v1/admin'
3737
import { parseRequest } from '@/lib/api/server'
38+
import { getOrgMemberLedgerByUser } from '@/lib/billing/core/organization'
3839
import { removeUserFromOrganization } from '@/lib/billing/organizations/membership'
3940
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
4041
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
@@ -86,7 +87,6 @@ export const GET = withRouteHandler(
8687
userEmail: user.email,
8788
currentPeriodCost: userStats.currentPeriodCost,
8889
currentUsageLimit: userStats.currentUsageLimit,
89-
lastActive: userStats.lastActive,
9090
billingBlocked: userStats.billingBlocked,
9191
})
9292
.from(member)
@@ -99,6 +99,10 @@ export const GET = withRouteHandler(
9999
return notFoundResponse('Member')
100100
}
101101

102+
// currentPeriodCost is only a baseline; add this member's attributed
103+
// usage_log for the org's period so admin shows real current usage.
104+
const ledgerByUser = await getOrgMemberLedgerByUser(organizationId)
105+
102106
const data: AdminMemberDetail = {
103107
id: memberData.id,
104108
userId: memberData.userId,
@@ -107,9 +111,10 @@ export const GET = withRouteHandler(
107111
createdAt: memberData.createdAt.toISOString(),
108112
userName: memberData.userName,
109113
userEmail: memberData.userEmail,
110-
currentPeriodCost: memberData.currentPeriodCost ?? '0',
114+
currentPeriodCost: (
115+
Number(memberData.currentPeriodCost ?? 0) + (ledgerByUser.get(memberData.userId) ?? 0)
116+
).toString(),
111117
currentUsageLimit: memberData.currentUsageLimit,
112-
lastActive: memberData.lastActive?.toISOString() ?? null,
113118
billingBlocked: memberData.billingBlocked ?? false,
114119
}
115120

apps/sim/app/api/v1/admin/organizations/[id]/members/route.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import {
3737
adminV1ListOrganizationMembersContract,
3838
} from '@/lib/api/contracts/v1/admin'
3939
import { parseRequest } from '@/lib/api/server'
40+
import { getOrgMemberLedgerByUser } from '@/lib/billing/core/organization'
4041
import { addUserToOrganization } from '@/lib/billing/organizations/membership'
4142
import { isBillingEnabled } from '@/lib/core/config/feature-flags'
4243
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
@@ -96,7 +97,6 @@ export const GET = withRouteHandler(
9697
userEmail: user.email,
9798
currentPeriodCost: userStats.currentPeriodCost,
9899
currentUsageLimit: userStats.currentUsageLimit,
99-
lastActive: userStats.lastActive,
100100
billingBlocked: userStats.billingBlocked,
101101
})
102102
.from(member)
@@ -109,6 +109,11 @@ export const GET = withRouteHandler(
109109
])
110110

111111
const total = countResult[0].count
112+
113+
// currentPeriodCost is only a baseline; add each member's attributed
114+
// usage_log for the org's period so admin shows real current usage.
115+
const usageByUser = await getOrgMemberLedgerByUser(organizationId)
116+
112117
const data: AdminMemberDetail[] = membersData.map((m) => ({
113118
id: m.id,
114119
userId: m.userId,
@@ -117,9 +122,10 @@ export const GET = withRouteHandler(
117122
createdAt: m.createdAt.toISOString(),
118123
userName: m.userName,
119124
userEmail: m.userEmail,
120-
currentPeriodCost: m.currentPeriodCost ?? '0',
125+
currentPeriodCost: (
126+
Number(m.currentPeriodCost ?? 0) + (usageByUser.get(m.userId) ?? 0)
127+
).toString(),
121128
currentUsageLimit: m.currentUsageLimit,
122-
lastActive: m.lastActive?.toISOString() ?? null,
123129
billingBlocked: m.billingBlocked ?? false,
124130
}))
125131

apps/sim/app/api/v1/admin/organizations/[id]/seats/route.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -51,16 +51,6 @@ export const GET = withRouteHandler(
5151
subscriptionPlan: analytics.subscriptionPlan,
5252
canAddSeats: analytics.canAddSeats,
5353
utilizationRate: analytics.utilizationRate,
54-
activeMembers: analytics.activeMembers,
55-
inactiveMembers: analytics.inactiveMembers,
56-
memberActivity: analytics.memberActivity.map((m) => ({
57-
userId: m.userId,
58-
userName: m.userName,
59-
userEmail: m.userEmail,
60-
role: m.role,
61-
joinedAt: m.joinedAt.toISOString(),
62-
lastActive: m.lastActive?.toISOString() ?? null,
63-
})),
6454
}
6555

6656
logger.info(`Admin API: Retrieved seat analytics for organization ${organizationId}`)

0 commit comments

Comments
 (0)