系列 · 推荐系统 · 第 16 篇

推荐系统(十六)—— 工业级架构与最佳实践

工业级推荐系统全景:数据/训练/服务三大平面,召回-粗排-精排-重排序四级漏斗,多路召回与融合,Wide&Deep/DeepFM/DIN 排序模型,特征仓库消除训练-服务偏差,A/B 测试方法论,金丝雀发布与回滚,团队角色分工。结合 YouTube、TikTok、淘宝、字节跳动等真实架构。

工业级推荐系统最难的部分不是模型,而是围绕模型的系统——防止训练和线上服务偏差的特征存储、在问题影响上亿用户前拦截回归的金丝雀发布、在 100 毫秒 p95 延迟预算内串联运行四个机器学习模型的编排机制。作为本系列的收官之作,本文将深入剖析各大科技公司最终采用的架构,并揭示每一层设计背后的权衡取舍。

推荐系统(十六)—— 工业级架构与最佳实践 — 章节概览图


你将学到什么#

  • 多阶段流水线——召回、粗排、精排和重排,每个阶段的设计都受特定约束影响
  • 多路召回——协同过滤、双塔深度学习模型、图遍历及实时行为信号的结合与优化
  • 生产环境中的排序模型——Wide & Deep、DeepFM 和 DIN 的实际代码实现
  • 重排策略——多样性(MMR)、业务规则调整、新鲜度提升
  • 特征存储——离线与在线架构分离,彻底解耦训练和服务
  • A/B 测试——一致性分配、比例 z 检验、实验运行时长选择
  • 性能优化——量化、蒸馏、预测缓存
  • 部署与监控——金丝雀发布、漂移检测、自动回滚
  • 团队职责划分——召回、排序、特征存储和服务的职责划分

前置知识#

  • 系列前面的所有章节(尤其是第 7、11、15 篇)
  • 分布式系统的基础知识(负载均衡、消息队列)
  • 熟练掌握 Python、PyTorch 和 REST API

工业推荐系统全景#

架构总览#

工业级推荐系统三平面架构:数据平面、训练平面、服务平面,以及召回-粗排-精排-重排序漏斗

无论是 Google、Amazon,还是阿里、字节跳动,所有大型推荐系统最终都收敛到同一套三平面架构

  1. 数据平面:从日志和内容生成样本和特征,主要依赖 Hive、Spark、Flink 和 Kafka。
  2. 训练平面:将样本转化为模型,离线验证后写入模型注册中心。
  3. 服务平面:用户实际等待的实时漏斗,也是唯一有严格延迟预算的部分。

服务平面本身是一个逐步缩小候选集、同时提升打分精度的漏斗

1
用户请求 → 召回 (10⁶ → 2K) → 粗排 (2K → 200) → 精排 (200 → 50) → 重排 (50 → 20) → 返回
阶段输入 → 输出模型类别延迟预算
召回10⁶ → ~2,000双塔 DNN、ANN、简单 CF20-30 ms
粗排~2,000 → ~200浅层 DNN 或 XGBoost10-20 ms
精排~200 → ~50Wide & Deep、DeepFM、DIN30-50 ms
重排~50 → ~20规则 + 轻量 ML10-20 ms
总计< 100 ms p95

可以将其类比为招聘流程:召回是简历筛选(快速撒网)、粗排是电话面试(初步筛选)、精排是现场面试(深度评估)、重排是录用委员会(最后调整团队搭配和多样性)。

为什么选择漏斗而不是一个大模型?#

如果用一个大模型对所有候选打分,单次请求可能需要几秒钟。漏斗设计能带来数量级的速度提升,因为每个阶段都使用了与候选规模匹配的模型——候选多时用轻量模型,候选少时用复杂模型。召回阶段每个候选大约花费 5 微秒,精排阶段则需要约 250 微秒,以满足延迟预算。

关键设计原则#

无状态服务:所有服务必须支持水平扩展,状态(如用户嵌入、最近行为)存储在 Redis、KV 存储或特征仓库中,绝不能放在进程内存里。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class RankingService:
    """无状态排序服务——可自由复制副本。"""

    def __init__(self, model_path: str):
        self.model = load_model(model_path)
        self.feature_extractor = FeatureExtractor()

    def rank(self, user_id, candidates, context):
        features = self.feature_extractor.extract(user_id, candidates, context)
        scores = self.model.predict(features)
        return sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)

优雅降级:每个组件都必须有备用方案——如果深度召回超时,系统会回退到协同过滤;如果所有召回都失败,则返回热门内容,确保用户不会看到空白页面。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class FaultTolerantRecall:
    def __init__(self, channels):
        self.channels = channels
        self.fallback = PopularItemsRecall()

    def recall(self, user_id, context):
        results = []
        for channel in self.channels:
            try:
                results.extend(channel.recall(user_id, context, timeout=20))
            except Exception as exc:
                logger.warning("channel %s failed: %s", channel.name, exc)

        if not results:
            return self.fallback.recall(user_id, context)
        return deduplicate(results)

