工业级推荐系统最难的部分不是模型本身,而是模型周围的系统:消除训练-服务偏差的特征仓库、能在亿级用户面前及时拦下回归的金丝雀发布、能在 100 ms p95 预算内串联起四个 ML 模型的编排层。本文是系列的最后一篇,描述了所有大型互联网公司最终都收敛到的那套架构,以及每一层背后的取舍。
你将学到什么
- 多阶段流水线——召回、粗排、精排、重排序,每一阶段的约束如何决定它的形态
- 多路召回——协同过滤、双塔深度模型、图遍历、实时行为信号的并行与融合
- 生产级排序模型——Wide & Deep、DeepFM、DIN 的可运行实现
- 重排序策略——多样性(MMR)、业务规则、时效性加权
- 特征仓库——离线 + 在线双路径如何彻底解耦训练与服务
- A/B 测试——一致性分流、比例 z 检验、实验该跑多久
- 性能优化——量化、蒸馏、预测缓存
- 部署与监控——金丝雀发布、漂移检测、自动回滚
- 团队分工——召回、排序、特征仓库、服务到底归谁
前置知识
- 系列前面所有章节(特别是第 7、11、15 篇)
- 分布式系统基本概念(负载均衡、消息队列)
- 熟悉 Python、PyTorch 和 REST API
工业推荐系统全景
架构总览

无论是 Google、Amazon,还是阿里、字节跳动,所有大型推荐系统最终都收敛到了同一套三平面架构:
- 数据平面:从日志和内容生产样本和特征,由 Hive、Spark、Flink、Kafka 组成。
- 训练平面:把样本变成模型,离线评估通过后写入模型注册中心。
- 服务平面:用户真正在等待的实时漏斗,是唯一有严格延迟预算的平面。
服务平面本身是一个逐级收窄候选集、同时提高打分精度的漏斗:
用户请求 → 召回 (10⁶ → 2K) → 粗排 (2K → 200) → 精排 (200 → 50) → 重排 (50 → 20) → 返回
| 阶段 | 输入 → 输出 | 模型类别 | 延迟预算 |
|---|
| 召回 | 10⁶ → ~2,000 | 双塔 DNN、ANN、简单 CF | 20-30 ms |
| 粗排 | ~2,000 → ~200 | 浅层 DNN 或 XGBoost | 10-20 ms |
| 精排 | ~200 → ~50 | Wide & Deep、DeepFM、DIN | 30-50 ms |
| 重排 | ~50 → ~20 | 规则 + 轻量 ML | 10-20 ms |
| 总计 | | | < 100 ms p95 |
可以把它想象成一个招聘漏斗:召回是简历筛选(快、撒大网),粗排是电话面试(粗筛),精排是 onsite(深度评估),重排是录用委员会(最后从团队搭配和多样性角度做调整)。
为什么是漏斗而不是一个大模型?
朴素方案——用一个大模型给每个候选打分——单次请求要花数秒。漏斗能换来量级上的速度,是因为每一阶段都用了与其候选规模相匹配的模型:候选多用便宜的模型,候选少用贵的模型。召回阶段每个候选大约花 5 微秒,精排大约 250 微秒,乘起来才能塞进延迟预算。
关键设计原则
无状态服务:所有服务必须可水平扩展。状态(用户向量、最近行为)放在 Redis、KV 存储或特征仓库里,永远不要放进进程内存。
1
2
3
4
5
6
7
8
9
10
11
| class RankingService:
"""无状态排序服务——可任意复制副本。"""
def __init__(self, model_path: str):
self.model = load_model(model_path)
self.feature_extractor = FeatureExtractor()
def rank(self, user_id, candidates, context):
features = self.feature_extractor.extract(user_id, candidates, context)
scores = self.model.predict(features)
return sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)
|
优雅降级:每个组件都必须有 fallback。深度召回超时就退回协同过滤,所有召回都失败就返回热门——用户绝对不能看到空白页。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
| class FaultTolerantRecall:
def __init__(self, channels):
self.channels = channels
self.fallback = PopularItemsRecall()
def recall(self, user_id, context):
results = []
for channel in self.channels:
try:
results.extend(channel.recall(user_id, context, timeout=20))
except Exception as exc:
logger.warning("channel %s failed: %s", channel.name, exc)
if not results:
return self.fallback.recall(user_id, context)
return deduplicate(results)
|
强制延迟预算:每次调用都设激进的超时,由编排器强制执行。某个慢召回不能拖慢整条流水线,要的话就直接丢掉它的结果。
漏斗细节

