# Stream Processing Components
RuleGo provides stream processing components based on the StreamSQL engine, supporting real-time data processing using SQL syntax.
# Component List
# streamTransform
Node Type: x/streamTransform
The stream transformer component handles non-aggregate SQL queries. It supports:
- Data filtering and field selection
- Field renaming and calculation
- Conditional filtering and data validation
- Single and batch data processing
- More than 60 built-in functions
Applicable Scenarios: Real-time data cleaning, format conversion, simple calculation
# streamAggregator
Node Type: x/streamAggregator
The stream aggregator component handles aggregate SQL queries. It supports:
- Window aggregation (Tumbling Window, Sliding Window, Count Window, etc.)
- Group aggregation and multi-dimensional statistics
- Aggregation functions (COUNT, SUM, AVG, MAX, MIN, etc.)
- Real-time calculation and result output
Applicable Scenarios: Real-time statistical analysis, monitoring alarms, data summarization
# Quick Start
# 1. Install Dependencies
go get github.com/rulego/rulego-components
# 2. Register Components
import _ "github.com/rulego/rulego-components/stats/streamsql"
# 3. Usage Examples
# Data Transformation Example
{
  "id": "transform1",
  "type": "x/streamTransform",
  "name": "Temperature Conversion",
  "configuration": {
    "sql": "SELECT deviceId, temperature, temperature * 1.8 + 32 as temp_fahrenheit FROM stream WHERE temperature > 0"
  }
}
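To make the effect of this query concrete, the following Go sketch reproduces the same filter and projection by hand on individual messages. It does not call RuleGo or StreamSQL. The `Reading` and `Converted` types and the `Transform` function are hypothetical names introduced for illustration, and the input message shape is an assumption based on the fields the SQL references.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Reading is an assumed input message shape; the field names mirror
// the columns referenced by the SQL statement.
type Reading struct {
	DeviceID    string  `json:"deviceId"`
	Temperature float64 `json:"temperature"`
}

// Converted mirrors the three columns selected by the query.
type Converted struct {
	DeviceID       string  `json:"deviceId"`
	Temperature    float64 `json:"temperature"`
	TempFahrenheit float64 `json:"temp_fahrenheit"`
}

// Transform applies the WHERE filter and the SELECT projection to a
// single reading. The boolean is false when the row is filtered out.
func Transform(r Reading) (Converted, bool) {
	if !(r.Temperature > 0) { // WHERE temperature > 0
		return Converted{}, false
	}
	return Converted{
		DeviceID:       r.DeviceID,
		Temperature:    r.Temperature,
		TempFahrenheit: r.Temperature*1.8 + 32, // temperature * 1.8 + 32
	}, true
}

func main() {
	inputs := []string{
		`{"deviceId":"sensor-1","temperature":25}`,
		`{"deviceId":"sensor-2","temperature":-5}`, // dropped by the filter
	}
	for _, in := range inputs {
		var r Reading
		if err := json.Unmarshal([]byte(in), &r); err != nil {
			fmt.Println("skipping invalid input:", err)
			continue
		}
		if out, ok := Transform(r); ok {
			b, _ := json.Marshal(out)
			fmt.Println(string(b))
		}
	}
}
```

Each message is handled independently, which is why a non-aggregate query needs no window or grouping state.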
# Data Aggregation Example
{
  "id": "aggregator1",
  "type": "x/streamAggregator",
  "name": "Temperature Statistics",
  "configuration": {
    "sql": "SELECT deviceId, AVG(temperature) as avg_temp, COUNT(*) as count FROM stream GROUP BY deviceId, TumblingWindow('5m')"
  }
}
# Application Scenarios
# IoT Data Processing
- Sensor data cleaning and formatting
- Real-time monitoring of indicators such as temperature and humidity
- Device status statistics and alarms
# Real-time Monitoring
- System performance indicator aggregation
- Anomaly detection and alerting
- Real-time dashboard data processing
# Data Analysis
- Stream data preprocessing
- Real-time statistical calculation
- Multi-dimensional data analysis
Last Updated: 2026/02/04, 10:50:11