系列 · 推荐系统 · 第 15 篇

推荐系统(十五)—— 实时推荐与在线学习

实时推荐的工程师视角:流式管道(Kafka + Flink)、在线学习(SGD、FTRL、AdaGrad)、Bandit(UCB、Thompson Sampling、LinUCB)、延迟预算、特征新鲜度、概念漂移检测,以及生产环境真正在调的缓存与计算权衡。

用户在 14:02 打开 App 搜索「越野跑鞋」,到了 15:30 已经开始浏览厨房用品的评测;如果模型还在使用昨晚训练的离线快照,16:00 推给他的很可能仍是萨洛蒙广告——这个时间差正是实时系统要解决的核心问题。真正关键的并非「让所有东西都变快」,而是「到底哪些东西值得变快」:许多特征即使做到实时,对 AUC 的提升也微乎其微,选错方向只会白白浪费资源。

推荐系统(十五)—— 实时推荐与在线学习 — 章节概览图


你将学到什么#

  • 两条路径:实时推荐系统本质上是两条管道拼接而成——异步的写路径(事件 → 状态 → 模型)和同步的读路径(请求 → 召回 → 排序 → 返回)。
  • 毫秒花在哪了:延迟不能只看平均值,用户实际感知的是 p99 尾部延迟。我们会按阶段、按分位数拆解 100 ms 的预算。
  • 在线学习的实际应用:SGD、AdaGrad,以及生产环境中广泛使用的 FTRL-Proximal——这正是 Google 广告点击率预测系统所采用的算法。
  • Bandit 算法:UCB1、Thompson Sampling 和上下文版 LinUCB,以及它们的 regret 界在物品每日更替的现实场景中究竟意味着什么。
  • 流式架构:一个具体的 Kafka + Flink + KV 存储方案,支持 checkpoint 和 exactly-once 语义。
  • 概念漂移:如何检测(ADWIN、DDM、Page-Hinkley),以及检测到之后该如何应对。
  • 缓存 vs 计算:生产环境中真正需要调优的权衡点——答案几乎总是「混合策略」。

前置知识#

  • Python 和 NumPy(第 1–2 篇)
  • SGD 和损失函数(第 7 篇)
  • 推荐系统流程概览(第 11 篇)

为什么需要实时,以及“实时”到底是什么#

三个现实因素推动我们走向实时:

  1. 会话很短:Feed 类应用的中位会话时长仅为 3–7 分钟。每天更新一次的模型,在大多数会话结束前根本来不及看到它们。
  2. 热点转瞬即逝:一个爆款视频可能在发布后 6 小时内就完成了 80% 的总互动量。昨天训练的离线模型对此完全无能为力。
  3. 反馈闭环即模型本身:一旦你推送了一条推荐,用户的点击行为就成为下一条训练样本。将这个闭环从“天级”缩短到“秒级”,是区分一个持续学习系统与一个停滞系统的根本所在。

但“实时”并非单一概念,而是一个光谱:

层级更新频率典型用途
实时< 1 秒会话意图、Feed 内去重、风控信号
准实时1 秒 – 1 小时近期点击序列、单创作者 CTR
小时级1 – 24 小时热门话题、物品热度衰减
离线1 天以上用户人口属性、物品 Embedding、召回索引

常见的误区是试图让所有特征都实时化。如图 4 所示,人口属性和物品元数据在实时管道中几乎无法带来 AUC 提升,而近期点击序列却能带来 2–3 个 AUC 百分点的收益。实时是一种有限资源,应精准投放在真正能推动指标的特征上。


双路径架构#

实时推荐系统:异步写路径不停刷新状态,同步读路径在硬性 SLO 下提供服务

实时推荐系统可清晰拆分为两条路径:

