From c0e6d8029558b519dbf57bc948df636899afb2f3 Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Thu, 16 Apr 2026 13:06:33 -0700 Subject: [PATCH 1/2] fix(executor): subflow edge keys mismatch' --- .../executor/execution/edge-manager.test.ts | 165 ++++++++++++++++++ apps/sim/executor/execution/edge-manager.ts | 12 +- apps/sim/executor/orchestrators/loop.ts | 34 +--- apps/sim/executor/orchestrators/parallel.ts | 37 +--- apps/sim/executor/utils/subflow-utils.ts | 59 +++++++ .../execution/trace-spans/trace-spans.test.ts | 112 ++++++++++++ .../logs/execution/trace-spans/trace-spans.ts | 6 +- 7 files changed, 356 insertions(+), 69 deletions(-) diff --git a/apps/sim/executor/execution/edge-manager.test.ts b/apps/sim/executor/execution/edge-manager.test.ts index a1319574959..44fb3b10944 100644 --- a/apps/sim/executor/execution/edge-manager.test.ts +++ b/apps/sim/executor/execution/edge-manager.test.ts @@ -600,6 +600,171 @@ describe('EdgeManager', () => { }) expect(readyNodes).toContain(function1Id) }) + + /** + * Regression for the substring-match bug in clearDeactivatedEdgesForNodes. + * + * Reproduces the real workflow pattern where an empty upstream loop (e.g. KG) cascade + * deactivates its `loop_exit` edge into the next loop's sentinel-start (e.g. SBJ). When + * SBJ iterates and resets its state between iterations, the old buggy `includes(\`-${nodeId}-\`)` + * check matched edge keys where the sentinel was the TARGET (not the source), wrongly + * reactivating that external edge. That made countActiveIncomingEdges see a phantom pending + * upstream and SBJ's sentinel-start stopped being ready, stalling the loop after iteration 1. + */ + it('should not re-activate external cascade-deactivated edges pointing INTO a loop node', () => { + const externalNodeId = 'external-node' + const sbjSentinelStartId = 'loop-sbj-sentinel-start' + const sbjSentinelEndId = 'loop-sbj-sentinel-end' + const bodyNodeId = 'body-node' + + const externalNode = createMockNode(externalNodeId, [ + { target: sbjSentinelStartId, sourceHandle: 'condition-if' }, + ]) + const sbjSentinelStartNode = createMockNode( + sbjSentinelStartId, + [{ target: bodyNodeId }], + [externalNodeId] + ) + const bodyNode = createMockNode( + bodyNodeId, + [{ target: sbjSentinelEndId }], + [sbjSentinelStartId] + ) + const sbjSentinelEndNode = createMockNode(sbjSentinelEndId, [], [bodyNodeId]) + + const nodes = new Map([ + [externalNodeId, externalNode], + [sbjSentinelStartId, sbjSentinelStartNode], + [bodyNodeId, bodyNode], + [sbjSentinelEndId, sbjSentinelEndNode], + ]) + + const dag = createMockDAG(nodes) + const edgeManager = new EdgeManager(dag) + + edgeManager.processOutgoingEdges(externalNode, { selectedOption: 'else' }) + + expect(edgeManager.isNodeReady(sbjSentinelStartNode)).toBe(true) + + edgeManager.clearDeactivatedEdgesForNodes( + new Set([sbjSentinelStartId, sbjSentinelEndId, bodyNodeId]) + ) + + expect(edgeManager.isNodeReady(sbjSentinelStartNode)).toBe(true) + }) + + /** + * End-to-end regression: after a loop reset while an external edge is cascade-deactivated, + * the backwards `loop_continue` edge from sentinel-end must still mark sentinel-start as + * ready. The old code removed the external edge's deactivation entry, leaving a phantom + * active incoming and producing the exact "loop stops after 1 iteration" symptom the user + * hit on the Group A workflow. + */ + it('should leave sbjSentinelStart ready after loop reset when external edge is cascade-deactivated', () => { + const externalNodeId = 'external-node' + const sbjSentinelStartId = 'loop-sbj-sentinel-start' + const sbjSentinelEndId = 'loop-sbj-sentinel-end' + const bodyNodeId = 'body-node' + + const externalNode = createMockNode(externalNodeId, [ + { target: sbjSentinelStartId, sourceHandle: 'condition-if' }, + ]) + const sbjSentinelStartNode = createMockNode( + sbjSentinelStartId, + [{ target: bodyNodeId }], + [externalNodeId] + ) + const bodyNode = createMockNode( + bodyNodeId, + [{ target: sbjSentinelEndId }], + [sbjSentinelStartId] + ) + const sbjSentinelEndNode = createMockNode( + sbjSentinelEndId, + [{ target: sbjSentinelStartId, sourceHandle: 'loop_continue' }], + [bodyNodeId] + ) + + const nodes = new Map([ + [externalNodeId, externalNode], + [sbjSentinelStartId, sbjSentinelStartNode], + [bodyNodeId, bodyNode], + [sbjSentinelEndId, sbjSentinelEndNode], + ]) + + const dag = createMockDAG(nodes) + const edgeManager = new EdgeManager(dag) + + edgeManager.processOutgoingEdges(externalNode, { selectedOption: 'else' }) + + edgeManager.clearDeactivatedEdgesForNodes( + new Set([sbjSentinelStartId, sbjSentinelEndId, bodyNodeId]) + ) + + const readyNodes = edgeManager.processOutgoingEdges(sbjSentinelEndNode, { + selectedRoute: 'loop_continue', + }) + + expect(readyNodes).toContain(sbjSentinelStartId) + }) + + /** + * Guard against an overly narrow fix: edges whose SOURCE is inside the loop (e.g. a body + * node that deactivated its outgoing edge during the previous iteration) must still be + * cleared on reset so the next iteration can traverse them. + */ + it('should re-activate internal loop edges (source inside loop) when resetting loop state', () => { + const sbjSentinelStartId = 'loop-sbj-sentinel-start' + const sbjSentinelEndId = 'loop-sbj-sentinel-end' + const conditionInLoopId = 'condition-in-loop' + const thenBranchId = 'then-branch' + + const sbjSentinelStartNode = createMockNode(sbjSentinelStartId, [ + { target: conditionInLoopId }, + ]) + const conditionInLoopNode = createMockNode( + conditionInLoopId, + [ + { target: thenBranchId, sourceHandle: 'condition-if' }, + { target: sbjSentinelEndId, sourceHandle: 'condition-else' }, + ], + [sbjSentinelStartId] + ) + const thenBranchNode = createMockNode( + thenBranchId, + [{ target: sbjSentinelEndId }], + [conditionInLoopId] + ) + const sbjSentinelEndNode = createMockNode( + sbjSentinelEndId, + [], + [conditionInLoopId, thenBranchId] + ) + + const nodes = new Map([ + [sbjSentinelStartId, sbjSentinelStartNode], + [conditionInLoopId, conditionInLoopNode], + [thenBranchId, thenBranchNode], + [sbjSentinelEndId, sbjSentinelEndNode], + ]) + + const dag = createMockDAG(nodes) + const edgeManager = new EdgeManager(dag) + + edgeManager.processOutgoingEdges(conditionInLoopNode, { selectedOption: 'else' }) + + edgeManager.clearDeactivatedEdgesForNodes( + new Set([sbjSentinelStartId, sbjSentinelEndId, conditionInLoopId, thenBranchId]) + ) + + thenBranchNode.incomingEdges.add(conditionInLoopId) + + const readyNodes = edgeManager.processOutgoingEdges(conditionInLoopNode, { + selectedOption: 'if', + }) + + expect(readyNodes).toContain(thenBranchId) + }) }) describe('restoreIncomingEdge', () => { diff --git a/apps/sim/executor/execution/edge-manager.ts b/apps/sim/executor/execution/edge-manager.ts index a64ec7b09b6..7bedea3a5a2 100644 --- a/apps/sim/executor/execution/edge-manager.ts +++ b/apps/sim/executor/execution/edge-manager.ts @@ -128,12 +128,21 @@ export class EdgeManager { /** * Clear deactivated edges for a set of nodes (used when restoring loop state for next iteration). + * + * Only clears edges whose SOURCE is in the provided set. Edges pointing INTO a node in the set + * whose source lives outside (e.g. an external branch whose path was cascade-deactivated) must + * remain deactivated — otherwise `countActiveIncomingEdges` would count a source that will never + * fire again, stalling the loop on its next iteration. + * + * Edge-key format is `${sourceId}-${targetId}-${handle}`, so `startsWith("${nodeId}-")` uniquely + * matches "node is source". An `includes("-${nodeId}-")` check would also match "node is target" + * and is unsafe for the reset semantics. */ clearDeactivatedEdgesForNodes(nodeIds: Set): void { const edgesToRemove: string[] = [] for (const edgeKey of this.deactivatedEdges) { for (const nodeId of nodeIds) { - if (edgeKey.startsWith(`${nodeId}-`) || edgeKey.includes(`-${nodeId}-`)) { + if (edgeKey.startsWith(`${nodeId}-`)) { edgesToRemove.push(edgeKey) break } @@ -142,7 +151,6 @@ export class EdgeManager { for (const edgeKey of edgesToRemove) { this.deactivatedEdges.delete(edgeKey) } - // Also clear activated edge tracking for these nodes for (const nodeId of nodeIds) { this.nodesWithActivatedEdge.delete(nodeId) } diff --git a/apps/sim/executor/orchestrators/loop.ts b/apps/sim/executor/orchestrators/loop.ts index c8b4ea4c9a9..eb94c668764 100644 --- a/apps/sim/executor/orchestrators/loop.ts +++ b/apps/sim/executor/orchestrators/loop.ts @@ -7,13 +7,8 @@ import type { DAG } from '@/executor/dag/builder' import type { EdgeManager } from '@/executor/execution/edge-manager' import type { LoopScope } from '@/executor/execution/state' import type { BlockStateController, ContextExtensions } from '@/executor/execution/types' -import { - type ExecutionContext, - getNextExecutionOrder, - type NormalizedBlockOutput, -} from '@/executor/types' +import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types' import type { LoopConfigWithNodes } from '@/executor/types/loop' -import { buildContainerIterationContext } from '@/executor/utils/iteration-context' import { replaceValidReferences } from '@/executor/utils/reference-validation' import { addSubflowErrorLog, @@ -22,6 +17,7 @@ import { buildSentinelEndId, buildSentinelStartId, emitEmptySubflowEvents, + emitSubflowSuccessEvents, extractBaseBlockId, resolveArrayInput, validateMaxCount, @@ -319,31 +315,7 @@ export class LoopOrchestrator { const output = { results } this.state.setBlockOutput(loopId, output, DEFAULTS.EXECUTION_TIME) - if (this.contextExtensions?.onBlockComplete) { - const now = new Date().toISOString() - const iterationContext = buildContainerIterationContext(ctx, loopId) - - try { - await this.contextExtensions.onBlockComplete( - loopId, - 'Loop', - 'loop', - { - output, - executionTime: DEFAULTS.EXECUTION_TIME, - startedAt: now, - executionOrder: getNextExecutionOrder(ctx), - endedAt: now, - }, - iterationContext - ) - } catch (error) { - logger.warn('Loop completion callback failed', { - loopId, - error: error instanceof Error ? error.message : String(error), - }) - } - } + await emitSubflowSuccessEvents(ctx, loopId, 'loop', output, this.contextExtensions) return { shouldContinue: false, diff --git a/apps/sim/executor/orchestrators/parallel.ts b/apps/sim/executor/orchestrators/parallel.ts index eb4ea1ce05b..2af4308fe62 100644 --- a/apps/sim/executor/orchestrators/parallel.ts +++ b/apps/sim/executor/orchestrators/parallel.ts @@ -3,17 +3,13 @@ import { DEFAULTS } from '@/executor/constants' import type { DAG } from '@/executor/dag/builder' import type { ParallelScope } from '@/executor/execution/state' import type { BlockStateWriter, ContextExtensions } from '@/executor/execution/types' -import { - type ExecutionContext, - getNextExecutionOrder, - type NormalizedBlockOutput, -} from '@/executor/types' +import type { ExecutionContext, NormalizedBlockOutput } from '@/executor/types' import type { ParallelConfigWithNodes } from '@/executor/types/parallel' -import { buildContainerIterationContext } from '@/executor/utils/iteration-context' import { ParallelExpander } from '@/executor/utils/parallel-expansion' import { addSubflowErrorLog, emitEmptySubflowEvents, + emitSubflowSuccessEvents, extractBranchIndex, resolveArrayInput, validateMaxCount, @@ -318,34 +314,7 @@ export class ParallelOrchestrator { const output = { results } this.state.setBlockOutput(parallelId, output) - // Emit onBlockComplete for the parallel container so the UI can track it. - // When this parallel is nested inside a parent subflow (parallel or loop), emit - // iteration context so the terminal can group this event under the parent container. - if (this.contextExtensions?.onBlockComplete) { - const now = new Date().toISOString() - const iterationContext = buildContainerIterationContext(ctx, parallelId) - - try { - await this.contextExtensions.onBlockComplete( - parallelId, - 'Parallel', - 'parallel', - { - output, - executionTime: 0, - startedAt: now, - executionOrder: getNextExecutionOrder(ctx), - endedAt: now, - }, - iterationContext - ) - } catch (error) { - logger.warn('Parallel completion callback failed', { - parallelId, - error: error instanceof Error ? error.message : String(error), - }) - } - } + await emitSubflowSuccessEvents(ctx, parallelId, 'parallel', output, this.contextExtensions) return { allBranchesComplete: true, diff --git a/apps/sim/executor/utils/subflow-utils.ts b/apps/sim/executor/utils/subflow-utils.ts index 98391442a78..12194623b01 100644 --- a/apps/sim/executor/utils/subflow-utils.ts +++ b/apps/sim/executor/utils/subflow-utils.ts @@ -396,3 +396,62 @@ export async function emitEmptySubflowEvents( } } } + +/** + * Emits the BlockLog + onBlockComplete callback for a loop/parallel container that + * finished successfully with at least one iteration. Without this, successful container + * runs produce no top-level BlockLog, which forces the trace-span builder to fall back + * to generic counter-based names ("Loop 1", "Parallel 1") instead of the user-configured + * block name. + */ +export async function emitSubflowSuccessEvents( + ctx: ExecutionContext, + blockId: string, + blockType: 'loop' | 'parallel', + output: { results: any[] }, + contextExtensions: ContextExtensions | null +): Promise { + const now = new Date().toISOString() + const executionOrder = getNextExecutionOrder(ctx) + const block = ctx.workflow?.blocks.find((b) => b.id === blockId) + const blockName = block?.metadata?.name ?? blockType + const iterationContext = buildContainerIterationContext(ctx, blockId) + + ctx.blockLogs.push({ + blockId, + blockName, + blockType, + startedAt: now, + endedAt: now, + durationMs: DEFAULTS.EXECUTION_TIME, + success: true, + output, + executionOrder, + }) + + if (contextExtensions?.onBlockComplete) { + try { + await contextExtensions.onBlockComplete( + blockId, + blockName, + blockType, + { + output, + executionTime: DEFAULTS.EXECUTION_TIME, + startedAt: now, + executionOrder, + endedAt: now, + }, + iterationContext + ) + } catch (error) { + logger.warn('Subflow success completion callback failed', { + blockId, + blockType, + error: error instanceof Error ? error.message : String(error), + }) + } + } + + return blockName +} diff --git a/apps/sim/lib/logs/execution/trace-spans/trace-spans.test.ts b/apps/sim/lib/logs/execution/trace-spans/trace-spans.test.ts index f223656c76f..dd226ee857a 100644 --- a/apps/sim/lib/logs/execution/trace-spans/trace-spans.test.ts +++ b/apps/sim/lib/logs/execution/trace-spans/trace-spans.test.ts @@ -1970,4 +1970,116 @@ describe('nested subflow grouping via parentIterations', () => { expect(nestedP2).toBeDefined() expect(nestedP2!.children).toHaveLength(1) }) + + it.concurrent( + 'uses the user-configured loop name for the container span when a success BlockLog is present', + () => { + const result: ExecutionResult = { + success: true, + output: { content: 'done' }, + metadata: { duration: 3000, startTime: '2024-01-01T10:00:00.000Z' }, + logs: [ + { + blockId: 'loop-sbj', + blockName: 'LoopGroupA (SBJ)', + blockType: 'loop', + startedAt: '2024-01-01T10:00:00.000Z', + endedAt: '2024-01-01T10:00:03.000Z', + durationMs: 3000, + success: true, + output: { results: [[{ value: 1 }], [{ value: 2 }]] }, + executionOrder: 10, + }, + { + blockId: 'api-1', + blockName: 'Send (iteration 0)', + blockType: 'api', + startedAt: '2024-01-01T10:00:00.000Z', + endedAt: '2024-01-01T10:00:01.000Z', + durationMs: 1000, + success: true, + loopId: 'loop-sbj', + iterationIndex: 0, + executionOrder: 1, + }, + { + blockId: 'api-1', + blockName: 'Send (iteration 1)', + blockType: 'api', + startedAt: '2024-01-01T10:00:01.000Z', + endedAt: '2024-01-01T10:00:02.000Z', + durationMs: 1000, + success: true, + loopId: 'loop-sbj', + iterationIndex: 1, + executionOrder: 2, + }, + ], + } + + const { traceSpans } = buildTraceSpans(result) + const workflow = traceSpans[0] + const loop = workflow.children!.find((s) => s.type === 'loop') + + expect(loop).toBeDefined() + expect(loop!.name).toBe('LoopGroupA (SBJ)') + expect(loop!.children).toHaveLength(2) + } + ) + + it.concurrent( + 'uses the user-configured parallel name for the container span when a success BlockLog is present', + () => { + const result: ExecutionResult = { + success: true, + output: { content: 'done' }, + metadata: { duration: 2000, startTime: '2024-01-01T10:00:00.000Z' }, + logs: [ + { + blockId: 'parallel-a', + blockName: 'FanOutCalls', + blockType: 'parallel', + startedAt: '2024-01-01T10:00:00.000Z', + endedAt: '2024-01-01T10:00:02.000Z', + durationMs: 2000, + success: true, + output: { results: [[{ v: 1 }], [{ v: 2 }]] }, + executionOrder: 10, + }, + { + blockId: 'api-1', + blockName: 'Call (iteration 0)', + blockType: 'api', + startedAt: '2024-01-01T10:00:00.000Z', + endedAt: '2024-01-01T10:00:01.000Z', + durationMs: 1000, + success: true, + parallelId: 'parallel-a', + iterationIndex: 0, + executionOrder: 1, + }, + { + blockId: 'api-1', + blockName: 'Call (iteration 1)', + blockType: 'api', + startedAt: '2024-01-01T10:00:01.000Z', + endedAt: '2024-01-01T10:00:02.000Z', + durationMs: 1000, + success: true, + parallelId: 'parallel-a', + iterationIndex: 1, + executionOrder: 2, + }, + ], + } + + const { traceSpans } = buildTraceSpans(result) + const workflow = traceSpans[0] + const parallel = workflow.children!.find((s) => s.type === 'parallel') + + expect(parallel).toBeDefined() + expect(parallel!.name).toBe('FanOutCalls') + expect(parallel!.children).toHaveLength(2) + } + ) }) diff --git a/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts b/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts index 1910f8c605c..f367058fd6f 100644 --- a/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts +++ b/apps/sim/lib/logs/execution/trace-spans/trace-spans.ts @@ -744,8 +744,10 @@ function buildContainerChildren( * chain determines the top-level container. Iteration spans are peeled one level at a * time and recursed. * - * Sentinel blocks (parallel/loop containers) do NOT produce BlockLogs, so there are no - * sentinel spans to anchor grouping. Containers are synthesized from the iteration data. + * Container BlockLogs (parallel/loop) are produced on skip (empty collection), error, and + * successful completion. When present, they supply the user-configured container name via + * `resolveContainerName`; otherwise the container is synthesized from iteration data with a + * counter-based fallback name. */ function groupIterationBlocksRecursive( spans: TraceSpan[], From 5e143b6260a09e3aa97b16bf39b46ba7ca8c5b8b Mon Sep 17 00:00:00 2001 From: Vikhyath Mondreti Date: Thu, 16 Apr 2026 13:34:10 -0700 Subject: [PATCH 2/2] improve style --- apps/sim/executor/utils/subflow-utils.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/apps/sim/executor/utils/subflow-utils.ts b/apps/sim/executor/utils/subflow-utils.ts index 12194623b01..89a1e7b6948 100644 --- a/apps/sim/executor/utils/subflow-utils.ts +++ b/apps/sim/executor/utils/subflow-utils.ts @@ -410,7 +410,7 @@ export async function emitSubflowSuccessEvents( blockType: 'loop' | 'parallel', output: { results: any[] }, contextExtensions: ContextExtensions | null -): Promise { +): Promise { const now = new Date().toISOString() const executionOrder = getNextExecutionOrder(ctx) const block = ctx.workflow?.blocks.find((b) => b.id === blockId) @@ -452,6 +452,4 @@ export async function emitSubflowSuccessEvents( }) } } - - return blockName }