Stream Aggregator
# streamAggregator
Node Type: x/streamAggregator
Description: Stream aggregator node, used for processing aggregate SQL queries, such as window aggregation, group aggregation, etc. This component is based on the StreamSQL engine and supports aggregation calculations for various window types such as Tumbling Window and Sliding Window. Supports single data and array data input.
# Input Data Support
This node supports two input data formats:
# Single Data Input
Directly process a single JSON object:
{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2}
# Array Data Input
Automatically process JSON arrays, adding each element in the array to the aggregation stream one by one:
[
{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2},
{"deviceId": "sensor002", "temperature": 28.3, "humidity": 55.8},
{"deviceId": "sensor003", "temperature": 22.1, "humidity": 65.4}
]
2
3
4
5
Array Processing Description
- Each element in the array will be added to the aggregation stream one by one to participate in the aggregation calculation.
- The original array message will continue to be passed through the Success chain, maintaining data flow continuity.
- Aggregation results are still passed through the window_event chain.
# Configuration
| Field | Type | Description | Default Value |
|---|---|---|---|
| sql | string | Aggregate SQL query statement, must contain aggregation functions (e.g., COUNT, SUM, AVG, MAX, MIN) or window functions | None |
# SQL Syntax Support
Detailed Syntax Reference
For complete SQL syntax instructions, please refer to: StreamSQL SQL Syntax Reference
# Relation Types
- Success: After the original message is successfully processed, the original message is passed through this relation chain.
- window_event: Aggregation results are passed through this relation chain. The message body is the result of the aggregation calculation, and the result format is a multi-column array.
- Failure: When processing fails, error information is passed through this relation chain.
# Execution Results
# Success Chain Output
The original message remains unchanged and continues to be passed to the next node.
# window_event Chain Output
The aggregation result is passed as a new message, format:
[
{
"field1": "value1",
"field2": "value2",
"count": 10,
"avg_temperature": 25.5
}
]
2
3
4
5
6
7
8
# Failure Chain Output
Error message, containing specific error descriptions.
# Configuration Examples
# Basic Group Aggregation
{
"id": "s1",
"type": "x/streamAggregator",
"name": "Device Temperature Aggregation",
"configuration": {
"sql": "SELECT deviceId, AVG(temperature) as avg_temp, MAX(temperature) as max_temp, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('2s')"
}
}
2
3
4
5
6
7
8
# Sliding Window Aggregation
{
"id": "s2",
"type": "x/streamAggregator",
"name": "Sliding Window Analysis",
"configuration": {
"sql": "SELECT AVG(temperature) as avg_temp, COUNT(*) as count FROM stream GROUP BY SlidingWindow('10s', '2s')"
}
}
2
3
4
5
6
7
8
# Multi-field Aggregation
{
"id": "s3",
"type": "x/streamAggregator",
"name": "Multi-dimensional Aggregation",
"configuration": {
"sql": "SELECT deviceType, location, AVG(temperature) as avg_temp, MIN(humidity) as min_humidity, MAX(pressure) as max_pressure FROM stream GROUP BY deviceType, location, TumblingWindow('5m')"
}
}
2
3
4
5
6
7
8
# Application Examples
# Example 1: Device Status Monitoring
Scenario: Monitor IoT device temperature data, calculating the average and maximum temperature of each device every 2 seconds.
Rule Chain Configuration:
{
"ruleChain": {
"id": "device_monitoring",
"name": "Device Monitoring Rule Chain",
"root": true
},
"metadata": {
"nodes": [
{
"id": "s1",
"type": "x/streamAggregator",
"name": "Temperature Aggregation",
"configuration": {
"sql": "SELECT deviceId, AVG(temperature) as avg_temp, MAX(temperature) as max_temp, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('2s')"
}
},
{
"id": "s2",
"type": "jsTransform",
"name": "Result Processing",
"configuration": {
"jsScript": "msg.timestamp = new Date().toISOString(); return {'msg': msg, 'metadata': metadata, 'msgType': msgType};"
}
},
{
"id": "s3",
"type": "log",
"name": "Aggregation Result Log",
"configuration": {
"jsScript": "return 'Aggregation Result: ' + JSON.stringify(msg);"
}
},
{
"id": "s4",
"type": "log",
"name": "Original Data Log",
"configuration": {
"jsScript": "return 'Original Data: ' + JSON.stringify(msg);"
}
}
],
"connections": [
{
"fromId": "s1",
"toId": "s2",
"type": "window_event"
},
{
"fromId": "s1",
"toId": "s4",
"type": "Success"
},
{
"fromId": "s2",
"toId": "s3",
"type": "Success"
}
]
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
Input Data:
{"deviceId": "device001", "temperature": 25.5, "timestamp": "2023-09-13T10:00:00Z"}
{"deviceId": "device001", "temperature": 26.2, "timestamp": "2023-09-13T10:00:01Z"}
{"deviceId": "device002", "temperature": 24.8, "timestamp": "2023-09-13T10:00:01Z"}
2
3
Aggregation Result Output:
{
"deviceId": "device001",
"avg_temp": 25.85,
"max_temp": 26.2,
"count": 2
}
2
3
4
5
6