RuleGo RuleGo
🏠Home
  • Quick Start
  • Rule Chain
  • Standard Components
  • Extension Components
  • Custom Components
  • Visualization
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • Trigger
  • Advanced Topics
  • Performance
  • Standard Components
  • Extension Components
  • Custom Components
  • Components Marketplace
  • Overview
  • Quick Start
  • Routing
  • DSL
  • API
  • Options
  • Components
🔥Editor (opens new window)
  • RuleGo Editor (opens new window)
  • RuleGo Server (opens new window)
  • StreamSQL
  • Github (opens new window)
  • Gitee (opens new window)
  • Changelog (opens new window)
  • English
  • 简体中文
🏠Home
  • Quick Start
  • Rule Chain
  • Standard Components
  • Extension Components
  • Custom Components
  • Visualization
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • Trigger
  • Advanced Topics
  • Performance
  • Standard Components
  • Extension Components
  • Custom Components
  • Components Marketplace
  • Overview
  • Quick Start
  • Routing
  • DSL
  • API
  • Options
  • Components
🔥Editor (opens new window)
  • RuleGo Editor (opens new window)
  • RuleGo Server (opens new window)
  • StreamSQL
  • Github (opens new window)
  • Gitee (opens new window)
  • Changelog (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • Quick Start

  • Rule Chain

  • Standard Components

  • Extension Components

    • Extension Components Overview
    • filter

    • transform

    • external

    • ai

    • ci

    • IoT

    • Stream Processing

      • Stream Processing
      • Stream Aggregator
        • Input Data Support
          • Single Data Input
          • Array Data Input
        • Configuration
        • SQL Syntax Support
        • Relation Types
        • Execution Results
          • Success Chain Output
          • window_event Chain Output
          • Failure Chain Output
        • Configuration Examples
          • Basic Group Aggregation
          • Sliding Window Aggregation
          • Multi-field Aggregation
        • Application Examples
          • Example 1: Device Status Monitoring
      • Stream Transformer
    • file

  • Custom Components

  • Components marketplace

  • Visualization

  • AOP

  • Trigger

  • Advanced Topic

  • RuleGo-Server

  • FAQ

  • Endpoint Module

  • Support

  • StreamSQL

目录

Stream Aggregator

# streamAggregator

Node Type: x/streamAggregator

Description: Stream aggregator node, used for processing aggregate SQL queries, such as window aggregation, group aggregation, etc. This component is based on the StreamSQL engine and supports aggregation calculations for various window types such as Tumbling Window and Sliding Window. Supports single data and array data input.

# Input Data Support

This node supports two input data formats:

# Single Data Input

Directly process a single JSON object:

{"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2}
1

# Array Data Input

Automatically process JSON arrays, adding each element in the array to the aggregation stream one by one:

[
  {"deviceId": "sensor001", "temperature": 25.5, "humidity": 60.2},
  {"deviceId": "sensor002", "temperature": 28.3, "humidity": 55.8},
  {"deviceId": "sensor003", "temperature": 22.1, "humidity": 65.4}
]
1
2
3
4
5

Array Processing Description

  • Each element in the array will be added to the aggregation stream one by one to participate in the aggregation calculation.
  • The original array message will continue to be passed through the Success chain, maintaining data flow continuity.
  • Aggregation results are still passed through the window_event chain.

# Configuration

Field Type Description Default Value
sql string Aggregate SQL query statement, must contain aggregation functions (e.g., COUNT, SUM, AVG, MAX, MIN) or window functions None

# SQL Syntax Support

Detailed Syntax Reference

For complete SQL syntax instructions, please refer to: StreamSQL SQL Syntax Reference

# Relation Types

  • Success: After the original message is successfully processed, the original message is passed through this relation chain.
  • window_event: Aggregation results are passed through this relation chain. The message body is the result of the aggregation calculation, and the result format is a multi-column array.
  • Failure: When processing fails, error information is passed through this relation chain.

# Execution Results

# Success Chain Output

The original message remains unchanged and continues to be passed to the next node.

# window_event Chain Output

The aggregation result is passed as a new message, format:

[
  {
    "field1": "value1",
    "field2": "value2",
    "count": 10,
    "avg_temperature": 25.5
  }
]
1
2
3
4
5
6
7
8

# Failure Chain Output

Error message, containing specific error descriptions.

# Configuration Examples

# Basic Group Aggregation

{
  "id": "s1",
  "type": "x/streamAggregator",
  "name": "Device Temperature Aggregation",
  "configuration": {
    "sql": "SELECT deviceId, AVG(temperature) as avg_temp, MAX(temperature) as max_temp, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('2s')"
  }
}
1
2
3
4
5
6
7
8

# Sliding Window Aggregation

{
  "id": "s2",
  "type": "x/streamAggregator",
  "name": "Sliding Window Analysis",
  "configuration": {
    "sql": "SELECT AVG(temperature) as avg_temp, COUNT(*) as count FROM stream GROUP BY SlidingWindow('10s', '2s')"
  }
}
1
2
3
4
5
6
7
8

# Multi-field Aggregation

{
  "id": "s3",
  "type": "x/streamAggregator",
  "name": "Multi-dimensional Aggregation",
  "configuration": {
    "sql": "SELECT deviceType, location, AVG(temperature) as avg_temp, MIN(humidity) as min_humidity, MAX(pressure) as max_pressure FROM stream GROUP BY deviceType, location, TumblingWindow('5m')"
  }
}
1
2
3
4
5
6
7
8

# Application Examples

# Example 1: Device Status Monitoring

Scenario: Monitor IoT device temperature data, calculating the average and maximum temperature of each device every 2 seconds.

Rule Chain Configuration:

{
  "ruleChain": {
    "id": "device_monitoring",
    "name": "Device Monitoring Rule Chain",
    "root": true
  },
  "metadata": {
    "nodes": [
      {
        "id": "s1",
        "type": "x/streamAggregator",
        "name": "Temperature Aggregation",
        "configuration": {
          "sql": "SELECT deviceId, AVG(temperature) as avg_temp, MAX(temperature) as max_temp, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('2s')"
        }
      },
      {
        "id": "s2",
        "type": "jsTransform",
        "name": "Result Processing",
        "configuration": {
          "jsScript": "msg.timestamp = new Date().toISOString(); return {'msg': msg, 'metadata': metadata, 'msgType': msgType};"
        }
      },
      {
        "id": "s3",
        "type": "log",
        "name": "Aggregation Result Log",
        "configuration": {
          "jsScript": "return 'Aggregation Result: ' + JSON.stringify(msg);"
        }
      },
      {
        "id": "s4",
        "type": "log",
        "name": "Original Data Log",
        "configuration": {
          "jsScript": "return 'Original Data: ' + JSON.stringify(msg);"
        }
      }
    ],
    "connections": [
      {
        "fromId": "s1",
        "toId": "s2",
        "type": "window_event"
      },
      {
        "fromId": "s1",
        "toId": "s4",
        "type": "Success"
      },
      {
        "fromId": "s2",
        "toId": "s3",
        "type": "Success"
      }
    ]
  }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60

Input Data:

{"deviceId": "device001", "temperature": 25.5, "timestamp": "2023-09-13T10:00:00Z"}
{"deviceId": "device001", "temperature": 26.2, "timestamp": "2023-09-13T10:00:01Z"}
{"deviceId": "device002", "temperature": 24.8, "timestamp": "2023-09-13T10:00:01Z"}
1
2
3

Aggregation Result Output:

{
  "deviceId": "device001",
  "avg_temp": 25.85,
  "max_temp": 26.2,
  "count": 2
}
1
2
3
4
5
6
Edit this page on GitHub (opens new window)
Last Updated: 2026/02/04, 10:50:11
Stream Processing
Stream Transformer

← Stream Processing Stream Transformer→

Theme by Vdoing | Copyright © 2023-2026 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式