写路径(异步,吞吐优先):用户行为事件从客户端发出,进入按 user_id 分区的 Kafka Topic。Flink 作业对这些事件进行聚合,生成滚动窗口特征(如最近 N 次点击、过去 10 分钟的 CTR 等),并将结果写入两个地方:一是供读路径消费的特征存储(如 Redis 或 RocksDB),二是用于更新模型权重的在线学习器。每隔几分钟,新的模型快照会被推送到模型注册中心。

读路径(同步,延迟敏感):当推荐请求到达时,系统先执行召回(基于 Embedding 的 ANN 搜索,辅以倒排索引覆盖新鲜物品),然后通过单次往返从特征存储中拉取所需特征,接着进行排序,并在重排阶段加入多样性控制和业务规则,最终返回结果。整个过程必须控制在 100 毫秒以内。

工程上的核心原则是保持两条路径解耦:读路径绝不参与模型训练、权重更新或流处理阻塞操作。即使流处理侧出现延迟,读路径仍能使用稍显陈旧的特征继续服务——性能可能略有下降,但系统不会中断。

延迟预算 —— 每一毫秒花在哪#

延迟预算按阶段、按分位拆解,展示召回、特征拉取、排序、重排各自在 p50/p95/p99 上的占比

对于 Feed 类产品,100 ms 的端到端 SLO 是行业标准(人因研究表明,视觉更新“感觉即时”的阈值约为 100–200 ms)。在这 100 ms 预算内,各阶段耗时大致如下:

阶段p50p95p99备注
入向网络4712长连接摊薄 TLS 握手开销
召回(ANN)101828HNSW 或 ScaNN 在亿级物品上的查询
特征拉取61430Redis pipeline;尾部延迟主要来自 GC 或网络抖动
排序(DNN)183255对约 500 个候选进行批量打分
重排 + 日志4918多样性调整、业务规则过滤、异步日志记录
出向网络3611
端到端4586154p99 超出 SLO —— 这是常态

两条实用经验:

  1. 平均延迟具有欺骗性:p50 为 45 ms 看似宽裕,但 p99 达 154 ms 意味着 1% 的请求超时。在日均十亿请求的平台上,这相当于每天数百万次失败体验。
  2. 排序是延迟大头:候选批处理、模型蒸馏、TensorRT/ONNX-Runtime 量化等优化带来的收益远超其他环节。Pinterest 曾报告,将 6 层 DNN 蒸馏为 2 层学生模型并引入特征交叉后,p99 延迟降低了 30%。

流式参考架构:客户端写入 Kafka 主题,Flink 执行有状态聚合与在线学习,输出分发到特征存储、模型注册中心和指标系统

Kafka —— 可靠的传输层#

Kafka 的角色明确而关键:提供持久化、分区化且可重放的日志。三个核心特性至关重要:

  • user_id 分区:确保同一用户的所有事件落在同一分区,从而保留因果顺序——这对“点击是否发生在曝光之后”这类有状态关联至关重要。
  • 副本机制(通常 replication-factor=3):即使某个 Broker 宕机,也不会丢失数据。
  • 数据保留策略:支持回放最近 7 天的数据,便于新模型回填——线上服务与故障恢复共用同一套代码路径。

Kafka 不负责聚合、Join 或机器学习。它本质上是一个高可靠的消息邮局。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from kafka import KafkaProducer
import json, time

producer = KafkaProducer(
    bootstrap_servers=["kafka-1:9092", "kafka-2:9092", "kafka-3:9092"],
    value_serializer=lambda v: json.dumps(v).encode(),
    key_serializer=lambda k: k.encode(),
    acks="all",        # 等待所有 ISR 副本确认
    enable_idempotence=True,  # 生产者端 exactly-once 语义
    compression_type="lz4",
    linger_ms=5,       # 5 毫秒微批处理,用 5ms 延迟换取 10 倍吞吐
)

