@@ -58,12 +58,16 @@ import {
5858 WorkspaceFile ,
5959 WorkspaceFileOperation ,
6060} from '@/lib/copilot/generated/tool-catalog-v1'
61- import { parsePersistedStreamEventEnvelopeJson } from '@/lib/copilot/request/session/contract'
61+ import {
62+ type ParseStreamEventEnvelopeFailure ,
63+ parsePersistedStreamEventEnvelope ,
64+ parsePersistedStreamEventEnvelopeJson ,
65+ } from '@/lib/copilot/request/session/contract'
6266import {
6367 type FilePreviewSession ,
6468 isFilePreviewSession ,
6569} from '@/lib/copilot/request/session/file-preview-session-contract'
66- import { isStreamBatchEvent , type StreamBatchEvent } from '@/lib/copilot/request/session/types'
70+ import { type StreamBatchEvent } from '@/lib/copilot/request/session/types'
6771import {
6872 extractResourcesFromToolResult ,
6973 isResourceToolName ,
@@ -509,27 +513,80 @@ function isRecord(value: unknown): value is Record<string, unknown> {
509513 return Boolean ( value ) && typeof value === 'object' && ! Array . isArray ( value )
510514}
511515
516+ const STREAM_SCHEMA_ENFORCEMENT_PREFIX = 'Client stream schema enforcement failed.'
517+
518+ class StreamSchemaValidationError extends Error {
519+ constructor ( message : string ) {
520+ super ( message )
521+ this . name = 'StreamSchemaValidationError'
522+ }
523+ }
524+
525+ function createStreamSchemaValidationError (
526+ failure : ParseStreamEventEnvelopeFailure ,
527+ context ?: string
528+ ) : StreamSchemaValidationError {
529+ const details = failure . errors ?. filter ( Boolean ) . join ( '; ' )
530+ return new StreamSchemaValidationError (
531+ [ STREAM_SCHEMA_ENFORCEMENT_PREFIX , context , failure . message , details ]
532+ . filter ( Boolean )
533+ . join ( ' ' )
534+ )
535+ }
536+
537+ function createBatchSchemaValidationError ( message : string ) : StreamSchemaValidationError {
538+ return new StreamSchemaValidationError ( [ STREAM_SCHEMA_ENFORCEMENT_PREFIX , message ] . join ( ' ' ) )
539+ }
540+
541+ function isStreamSchemaValidationError ( error : unknown ) : error is StreamSchemaValidationError {
542+ return error instanceof StreamSchemaValidationError
543+ }
544+
512545function parseStreamBatchResponse ( value : unknown ) : StreamBatchResponse {
513546 if ( ! isRecord ( value ) ) {
514547 throw new Error ( 'Invalid stream batch response' )
515548 }
516549
517550 const rawEvents = Array . isArray ( value . events ) ? value . events : [ ]
518551 const events : StreamBatchEvent [ ] = [ ]
519- for ( const entry of rawEvents ) {
520- if ( ! isStreamBatchEvent ( entry ) ) {
521- throw new Error ( 'Invalid stream batch event' )
552+ for ( const [ index , entry ] of rawEvents . entries ( ) ) {
553+ if ( ! isRecord ( entry ) ) {
554+ throw createBatchSchemaValidationError ( `Reconnect batch event ${ index + 1 } is not an object.` )
522555 }
523- events . push ( entry )
556+ if (
557+ typeof entry . eventId !== 'number' ||
558+ ! Number . isFinite ( entry . eventId ) ||
559+ typeof entry . streamId !== 'string'
560+ ) {
561+ throw createBatchSchemaValidationError (
562+ `Reconnect batch event ${ index + 1 } is missing required metadata.`
563+ )
564+ }
565+
566+ const parsedEvent = parsePersistedStreamEventEnvelope ( entry . event )
567+ if ( ! parsedEvent . ok ) {
568+ throw createStreamSchemaValidationError (
569+ parsedEvent ,
570+ `Reconnect batch event ${ index + 1 } .`
571+ )
572+ }
573+
574+ events . push ( {
575+ eventId : entry . eventId ,
576+ streamId : entry . streamId ,
577+ event : parsedEvent . event ,
578+ } )
524579 }
525580
526581 const rawPreviewSessions = Array . isArray ( value . previewSessions )
527582 ? value . previewSessions
528583 : undefined
529584 const previewSessions =
530- rawPreviewSessions ?. map ( ( session ) => {
585+ rawPreviewSessions ?. map ( ( session , index ) => {
531586 if ( ! isFilePreviewSession ( session ) ) {
532- throw new Error ( 'Invalid stream preview session' )
587+ throw createBatchSchemaValidationError (
588+ `Reconnect preview session ${ index + 1 } failed validation.`
589+ )
533590 }
534591 return session
535592 } ) ?? undefined
@@ -1579,12 +1636,14 @@ export function useChat(
15791636
15801637 const parsedResult = parsePersistedStreamEventEnvelopeJson ( raw )
15811638 if ( ! parsedResult . ok ) {
1582- logger . warn ( 'Failed to parse chat SSE event' , {
1639+ const error = createStreamSchemaValidationError ( parsedResult , 'Live SSE event.' )
1640+ logger . error ( 'Rejected chat SSE event due to client-side schema enforcement' , {
15831641 reason : parsedResult . reason ,
15841642 message : parsedResult . message ,
15851643 errors : parsedResult . errors ,
1644+ error : error . message ,
15861645 } )
1587- continue
1646+ throw error
15881647 }
15891648 const parsed = parsedResult . event
15901649
@@ -2533,6 +2592,17 @@ export function useChat(
25332592 }
25342593 return true
25352594 }
2595+ if ( isStreamSchemaValidationError ( err ) ) {
2596+ logger . error ( 'Reconnect halted by client-side stream schema enforcement' , {
2597+ streamId,
2598+ attempt : attempt + 1 ,
2599+ error : err . message ,
2600+ } )
2601+ if ( streamGenRef . current === gen ) {
2602+ setError ( err . message )
2603+ }
2604+ return false
2605+ }
25362606 logger . warn ( 'Reconnect attempt failed' , {
25372607 streamId,
25382608 attempt : attempt + 1 ,
@@ -2892,6 +2962,13 @@ export function useChat(
28922962 }
28932963 } catch ( err ) {
28942964 if ( err instanceof Error && err . name === 'AbortError' ) return consumedByTranscript
2965+ if ( isStreamSchemaValidationError ( err ) ) {
2966+ setError ( err . message )
2967+ if ( streamGenRef . current === gen ) {
2968+ finalize ( { error : true } )
2969+ }
2970+ return consumedByTranscript
2971+ }
28952972
28962973 const activeStreamId = streamIdRef . current
28972974 if ( activeStreamId && streamGenRef . current === gen ) {
0 commit comments