强制延迟预算:每次调用都有严格的超时限制,由编排器强制执行,某个慢召回不会拖累整个流水线,直接丢弃其结果即可。

漏斗细节#

召回、粗排、精排、重排序漏斗,标注了候选数量与每阶段延迟预算

上图清楚地展示了每个阶段的数量级缩减。我总结了两条关键规则:

  • 召回决定了质量的天花板。如果优质物品在召回阶段就被漏掉,后续的精排再强也无济于事。这就是为什么生产环境通常会并行运行多路召回。
  • 阶段越窄,模型可以越复杂。比如在 200 个候选上做精排,完全可以用一个 1 亿参数的 DIN 模型;但在百万量级的召回阶段,连两次 embedding 查表加一次内积都得精打细算。

多路召回#

推荐系统(十六)—— 工业级架构与最佳实践 — 章节小结图

单一召回策略总会漏掉一些东西。协同过滤漏掉冷启动物品,内容召回漏掉惊喜发现,实时信号漏掉用户的长期兴趣。所以工业界普遍会并行运行 3-5 条召回通道,最后再融合结果。

通道一:双塔深度召回#

双塔架构是现代召回的核心主力。用户塔在请求时实时计算,物品塔离线计算,所有物品嵌入向量加载到 ANN 索引(Faiss 或 HNSW)。线上每次召回就是一次 ANN 查询,耗时 5-10 毫秒。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import torch
import torch.nn as nn

class TwoTowerRecall(nn.Module):
    """双塔模型,物品向量离线索引。"""

    def __init__(self, user_dim: int, item_dim: int, hidden=(256, 128)):
        super().__init__()
        self.user_tower = self._tower(user_dim, hidden)
        self.item_tower = self._tower(item_dim, hidden)

    @staticmethod
    def _tower(in_dim, hidden):
        layers, prev = [], in_dim
        for h in hidden:
            layers += [nn.Linear(prev, h), nn.ReLU(), nn.BatchNorm1d(h)]
            prev = h
        return nn.Sequential(*layers)

    def recall(self, user_features, ann_index, top_k=1000):
        with torch.no_grad():
            user_emb = self.user_tower(user_features)
        # ann_index 是离线物品向量构建的 Faiss IVF-PQ 索引
        _, top_ids = ann_index.search(user_emb.cpu().numpy(), top_k)
        return top_ids

这里有两个工程细节需要注意。第一,损失函数很关键:带 logQ 修正 的 batch 内 sampled softmax 已经成为标配(YouTube 团队 RecSys 2019 那篇),目的是抵消热门物品在 batch 中过采样带来的热度偏差。第二,物品索引需要定期重建,因为物品嵌入会漂移:高频更新的品类每小时重建一次,其他场景每天重建一次。

通道二:图召回#

图召回通过多跳遍历找物品:用户 A 喜欢物品 X 和 Y,用户 B 喜欢 Y 和 Z,那么 Z 就是 A 的候选。这种方式能捕捉纯嵌入相似度漏掉的关联。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from collections import defaultdict

class GraphRecall:
    """基于共同用户的物品 Jaccard 相似度做物品到物品的召回。"""

    def __init__(self, interaction_graph):
        self.graph = interaction_graph
        self.item_similarity = self._compute_similarity()

    def _compute_similarity(self):
        sim = defaultdict(dict)
        items = [n for n in self.graph.nodes() if self.graph.nodes[n]["type"] == "item"]
        for a in items:
            users_a = set(self.graph.neighbors(a))
            for b in items:
                if a == b:
                    continue
                users_b = set(self.graph.neighbors(b))
                inter, union = len(users_a & users_b), len(users_a | users_b)
                if union:
                    sim[a][b] = inter / union
        return sim

    def recall(self, user_id, top_k=1000):
        seen = [n for n in self.graph.neighbors(user_id)
                if self.graph.nodes[n]["type"] == "item"]
        scores = defaultdict(float)
        for item in seen:
            for neighbour, w in self.item_similarity.get(item, {}).items():
                scores[neighbour] += w
        return [i for i, _ in sorted(scores.items(), key=lambda x: -x[1])[:top_k]]

通道三:实时行为召回#

这条通道专门捕捉用户此刻的行为。如果用户刚刚连点了同品类的三个物品,下一刷就应该立刻反映出来,而不是等到第二天的离线训练。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from collections import defaultdict, deque
from datetime import datetime, timedelta

import numpy as np

class RealTimeBehaviorRecall:
    """基于最近 30 分钟行为的召回,时间衰减 + 行为加权。"""

    def __init__(self, window_minutes=30):
        self.window = timedelta(minutes=window_minutes)
        self.behaviours = defaultdict(deque)

    def add_behavior(self, user_id, item_id, action_type, ts):
        self.behaviours[user_id].append({"item": item_id, "action": action_type, "ts": ts})
        cutoff = ts - self.window
        while self.behaviours[user_id] and self.behaviours[user_id][0]["ts"] < cutoff:
            self.behaviours[user_id].popleft()

    def recall(self, user_id, top_k=500):
        weights = {"view": 1.0, "click": 2.0, "purchase": 5.0}
        scores = defaultdict(float)
        now = datetime.now()
        for b in self.behaviours.get(user_id, []):
            age_min = (now - b["ts"]).total_seconds() / 60
            recency = np.exp(-age_min / 10)
            scores[b["item"]] += recency * weights.get(b["action"], 1.0)
        return [i for i, _ in sorted(scores.items(), key=lambda x: -x[1])[:top_k]]

多路融合#

每条通道返回一个有序列表。融合时用基于排名的融合,而不是原始分数,因为不同通道的分数没有可比性。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed

class MultiChannelRecall:
    def __init__(self, channels, weights=None):
        self.channels = channels
        self.weights = weights or {c.name: 1.0 for c in channels}

    def recall(self, user_id, context, target=2000):
        results = {}
        with ThreadPoolExecutor(max_workers=len(self.channels)) as pool:
            futures = {pool.submit(c.recall, user_id, context): c.name for c in self.channels}
            for fut in as_completed(futures):
                name = futures[fut]
                try:
                    results[name] = fut.result(timeout=25)
                except Exception as exc:
                    logger.error("channel %s failed: %s", name, exc)

        scores = defaultdict(float)
        for name, items in results.items():
            w = self.weights.get(name, 1.0)
            for rank, item in enumerate(items):
                scores[item] += w / (rank + 1)  # 倒数排名融合
        return [i for i, _ in sorted(scores.items(), key=lambda x: -x[1])[:target]]

这就是搜索引擎里熟知的倒数排名融合(RRF),对量纲差异非常鲁棒。每条通道 25 毫秒的超时是硬约束:慢的通道直接丢弃,绝不阻塞。

排序:粗排与精排#

粗排#

粗排的任务是从几千个候选中筛选出几百个,模型必须轻量高效。目标是快速剔除明显不合适的候选,而不是追求完美排序。两种方案最常用:

  • 浅层双塔模型,物品侧离线计算(类似召回,但特征更丰富)。
  • XGBoost 排序模型,基于简单特征(热度、CTR、用户/物品基础统计)。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
import xgboost as xgb

class CoarseRanker:
    def __init__(self):
        self.model = xgb.XGBRanker(
            objective="rank:pairwise",
            tree_method="hist",
            max_depth=4,
            n_estimators=50,
        )

    def fit(self, X, y, group):
        self.model.fit(X, y, group=group)

    def predict(self, X):
        return self.model.predict(X)

一个常见误区是把粗排做得太强。如果粗排选出的 top-200 和精排结果几乎一致,那精排就失去了意义。我的经验是,粗排的 recall@200 相对于精排保持在 0.7 左右——既能过滤掉垃圾候选,又给精排留出足够的区分空间。

精排:Wide & Deep、DeepFM、DIN#

精排在缩小后的候选集上运行复杂模型。生产环境中 CTR 预估主要依赖三种架构。

Wide & Deep(Google 2016)结合了记忆能力(线性模型 + 交叉特征)和泛化能力(深度 MLP + 嵌入):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class WideDeepRanking(nn.Module):
    """Google Wide & Deep:线性记忆 + 深度泛化。"""

    def __init__(self, wide_dim, embed_dims, deep_hidden):
        super().__init__()
        self.wide = nn.Linear(wide_dim, 1)
        self.embeddings = nn.ModuleDict(
            {name: nn.Embedding(vocab, dim) for name, (vocab, dim) in embed_dims.items()}
        )
        deep_in = sum(dim for _, dim in embed_dims.values())
        layers, prev = [], deep_in
        for h in deep_hidden:
            layers += [nn.Linear(prev, h), nn.ReLU(), nn.BatchNorm1d(h), nn.Dropout(0.2)]
            prev = h
        layers.append(nn.Linear(prev, 1))
        self.deep = nn.Sequential(*layers)

    def forward(self, wide_feats, sparse_ids, dense_feats):
        wide_out = self.wide(wide_feats)
        emb = [self.embeddings[name](ids) for name, ids in sparse_ids.items()]
        deep_out = self.deep(torch.cat(emb + [dense_feats], dim=1))
        return wide_out + deep_out

DeepFM(华为 2017)用因子分解机自动学习两阶特征交互,省去了人工设计交叉特征的工作。如果你不想花时间调交叉特征,这是个不错的默认选择。

DIN(Deep Interest Network,阿里 2018) 在用户行为序列上引入注意力机制。它不再简单平均历史行为的嵌入,而是针对每个候选动态关注与其相似的历史行为

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class DIN(nn.Module):
    """Deep Interest Network:在用户行为序列上做注意力。"""

    def __init__(self, item_dim, user_dim, hidden=(128, 64)):
        super().__init__()
        self.attention = nn.Sequential(
            nn.Linear(item_dim * 4, 36), nn.ReLU(), nn.Linear(36, 1)
        )
        in_dim = item_dim + user_dim + item_dim
        layers, prev = [], in_dim
        for h in hidden:
            layers += [nn.Linear(prev, h), nn.ReLU(), nn.Dropout(0.2)]
            prev = h
        layers.append(nn.Linear(prev, 1))
        self.mlp = nn.Sequential(*layers)

    def forward(self, candidate, history, user_profile):
        cand_exp = candidate.unsqueeze(1).expand_as(history)
        attn_in = torch.cat(
            [history, cand_exp, history - cand_exp, history * cand_exp], dim=2
        )
        weights = torch.softmax(self.attention(attn_in).squeeze(-1), dim=1)
        weighted = (history * weights.unsqueeze(-1)).sum(dim=1)
        return self.mlp(torch.cat([candidate, weighted, user_profile], dim=1))

注意力机制的作用很关键:一个买过 50 本书、涉及 5 个品类的用户,并没有一个统一的“平均兴趣”——他的兴趣是按品类划分的。DIN 的作用就是通过注意力机制,针对每个候选解锁对应的兴趣分布。


重排序#

重排序是业务逻辑与算法输出交汇的地方。在几乎所有的生产系统中,都会出现三种常见的模式。

多样性(MMR)#

单纯优化点击率(CTR)会导致推荐列表高度同质化——用户点了一个就流失了。最大边际相关性(MMR, Maximal Marginal Relevance) 通过贪心策略选择那些既相关又与已选内容保持多样性的物品:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import numpy as np

class DiversityReranker:
    def __init__(self, lambda_div=0.3):
        self.lam = lambda_div

    def rerank(self, items, scores, features, top_k=20):
        selected, remaining = [], list(zip(items, scores, features))
        while len(selected) < top_k and remaining:
            best_idx, best_obj = None, -np.inf
            for idx, (it, sc, ft) in enumerate(remaining):
                if selected:
                    diversity = min(self._dist(ft, s_ft) for _, _, s_ft in selected)
                else:
                    diversity = 1.0
                obj = (1 - self.lam) * sc + self.lam * diversity
                if obj > best_obj:
                    best_obj, best_idx = obj, idx
            selected.append(remaining.pop(best_idx))
        return [it for it, _, _ in selected]

    @staticmethod
    def _dist(a, b):
        va, vb = np.array(list(a.values())), np.array(list(b.values()))
        return 1 - va @ vb / (np.linalg.norm(va) * np.linalg.norm(vb) + 1e-8)

多样性权重(通常取值 0.2 到 0.3)本身是一个 A/B 测试参数。如果权重太低,推荐流会显得单调;如果太高,点击率(CTR)会下降,因为牺牲了相关性。

业务规则#

硬约束应该放在这里,而不是塞进机器学习模型里。比如库存过滤、合规检查、运营加权——这些都是确定性规则,用代码实现比用特征表达更容易理解和维护。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class BusinessRulesReranker:
    def __init__(self, rules):
        self.rules = rules

    def rerank(self, items, scores, metadata):
        out = []
        for item, score in zip(items, scores):
            meta, adj, ok = metadata.get(item, {}), score, True
            for rule in self.rules:
                if not rule.check(item, meta):
                    ok = False
                    break
                adj += rule.score_adjustment(item, meta)
            if ok:
                out.append((item, adj))
        out.sort(key=lambda x: -x[1])
        return [i for i, _ in out]

新鲜度加权#

在新闻、视频和短内容场景中,新鲜度本身就是一个重要特征。指数衰减给最近的内容一个有界的提升,既不会被旧内容完全压制,也不会让新内容彻底盖过老内容:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
from datetime import datetime
import numpy as np

class FreshnessReranker:
    def __init__(self, decay_hours=24, max_boost=0.3):
        self.decay, self.max_boost = decay_hours, max_boost

    def rerank(self, items, scores, timestamps):
        now, out = datetime.now(), []
        for item, score in zip(items, scores):
            ts = timestamps.get(item)
            if ts:
                age_h = (now - ts).total_seconds() / 3600
                boost = self.max_boost * np.exp(-age_h / self.decay)
                out.append((item, score * (1 + boost)))
            else:
                out.append((item, score))
        out.sort(key=lambda x: -x[1])
        return [i for i, _ in out]

特征仓库#

特征仓库架构:离线批处理路径与在线实时路径共享同一份特征定义

在推荐系统中,特征仓库是最关键的基础设施,但往往也是最后才被搭建的部分。它的核心任务是消除训练和线上服务之间的偏差,确保离线训练时计算的特征定义与线上服务时完全一致。

