Stream Transformer
# streamTransform
Node Type: x/streamTransform
Description: Stream transformer node, based on the StreamSQL engine, uses SQL syntax to filter, transform, and process fields of real-time data streams. Specifically handles non-aggregate queries, such as data filtering, field transformation, format conversion, etc. Supports single data and array data input.
# Features
- SQL Syntax: Use standard SQL syntax for data transformation, low learning cost.
- Real-time Processing: Synchronous processing of single data and array data.
- Field Operations: Supports field selection, renaming, calculation, and conditional filtering.
- Function Support: 60+ built-in functions, including math, string, time, etc.
- Conditional Filtering: Supports WHERE clause for data filtering.
- Array Processing: Automatically processes array data, transforms each element and merges the results.
# Input Data Support
This node supports two input data formats:
# Single Data Input
Directly process a single JSON object. If the transformation is successful, it is output through the Success chain; if it fails or does not meet the WHERE condition, it is output through the Failure chain:
{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2}
# Array Data Input
Automatically process JSON arrays, traverse each element for transformation, and merge the successfully transformed results into a new array for output:
[
{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2},
{"deviceId": "sensor002", "temperature": 28.3, "humidity": 55.8},
{"deviceId": "sensor003", "temperature": 22.1, "humidity": 65.4}
]
2
3
4
5
Array Processing Description
- Each element in the array will be processed by SQL transformation one by one.
- Only elements that are successfully transformed and meet the WHERE condition will be included in the output array.
- If at least one element is successfully transformed, the merged array is output through the Success chain.
- If all elements fail to transform or are filtered by the WHERE condition, error information is output through the Failure chain.
- Message metadata will contain processing statistics: originalCount, transformedCount, failedCount.
# Configuration
| Field | Type | Description | Default Value |
|---|---|---|---|
| sql | string | Transformation SQL query statement, must be a non-aggregate query (cannot contain GROUP BY, aggregation functions, etc.) | None |
# SQL Syntax Support
Detailed Syntax Reference
For complete SQL syntax instructions, please refer to: StreamSQL SQL Syntax Reference
# Relation Types
- Success: After successful data transformation, the transformed data is passed through this relation chain.
- Failure: When transformation fails, error information is passed through this relation chain.
# Execution Results
# Success Chain Output
Transformed data, the format is determined by the SQL query result:
{
"field1": "transformed_value1",
"field2": "transformed_value2",
"calculated_field": 123.45
}
2
3
4
5
# Failure Chain Output
Error message, containing specific error descriptions.
# Configuration Examples
# Basic Field Transformation
{
"id": "s1",
"type": "x/streamTransform",
"name": "Temperature Unit Conversion",
"configuration": {
"sql": "SELECT deviceId, temperature, humidity, temperature * 1.8 + 32 as temp_fahrenheit FROM stream WHERE temperature IS NOT NULL",
"debug": false
}
}
2
3
4
5
6
7
8
9
# Data Filtering and Calculation
{
"id": "s2",
"type": "x/streamTransform",
"name": "High Temperature Data Processing",
"configuration": {
"sql": "SELECT deviceId, temperature, CASE WHEN temperature > 30 THEN 'HIGH' WHEN temperature < 10 THEN 'LOW' ELSE 'NORMAL' END as temp_level FROM stream WHERE temperature > 20",
"debug": true
}
}
2
3
4
5
6
7
8
9
# String Processing
{
"id": "s3",
"type": "x/streamTransform",
"name": "Device Information Formatting",
"configuration": {
"sql": "SELECT UPPER(deviceId) as device_id, CONCAT(location, '-', deviceType) as device_info, ROUND(temperature, 2) as temp FROM stream",
"debug": false
}
}
2
3
4
5
6
7
8
9
# Application Examples
# Example 1: IoT Data Preprocessing
Scenario: Clean and format raw data reported by IoT devices.
Rule Chain Configuration:
{
"ruleChain": {
"id": "iot_data_preprocessing",
"name": "IoT Data Preprocessing",
"root": true
},
"metadata": {
"nodes": [
{
"id": "s1",
"type": "x/streamTransform",
"name": "Data Cleaning",
"configuration": {
"sql": "SELECT deviceId, temperature, humidity, pressure, CASE WHEN temperature > 50 OR temperature < -20 THEN 'INVALID' ELSE 'VALID' END as data_quality FROM stream WHERE deviceId IS NOT NULL"
}
},
{
"id": "s2",
"type": "jsFilter",
"name": "Effective Data Filtering",
"configuration": {
"jsScript": "return msg.data_quality === 'VALID';"
}
},
{
"id": "s3",
"type": "x/streamTransform",
"name": "Unit Conversion",
"configuration": {
"sql": "SELECT deviceId, ROUND(temperature, 2) as temperature_c, ROUND(temperature * 1.8 + 32, 2) as temperature_f, ROUND(humidity, 1) as humidity_percent, pressure FROM stream"
}
},
{
"id": "s4",
"type": "log",
"name": "Processing Result",
"configuration": {
"jsScript": "return 'Processed: ' + JSON.stringify(msg);"
}
},
{
"id": "s5",
"type": "log",
"name": "Invalid Data",
"configuration": {
"jsScript": "return 'Invalid data: ' + JSON.stringify(msg);"
}
}
],
"connections": [
{
"fromId": "s1",
"toId": "s2",
"type": "Success"
},
{
"fromId": "s2",
"toId": "s3",
"type": "True"
},
{
"fromId": "s2",
"toId": "s5",
"type": "False"
}
]
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68