RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • Endpoint概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件

    • Rest Endpoint
    • Websocket Endpoint
    • MQTT Endpoint
      • Type
      • 启动配置
      • 响应
      • 示例
        • 注意事项:主题重叠(Topic Overlap)与重复消息
    • Schedule Endpoint
    • Net Endpoint
    • Kafka Endpoint
    • Nats Endpoint
    • Redis Sub Endpoint
    • Redis Steam Endpoint
    • Rabbitmq Endpoint
    • MYSQL CDC Endpoint
    • OPC_UA Endpoint
    • GRPC Stream Endpoint
    • Beanstalkd Endpoint
    • Wukongim Endpoint
    • 扩展Endpoint
    • NSQ Endpoint
    • Pulsar Endpoint
目录

MQTT Endpoint

Mqtt Endpoint 用来创建和启动MQTT接收服务,它可以订阅不同主题数据,然后路由到不同规则链进行处理。

# Type

endpoint/mqtt

# 启动配置

该组件允许通关过server字段复用共享的连接客户端。参考组件连接复用 。

字段 类型 是否必填 说明 默认值
server string 是 mqtt broker地址 -
username string 否 用户名 0
password string 否 密码 -
qOS int 否 QOS 0
cleanSession bool 否 CleanSession false
clientID string 否 客户端ID 默认随机数
cAFile string 否 CA文件路径 -
certFile string 否 Cert文件路径 -
certKeyFile string 否 CertKey文件路径 -

# 响应

exchange.Out.SetBody响应之前,需要通过exchange.Out.Headers()或者exchange.Out.Msg.Metadata指定responseTopic参数,组件就会往指定的主题发送数据:

exchange.Out.GetMsg().Metadata.PutValue("responseTopic", "device.msg.response")
// or
exchange.Out.Headers().Add("responseTopic", "device.msg.response")

exchange.Out.SetBody([]byte("ok"))
1
2
3
4
5

响应参数配置:

字段 类型 是否必填 说明 默认值
responseTopic string 是 响应主题 -
responseQos int 否 响应QOS 0

# 示例

以下是使用endpoint的示例代码:

  • RestEndpoint (opens new window)
  • WebsocketEndpoint (opens new window)
  • MqttEndpoint (opens new window)
  • ScheduleEndpoint (opens new window)
  • NetEndpoint (opens new window)
  • KafkaEndpoint (opens new window) (扩展组件库)

# 注意事项:主题重叠(Topic Overlap)与重复消息

在部分 MQTT Broker(例如 EMQX)中,如果同一个客户端同时订阅了存在重叠关系的主题模式,匹配到同一条消息时,Broker 会针对每个订阅分别投递一次,导致客户端收到重复数据。典型示例:

  • /sys/msg/a/+
  • /sys/msg/+/+

上述两个订阅的匹配范围存在交集,当消息主题命中交集部分时,同一条消息会被投递两次(或多次,取决于订阅数量)。在使用 Mqtt Endpoint 路由到规则链处理时,需要考虑去重或避免重叠订阅。

建议:通过只订阅一个最大覆盖的主题,然后通过节点进行路由分发,例如:通过msgType(msgType=topic)或者 metadata.topic 进行分发。或者js脚本进行复杂的分发处理。

1)分发方案A:通过 msgTypeSwitch 组件使用 msgType 匹配分发

{
	"ruleChain": {
		"id": "tgimYPt5L06J",
		"name": "test",
		"root": true,
		"debugMode": true,
		"additionalInfo": {
			"description": "",
			"noDefaultInput": false,
			"layoutX": "306",
			"layoutY": "285"
		},
		"configuration": {}
	},
	"metadata": {
		"endpoints": [
			{
				"id": "node_1",
				"type": "endpoint/mqtt",
				"name": "MQTT",
				"configuration": {
					"clientID": "test5888",
					"maxReconnectInterval": 0,
					"password": "nc_admin",
					"qOS": 0,
					"server": "192.168.62.20:1883",
					"username": "nc_admin"
				},
				"debugMode": false,
				"additionalInfo": {
					"layoutX": 351,
					"layoutY": 132
				},
				"routers": [
					{
						"id": "m62quFtZlMMZ",
						"params": [],
						"from": {
							"path": "/sys/msg/+/+",
							"configuration": null,
							"processors": []
						},
						"to": {
							"path": "tgimYPt5L06J:node_3",
							"configuration": null,
							"wait": false,
							"processors": []
						}
					}
				]
			}
		],
		"nodes": [
			{
				"id": "node_6",
				"type": "log",
				"name": "日志",
				"configuration": {
					"jsScript": "return 'Incoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);"
				},
				"debugMode": false,
				"additionalInfo": {
					"layoutX": 1048,
					"layoutY": 248
				}
			},
			{
				"id": "node_5",
				"type": "log",
				"name": "日志",
				"configuration": {
					"jsScript": "return 'Incoming message:\\n' + JSON.stringify(msg) + '\\nIncoming metadata:\\n' + JSON.stringify(metadata);"
				},
				"debugMode": false,
				"additionalInfo": {
					"layoutX": 1040,
					"layoutY": -29
				}
			},
			{
				"type": "msgTypeSwitch",
				"debugMode": false,
				"id": "node_3",
				"name": "消息路由",
				"additionalInfo": {
					"layoutX": 748,
					"layoutY": 126
				}
			}
		],
		"connections": [
			{
				"fromId": "node_3",
				"toId": "node_5",
				"type": "/sys/msg/device01/data"
			},
			{
				"fromId": "node_3",
				"toId": "node_6",
				"type": "/sys/msg/device02/data"
			}
		]
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104

mqtt_endpoint_demo.png

在 GitHub 上编辑此页 (opens new window)
上次更新: 2026/01/07, 01:45:04
Websocket Endpoint
Schedule Endpoint

← Websocket Endpoint Schedule Endpoint→

Theme by Vdoing | Copyright © 2023-2026 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式