架构设计上,特征仓库通过两条路径共享同一套特征定义:

  • 离线路径:使用 Spark 或 Flink 在数据湖上运行任务,将特征物化为 Parquet 文件,供训练流水线使用。
  • 在线路径:Flink 消费 Kafka 事件流,将聚合后的特征写入 Redis,保证线上服务的 p99 延迟小于 5 毫秒。

两条路径执行的是同一套特征定义(通常是 SQL 或 YAML)。修改特征时,两条路径同步更新。如果没有这种约束,迟早会出现问题:训练时用的特征语义和线上服务时不一致,导致 AUC 静默下降,等到业务指标报警时已经晚了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import json

class FeatureStore:
    """基于 Redis 的在线特征仓库,支持批量读取。"""

    def __init__(self, redis_client, ttl=3600):
        self.redis, self.ttl = redis_client, ttl

    def set(self, entity, eid, name, value):
        self.redis.setex(f"{entity}:{eid}:{name}", self.ttl, json.dumps(value))

    def batch_get(self, entity, eids, names):
        keys = [f"{entity}:{e}:{n}" for e in eids for n in names]
        values = self.redis.mget(keys)
        n = len(names)
        return [
            [json.loads(v) if v else None for v in values[i:i + n]]
            for i in range(0, len(values), n)
        ]

值得了解的开源和商业方案包括:Feast(最流行的开源方案)、Tecton(商业化产品)以及阿里内部使用的 Feathr。这些工具都遵循“离线 + 在线双路径”的设计模式。

时间特征的循环编码#

一个看似不起眼、却非常重要的细节是时间特征的处理。23 点和 0 点在时间上是相邻的,但线性模型会认为它们相差 23 个单位。通过 (sin, cos) 编码,可以让模型正确识别它们的邻接关系:

1
2
3
4
5
6
7
8
9
def temporal_features(ts):
    dt = datetime.fromtimestamp(ts)
    return {
        "hour_sin": np.sin(2 * np.pi * dt.hour / 24),
        "hour_cos": np.cos(2 * np.pi * dt.hour / 24),
        "dow_sin":  np.sin(2 * np.pi * dt.weekday() / 7),
        "dow_cos":  np.cos(2 * np.pi * dt.weekday() / 7),
        "is_weekend": int(dt.weekday() >= 5),
    }

A/B 测试框架#

A/B 测试结果:14 天对照组与实验组 CTR 趋势、置信带、显著性标记,以及各指标提升分解

A/B 测试是发现“离线涨 3% 的模型,上线后实际跌 1.5%”的唯一方法——这种情况比大家愿意承认的更常见。做好 A/B 测试,必须关注三点:

  • 一致性哈希分流:确保同一个用户始终看到同一个变体。频繁切换变体会毁掉用户体验和统计有效性。
  • 预先定义的指标:包括护栏指标(延迟、错误率、营收)。即使主指标涨了,护栏指标不达标也不能上线。
  • 提前做功效分析:在实验开始前明确需要多少样本量。看到“看起来不错”的中间结果就提前停止,假阳性率会大幅上升。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import numpy as np
from collections import defaultdict
from scipy.stats import norm

class ABTestFramework:
    """生产级 A/B 框架:一致性分流 + z 检验。"""

    def __init__(self):
        self.experiments, self.events = {}, defaultdict(list)

    def create(self, exp_id, variants_split):  # 例如 {"control": 50, "v1": 50}
        self.experiments[exp_id] = variants_split

    def assign(self, user_id, exp_id):
        h = hash(f"{user_id}:{exp_id}") % 100
        cum = 0
        for variant, split in self.experiments[exp_id].items():
            cum += split
            if h < cum:
                return variant
        return "control"

    def log(self, user_id, exp_id, variant, event):
        self.events[exp_id].append({"user": user_id, "variant": variant, "event": event})

    def analyse(self, exp_id):
        stats = defaultdict(lambda: {"impr": 0, "click": 0})
        for ev in self.events[exp_id]:
            if ev["event"] == "impression":
                stats[ev["variant"]]["impr"] += 1
            elif ev["event"] == "click":
                stats[ev["variant"]]["click"] += 1

        out, ctrl = {}, stats.get("control")
        for v, s in stats.items():
            ctr = s["click"] / max(s["impr"], 1)
            out[v] = {"ctr": ctr, **s}

        if ctrl:
            for v, s in out.items():
                if v == "control":
                    continue
                p_pool = (ctrl["click"] + s["click"]) / (ctrl["impr"] + s["impr"])
                se = np.sqrt(p_pool * (1 - p_pool) * (1 / ctrl["impr"] + 1 / s["impr"]))
                z = (s["ctr"] - out["control"]["ctr"]) / (se + 1e-8)
                s["lift"] = (s["ctr"] - out["control"]["ctr"]) / (out["control"]["ctr"] + 1e-8)
                s["p_value"] = 2 * (1 - norm.cdf(abs(z)))
                s["significant"] = s["p_value"] < 0.05
        return out

