推荐系统(十五)—— 实时推荐与在线学习

实时推荐的工程师视角:流式管道(Kafka + Flink)、在线学习(SGD、FTRL、AdaGrad)、Bandit(UCB、Thompson Sampling、LinUCB)、延迟预算、特征新鲜度、概念漂移检测,以及生产环境真正在调的缓存与计算权衡。

用户 14:02 打开 App 搜「越野跑鞋」,到 15:30 兴趣已经飘到厨房好物。如果你的模型还在跑昨晚的离线快照,16:00 推给他的依然是萨洛蒙广告——这条时间差就是实时系统真正要补上的洞。但有意思的不是「让一切更快」,而是「到底什么需要快」:大量特征即便实时化也基本不影响 AUC,方向选错就是花钱买空气。

你将学到

  • 两条路径:实时推荐其实是两条管道的拼接——异步的写路径(事件 → 状态 → 模型)和同步的读路径(请求 → 召回 → 排序 → 返回)。
  • 毫秒去哪了:延迟不是均值游戏,p99 才是用户感受到的指标。我们会按阶段、按分位拆解 100 ms 的预算。
  • 在线学习的工程实现:SGD、AdaGrad,以及生产里真正在跑的 FTRL-Proximal——也就是 Google 在广告 CTR 系统中公开的算法。
  • Bandit:UCB1、Thompson Sampling、上下文版 LinUCB,以及它们的 regret 界在物品每天都换的现实里到底意味着什么。
  • 流式架构:一套具体的 Kafka + Flink + KV 存储方案,包含 checkpoint 与 exactly-once 语义。
  • 概念漂移:怎么检测(ADWIN、DDM、Page-Hinkley),检测到了又该如何反应。
  • 缓存 vs 计算:你在生产里真正在调的那条权衡曲线——以及为什么答案几乎总是「混合」。

前置知识

  • Python 与 NumPy(第 1-2 篇)
  • 梯度下降与损失函数(第 7 篇)
  • 推荐系统管道总览(第 11 篇)

1. 为什么要实时,以及「实时」到底指什么

三个现实把人推向实时:

  1. 会话很短。 Feed 类应用的中位会话时长 3-7 分钟。一个每天才更新一次的模型,在大多数会话结束之前根本没机会看到它们。
  2. 趋势更短。 一条爆款视频可能在前 6 小时就吃掉一生 80% 的互动。昨晚的离线模型对它一无所知。
  3. 反馈循环就是模型本身。 推送出去的物品、回流的点击,本身就是下一条训练样本。把这条循环从「天级」缩到「秒级」,是「学习系统」与「停滞系统」的分水岭。

但「实时」不是单点,而是一道光谱:

层级更新频率典型用途
实时< 1 秒会话意图、Feed 内去重、风控信号
准实时1 秒 – 1 小时近期点击序列、单作者 CTR
小时级1 – 24 小时热门话题、物品热度衰减
离线1 天以上用户人口属性、物品 Embedding、召回索引

最常见的错误就是把所有东西都做成实时。后面图 4 会显示:人口属性、物品元数据基本不会从实时管道里得到任何 AUC 收益,但近期点击序列能拿到 2-3 个百分点。实时是一份预算,要花在真正能撬动指标的特征上。


2. 双路径架构

实时推荐系统:异步写路径不停刷新状态,同步读路径在硬性 SLO 下提供服务

实时推荐天然分成两条路径:

写路径(异步、看吞吐)。 客户端事件进入按 user_id 分区的 Kafka topic,被 Flink 任务聚合成滚动窗口(最近 N 次点击、10 分钟 CTR 等),最后落到两个地方:一个特征存储(Redis 或 RocksDB)供读路径取用,一个在线学习器更新模型权重。每隔几分钟,新的模型快照被推到注册中心。

读路径(同步、看延迟)。 请求到达后做召回(Embedding 上的 ANN,加上倒排索引覆盖新物品),然后做单次往返的特征拉取,进入排序,再做多样性与业务规则的重排,最后返回结果。整体预算 < 100 ms。

工程纪律是把这两条路径解耦。读路径永远不写模型、不训练、不阻塞流式计算。即便流式那边掉队了,读路径仍能用稍微旧一点的特征继续工作——退化但不宕机。


3. 延迟预算 —— 每一毫秒去了哪

