From d59a0353bc6680c46621cb3a76f5588239a3b975 Mon Sep 17 00:00:00 2001 From: Manhua Date: Thu, 21 May 2026 15:40:03 +0800 Subject: [PATCH 1/3] [ZEPPELIN-6419] Fix clone paragraph content loss caused by overly aggressive shortCircuit seq filtering --- .../projects/zeppelin-sdk/src/message.ts | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts index 29a05ddf8ca..a307cab9c83 100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts +++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts @@ -54,6 +54,15 @@ export class Message { private uniqueClientId = Math.random().toString(36).substring(2, 7); private lastMsgIdSeqSent = 0; private readonly normalCloseCode = 1000; + /** + * Track which PARAGRAPH message seq IDs were explicitly short-circuited. + * Only these should be filtered out — not all messages where + * lastMsgIdSeqSent > receivedSeq (which can happen legitimately when + * unrelated requests like EDITOR_SETTING are interleaved). + */ + private shortCircuitedParagraphMsgIds = new Set(); + /** Prevent unbounded growth of the short-circuit tracker */ + private static readonly MAX_SHORT_CIRCUIT_SIZE = 100; constructor() { this.open$.subscribe(() => { @@ -185,8 +194,11 @@ export class Message { const isResponseForRequestFromThisClient = uniqueClientId === this.uniqueClientId; if (message.op === OP.PARAGRAPH) { - if (isResponseForRequestFromThisClient && this.lastMsgIdSeqSent > msgIdSeqReceived) { + if (isResponseForRequestFromThisClient && + this.shortCircuitedParagraphMsgIds.has(msgIdSeqReceived) + ) { console.log('PARAPGRAPH is already updated by shortcircuit'); + this.shortCircuitedParagraphMsgIds.delete(msgIdSeqReceived); return false; } else { return true; @@ -200,6 +212,20 @@ export class Message { } shortCircuit(message: WebSocketMessage) { + // Track which PARAGRAPH responses were explicitly short-circuited + // so the receive filter can correctly identify and skip them + if (message.op === OP.PARAGRAPH && message.msgId) { + const msgIdSeq = parseInt(message.msgId.split('-')[1], 10); + this.shortCircuitedParagraphMsgIds.add(msgIdSeq); + // Prevent unbounded growth: evict the oldest (smallest seq) entries + if (this.shortCircuitedParagraphMsgIds.size > Message.MAX_SHORT_CIRCUIT_SIZE) { + const sorted = [...this.shortCircuitedParagraphMsgIds].sort((a, b) => a - b); + const toDelete = sorted.slice(0, sorted.length - Message.MAX_SHORT_CIRCUIT_SIZE / 2); + for (const id of toDelete) { + this.shortCircuitedParagraphMsgIds.delete(id); + } + } + } this.received$.next(this.interceptReceived(message)); } From 77c59abc22f8d4ba3c6ba9c35dcf78ad5dcb9ec8 Mon Sep 17 00:00:00 2001 From: Manhua Date: Fri, 22 May 2026 09:00:24 +0800 Subject: [PATCH 2/3] fix CI --- .../src/interfaces/message-paragraph.interface.ts | 4 ++-- zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts index 689104668db..2ea3916ea1e 100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts +++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/interfaces/message-paragraph.interface.ts @@ -77,10 +77,10 @@ export interface ParagraphConfig { } export interface ParagraphResults { + [index: number]: Record; + code?: string; msg?: ParagraphIResultsMsgItem[]; - - [index: number]: Record; } export enum DatasetType { diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts index a307cab9c83..401a7bcf637 100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts +++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts @@ -40,6 +40,9 @@ export type ReceiveArgumentsType = MessageReceiveDataTypeMap[K] extends undefined ? () => void : (data: MessageReceiveDataTypeMap[K]) => void; export class Message { + /** Prevent unbounded growth of the short-circuit tracker */ + private static readonly MAX_SHORT_CIRCUIT_SIZE = 100; + public connectedStatus = false; public connectedStatus$ = new Subject(); private ws: WebSocketSubject> | null = null; @@ -61,8 +64,6 @@ export class Message { * unrelated requests like EDITOR_SETTING are interleaved). */ private shortCircuitedParagraphMsgIds = new Set(); - /** Prevent unbounded growth of the short-circuit tracker */ - private static readonly MAX_SHORT_CIRCUIT_SIZE = 100; constructor() { this.open$.subscribe(() => { From 3e8d464beb8805f1e2fb58c2c40bf5e7b0563fa6 Mon Sep 17 00:00:00 2001 From: Manhua Date: Tue, 26 May 2026 14:29:20 +0800 Subject: [PATCH 3/3] update --- .../projects/zeppelin-sdk/src/message.ts | 48 ------------------- 1 file changed, 48 deletions(-) diff --git a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts index 401a7bcf637..3471ec1ca7c 100644 --- a/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts +++ b/zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts @@ -40,9 +40,6 @@ export type ReceiveArgumentsType = MessageReceiveDataTypeMap[K] extends undefined ? () => void : (data: MessageReceiveDataTypeMap[K]) => void; export class Message { - /** Prevent unbounded growth of the short-circuit tracker */ - private static readonly MAX_SHORT_CIRCUIT_SIZE = 100; - public connectedStatus = false; public connectedStatus$ = new Subject(); private ws: WebSocketSubject> | null = null; @@ -57,13 +54,6 @@ export class Message { private uniqueClientId = Math.random().toString(36).substring(2, 7); private lastMsgIdSeqSent = 0; private readonly normalCloseCode = 1000; - /** - * Track which PARAGRAPH message seq IDs were explicitly short-circuited. - * Only these should be filtered out — not all messages where - * lastMsgIdSeqSent > receivedSeq (which can happen legitimately when - * unrelated requests like EDITOR_SETTING are interleaved). - */ - private shortCircuitedParagraphMsgIds = new Set(); constructor() { this.open$.subscribe(() => { @@ -184,49 +174,11 @@ export class Message { receive(op: K): Observable[K]> { return this.received$.pipe( filter(message => message.op === op), - filter(message => { - if (!message.msgId) { - // when msgId is not specified, it is not response to client request. - // always process them - return true; - } - const uniqueClientId = message.msgId.split('-')[0]; - const msgIdSeqReceived = parseInt(message.msgId.split('-')[1], 10); - const isResponseForRequestFromThisClient = uniqueClientId === this.uniqueClientId; - - if (message.op === OP.PARAGRAPH) { - if (isResponseForRequestFromThisClient && - this.shortCircuitedParagraphMsgIds.has(msgIdSeqReceived) - ) { - console.log('PARAPGRAPH is already updated by shortcircuit'); - this.shortCircuitedParagraphMsgIds.delete(msgIdSeqReceived); - return false; - } else { - return true; - } - } else { - return true; - } - }), map(message => message.data) ) as Observable[K]>; } shortCircuit(message: WebSocketMessage) { - // Track which PARAGRAPH responses were explicitly short-circuited - // so the receive filter can correctly identify and skip them - if (message.op === OP.PARAGRAPH && message.msgId) { - const msgIdSeq = parseInt(message.msgId.split('-')[1], 10); - this.shortCircuitedParagraphMsgIds.add(msgIdSeq); - // Prevent unbounded growth: evict the oldest (smallest seq) entries - if (this.shortCircuitedParagraphMsgIds.size > Message.MAX_SHORT_CIRCUIT_SIZE) { - const sorted = [...this.shortCircuitedParagraphMsgIds].sort((a, b) => a - b); - const toDelete = sorted.slice(0, sorted.length - Message.MAX_SHORT_CIRCUIT_SIZE / 2); - for (const id of toDelete) { - this.shortCircuitedParagraphMsgIds.delete(id); - } - } - } this.received$.next(this.interceptReceived(message)); }