A/B 实验要跑多久? 至少满足三个条件:(1) 覆盖一个完整的星期周期,通常是两周;(2) 达到功效分析计算出的样本量;(3) 超过新奇效应的影响,用户对新内容天然有点击冲动。两周是最常见的答案,少于一周的实验结果通常不可信。

常见坑点:变体间的网络效应(实验组用户通过共享内容影响对照组);SUTVA 假设被破坏;不同用户群体的异质处理效应;累积效应(实验组提升长期留存但短期 CTR 下降)。解决这些问题的关键是分层实验平台——这也是 Google、Meta 和字节跳动都自建实验系统的原因。

持续训练#

持续训练流水线:四种重训练触发条件、四阶段部署、监控反馈闭环

模型会衰减。用户行为在变,物品池在更新,季节性也在切换。上个月还是 SOTA 的模型,下个月如果不重新训练,就会拖后腿。训练流水线必须自动化运行,触发条件包括:

  • 定时调度——精排模型每天一次,增量更新每小时一次,高波动特征近实时更新。
  • 漂移检测——重要特征的 PSI(Population Stability Index)超过 0.2,直接触发重训,不用等定时任务。
  • 指标衰减——离线 AUC 相比上一个 checkpoint 下降超过 2%。
  • 代码变更——新增特征定义或调整模型结构。

训练的输出不是直接上线的模型,而是模型注册中心里的一个带元数据的 artifact,包含版本号、训练数据时间窗口、离线指标和血缘信息。部署是独立的,且有严格的门控流程。

部署:影子 → 金丝雀 → A/B → 全量#

新模型从注册中心到 100% 流量,不可能一步到位。标准流程是阶梯式推进:

  1. 影子流量——模型并行运行,但不接入真实流量,预测结果只记录日志。这种方式能提前发现延迟问题、schema 不匹配和服务 bug,完全不影响用户体验。
  2. 金丝雀——接入 1%-10% 的流量,持续 1-24 小时。如果护栏指标异常,自动回滚。
  3. A/B 测试——分配 50% 流量,持续 1-2 周,进行统计验证。
  4. 全量——最终覆盖 100% 流量。

自动回滚是硬性要求,没有商量余地。判断标准很简单:p95 延迟超出 SLO,错误率高于 1%,或者 CTR 下降超过 5%,只要满足任意一条,就立即回滚并通知人工介入。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class CanaryDeployment:
    def deploy(self, version, initial_pct=10):
        if not self.validate_offline(version):
            raise ValueError("offline validation failed")
        canary = self.deploy_canary(version, traffic=initial_pct)
        metrics = self.monitor(canary, minutes=60)
        if self.healthy(metrics):
            self.rollout(canary, target=100)
        else:
            self.rollback(canary)
            self.alert("canary failed", metrics)

    @staticmethod
    def healthy(m):
        return (m["p95_latency"] < 100
                and m["error_rate"] < 0.01
                and m.get("ctr_delta", 0) > -0.05)

服务基础设施#

服务基础设施:API 网关、负载均衡、推荐编排器,下游召回/排序/特征/缓存服务

服务栈分为四层,全部无状态且支持水平扩展:

  • API 网关 + 负载均衡(Nginx、Envoy 或云 LB):负责 TLS、鉴权、限流和路由。
  • 推荐编排器:一个无状态服务,串联整个漏斗逻辑。依次调用召回、排序和重排模块,并合并结果。
  • 后端服务:召回服务基于 Faiss/HNSW;排序服务运行在 GPU 上,使用 TensorFlow Serving 或 Triton;特征服务由 Redis(热数据)和 HBase(冷数据)支撑。
  • 多级缓存:包括特征缓存、嵌入缓存和完整预测缓存。预测缓存的命中率通常在 30%-50%,计算成本也相应降低。

性能优化#

三种技术叠加,可以实现 5-10 倍加速,同时几乎不损失质量:

量化(FP32 → INT8):在 CPU 上提速 2-4 倍,GPU 上也有小幅提升。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import torch.quantization as quant

def quantize(model, calibration_loader):
    model.eval()
    model.qconfig = quant.get_default_qconfig("fbgemm")
    quant.prepare(model, inplace=True)
    with torch.no_grad():
        for batch in calibration_loader:
            model(batch)
    return quant.convert(model, inplace=False)

知识蒸馏:训练一个小模型(学生)模仿大模型(老师)。学生不仅学习硬标签,还学习老师的软概率分布,后者包含物品相对质量的信息。

1
2
3
4
5
6
7
8
import torch.nn.functional as F

