Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ export interface ParagraphConfig {
}

export interface ParagraphResults {
[index: number]: Record<string, unknown>;

code?: string;
msg?: ParagraphIResultsMsgItem[];

[index: number]: Record<string, unknown>;
}

export enum DatasetType {
Expand Down
29 changes: 28 additions & 1 deletion zeppelin-web-angular/projects/zeppelin-sdk/src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ export type ReceiveArgumentsType<K extends keyof MessageReceiveDataTypeMap> =
MessageReceiveDataTypeMap[K] extends undefined ? () => void : (data: MessageReceiveDataTypeMap[K]) => void;

export class Message {
/** Prevent unbounded growth of the short-circuit tracker */
private static readonly MAX_SHORT_CIRCUIT_SIZE = 100;

public connectedStatus = false;
public connectedStatus$ = new Subject<boolean>();
private ws: WebSocketSubject<WebSocketMessage<MessageDataTypeMap>> | null = null;
Expand All @@ -54,6 +57,13 @@ export class Message {
private uniqueClientId = Math.random().toString(36).substring(2, 7);
private lastMsgIdSeqSent = 0;
private readonly normalCloseCode = 1000;
/**
* Track which PARAGRAPH message seq IDs were explicitly short-circuited.
* Only these should be filtered out — not all messages where
* lastMsgIdSeqSent > receivedSeq (which can happen legitimately when
* unrelated requests like EDITOR_SETTING are interleaved).
*/
private shortCircuitedParagraphMsgIds = new Set<number>();

constructor() {
this.open$.subscribe(() => {
Expand Down Expand Up @@ -185,8 +195,11 @@ export class Message {
const isResponseForRequestFromThisClient = uniqueClientId === this.uniqueClientId;

if (message.op === OP.PARAGRAPH) {
if (isResponseForRequestFromThisClient && this.lastMsgIdSeqSent > msgIdSeqReceived) {
if (isResponseForRequestFromThisClient &&
this.shortCircuitedParagraphMsgIds.has(msgIdSeqReceived)
) {
console.log('PARAPGRAPH is already updated by shortcircuit');
this.shortCircuitedParagraphMsgIds.delete(msgIdSeqReceived);
return false;
} else {
return true;
Expand All @@ -200,6 +213,20 @@ export class Message {
}

shortCircuit(message: WebSocketMessage<MessageReceiveDataTypeMap>) {
// Track which PARAGRAPH responses were explicitly short-circuited
// so the receive filter can correctly identify and skip them
if (message.op === OP.PARAGRAPH && message.msgId) {
const msgIdSeq = parseInt(message.msgId.split('-')[1], 10);
this.shortCircuitedParagraphMsgIds.add(msgIdSeq);
// Prevent unbounded growth: evict the oldest (smallest seq) entries
if (this.shortCircuitedParagraphMsgIds.size > Message.MAX_SHORT_CIRCUIT_SIZE) {
const sorted = [...this.shortCircuitedParagraphMsgIds].sort((a, b) => a - b);
const toDelete = sorted.slice(0, sorted.length - Message.MAX_SHORT_CIRCUIT_SIZE / 2);
for (const id of toDelete) {
this.shortCircuitedParagraphMsgIds.delete(id);
}
}
}
this.received$.next(this.interceptReceived(message));
}

Expand Down
Loading