Skip to content

Commit dceda7b

Browse files
committed
Checkpointed reconnect logic
1 parent 6f13c0b commit dceda7b

File tree

1 file changed

+28
-114
lines changed
  • apps/sim/app/workspace/[workspaceId]/home/hooks

1 file changed

+28
-114
lines changed

apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts

Lines changed: 28 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -169,9 +169,6 @@ const MAX_RECONNECT_ATTEMPTS = 10
169169
const RECONNECT_BASE_DELAY_MS = 1000
170170
const RECONNECT_MAX_DELAY_MS = 30_000
171171
const RECONNECT_EMPTY_BATCH_DELAY_MS = 500
172-
const RECONNECT_BATCH_REQUEST_TIMEOUT_MS = 15_000
173-
const RECONNECT_LIVE_TAIL_TIMEOUT_MS = 60_000
174-
const RECONNECT_RECOVERY_TIMEOUT_MS = 2 * 60_000
175172

176173
const logger = createLogger('useChat')
177174

@@ -197,43 +194,6 @@ function stringArrayParam(value: unknown): string[] {
197194
return value.filter((item): item is string => typeof item === 'string' && item.trim().length > 0)
198195
}
199196

200-
function createTimedAbortSignal(
201-
parentSignal: AbortSignal | undefined,
202-
timeoutMs: number,
203-
timeoutMessage: string
204-
): {
205-
signal: AbortSignal
206-
didTimeout: () => boolean
207-
cleanup: () => void
208-
} {
209-
const controller = new AbortController()
210-
let timedOut = false
211-
212-
const onAbort = () => {
213-
controller.abort(parentSignal?.reason)
214-
}
215-
216-
if (parentSignal?.aborted) {
217-
controller.abort(parentSignal.reason)
218-
} else if (parentSignal) {
219-
parentSignal.addEventListener('abort', onAbort, { once: true })
220-
}
221-
222-
const timeoutId = setTimeout(() => {
223-
timedOut = true
224-
controller.abort(new Error(timeoutMessage))
225-
}, timeoutMs)
226-
227-
return {
228-
signal: controller.signal,
229-
didTimeout: () => timedOut,
230-
cleanup: () => {
231-
clearTimeout(timeoutId)
232-
parentSignal?.removeEventListener('abort', onAbort)
233-
},
234-
}
235-
}
236-
237197
function resolveWorkflowNameForDisplay(workflowId: unknown): string | undefined {
238198
const id = stringParam(workflowId)
239199
if (!id) return undefined
@@ -2340,36 +2300,21 @@ export function useChat(
23402300
afterCursor: string,
23412301
signal?: AbortSignal
23422302
): Promise<StreamBatchResponse> => {
2343-
const timeoutMessage = `Timed out fetching stream batch for ${streamId}`
2344-
const timedAbort = createTimedAbortSignal(
2345-
signal,
2346-
RECONNECT_BATCH_REQUEST_TIMEOUT_MS,
2347-
timeoutMessage
2303+
const response = await fetch(
2304+
`/api/mothership/chat/stream?streamId=${encodeURIComponent(streamId)}&after=${encodeURIComponent(afterCursor)}&batch=true`,
2305+
{ signal }
23482306
)
2349-
try {
2350-
const response = await fetch(
2351-
`/api/mothership/chat/stream?streamId=${encodeURIComponent(streamId)}&after=${encodeURIComponent(afterCursor)}&batch=true`,
2352-
{ signal: timedAbort.signal }
2307+
if (!response.ok) {
2308+
throw await createResumeTransportError(
2309+
response,
2310+
`Stream resume batch failed: ${response.status}`
23532311
)
2354-
if (!response.ok) {
2355-
throw await createResumeTransportError(
2356-
response,
2357-
`Stream resume batch failed: ${response.status}`
2358-
)
2359-
}
2360-
const batch = parseStreamBatchResponse(await response.json())
2361-
if (Array.isArray(batch.previewSessions) && batch.previewSessions.length > 0) {
2362-
seedPreviewSessions(batch.previewSessions)
2363-
}
2364-
return batch
2365-
} catch (error) {
2366-
if (timedAbort.didTimeout() && !signal?.aborted) {
2367-
throw new Error(timeoutMessage)
2368-
}
2369-
throw error
2370-
} finally {
2371-
timedAbort.cleanup()
23722312
}
2313+
const batch = parseStreamBatchResponse(await response.json())
2314+
if (Array.isArray(batch.previewSessions) && batch.previewSessions.length > 0) {
2315+
seedPreviewSessions(batch.previewSessions)
2316+
}
2317+
return batch
23732318
},
23742319
[seedPreviewSessions]
23752320
)
@@ -2387,7 +2332,6 @@ export function useChat(
23872332
let seedEvents = opts.initialBatch?.events ?? []
23882333
let streamStatus = opts.initialBatch?.status ?? 'unknown'
23892334
let streamError = opts.initialBatch?.error
2390-
const reconnectStartedAt = Date.now()
23912335

23922336
const isStaleReconnect = () =>
23932337
streamGenRef.current !== expectedGen || abortControllerRef.current?.signal.aborted === true
@@ -2401,15 +2345,6 @@ export function useChat(
24012345

24022346
try {
24032347
while (streamGenRef.current === expectedGen) {
2404-
if (Date.now() - reconnectStartedAt >= RECONNECT_RECOVERY_TIMEOUT_MS) {
2405-
logger.warn('Reconnect tail timed out waiting for terminal state', {
2406-
streamId,
2407-
afterCursor: latestCursor,
2408-
elapsedMs: Date.now() - reconnectStartedAt,
2409-
})
2410-
throw new Error(RECONNECT_TAIL_ERROR)
2411-
}
2412-
24132348
if (seedEvents.length > 0) {
24142349
const replayResult = await processSSEStreamRef.current(
24152350
buildReplayStream(seedEvents).getReader(),
@@ -2440,47 +2375,26 @@ export function useChat(
24402375

24412376
logger.info('Opening live stream tail', { streamId, afterCursor: latestCursor })
24422377

2443-
const tailTimeoutMessage = `Timed out waiting for live reconnect tail for ${streamId}`
2444-
const timedTailAbort = createTimedAbortSignal(
2445-
activeAbort.signal,
2446-
RECONNECT_LIVE_TAIL_TIMEOUT_MS,
2447-
tailTimeoutMessage
2378+
const sseRes = await fetch(
2379+
`/api/mothership/chat/stream?streamId=${encodeURIComponent(streamId)}&after=${encodeURIComponent(latestCursor)}`,
2380+
{ signal: activeAbort.signal }
24482381
)
2449-
let liveResult: { sawStreamError: boolean; sawComplete: boolean }
2450-
try {
2451-
const sseRes = await fetch(
2452-
`/api/mothership/chat/stream?streamId=${encodeURIComponent(streamId)}&after=${encodeURIComponent(latestCursor)}`,
2453-
{ signal: timedTailAbort.signal }
2454-
)
2455-
if (!sseRes.ok || !sseRes.body) {
2456-
throw await createResumeTransportError(sseRes, RECONNECT_TAIL_ERROR)
2457-
}
2382+
if (!sseRes.ok || !sseRes.body) {
2383+
throw await createResumeTransportError(sseRes, RECONNECT_TAIL_ERROR)
2384+
}
24582385

2459-
if (isStaleReconnect()) {
2460-
return { error: false, aborted: true }
2461-
}
2386+
if (isStaleReconnect()) {
2387+
return { error: false, aborted: true }
2388+
}
24622389

2463-
setTransportStreaming()
2390+
setTransportStreaming()
24642391

2465-
liveResult = await processSSEStreamRef.current(
2466-
sseRes.body.getReader(),
2467-
assistantId,
2468-
expectedGen,
2469-
{ preserveExistingState: true }
2470-
)
2471-
} catch (error) {
2472-
if (timedTailAbort.didTimeout() && !activeAbort.signal.aborted) {
2473-
logger.warn('Live reconnect tail request timed out', {
2474-
streamId,
2475-
afterCursor: latestCursor,
2476-
timeoutMs: RECONNECT_LIVE_TAIL_TIMEOUT_MS,
2477-
})
2478-
throw new Error(RECONNECT_TAIL_ERROR)
2479-
}
2480-
throw error
2481-
} finally {
2482-
timedTailAbort.cleanup()
2483-
}
2392+
const liveResult = await processSSEStreamRef.current(
2393+
sseRes.body.getReader(),
2394+
assistantId,
2395+
expectedGen,
2396+
{ preserveExistingState: true }
2397+
)
24842398

24852399
if (liveResult.sawStreamError) {
24862400
return { error: true, aborted: false }

0 commit comments

Comments
 (0)