RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • 快速入门

  • 规则链

  • 标准组件

  • 扩展组件

    • 扩展组件概述
    • 过滤器

    • 动作

      • Python脚本节点
        • 前置条件
        • 配置
          • script 字段说明
        • 函数参数
          • 参数适配规则
        • 返回值
          • 1. 元组(Tuple)
          • 2. 字典(Dict)
          • 3. 单值(Single Value)
        • 错误处理
        • Relation Type
        • 配置示例
          • 基本消息转换
          • 修改元数据
          • 修改消息类型
          • 使用 vars 配置变量
          • 使用 globalProps 全局属性
          • 使用外部 Python 文件
          • 使用第三方库
          • 定义辅助函数
        • 限制与注意事项
        • 环境安装
          • Linux / macOS
          • Windows
          • Docker
          • 环境变量
    • 转换器

    • 外部的

    • AI

    • CI

    • IoT

    • 流式计算

    • 文件

  • 自定义组件

  • 组件市场

  • 可视化

  • AOP

  • 触发器

  • 高级主题

  • RuleGo-Server

  • 问题

目录

Python脚本节点

x/python 组件:Python 脚本动作节点。通过子进程执行 Python 脚本,对消息进行转换、过滤或增强处理。支持内联脚本和外部 .py 文件两种模式。

# 前置条件

  • 需要安装 Python 3.x,并确保 python3 或 python 命令在系统 PATH 中可用
  • 支持所有 Python 3 标准库和已安装的第三方库

# 配置

字段 类型 说明 默认值
script string Python 脚本内容(内联模式)或 .py 文件路径(文件模式) 无(必填)
pythonPath string Python 可执行文件路径 自动检测(依次尝试 python3、python)
timeout string 脚本执行超时时间。支持 Go 时长格式("5s"、"1000ms"、"1m")或纯数字秒数("5") 使用规则引擎全局配置 ScriptMaxExecutionTime
maxRunning int 最大并发 Python 子进程数量 10

# script 字段说明

内联模式:只提供函数体内容,框架自动生成函数定义。不要写 def Process(...): 行。

# 内联模式:只需要写函数体
import json
data = json.loads(msg) if isinstance(msg, str) else msg
data["processed"] = True
return json.dumps(data), metadata, msgType
1
2
3
4
5

文件模式:以 .py 结尾的路径。文件中必须定义 Process 函数。

# 文件模式:完整函数定义
import json

def Process(msg, metadata, msgType, dataType):
    data = msg if isinstance(msg, dict) else json.loads(msg)
    data["processed"] = True
    return json.dumps(data), metadata, msgType
1
2
3
4
5
6
7

内联模式限制

内联模式下不要写 def Process(...): 行,否则会报初始化错误。定义辅助函数(如 def helper(x):)是允许的。

# 函数参数

Process 函数支持 4~6 个参数,框架通过 inspect.signature 自动适配:

参数 类型 说明 是否必须
msg dict / str / list 消息内容。如果 dataType 为 JSON,自动解析为 Python dict;否则为字符串 是
metadata dict 消息元数据,str -> str 字典 是
msgType str 消息类型 是
dataType str 数据类型("JSON"、"TEXT"、"BINARY" 等) 是
vars dict 节点配置变量,str -> str 字典 否
globalProps dict 规则引擎全局属性,str -> str 字典 否

# 参数适配规则

# 4 参数:标准签名
def Process(msg, metadata, msgType, dataType):
    ...

# 5 参数:额外接收 vars
def Process(msg, metadata, msgType, dataType, vars={}):
    server = vars.get("server", "unknown")

# 6 参数:额外接收 vars 和 globalProps
def Process(msg, metadata, msgType, dataType, vars={}, globalProps={}):
    env = globalProps.get("env", "unknown")
1
2
3
4
5
6
7
8
9
10
11

# 返回值

Process 函数支持 3 种返回形式:

# 1. 元组(Tuple)

返回最多 3 个元素,分别对应 msg、metadata、msgType:

return new_msg, metadata, "NEW_TYPE"   # 完整三元组
return new_msg, metadata                # 只修改 msg 和 metadata
return new_msg                          # 只修改 msg
1
2
3

# 2. 字典(Dict)

返回包含 msg、metadata、msgType 键的字典:

return {"msg": "transformed", "msgType": "PROCESSED"}
return {"msg": new_data, "metadata": metadata, "msgType": msgType}
1
2

字典识别规则

返回的 dict 只有包含 msg、metadata 或 msgType 中的任意一个键时,才被视为结构化结果。否则会被整体包装为 {"msg": result}。

# 3. 单值(Single Value)

返回任意值,自动包装为 {"msg": value}:

return "plain string result"
return 42
return [1, 2, 3]
return {"data": 123}  # 不含 msg/metadata/msgType 键,整体作为 msg
1
2
3
4

# 错误处理

脚本中抛出异常时,节点会将消息发送到 Failure 链,错误信息包含 Python traceback:

# 脚本中抛出异常
if temperature > 100:
    raise ValueError(f"温度超限: {temperature}")
1
2
3

节点在 Failure 链回调中可以获取错误信息:

ctx.TellFailure(msg, err)  // err 包含 Python 的 traceback 信息
1

常见错误场景

  • 语法错误:脚本代码不符合 Python 语法规范
  • 运行时错误:如 KeyError、ValueError、TypeError 等
  • 超时:脚本执行时间超过配置的 timeout
  • Python 未安装:系统找不到 python 可执行文件

# Relation Type

  • Success: 脚本执行成功,把转换后的消息发送到 Success 链
  • Failure: 脚本执行失败(异常、超时、语法错误等),把原始消息发送到 Failure 链

# 配置示例

# 基本消息转换

{
  "id": "s1",
  "type": "x/python",
  "name": "温度转换",
  "configuration": {
    "script": "import json\ndata = json.loads(msg) if isinstance(msg, str) else msg\ndata[\"temperature\"] = data[\"temperature\"] * 1.8 + 32\ndata[\"unit\"] = \"F\"\nreturn json.dumps(data), metadata, msgType"
  }
}
1
2
3
4
5
6
7
8

# 修改元数据

{
  "id": "s2",
  "type": "x/python",
  "name": "添加元数据",
  "configuration": {
    "script": "metadata[\"processedAt\"] = \"2026-04-23\"\nmetadata[\"processor\"] = \"python\"\nreturn msg, metadata, msgType"
  }
}
1
2
3
4
5
6
7
8

# 修改消息类型

{
  "id": "s3",
  "type": "x/python",
  "name": "修改消息类型",
  "configuration": {
    "script": "return msg, metadata, \"ALARM\""
  }
}
1
2
3
4
5
6
7
8

# 使用 vars 配置变量

{
  "id": "s4",
  "type": "x/python",
  "name": "使用配置变量",
  "configuration": {
    "script": "metadata[\"server\"] = vars.get(\"server\", \"unknown\")\nmetadata[\"env\"] = vars.get(\"env\", \"unknown\")\nreturn msg, metadata, msgType",
    "vars": {
      "server": "prod-server-01",
      "env": "production"
    }
  }
}
1
2
3
4
5
6
7
8
9
10
11
12

vars vs globalProps

  • vars:节点级别配置变量,在节点配置中设置,适合存放服务器地址、阈值等特定于节点的参数
  • globalProps:规则引擎全局属性,通过 config.Properties 设置,适合存放环境变量、公共配置等

# 使用 globalProps 全局属性