def emit_click(user_id: str, item_id: str, position: int) -> None:
    """发送一次点击事件,同一用户始终落同一个分区。"""
    producer.send(
        topic="clicks",
        key=user_id,                # 分区键
        value={
            "user_id": user_id,
            "item_id": item_id,
            "position": position,
            "ts": int(time.time() * 1000),
        },
    )

如果说 Kafka 是传输层,Flink 则是有状态的流处理器。其杀手锏是故障下的 exactly-once 语义,通过 Chandy-Lamport 风格的分布式快照实现:每 60 秒(默认 checkpoint 间隔),Flink 会原子性地捕获所有算子的状态并持久化到 S3。发生故障时,系统将 Kafka offset 回退至上一个 checkpoint 并重放事件——对外表现如同从未发生过故障。

一个典型的 Flink 点击归因任务如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# Flink SQL —— 每用户最近 10 分钟点击数,写入特征存储
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

t_env = StreamTableEnvironment.create(
    environment_settings=EnvironmentSettings.in_streaming_mode()
)

t_env.execute_sql("""
CREATE TABLE clicks (
    user_id STRING,
    item_id STRING,
    ts BIGINT,
    event_time AS TO_TIMESTAMP_LTZ(ts, 3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'clicks',
    'properties.bootstrap.servers' = 'kafka-1:9092',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
)
""")

t_env.execute_sql("""
CREATE TABLE user_features (
    user_id STRING,
    window_end TIMESTAMP(3),
    clicks_10m BIGINT,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'redis',
    'host' = 'redis.prod',
    'ttl-sec' = '900'
)
""")

# 10 分钟滑动窗口,步长 1 分钟 —— 每分钟输出一次最新的 CTR
t_env.execute_sql("""
INSERT INTO user_features
SELECT
    user_id,
    HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) AS window_end,
    COUNT(*) AS clicks_10m
FROM clicks
GROUP BY
    user_id,
    HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE)
""")

Watermark 是最容易出错的部分。它声明:“我不会再收到 event_time < watermark 的事件。”这授权 Flink 关闭窗口并输出结果。若设置过紧,会丢弃晚到事件;若设置过松,特征更新将系统性延迟。

在线学习:从 SGD 到 FTRL#

推荐系统(十五)—— 实时推荐与在线学习 — 章节小结图

在线学习 vs 离线重训。左:稳态任务的收敛过程;右:概念漂移下的表现

核心更新公式#

$$ \theta_{t+1} = \theta_t - \eta_t \, \nabla_\theta \mathcal{L}(\sigma(\theta_t^\top x_t), y_t) $$

这是标准的 SGD。但在 Web 规模 CTR 数据上——特征数以百万计,每个特征仅在极小比例样本中出现——它存在两大问题:

  1. 缺乏按特征的学习率:一个每 1 万条样本才出现一次的特征,显然需要比高频特征更大的更新步长。
  2. 缺乏稀疏性:权重会在噪声中漂移,无法真正归零。一个拥有 $10^9$ 参数且永不归零的模型,根本无法上线服务。

AdaGrad —— 自适应按特征步长#

$$ \theta_{t+1, i} = \theta_{t, i} - \frac{\eta}{\sqrt{G_{t, i} + \varepsilon}} g_{t, i}, \quad G_{t, i} = \sum_{s=1}^{t} g_{s, i}^2 $$

罕见特征的 $G$ 较小,因此在少数几次出现时可迈出大步;高频特征的 $G$ 较大,更新更为谨慎。

FTRL-Proximal —— 生产环境的主力算法#

$$ z_{t,i} \leftarrow z_{t-1,i} + g_{t,i} - \frac{\sigma_{t,i}}{\eta} \theta_{t-1,i}, \qquad \theta_{t,i} = \begin{cases} 0 & \text{若 } |z_{t,i}| \le \lambda_1 \\ -\frac{1}{\eta_{t,i}} \big(z_{t,i} - \mathrm{sign}(z_{t,i}) \lambda_1\big) & \text{否则} \end{cases} $$

其中 $\sigma_{t,i} = \frac{1}{\eta_{t,i}} - \frac{1}{\eta_{t-1,i}}$$\eta_{t,i} = \alpha / (\beta + \sqrt{\sum_s g_{s,i}^2})$

关键优势在于:模型真正稀疏。Google 报告称,相比带 $L_2$ 正则化的朴素 SGD,FTRL-Proximal 在相同 AUC 下将模型体积压缩了一个数量级。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import numpy as np

class FTRLProximal:
    """FTRL-Proximal 在线逻辑回归。

    参考文献:McMahan 等,"Ad Click Prediction: a View from the
    Trenches",KDD 2013。Google 当时广告系统真正运行的算法。
    """
    def __init__(self, n_features: int, alpha=0.1, beta=1.0,
                 l1=1.0, l2=1.0):
        self.n = n_features
        self.alpha, self.beta, self.l1, self.l2 = alpha, beta, l1, l2
        # 只持久化 z 和 n,权重 w 按需懒计算
        self.z = np.zeros(n_features)
        self.n_sq = np.zeros(n_features)

    def _weight(self, i: int) -> float:
        """从 (z_i, n_sq_i) 懒计算 w_i。"""
        z, n_sq = self.z[i], self.n_sq[i]
        if abs(z) <= self.l1:
            return 0.0
        sign = 1.0 if z > 0 else -1.0
        return -(z - sign * self.l1) / (
            (self.beta + np.sqrt(n_sq)) / self.alpha + self.l2
        )

    def predict_proba(self, x_indices: np.ndarray, x_values: np.ndarray) -> float:
        """稀疏预测,只在非零特征上累加。"""
        s = sum(self._weight(i) * v for i, v in zip(x_indices, x_values))
        return 1.0 / (1.0 + np.exp(-max(min(s, 35.0), -35.0)))

    def update(self, x_indices: np.ndarray, x_values: np.ndarray, y: int) -> float:
        """单样本更新,返回更新前的预测概率。"""
        p = self.predict_proba(x_indices, x_values)
        for i, v in zip(x_indices, x_values):
            g = (p - y) * v                       # 该特征上的梯度
            sigma = (np.sqrt(self.n_sq[i] + g * g) - np.sqrt(self.n_sq[i])) / self.alpha
            self.z[i]   += g - sigma * self._weight(i)
            self.n_sq[i] += g * g
        return p

在线 vs 离线 —— 全景对比#

图 3 左侧展示了稳态场景:在线学习平滑收敛,而离线重训呈现阶梯状——每 200 条事件生成一个新快照,中间则停滞不前。在静态任务上,两者长期表现趋同,但从时间积分角度看,在线学习更优。

右侧才是关键:突发分布漂移(如爆款事件、新品上线或节日效应)。离线模型对训练窗口外的变化完全“失明”,需等待完整窗口才能恢复;而在线学习从第一条新样本就开始调整,约 100 条事件后即可恢复至接近峰值 AUC。在生产环境中,漂移是常态而非例外——这正是在线学习成为工程利器的根本原因,而非仅是学术偏好。


特征新鲜度 —— 真的那么重要吗?#

特征陈旧度 vs AUC:AUC 在 log 陈旧度上近似线性下降,几乎所有损失都集中在行为类特征

特征新鲜度是认真做推荐的团队必争之地,因为实时化成本高昂。公开报告(如 Meta 的深度学习推荐模型、Pinterest 的 PinnerSAGE、字节的 Monolith)呈现出一致的经验规律:

  • AUC 随 log 陈旧度近似线性下降:从 1 秒延迟到 1 分钟,损失微乎其微;从 1 分钟到 1 小时,AUC 损失约 0.005,已具实际意义;从 1 小时到 1 天,损失显著,达 0.015–0.020 AUC。
  • 损失集中在行为特征:近期点击序列和会话意图贡献了约 80% 的新鲜度溢价;而人口属性和物品元数据几乎不受影响——即使每周更新一次,也难以观测到显著差异。