上图展示了每一阶段的数量级缩减。两条规则值得记住:
- 召回阶段决定了质量上限:好物品没进入漏斗,再精的精排也救不回来。这就是为什么生产系统总是并行跑多路召回。
- 越窄的阶段允许越重的模型:精排在 200 个候选上可以跑一个 1 亿参数的 DIN;召回在百万候选上连两次 embedding 查表 + 一次内积都得抠成本。
多路召回
单一召回策略一定会漏东西。协同过滤漏冷启动物品;内容召回漏惊喜发现;实时信号漏长期兴趣。所以工业系统普遍并行跑 3-5 路召回再融合结果。
通道一:双塔深度召回
双塔架构是现代召回的主力。用户塔在请求时实时跑,物品塔离线跑、把所有物品向量灌进 ANN 索引(Faiss 或 HNSW)。线上每次召回就是一次 ANN 查询,5-10 ms。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| import torch
import torch.nn as nn
class TwoTowerRecall(nn.Module):
"""双塔模型,物品向量离线索引。"""
def __init__(self, user_dim: int, item_dim: int, hidden=(256, 128)):
super().__init__()
self.user_tower = self._tower(user_dim, hidden)
self.item_tower = self._tower(item_dim, hidden)
@staticmethod
def _tower(in_dim, hidden):
layers, prev = [], in_dim
for h in hidden:
layers += [nn.Linear(prev, h), nn.ReLU(), nn.BatchNorm1d(h)]
prev = h
return nn.Sequential(*layers)
def recall(self, user_features, ann_index, top_k=1000):
with torch.no_grad():
user_emb = self.user_tower(user_features)
# ann_index 例如是离线物品向量构建的 Faiss IVF-PQ 索引
_, top_ids = ann_index.search(user_emb.cpu().numpy(), top_k)
return top_ids
|
两个工程上的细节。其一,损失函数很关键:带 logQ 修正的 batch 内 sampled softmax 已经成为标配(YouTube 团队 RecSys 2019 那篇)——目的是抵消热门物品在 batch 中过采样带来的偏差。其二,物品索引必须随 embedding 漂移定期重建:高频品类小时级、其他场景天级。
通道二:图召回
图召回通过多跳遍历找物品:用户 A 喜欢物品 X 和 Y;用户 B 喜欢 Y 和 Z;那么 Z 就是 A 的候选。这能发现纯 embedding 相似度漏掉的关联。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
| from collections import defaultdict
class GraphRecall:
"""基于共同用户的物品 Jaccard 相似度做物品到物品的召回。"""
def __init__(self, interaction_graph):
self.graph = interaction_graph
self.item_similarity = self._compute_similarity()
def _compute_similarity(self):
sim = defaultdict(dict)
items = [n for n in self.graph.nodes() if self.graph.nodes[n]["type"] == "item"]
for a in items:
users_a = set(self.graph.neighbors(a))
for b in items:
if a == b:
continue
users_b = set(self.graph.neighbors(b))
inter, union = len(users_a & users_b), len(users_a | users_b)
if union:
sim[a][b] = inter / union
return sim
def recall(self, user_id, top_k=1000):
seen = [n for n in self.graph.neighbors(user_id)
if self.graph.nodes[n]["type"] == "item"]
scores = defaultdict(float)
for item in seen:
for neighbour, w in self.item_similarity.get(item, {}).items():
scores[neighbour] += w
return [i for i, _ in sorted(scores.items(), key=lambda x: -x[1])[:top_k]]
|
通道三:实时行为召回
这是一条捕捉用户此刻正在做什么的通道。如果用户刚刚连点了同品类的三个物品,下一刷就应该立刻反映出来,而不是等到第二天的离线训练。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
| from collections import defaultdict, deque
from datetime import datetime, timedelta
import numpy as np
class RealTimeBehaviorRecall:
"""基于最近 30 分钟行为的召回,时间衰减 + 行为加权。"""
def __init__(self, window_minutes=30):
self.window = timedelta(minutes=window_minutes)
self.behaviours = defaultdict(deque)
def add_behavior(self, user_id, item_id, action_type, ts):
self.behaviours[user_id].append({"item": item_id, "action": action_type, "ts": ts})
cutoff = ts - self.window
while self.behaviours[user_id] and self.behaviours[user_id][0]["ts"] < cutoff:
self.behaviours[user_id].popleft()
def recall(self, user_id, top_k=500):
weights = {"view": 1.0, "click": 2.0, "purchase": 5.0}
scores = defaultdict(float)
now = datetime.now()
for b in self.behaviours.get(user_id, []):
age_min = (now - b["ts"]).total_seconds() / 60
recency = np.exp(-age_min / 10)
scores[b["item"]] += recency * weights.get(b["action"], 1.0)
return [i for i, _ in sorted(scores.items(), key=lambda x: -x[1])[:top_k]]
|
多路融合
每条通道返回一个有序列表。融合时用基于排名的融合而不是原始分数(不同通道的分数没有可比性):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
class MultiChannelRecall:
def __init__(self, channels, weights=None):
self.channels = channels
self.weights = weights or {c.name: 1.0 for c in channels}
def recall(self, user_id, context, target=2000):
results = {}
with ThreadPoolExecutor(max_workers=len(self.channels)) as pool:
futures = {pool.submit(c.recall, user_id, context): c.name for c in self.channels}
for fut in as_completed(futures):
name = futures[fut]
try:
results[name] = fut.result(timeout=25)
except Exception as exc:
logger.error("channel %s failed: %s", name, exc)
scores = defaultdict(float)
for name, items in results.items():
w = self.weights.get(name, 1.0)
for rank, item in enumerate(items):
scores[item] += w / (rank + 1) # 倒数排名融合
return [i for i, _ in sorted(scores.items(), key=lambda x: -x[1])[:target]]
|
这就是搜索引擎里熟知的倒数排名融合(RRF)——对量纲差异很鲁棒。每条通道 25 ms 的超时是硬约束:慢的通道直接丢,绝不阻塞。
排序:粗排与精排
粗排
粗排把数千候选裁到数百,模型必须便宜。它的目的是廉价地剔除明显不对的候选——不是给出完美排序。两种方案最常见:
- 浅层双塔,物品侧离线算(和召回类似但带更丰富的特征)。
- XGBoost 排序模型,跑在简单特征上(热度、CTR、用户/物品基础统计)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| import xgboost as xgb
class CoarseRanker:
def __init__(self):
self.model = xgb.XGBRanker(
objective="rank:pairwise",
tree_method="hist",
max_depth=4,
n_estimators=50,
)
def fit(self, X, y, group):
self.model.fit(X, y, group=group)
def predict(self, X):
return self.model.predict(X)
|
一个常见错误是把粗排做得太好。如果它选出来的 200 已经基本就是精排会选的 200,那精排就没价值了。经验值是粗排 top-200 与精排的重合率维持在 0.7 左右——能筛掉明显垃圾,又给精排留下区分空间。
精排:Wide & Deep、DeepFM、DIN
精排在小候选集上跑重模型。生产环境里 CTR 预估有三大架构。
Wide & Deep(Google 2016)把记忆能力(线性模型 + 交叉特征)和泛化能力(深度 MLP + embedding)结合起来:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
| class WideDeepRanking(nn.Module):
"""Google Wide & Deep:线性记忆 + 深度泛化。"""
def __init__(self, wide_dim, embed_dims, deep_hidden):
super().__init__()
self.wide = nn.Linear(wide_dim, 1)
self.embeddings = nn.ModuleDict(
{name: nn.Embedding(vocab, dim) for name, (vocab, dim) in embed_dims.items()}
)
deep_in = sum(dim for _, dim in embed_dims.values())
layers, prev = [], deep_in
for h in deep_hidden:
layers += [nn.Linear(prev, h), nn.ReLU(), nn.BatchNorm1d(h), nn.Dropout(0.2)]
prev = h
layers.append(nn.Linear(prev, 1))
self.deep = nn.Sequential(*layers)
def forward(self, wide_feats, sparse_ids, dense_feats):
wide_out = self.wide(wide_feats)
emb = [self.embeddings[name](ids) for name, ids in sparse_ids.items()]
deep_out = self.deep(torch.cat(emb + [dense_feats], dim=1))
return wide_out + deep_out
|
DeepFM(华为 2017)用因子分解机自动学两阶特征交叉,省掉了人工编排 wide 侧交叉特征的工作。如果你不想花精力人工调交叉特征,这是最合理的默认选择。
DIN(Deep Interest Network,阿里 2018)在用户行为序列上加了注意力。它不再把历史行为简单平均,而是对每个候选动态地把注意力放在与候选相似的历史行为上:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
| class DIN(nn.Module):
"""Deep Interest Network:在用户行为序列上做注意力。"""
def __init__(self, item_dim, user_dim, hidden=(128, 64)):
super().__init__()
self.attention = nn.Sequential(
nn.Linear(item_dim * 4, 36), nn.ReLU(), nn.Linear(36, 1)
)
in_dim = item_dim + user_dim + item_dim
layers, prev = [], in_dim
for h in hidden:
layers += [nn.Linear(prev, h), nn.ReLU(), nn.Dropout(0.2)]
prev = h
layers.append(nn.Linear(prev, 1))
self.mlp = nn.Sequential(*layers)
def forward(self, candidate, history, user_profile):
cand_exp = candidate.unsqueeze(1).expand_as(history)
attn_in = torch.cat(
[history, cand_exp, history - cand_exp, history * cand_exp], dim=2
)
weights = torch.softmax(self.attention(attn_in).squeeze(-1), dim=1)
weighted = (history * weights.unsqueeze(-1)).sum(dim=1)
return self.mlp(torch.cat([candidate, weighted, user_profile], dim=1))
|
注意力这个 trick 很关键:一个买过 50 本书、横跨 5 个品类的用户,并不存在一个"平均兴趣"——他只有按品类分别成立的兴趣。DIN 就是用注意力把这种按候选分别激活的兴趣解锁出来。
重排序
重排序是业务逻辑遇到算法输出的地方。三种模式几乎是每个生产系统的标配。
多样性(MMR)
纯 CTR 优化产生的列表会高度同质——用户点了第一个就走人。**MMR(Maximal Marginal Relevance)**贪心选择"既相关又与已选项多样"的物品:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
| import numpy as np
class DiversityReranker:
def __init__(self, lambda_div=0.3):
self.lam = lambda_div
def rerank(self, items, scores, features, top_k=20):
selected, remaining = [], list(zip(items, scores, features))
while len(selected) < top_k and remaining:
best_idx, best_obj = None, -np.inf
for idx, (it, sc, ft) in enumerate(remaining):
if selected:
diversity = min(self._dist(ft, s_ft) for _, _, s_ft in selected)
else:
diversity = 1.0
obj = (1 - self.lam) * sc + self.lam * diversity
if obj > best_obj:
best_obj, best_idx = obj, idx
selected.append(remaining.pop(best_idx))
return [it for it, _, _ in selected]
@staticmethod
def _dist(a, b):
va, vb = np.array(list(a.values())), np.array(list(b.values()))
return 1 - va @ vb / (np.linalg.norm(va) * np.linalg.norm(vb) + 1e-8)
|
多样性权重(典型取 0.2-0.3)本身就是个 A/B 参数。太低 feed 单调;太高 CTR 下降,因为相关性被牺牲了。
业务规则
硬约束放这里,不要放进 ML 模型。下架过滤、合规要求、运营加权——这些是确定性规则,写成代码远比写成特征更容易审计。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| class BusinessRulesReranker:
def __init__(self, rules):
self.rules = rules
def rerank(self, items, scores, metadata):
out = []
for item, score in zip(items, scores):
meta, adj, ok = metadata.get(item, {}), score, True
for rule in self.rules:
if not rule.check(item, meta):
ok = False
break
adj += rule.score_adjustment(item, meta)
if ok:
out.append((item, adj))
out.sort(key=lambda x: -x[1])
return [i for i, _ in out]
|
时效性加权
新闻、视频、短内容场景里,新鲜度本身就是一个重要特征。指数衰减给最近内容一个有界的加权,既不会被旧内容压住,也不会让新内容彻底盖过老内容:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| from datetime import datetime
import numpy as np
class FreshnessReranker:
def __init__(self, decay_hours=24, max_boost=0.3):
self.decay, self.max_boost = decay_hours, max_boost
def rerank(self, items, scores, timestamps):
now, out = datetime.now(), []
for item, score in zip(items, scores):
ts = timestamps.get(item)
if ts:
age_h = (now - ts).total_seconds() / 3600
boost = self.max_boost * np.exp(-age_h / self.decay)
out.append((item, score * (1 + boost)))
else:
out.append((item, score))
out.sort(key=lambda x: -x[1])
return [i for i, _ in out]
|
特征仓库

特征仓库是成熟推荐系统里最重要、却又最常被最后才动手建的基础设施。它的核心职责是消除训练-服务偏差:保证一个特征在离线训练时计算的语义,与它在线上服务时计算的语义完全一致。
架构上是两条路径共享一份特征定义:
- 离线路径:Spark/Flink 跑在数据湖上,把特征物化成 Parquet 文件,喂给训练流水线。
- 在线路径:Flink 消费 Kafka 事件流,把聚合特征写入 Redis,线上 p99 < 5 ms 取出。
两条路径执行的是同一份特征定义(通常是 SQL 或 YAML)。改特征时两边一起改。如果没有这个纪律,迟早会发生这样的事:训练时用的特征语义和线上服务时不一致,AUC 静悄悄掉一两个点,等业务指标发现已经晚了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
| import json
class FeatureStore:
"""Redis 后端的在线特征仓库,支持批量取。"""
def __init__(self, redis_client, ttl=3600):
self.redis, self.ttl = redis_client, ttl
def set(self, entity, eid, name, value):
self.redis.setex(f"{entity}:{eid}:{name}", self.ttl, json.dumps(value))
def batch_get(self, entity, eids, names):
keys = [f"{entity}:{e}:{n}" for e in eids for n in names]
values = self.redis.mget(keys)
n = len(names)
return [
[json.loads(v) if v else None for v in values[i:i + n]]
for i in range(0, len(values), n)
]
|
值得了解的开源/商业方案:Feast(最流行的开源),Tecton(商业),阿里内部的 Feathr。底层都遵循同样的"离线 + 在线双路径"模式。
时间特征的循环编码
一个看起来很小、却影响很大的细节。23 点和 0 点在时间上相邻,但线性模型会把它们看成相差 23 个单位。用 (sin, cos) 编码可以让模型把它们看成邻居:
1
2
3
4
5
6
7
8
9
| def temporal_features(ts):
dt = datetime.fromtimestamp(ts)
return {
"hour_sin": np.sin(2 * np.pi * dt.hour / 24),
"hour_cos": np.cos(2 * np.pi * dt.hour / 24),
"dow_sin": np.sin(2 * np.pi * dt.weekday() / 7),
"dow_cos": np.cos(2 * np.pi * dt.weekday() / 7),
"is_weekend": int(dt.weekday() >= 5),
}
|
A/B 测试框架

A/B 测试是你发现"离线 AUC 涨 3% 的模型,上线后实际跌 1.5%“的唯一手段——这种事比想象中常见得多。三个属性必须保证:
- 一致性哈希分流:同一个用户必须始终被分到同一个分桶。中途换组既毁体验,又毁统计有效性。
- 预先注册的指标:包括"护栏指标”(延迟、错误率、营收),即使主指标涨了,护栏破了也不能上线。
- 事先做功效分析:开始实验前就要算清楚需要多少样本量。看到一个"看起来不错"的中间结果就提前停止,假阳率会爆炸式增长。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
| import numpy as np
from collections import defaultdict
from scipy.stats import norm
class ABTestFramework:
"""生产级 A/B 框架:一致性分流 + 比例 z 检验。"""
def __init__(self):
self.experiments, self.events = {}, defaultdict(list)
def create(self, exp_id, variants_split): # 例如 {"control": 50, "v1": 50}
self.experiments[exp_id] = variants_split
def assign(self, user_id, exp_id):
h = hash(f"{user_id}:{exp_id}") % 100
cum = 0
for variant, split in self.experiments[exp_id].items():
cum += split
if h < cum:
return variant
return "control"
def log(self, user_id, exp_id, variant, event):
self.events[exp_id].append({"user": user_id, "variant": variant, "event": event})
def analyse(self, exp_id):
stats = defaultdict(lambda: {"impr": 0, "click": 0})
for ev in self.events[exp_id]:
if ev["event"] == "impression":
stats[ev["variant"]]["impr"] += 1
elif ev["event"] == "click":
stats[ev["variant"]]["click"] += 1
out, ctrl = {}, stats.get("control")
for v, s in stats.items():
ctr = s["click"] / max(s["impr"], 1)
out[v] = {"ctr": ctr, **s}
if ctrl:
for v, s in out.items():
if v == "control":
continue
p_pool = (ctrl["click"] + s["click"]) / (ctrl["impr"] + s["impr"])
se = np.sqrt(p_pool * (1 - p_pool) * (1 / ctrl["impr"] + 1 / s["impr"]))
z = (s["ctr"] - out["control"]["ctr"]) / (se + 1e-8)
s["lift"] = (s["ctr"] - out["control"]["ctr"]) / (out["control"]["ctr"] + 1e-8)
s["p_value"] = 2 * (1 - norm.cdf(abs(z)))
s["significant"] = s["p_value"] < 0.05
return out
|
**A/B 实验该跑多久?**至少要满足三个条件:(1) 覆盖一个完整的星期周期(一般两周);(2) 达到功效分析算出的样本量;(3) 越过新奇效应(用户对新东西天然有点击冲动)。两周是最常见的答案,少于一周的实验都要打个问号。
常见陷阱:变体之间的网络效应(实验组用户通过共享内容影响对照组)、SUTVA 假设违反、不同用户群的异质处理效应、累积效应(实验组提升了长期留存但短期 CTR 下降)。这些问题的根本解法是分层实验平台——这也是为什么 Google、Meta、字节都自己搭了一套实验平台。
持续训练

