kafka客户端
x/kafkaProducer
组件:v0.23.0+ Kafka生产者组件。用于将消息发布到Kafka指定主题。
# 配置
该组件支持通过server
字段复用共享的Kafka连接客户端,避免重复创建连接。详见组件连接复用。
字段 | 类型 | 必填 | 说明 | 默认值 |
---|---|---|---|---|
brokers | []string | 是 | Kafka服务器地址列表 | ["localhost:9092"] |
topic | string | 是 | 发布主题,支持使用组件配置变量进行动态配置 | 无 |
key | string | 否 | 消息分区键,支持使用组件配置变量进行动态配置。用于控制消息分区分配 | 无 |
partition | int | 否 | 指定分区编号。如果设置,消息将直接发送到指定分区,key配置将被忽略 | -1 |
# 工作原理
- 组件初始化时会根据配置连接到Kafka集群
- 接收到消息后,将消息内容发布到指定的topic
- 发布成功后通过Success链路由,失败则通过Failure链路由
- 组件会自动管理连接的生命周期,包括重连等
# Relation Type
- Success: 以下情况消息发送到
Success
链路:- 消息成功发布到Kafka集群
- 收到Kafka服务器确认
- Failure: 以下情况消息发送到
Failure
链路:- 连接Kafka集群失败
- 发布消息超时
- 发布消息失败
- 配置参数错误
# 执行结果
组件执行完成后会更新消息的元数据信息:
- msg.data保持不变
- metadata会添加以下字段:
- partition: 消息实际写入的分区编号
- offset: 消息在分区中的偏移量
- msgType保持不变
# 配置示例
{
"id": "s5",
"type": "x/kafkaProducer",
"name": "发布到kafka",
"debugMode": true,
"configuration": {
"topic": "device.msg.request",
"brokers": ["localhost:9092"]
}
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 应用示例
在 GitHub 上编辑此页 (opens new window)
上次更新: 2024/12/22, 03:38:12