8000
We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
好像并行网关数据合并时候有bug,之保留了最后一个task的输出,改了下就好了。 public class TurboBranchMergeJoinAll extends BranchMergeJoinAll {
@Override public void joinMerge(RuntimeContext runtimeContext, NodeInstancePO joinNodeInstancePo, NodeInstanceBO currentNodeInstance, String parentExecuteId, String currentExecuteId, Set<String> allExecuteIdSet, DataMergeStrategy dataMergeStrategy) { Set<String> arrivedExecuteIds = ExecutorUtil.getExecuteIdSet((String) joinNodeInstancePo.get("executeId")); arrivedExecuteIds.add(currentExecuteId); // 1. 获取当前分支的数据 InstanceDataPO currentBranchData = instanceDataDAO.select(runtimeContext.getFlowInstanceId(), runtimeContext.getInstanceDataId()); // 2. 获取已合并的数据(之前所有分支的合并结果) InstanceDataPO mergedData = instanceDataDAO.select(runtimeContext.getFlowInstanceId(), joinNodeInstancePo.getInstanceDataId()); if (mergedData == null) { mergedData = new InstanceDataPO(); mergedData.setInstanceDataId(genId()); joinNodeInstancePo.setInstanceDataId(mergedData.getInstanceDataId()); } // 3. 合并数据 InstanceDataPO mergePo = dataMergeStrategy.merge(runtimeContext, mergedData, currentBranchData); // 4. 根据是否所有分支都到达,决定是完成还是等待 if (ExecutorUtil.allArrived(allExecuteIdSet, arrivedExecuteIds)) { // 所有分支都到达,更新状态为完成 if (StringUtils.isBlank(mergedData.getInstanceDataId())) { instanceDataDAO.insert(mergePo); } else { instanceDataDAO.updateData(mergePo); } buildParallelNodeInstancePo(joinNodeInstancePo, currentNodeInstance, NodeInstanceStatus.COMPLETED); nodeInstanceDAO.updateById(joinNodeInstancePo); nodeInstanceLogDAO.insert(buildCurrentNodeInstanceLogPO(currentNodeInstance, currentExecuteId, joinNodeInstancePo)); } else { // 还有分支未到达,更新状态为等待 if (StringUtils.isBlank(mergedData.getInstanceDataId())) { instanceDataDAO.insert(mergePo); } else { instanceDataDAO.updateData(mergePo); } buildParallelNodeInstancePo(joinNodeInstancePo, currentNodeInstance, ParallelNodeInstanceStatus.WAITING); nodeInstanceDAO.updateById(joinNodeInstancePo); nodeInstanceLogDAO.insert(buildNodeInstanceLogPO(joinNodeInstancePo)); throw new SuspendException(ParallelErrorEnum.WAITING_SUSPEND.getErrNo(), MessageFormat.format(Constants.NODE_INSTANCE_FORMAT, runtimeContext.getCurrentNodeModel().getKey(), runtimeContext.getCurrentNodeModel().getProperties().getOrDefault(Constants.ELEMENT_PROPERTIES.NAME, StringUtils.EMPTY), currentNodeInstance.getNodeInstanceId())); } }
}
The text was updated successfully, but these errors were encountered:
No branches or pull requests
好像并行网关数据合并时候有bug,之保留了最后一个task的输出,改了下就好了。
public class TurboBranchMergeJoinAll extends BranchMergeJoinAll {
}
The text was updated successfully, but these errors were encountered: