RuleGo RuleGo
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文
🏠首页
  • 快速入门
  • 规则链
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 可视化
  • RuleGo-Server
  • RuleGo-MCP-Server
  • AOP
  • 触发器
  • 高级主题
  • 性能
  • 标准组件
  • 扩展组件
  • 自定义组件
  • 流式计算
  • 组件市场
  • 概述
  • 快速入门
  • 路由
  • DSL
  • API
  • Options
  • 组件
🔥编辑器 (opens new window)
  • 可视化编辑器 (opens new window)
  • RuleGo-Server (opens new window)
  • 🌊StreamSQL
  • ❓问答

    • FAQ
💖支持
👥加入社区
  • Github (opens new window)
  • Gitee (opens new window)
  • GitCode (opens new window)
  • 更新日志 (opens new window)
  • English
  • 简体中文

广告采用随机轮播方式显示 ❤️成为赞助商
  • 概述
  • 快速开始
  • 核心概念
  • SQL参考
    • 📋 SQL语法概览
    • SQL语法概览
      • 基本查询结构
      • 支持的子句
    • SELECT 子句
      • 基本语法
      • 支持的选择类型
      • 1. 字段选择
      • 2. 嵌套字段访问
      • 3. 表达式计算
      • 4. 聚合函数
      • DISTINCT 去重
      • 基本语法
      • 字段选择
      • 1. 直接字段引用
      • 2. 表达式计算
      • 3. 函数调用
      • 别名 (AS)
      • DISTINCT
    • FROM 子句
      • 基本语法
      • 数据源类型
      • 数据源指定
    • WHERE 子句
      • 基本语法
      • 支持的条件类型
      • 1. 比较操作符
      • 2. 逻辑操作符
      • 3. 范围操作
      • 4. 模式匹配
      • 5. NULL 检查
      • 6. 嵌套字段条件
      • 过滤条件
      • 1. 比较操作
      • 2. 逻辑操作
      • 3. 复杂条件
      • 支持的操作符
    • GROUP BY 子句
      • 基本语法
      • 分组类型
      • 1. 字段分组
      • 2. 窗口分组
      • 滚动窗口 (Tumbling Window)
      • 滑动窗口 (Sliding Window)
      • 计数窗口 (Counting Window)
      • 会话窗口 (Session Window)
      • 窗口函数
      • 分组字段
      • 窗口函数
      • 1. 滚动窗口 (TumblingWindow)
      • 2. 滑动窗口 (SlidingWindow)
      • 3. 计数窗口 (CountingWindow)
      • 4. 会话窗口 (SessionWindow)
    • HAVING 子句
      • 基本语法
      • 使用示例
      • 1. 聚合函数过滤
      • 2. 复合条件
      • WHERE vs HAVING
    • ORDER BY 子句
    • LIMIT 子句
      • 基本语法
      • 使用示例
      • 注意事项
    • WITH 子句
      • 基本语法
      • 核心作用
      • 支持的选项
      • 1. TIMESTAMP - 时间戳字段配置
      • 2. TIMEUNIT - 时间单位配置
      • 3. MAXOUTOFORDERNESS - 最大乱序时间配置
      • 4. ALLOWEDLATENESS - 允许延迟时间配置
      • 5. IDLETIMEOUT - 空闲超时配置
      • 完整示例
      • 示例1:订单流统计(事件时间)
      • 示例2:订单流统计(处理时间)
      • 配置项
      • 配置项说明
      • MaxOutOfOrderness vs AllowedLateness
    • 数据类型和常量
      • 数值常量
      • 字符串常量
      • 布尔常量
      • NULL值
    • 表达式和操作符
      • 算术操作符
      • 字符串操作
      • 条件表达式
      • CASE表达式
    • 内置函数
      • 聚合函数
      • 数学函数
      • 字符串函数
      • 时间函数
      • 类型转换函数
    • 完整示例
      • 1. 基础查询
      • 2. 聚合分析
      • 3. 复杂表达式
      • 4. 多窗口分析
    • 语法限制
      • 不支持的特性
      • 限制说明
    • 性能优化建议
      • 1. WHERE子句优化
      • 2. 合理使用窗口
      • 3. 表达式优化
  • API参考
  • RuleGo集成
  • 加入社区讨论
  • 函数

  • 案例集锦

目录

SQL参考

# SQL参考

本章提供StreamSQL支持的完整SQL语法参考,包括所有支持的子句、函数和操作符。

# 📋 SQL语法概览

StreamSQL支持标准SQL语法的子集,专门针对流处理进行了优化。

SELECT [DISTINCT] select_list
FROM stream
[WHERE condition]
[GROUP BY grouping_element [, ...]]
[HAVING condition]
[LIMIT count]
[WITH (option = value [, ...])]
1
2
3
4
5
6
7

StreamSQL支持标准SQL语法的子集,专门针对流处理场景进行了优化。本章提供完整的SQL语法参考。

# SQL语法概览

# 基本查询结构

SELECT [DISTINCT] select_list
FROM stream_name
[WHERE condition]
[GROUP BY grouping_list]
[HAVING condition]
[ORDER BY ordering_list]
[LIMIT number]
[WITH (option_list)]
1
2
3
4
5
6
7
8

# 支持的子句

子句 必需 说明
SELECT 是 指定输出字段
FROM 是 指定数据源
WHERE 否 过滤条件
GROUP BY 否 分组和窗口
HAVING 否 聚合结果过滤
ORDER BY 否 排序(有限支持)
LIMIT 否 限制结果数量
WITH 否 配置选项

# SELECT 子句

SELECT子句定义查询的输出字段和计算表达式。

# 基本语法

SELECT column1, column2, expression AS alias
FROM stream
1
2

# 支持的选择类型

# 1. 字段选择

-- 选择所有字段
SELECT * FROM stream

-- 选择特定字段
SELECT deviceId, temperature FROM stream

-- 字段别名
SELECT deviceId AS device, temperature AS temp FROM stream
1
2
3
4
5
6
7
8

# 2. 嵌套字段访问

-- 点号语法访问嵌套字段
SELECT device.info.name, device.location.building FROM stream

-- 深层嵌套
SELECT sensor.data.temperature.value FROM stream
1
2
3
4
5

# 3. 表达式计算

-- 算术表达式
SELECT temperature * 1.8 + 32 AS fahrenheit FROM stream

-- 字符串连接
SELECT CONCAT(deviceId, '-', location) AS full_id FROM stream

-- 条件表达式
SELECT CASE 
    WHEN temperature > 30 THEN 'hot'
    WHEN temperature < 10 THEN 'cold'
    ELSE 'normal'
END AS temp_level FROM stream
1
2
3
4
5
6
7
8
9
10
11
12

# 4. 聚合函数

-- 基础聚合
SELECT COUNT(*), AVG(temperature), MAX(humidity) FROM stream

-- 带分组的聚合
SELECT deviceId, AVG(temperature) FROM stream GROUP BY deviceId
1
2
3
4
5

# DISTINCT 去重

-- 去重查询
SELECT DISTINCT deviceType FROM stream

-- 多字段去重
SELECT DISTINCT deviceId, location FROM stream
1
2
3
4
5

# 基本语法

SELECT column1, column2, ...
SELECT expression AS alias
SELECT *
SELECT DISTINCT column1
1
2
3
4

# 字段选择

# 1. 直接字段引用

-- 选择特定字段
SELECT deviceId, temperature, humidity FROM stream

-- 选择所有字段 
SELECT * FROM stream
1
2
3
4
5

# 2. 表达式计算

-- 算术表达式
SELECT deviceId, temperature * 1.8 + 32 as fahrenheit FROM stream

-- 字符串连接
SELECT CONCAT(deviceId, '_', status) as device_status FROM stream

-- 条件表达式
SELECT deviceId,
       CASE 
           WHEN temperature > 30 THEN 'HIGH'
           WHEN temperature > 20 THEN 'NORMAL' 
           ELSE 'LOW'
       END as temp_level
FROM stream
1
2
3
4
5
6
7
8
9
10
11
12
13
14

# 3. 函数调用

-- 内置函数
SELECT deviceId, UPPER(status), ABS(temperature) FROM stream

-- 聚合函数
SELECT deviceId, AVG(temperature), COUNT(*) FROM stream
GROUP BY deviceId, TumblingWindow('1m')

-- 自定义函数
SELECT deviceId, custom_function(temperature) FROM stream
1
2
3
4
5
6
7
8
9

# 别名 (AS)

-- 字段别名
SELECT temperature AS temp, humidity AS hum FROM stream

-- 表达式别名 
SELECT temperature * 1.8 + 32 AS fahrenheit FROM stream

-- AS关键字可省略
SELECT temperature temp, humidity hum FROM stream
1
2
3
4
5
6
7
8

# DISTINCT

-- 去重(在窗口聚合中使用)
SELECT DISTINCT deviceId, location 
FROM stream 
GROUP BY TumblingWindow('1m')
1
2
3
4

# FROM 子句

FROM子句指定数据源,在StreamSQL中通常为stream。

# 基本语法

SELECT * FROM stream
1

# 数据源类型

  • stream: 默认的流数据源
  • 未来版本可能支持多个命名流
-- 当前支持
SELECT * FROM stream

-- 未来可能支持
SELECT * FROM sensor_stream
SELECT * FROM device_stream
1
2
3
4
5
6

# 数据源指定

-- 标准数据源名称
FROM stream

-- 自定义数据源名称
FROM sensor_data
FROM device_stream
1
2
3
4
5
6

提示

FROM子句中的名称是逻辑概念,实际数据通过 AddData() 方法输入。

# WHERE 子句

WHERE子句用于过滤数据,只处理满足条件的记录。

# 基本语法

SELECT * FROM stream WHERE condition
1

# 支持的条件类型

# 1. 比较操作符

-- 数值比较
SELECT * FROM stream WHERE temperature > 25
SELECT * FROM stream WHERE humidity <= 60
SELECT * FROM stream WHERE pressure = 1013.25
SELECT * FROM stream WHERE voltage != 0

-- 字符串比较
SELECT * FROM stream WHERE deviceId = 'sensor001'
SELECT * FROM stream WHERE location != 'offline'
1
2
3
4
5
6
7
8
9

# 2. 逻辑操作符

-- AND 操作
SELECT * FROM stream WHERE temperature > 20 AND humidity < 80

-- OR 操作
SELECT * FROM stream WHERE deviceType = 'temperature' OR deviceType = 'humidity'

-- NOT 操作
SELECT * FROM stream WHERE NOT (temperature < 0)

-- 复合条件
SELECT * FROM stream WHERE (temperature > 30 OR humidity > 90) AND deviceId LIKE 'sensor%'
1
2
3
4
5
6
7
8
9
10
11

# 3. 范围操作

-- BETWEEN 范围
SELECT * FROM stream WHERE temperature BETWEEN 20 AND 30

-- IN 列表
SELECT * FROM stream WHERE deviceType IN ('temperature', 'humidity', 'pressure')

-- NOT IN
SELECT * FROM stream WHERE deviceId NOT IN ('test001', 'test002')
1
2
3
4
5
6
7
8

# 4. 模式匹配

-- LIKE 模式匹配
SELECT * FROM stream WHERE deviceId LIKE 'sensor%'  -- 以sensor开头
SELECT * FROM stream WHERE location LIKE '%room%'   -- 包含room
SELECT * FROM stream WHERE deviceId LIKE 'dev___'   -- dev后跟3个字符

-- 字符串函数
SELECT * FROM stream WHERE STARTSWITH(deviceId, 'sensor')
SELECT * FROM stream WHERE ENDSWITH(location, 'floor')
SELECT * FROM stream WHERE CONTAINS(description, 'temperature')
1
2
3
4
5
6
7
8
9

# 5. NULL 检查

-- NULL 检查
SELECT * FROM stream WHERE temperature IS NOT NULL
SELECT * FROM stream WHERE error_msg IS NULL

-- 空字符串检查
SELECT * FROM stream WHERE deviceId != ''
SELECT * FROM stream WHERE TRIM(location) != ''
1
2
3
4
5
6
7

# 6. 嵌套字段条件

-- 嵌套字段过滤
SELECT * FROM stream WHERE device.info.status = 'active'
SELECT * FROM stream WHERE sensor.data.temperature > 25
SELECT * FROM stream WHERE config.settings.enabled = true
1
2
3
4

# 过滤条件

# 1. 比较操作

-- 数值比较
WHERE temperature > 25
WHERE humidity BETWEEN 30 AND 70
WHERE pressure != 1013.25

-- 字符串比较 
WHERE deviceId = 'sensor001'
WHERE status IN ('active', 'online')
WHERE location LIKE 'building_%'
1
2
3
4
5
6
7
8
9

# 2. 逻辑操作

-- 逻辑组合
WHERE temperature > 25 AND humidity < 60
WHERE status = 'active' OR status = 'standby'
WHERE NOT (temperature < 0)

-- 空值检查
WHERE temperature IS NOT NULL
WHERE deviceId IS NULL
1
2
3
4
5
6
7
8

# 3. 复杂条件

-- 嵌套条件
WHERE (temperature > 30 AND humidity > 80) 
   OR (temperature < 0 AND pressure > 1020)

-- 函数条件
WHERE ABS(temperature - 25) > 5
WHERE LENGTH(deviceId) > 10
1
2
3
4
5
6
7

# 支持的操作符

操作符 说明 示例
= 等于 temperature = 25
!=, <> 不等于 status != 'offline'
>, >= 大于、大于等于 humidity > 50
<, <= 小于、小于等于 pressure <= 1000
BETWEEN 范围 temperature BETWEEN 20 AND 30
IN 包含 status IN ('on', 'off')
LIKE 模式匹配 name LIKE 'sensor%'
IS NULL 空值检查 value IS NULL
AND 逻辑与 temp > 20 AND humid < 60
OR 逻辑或 status = 'on' OR status = 'ready'
NOT 逻辑非 NOT (temperature < 0)

# GROUP BY 子句

GROUP BY子句用于数据分组和窗口定义,是流处理中的核心功能。

# 基本语法

SELECT aggregate_function(column)
FROM stream
GROUP BY grouping_columns [, window_function]
1
2
3

# 分组类型

# 1. 字段分组

-- 单字段分�?
SELECT deviceId, COUNT(*) FROM stream GROUP BY deviceId

-- 多字段分�?
SELECT deviceId, location, AVG(temperature) 
FROM stream 
GROUP BY deviceId, location

-- 表达式分�?
SELECT HOUR(timestamp) as hour, COUNT(*) 
FROM stream 
GROUP BY HOUR(timestamp)
1
2
3
4
5
6
7
8
9
10
11
12

# 2. 窗口分组

# 滚动窗口 (Tumbling Window)
-- 时间滚动窗口
SELECT COUNT(*) FROM stream GROUP BY TumblingWindow('5s')
SELECT AVG(temperature) FROM stream GROUP BY TumblingWindow('1m')
SELECT SUM(value) FROM stream GROUP BY TumblingWindow('1h')

-- 带字段分组的滚动窗口
SELECT deviceId, AVG(temperature) 
FROM stream 
GROUP BY deviceId, TumblingWindow('5s')
1
2
3
4
5
6
7
8
9
# 滑动窗口 (Sliding Window)
-- 滑动窗口:窗口大小,滑动间隔
SELECT AVG(temperature) FROM stream GROUP BY SlidingWindow('30s', '10s')
SELECT MAX(pressure) FROM stream GROUP BY SlidingWindow('1m', '30s')

-- 带分组的滑动窗口
SELECT deviceId, AVG(temperature) 
FROM stream 
GROUP BY deviceId, SlidingWindow('30s', '10s')
1
2
3
4
5
6
7
8
# 计数窗口 (Counting Window)
-- 每N条记录触发一�?
SELECT COUNT(*) FROM stream GROUP BY CountingWindow(100)
SELECT AVG(value) FROM stream GROUP BY CountingWindow(50)

-- 带分组的计数窗口
SELECT deviceId, COUNT(*) 
FROM stream 
GROUP BY deviceId, CountingWindow(10)
1
2
3
4
5
6
7
8
# 会话窗口 (Session Window)
-- 会话超时窗口
SELECT userId, COUNT(*) FROM stream GROUP BY userId, SessionWindow('5m')
SELECT deviceId, SUM(events) FROM stream GROUP BY deviceId, SessionWindow('30s')
1
2
3

# 窗口函数

在GROUP BY中可以使用窗口相关的函数�?

SELECT deviceId,
       COUNT(*) as event_count,
       window_start() as start_time,
       window_end() as end_time,
       window_duration() as duration
FROM stream 
GROUP BY deviceId, TumblingWindow('5s')
1
2
3
4
5
6
7

# 分组字段

-- 单字段分�?
GROUP BY deviceId

-- 多字段分�? 
GROUP BY deviceId, location, status

-- 表达式分�?
GROUP BY FLOOR(temperature / 10) * 10  -- 按温度区间分�?
1
2
3
4
5
6
7
8

# 窗口函数

# 1. 滚动窗口 (TumblingWindow)

-- 基本语法
GROUP BY deviceId, TumblingWindow('5m')

-- 支持的时间单位
GROUP BY TumblingWindow('30s')   -- 30秒
GROUP BY TumblingWindow('5m')    -- 5分钟  
GROUP BY TumblingWindow('1h')    -- 1小时
GROUP BY TumblingWindow('1d')    -- 1天
1
2
3
4
5
6
7
8

# 2. 滑动窗口 (SlidingWindow)

-- 基本语法: SlidingWindow(窗口大小, 滑动间隔)
GROUP BY deviceId, SlidingWindow('10m', '2m')

-- 示例
GROUP BY SlidingWindow('1h', '15m')    -- 1小时窗口,每15分钟滑动
GROUP BY SlidingWindow('30s', '5s')    -- 30秒窗口,每5秒滑动
1
2
3
4
5
6

# 3. 计数窗口 (CountingWindow)

-- 基本语法
GROUP BY deviceId, CountingWindow(100)   -- 每100条数据

-- 示例  
GROUP BY CountingWindow(50)     -- 每50条数据触发
GROUP BY CountingWindow(1000)   -- 每1000条数据触发
1
2
3
4
5
6

# 4. 会话窗口 (SessionWindow)

-- 基本语法
GROUP BY user_id, SessionWindow('5m')    -- 5分钟超时

-- 示例
GROUP BY device_id, SessionWindow('30s') -- 30秒无数据则关闭会话
GROUP BY session_key, SessionWindow('10m') -- 10分钟会话超时
1
2
3
4
5
6

# HAVING 子句

HAVING子句用于过滤聚合结果,类似于WHERE但作用于GROUP BY之后。

# 基本语法

SELECT aggregate_function(column)
FROM stream
GROUP BY grouping_columns
HAVING aggregate_condition
1
2
3
4

# 使用示例

# 1. 聚合函数过滤

-- 过滤平均温度
SELECT deviceId, AVG(temperature) as avg_temp
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
HAVING AVG(temperature) > 25

-- 过滤计数
SELECT location, COUNT(*) as event_count
FROM stream
GROUP BY location, TumblingWindow('1m')
HAVING COUNT(*) >= 10

-- 多个聚合条件
SELECT deviceId, AVG(temperature), MAX(humidity)
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
HAVING AVG(temperature) > 20 AND MAX(humidity) < 80
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# 2. 复合条件

-- 复杂的HAVING条件
SELECT deviceType, COUNT(*) as count, AVG(value) as avg_value
FROM stream
GROUP BY deviceType, TumblingWindow('1m')
HAVING COUNT(*) > 5 AND (AVG(value) > 100 OR MAX(value) > 500)
1
2
3
4
5

# WHERE vs HAVING

-- 正确:WHERE过滤原始数据,HAVING过滤聚合结果
SELECT deviceId, AVG(temperature)
FROM stream
WHERE temperature > 0          -- 过滤原始数据
GROUP BY deviceId, TumblingWindow('5s')
HAVING AVG(temperature) > 25    -- 过滤聚合结果

-- 错误:在HAVING中过滤原始字段
SELECT deviceId, AVG(temperature)
FROM stream
GROUP BY deviceId, TumblingWindow('5s')
HAVING deviceId = 'sensor001'   -- 应该在WHERE中
1
2
3
4
5
6
7
8
9
10
11
12

用于过滤聚合结果。

-- 基本用法
SELECT deviceId, AVG(temperature) as avg_temp
FROM stream  
GROUP BY deviceId, TumblingWindow('5m')
HAVING avg_temp > 25

-- 复杂条件
SELECT deviceId, COUNT(*) as count, AVG(temperature) as avg_temp
FROM stream
GROUP BY deviceId, TumblingWindow('5m')  
HAVING count > 10 AND avg_temp BETWEEN 20 AND 30

-- 使用聚合函数
SELECT location, AVG(temperature) as avg_temp
FROM stream
GROUP BY location, TumblingWindow('10m')
HAVING AVG(temperature) > 25 AND COUNT(*) >= 5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

# ORDER BY 子句

注意

ORDER BY在流处理中支持有限,主要用于窗口结果排序。

-- 按聚合结果排序(在窗口内)
SELECT deviceId, AVG(temperature) as avg_temp
FROM stream
GROUP BY deviceId, TumblingWindow('5m')
ORDER BY avg_temp DESC

-- 多字段排序
ORDER BY avg_temp DESC, deviceId ASC
1
2
3
4
5
6
7
8

# LIMIT 子句

LIMIT子句限制查询结果的数量,在流处理中通常用于限制窗口输出的记录数。

# 基本语法

SELECT columns FROM stream [WHERE condition] LIMIT count
1

# 使用示例

-- 限制结果数量
SELECT * FROM stream LIMIT 100

-- 与窗口结合使用
SELECT deviceId, temperature
FROM stream
GROUP BY TumblingWindow('5s')
LIMIT 10

-- 获取最新的N条记录
SELECT deviceId, temperature, timestamp
FROM stream
WHERE deviceId = 'sensor001'
ORDER BY timestamp DESC
LIMIT 5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 注意事项

  • LIMIT在流处理中主要用于控制输出量
  • 对于聚合查询,LIMIT限制的是聚合结果的数量
  • 建议合理设置LIMIT以避免内存压力

限制输出结果数量。

-- 基本用法
SELECT deviceId, temperature FROM stream LIMIT 10

-- 与窗口结合
SELECT deviceId, AVG(temperature) as avg_temp
FROM stream  
GROUP BY deviceId, TumblingWindow('5m')
LIMIT 5   -- 每个窗口最多5个结果
1
2
3
4
5
6
7
8

# WITH 子句

WITH子句用于指定查询的配置选项,主要用于配置事件时间窗口的时间戳字段和时间单位。

# 基本语法

SELECT columns FROM stream
[WHERE condition]
[GROUP BY grouping]
WITH (option = value [, ...])
1
2
3
4

# 核心作用

WITH子句的主要作用是将窗口从处理时间模式切换到事件时间模式:

  • 不指定 WITH (TIMESTAMP=...):使用处理时间(默认)

    • 窗口基于数据到达系统的时间划分
    • 不管数据中的时间字段是什么值
  • 指定 WITH (TIMESTAMP='field_name'):使用事件时间

    • 窗口基于数据中指定字段的时间值划分
    • 即使数据延迟到达,也能正确统计到对应窗口

# 支持的选项

# 1. TIMESTAMP - 时间戳字段配置

作用:指定数据中用于事件时间的字段名,启用事件时间窗口模式。

语法:

WITH (TIMESTAMP = 'field_name')
1

示例:

-- 使用 order_time 字段作为事件时间
SELECT COUNT(*) as order_count
FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP = 'order_time')

-- 使用 event_time 字段作为事件时间
SELECT deviceId, AVG(temperature)
FROM stream
GROUP BY deviceId, TumblingWindow('1m')
WITH (TIMESTAMP = 'event_time')

-- 使用 timestamp 字段作为事件时间
SELECT COUNT(*) FROM stream
GROUP BY SlidingWindow('30s', '10s')
WITH (TIMESTAMP = 'timestamp')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

重要说明:

  • 指定 TIMESTAMP 后,窗口将基于该字段的值来划分窗口
  • 如果数据中没有该字段,或字段值为空,将使用系统当前时间作为回退
  • 字段值可以是 time.Time 类型或整数类型(需配合 TIMEUNIT)

# 2. TIMEUNIT - 时间单位配置

作用:当时间戳字段是整数类型(如 Unix 时间戳)时,指定时间单位以正确解析时间戳。

语法:

WITH (TIMEUNIT = 'unit')
1

支持的时间单位:

单位值 说明 示例
'ns' 纳秒 纳秒级时间戳
'ms' 毫秒(默认) Unix 时间戳(毫秒)
'ss' 秒 Unix 时间戳(秒)
'mi' 分钟 分钟级时间戳
'hh' 小时 小时级时间戳
'dd' 天 天级时间戳

示例:

-- 时间戳字段是毫秒级整数(如 1704067200000)
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP = 'event_time', TIMEUNIT = 'ms')

-- 时间戳字段是秒级整数(如 1704067200)
SELECT AVG(temperature) FROM stream
GROUP BY TumblingWindow('1m')
WITH (TIMESTAMP = 'timestamp', TIMEUNIT = 'ss')
1
2
3
4
5
6
7
8
9

重要说明:

  • 如果时间戳字段是 time.Time 类型,不需要指定 TIMEUNIT
  • 如果时间戳字段是整数类型(int64),必须指定 TIMEUNIT
  • 默认单位为 'ms'(毫秒)

# 3. MAXOUTOFORDERNESS - 最大乱序时间配置

作用:设置允许的最大乱序时间,用于计算 Watermark。Watermark = max(event_time) - MaxOutOfOrderness。

语法:

WITH (MAXOUTOFORDERNESS = 'duration')
1

支持的时长格式:

  • '5s' - 5秒
  • '2m' - 2分钟
  • '1h' - 1小时
  • '500ms' - 500毫秒
  • 支持 Go 的 time.ParseDuration 格式

示例:

-- 允许5秒的乱序数据
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP = 'event_time', MAXOUTOFORDERNESS = '5s')

-- 允许2分钟的乱序数据
SELECT AVG(temperature) FROM stream
GROUP BY SlidingWindow('30s', '10s')
WITH (TIMESTAMP = 'timestamp', MAXOUTOFORDERNESS = '2m')
1
2
3
4
5
6
7
8
9

重要说明:

  • 仅用于事件时间窗口(需要指定 TIMESTAMP)
  • 影响 Watermark 的计算,从而影响窗口触发时机
  • 默认值为 0(不允许乱序)
  • 较大的值会延迟窗口触发,但能容忍更多乱序数据

工作原理:

  • Watermark = max(event_time) - MaxOutOfOrderness
  • 窗口在 watermark >= window_end 时触发
  • 例如:MaxOutOfOrderness = 5s,当最大事件时间是 10:10 时,Watermark = 10:05
  • 窗口 [10:00 - 10:05) 会在 Watermark >= 10:05 时触发(实际可能是 10:10)

# 4. ALLOWEDLATENESS - 允许延迟时间配置

作用:设置窗口触发后还能接受多长时间的延迟数据。窗口触发后保持开放,直到 watermark >= window_end + AllowedLateness。

语法:

WITH (ALLOWEDLATENESS = 'duration')
1

支持的时长格式:

  • '2s' - 2秒
  • '1m' - 1分钟
  • '30s' - 30秒
  • 支持 Go 的 time.ParseDuration 格式

示例:

-- 窗口触发后还能接受2秒的延迟数据
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP = 'event_time', ALLOWEDLATENESS = '2s')

-- 窗口触发后还能接受1分钟的延迟数据
SELECT AVG(temperature) FROM stream
GROUP BY TumblingWindow('1m')
WITH (TIMESTAMP = 'timestamp', ALLOWEDLATENESS = '1m')
1
2
3
4
5
6
7
8
9

重要说明:

  • 仅用于事件时间窗口(需要指定 TIMESTAMP)
  • 窗口触发后,会保持开放直到 watermark >= window_end + AllowedLateness
  • 在这段时间内到达的延迟数据会触发窗口的延迟更新(窗口再次触发)
  • 默认值为 0(窗口触发后立即关闭,不接受延迟数据)
  • 主要用于需要更新已触发窗口结果的场景

工作原理:

  • 窗口 [10:00 - 10:05) 在 watermark >= 10:05 时触发
  • 如果 AllowedLateness = 2s,窗口保持开放直到 watermark >= 10:07
  • 延迟数据(事件时间在 [10:00 - 10:05) 内)在 watermark < 10:07 时到达,会触发窗口的延迟更新
  • 窗口会再次触发,输出包含延迟数据的更新结果
  • 超过 AllowedLateness 后,窗口关闭,延迟数据被忽略

# 5. IDLETIMEOUT - 空闲超时配置

作用:设置数据源空闲超时时间。当数据源空闲(无新数据到达)超过该时间时,Watermark 会基于处理时间推进,确保窗口能够关闭。

语法:

WITH (IDLETIMEOUT = 'duration')
1

支持的时长格式:

  • '5s' - 5秒
  • '2m' - 2分钟
  • '1h' - 1小时
  • '500ms' - 500毫秒
  • 支持 Go 的 time.ParseDuration 格式

示例:

-- 配置空闲超时:5秒无数据,基于处理时间推进watermark
SELECT COUNT(*) FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP = 'event_time', IDLETIMEOUT = '5s')

-- 配置空闲超时:2分钟无数据,基于处理时间推进watermark
SELECT AVG(temperature) FROM stream
GROUP BY TumblingWindow('1m')
WITH (TIMESTAMP = 'timestamp', IDLETIMEOUT = '2m')
1
2
3
4
5
6
7
8
9

重要说明:

  • 仅用于事件时间窗口(需要指定 TIMESTAMP)
  • 默认值为 0(禁用,不启用 Idle Source 机制)
  • 当数据源空闲超过 IdleTimeout 时,Watermark 会基于处理时间推进
  • 确保窗口能够最终关闭,防止内存泄漏
  • 主要用于数据源可能停止发送数据的场景

工作原理:

  • 正常情况:Watermark 基于事件时间更新(Watermark = max(event_time) - MaxOutOfOrderness)
  • 数据源空闲时:如果 timeSinceLastEvent > IdleTimeout,Watermark 基于处理时间推进(Watermark = currentProcessingTime - MaxOutOfOrderness)
  • 窗口关闭:Watermark 推进后,满足条件的窗口会触发并关闭

示例场景:

时间轴:
10:00 → 数据A (eventTime=10:00)
10:01 → 数据B (eventTime=10:01)
10:02 → 数据C (eventTime=10:02)
10:03 → 没有新数据...
10:04 → 没有新数据...
10:05 → 没有新数据...

窗口1: [10:00, 10:02)

当前状态(IdleTimeout未启用):
- maxEventTime = 10:02(最后一条数据)
- watermark = 10:02 - 1秒 = 10:01
- 窗口1无法触发(watermark < 10:02)
- 窗口永远不会关闭 ❌

当前状态(IdleTimeout=2s启用):
- maxEventTime = 10:02
- 10:05时,timeSinceLastEvent = 3秒 > 2秒(IdleTimeout)
- watermark = 当前处理时间(10:05) - 1秒 = 10:04
- 窗口1可以触发(watermark >= 10:02)✅
- 窗口能够关闭 ✅
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

# 完整示例

# 示例1:订单流统计(事件时间)

-- 订单流:使用 order_time 作为事件时间
-- 即使订单在 11:00 才到达,只要 order_time 是 10:03,就会统计到 10:00~10:05 的窗口
SELECT 
    COUNT(*) as order_count,
    SUM(amount) as total_amount,
    AVG(amount) as avg_amount
FROM stream
GROUP BY TumblingWindow('5m')
WITH (TIMESTAMP = 'order_time')
1
2
3
4
5
6
7
8
9

# 示例2:订单流统计(处理时间)

-- 订单流:使用处理时间
-- 在 11:00~11:05 到达的所有订单,不管 order_time 是多少,都统计到 11:00~11:05 的窗口
SELECT 
    COUNT(*) as order_count,
    SUM(amount) as total_amount
FROM stream
GROUP BY TumblingWindow('5m')
-- 不指定 WITH (TIMESTAMP=...),默认使用处理时间
1
2
3
4
5
6
7
8

# 配置项

配置项 类型 说明 默认值 必需
TIMESTAMP string 事件时间字段名 无(使用处理时间) 否
TIMEUNIT string 时间单位(仅整数时间戳需要) 'ms' 否
MAXOUTOFORDERNESS string 最大乱序时间(仅事件时间窗口) '0s' 否
ALLOWEDLATENESS string 允许延迟时间(仅事件时间窗口) '0s' 否
IDLETIMEOUT string 空闲超时时间(仅事件时间窗口) '0s'(禁用) 否

# 配置项说明

# MaxOutOfOrderness vs AllowedLateness

这两个配置都用于处理延迟数据,但作用阶段不同:

特性 MaxOutOfOrderness AllowedLateness
作用阶段 窗口触发前 窗口触发后
影响对象 Watermark 计算 窗口关闭时间
目的 容忍乱序,延迟窗口触发 接受延迟数据,更新已触发窗口
计算公式 Watermark = max(event_time) - MaxOutOfOrderness 窗口关闭时间 = window_end + AllowedLateness
使用场景 数据可能乱序到达 窗口已触发,但仍有延迟数据可能到达

组合使用示例:

-- 同时配置最大乱序时间、允许延迟时间和空闲超时
SELECT COUNT(*) as order_count
FROM stream
GROUP BY TumblingWindow('5m')
WITH (
    TIMESTAMP = 'order_time',
    MAXOUTOFORDERNESS = '5s',  -- 容忍5秒的乱序
    ALLOWEDLATENESS = '2s',    -- 窗口触发后还能接受2秒的延迟数据
    IDLETIMEOUT = '5s'         -- 5秒无数据,基于处理时间推进watermark
)
1
2
3
4
5
6
7
8
9
10

工作流程:

  1. MaxOutOfOrderness = 5s:Watermark 延迟5秒推进,窗口在 watermark >= window_end 时触发
  2. AllowedLateness = 2s:窗口触发后,保持开放2秒,接受延迟数据并更新结果
  3. 超过 AllowedLateness 后,窗口关闭,延迟数据被忽略
  4. IdleTimeout = 5s:如果数据源空闲超过5秒,Watermark 基于处理时间推进,确保窗口能够关闭

三个配置的对比:

特性 MaxOutOfOrderness AllowedLateness IdleTimeout
作用阶段 窗口触发前 窗口触发后 数据源空闲时
影响对象 Watermark 计算 窗口关闭时间 Watermark 推进方式
目的 容忍乱序,延迟窗口触发 接受延迟数据,更新已触发窗口 确保窗口能够关闭,防止内存泄漏
计算公式 Watermark = max(event_time) - MaxOutOfOrderness 窗口关闭时间 = window_end + AllowedLateness Watermark = currentProcessingTime - MaxOutOfOrderness(当空闲时)
使用场景 数据可能乱序到达 窗口已触发,但仍有延迟数据可能到达 数据源可能停止发送数据

# 数据类型和常量

# 数值常量

SELECT 42, 3.14, -10.5, 1e6 FROM stream
1

# 字符串常量

SELECT 'hello', "world", 'it''s ok' FROM stream
1

# 布尔常量

SELECT true, false, temperature > 25 FROM stream  
1

# NULL值

SELECT NULL, temperature IS NULL FROM stream
1

# 表达式和操作符

# 算术操作符

操作符 说明 示例
+ 加法 temperature + 10
- 减法 temperature - 5
* 乘法 temperature * 1.8
/ 除法 total / count
% 取模 id % 10

# 字符串操作

-- 字符串连接
SELECT deviceId + '_' + status FROM stream
SELECT CONCAT(deviceId, '_', status) FROM stream

-- 字符串函数
SELECT UPPER(status), LOWER(location), LENGTH(deviceId) FROM stream
1
2
3
4
5
6

# 条件表达式

# CASE表达式

-- 简单CASE
SELECT deviceId,
       CASE status
           WHEN 'active' THEN 1
           WHEN 'inactive' THEN 0  
           ELSE -1
       END as status_code
FROM stream

-- 搜索CASE
SELECT deviceId,
       CASE 
           WHEN temperature > 30 THEN 'HOT'
           WHEN temperature > 20 THEN 'WARM'
           WHEN temperature > 10 THEN 'COOL'
           ELSE 'COLD'
       END as temp_category  
FROM stream
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 内置函数

# 聚合函数

函数 说明 示例
COUNT(*) 计数 COUNT(*)
SUM(expr) 求和 SUM(temperature)
AVG(expr) 平均值 AVG(temperature)
MIN(expr) 最小值 MIN(temperature)
MAX(expr) 最大值 MAX(temperature)
STDDEV(expr) 标准差 STDDEV(temperature)
MEDIAN(expr) 中位数 MEDIAN(temperature)

# 数学函数

函数 说明 示例
ABS(x) 绝对值 ABS(temperature)
ROUND(x, d) 四舍五入 ROUND(temperature, 2)
FLOOR(x) 向下取整 FLOOR(temperature)
CEIL(x) 向上取整 CEIL(temperature)
SQRT(x) 平方根 SQRT(area)
POWER(x, y) 幂运算 POWER(distance, 2)

# 字符串函数

函数 说明 示例
CONCAT(s1, s2, ...) 字符串连接 CONCAT(first, '_', last)
UPPER(s) 转大写 UPPER(status)
LOWER(s) 转小写 LOWER(deviceId)
LENGTH(s) 字符串长度 LENGTH(message)
SUBSTRING(s, start, len) 子字符串 SUBSTRING(deviceId, 1, 5)
TRIM(s) 去除空白 TRIM(name)

# 时间函数

函数 说明 示例
window_start() 窗口开始时间 window_start()
window_end() 窗口结束时间 window_end()
NOW() 当前时间 NOW()

# 类型转换函数

函数 说明 示例
CAST(expr AS type) 类型转换 CAST(temperature AS STRING)

# 完整示例

# 1. 基础查询

-- 简单过滤
SELECT deviceId, temperature, status 
FROM stream 
WHERE temperature > 25 AND status = 'active'
1
2
3
4

# 2. 聚合分析

-- 设备温度统计
SELECT deviceId,
       COUNT(*) as sample_count,
       AVG(temperature) as avg_temp,
       MIN(temperature) as min_temp,
       MAX(temperature) as max_temp,
       STDDEV(temperature) as temp_stddev
FROM stream
WHERE temperature IS NOT NULL
GROUP BY deviceId, TumblingWindow('5m')
HAVING sample_count >= 10
1
2
3
4
5
6
7
8
9
10
11

# 3. 复杂表达式

-- 温度异常检测
SELECT deviceId,
       temperature,
       ABS(temperature - AVG(temperature)) as deviation,
       CASE 
           WHEN ABS(temperature - AVG(temperature)) > 2 * STDDEV(temperature) 
           THEN 'ANOMALY'
           ELSE 'NORMAL'
       END as anomaly_status
FROM stream
GROUP BY deviceId, SlidingWindow('10m', '1m')
1
2
3
4
5
6
7
8
9
10
11

# 4. 多窗口分析

-- 多层级时间分析
SELECT deviceId,
       '1m' as window_type,
       AVG(temperature) as avg_temp,
       window_start() as start_time
FROM stream
GROUP BY deviceId, TumblingWindow('1m')

UNION ALL

SELECT deviceId,
       '5m' as window_type, 
       AVG(temperature) as avg_temp,
       window_start() as start_time
FROM stream  
GROUP BY deviceId, TumblingWindow('5m')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

注意

上述UNION ALL示例仅为语法展示,StreamSQL当前不支持UNION操作。

# 语法限制

# 不支持的特性

  • JOIN 操作(多表连接)
  • UNION 操作(结果合并)
  • 子查询 (嵌套SELECT)
  • INSERT/UPDATE/DELETE (数据修改)
  • CREATE TABLE (表定义)
  • 视图 (VIEW)
  • 存储过程 (PROCEDURE)
  • 触发器 (TRIGGER)

# 限制说明

  1. 单数据源:只支持单个数据流处理
  2. 无持久化:不支持数据持久化存储
  3. 无事务:不支持事务操作
  4. 内存限制:受限于单机内存容量

# 性能优化建议

# 1. WHERE子句优化

-- 好的实践:早期过滤
SELECT deviceId, AVG(temperature) 
FROM stream
WHERE temperature BETWEEN 0 AND 100  -- 先过滤异常数据
GROUP BY deviceId, TumblingWindow('5m')

-- 避免:复杂WHERE条件
WHERE UPPER(CONCAT(deviceId, status)) LIKE '%ACTIVE%'
1
2
3
4
5
6
7
8

# 2. 合理使用窗口

-- 好的实践:根据需求选择合适窗口
GROUP BY TumblingWindow('1m')    -- 需要精确周期统计
GROUP BY SlidingWindow('5m', '1m') -- 需要平滑分析

-- 避免:过小的窗口间隔
GROUP BY SlidingWindow('1h', '1s')   -- 计算开销巨大
1
2
3
4
5
6

# 3. 表达式优化

-- 好的实践:简单表达式
SELECT temperature * 1.8 + 32 as fahrenheit

-- 避免:复杂嵌套表达式  
SELECT POWER(SQRT(ABS(temperature - AVG(temperature))), 2)
1
2
3
4
5
在 GitHub 上编辑此页 (opens new window)
上次更新: 2025/11/14, 10:47:28
核心概念
API参考

← 核心概念 API参考→

Theme by Vdoing | Copyright © 2023-2025 RuleGo Team | Apache 2.0 License

  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式