延迟预算按阶段、按分位拆解,展示召回、特征拉取、排序、重排各自在 p50/p95/p99 上的占比

100 ms 端到端是 Feed 类产品的行业惯例(人因实验把视觉更新「感觉瞬时」的阈值定在 100-200 ms 上下)。在这份预算里,典型分配大致如下:

阶段p50p95p99备注
入向网络4712长连接摊薄了 TLS 握手
召回(ANN)101828上亿物品的 HNSW 或 ScaNN
特征拉取61430Redis pipeline;尾部来自 GC / 网络抖动
排序(DNN)183255~500 候选批量打分
重排 + 日志4918多样性、业务规则、异步落日志
出向网络3611
端到端4586154p99 越线,这是常态

两条工程经验:

  1. 均值会撒谎。 p50 = 45 ms 看起来宽裕。p99 = 154 ms 意味着每一百次请求就有一次超 SLO——日活十亿的平台,一天就是数百万次。
  2. 排序最费时间。 候选批处理、模型蒸馏、TensorRT/ONNX-Runtime 量化,单点收益超过其他所有优化的总和。Pinterest 公开过把 6 层 DNN 蒸馏成 2 层学生模型加特征交叉,p99 直接降了 30%。

流式参考架构:客户端写入 Kafka topic,Flink 进行有状态聚合与在线学习,输出扇出到特征存储、模型注册中心、监控指标

4.1 Kafka —— 持久化的传输层

Kafka 的角色窄而关键:一条持久化、分区、可重放的日志。三个属性最重要:

  • user_id 分区:保证同一个用户的事件落在同一分区,因果顺序得以保留——「点击发生在曝光之前还是之后」这类有状态 join 必须依赖此性质。
  • 副本(一般 replication-factor=3):单 broker 挂掉不会丢数据。
  • 保留策略:让你能回放过去 7 天来回填一个新模型——线上路径与恢复路径走同一份代码。

Kafka 做的事情:聚合、join、机器学习。它就是一家邮局。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from kafka import KafkaProducer
import json, time

producer = KafkaProducer(
    bootstrap_servers=["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"],
    value_serializer=lambda v: json.dumps(v).encode(),
    key_serializer=lambda k: k.encode(),
    acks="all",        # 等待所有 ISR 副本确认
    enable_idempotence=True,  # 生产者侧 exactly-once
    compression_type="lz4",
    linger_ms=5,       # 5 毫秒微批—— 5ms 延迟换 10x 吞吐
)

def emit_click(user_id: str, item_id: str, position: int) -> None:
    """发送一次点击事件。同一用户落同一分区。"""
    producer.send(
        topic="clicks",
        key=user_id,                # 分区键
        value={
            "user_id": user_id,
            "item_id": item_id,
            "position": position,
            "ts": int(time.time() * 1000),
        },
    )

如果说 Kafka 是传输,Flink 就是有状态的流处理器。它的杀手锏是故障下的 exactly-once——基于 Chandy-Lamport 风格的分布式快照实现:每个 checkpoint 周期(默认 60 秒),Flink 原子地捕获每个算子的状态并持久化到 S3。失败发生时,回退 Kafka offset 到上一个 checkpoint 重放,对外表现就像没出过事一样。

一个典型的 Flink 点击归因任务:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# Flink SQL —— 每用户最近 10 分钟点击数,写入特征存储
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

t_env = StreamTableEnvironment.create(
    environment_settings=EnvironmentSettings.in_streaming_mode()
)

t_env.execute_sql("""
CREATE TABLE clicks (
    user_id STRING,
    item_id STRING,
    ts BIGINT,
    event_time AS TO_TIMESTAMP_LTZ(ts, 3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'clicks',
    'properties.bootstrap.servers' = 'kafka-1:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
)
""")

t_env.execute_sql("""
CREATE TABLE user_features (
    user_id STRING,
    window_end TIMESTAMP(3),
    clicks_10m BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'host' = 'redis.prod',
    'ttl-sec' = '900'
)
""")

# 10 分钟跳跃窗口,步长 1 分钟 —— 每分钟吐一次新鲜的 CTR
t_env.execute_sql("""
INSERT INTO user_features
SELECT
    user_id,
    HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) AS window_end,
    COUNT(*) AS clicks_10m
FROM clicks
GROUP BY
    user_id,
    HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
""")

watermark 是大多数人最容易写错的地方。它声明的是「我不会再看到 event_time < watermark 的事件」,从而授权 Flink 关闭并发射一个窗口。设置太紧会丢晚到事件,设置太松特征就被系统性地拖延。


5. 在线学习:从 SGD 到 FTRL

在线学习 vs 离线重训。左:稳态任务的收敛过程;右:概念漂移下的表现

5.1 核心更新

对于带参数 $\theta$ 的逻辑回归排序器,单条样本 $(x_t, y_t)$ 的更新是:

$$\theta_{t+1} = \theta_t - \eta_t \, \nabla_\theta \mathcal{L}(\sigma(\theta_t^\top x_t), y_t)$$

这就是普通 SGD。能用,但放到 Web 规模的 CTR 数据上——上百万稀疏特征、每条只在很小一部分样本里出现——它有两个问题:

  1. 没有按特征的学习率:万分之一才出现一次的特征,其步长本应远大于每次都出现的特征。
  2. 没有稀疏性:权重会在噪声里游走,离不开零点。一个 10⁹ 参数永不归零的模型根本没法服务化。

5.2 AdaGrad —— 按特征自适应步长

AdaGrad 通过累积每个特征的梯度平方解决第一个问题:

$$\theta_{t+1, i} = \theta_{t, i} - \frac{\eta}{\sqrt{G_{t, i} + \varepsilon}} g_{t, i}, \quad G_{t, i} = \sum_{s=1}^{t} g_{s, i}^2$$

罕见特征 $G$ 小,少数几次出现时步子可以迈大;高频特征 $G$ 大,更新趋于谨慎。

5.3 FTRL-Proximal —— 生产里的主力

FTRL-Proximal(McMahan 等,Ad Click Prediction: a View from the Trenches,KDD 2013——这是 Google 公开介绍其广告系统所用算法的那篇论文)把 AdaGrad 的按特征自适应学习率,与能产生真正零值而非小权重的 $L_1$ 正则结合起来。逐坐标更新规则:

$$ z_{t,i} \leftarrow z_{t-1,i} + g_{t,i} - \frac{\sigma_{t,i}}{\eta} \theta_{t-1,i}, \qquad \theta_{t,i} = \begin{cases} 0 & \text{若 } |z_{t,i}| \le \lambda_1 \\ -\frac{1}{\eta_{t,i}} \big(z_{t,i} - \mathrm{sign}(z_{t,i}) \lambda_1\big) & \text{否则} \end{cases} $$

其中 $\sigma_{t,i} = \frac{1}{\eta_{t,i}} - \frac{1}{\eta_{t-1,i}}$,$\eta_{t,i} = \alpha / (\beta + \sqrt{\sum_s g_{s,i}^2})$。

关键性质:模型是真正稀疏的。Google 报告 FTRL-Proximal 相比朴素 SGD + $L_2$ 在同等 AUC 下把模型体积压了一个数量级。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import numpy as np

class FTRLProximal:
    """FTRL-Proximal 在线逻辑回归。

    参考文献:McMahan 等,"Ad Click Prediction: a View from the
    Trenches",KDD 2013。Google 当时广告系统真正运行的算法。
    """
    def __init__(self, n_features: int, alpha=0.1, beta=1.0,
                 l1=1.0, l2=1.0):
        self.n = n_features
        self.alpha, self.beta, self.l1, self.l2 = alpha, beta, l1, l2
        # 只持久化 z 和 n,权重 w 按需懒计算
        self.z = np.zeros(n_features)
        self.n_sq = np.zeros(n_features)

    def _weight(self, i: int) -> float:
        """从 (z_i, n_sq_i) 懒计算 w_i。"""
        z, n_sq = self.z[i], self.n_sq[i]
        if abs(z) <= self.l1:
            return 0.0
        sign = 1.0 if z > 0 else -1.0
        return -(z - sign * self.l1) / (
            (self.beta + np.sqrt(n_sq)) / self.alpha + self.l2
        )

    def predict_proba(self, x_indices: np.ndarray, x_values: np.ndarray) -> float:
        """稀疏预测,只在非零特征上累加。"""
        s = sum(self._weight(i) * v for i, v in zip(x_indices, x_values))
        return 1.0 / (1.0 + np.exp(-max(min(s, 35.0), -35.0)))

    def update(self, x_indices: np.ndarray, x_values: np.ndarray, y: int) -> float:
        """单样本更新,返回更新前的预测概率。"""
        p = self.predict_proba(x_indices, x_values)
        for i, v in zip(x_indices, x_values):
            g = (p - y) * v                       # 该特征上的梯度
            sigma = (np.sqrt(self.n_sq[i] + g * g) - np.sqrt(self.n_sq[i])) / self.alpha
            self.z[i]   += g - sigma * self._weight(i)
            self.n_sq[i] += g * g
        return p