模型会衰减。用户行为漂移、物品池变化、季节性切换。上个月还是 SOTA 的模型,下个月不重训就会变成包袱。训练流水线必须自动跑,触发条件包括:
- 定时调度——精排日级,增量更新小时级,最易变特征近实时。
- 漂移检测——重要特征 PSI(Population Stability Index)超过 0.2 就触发,不等定时调度。
- 指标衰减——离线 AUC 相对上一个 checkpoint 下降超过 2%。
- 代码变更——新的特征定义或模型结构。
训练的输出不是已部署的模型,而是模型注册中心里的一个带元数据的 artifact:版本号、训练数据窗口、离线指标、血缘信息。部署是一个独立的、有门禁的步骤。
部署:影子 → 金丝雀 → A/B → 全量
新模型不可能从注册中心一步到 100% 流量。标准的阶梯式发布:
- 影子流量——0% 流量,但模型并行运行,预测结果记日志。这能在不影响用户的前提下抓出延迟回归、schema 不一致、服务 bug。
- 金丝雀——1-10% 流量跑 1-24 小时。护栏指标破线就自动回滚。
- A/B 测试——50% 流量跑 1-2 周做正式统计验证。
- 全量——100% 流量。
自动回滚没得商量。判断标准很硬:p95 延迟超 SLO、错误率超 1%、CTR 跌幅超 5%——任意一条触发就回滚并寻呼人工。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
| class CanaryDeployment:
def deploy(self, version, initial_pct=10):
if not self.validate_offline(version):
raise ValueError("offline validation failed")
canary = self.deploy_canary(version, traffic=initial_pct)
metrics = self.monitor(canary, minutes=60)
if self.healthy(metrics):
self.rollout(canary, target=100)
else:
self.rollback(canary)
self.alert("canary failed", metrics)
@staticmethod
def healthy(m):
return (m["p95_latency"] < 100
and m["error_rate"] < 0.01
and m.get("ctr_delta", 0) > -0.05)
|
服务基础设施

