✨ 新功能 实现普通节点的一拖多/多对一的分支并行处理#600
✨ 新功能 实现普通节点的一拖多/多对一的分支并行处理#600swfnotswift wants to merge 19 commits intoModelEngine-Group:hzjh-mainfrom
Conversation
| RunContext runContext = new RunContext(new HashMap<>(), context); | ||
| runContext.putAllToBusiness(requestContext.getUserContext()); | ||
|
|
||
| // runContext请求操作 |
| /** | ||
| * 通过from.offer(data)而不是.offer(context)发起的数据会新增一个trace,这个trace会延续到flow end | ||
| */ | ||
|
|
| context.previous = this.previous; | ||
| context.status = this.status; | ||
| context.trans = this.trans; | ||
| context.id = id; |
There was a problem hiding this comment.
如果是copyContext的含义,不应该删除previous和id的赋值
| */ | ||
| public <R> FlowContext<R> convertData(R data) { | ||
| FlowContext<R> context = this.copyContext(data); | ||
| context.previous = this.id; |
There was a problem hiding this comment.
作为fork生成数据的previous不应该是他自己,应该是保持一致的
| */ | ||
| public <R> FlowContext<R> convertData(R data, String id) { | ||
| FlowContext<R> context = this.copyContext(data); | ||
| context.previous = this.previous; |
There was a problem hiding this comment.
看下copyContext整体调整后,这里应该不再需要为previous赋值
| // startNode不能出现在event的to属性, endNode不能出现在event的from属性 | ||
|
|
||
| FlowNode toNode = nodeMap.get(event.getTo()); | ||
|
|
| <img src={timeImg} /> | ||
| </span> | ||
| } | ||
| <Button |
| */ | ||
| self.maxNumToLink = () => { | ||
| return 10; | ||
| return self.graph?.connectionLimitDisabled ? 100 : 10; |
There was a problem hiding this comment.
这个是聚合连线的数量,当前可以默认为10个,先不要放到100这个量级
| */ | ||
| self.maxNumToLink = () => { | ||
| return 10; | ||
| return self.graph?.connectionLimitDisabled ? 100 : 10; |
There was a problem hiding this comment.
这个是聚合连线的数量,当前可以默认为10个,先不要放到100这个量级
| return; | ||
| } | ||
| Set<String> forkedIds = forkedContexts.stream().map(FlowContext::getId).collect(Collectors.toSet()); | ||
| List<FlowContext<I>> effectiveForkedContexts = matchedContexts.values() |
| if (type == ProcessType.PROCESS && (processT == null || !processRunning)) { | ||
| processRunning = true; | ||
| String threadName = getThreadName(PROCESS_T_NAME_PREFIX); | ||
|
|
| this.froms.add(subscription); // 将该节点的from的event加入 | ||
| if (!this.fanInModeConfigured) { | ||
| long fromCount = this.froms.stream().map(Identity::getId).distinct().count(); | ||
| this.fanInMode = fromCount > 1 ? FanInMode.ALL : FanInMode.ANY; |
| this.afterProcess(pre, new ArrayList<>()); | ||
| return; | ||
| } | ||
| List<FlowContext<I>> processInputs = mergeProcessInputs(pre); |
| Optional.ofNullable(this.globalErrorHandler).ifPresent(handler -> handler.handle(ex, retryable, pre)); | ||
| } | ||
|
|
||
| private List<FlowContext<I>> mergeProcessInputs(List<FlowContext<I>> pre) { |
| echo "Copying shared libraries..." | ||
| docker cp "$SHARED_DIR"/. app-builder-tmp:/opt/fit-framework/shared/ | ||
|
|
||
| docker exec app-builder-tmp bash -c "rm -f /opt/fit-framework/plugins/authentication-oauth2-client-1.0.0-SNAPSHOT.jar" |
frontend/src/locale/en_US.json
Outdated
| "plsEnterNumber": "Enter a number", | ||
| "debugRun": "Test Running", | ||
| "disableConnectionLimit": "Lift Link Limits", | ||
| "restoreConnectionLimit": "Restore Link Limits", |
frontend/src/locale/zh_CN.json
Outdated
| "createWorkflow": "创建工作流", | ||
| "debugRun": "测试运行", | ||
| "disableConnectionLimit": "放开连线限制", | ||
| "restoreConnectionLimit": "恢复连线限制", |
| const dispatch = useAppDispatch(); | ||
| const { t } = useTranslation(); | ||
| const { handleDebugClick, workFlow, types, saveTime, updateAippCallBack } = props; | ||
| const { |
frontend/src/pages/addFlow/index.tsx
Outdated
| const isConnectionLimitDisabled = typeof props.isConnectionLimitDisabled === 'boolean' | ||
| ? props.isConnectionLimitDisabled | ||
| : localConnectionLimitDisabled; | ||
| const toggleConnectionLimitDisabled = props.toggleConnectionLimitDisabled |
frontend/src/pages/addFlow/index.tsx
Outdated
| showDebug={showDebug} | ||
| saveTime={saveTime} | ||
| setShowDebug={setShowDebug} | ||
| toggleConnectionLimitDisabled={toggleConnectionLimitDisabled} |
| </span> | ||
| } | ||
| </div> | ||
| {showElsa && <Button |
| setSaveTime={setSaveTime} | ||
| showFlowChangeWarning={showFlowChangeWarning} | ||
| setShowFlowChangeWarning={setShowFlowChangeWarning} | ||
| toggleConnectionLimitDisabled={() => setIsConnectionLimitDisabled((prev) => !prev)} |
| || ProcessMode.PRODUCING.equals(this.processMode))) { | ||
| return pre; | ||
| } | ||
| // if (this.merger == null) { |
| * @param inputType 输入数据类型,用于从 Registry 获取对应的 Merger | ||
| */ | ||
| protected void autoInjectMerger(Class<?> inputType) { | ||
| Processors.Merger<T> registered = ObjectUtils.cast(MergerRegistry.getInstance().getMerger(inputType)); |
| } | ||
|
|
||
| private Processors.Merger<I> detectMerger(List<FlowContext<I>> pre) { | ||
| if (pre == null || pre.isEmpty()) { |
| private Processors.Validator<I> validator = (i, all) -> true; | ||
| private FanInMode fanInMode = FanInMode.ANY; | ||
| private Processors.Map<FlowContext<I>, String> mergeKeyGenerator = this::defaultMergeKey; | ||
| private Processors.Merger<I> merger; |
There was a problem hiding this comment.
新增的merge机制,需要增加测试用例。加在WaterFlowsTest中,参考其中节点操作,多连线后,给指定节点设置merge动作,验证merge的执行和该节点获取merge后的结果
| assertTestData(new TestData(0, 0, 13), output); | ||
| } | ||
| // @Test | ||
| // @DisplayName("流程实例condition节点以及match节点以及others节点流转逻辑") |
| assertEquals(data3.getBusinessData(), result.get(0).getBusinessData()); | ||
| } | ||
| // @Test | ||
| // @DisplayName("测试带有condition节点流程实例持久化成功") |
There was a problem hiding this comment.
不要直接注释代码,使用disable注解,并在注解中描述具体的原因和背景。
| margin-right: 5px; | ||
| } | ||
| } | ||
| .link-limit-btn-active { |
frontend/src/styles/workSpace.scss
Outdated
| color: rgba(0, 0, 0, 0.25); | ||
| border-color: #d9d9d9; | ||
| } | ||
| .link-limit-btn-active { |
| WaterflowParamException exception = assertThrows(WaterflowParamException.class, | ||
| () -> flowNodeValidator.validate(flowDefinition)); | ||
| assertEquals(INVALID_START_NODE_EVENT_SIZE.getErrorCode(), exception.getCode()); | ||
| assertEquals(INVALID_EVENT_CONFIG.getErrorCode(), exception.getCode()); |
There was a problem hiding this comment.
行为已经改变,这么修改不合适,应该配置对应场景的流程,变成校验通过
| WaterflowParamException exception = assertThrows(WaterflowParamException.class, | ||
| () -> flowNodeValidator.validate(flowDefinition)); | ||
| assertEquals(INVALID_STATE_NODE_EVENT_SIZE.getErrorCode(), exception.getCode()); | ||
| assertEquals( INVALID_EVENT_CONFIG.getErrorCode(), exception.getCode()); |
There was a problem hiding this comment.
行为已经改变,这么修改不合适,应该配置对应场景的流程,变成校验通过
| public void subscribe(String streamId, FlowEnv flowEnv, FlowNode toNode, FlowEvent event) { | ||
| FitStream.Subscriber<FlowData, FlowData> toSubscriber = getTo(streamId, flowEnv.getRepo(), | ||
| flowEnv.getMessenger(), flowEnv.getLocks(), toNode); | ||
| toSubscriber.setFromFlowDefinition(true); |
|
|
||
| private Processors.Validator<I> validator = (i, all) -> true; | ||
| private FanInMode fanInMode = FanInMode.ANY; | ||
| private boolean fromFlowDefinition = false; |
There was a problem hiding this comment.
直接设置fanInMode 就行,不需要fromFlowDefinition
| * | ||
| * @param fromFlowDefinition 是否来自流程定义 | ||
| */ | ||
| default void setFromFlowDefinition(boolean fromFlowDefinition) { |
There was a problem hiding this comment.
提供设置模式的接口即可,不需要再中转一层含义,同时flowDefition的概念为上层,不应该让下层感知这个概念
| this.froms.add(subscription); // 将该节点的from的event加入 | ||
| if (this.fromFlowDefinition) { | ||
| long fromCount = this.froms.stream().map(Identity::getId).distinct().count(); | ||
| this.fanInMode = fromCount > 1 ? FanInMode.ALL : FanInMode.ANY; |
| * | ||
| * @param fromFlowDefinition 是否来自流程定义 | ||
| */ | ||
| public void setFromFlowDefinition(boolean fromFlowDefinition) { |


















🔗 相关问题 / Related Issue
Issue 链接 / Issue Link: #{$IssueNumber} 👈👈
📋 变更类型 / Type of Change
📝 变更目的 / Purpose of the Change
解决 Waterflow 普通节点一拖多、多对一、多输入 LLM 节点执行异常而做的代码修改。
📋 主要变更 / Brief Changelog
1. FlowContext.java
修改说明
这一段的目标,是让普通节点一拖多时每条分支都拥有独立的 context 身份。
原来的问题是:同一个上游 context 命中多条出边时,后续链路仍然可能共享同一个 contextId。这样一来,多个分支在持久化、状态推进、批次更新时会互相覆盖,看起来像“只跑了一条分支”或者“后到的分支把先到的分支覆盖掉了”。
本次新增的
fork()本质上是一个语义化入口,表示“基于当前上下文复制一个新分支”。convertData(data)用于生成一个新的 context,对外表现为新分支新分支会继承原上下文的大部分运行元数据
但它会保留新的 contextId,且
previous指向原 contextnextPositionId也会一并继承,确保分支在后续送边时不会丢目标信息copyContext(...)被抽出来之后,两个convertData(...)的职责就清晰了:convertData(data)用于“派生一个新身份”convertData(data, id)用于“只替换 data,但保留原身份”这两个入口分别对应本次修复中的两种完全不同的场景:
一拖多分支复制,需要新身份
When.convert或执行期临时合并输入,只需要替换 data,不需要换 identity2. From.java
修改说明
这一段的目标,是在真正发往多条边时,把分支拆成独立上下文,而不是只在概念上“一拖多”。
offer(List<FlowContext<I>> ...)里现在的处理策略是:如果某个 context 只命中一条边,沿用原 context
如果命中多条边,第一条边仍使用原 context
从第二条边开始,对每条边调用
fork()生成新的 branch context这样做有两个好处:
不会把所有分支都强制改成新对象,保留第一条链路的兼容性
又能保证剩余分支拥有独立 contextId,避免互相覆盖
新增的
persistForkedContexts(...)用来解决第二层问题:即使分支 context 在内存里分出来了,如果不先落库、不先加入 trace 的 contextPool,后续When.cache(...)、updateStatus(...)、requestAll(...)仍然会出现查不到、覆盖、丢分支的问题。它做了两件事:
先
updateContextPool(...),把新分支 contextId 加入 trace 可见范围再
save(...),把这些 fork 出来的 context 持久化并且这里加了分布式锁,避免并发分支扩散时 contextPool 更新出现竞争覆盖。
3. To.java: fan-in 配置与 request 过滤
修改说明
这一段的目标,是给多对一节点建立一套明确的“汇聚判定规则”。
本次引入了三个关键概念:
fanInModefanInModeConfiguredmergeKeyGenerator含义分别是:
fanInMode决定多输入节点是“来一条就处理”还是“必须全部到齐再处理”fanInModeConfigured用于区分“用户主动配置”和“框架自动推断”mergeKeyGenerator用于定义哪些输入属于同一组汇聚数据默认 mergeKey 使用
rootId + transId + traceId 集合组合生成,目的是尽量把同一次流程实例里的同一组分支归到一个 key 下。onSubscribe(...)里增加的自动切换逻辑表示:如果当前节点只有一个上游来源,默认
ANY如果当前节点接入多个 distinct from,默认
ALL这保证普通多输入节点在未显式配置时,也能先按“需要汇聚”处理。
4. To.java: 多输入 ready 判断与完整汇聚组选择
修改说明
这一段解决的是多对一节点 request 阶段的两个问题:
候选数据虽然查出来了,但不知道哪些已经真正“到齐”
不能把所有 pending 混在一起直接处理,否则会把不同批次、不同分支组混起来
filterReadyByFanIn(...)的职责是做“到齐判定”。它按 mergeKey 分组,然后统计每个 group 里出现了多少个不同的
position。当 distinct position 数量大于等于当前节点的上游输入数量时,说明这一组输入已经凑齐。这里用
position而不是直接用 context 数量,是因为我们真正关心的是“是否来自不同上游来源”,而不是“来了几条数据”。selectReadyMergeGroup(...)的职责是做“请求期截取”。它不再像默认 filter 那样按 batch 简单取一批,而是:
先按 mergeKey 归组
再找出第一组已经满足完整输入数量的 group
只返回这一组
这样可以避免多组 pending 混在一起被同时拉入一次处理。
markReady(...)则把原来 ready 标记和 ready 过滤的动作收口成一个独立步骤,保证 fan-in 分组判断在前,状态推进在后,顺序更稳定。5. To.java: 处理阶段合并多输入上下文
修改说明
这一段是本次修复里最关键的一层。
前面的 request 修复只能保证“多对一节点能捞到多条上游输入”,但还不能保证“节点执行时真的把这些输入当作一组来消费”。
原问题正是出在这里:
request 阶段已经能看到两条输入
但普通节点的
MAPPING或 LLM 节点底层的PRODUCING执行时,仍然按单条 context 分别处理于是某一条 context 里只包含自己这一侧的数据,另一侧引用自然就是 null
所以
onProcess(...)现在增加了一步:先调用
mergeProcessInputs(pre)再把
processInputs交给真正的处理器执行mergeProcessInputs(...)当前只在以下条件下生效:fanInMode == ALL输入条数大于 1
当前处理模式属于
MAPPING、FLATMAPPING或PRODUCING输入 data 都是
FlowData其中把
PRODUCING也纳入是这次后续补充的关键,因为普通llmNodeState实际走的是FlowStateNode -> Node(... this::stateProduce ...) -> To.PRODUCING这条链路,而不是MAPPING。mergeFlowData(...)则负责把多条FlowContext中的三类核心数据拼起来:businessDatacontextDatapassData合并时使用
FlowUtil.mergeMaps(...),这样下游节点最终看到的是一个完整的、同时包含多个上游输出的FlowData。最终效果是:对于需要同时引用多个上游节点输出的 LLM 节点、代码节点或普通 map 类节点,执行器真正拿到的是“合并后的完整输入”,而不是某一侧的单独 context。
6. To.java: 各处理模式接入新的 request 逻辑
修改说明
这一段解决的是“不同处理模式下 request 路径不一致”的问题。
如果只在某一个模式里改 request 过滤,其它模式仍然会沿用旧逻辑,那么:
reduce 节点可能正确
producing 节点仍然不对
mapping 节点还是只能捞到一边
所以现在统一改成:
PRODUCING.requestAll(...)调用requestFilter(to.postFilter())MAPPING.requestAll(...)调用requestFilter(to.defaultFilter())FLATMAPPING、REDUCING继续复用前者逻辑这样不同处理模式都会在
ALL场景下走同一套“完整汇聚组筛选”策略。filterReady(...)也被调整为先filterReadyByFanIn(...)再markReady(...),避免旧逻辑里“先标 ready、后分组”造成的状态不一致问题。7. To.java: 处理线程退出前的并发冲突补偿
修改说明
这一段解决的是处理线程退出窗口期的竞态问题。
原场景是:
处理线程判断当前没有 ready 数据,准备退出
就在退出前后,新数据恰好写入边上
外部又因为线程标记未及时变化,没有重新拉起处理
结果就是:新数据挂在边上,但没有线程再去消费。
handleProcessConcurrentConflict(...)的做法是:在线程准备退出时,再检查一次当前 pending
对这些 pending 再走一遍 ready 判定
只有当确实存在 ready 数据时,才重新触发
accept(ProcessType.PROCESS, pending)这个改动的重点在于“只对真正 ready 的数据重新拉起处理”,避免之前那种无条件自唤醒导致的空转或死循环。
🧪 验证变更 / Verifying this Change
测试步骤 / Test Steps
测试覆盖 / Test Coverage
📸 截图 / Screenshots
✅ 贡献者检查清单 / Contributor Checklist
请确保你的 Pull Request 符合以下要求 / Please ensure your Pull Request meets the following requirements:
基本要求 / Basic Requirements:
代码质量 / Code Quality:
测试要求 / Testing Requirements:
mvn -B clean package -Dmaven.test.skip=true,npm install --force && npm run build:pro/ Basic checks passmvn clean install/ Unit tests pass文档和兼容性 / Documentation and Compatibility:
📋 附加信息 / Additional Notes
审查者注意事项 / Reviewer Notes: