Time Window in PostgreSQL
1. 引子:一个让人后悔的架构决定
几年前,团队为了做一个「近 7 天用户活跃趋势」的看板,搭了这样一套架构:
PostgreSQL/MongoDB/MySQL → Kafka → Flink → ClickHouse → Grafana
运维了三个月。Flink 任务隔三差五挂掉,ClickHouse 版本升级踩了无数坑,告警半夜叫人。最后发现——整个看板的核心逻辑,其实是这一条 SQL:
SELECT
date_trunc('day', created_at) AS day,
COUNT(DISTINCT user_id) AS dau
FROM user_events
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY 1
ORDER BY 1;
PostgreSQL 跑它,50ms。
这不是说 Flink 没用。它在真正的流处理场景——毫秒级响应、无界数据流、复杂 CEP——是不可替代的。但很多中小团队的「时序分析需求」,根本到不了那个量级。
2. Flink vs PostgreSQL 窗口映射表
如果你有 Spark 或 Flink 背景,这张映射表会让后续内容一眼看懂——PostgreSQL 的窗口函数几乎是这些框架在单机上的平替。
| Flink 概念 | PostgreSQL 实现 | 业务场景 | 支持情况 |
|---|---|---|---|
| Tumbling Window | date_trunc / time_bucket | 每 5 分钟统计订单量 | ✓ 原生支持 |
| Sliding Window | RANGE BETWEEN INTERVAL ... PRECEDING | 过去 30 分钟平均 CPU | ✓ PG 11+ |
| Session Window | LAG + SUM() OVER 手动模拟 | 用户浏览会话识别 | ✓ 需 3 步实现 |
| State Store | Materialized View | 仪表盘预聚合 | ✓ 定时刷新 |
| Watermark | ❌ 不支持 | 乱序事件处理 | ✗ 需应用层处理 |
PostgreSQL 不能替代真正的流处理系统——没有 watermark、没有 exactly-once stream、没有分布式流状态。但对于秒级 dashboard、用户行为分析、监控指标、IoT 数据,它已经足够强。
3. 统一案例背景与数据准备
全文所有 SQL 跑在同一套表上,读者可直接复现。
建表语句:
-- 订单表
CREATE TABLE orders (
order_id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
amount NUMERIC(12, 2) NOT NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 用户行为事件表
CREATE TABLE user_events (
event_id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL,
page TEXT,
action TEXT,
event_time TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
-- 服务器监控指标表(主案例)
CREATE TABLE server_metrics (
id BIGSERIAL PRIMARY KEY,
server_id TEXT NOT NULL,
cpu NUMERIC(5, 2),
mem NUMERIC(5, 2),
ts TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
模拟数据:
-- 10 万条订单
INSERT INTO orders (user_id, amount, created_at)
SELECT
(random() * 9999 + 1)::BIGINT,
(random() * 9900 + 100)::NUMERIC(12,2),
NOW() - (random() * INTERVAL '90 days')
FROM generate_series(1, 100000);
-- 50 万条用户事件
INSERT INTO user_events (user_id, page, action, event_time)
SELECT
(random() * 9999 + 1)::BIGINT,
(ARRAY['/home','/product','/cart','/checkout'])[ceil(random()*4)::INT],
(ARRAY['click','view','scroll','submit'])[ceil(random()*4)::INT],
NOW() - (random() * INTERVAL '30 days')
FROM generate_series(1, 500000);
-- 30 天服务器监控数据(每分钟一条,5 台服务器,含随机异常峰值)
INSERT INTO server_metrics (server_id, cpu, mem, ts)
SELECT
'server-' || (i % 5 + 1)::TEXT,
(40 + random() * 50 + CASE WHEN random() > 0.95 THEN 30 ELSE 0 END)::NUMERIC(5,2),
(50 + random() * 40)::NUMERIC(5,2),
NOW() - (43200 - i) * INTERVAL '1 minute'
FROM generate_series(1, 43200) AS i;
不要用裸 TIMESTAMP。如果服务器在北京、用户在旧金山,某天改了服务器时区,TIMESTAMP 列里的历史数据含义就变了。TIMESTAMPTZ 统一存 UTC,展示时按时区转换,不会有这个问题。跨国系统尤其如此。
4. 时序基础工具箱
date_trunc —— 时序分析的时间转换函数。
将任意精度的时间戳「对齐」到指定粒度。支持:
microseconds、
milliseconds、
second、
minute、
hour、
day、
week、
month、
quarter、
year。
SELECT
date_trunc('hour', created_at) AS hour,
COUNT(*) AS order_count,
SUM(amount) AS gmv
FROM orders
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY 1
ORDER BY 1;
generate_series —— 补零神器(折线图必备)
这是被大多数时序分析中忽略的技巧。如果某个小时没有订单,上面的查询会直接跳过那个时间点——前端渲染折线图就会出现断裂。正确做法:先生成完整时间轴,再 LEFT JOIN 实际数据。
这是一个补全时间点的样例:
WITH time_spine AS (
SELECT generate_series(
date_trunc('hour', NOW() - INTERVAL '7 days'),
date_trunc('hour', NOW()),
INTERVAL '1 hour'
) AS hour
),
hourly_stats AS (
SELECT
date_trunc('hour', created_at) AS hour,
COUNT(*) AS order_count,
SUM(amount) AS gmv
FROM orders
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY 1
)
SELECT
t.hour,
COALESCE(s.order_count, 0) AS order_count,
COALESCE(s.gmv, 0) AS gmv
FROM time_spine t
LEFT JOIN hourly_stats s USING (hour)
ORDER BY t.hour;
这条 SQL 保证每个小时都有一行,没数据的时间点补 0。
5. 三种时间窗口模式
滚动窗口(Tumbling Window)
固定时间桶,互不重叠,实现最简单。场景:大促期间每 15 分钟统计下单量,用于监控大屏。
SELECT
TO_TIMESTAMP(
FLOOR(EXTRACT(EPOCH FROM created_at) / 900) * 900
) AS window_start,
COUNT(*) AS order_count,
SUM(amount) AS gmv
FROM orders
WHERE created_at >= NOW() - INTERVAL '3 hours'
GROUP BY 1
ORDER BY 1;
如果用了 TimescaleDB,更简洁:
SELECT time_bucket('15 minutes', created_at), COUNT(*), SUM(amount)
FROM orders GROUP BY 1 ORDER BY 1;
所以该用工具用对工具,没必要为了什么架构简洁给自己找罪受。 理解和可解读是第一性原理。
滑动窗口(Sliding Window)
窗口随时间滑动,每个时间点都有一个「往前看 N 天 / N 小时」的聚合值。这是量化交易均线、监控移动平均的基础。
在这里必须搞清楚 ROWS vs RANGE 的区别——这是 99% 的教程一笔带过的地方:
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
往前数 6 行,不管时间间隔多久。数据采集不均匀时会严重失真。
RANGE BETWEEN INTERVAL '7 days' PRECEDING AND CURRENT ROW
过去 7 天内的所有行,PG 11+ 支持 interval。稀疏事件必须用这个。
判断原则:
- 数据是规则采样(每分钟一条监控)→ 用 ROWS;
- 数据是稀疏事件(订单、点击)→ 必须用 RANGE。
以一个 7 日 GMV 移动平均为例:
WITH daily_gmv AS (
SELECT
date_trunc('day', created_at) AS day,
SUM(amount) AS gmv
FROM orders
WHERE created_at >= NOW() - INTERVAL '60 days'
GROUP BY 1
)
SELECT
day,
gmv,
AVG(gmv) OVER (
ORDER BY day
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS gmv_ma7,
AVG(gmv) OVER (
ORDER BY day
ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
) AS gmv_ma30
FROM daily_gmv
ORDER BY day;
以一个过去 30 分钟平均 CPU(稀疏采样)为例:
SELECT
ts,
server_id,
cpu,
AVG(cpu) OVER (
PARTITION BY server_id
ORDER BY ts
RANGE BETWEEN INTERVAL '30 minutes' PRECEDING AND CURRENT ROW
) AS cpu_ma_30m
FROM server_metrics
WHERE ts >= NOW() - INTERVAL '3 hours'
ORDER BY server_id, ts;
会话窗口(Session Window)
PostgreSQL 原生不支持 Session Window,但可以用纯 SQL 完整模拟。这是 Flink 里内置的能力,在 PG 里用三步走实现。
场景:识别用户浏览会话——超过 30 分钟没有操作,视为新会话开始。
用 LAG() 计算每个事件与上一事件的时间间隔:
SELECT
user_id,
event_time,
LAG(event_time) OVER (
PARTITION BY user_id
ORDER BY event_time
) AS prev_event_time,
event_time - LAG(event_time) OVER (
PARTITION BY user_id ORDER BY event_time
) AS gap
FROM user_events
ORDER BY user_id, event_time;
标记会话边界:间隔 > 30 分钟或首个事件,标记 is_new_session = 1
WITH gaps AS (
SELECT
user_id, event_time, page, action,
CASE
WHEN event_time - LAG(event_time) OVER (
PARTITION BY user_id ORDER BY event_time
) > INTERVAL '30 minutes'
OR LAG(event_time) OVER (
PARTITION BY user_id ORDER BY event_time
) IS NULL
THEN 1 ELSE 0
END AS is_new_session
FROM user_events
)
SUM() OVER 累积生成 session_id,再聚合得到每个会话的完整信息
WITH gaps AS (
SELECT
user_id, event_time, page, action,
CASE
WHEN event_time - LAG(event_time) OVER (
PARTITION BY user_id ORDER BY event_time
) > INTERVAL '30 minutes'
OR LAG(event_time) OVER (
PARTITION BY user_id ORDER BY event_time
) IS NULL
THEN 1 ELSE 0
END AS is_new_session
FROM user_events
),
sessions AS (
SELECT
user_id, event_time, page, action,
SUM(is_new_session) OVER (
PARTITION BY user_id
ORDER BY event_time
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS session_id
FROM gaps
)
SELECT
user_id,
session_id,
MIN(event_time) AS session_start,
MAX(event_time) AS session_end,
MAX(event_time) - MIN(event_time) AS session_duration,
COUNT(*) AS event_count
FROM sessions
GROUP BY user_id, session_id
ORDER BY user_id, session_id;
这段 SQL 的逻辑链路:
→
原始事件流(按 user_id + event_time 排序)
↓
LAG() 计算每个事件与上一事件的时间差
↓
时间差 > 30min 或首个事件 → is_new_session = 1,否则 = 0
↓
SUM(is_new_session) OVER (...) 累积求和 → 生成递增的 session_id
↓
按 user_id + session_id 聚合 → 每个会话的开始 / 结束 / 时长 / 事件数
Flink 里内置的 Session Window,在 PG 里用约 40 行 SQL 完整模拟,避免了任何循环和存储过程,纯窗口函数向量化实现。
6. 常用时序分析模式
同比和环比
WITH daily AS (
SELECT
date_trunc('day', created_at) AS day,
SUM(amount) AS gmv
FROM orders
GROUP BY 1
)
SELECT
day, gmv,
LAG(gmv, 1) OVER (ORDER BY day) AS gmv_yesterday,
LAG(gmv, 7) OVER (ORDER BY day) AS gmv_last_week,
ROUND(
(gmv - LAG(gmv, 1) OVER (ORDER BY day))
/ NULLIF(LAG(gmv, 1) OVER (ORDER BY day), 0) * 100, 2
) AS mom_pct, -- 日环比
ROUND(
(gmv - LAG(gmv, 7) OVER (ORDER BY day))
/ NULLIF(LAG(gmv, 7) OVER (ORDER BY day), 0) * 100, 2
) AS wow_pct -- 周同比
FROM daily
ORDER BY day;
NULLIF(x, 0) 防止除零错误——生产代码中必须加。
用户留存分析(Day1 / Day7 / Day30)
WITH first_day AS (
SELECT
user_id,
date_trunc('day', MIN(event_time)) AS cohort_day
FROM user_events
GROUP BY user_id
),
activity AS (
SELECT DISTINCT
user_id,
date_trunc('day', event_time) AS active_day
FROM user_events
)
SELECT
f.cohort_day,
COUNT(DISTINCT f.user_id) AS cohort_size,
COUNT(DISTINCT CASE WHEN a.active_day = f.cohort_day + INTERVAL '1 day' THEN a.user_id END) AS d1,
COUNT(DISTINCT CASE WHEN a.active_day = f.cohort_day + INTERVAL '7 days' THEN a.user_id END) AS d7,
COUNT(DISTINCT CASE WHEN a.active_day = f.cohort_day + INTERVAL '30 days' THEN a.user_id END) AS d30,
ROUND(
COUNT(DISTINCT CASE WHEN a.active_day = f.cohort_day + INTERVAL '1 day' THEN a.user_id END)
* 100.0 / NULLIF(COUNT(DISTINCT f.user_id), 0), 1
) AS d1_rate
FROM first_day f
LEFT JOIN activity a USING (user_id)
GROUP BY f.cohort_day
ORDER BY f.cohort_day;
7. 异常检测:统计方法替代硬编码阈值
规则阈值(「CPU > 90% 就告警」)的问题是不随数据分布变化。更健壮的方式是用 Z-Score:超过均值 ± 2σ 的点视为异常。
WITH stats AS (
SELECT
server_id,
AVG(cpu) AS cpu_mean,
STDDEV(cpu) AS cpu_stddev
FROM server_metrics
WHERE ts >= NOW() - INTERVAL '30 days'
GROUP BY server_id
),
labeled AS (
SELECT
m.ts, m.server_id, m.cpu,
ABS(m.cpu - s.cpu_mean) / NULLIF(s.cpu_stddev, 0) AS z_score
FROM server_metrics m
JOIN stats s USING (server_id)
WHERE m.ts >= NOW() - INTERVAL '24 hours'
)
SELECT
ts, server_id, cpu,
ROUND(z_score, 2) AS z_score,
CASE WHEN z_score > 2 THEN '⚠️ 异常' ELSE '正常' END AS status
FROM labeled
WHERE z_score > 2
ORDER BY z_score DESC;
进阶:连续 N 次异常才告警(减少误报)
WITH anomalies AS (
SELECT
ts, server_id, cpu,
CASE WHEN cpu > 90 THEN 1 ELSE 0 END AS is_anomaly
FROM server_metrics
WHERE ts >= NOW() - INTERVAL '24 hours'
),
streaks AS (
SELECT
ts, server_id, cpu, is_anomaly,
SUM(is_anomaly) OVER (
PARTITION BY server_id
ORDER BY ts
ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
) AS streak_5
FROM anomalies
)
SELECT * FROM streaks
WHERE streak_5 = 5 -- 连续 5 次都异常,才是真正需要处理的
ORDER BY server_id, ts;
ROWS BETWEEN 4 PRECEDING AND CURRENT ROW 表示含当前行在内的最近 5 行。5 行异常标记之和等于 5,说明连续 5 次都超阈值——这才是真正的告警信号,而不是单点噪声。
8. 性能调优:工程视角
| 问题 | 方案 | 说明 |
|---|---|---|
| 全表扫描慢 | CREATE INDEX ON orders (created_at DESC) | 时序查询几乎都带时间过滤 |
| 索引体积过大 | 改用 BRIN 索引 | 写入有序时,体积是 B-Tree 的 ~1% |
| 数据量大 | 按月 PARTITION BY RANGE(ts) | 分区裁剪大幅减少扫描量 |
| 仪表盘重复计算 | 物化视图 + 定时刷新 | 模拟 Continuous Aggregate |
| 索引失效 | 避免在 WHERE 对 ts 做函数运算 | 见下方对比 |
BRIN vs B-Tree 索引
-- B-Tree(默认):体积大,支持任意顺序查询
CREATE INDEX ON server_metrics (ts);
-- BRIN:体积极小,适合写入有序的时序数据,时序场景首选
CREATE INDEX ON server_metrics USING BRIN (ts);
-- 组合索引:按 server_id 分区查询时更快
CREATE INDEX ON server_metrics (server_id, ts DESC);
BRIN 记录每个数据块的 min/max 值。时序数据按时间顺序写入,天然适合——索引体积可能只有 B-Tree 的 1%,查询扫描量同样大幅减少。
实际检索中,避免索引失效需要写明日期值:
-- ❌ 错误:对索引列做了函数运算,索引失效,全表扫描
WHERE date_trunc('day', created_at) = '2026-01-15'
-- ✅ 正确:范围查询,索引生效
WHERE created_at >= '2026-01-15' AND created_at < '2026-01-16'
还有一种工程上的实践是,按月分区表
CREATE TABLE orders_partitioned (
order_id BIGSERIAL,
user_id BIGINT NOT NULL,
amount NUMERIC(12, 2),
created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);
CREATE TABLE orders_2024_01
PARTITION OF orders_partitioned
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
CREATE TABLE orders_2024_02
PARTITION OF orders_partitioned
FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');
查询时 PostgreSQL 自动只扫描相关分区——90 天的查询不会碰 2 年前的数据。
9. 横向对比:什么时候引入专用工具
PostgreSQL 的隐藏优势:OLTP + 分析同库。ClickHouse 和 Flink 需要从业务数据库同步数据,这意味着额外的延迟、额外的运维、额外的一致性风险。PostgreSQL 直接在业务数据上跑分析,是中小规模场景真正的杀手锏。
| 能力维度 | PostgreSQL | ClickHouse | Flink |
|---|---|---|---|
| OLTP / 事务写入 | ★★★ | ✗ | ✗ |
| 窗口函数灵活性 | ★★★ | ★★ | ★★★ |
| 实时流处理 | ★ | ★ | ★★★ |
| 聚合查询性能 | ★★ | ★★★ | ★★ |
| 运维复杂度 | 低 | 中 | 高 |
| 与业务库同源 | ✓ 同一个库 | ✗ 需要同步 | ✗ 需要同步 |
10. 何时升级到 TimescaleDB?
TimescaleDB 是 PostgreSQL 的扩展,不是替代品。原有 SQL 无需修改,加上它只会让查询更快。
- 写入频率持续超过 5 万行/秒
- 单表数据量超过 百亿行
- 需要自动数据压缩(时序数据压缩率可达 90%+)
- 需要自动数据过期(retention policy,老数据自动删除)
- 想用 time_bucket_gapfill(自动补零,比 generate_series 更优雅)
迁移只需一行,之后所有原有查询继续有效:
SELECT create_hypertable('server_metrics', 'ts');
11. 总结
本文覆盖的核心技术点:
- date_trunc + generate_series:时序对齐与补零,折线图不断裂的关键
- ROWS vs RANGE:滑动窗口两种语义,稀疏事件数据必须用 RANGE
- 三种窗口模式:滚动(date_trunc)、滑动(RANGE BETWEEN)、会话(LAG + SUM 模拟)
- 异常检测:Z-Score + 连续 N 次检测,比硬编码阈值健壮得多
- BRIN 索引:时序数据专属,体积是 B-Tree 的 ~1%
窗口函数是 SQL 里被低估最严重的功能。很多工程师绕了一大圈 Spark DataFrame、Flink DataStream,回头发现想要的结果一条 OVER (ORDER BY ts RANGE BETWEEN …) 就能给出来。
不是说大数据工具没价值,而是——在引入复杂架构之前,请先确认你真的需要它。