服务栈分四层,每一层都是无状态、可水平扩展的:
- API 网关 + 负载均衡(Nginx、Envoy 或云 LB):处理 TLS、鉴权、限流、路由。
- 推荐编排器:无状态服务,串联整个漏斗,依次调用召回、排序、重排,再合并结果。
- 后端服务:召回服务背后是 Faiss/HNSW;排序服务跑 TF-Serving 或 Triton 在 GPU 上;特征服务用 Redis(热数据)+ HBase(冷数据)。
- 多层缓存:特征缓存、向量缓存、整次预测缓存。预测缓存命中率 30-50% 是常态,相应地节省同样比例的计算成本。
性能优化
三种技术叠加可以拿到 5-10 倍加速,且几乎不损失质量:
量化(FP32 → INT8):CPU 上 2-4 倍,GPU 上小幅提升。
1
2
3
4
5
6
7
8
9
10
| import torch.quantization as quant
def quantize(model, calibration_loader):
model.eval()
model.qconfig = quant.get_default_qconfig("fbgemm")
quant.prepare(model, inplace=True)
with torch.no_grad():
for batch in calibration_loader:
model(batch)
return quant.convert(model, inplace=False)
|
知识蒸馏:用一个小学生模型模仿大老师模型。学生不仅学硬标签,还学老师输出的软概率——后者携带了"物品之间相对优劣"的信息。
1
2
3
4
5
6
7
8
| import torch.nn.functional as F
def distill_loss(student_logits, teacher_logits, labels, T=3.0, alpha=0.7):
s_soft = F.log_softmax(student_logits / T, dim=1)
t_soft = F.softmax(teacher_logits / T, dim=1)
soft = F.kl_div(s_soft, t_soft, reduction="batchmean") * (T ** 2)
hard = F.cross_entropy(student_logits, labels)
return alpha * soft + (1 - alpha) * hard
|
预测缓存:针对长尾的重复请求。TTL 5 分钟是个不错的默认值——足够摊薄计算成本,又能反映新行为。
标准配方是先蒸馏,再剪枝,最后量化。这个顺序能让每一步都保留前一步的质量收益。
监控
三类指标,全部带告警:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
| class RecommendationMonitor:
def __init__(self, metrics):
self.metrics = metrics
def log(self, user_id, scores, latency_ms):
self.metrics.histogram("pred.latency", latency_ms)
self.metrics.histogram("pred.score_mean", float(np.mean(scores)))
def check(self):
recent = self.metrics.recent("pred.latency", minutes=5)
if recent["p95"] > 150:
self.alert("high_latency", recent)
if self.metrics.error_rate(minutes=5) > 0.05:
self.alert("high_error_rate")
scores = self.metrics.recent("pred.score_mean", minutes=30)
baseline = self.metrics.baseline("pred.score_mean")
if abs(scores["mean"] - baseline["mean"]) > 2 * baseline["std"]:
self.alert("distribution_shift", scores)
|
最隐蔽、也最有价值的告警是第三条——预测分布漂移。如果平均预测 CTR 突然偏离基线两个标准差,上游一定出了问题:某个特征损坏、某个 embedding 索引过期、某个模型悄悄发了错版本。等业务指标动起来再发现,已经损失一个小时了;分布监控能在分钟级把这种问题揪出来。
团队分工

