Python脚本节点
x/python 组件:Python 脚本动作节点。通过子进程执行 Python 脚本,对消息进行转换、过滤或增强处理。支持内联脚本和外部 .py 文件两种模式。
# 前置条件
- 需要安装 Python 3.x,并确保
python3或python命令在系统 PATH 中可用 - 支持所有 Python 3 标准库和已安装的第三方库
# 配置
| 字段 | 类型 | 说明 | 默认值 |
|---|---|---|---|
| script | string | Python 脚本内容(内联模式)或 .py 文件路径(文件模式) | 无(必填) |
| pythonPath | string | Python 可执行文件路径 | 自动检测(依次尝试 python3、python) |
| timeout | string | 脚本执行超时时间。支持 Go 时长格式("5s"、"1000ms"、"1m")或纯数字秒数("5") | 使用规则引擎全局配置 ScriptMaxExecutionTime |
| maxRunning | int | 最大并发 Python 子进程数量 | 10 |
# script 字段说明
内联模式:只提供函数体内容,框架自动生成函数定义。不要写 def Process(...): 行。
# 内联模式:只需要写函数体
import json
data = json.loads(msg) if isinstance(msg, str) else msg
data["processed"] = True
return json.dumps(data), metadata, msgType
1
2
3
4
5
2
3
4
5
文件模式:以 .py 结尾的路径。文件中必须定义 Process 函数。
# 文件模式:完整函数定义
import json
def Process(msg, metadata, msgType, dataType):
data = msg if isinstance(msg, dict) else json.loads(msg)
data["processed"] = True
return json.dumps(data), metadata, msgType
1
2
3
4
5
6
7
2
3
4
5
6
7
内联模式限制
内联模式下不要写 def Process(...): 行,否则会报初始化错误。定义辅助函数(如 def helper(x):)是允许的。
# 函数参数
Process 函数支持 4~6 个参数,框架通过 inspect.signature 自动适配:
| 参数 | 类型 | 说明 | 是否必须 |
|---|---|---|---|
| msg | dict / str / list | 消息内容。如果 dataType 为 JSON,自动解析为 Python dict;否则为字符串 | 是 |
| metadata | dict | 消息元数据,str -> str 字典 | 是 |
| msgType | str | 消息类型 | 是 |
| dataType | str | 数据类型("JSON"、"TEXT"、"BINARY" 等) | 是 |
| vars | dict | 节点配置变量,str -> str 字典 | 否 |
| globalProps | dict | 规则引擎全局属性,str -> str 字典 | 否 |
# 参数适配规则
# 4 参数:标准签名
def Process(msg, metadata, msgType, dataType):
...
# 5 参数:额外接收 vars
def Process(msg, metadata, msgType, dataType, vars={}):
server = vars.get("server", "unknown")
# 6 参数:额外接收 vars 和 globalProps
def Process(msg, metadata, msgType, dataType, vars={}, globalProps={}):
env = globalProps.get("env", "unknown")
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 返回值
Process 函数支持 3 种返回形式:
# 1. 元组(Tuple)
返回最多 3 个元素,分别对应 msg、metadata、msgType:
return new_msg, metadata, "NEW_TYPE" # 完整三元组
return new_msg, metadata # 只修改 msg 和 metadata
return new_msg # 只修改 msg
1
2
3
2
3
# 2. 字典(Dict)
返回包含 msg、metadata、msgType 键的字典:
return {"msg": "transformed", "msgType": "PROCESSED"}
return {"msg": new_data, "metadata": metadata, "msgType": msgType}
1
2
2
字典识别规则
返回的 dict 只有包含 msg、metadata 或 msgType 中的任意一个键时,才被视为结构化结果。否则会被整体包装为 {"msg": result}。
# 3. 单值(Single Value)
返回任意值,自动包装为 {"msg": value}:
return "plain string result"
return 42
return [1, 2, 3]
return {"data": 123} # 不含 msg/metadata/msgType 键,整体作为 msg
1
2
3
4
2
3
4
# 错误处理
脚本中抛出异常时,节点会将消息发送到 Failure 链,错误信息包含 Python traceback:
# 脚本中抛出异常
if temperature > 100:
raise ValueError(f"温度超限: {temperature}")
1
2
3
2
3
节点在 Failure 链回调中可以获取错误信息:
ctx.TellFailure(msg, err) // err 包含 Python 的 traceback 信息
1
常见错误场景
- 语法错误:脚本代码不符合 Python 语法规范
- 运行时错误:如 KeyError、ValueError、TypeError 等
- 超时:脚本执行时间超过配置的 timeout
- Python 未安装:系统找不到 python 可执行文件
# Relation Type
- Success: 脚本执行成功,把转换后的消息发送到
Success链 - Failure: 脚本执行失败(异常、超时、语法错误等),把原始消息发送到
Failure链
# 配置示例
# 基本消息转换
{
"id": "s1",
"type": "x/python",
"name": "温度转换",
"configuration": {
"script": "import json\ndata = json.loads(msg) if isinstance(msg, str) else msg\ndata[\"temperature\"] = data[\"temperature\"] * 1.8 + 32\ndata[\"unit\"] = \"F\"\nreturn json.dumps(data), metadata, msgType"
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 修改元数据
{
"id": "s2",
"type": "x/python",
"name": "添加元数据",
"configuration": {
"script": "metadata[\"processedAt\"] = \"2026-04-23\"\nmetadata[\"processor\"] = \"python\"\nreturn msg, metadata, msgType"
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 修改消息类型
{
"id": "s3",
"type": "x/python",
"name": "修改消息类型",
"configuration": {
"script": "return msg, metadata, \"ALARM\""
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 使用 vars 配置变量
{
"id": "s4",
"type": "x/python",
"name": "使用配置变量",
"configuration": {
"script": "metadata[\"server\"] = vars.get(\"server\", \"unknown\")\nmetadata[\"env\"] = vars.get(\"env\", \"unknown\")\nreturn msg, metadata, msgType",
"vars": {
"server": "prod-server-01",
"env": "production"
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
vars vs globalProps
- vars:节点级别配置变量,在节点配置中设置,适合存放服务器地址、阈值等特定于节点的参数
- globalProps:规则引擎全局属性,通过
config.Properties设置,适合存放环境变量、公共配置等
# 使用 globalProps 全局属性
config := types.NewConfig()
config.Properties = types.Properties{
"env": "production",
"region": "us-east-1",
}
1
2
3
4
5
2
3
4
5
{
"id": "s5",
"type": "x/python",
"name": "使用全局属性",
"configuration": {
"script": "import json\ndata = json.loads(msg) if isinstance(msg, str) else msg\ndata[\"region\"] = globalProps.get(\"region\", \"unknown\")\nreturn json.dumps(data), metadata, msgType"
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 使用外部 Python 文件
{
"id": "s6",
"type": "x/python",
"name": "外部脚本",
"configuration": {
"script": "/path/to/process.py",
"pythonPath": "python3",
"timeout": "10s",
"maxRunning": 5
}
}
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
对应的 process.py 文件:
import json
def Process(msg, metadata, msgType, dataType, vars={}, globalProps={}):
data = msg if isinstance(msg, dict) else json.loads(msg)
data["server"] = vars.get("server", "unknown")
data["env"] = globalProps.get("env", "unknown")
metadata["processedBy"] = "python"
return json.dumps(data), metadata, msgType
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 使用第三方库
{
"id": "s7",
"type": "x/python",
"name": "HTTP请求",
"configuration": {
"script": "import urllib.request\nimport json\nresp = urllib.request.urlopen('http://api.example.com/data')\ndata = json.loads(resp.read())\nreturn json.dumps(data), metadata, msgType",
"timeout": "30s"
}
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 定义辅助函数
{
"id": "s8",
"type": "x/python",
"name": "辅助函数",
"configuration": {
"script": "def calculate_factorial(n):\n if n <= 1:\n return 1\n return n * calculate_factorial(n - 1)\nimport json\ndata = json.loads(msg) if isinstance(msg, str) else msg\ndata[\"factorial\"] = calculate_factorial(data.get(\"n\", 5))\nreturn json.dumps(data), metadata, msgType"
}
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 限制与注意事项
- 子进程模式:每条消息都会启动一个新的 Python 子进程,存在启动开销。不适合对延迟极度敏感的场景
- 并发控制:通过
maxRunning控制最大并发子进程数量,防止资源耗尽。默认 10 - 超时保护:建议始终配置
timeout,防止脚本死循环或网络请求阻塞导致子进程无法退出 - JSON 序列化:返回值通过 JSON 序列化传递,不支持 Python 特有对象(如 datetime、set 等),需要先转换为基本类型
- 进程隔离:每次调用是独立进程,Python 全局状态(模块级变量)不会在调用之间保持
- 第三方库:可以使用所有已安装的 Python 库,但需要确保环境中已安装
- 文件路径:文件模式下的
.py文件路径需要是绝对路径或相对于工作目录的正确相对路径
# 环境安装
# Linux / macOS
# Ubuntu/Debian
sudo apt-get install python3
# macOS (Homebrew)
brew install python3
1
2
3
4
5
2
3
4
5
# Windows
- 从 Python 官网 (opens new window) 下载安装包
- 安装时勾选 Add Python to PATH
- 验证安装:
python --version
# Python 3.x.x
1
2
2
# Docker
FROM golang:1.21
RUN apt-get update && apt-get install -y python3
1
2
2
# 环境变量
可以通过 PYTHON_PATH 环境变量指定 Python 可执行文件路径:
export PYTHON_PATH=/usr/bin/python3
1
或在 Go 代码中通过 pythonPath 配置项指定:
{
"pythonPath": "/usr/bin/python3"
}
1
2
3
2
3
在 GitHub 上编辑此页 (opens new window)
上次更新: 2026/04/24, 01:25:19