这直接塑造了系统架构:不要为无法带来收益的特征支付流式处理成本。典型 Feed 系统通常并行三条流水线——行为特征走实时更新,热度类聚合按小时级更新,Embedding 和人口属性则按天级更新。

Bandit —— 探索与利用的理论故事#

一句话讲清问题#

$$ R_T = T \mu^* - \mathbb{E}\!\left[\sum_{t=1}^T \mu_{a_t}\right] $$

好的算法应具备次线性遗憾$R_T = o(T)$ ,即平均每轮损失趋于零。

UCB1 —— 不确定时保持乐观#

$$ a_t = \arg\max_i \left( \hat\mu_i + \sqrt{\frac{2 \ln t}{n_i}} \right) $$

其中 $n_i$ 是臂 $i$ 被选择的次数。探索奖励随 $n_i$ 增大而衰减——先探索,后利用。UCB1 实现 $O(\log T)$ 遗憾,对稳态 Bandit 而言,这在理论上是最优的(Lai-Robbins 下界)。

Thompson Sampling —— 贝叶斯之道#

$$ \theta_i \sim \text{Beta}(\alpha_i, \beta_i), \quad \text{更新:} (\alpha_i, \beta_i) \leftarrow (\alpha_i + r, \beta_i + 1 - r) $$

每轮从各臂后验中采样 $\theta_i$ ,选择最高者。后验宽的臂因采样噪声大而被更多探索;后验窄的臂则被稳定利用。Thompson Sampling 渐近达到与 UCB1 相同的 $O(\log T)$ 遗憾(Agrawal & Goyal, 2012),且在多数基准测试中表现更优,同时更易扩展至延迟反馈、批量更新和复杂奖励模型。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class ThompsonSampling:
    """伯努利 Thompson Sampling。生产环境实现基本就是这样——
    唯一需要加的是衰减(每天 alpha, beta *= 0.99),
    用来应对非稳态的物品池。"""

    def __init__(self, n_arms: int):
        self.alpha = np.ones(n_arms)   # 成功次数 + 1
        self.beta  = np.ones(n_arms)   # 失败次数 + 1

    def select(self) -> int:
        return int(np.argmax(np.random.beta(self.alpha, self.beta)))

    def update(self, arm: int, reward: int) -> None:
        if reward:
            self.alpha[arm] += 1
        else:
            self.beta[arm] += 1

LinUCB —— 上下文驱动的个性化#

$$ \mathbb{E}[r_a \mid x_t] = x_t^\top \theta_a $$ $$ a_t = \arg\max_a \left( x_t^\top \hat\theta_a + \alpha \sqrt{x_t^\top A_a^{-1} x_t} \right), \quad \hat\theta_a = A_a^{-1} b_a $$

奖金项 $\sqrt{x_t^\top A_a^{-1} x_t}$ 即岭回归在当前上下文下的预测标准差——当上下文与该臂历史样本差异大时,奖金更高。遗憾界为 $\tilde O(\sqrt{d T})$$d$ 为上下文维度。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
class LinUCB:
    """LinUCB —— disjoint 模型版本。
    参考文献:Li, Chu, Langford, Schapire,WWW 2010。"""

    def __init__(self, n_arms: int, n_features: int, alpha: float = 1.0):
        self.alpha = alpha
        self.A = [np.eye(n_features) for _ in range(n_arms)]
        self.b = [np.zeros(n_features) for _ in range(n_arms)]

    def select(self, x: np.ndarray) -> int:
        ucbs = []
        for A_a, b_a in zip(self.A, self.b):
            A_inv = np.linalg.inv(A_a)
            theta = A_inv @ b_a
            ucbs.append(x @ theta + self.alpha * np.sqrt(x @ A_inv @ x))
        return int(np.argmax(ucbs))

    def update(self, arm: int, x: np.ndarray, reward: float) -> None:
        self.A[arm] += np.outer(x, x)
        self.b[arm] += reward * x

实践中几乎从不使用“纯” LinUCB。线上服务运行深度排序模型,Bandit 位于其上一层,决定为当前请求分配哪种策略(如创作者助推、新物品助推或纯利用)。此时,臂是策略,而非具体物品。

概念漂移 —— 检测它,别装看不见#

概念漂移检测:上图是观测 CTR 加滚动均值与参考窗口;下图是在渐进与突发漂移上触发的 z-score 检测器

在线学习仅在学习器适应时才有效。若学习率已衰减至零,模型便无法响应漂移。生产系统会主动检测漂移并作出反应。

三种经典检测器#

  • Page-Hinkley 检验:累积偏离均值的偏差,CUSUM 超阈值即报警。适用于单调漂移。
  • DDM(Drift Detection Method, Gama et al. 2004):跟踪二项错误率 $p_t$ 及其标准差 $s_t$ 。当 $p_t + s_t \ge p_{\min} + 2 s_{\min}$ 时预警,$\ge p_{\min} + 3 s_{\min}$ 时报警。
  • ADWIN(Bifet & Gavaldà 2007):维护自适应窗口,当两子窗口均值差异超 Hoeffding 界时,丢弃较旧一半。误报率有理论保证。

图 6 中的简化 z-score 检测器才是 v1 实际部署的选择:保留“已知良好”参考窗口,计算近期滚动均值 CTR 相对该窗口的 z-score,当 $|z| > 3$ 时触发报警。

应对漂移的策略#

仅有检测而无响应只是告警。实用响应策略按成本由低到高:

  1. 提高学习率(低成本、可逆):在固定窗口内将 $\eta$ 放大 2–5 倍。
  2. 重置 Bandit 后验(中等成本):将受影响臂的 $\alpha, \beta$ 减半——保留先验形状,但加倍不确定性。
  3. 强制回滚 checkpoint(高成本):回退至上一个已知良好快照,从 Kafka 重放事件,在漂移后窗口重新训练。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class DriftAdaptiveLearner:
    def __init__(self, base_learner, base_lr=0.1, window=200, z_thresh=3.0):
        self.learner = base_learner
        self.base_lr = base_lr
        self.window = window
        self.z_thresh = z_thresh
        self.recent = []        # 滚动误差
        self.ref_mean = None
        self.ref_std = None

    def update(self, x, y):
        pred = self.learner.predict(x)
        err = abs(pred - y)
        self.recent.append(err)
        if len(self.recent) > self.window:
            self.recent.pop(0)

        # 第一个窗口稳定后建立参考
        if self.ref_mean is None and len(self.recent) == self.window:
            self.ref_mean = np.mean(self.recent)
            self.ref_std = np.std(self.recent) + 1e-6

        # 检测漂移
        if self.ref_mean is not None:
            z = (np.mean(self.recent[-50:]) - self.ref_mean) / self.ref_std
            if abs(z) > self.z_thresh:
                self.learner.learning_rate = self.base_lr * 4   # 提高学习率
                # 处理后重置参考,避免长期处于过激状态
                self.recent = []
                self.ref_mean = None

        self.learner.update(x, y)

缓存与计算 —— 真正需要调优的权衡#

缓存 vs 计算:左图是延迟、新鲜度、成本随缓存 TTL 的变化;右图是各策略的 Pareto 前沿,混合策略胜出

读路径的每个决策都落在缓存与计算的权衡曲线上:

  • 总是重算:结果最准,但代价高昂——每次请求都实时计算所有特征。
  • 激进缓存:速度快、成本低,但数据陈旧——用户意图刚变,拿到的仍是几分钟前的特征。
  • 混合模式:生产系统普遍采用此方案:
    • 热路径(头部用户 / 活跃会话 / 新物品):实时计算,不缓存。
    • 冷路径(其余流量):从缓存返回,TTL 设为 30–60 秒。
    • 负缓存:显式缓存“无变化”响应,避免对静默用户重复计算。