5.4 在线 vs 离线 —— 全景图

图 3 左侧是稳态情形:在线学习平滑收敛,离线重训呈阶梯——每 200 条事件出一个新快照,中间是平台。在静态任务上两者期望差距会逐渐收敛,但随时间积分,在线明显占优。

右侧才是真正的胜负手:突发的分布漂移(爆款事件、新品上线、节日效应)。离线模型对训练窗口外的世界一无所知,要等满一个完整窗口才能恢复;在线学习从漂移后的第一条样本就开始调整,大约 100 条事件后就回到接近峰值的 AUC。生产环境里漂移是常态,不是例外——这才是在线学习不只是学术偏好,而是真正的工程杠杆的原因。


6. 特征新鲜度 —— 到底有多重要?

特征陈旧度 vs AUC:AUC 在 log 陈旧度上近似线性下降,几乎所有损失都集中在行为类特征

新鲜度问题是认真的团队真正会争论的,因为把一个特征做成实时是昂贵的。来自公开报告(Meta 的深度学习推荐模型、Pinterest 的 PinnerSAGE、字节的 Monolith)的经验模式高度一致:

  • AUC 在 log 陈旧度上大致线性下降。 1 秒到 1 分钟:损失微乎其微。1 分钟到 1 小时:开始有感(约 0.005 AUC)。1 小时到 1 天:损失显著(约 0.015-0.020 AUC)。
  • 损失集中在行为类特征。 近期点击序列与会话意图贡献了约 80% 的新鲜度溢价。人口属性与物品元数据基本是平的——每周更新一次都看不出影响。

这决定了架构形态:不要为不能回本的特征支付流式成本。一个典型的 Feed 系统会并行跑三条管道——行为特征走实时、热度类聚合走小时级、Embedding 与人口属性走天级。


7. Bandit —— 理论可证的探索故事

7.1 一句话描述问题

你有 $K$ 个物品要选。每个物品都有未知的点击概率 $\mu_i$。每一轮你挑一个、看到它的点击、然后更新。$T$ 轮之后,遗憾就是你相对于一直挑最好那个所损失的:

$$R_T = T \mu^* - \mathbb{E}\!\left[\sum_{t=1}^T \mu_{a_t}\right]$$

「好」算法的标准是次线性遗憾:$R_T = o(T)$,即每轮平均损失 → 0。

7.2 UCB1 —— 不确定时持乐观态度

UCB1(Auer 等,2002)选择上置信界最高的臂:

$$a_t = \arg\max_i \left( \hat\mu_i + \sqrt{\frac{2 \ln t}{n_i}} \right)$$

其中 $n_i$ 是臂 $i$ 已被拉的次数。奖金随 $n_i$ 上升而收缩——「不确定就探索,确定了就利用」。UCB1 达到 $O(\log T)$ 遗憾,对稳态 Bandit 而言这在常数因子内可证最优(Lai-Robbins 下界)。

7.3 Thompson Sampling —— 贝叶斯路线

对每个臂的成功率维护一个后验分布。伯努利奖励的共轭先验是 Beta:

$$\theta_i \sim \text{Beta}(\alpha_i, \beta_i), \quad \text{更新:} (\alpha_i, \beta_i) \leftarrow (\alpha_i + r, \beta_i + 1 - r)$$

每一轮,从每个臂的后验里采样一个 $\theta_i$,挑采样值最高的。后验宽(不确定)的臂会被探索——它的样本噪声大,时不时会高出别人;后验窄的臂会被利用。Thompson Sampling 在渐近意义上匹配 UCB1 的 $O(\log T)$(Agrawal & Goyal,2012),且在大多数 benchmark 上实测更优,更易扩展到延迟反馈、批量更新和复杂奖励模型。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class ThompsonSampling:
    """伯努利 Thompson Sampling。生产实现就是这样——
    唯一要加的是衰减(每天 alpha, beta *= 0.99),
    用来处理非稳态的物品池。"""

    def __init__(self, n_arms: int):
        self.alpha = np.ones(n_arms)   # 成功次数 + 1
        self.beta  = np.ones(n_arms)   # 失败次数 + 1

    def select(self) -> int:
        return int(np.argmax(np.random.beta(self.alpha, self.beta)))

    def update(self, arm: int, reward: int) -> None:
        if reward:
            self.alpha[arm] += 1
        else:
            self.beta[arm] += 1

7.4 LinUCB —— 真正能用的「上下文」

普通 Bandit 学的是全局排序。但推荐的全部价值就在个性化。LinUCB(Li 等,A Contextual-Bandit Approach to Personalized News Article Recommendation,WWW 2010 —— 雅虎当年用于首页新闻个性化的算法)假设期望奖励对上下文向量 $x_t$ 线性:

$$\mathbb{E}[r_a \mid x_t] = x_t^\top \theta_a$$

每个臂维护一个岭回归 $(A_a, b_a)$,其中 $A_a = I + \sum X X^\top$、$b_a = \sum r X$。选择规则:

$$a_t = \arg\max_a \left( x_t^\top \hat\theta_a + \alpha \sqrt{x_t^\top A_a^{-1} x_t} \right), \quad \hat\theta_a = A_a^{-1} b_a$$

奖金项 $\sqrt{x_t^\top A_a^{-1} x_t}$ 就是岭回归在该上下文下的预测标准差——当前上下文与该臂历史样本越不像,奖金越大。遗憾界是 $\tilde O(\sqrt{d T})$,$d$ 是上下文维度。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class LinUCB:
    """LinUCB —— disjoint 模型版本。
    参考文献:Li, Chu, Langford, Schapire,WWW 2010。"""

    def __init__(self, n_arms: int, n_features: int, alpha: float = 1.0):
        self.alpha = alpha
        self.A = [np.eye(n_features) for _ in range(n_arms)]
        self.b = [np.zeros(n_features) for _ in range(n_arms)]

    def select(self, x: np.ndarray) -> int:
        ucbs = []
        for A_a, b_a in zip(self.A, self.b):
            A_inv = np.linalg.inv(A_a)
            theta = A_inv @ b_a
            ucbs.append(x @ theta + self.alpha * np.sqrt(x @ A_inv @ x))
        return int(np.argmax(ucbs))

    def update(self, arm: int, x: np.ndarray, reward: float) -> None:
        self.A[arm] += np.outer(x, x)
        self.b[arm] += reward * x

实战中你几乎不会用「纯」LinUCB。线上跑的是深度排序模型,Bandit 在它上面一层,决定给某次请求分配哪种策略(创作者助推、新物品助推、纯利用)。臂是策略,不是物品。


8. 概念漂移 —— 检测它,不要假装它不存在

概念漂移检测:上图是观测 CTR 加滚动均值与参考窗口;下图是在渐进与突发漂移上触发的 z-score 检测器

只有学习器适应,在线学习才会真的适应漂移——如果学习率早就衰减到几乎为零,它什么都做不了。生产系统会显式检测漂移并主动反应。

8.1 三个经典检测器

  • Page-Hinkley 检验:累积偏离均值的差值,CUSUM 超阈值即报警。适合单调漂移。
  • DDM(Drift Detection Method,Gama 等 2004):跟踪二项分布错误率 $p_t$ 与其标准差 $s_t$。$p_t + s_t \ge p_{\min} + 2 s_{\min}$ 触发预警,$\ge p_{\min} + 3 s_{\min}$ 触发报警。
  • ADWIN(Bifet & Gavaldà 2007):维护自适应窗口,只要两个子窗口的均值差超过 Hoeffding 界,就丢弃旧的一半。可证的有界误报率。

图 6 用的是简化版 z-score 检测器,也是你会真的部署的 v1:保留一个「已知良态」的参考窗口,计算近期滚动均值 CTR 相对参考的 z-score,$|z| > 3$ 时报警。

8.2 检测之后的反应

