汇聚
join
组件:汇聚合并节点。用于汇聚并合并多个异步并行执行节点的结果。常见场景包括:从多个数据源(如不同数据库)获取数据后合并、并行调用多个API后合并结果等。
# 工作原理
- 接收到消息后,等待所有前置异步节点执行完成
- 合并所有节点的执行结果(metadata和data)
- 根据执行情况决定路由方向:
- 所有节点执行成功,通过Success链路由
- 存在失败节点或超时,通过Failure链路由
# 配置
字段 | 类型 | 必填 | 说明 | 默认值 |
---|---|---|---|---|
timeout | int | 否 | 执行超时时间(秒),0表示不设置超时 | 0 |
# Relation Type
- Success: 所有节点执行成功时,合并后的消息发送到
Success
链路 - Failure: 以下情况消息发送到
Failure
链路:- 执行超时
- 存在节点执行失败
- 合并结果失败
# 执行结果
组件会合并所有节点的执行结果:
- metadata: 合并所有节点的metadata,相同key时后执行的节点会覆盖先执行节点的值
- data: 将所有节点处理后的消息封装成WrapperMsg数组
WrapperMsg结构:
字段 | 类型 | 说明 | 默认值 |
---|---|---|---|
msg | types.RuleMsg 不合并metadata | 节点处理后的消息 | 无 |
err | string | 错误信息 | "" |
nodeId | string | 最后处理该消息的节点 | "" |
# 配置示例
{
"ruleChain": {
"id": "frcYgBtVbDaV",
"name": "测试合并节点",
"debugMode": true,
"root": true,
"additionalInfo": {
"createTime": "2024/09/24 16:56:52",
"description": "",
"layoutX": "280",
"layoutY": "280",
"updateTime": "2024/09/24 21:50:38",
"username": "admin"
}
},
"metadata": {
"endpoints": [],
"nodes": [
{
"id": "node_a",
"additionalInfo": {
"description": "",
"layoutX": 490,
"layoutY": 280
},
"type": "jsTransform",
"name": "A",
"debugMode": false,
"configuration": {
"jsScript": "msg.a=\"aa\"\nreturn {'msg':msg,'metadata':metadata,'msgType':msgType};"
}
},
{
"id": "node_b",
"additionalInfo": {
"description": "",
"layoutX": 730,
"layoutY": 210
},
"type": "jsTransform",
"name": "B",
"debugMode": false,
"configuration": {
"jsScript": "msg.b=\"bb\"\nreturn {'msg':msg,'metadata':metadata,'msgType':msgType};"
}
},
{
"id": "node_c",
"additionalInfo": {
"description": "",
"layoutX": 730,
"layoutY": 340
},
"type": "jsTransform",
"name": "C",
"debugMode": false,
"configuration": {
"jsScript": "msg.c=\"cc\"\nreturn {'msg':msg,'metadata':metadata,'msgType':msgType};"
}
},
{
"id": "node_d",
"additionalInfo": {
"description": "",
"layoutX": 1010,
"layoutY": 260
},
"type": "join",
"name": "D",
"debugMode": false,
"configuration": {
"timeout": 1
}
}
],
"connections": [
{
"fromId": "node_a",
"toId": "node_b",
"type": "Success"
},
{
"fromId": "node_a",
"toId": "node_c",
"type": "Success"
},
{
"fromId": "node_b",
"toId": "node_d",
"type": "Success"
},
{
"fromId": "node_c",
"toId": "node_d",
"type": "Success"
}
]
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
在 GitHub 上编辑此页 (opens new window)
上次更新: 2024/12/22, 03:38:12