图 7 右侧说明原因:60 秒 TTL 能捕获大部分延迟优化收益,新鲜度损失却极小;纯重算耗费 5 倍计算资源,仅提升约 0.005 AUC。“总是重算”虽在 Pareto 前沿,但极少值得投入。


拼到一起 —— 一个最小但接近真实的系统#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class RealtimeRanker:
    """端到端实时排序器。整合:
      - 特征存储(Flink 提供的新鲜特征)
      - FTRL-Proximal 在线学习器(按事件更新权重)
      - LinUCB 探索层(基于学习器得分)
      - 漂移检测器(在分布变化时提升学习率)
      - 快照回滚机制(保障生产安全)
    """

    def __init__(self, n_features: int, n_strategies: int = 4):
        self.scorer = FTRLProximal(n_features=n_features, alpha=0.1,
                                    beta=1.0, l1=1.0, l2=1.0)
        # Bandit 选择的是排序策略,不是物品
        self.bandit = LinUCB(n_arms=n_strategies, n_features=n_features, alpha=0.5)
        self.drift  = DriftDetector(z_thresh=3.0)
        self.snapshots = []  # 环形缓冲区,存 (时间戳, 学习器状态)

    def rank(self, ctx: np.ndarray, candidates: list[dict]) -> list[str]:
        # 1. Bandit 选择策略
        strategy = self.bandit.select(ctx)
        # 2. FTRL 给每个候选打分——稀疏向量
        scored = [
            (c["id"], self.scorer.predict_proba(c["idx"], c["val"]), c)
            for c in candidates
        ]
        # 3. 策略调整得分(比如新品助推、创作者助推)
        scored = apply_strategy(strategy, scored)
        scored.sort(key=lambda r: -r[1])
        return [r[0] for r in scored[:20]]

    def observe(self, ctx, x_idx, x_val, y, strategy):
        """一次反馈循环。"""
        # 更新在线学习器
        p = self.scorer.update(x_idx, x_val, y)
        # 更新 Bandit,针对实际使用的策略
        self.bandit.update(strategy, ctx, reward=float(y))
        # 将校准误差喂给漂移检测器
        if self.drift.update(abs(p - y)):
            self.scorer.alpha *= 4  # 提升学习率
        # 周期性保存快照
        if len(self.snapshots) == 0 or time.time() - self.snapshots[-1][0] > 600:
            self.snapshots.append((time.time(), copy.deepcopy(self.scorer)))
            self.snapshots = self.snapshots[-12:]  # 保留最近 2 小时

几个看似琐碎却至关重要的生产细节:

  • Bandit 作用于策略而非物品。“从 1 亿物品中选一”动作空间过大,任何上下文 Bandit 都难以收敛;但“从 4 个排序策略中选一”恰是其强项。
  • 快照保存的是在线学习器状态,而非预测结果。若坏数据污染权重,可回滚状态并从 Kafka 重放干净事件流恢复。
  • DriftDetector 监控校准误差 |p - y|,而非原始 CTR。校准漂移能捕捉比单纯点击率漂移更多的失效模式。

Q & A#

Q:如何对在线学习系统进行 A/B 测试?
标准 A/B 测试假设观测独立,但在线学习会从实验组持续学习。解决方案:(1) 使用 interleaved comparisons(Chapelle et al., 2012);(2) 一组用“冻结模型”,另一组用实时在线模型,比较累积奖励而非单次 CTR。

Q:Flink 和 Spark Streaming 的本质区别是什么?
Flink 逐事件处理,延迟毫秒级;Spark Streaming 微批处理,延迟秒级。对推荐系统而言,Flink 的低延迟和更成熟的 exactly-once 状态管理都至关重要——几乎所有近年大规模 Feed 系统都选用 Flink(或 Twitter Heron、字节 Aiops 等内部方案)。