def distill_loss(student_logits, teacher_logits, labels, T=3.0, alpha=0.7):
    s_soft = F.log_softmax(student_logits / T, dim=1)
    t_soft = F.softmax(teacher_logits / T, dim=1)
    soft = F.kl_div(s_soft, t_soft, reduction="batchmean") * (T ** 2)
    hard = F.cross_entropy(student_logits, labels)
    return alpha * soft + (1 - alpha) * hard

预测缓存:针对长尾请求中的重复部分。默认 TTL 设置为 5 分钟——足够摊薄计算成本,又能及时反映用户行为变化。

标准流程是:先蒸馏,再剪枝,最后量化。按这个顺序操作,每一步都能保留前一步的质量收益。

监控#

三类核心指标,全部带告警功能:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class RecommendationMonitor:
    def __init__(self, metrics):
        self.metrics = metrics

    def log(self, user_id, scores, latency_ms):
        self.metrics.histogram("pred.latency", latency_ms)
        self.metrics.histogram("pred.score_mean", float(np.mean(scores)))

    def check(self):
        recent = self.metrics.recent("pred.latency", minutes=5)
        if recent["p95"] > 150:
            self.alert("high_latency", recent)
        if self.metrics.error_rate(minutes=5) > 0.05:
            self.alert("high_error_rate")
        scores = self.metrics.recent("pred.score_mean", minutes=30)
        baseline = self.metrics.baseline("pred.score_mean")
        if abs(scores["mean"] - baseline["mean"]) > 2 * baseline["std"]:
            self.alert("distribution_shift", scores)

最有价值但也最容易被忽视的告警是第三条——预测分布漂移。如果平均预测 CTR 突然偏离基线两个标准差,说明上游一定有问题:可能是某个特征损坏、某个嵌入索引过期,或者某个模型发错了版本。等到业务指标波动时再发现,已经损失了一个小时;而分布监控可以在几分钟内发现问题。


团队分工#

生产推荐团队的角色分工,以及各阶段的主要负责方

生产级推荐系统太大,一个人搞不定;耦合又太深,完全独立的小团队也搞不定。实践中,角色边界是这样划分的:

  • 算法工程师:写模型代码,设计特征,跑 A/B 实验。召回、排序、重排模型都归他们。
  • 数据工程师:负责 ETL 流水线、样本生成和特征仓库的离线部分。他们是数据质量的守门人。
  • MLOps / 平台工程师:管训练基础设施、模型注册中心、CI/CD 和服务运行时。他们让新模型上线从一个月缩短到一天。
  • SRE / 基础设施:盯延迟 SLO,做容量规划,处理故障。凌晨三点被叫起来的就是他们。
  • 分析师 / 研究:做长期评估、因果推断和排序诊断。他们能发现“指标好看但营收没动”的问题。
  • 产品:对业务 KPI 和内容政策负责。

图右侧的矩阵列出了每个阶段的主要负责方。规律很明显:每个阶段至少有两个负责方,因为每个阶段都有“模型质量”和“运维”两个维度需要关注。


工业级框架#

阿里 EasyRec(开源)
一个端到端的框架,集成了特征工程、预置模型(Wide & Deep、DeepFM、DIN、MMoE),支持在 PAI/MaxCompute 上训练,并通过 PAI-EAS 提供服务。如果用的是阿里云,这是最快搭建生产级基线的方法。

Meta TorchRec
TorchRec 是 Meta 内部推荐系统栈的开源实现。它最大的亮点是分片嵌入表的支持,这是推荐系统训练中分布式系统最难解决的问题之一。

字节跳动 Monolith(开源)
专为十亿参数规模的在线学习设计。核心是无碰撞嵌入哈希表和异步训练机制,能够从生产日志中近乎实时地更新模型。这套框架支撑了 TikTok 推荐系统的一部分。

YouTube 两阶段系统
经典论文是 2016 年 Covington 等人的工作——双塔深度召回模型加深度排序模型。后续改进中,2019 年的 sampled softmax 论文影响最大,但两阶段架构依然是大多数团队参考的模板。


常见问题#

召回路数应该有多少?#

从三条召回通道开始:协同过滤、双塔深度模型、实时行为。只有在离线分析发现现有通道漏掉了某些物品时,才加入专门的通道(比如图、内容、地理、社交)。超过十条通道后,大部分精力会花在维护管道上,而不是提升质量。

粗排到精排的合理收缩比是多少?#

通常 10:1(2,000 → 200 → 20)。端到端监控 recall@K:粗排太激进会丢掉精排本可以选出的好物品;粗排太宽松又浪费精排的计算资源。

精排模型该多复杂?#

先用 Wide & Deep 或 DeepFM。只有在测量发现现有模型没有充分利用序列信息时(比如用户历史丰富但预测分值平平),再加入 DIN 这种基于注意力的序列建模。每次增加复杂度,都要确保服务成本的提升能被指标收益抵消。