config := types.NewConfig()
config.Properties = types.Properties{
    "env":  "production",
    "region": "us-east-1",
}
1
2
3
4
5
{
  "id": "s5",
  "type": "x/python",
  "name": "使用全局属性",
  "configuration": {
    "script": "import json\ndata = json.loads(msg) if isinstance(msg, str) else msg\ndata[\"region\"] = globalProps.get(\"region\", \"unknown\")\nreturn json.dumps(data), metadata, msgType"
  }
}
1
2
3
4
5
6
7
8

# 使用外部 Python 文件

{
  "id": "s6",
  "type": "x/python",
  "name": "外部脚本",
  "configuration": {
    "script": "/path/to/process.py",
    "pythonPath": "python3",
    "timeout": "10s",
    "maxRunning": 5
  }
}
1
2
3
4
5
6
7
8
9
10
11

对应的 process.py 文件:

import json

def Process(msg, metadata, msgType, dataType, vars={}, globalProps={}):
    data = msg if isinstance(msg, dict) else json.loads(msg)
    data["server"] = vars.get("server", "unknown")
    data["env"] = globalProps.get("env", "unknown")
    metadata["processedBy"] = "python"
    return json.dumps(data), metadata, msgType
1
2
3
4
5
6
7
8

# 使用第三方库

{
  "id": "s7",
  "type": "x/python",
  "name": "HTTP请求",
  "configuration": {
    "script": "import urllib.request\nimport json\nresp = urllib.request.urlopen('http://api.example.com/data')\ndata = json.loads(resp.read())\nreturn json.dumps(data), metadata, msgType",
    "timeout": "30s"
  }
}
1
2
3
4
5
6
7
8
9

# 定义辅助函数

{
  "id": "s8",
  "type": "x/python",
  "name": "辅助函数",
  "configuration": {
    "script": "def calculate_factorial(n):\n    if n <= 1:\n        return 1\n    return n * calculate_factorial(n - 1)\nimport json\ndata = json.loads(msg) if isinstance(msg, str) else msg\ndata[\"factorial\"] = calculate_factorial(data.get(\"n\", 5))\nreturn json.dumps(data), metadata, msgType"
  }
}
1
2
3
4
5
6
7
8

# 限制与注意事项

  1. 子进程模式:每条消息都会启动一个新的 Python 子进程,存在启动开销。不适合对延迟极度敏感的场景
  2. 并发控制:通过 maxRunning 控制最大并发子进程数量,防止资源耗尽。默认 10
  3. 超时保护:建议始终配置 timeout,防止脚本死循环或网络请求阻塞导致子进程无法退出
  4. JSON 序列化:返回值通过 JSON 序列化传递,不支持 Python 特有对象(如 datetime、set 等),需要先转换为基本类型
  5. 进程隔离:每次调用是独立进程,Python 全局状态(模块级变量)不会在调用之间保持
  6. 第三方库:可以使用所有已安装的 Python 库,但需要确保环境中已安装
  7. 文件路径:文件模式下的 .py 文件路径需要是绝对路径或相对于工作目录的正确相对路径

# 环境安装

# Linux / macOS

# Ubuntu/Debian
sudo apt-get install python3

# macOS (Homebrew)
brew install python3
1
2
3
4
5

# Windows

  1. 从 Python 官网 (opens new window) 下载安装包
  2. 安装时勾选 Add Python to PATH
  3. 验证安装:
python --version
# Python 3.x.x
1
2

# Docker

FROM golang:1.21
RUN apt-get update && apt-get install -y python3
1
2

# 环境变量

可以通过 PYTHON_PATH 环境变量指定 Python 可执行文件路径:

export PYTHON_PATH=/usr/bin/python3
1

或在 Go 代码中通过 pythonPath 配置项指定:

{
  "pythonPath": "/usr/bin/python3"
}
1
2
3
在 GitHub 上编辑此页 (opens new window)
上次更新: 2026/04/24, 01:25:19
lua脚本过滤器
lua脚本转换器

← lua脚本过滤器 lua脚本转换器→

Theme by Vdoing | Copyright © 2023-2026 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式