Q:在线学习是否不稳定?
确实可能,而这正是需工程防护的失效模式。三大措施:(1) 限制学习率;(2) 梯度裁剪;(3) 保留快照与回滚路径。Google 的 FTRL 论文专设一节讲“tricks of the trade”——饱和保护、按坐标的学习率下限、校准损失监控——全为保障生产稳定性。

Q:Bandit 如何处理延迟反馈?
两种模式:(1) 批量更新:收集 $k$ 条观测后统一更新,理论证明最多使 regret 增加 $\log k$ 倍;(2) 乐观计数:拉臂时立即增加 $n_i$ 并假设奖励为均值(或 0),待真实奖励到达后再修正。生产系统多用后者,避免在延迟窗口内过度采样单一臂。

Q:何时不该用实时管道?
当特征变化缓慢(如人口属性、长期口味 Embedding)、陈旧推荐成本低(如仓库搜索、分类浏览),或收益难衡量(如低流量场景中 1 小时→1 秒的新鲜度提升淹没在噪声中)。实时管道成本约为日批的 10–100 倍——务必确保收益足够再投入。

总结#

  • 实时推荐系统是 两条管道通过特征存储和模型注册中心拼接而成:异步写路径持续更新状态,同步读路径在严格 SLO 下提供服务。
  • 延迟是长尾问题:优化 p99 而非 p50——排序模块占主导,蒸馏与量化在此处回报最高。
  • 生产在线学习 = FTRL-Proximal:这是 Google 广告 CTR 预测所用算法,SGD/AdaGrad 仅为铺垫。
  • 流处理 = Kafka + Flink:Kafka 负责可靠传输,Flink 借助分布式快照实现有状态计算与 exactly-once 语义。
  • 新鲜度有成本曲线:行为特征值得实时投入,人口属性与物品元数据则不值得。
  • Bandit 位于排序器之上:其动作空间是“选择哪种策略”,而非“推荐哪个物品”。
  • 漂移不可避免:检测它(z-score、ADWIN、DDM),低成本应对(调高 $\eta$ ),并保留回滚路径。
  • 缓存与计算非二元选择:生产答案是混合模式——热路径实时计算,冷路径缓存结果,30–60 秒 TTL 以极低成本覆盖 95% 流量。

我在所有见过的大规模系统中都验证过一条经验法则:只把能提升 AUC 的部分做成实时,其余按实际变化频率分配预算

本系列

推荐系统 16 篇

  1. 01 推荐系统(一)—— 入门与基础概念
  2. 02 推荐系统(二)—— 协同过滤与矩阵分解
  3. 03 推荐系统(三)—— 深度学习基础模型
  4. 04 推荐系统(四)—— CTR 预估与点击率建模
  5. 05 推荐系统(五)—— Embedding 表示学习
  6. 06 推荐系统(六)—— 序列推荐与会话建模
  7. 07 推荐系统(七)—— 图神经网络与社交推荐
  8. 08 推荐系统(八)—— 知识图谱增强推荐系统
  9. 09 推荐系统(九)—— 多任务学习与多目标优化
  10. 10 推荐系统(十)—— 深度兴趣网络与注意力机制
  11. 11 推荐系统(十一)—— 对比学习与自监督学习
  12. 12 推荐系统(十二)—— 大语言模型与推荐系统
  13. 13 推荐系统(十三)—— 公平性、去偏与可解释性
  14. 14 推荐系统(十四)—— 跨域推荐与冷启动解决方案
  15. 15 推荐系统(十五)—— 实时推荐与在线学习 当前
  16. 16 推荐系统(十六)—— 工业级架构与最佳实践

读有所得?

GitHub 关注我 → 新文周更

GitHub