
Time Windows in PostgreSQL

1. Prologue: an architecture decision we came to regret

A few years ago, to build a "7-day active users trend" dashboard, the team set up this architecture:

PostgreSQL/MongoDB/MySQL → Kafka → Flink → ClickHouse → Grafana

We ran it for three months. The Flink jobs crashed every few days, a ClickHouse version upgrade hit pitfall after pitfall, and alerts woke people up in the middle of the night. In the end we realized the dashboard's entire core logic was this one SQL statement:

SELECT
    date_trunc('day', created_at) AS day,
    COUNT(DISTINCT user_id) AS dau
FROM user_events
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY 1
ORDER BY 1;

PostgreSQL runs it in 50 ms.

This is not to say Flink is useless. For genuine stream-processing workloads — millisecond latency, unbounded streams, complex CEP — it is irreplaceable. But the "time-series analytics needs" of many small and mid-sized teams never reach that scale.

2. Concept mapping: Flink vs. PostgreSQL

If you come from a Spark or Flink background, this mapping table will make the rest of the article click immediately — on a single machine, PostgreSQL's window functions are a near drop-in substitute for these frameworks.

Flink concept    | PostgreSQL equivalent                 | Example scenario                 | Support
-----------------|---------------------------------------|----------------------------------|---------------------
Tumbling Window  | date_trunc / time_bucket              | order counts every 5 minutes     | ✓ native
Sliding Window   | RANGE BETWEEN INTERVAL ... PRECEDING  | avg CPU over the past 30 minutes | ✓ PG 11+
Session Window   | LAG + SUM() OVER simulation           | user browsing-session detection  | ✓ 3-step recipe
State Store      | materialized view                     | dashboard pre-aggregation        | ✓ scheduled refresh
Watermark        | ❌ not supported                       | out-of-order event handling      | ✗ handle in the app

PostgreSQL cannot replace a true stream-processing system — no watermarks, no exactly-once streaming, no distributed stream state. But for second-level dashboards, user behavior analytics, monitoring metrics, and IoT data, it is more than strong enough.

3. A shared case study: schema and seed data

Every SQL statement in this article runs against the same set of tables, so you can reproduce everything directly.

Table definitions:

-- Orders
CREATE TABLE orders (
    order_id    BIGSERIAL PRIMARY KEY,
    user_id     BIGINT NOT NULL,
    amount      NUMERIC(12, 2) NOT NULL,
    created_at  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- User behavior events
CREATE TABLE user_events (
    event_id    BIGSERIAL PRIMARY KEY,
    user_id     BIGINT NOT NULL,
    page        TEXT,
    action      TEXT,
    event_time  TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

-- Server monitoring metrics (the main case study)
CREATE TABLE server_metrics (
    id          BIGSERIAL PRIMARY KEY,
    server_id   TEXT NOT NULL,
    cpu         NUMERIC(5, 2),
    mem         NUMERIC(5, 2),
    ts          TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

Seed data:


-- 100k orders
INSERT INTO orders (user_id, amount, created_at)
SELECT
    (random() * 9999 + 1)::BIGINT,
    (random() * 9900 + 100)::NUMERIC(12,2),
    NOW() - (random() * INTERVAL '90 days')
FROM generate_series(1, 100000);

-- 500k user events
INSERT INTO user_events (user_id, page, action, event_time)
SELECT
    (random() * 9999 + 1)::BIGINT,
    (ARRAY['/home','/product','/cart','/checkout'])[ceil(random()*4)::INT],
    (ARRAY['click','view','scroll','submit'])[ceil(random()*4)::INT],
    NOW() - (random() * INTERVAL '30 days')
FROM generate_series(1, 500000);

-- 30 days of server metrics (one row per minute, 5 servers, with random anomaly spikes)
INSERT INTO server_metrics (server_id, cpu, mem, ts)
SELECT
    'server-' || (i % 5 + 1)::TEXT,
    (40 + random() * 50 + CASE WHEN random() > 0.95 THEN 30 ELSE 0 END)::NUMERIC(5,2),
    (50 + random() * 40)::NUMERIC(5,2),
    NOW() - (43200 - i) * INTERVAL '1 minute'
FROM generate_series(1, 43200) AS i;

Do not use bare TIMESTAMP. If your servers are in Beijing and your users are in San Francisco, and someone changes the server time zone one day, the meaning of the historical data in a TIMESTAMP column silently changes. TIMESTAMPTZ stores everything as UTC and converts per time zone at display time, so this problem cannot occur. For multinational systems this is non-negotiable.
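A quick check makes this concrete — one stored UTC instant rendered in two local time zones (the zone names here are just examples):

```sql
-- The stored value is a single UTC instant; only the rendering changes.
SELECT
    TIMESTAMPTZ '2024-01-15 08:00:00+00'                                    AS stored_utc,
    TIMESTAMPTZ '2024-01-15 08:00:00+00' AT TIME ZONE 'Asia/Shanghai'       AS beijing_view,  -- 16:00
    TIMESTAMPTZ '2024-01-15 08:00:00+00' AT TIME ZONE 'America/Los_Angeles' AS sf_view;       -- 00:00
```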

4. The time-series toolbox

date_trunc — the workhorse time-alignment function.

It "snaps" a timestamp of arbitrary precision to a given granularity. Supported units include: microseconds, milliseconds, second, minute, hour, day, week, month, quarter, year.

SELECT
    date_trunc('hour', created_at) AS hour,
    COUNT(*)                        AS order_count,
    SUM(amount)                     AS gmv
FROM orders
WHERE created_at >= NOW() - INTERVAL '7 days'
GROUP BY 1
ORDER BY 1;

generate_series — the gap-filler (essential for line charts)

This is the trick most time-series write-ups skip. If an hour had no orders, the query above simply omits that point — and the front-end line chart renders with a break in it. The correct approach: generate the full time spine first, then LEFT JOIN the actual data onto it.

A gap-filling example:

WITH time_spine AS (
    SELECT generate_series(
        date_trunc('hour', NOW() - INTERVAL '7 days'),
        date_trunc('hour', NOW()),
        INTERVAL '1 hour'
    ) AS hour
),
hourly_stats AS (
    SELECT
        date_trunc('hour', created_at) AS hour,
        COUNT(*)    AS order_count,
        SUM(amount) AS gmv
    FROM orders
    WHERE created_at >= NOW() - INTERVAL '7 days'
    GROUP BY 1
)
SELECT
    t.hour,
    COALESCE(s.order_count, 0) AS order_count,
    COALESCE(s.gmv, 0)         AS gmv
FROM time_spine t
LEFT JOIN hourly_stats s USING (hour)
ORDER BY t.hour;

This query guarantees one row per hour, with 0 filled in for hours that have no data.

5. Three time-window patterns

Tumbling window

Fixed time buckets that never overlap — the simplest to implement. Scenario: during a flash sale, count orders in 15-minute buckets for a monitoring wall.

SELECT
    TO_TIMESTAMP(
        FLOOR(EXTRACT(EPOCH FROM created_at) / 900) * 900
    ) AS window_start,
    COUNT(*)    AS order_count,
    SUM(amount) AS gmv
FROM orders
WHERE created_at >= NOW() - INTERVAL '3 hours'
GROUP BY 1
ORDER BY 1;

With TimescaleDB it is more concise:

SELECT time_bucket('15 minutes', created_at), COUNT(*), SUM(amount)
FROM orders GROUP BY 1 ORDER BY 1;

So use the right tool for the job — there is no point suffering in the name of "architectural purity". Understandability and interpretability come first.

Sliding window

The window slides along the time axis, so every point in time gets an aggregate "looking back N days / N hours". This is the foundation of moving averages in quantitative trading and of moving-average monitoring.

Here you must understand the difference between ROWS and RANGE — a distinction most tutorials gloss over:

ROWS — physical row count
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW

Count back 6 rows, regardless of how much time separates them. With unevenly sampled data, this badly distorts the result.

RANGE — logical time range
RANGE BETWEEN INTERVAL '7 days' PRECEDING AND CURRENT ROW

All rows within the past 7 days; interval frames are supported in PG 11+. Sparse event data must use this.

Rules of thumb:

  1. Regularly sampled data (one metric row per minute) → use ROWS;
  2. Sparse event data (orders, clicks) → you must use RANGE.

A 7-day GMV moving average:

WITH daily_gmv AS (
    SELECT
        date_trunc('day', created_at) AS day,
        SUM(amount) AS gmv
    FROM orders
    WHERE created_at >= NOW() - INTERVAL '60 days'
    GROUP BY 1
)
SELECT
    day,
    gmv,
    AVG(gmv) OVER (
        ORDER BY day
        ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) AS gmv_ma7,
    AVG(gmv) OVER (
        ORDER BY day
        ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
    ) AS gmv_ma30
FROM daily_gmv
ORDER BY day;

A 30-minute moving average of CPU (sparse samples):

SELECT
    ts,
    server_id,
    cpu,
    AVG(cpu) OVER (
        PARTITION BY server_id
        ORDER BY ts
        RANGE BETWEEN INTERVAL '30 minutes' PRECEDING AND CURRENT ROW
    ) AS cpu_ma_30m
FROM server_metrics
WHERE ts >= NOW() - INTERVAL '3 hours'
ORDER BY server_id, ts;

Session window

PostgreSQL has no native session windows, but you can simulate them completely in pure SQL. What Flink ships as a built-in, PG implements in three steps.

Scenario: identify user browsing sessions — more than 30 minutes of inactivity starts a new session.

Step 1 — LAG() computes the gap between each event and the previous one:

SELECT
    user_id,
    event_time,
    LAG(event_time) OVER (
        PARTITION BY user_id
        ORDER BY event_time
    ) AS prev_event_time,
    event_time - LAG(event_time) OVER (
        PARTITION BY user_id ORDER BY event_time
    ) AS gap
FROM user_events
ORDER BY user_id, event_time;

Step 2 — mark session boundaries: a gap > 30 minutes, or the first event, gets is_new_session = 1:

WITH gaps AS (
    SELECT
        user_id, event_time, page, action,
        CASE
            WHEN event_time - LAG(event_time) OVER (
                PARTITION BY user_id ORDER BY event_time
            ) > INTERVAL '30 minutes'
            OR LAG(event_time) OVER (
                PARTITION BY user_id ORDER BY event_time
            ) IS NULL
            THEN 1 ELSE 0
        END AS is_new_session
    FROM user_events
)
SELECT * FROM gaps
ORDER BY user_id, event_time;

Step 3 — a running SUM() OVER turns the markers into a session_id; a final aggregation yields each session's summary:

WITH gaps AS (
    SELECT
        user_id, event_time, page, action,
        CASE
            WHEN event_time - LAG(event_time) OVER (
                PARTITION BY user_id ORDER BY event_time
            ) > INTERVAL '30 minutes'
            OR LAG(event_time) OVER (
                PARTITION BY user_id ORDER BY event_time
            ) IS NULL
            THEN 1 ELSE 0
        END AS is_new_session
    FROM user_events
),
sessions AS (
    SELECT
        user_id, event_time, page, action,
        SUM(is_new_session) OVER (
            PARTITION BY user_id
            ORDER BY event_time
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) AS session_id
    FROM gaps
)
SELECT
    user_id,
    session_id,
    MIN(event_time)                   AS session_start,
    MAX(event_time)                   AS session_end,
    MAX(event_time) - MIN(event_time) AS session_duration,
    COUNT(*)                          AS event_count
FROM sessions
GROUP BY user_id, session_id
ORDER BY user_id, session_id;

The logical chain of this SQL:

Raw event stream (ordered by user_id + event_time)
↓
LAG() computes each event's gap from the previous event
↓
gap > 30 min, or first event → is_new_session = 1, else 0
↓
running SUM(is_new_session) OVER (...) → a monotonically increasing session_id
↓
aggregate by user_id + session_id → each session's start / end / duration / event count

What Flink ships as a built-in session window, PG reproduces in roughly 40 lines of SQL — no loops, no stored procedures, just a purely set-based window-function implementation.

6. Common time-series analysis patterns

Day-over-day and week-over-week changes

WITH daily AS (
    SELECT
        date_trunc('day', created_at) AS day,
        SUM(amount) AS gmv
    FROM orders
    GROUP BY 1
)
SELECT
    day, gmv,
    LAG(gmv, 1) OVER (ORDER BY day) AS gmv_yesterday,
    LAG(gmv, 7) OVER (ORDER BY day) AS gmv_last_week,
    ROUND(
        (gmv - LAG(gmv, 1) OVER (ORDER BY day))
        / NULLIF(LAG(gmv, 1) OVER (ORDER BY day), 0) * 100, 2
    ) AS dod_pct,   -- day-over-day change
    ROUND(
        (gmv - LAG(gmv, 7) OVER (ORDER BY day))
        / NULLIF(LAG(gmv, 7) OVER (ORDER BY day), 0) * 100, 2
    ) AS wow_pct    -- week-over-week change
FROM daily
ORDER BY day;

NULLIF(x, 0) prevents division-by-zero errors — mandatory in production code.
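A minimal demonstration of the pattern:

```sql
-- NULLIF(x, 0) returns NULL when x = 0, so the division yields NULL
-- instead of raising "division by zero".
SELECT 100 / NULLIF(0, 0);   -- NULL
SELECT 100 / NULLIF(4, 0);   -- 25
```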

User retention analysis (Day 1 / Day 7 / Day 30)

WITH first_day AS (
    SELECT
        user_id,
        date_trunc('day', MIN(event_time)) AS cohort_day
    FROM user_events
    GROUP BY user_id
),
activity AS (
    SELECT DISTINCT
        user_id,
        date_trunc('day', event_time) AS active_day
    FROM user_events
)
SELECT
    f.cohort_day,
    COUNT(DISTINCT f.user_id) AS cohort_size,
    COUNT(DISTINCT CASE WHEN a.active_day = f.cohort_day + INTERVAL '1 day'  THEN a.user_id END) AS d1,
    COUNT(DISTINCT CASE WHEN a.active_day = f.cohort_day + INTERVAL '7 days' THEN a.user_id END) AS d7,
    COUNT(DISTINCT CASE WHEN a.active_day = f.cohort_day + INTERVAL '30 days' THEN a.user_id END) AS d30,
    ROUND(
        COUNT(DISTINCT CASE WHEN a.active_day = f.cohort_day + INTERVAL '1 day' THEN a.user_id END)
        * 100.0 / NULLIF(COUNT(DISTINCT f.user_id), 0), 1
    ) AS d1_rate
FROM first_day f
LEFT JOIN activity a USING (user_id)
GROUP BY f.cohort_day
ORDER BY f.cohort_day;

7. Anomaly detection: statistics instead of hard-coded thresholds

The problem with rule-based thresholds ("alert when CPU > 90%") is that they do not adapt as the data's distribution shifts. A more robust approach is the Z-score: points more than 2σ from the mean are flagged as anomalies.

WITH stats AS (
    SELECT
        server_id,
        AVG(cpu)    AS cpu_mean,
        STDDEV(cpu) AS cpu_stddev
    FROM server_metrics
    WHERE ts >= NOW() - INTERVAL '30 days'
    GROUP BY server_id
),
labeled AS (
    SELECT
        m.ts, m.server_id, m.cpu,
        ABS(m.cpu - s.cpu_mean) / NULLIF(s.cpu_stddev, 0) AS z_score
    FROM server_metrics m
    JOIN stats s USING (server_id)
    WHERE m.ts >= NOW() - INTERVAL '24 hours'
)
SELECT
    ts, server_id, cpu,
    ROUND(z_score, 2) AS z_score,
    CASE WHEN z_score > 2 THEN '⚠️ anomaly' ELSE 'normal' END AS status
FROM labeled
WHERE z_score > 2
ORDER BY z_score DESC;

Going further: alert only after N consecutive anomalies (to cut false positives)

WITH anomalies AS (
    SELECT
        ts, server_id, cpu,
        CASE WHEN cpu > 90 THEN 1 ELSE 0 END AS is_anomaly
    FROM server_metrics
    WHERE ts >= NOW() - INTERVAL '24 hours'
),
streaks AS (
    SELECT
        ts, server_id, cpu, is_anomaly,
        SUM(is_anomaly) OVER (
            PARTITION BY server_id
            ORDER BY ts
            ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
        ) AS streak_5
    FROM anomalies
)
SELECT * FROM streaks
WHERE streak_5 = 5   -- only 5 anomalies in a row is worth acting on
ORDER BY server_id, ts;

ROWS BETWEEN 4 PRECEDING AND CURRENT ROW means the 5 most recent rows, including the current one. If the sum of the anomaly flags over those 5 rows equals 5, the threshold was exceeded 5 times in a row — that is a real alert signal, not single-point noise.

8. Performance tuning: the engineering view

Problem                  | Remedy                                    | Notes
-------------------------|-------------------------------------------|---------------------------------------------------
Slow full-table scans    | CREATE INDEX ON orders (created_at DESC)  | time-series queries almost always filter on time
Index too large          | switch to a BRIN index                    | ~1% of a B-Tree's size when writes are time-ordered
Very large tables        | monthly PARTITION BY RANGE (ts)           | partition pruning slashes the scanned volume
Dashboards recompute     | materialized view + scheduled refresh     | emulates a continuous aggregate
Index not being used     | no function calls on ts in WHERE          | see the comparison below
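The "materialized view + scheduled refresh" remedy can be sketched like this against the article's orders table — the view name and refresh cadence are illustrative, not from the original setup:

```sql
-- Pre-aggregate hourly stats once; serve the dashboard from the view.
CREATE MATERIALIZED VIEW hourly_order_stats AS
SELECT
    date_trunc('hour', created_at) AS hour,
    COUNT(*)    AS order_count,
    SUM(amount) AS gmv
FROM orders
GROUP BY 1;

-- A unique index is required for CONCURRENTLY, which lets
-- refreshes run without blocking readers.
CREATE UNIQUE INDEX ON hourly_order_stats (hour);

-- Run this from cron / pg_cron at whatever cadence the dashboard needs.
REFRESH MATERIALIZED VIEW CONCURRENTLY hourly_order_stats;
```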

BRIN vs B-Tree indexes

-- B-Tree (default): larger, supports arbitrary access patterns
CREATE INDEX ON server_metrics (ts);

-- BRIN: tiny, ideal for time-ordered inserts — the first choice for time series
CREATE INDEX ON server_metrics USING BRIN (ts);

-- Composite index: faster when queries filter by server_id
CREATE INDEX ON server_metrics (server_id, ts DESC);

BRIN stores only the min/max values of each block range. Time-series data arrives in time order, which is a natural fit — the index can be ~1% the size of a B-Tree, with a similarly large reduction in pages scanned.
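You can verify the size claim on your own data with the standard statistics views — this lists every index on server_metrics with its on-disk size:

```sql
-- Compare index sizes after creating both the B-Tree and the BRIN index.
SELECT indexrelname,
       pg_size_pretty(pg_relation_size(indexrelid)) AS index_size
FROM pg_stat_user_indexes
WHERE relname = 'server_metrics';
```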

In queries, keep the index usable by writing explicit date bounds:

-- ❌ Wrong: a function call on the indexed column defeats the index → full scan
WHERE date_trunc('day', created_at) = '2026-01-15'

-- ✅ Right: a range predicate, the index is used
WHERE created_at >= '2026-01-15' AND created_at < '2026-01-16'

Another common engineering practice is monthly table partitioning:

CREATE TABLE orders_partitioned (
    order_id   BIGSERIAL,
    user_id    BIGINT NOT NULL,
    amount     NUMERIC(12, 2),
    created_at TIMESTAMPTZ NOT NULL
) PARTITION BY RANGE (created_at);

CREATE TABLE orders_2024_01
    PARTITION OF orders_partitioned
    FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

CREATE TABLE orders_2024_02
    PARTITION OF orders_partitioned
    FOR VALUES FROM ('2024-02-01') TO ('2024-03-01');

At query time PostgreSQL automatically scans only the relevant partitions — a 90-day query never touches data from two years ago.
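Pruning is easy to verify — assuming the partitions above exist, the EXPLAIN output should mention only the partitions the time filter can touch:

```sql
-- The plan should scan orders_2024_02 only, not orders_2024_01.
EXPLAIN
SELECT COUNT(*)
FROM orders_partitioned
WHERE created_at >= '2024-02-01' AND created_at < '2024-02-15';
```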

9. When to bring in specialized tools

PostgreSQL's hidden advantage: OLTP and analytics in the same database. ClickHouse and Flink both require syncing data out of the business database — extra latency, extra operations, extra consistency risk. Running analytics directly on the business data is PostgreSQL's real killer feature at small-to-medium scale.

Capability                   | PostgreSQL       | ClickHouse       | Flink
-----------------------------|------------------|------------------|------------------
OLTP / transactional writes  | ★★★              | —                | —
Window-function flexibility  | ★★★              | ★★               | ★★★
Real-time stream processing  | —                | —                | ★★★
Aggregate query performance  | ★★               | ★★★              | ★★
Operational complexity       | low              | high             | high
Same store as business data  | ✓ same database  | ✗ needs syncing  | ✗ needs syncing

10. When to upgrade to TimescaleDB?

TimescaleDB is a PostgreSQL extension, not a replacement. Your existing SQL needs no changes — adding it only makes queries faster. Consider it when:

  • sustained write rates exceed ~50k rows/second
  • a single table exceeds ~10 billion rows
  • you need automatic compression (90%+ is common for time-series data)
  • you need automatic expiry (retention policies that drop old data)
  • you want time_bucket_gapfill (automatic gap filling, more elegant than generate_series)
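For reference, a gapfill query might look like this — a sketch against the server_metrics table; note that time_bucket_gapfill requires a bounded time range in the WHERE clause:

```sql
-- Emits one row per hourly bucket over the whole range;
-- COALESCE fills buckets that have no data with 0.
SELECT
    time_bucket_gapfill('1 hour', ts) AS bucket,
    COALESCE(AVG(cpu), 0)             AS avg_cpu
FROM server_metrics
WHERE ts >= NOW() - INTERVAL '7 days' AND ts < NOW()
GROUP BY bucket
ORDER BY bucket;
```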

Migration is a single statement, after which every existing query keeps working:

SELECT create_hypertable('server_metrics', 'ts');
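If you then want the retention and compression features listed above, TimescaleDB 2.x exposes them as policies — the function names are from the TimescaleDB docs; treat the intervals as illustrative:

```sql
-- Drop raw data older than 90 days automatically.
SELECT add_retention_policy('server_metrics', INTERVAL '90 days');

-- Enable compression, then compress chunks older than 7 days.
ALTER TABLE server_metrics SET (timescaledb.compress);
SELECT add_compression_policy('server_metrics', INTERVAL '7 days');
```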

11. Summary

The key techniques covered:

  • date_trunc + generate_series: time alignment and gap filling — the key to unbroken line charts
  • ROWS vs RANGE: the two sliding-window semantics; sparse event data must use RANGE
  • Three window patterns: tumbling (date_trunc), sliding (RANGE BETWEEN), session (LAG + SUM simulation)
  • Anomaly detection: Z-score plus N-consecutive confirmation, far more robust than hard-coded thresholds
  • BRIN indexes: purpose-built for time series, ~1% the size of a B-Tree

Window functions are among SQL's most underrated features. Plenty of engineers take the long detour through Spark DataFrames and Flink DataStreams, only to discover that the result they wanted was one OVER (ORDER BY ts RANGE BETWEEN …) away.

This is not to say big-data tools have no value — just that before adopting a complex architecture, make sure you actually need it.