检测没有响应只是个告警。从便宜到昂贵的实战反应:

  1. 拉高学习率(便宜、可逆)。在固定窗口内将 $\eta$ 乘以 2-5x。
  2. 重置 Bandit 后验(中等代价)。把受影响的臂的 $\alpha, \beta$ 减半——保留先验形状,但翻倍了不确定性。
  3. 强制回滚 checkpoint(昂贵)。退回上一个良态快照,从 Kafka 重放,在漂移后窗口上重训。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class DriftAdaptiveLearner:
    def __init__(self, base_learner, base_lr=0.1, window=200, z_thresh=3.0):
        self.learner = base_learner
        self.base_lr = base_lr
        self.window = window
        self.z_thresh = z_thresh
        self.recent = []        # 滚动误差
        self.ref_mean = None
        self.ref_std = None

    def update(self, x, y):
        pred = self.learner.predict(x)
        err = abs(pred - y)
        self.recent.append(err)
        if len(self.recent) > self.window:
            self.recent.pop(0)

        # 等第一个窗口稳定后建立参考
        if self.ref_mean is None and len(self.recent) == self.window:
            self.ref_mean = np.mean(self.recent)
            self.ref_std = np.std(self.recent) + 1e-6

        # 检测漂移
        if self.ref_mean is not None:
            z = (np.mean(self.recent[-50:]) - self.ref_mean) / self.ref_std
            if abs(z) > self.z_thresh:
                self.learner.learning_rate = self.base_lr * 4   # 拉高
                # 处理后重置参考——避免永久过激状态
                self.recent = []
                self.ref_mean = None

        self.learner.update(x, y)

9. 缓存 vs 计算 —— 你真正在调的那条权衡

缓存 vs 计算:左图是延迟、新鲜度、成本随缓存 TTL 的变化;右图是各策略的 Pareto 前沿,混合策略胜出

读路径上的每个决定都落在这条权衡曲线上:

  • 总是重算:正确但昂贵——每次请求都打模型、每个特征都现算。
  • 激进缓存:便宜又快,但陈旧——刚换了意图的会话拿到的还是几分钟前的特征。
  • 混合模式:生产系统几乎都收敛到这个形态:
    • 热路径(头部用户 / 活跃会话 / 新物品):实时计算,不缓存。
    • 冷路径(其余流量):从缓存返回,TTL 30-60 秒。
    • 负缓存:把「啥都没变」的响应也缓存起来,避免对静默用户的重复计算。

图 7 右侧解释了为什么:60 秒 TTL 拿走了大部分延迟红利,新鲜度损失却很小;纯重算花了 5 倍计算只换来约 0.005 AUC。「总是重算」点在 Pareto 前沿上,但很少值得。


10. 拼到一起 —— 一个最小但接近真实的系统

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class RealtimeRanker:
    """端到端实时排序器。整合:
      - 特征存储(Flink 产出的新鲜特征)
      - FTRL-Proximal 在线学习器(按事件更新权重)
      - 在学习器输出之上加一层 LinUCB(探索)
      - 漂移检测器(在 regime change 时拉高学习率)
      - 快照回滚(生产安全)
    """

    def __init__(self, n_features: int, n_strategies: int = 4):
        self.scorer = FTRLProximal(n_features=n_features, alpha=0.1,
                                    beta=1.0, l1=1.0, l2=1.0)
        # Bandit 在排序策略上选择,而非物品上
        self.bandit = LinUCB(n_arms=n_strategies, n_features=n_features, alpha=0.5)
        self.drift  = DriftDetector(z_thresh=3.0)
        self.snapshots = []  # (ts, scorer) 的环形缓冲

    def rank(self, ctx: np.ndarray, candidates: list[dict]) -> list[str]:
        # 1. Bandit 选策略
        strategy = self.bandit.select(ctx)
        # 2. FTRL 给每个候选打分——稀疏向量
        scored = [
            (c["id"], self.scorer.predict_proba(c["idx"], c["val"]), c)
            for c in candidates
        ]
        # 3. 策略修正打分(如新品助推、创作者助推)
        scored = apply_strategy(strategy, scored)
        scored.sort(key=lambda r: -r[1])
        return [r[0] for r in scored[:20]]

    def observe(self, ctx, x_idx, x_val, y, strategy):
        """一次反馈循环。"""
        # 更新在线学习器
        p = self.scorer.update(x_idx, x_val, y)
        # 给实际播出去的策略更新 bandit
        self.bandit.update(strategy, ctx, reward=float(y))
        # 把校准误差喂给漂移检测器
        if self.drift.update(abs(p - y)):
            self.scorer.alpha *= 4              # 拉高学习率
        # 周期性安全快照
        if len(self.snapshots) == 0 or time.time() - self.snapshots[-1][0] > 600:
            self.snapshots.append((time.time(), copy.deepcopy(self.scorer)))
            self.snapshots = self.snapshots[-12:]  # 保留 2 小时