生产级推荐系统对一个人来说太大,对完全独立的小团队来说又耦合太深。实践中有效的角色边界是这样的:
- 算法工程师:负责模型代码、特征设计、A/B 实验。召回、排序、重排模型都他们写。
- 数据工程师:负责 ETL 流水线、样本生成、特征仓库的离线侧。他们是数据质量问题的防火墙。
- MLOps / 平台工程师:负责训练基础设施、模型注册中心、CI/CD、服务运行时。让"上线一个新模型"从一个月变成一天。
- SRE / 基础设施:负责延迟 SLO、容量规划、故障响应。凌晨三点被叫起来的就是他们。
- 分析师 / 研究:负责长期评估、因果推断、排序诊断。他们抓"指标看着挺好但营收不动"这种问题。
- 产品:负责业务 KPI 和内容政策。
图右侧的矩阵列出了各阶段的主要负责方。规律是:每个阶段都至少有两个负责方——因为每个阶段同时有"模型质量"和"运维"两个维度。
工业级框架
阿里 EasyRec(开源):端到端框架,包含特征工程、预置模型(Wide & Deep、DeepFM、DIN、MMoE)、PAI/MaxCompute 训练、PAI-EAS 部署。如果你已经在阿里云上,这是最快上到生产级 baseline 的路径。
Meta TorchRec:TorchRec 是开源的、支撑 Meta 内部推荐栈的库。强项是分片 embedding 表——这是大规模推荐训练里最难的分布式系统问题。
字节跳动 Monolith(开源):面向十亿参数级在线学习设计。核心是无碰撞 embedding 哈希表和异步训练——能从生产日志近实时地更新模型。TikTok 推荐栈的一部分就在它上面。
YouTube 两阶段系统:经典论文是 Covington 等 2016 那篇——双塔深度候选生成 + 深度排序。后续演进里最有影响力的是 2019 年 sampled softmax 那篇,但两阶段的骨架仍然是大多数团队的模板。
常见问题 Q&A
召回路数应该有多少?
从三路开始:协同过滤、双塔深度、实时行为。专门通道(图、内容、地理、社交)只在离线分析显示"现有通道遗漏了哪些好物品"时才加。超过十路,时间和成本基本花在管道上而不是质量上了。
粗排到精排的合理收缩比是多少?
通常 10:1(2,000 → 200 → 20)。要端到端监控 recall@K:粗排太激进会丢掉精排本来能挑出来的好物品;粗排太松又浪费精排的算力预算。
精排该多复杂?
从 Wide & Deep 或 DeepFM 起步。只有在测出"现有模型没充分利用序列信息"(典型表现:历史丰富的用户预测分平淡)之后,才加 DIN 这种基于注意力的序列建模。每提升一档复杂度,都要让它服务成本的增加被指标提升 justify。
重排该用模型还是规则?
混合用。硬约束(合规、库存、黑名单)用确定性规则——可审计。软优化(多样性、新鲜度、探索)用学习型重排。两者并存是常态。
量化、剪枝、蒸馏怎么选?
量化单位投入产出比最高(CPU 上 2-4 倍)。蒸馏在你需要的不只是更快、还要更小模型时是正解。剪枝最脆弱——能用,但需要小心地重训。推荐顺序:蒸馏 → 剪枝 → 量化。
如何处理新用户?
多级降级:(1) 全新用户给热门和趋势内容;(2) 知道一点画像后给基于画像的内容推荐;(3) 用 bandit 风格的探索在 3-10 次交互内积累信号;(4) ~50 次交互后切到完整的个性化模型。详见第 14 篇关于元学习的部分。
什么时候下线一个模型?
满足两个条件:(a) 后继模型在主指标上 A/B 测试胜出,且没有任何护栏指标回退;(b) 新模型的运维成本可接受。无论如何,旧模型要保留 30 天可重新部署的状态,以应对滞后回归。
总结
本文把工业级推荐系统的完整栈搭起来了:
- 三个平面——数据、训练、服务,平面之间接口清晰
- 四级漏斗——召回、粗排、精排、重排,把数亿用户塞进 100 ms 预算
- 多路召回 + RRF 融合,因为没有单一通道能覆盖全部质量
- Wide & Deep、DeepFM、DIN 作为精排的生产级架构
- 特征仓库用一份特征定义同时驱动离线训练和在线服务,消除偏差
- A/B 测试做一致性分流、z 检验、预先注册护栏指标
- 持续训练由调度、漂移、指标衰减触发
- 金丝雀发布配合护栏指标自动回滚
- 服务基础设施——网关、编排器、GPU 模型服务、Redis 特征仓库、预测缓存
- 团队分工清晰映射到流水线各阶段
工业实践里最有价值的一条经验也是最朴素的一条:从小处起步,凡事度量,由 A/B 决定留哪个。一个用热门 + 简单双塔召回 + DeepFM 精排 + 严格实验纪律的流水线,会跑赢一个没有 A/B 框架就上线的奇异 GNN。本文里讲的所有框架都不是竞争优势的来源——它们支撑起的那套迭代闭环才是。
系列导航
本文是推荐系统 16 篇系列的第 16 篇(完结)。感谢你一路读到这里。
参考文献
- Covington, P., Adams, J., Sargin, E. “Deep Neural Networks for YouTube Recommendations.” RecSys 2016. paper ↗
- Yi, X., et al. “Sampling-Bias-Corrected Neural Modeling for Large Corpus Item Recommendations.” RecSys 2019.(“双塔 + logQ 修正"那篇)
- Cheng, H., et al. “Wide & Deep Learning for Recommender Systems.” DLRS 2016. arXiv:1606.07792 ↗
- Guo, H., et al. “DeepFM: A Factorization-Machine based Neural Network for CTR Prediction.” IJCAI 2017. arXiv:1703.04247 ↗
- Zhou, G., et al. “Deep Interest Network for Click-Through Rate Prediction.” KDD 2018. arXiv:1706.06978 ↗
- Liu, Z., et al. “Monolith: Real Time Recommendation System With Collisionless Embedding Table.” 2022. arXiv:2209.07663 ↗
- 阿里 EasyRec:github.com/alibaba/EasyRec ↗
- TorchRec:github.com/pytorch/torchrec ↗
- Feast(开源特征仓库):feast.dev ↗