Stream Transformer

# streamTransform

Node Type: x/streamTransform

Description: Stream transformer node built on the StreamSQL engine. It uses SQL syntax to filter, transform, and process fields of real-time data streams. It handles non-aggregate queries only, such as data filtering, field transformation, and format conversion, and accepts both single-object and array input.

# Features

  • SQL Syntax: Uses standard SQL syntax for data transformation, which keeps the learning cost low.
  • Real-time Processing: Processes single objects and arrays synchronously.
  • Field Operations: Supports field selection, renaming, calculation, and conditional filtering.
  • Function Support: Provides 60+ built-in functions, including math, string, and time functions.
  • Conditional Filtering: Supports the WHERE clause for data filtering.
  • Array Processing: Automatically iterates over array input, transforms each element, and merges the results.

# Input Data Support

This node supports two input data formats:

# Single Data Input

Processes a single JSON object directly. If the transformation succeeds, the result is output through the Success chain; if it fails, or the object does not meet the WHERE condition, the message is output through the Failure chain:

{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2}

# Array Data Input

Processes a JSON array automatically: each element is transformed in turn, and the successfully transformed results are merged into a new array for output:

[
  {"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2},
  {"deviceId": "sensor002", "temperature": 28.3, "humidity": 55.8},
  {"deviceId": "sensor003", "temperature": 22.1, "humidity": 65.4}
]

Array Processing Description

  • The SQL transformation is applied to each element of the array, one by one.
  • Only elements that are transformed successfully and meet the WHERE condition are included in the output array.
  • If at least one element is transformed successfully, the merged array is output through the Success chain.
  • If every element fails to transform or is filtered out by the WHERE condition, error information is output through the Failure chain.
  • Message metadata will contain processing statistics: originalCount, transformedCount, failedCount.
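
The array rules above can be sketched in plain Go. This is an illustration of the described behaviour, not the node's actual implementation: it does not call StreamSQL, `processArray` and `hotOnly` are hypothetical names, and `hotOnly` stands in for a query with `WHERE temperature > 25`. The source does not say whether elements rejected by WHERE are counted in failedCount; this sketch assumes they are.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// record is one decoded JSON object from the input array.
type record map[string]any

// transformFn stands in for the configured SQL statement. It returns
// ok=false when the WHERE condition rejects the record, and a non-nil
// error when the transformation itself fails.
type transformFn func(in record) (out record, ok bool, err error)

// stats mirrors the three metadata keys named in the text.
type stats struct {
	OriginalCount    int
	TransformedCount int
	FailedCount      int
}

// processArray applies fn to each element and merges the successful results.
// success reports whether the merged array would take the Success chain.
func processArray(items []record, fn transformFn) (out []record, st stats, success bool) {
	st.OriginalCount = len(items)
	for _, item := range items {
		res, ok, err := fn(item)
		if err != nil || !ok {
			// Assumption: rejected and failed elements are counted together.
			st.FailedCount++
			continue
		}
		out = append(out, res)
		st.TransformedCount++
	}
	return out, st, st.TransformedCount > 0
}

// hotOnly is a hypothetical stand-in for
// "SELECT deviceId, temperature FROM stream WHERE temperature > 25".
func hotOnly(in record) (record, bool, error) {
	t, isNum := in["temperature"].(float64)
	if !isNum {
		return nil, false, errors.New("temperature is missing or not a number")
	}
	if t <= 25 {
		return nil, false, nil
	}
	return record{"deviceId": in["deviceId"], "temperature": t}, true, nil
}

func main() {
	input := `[
	  {"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2},
	  {"deviceId": "sensor002", "temperature": 28.3, "humidity": 55.8},
	  {"deviceId": "sensor003", "temperature": 22.1, "humidity": 65.4}
	]`
	var items []record
	if err := json.Unmarshal([]byte(input), &items); err != nil {
		panic(err)
	}
	out, st, success := processArray(items, hotOnly)
	fmt.Println(len(out), st, success)
}
```

With the sample array, two of the three readings exceed 25, so the merged array holds two elements and the message would take the Success chain.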

# Configuration

| Field | Type | Description | Default Value |
|-------|------|-------------|---------------|
| sql | string | Transformation SQL query statement. Must be a non-aggregate query (cannot contain GROUP BY, aggregation functions, etc.) | None |

# SQL Syntax Support

Detailed Syntax Reference

For complete SQL syntax instructions, please refer to: StreamSQL SQL Syntax Reference

# Relation Types

  • Success: When the data is transformed successfully, the transformed data is passed through this relation chain.
  • Failure: When the transformation fails, or the data does not meet the WHERE condition, error information is passed through this relation chain.

# Execution Results

# Success Chain Output

The transformed data. Its format is determined by the SQL query result:

{
  "field1": "transformed_value1",
  "field2": "transformed_value2",
  "calculated_field": 123.45
}

# Failure Chain Output

An error message that describes the specific failure.

# Configuration Examples

# Basic Field Transformation

{
  "id": "s1",
  "type": "x/streamTransform",
  "name": "Temperature Unit Conversion",
  "configuration": {
    "sql": "SELECT deviceId, temperature, humidity, temperature * 1.8 + 32 as temp_fahrenheit FROM stream WHERE temperature IS NOT NULL",
    "debug": false
  }
}
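
To make the expected output concrete, the following Go sketch applies the same arithmetic and NULL check to one record. It only mirrors the SQL expression by hand and does not invoke StreamSQL; the `reading` type and the `apply` function are hypothetical names introduced for illustration.

```go
package main

import "fmt"

// reading holds the columns the query selects. Temperature is a pointer so
// that a JSON null or missing value can be represented as nil.
type reading struct {
	DeviceID    string
	Temperature *float64
	Humidity    float64
}

// toFahrenheit mirrors the SQL expression `temperature * 1.8 + 32`.
func toFahrenheit(celsius float64) float64 {
	return celsius*1.8 + 32
}

// apply returns the output row, or false when `temperature IS NOT NULL`
// does not hold and the record would be filtered out.
func apply(r reading) (map[string]any, bool) {
	if r.Temperature == nil {
		return nil, false
	}
	return map[string]any{
		"deviceId":        r.DeviceID,
		"temperature":     *r.Temperature,
		"humidity":        r.Humidity,
		"temp_fahrenheit": toFahrenheit(*r.Temperature),
	}, true
}

func main() {
	t := 25.5
	row, ok := apply(reading{DeviceID: "sensor001", Temperature: &t, Humidity: 60.2})
	fmt.Printf("%v %.1f\n", ok, row["temp_fahrenheit"])
}
```

For the sample reading of 25.5 °C, the added temp_fahrenheit field is approximately 77.9.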

# Data Filtering and Calculation

{
  "id": "s2",
  "type": "x/streamTransform",
  "name": "High Temperature Data Processing",
  "configuration": {
    "sql": "SELECT deviceId, temperature, CASE WHEN temperature > 30 THEN 'HIGH' WHEN temperature < 10 THEN 'LOW' ELSE 'NORMAL' END as temp_level FROM stream WHERE temperature > 20",
    "debug": true
  }
}
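
The interaction between the WHERE clause and the CASE expression is easier to see when written out by hand. The Go sketch below mirrors the two conditions of this query without calling StreamSQL; `passesWhere` and `tempLevel` are hypothetical helper names.

```go
package main

import "fmt"

// passesWhere mirrors `WHERE temperature > 20`.
func passesWhere(temperature float64) bool {
	return temperature > 20
}

// tempLevel mirrors the CASE expression; branches are tested in order.
func tempLevel(temperature float64) string {
	switch {
	case temperature > 30:
		return "HIGH"
	case temperature < 10:
		return "LOW"
	default:
		return "NORMAL"
	}
}

func main() {
	for _, t := range []float64{35, 25.5, 5} {
		if !passesWhere(t) {
			fmt.Printf("%.1f: filtered out\n", t)
			continue
		}
		fmt.Printf("%.1f: %s\n", t, tempLevel(t))
	}
}
```

Because the WHERE clause only admits temperatures above 20, no emitted row can carry the 'LOW' level in this particular query; the branch would only take effect if the filter were relaxed.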

# String Processing

{
  "id": "s3",
  "type": "x/streamTransform",
  "name": "Device Information Formatting",
  "configuration": {
    "sql": "SELECT UPPER(deviceId) as device_id, CONCAT(location, '-', deviceType) as device_info, ROUND(temperature, 2) as temp FROM stream",
    "debug": false
  }
}
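
As a rough guide to what this query produces, the Go sketch below applies the equivalent standard-library operations to one record. It is an approximation under stated assumptions: the `location` and `deviceType` values are made up for illustration, `formatDevice` and `roundTo` are hypothetical names, and StreamSQL's ROUND may treat halfway cases differently from Go's `math.Round`.

```go
package main

import (
	"fmt"
	"math"
	"strings"
)

// roundTo rounds x to the given number of decimal places, with halfway
// cases rounded away from zero (the behaviour of math.Round).
func roundTo(x float64, places int) float64 {
	p := math.Pow(10, float64(places))
	return math.Round(x*p) / p
}

// formatDevice mirrors the three SELECT expressions of the query.
func formatDevice(deviceID, location, deviceType string, temperature float64) map[string]any {
	return map[string]any{
		"device_id":   strings.ToUpper(deviceID),         // UPPER(deviceId)
		"device_info": location + "-" + deviceType,       // CONCAT(location, '-', deviceType)
		"temp":        roundTo(temperature, 2),           // ROUND(temperature, 2)
	}
}

func main() {
	// Hypothetical input values; only deviceId appears in the samples above.
	row := formatDevice("sensor001", "roomA", "thermometer", 25.456)
	fmt.Println(row["device_id"], row["device_info"], row["temp"])
}
```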

# Application Examples

# Example 1: IoT Data Preprocessing

Scenario: Clean and format raw data reported by IoT devices.

Rule Chain Configuration:

{
  "ruleChain": {
    "id": "iot_data_preprocessing",
    "name": "IoT Data Preprocessing",
    "root": true
  },
  "metadata": {
    "nodes": [
      {
        "id": "s1",
        "type": "x/streamTransform",
        "name": "Data Cleaning",
        "configuration": {
          "sql": "SELECT deviceId, temperature, humidity, pressure, CASE WHEN temperature > 50 OR temperature < -20 THEN 'INVALID' ELSE 'VALID' END as data_quality FROM stream WHERE deviceId IS NOT NULL"
        }
      },
      {
        "id": "s2",
        "type": "jsFilter",
        "name": "Effective Data Filtering",
        "configuration": {
          "jsScript": "return msg.data_quality === 'VALID';"
        }
      },
      {
        "id": "s3",
        "type": "x/streamTransform",
        "name": "Unit Conversion",
        "configuration": {
          "sql": "SELECT deviceId, ROUND(temperature, 2) as temperature_c, ROUND(temperature * 1.8 + 32, 2) as temperature_f, ROUND(humidity, 1) as humidity_percent, pressure FROM stream"
        }
      },
      {
        "id": "s4",
        "type": "log",
        "name": "Processing Result",
        "configuration": {
          "jsScript": "return 'Processed: ' + JSON.stringify(msg);"
        }
      },
      {
        "id": "s5",
        "type": "log",
        "name": "Invalid Data",
        "configuration": {
          "jsScript": "return 'Invalid data: ' + JSON.stringify(msg);"
        }
      }
    ],
    "connections": [
      {
        "fromId": "s1",
        "toId": "s2",
        "type": "Success"
      },
      {
        "fromId": "s2",
        "toId": "s3",
        "type": "True"
      },
      {
        "fromId": "s2",
        "toId": "s5",
        "type": "False"
      },
      {
        "fromId": "s3",
        "toId": "s4",
        "type": "Success"
      }
    ]
  }
}
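
The routing decisions made by the first two nodes of this chain can be traced by hand. The Go sketch below mirrors the `data_quality` CASE expression of s1 and the jsFilter check of s2 and reports which node receives a reading next; it does not run RuleGo, and `reading`, `dataQuality`, and `route` are hypothetical names. It assumes that a reading rejected by s1's WHERE clause leaves through s1's Failure relation, which has no connection in this chain.

```go
package main

import "fmt"

// reading holds the two fields that drive routing. DeviceID is a pointer
// so that a JSON null or missing value can be represented as nil.
type reading struct {
	DeviceID    *string
	Temperature float64
}

// dataQuality mirrors the CASE expression in node s1.
func dataQuality(temperature float64) string {
	if temperature > 50 || temperature < -20 {
		return "INVALID"
	}
	return "VALID"
}

// route returns the ID of the node that receives the reading after s1 and
// s2, or "" when s1 rejects it (deviceId IS NULL) and nothing is connected.
func route(r reading) string {
	if r.DeviceID == nil {
		return "" // s1 Failure relation: no connection defined
	}
	if dataQuality(r.Temperature) == "VALID" {
		return "s3" // s2 True relation: Unit Conversion
	}
	return "s5" // s2 False relation: Invalid Data log
}

func main() {
	id := "sensor001"
	fmt.Println(route(reading{DeviceID: &id, Temperature: 25.5}))
	fmt.Println(route(reading{DeviceID: &id, Temperature: 80}))
	fmt.Println(route(reading{Temperature: 25.5}) == "")
}
```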
Last Updated: 2026/02/04, 10:50:11

Theme by Vdoing | Copyright © 2023-2026 RuleGo Team | Apache 2.0 License