几个看似细节、但生产里很要紧的点:

  • Bandit 跑在策略层,不是物品层。「从一亿候选里选一个」的动作空间,没有任何上下文 Bandit 能在合理时间内收敛;但「从 4 个排序策略里选一个」恰好是它的舒适区。
  • 快照存的是在线学习器的状态,不是预测结果。万一一批坏样本污染了权重,回滚状态、从 Kafka 重放干净事件就能恢复。
  • DriftDetector 读的是校准误差 |p - y|,不是原始 CTR。校准漂移能捕捉到比单看 CTR 更多的失效模式。

Q & A

Q:在线学习系统的 A/B 测试怎么做? 标准 A/B 假设观测彼此独立,但在线学习器会从自己的实验组里学习。修正方案:要么用 Chapelle 等(2012)的 interleaved comparison,要么把一边设置成「冻结」模型、另一边跑实时模型,比较的是累积奖励而非单请求 CTR。

Q:Flink 与 Spark Streaming 的真正区别? Flink 是真流式(逐事件、毫秒级),Spark Streaming 是微批(秒级)。对推荐而言,Flink 更低的延迟以及更成熟的 exactly-once 状态管理两件事都重要——近年大规模新建的 Feed 系统几乎清一色 Flink(或类似的内部产品,如 Twitter 的 Heron、字节的 Aiops)。

Q:在线学习是不是不稳定? 有可能不稳定,那正是你必须工程化对抗的失效模式。三道护栏:(1) 学习率有界;(2) 梯度裁剪;(3) 维护快照与回滚通道。Google 的 FTRL 论文专门有一节「trade tricks」——饱和保护、按坐标的学习率下限、校准损失监控——存在的全部目的就是让生产环境的在线学习行为可控。

Q:Bandit 怎么处理延迟反馈? 两种模式。批量更新:攒 $k$ 条观测,一次性更新,如此往复。UCB 与 Thompson Sampling 都有证明:批量化最多让遗憾乘上 $\log k$ 因子。乐观计数:拉一次臂,立刻把 $n_i$ 加一并假设奖励 = 历史均值(或保守起见 = 0),等真实奖励回来再修正。生产用的是乐观版本,因为它能避免在延迟窗口内对单一臂过度采样。

Q:什么时候应该上实时管道? 特征本身缓变(人口属性、长期口味 Embedding);或者陈旧推荐的代价很小(仓库搜索、目录浏览);或者你测不出收益(低流量场景从 1 小时到 1 秒的新鲜度提升淹没在噪声里)。一条实时管道的成本约是离线日批的 10-100 倍——挣到这份钱再上。


总结

  • 实时推荐 = 两条管道,靠特征存储与模型注册中心粘合:异步写路径不停刷新状态,同步读路径在硬性 SLO 下提供服务。
  • 延迟是尾部问题。 优化 p99,不是 p50——排序最重,蒸馏与量化的回报最大。
  • 生产中的在线学习是 FTRL-Proximal。 这是 Google 公开的广告 CTR 算法,SGD/AdaGrad 是台阶。
  • 流式 = Kafka + Flink。 Kafka 负责持久化传输,Flink 负责有状态计算,分布式快照支撑 exactly-once。
  • 新鲜度有价格曲线。 行为特征值得付实时成本,人口属性与物品元数据不值得。
  • Bandit 在排序器之上工作,不在物品层面工作。 它的动作空间是「哪种策略」,不是「哪个物品」。
  • 漂移一定会发生。 检测它(z-score、ADWIN、DDM),廉价响应(拉高 $\eta$),保留回滚通道。
  • 缓存 vs 计算不是二选一。 生产答案永远是混合——热路径实时、冷路径缓存,30-60 秒 TTL 用一小部分成本覆盖 95% 的流量。

我在每一个见过的大规模系统上都验证过的一条经验:只把真正能撬动 AUC 的东西做成实时,其余按它实际变化的频率配预算


系列导航

本文是推荐系统 16 篇系列的第 15 篇

上一篇下一篇
第十四篇:跨域推荐与冷启动解决方案所有文章第十六篇:工业级架构与最佳实践

Liked this piece?

Follow on GitHub for the next one — usually one a week.

GitHub