重排该用模型还是规则?#

混合使用。硬约束(合规、库存、黑名单)用确定性规则——方便审计。软优化(多样性、新鲜度、探索)用学习型重排。两者结合是常态。

量化、剪枝、蒸馏怎么选?#

量化性价比最高(CPU 上提速 2-4 倍)。如果需要的不只是更快的速度,而是更小的模型体积,那就用蒸馏。剪枝最脆弱——虽然有效,但需要小心重新训练。推荐顺序:蒸馏 → 剪枝 → 量化。

如何处理新用户?#

分级降级策略:(1) 对全新用户推荐热门和趋势内容;(2) 了解少量画像后,推荐基于人口统计的内容;(3) 在 3-10 次交互内用 bandit 风格的探索积累信号;(4) 交互达到约 50 次后切换到完整的个性化模型。详见第 14 篇关于元学习的部分。

什么时候下线一个模型?#

满足两个条件:(a) 后继模型在主指标上通过 A/B 测试胜出,没有护栏指标退化;(b) 新模型的运维成本可接受。无论如何,旧模型要保留 30 天可部署状态,以应对延迟回归。

总结#

这篇文章梳理了工业级推荐系统的完整技术栈:

  • 三个层面——数据、训练、服务,各层之间接口清晰明确
  • 四阶段漏斗——召回、粗排、精排、重排,将数亿用户请求控制在 100 毫秒内完成
  • 多路召回 + RRF 融合,因为单一召回通道无法覆盖所有质量维度
  • Wide & Deep、DeepFM、DIN 作为生产环境中的排序模型架构
  • 特征仓库通过统一的特征定义打通离线和在线路径,消除训练与服务偏差
  • A/B 测试采用一致性哈希分流、z 检验,并设置预注册的护栏指标
  • 持续训练由调度触发、漂移检测、指标衰减驱动
  • 金丝雀发布支持基于延迟、错误率、CTR 等指标自动回滚
  • 服务基础设施包括网关、编排器、GPU 模型服务器、Redis 特征存储、预测缓存
  • 团队职责明确划分并映射到流水线的各个阶段

工业实践中最有价值的经验其实很简单:从小做起,全面度量,靠 A/B 测试决定取舍。一个用热门物品、简单双塔召回、DeepFM 排序,并严格执行实验纪律的流水线,能轻松胜过没有 A/B 框架就直接上线的复杂 GNN 模型。这篇文章提到的各种框架本身并不是竞争优势的来源,真正的核心在于它们支撑起的迭代闭环。

参考文献#

  • Covington, P., Adams, J., Sargin, E. “Deep Neural Networks for YouTube Recommendations.” RecSys 2016. paper
  • Yi, X., et al. “Sampling-Bias-Corrected Neural Modeling for Large Corpus Item Recommendations.” RecSys 2019. (the “two-tower with logQ correction” paper)
  • Cheng, H., et al. “Wide & Deep Learning for Recommender Systems.” DLRS 2016. arXiv:1606.07792
  • Guo, H., et al. “DeepFM: A Factorization-Machine based Neural Network for CTR Prediction.” IJCAI 2017. arXiv:1703.04247
  • Zhou, G., et al. “Deep Interest Network for Click-Through Rate Prediction.” KDD 2018. arXiv:1706.06978
  • Liu, Z., et al. “Monolith: Real Time Recommendation System With Collisionless Embedding Table.” 2022. arXiv:2209.07663
  • Alibaba EasyRec: github.com/alibaba/EasyRec
  • TorchRec: github.com/pytorch/torchrec
  • Feast (open-source feature store): feast.dev
本系列

推荐系统 16 篇

  1. 01 推荐系统(一)—— 入门与基础概念
  2. 02 推荐系统(二)—— 协同过滤与矩阵分解
  3. 03 推荐系统(三)—— 深度学习基础模型
  4. 04 推荐系统(四)—— CTR 预估与点击率建模
  5. 05 推荐系统(五)—— Embedding 表示学习
  6. 06 推荐系统(六)—— 序列推荐与会话建模
  7. 07 推荐系统(七)—— 图神经网络与社交推荐
  8. 08 推荐系统(八)—— 知识图谱增强推荐系统
  9. 09 推荐系统(九)—— 多任务学习与多目标优化
  10. 10 推荐系统(十)—— 深度兴趣网络与注意力机制
  11. 11 推荐系统(十一)—— 对比学习与自监督学习
  12. 12 推荐系统(十二)—— 大语言模型与推荐系统
  13. 13 推荐系统(十三)—— 公平性、去偏与可解释性
  14. 14 推荐系统(十四)—— 跨域推荐与冷启动解决方案
  15. 15 推荐系统(十五)—— 实时推荐与在线学习
  16. 16 推荐系统(十六)—— 工业级架构与最佳实践 当前

读有所得?

GitHub 关注我 → 新文周更

GitHub