Bug in Parallel & InclusiveGateway data merging · Issue #76 · didi/turbo · GitHub
cfanlu opened this issue May 28, 2025 · 0 comments
cfanlu commented May 28, 2025

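There seems to be a bug when the parallel gateway merges data: only the output of the last task is kept. After the following change it works.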
public class TurboBranchMergeJoinAll extends BranchMergeJoinAll {

@Override
public void joinMerge(RuntimeContext runtimeContext, NodeInstancePO joinNodeInstancePo, NodeInstanceBO currentNodeInstance,
                      String parentExecuteId, String currentExecuteId, Set<String> allExecuteIdSet, DataMergeStrategy dataMergeStrategy) {
    Set<String> arrivedExecuteIds = ExecutorUtil.getExecuteIdSet((String) joinNodeInstancePo.get("executeId"));
    arrivedExecuteIds.add(currentExecuteId);

    // 1. Load the data of the current branch
    InstanceDataPO currentBranchData = instanceDataDAO.select(runtimeContext.getFlowInstanceId(), runtimeContext.getInstanceDataId());

    // 2. Load the data merged so far (the merge result of all previously arrived branches)
    InstanceDataPO mergedData = instanceDataDAO.select(runtimeContext.getFlowInstanceId(), joinNodeInstancePo.getInstanceDataId());
    if (mergedData == null) {
        mergedData = new InstanceDataPO();
        mergedData.setInstanceDataId(genId());
        joinNodeInstancePo.setInstanceDataId(mergedData.getInstanceDataId());
    }

    // 3. Merge the current branch's data into the accumulated data
    InstanceDataPO mergePo = dataMergeStrategy.merge(runtimeContext, mergedData, currentBranchData);

    // 4. Complete or wait, depending on whether all branches have arrived
    if (ExecutorUtil.allArrived(allExecuteIdSet, arrivedExecuteIds)) {
        // All branches have arrived: update the status to COMPLETED
        if (StringUtils.isBlank(mergedData.getInstanceDataId())) {
            instanceDataDAO.insert(mergePo);
        } else {
            instanceDataDAO.updateData(mergePo);
        }
        buildParallelNodeInstancePo(joinNodeInstancePo, currentNodeInstance, NodeInstanceStatus.COMPLETED);
        nodeInstanceDAO.updateById(joinNodeInstancePo);
        nodeInstanceLogDAO.insert(buildCurrentNodeInstanceLogPO(currentNodeInstance, currentExecuteId, joinNodeInstancePo));
    } else {
        // Some branches have not arrived yet: update the status to WAITING
        if (StringUtils.isBlank(mergedData.getInstanceDataId())) {
            instanceDataDAO.insert(mergePo);
        } else {
            instanceDataDAO.updateData(mergePo);
        }
        buildParallelNodeInstancePo(joinNodeInstancePo, currentNodeInstance, ParallelNodeInstanceStatus.WAITING);
        nodeInstanceDAO.updateById(joinNodeInstancePo);
        nodeInstanceLogDAO.insert(buildNodeInstanceLogPO(joinNodeInstancePo));

        throw new SuspendException(ParallelErrorEnum.WAITING_SUSPEND.getErrNo(), MessageFormat.format(Constants.NODE_INSTANCE_FORMAT,
                runtimeContext.getCurrentNodeModel().getKey(),
                runtimeContext.getCurrentNodeModel().getProperties().getOrDefault(Constants.ELEMENT_PROPERTIES.NAME, StringUtils.EMPTY),
                currentNodeInstance.getNodeInstanceId()));
    }
}

}
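
To illustrate the symptom described above (only the last task's output survives the join), here is a minimal standalone sketch. It does not use any Turbo classes: `JoinMergeSketch`, `overwrite`, and `accumulate` are hypothetical names, and branch data is modeled as plain maps, which is an assumption made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Turbo.
public class JoinMergeSketch {

    // Symptom: each arriving branch replaces the join node's data wholesale,
    // so only the last branch's output is left after the join.
    static Map<String, Object> overwrite(List<Map<String, Object>> branchOutputs) {
        Map<String, Object> joinData = new LinkedHashMap<>();
        for (Map<String, Object> branch : branchOutputs) {
            joinData = new LinkedHashMap<>(branch);
        }
        return joinData;
    }

    // Intended behavior: each arriving branch is merged into the data
    // accumulated from the branches that arrived before it.
    static Map<String, Object> accumulate(List<Map<String, Object>> branchOutputs) {
        Map<String, Object> joinData = new LinkedHashMap<>();
        for (Map<String, Object> branch : branchOutputs) {
            joinData.putAll(branch);
        }
        return joinData;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> outputs = List.of(
                Map.<String, Object>of("a", 1),
                Map.<String, Object>of("b", 2));
        System.out.println(overwrite(outputs));  // prints {b=2}
        System.out.println(accumulate(outputs)); // prints {a=1, b=2}
    }
}
```

The override above follows the second pattern: it reloads the previously merged data on every arrival and persists the merge result even while the join is still waiting, so later branches build on earlier ones.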
