推荐系统(十六)—— 工业级架构与最佳实践

工业级推荐系统全景:数据/训练/服务三大平面,召回-粗排-精排-重排序四级漏斗,多路召回与融合,Wide&Deep/DeepFM/DIN 排序模型,特征仓库消除训练-服务偏差,A/B 测试方法论,金丝雀发布与回滚,团队角色分工。结合 YouTube、TikTok、淘宝、字节跳动等真实架构。

工业级推荐系统最难的部分不是模型本身,而是模型周围的系统:消除训练-服务偏差的特征仓库、能在亿级用户面前及时拦下回归的金丝雀发布、能在 100 ms p95 预算内串联起四个 ML 模型的编排层。本文是系列的最后一篇,描述了所有大型互联网公司最终都收敛到的那套架构,以及每一层背后的取舍。

你将学到什么

  • 多阶段流水线——召回、粗排、精排、重排序,每一阶段的约束如何决定它的形态
  • 多路召回——协同过滤、双塔深度模型、图遍历、实时行为信号的并行与融合
  • 生产级排序模型——Wide & Deep、DeepFM、DIN 的可运行实现
  • 重排序策略——多样性(MMR)、业务规则、时效性加权
  • 特征仓库——离线 + 在线双路径如何彻底解耦训练与服务
  • A/B 测试——一致性分流、比例 z 检验、实验该跑多久
  • 性能优化——量化、蒸馏、预测缓存
  • 部署与监控——金丝雀发布、漂移检测、自动回滚
  • 团队分工——召回、排序、特征仓库、服务到底归谁

前置知识

  • 系列前面所有章节(特别是第 7、11、15 篇)
  • 分布式系统基本概念(负载均衡、消息队列)
  • 熟悉 Python、PyTorch 和 REST API

工业推荐系统全景

架构总览

工业级推荐系统三平面架构:数据平面、训练平面、服务平面,以及召回-粗排-精排-重排序漏斗

无论是 Google、Amazon,还是阿里、字节跳动,所有大型推荐系统最终都收敛到了同一套三平面架构

  1. 数据平面:从日志和内容生产样本和特征,由 Hive、Spark、Flink、Kafka 组成。
  2. 训练平面:把样本变成模型,离线评估通过后写入模型注册中心。
  3. 服务平面:用户真正在等待的实时漏斗,是唯一有严格延迟预算的平面。

服务平面本身是一个逐级收窄候选集、同时提高打分精度的漏斗

用户请求 → 召回 (10⁶ → 2K) → 粗排 (2K → 200) → 精排 (200 → 50) → 重排 (50 → 20) → 返回
阶段输入 → 输出模型类别延迟预算
召回10⁶ → ~2,000双塔 DNN、ANN、简单 CF20-30 ms
粗排~2,000 → ~200浅层 DNN 或 XGBoost10-20 ms
精排~200 → ~50Wide & Deep、DeepFM、DIN30-50 ms
重排~50 → ~20规则 + 轻量 ML10-20 ms
总计< 100 ms p95

可以把它想象成一个招聘漏斗:召回是简历筛选(快、撒大网),粗排是电话面试(粗筛),精排是 onsite(深度评估),重排是录用委员会(最后从团队搭配和多样性角度做调整)。

为什么是漏斗而不是一个大模型?

朴素方案——用一个大模型给每个候选打分——单次请求要花数秒。漏斗能换来量级上的速度,是因为每一阶段都用了与其候选规模相匹配的模型:候选多用便宜的模型,候选少用贵的模型。召回阶段每个候选大约花 5 微秒,精排大约 250 微秒,乘起来才能塞进延迟预算。

关键设计原则

无状态服务:所有服务必须可水平扩展。状态(用户向量、最近行为)放在 Redis、KV 存储或特征仓库里,永远不要放进进程内存

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
class RankingService:
    """无状态排序服务——可任意复制副本。"""

    def __init__(self, model_path: str):
        self.model = load_model(model_path)
        self.feature_extractor = FeatureExtractor()

    def rank(self, user_id, candidates, context):
        features = self.feature_extractor.extract(user_id, candidates, context)
        scores = self.model.predict(features)
        return sorted(zip(candidates, scores), key=lambda x: x[1], reverse=True)

优雅降级:每个组件都必须有 fallback。深度召回超时就退回协同过滤,所有召回都失败就返回热门——用户绝对不能看到空白页。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
class FaultTolerantRecall:
    def __init__(self, channels):
        self.channels = channels
        self.fallback = PopularItemsRecall()

    def recall(self, user_id, context):
        results = []
        for channel in self.channels:
            try:
                results.extend(channel.recall(user_id, context, timeout=20))
            except Exception as exc:
                logger.warning("channel %s failed: %s", channel.name, exc)

        if not results:
            return self.fallback.recall(user_id, context)
        return deduplicate(results)

强制延迟预算:每次调用都设激进的超时,由编排器强制执行。某个慢召回不能拖慢整条流水线,要的话就直接丢掉它的结果。


漏斗细节

召回、粗排、精排、重排序漏斗,标注了候选数量与每阶段延迟预算

上图展示了每一阶段的数量级缩减。两条规则值得记住:

  • 召回阶段决定了质量上限:好物品没进入漏斗,再精的精排也救不回来。这就是为什么生产系统总是并行跑多路召回。
  • 越窄的阶段允许越重的模型:精排在 200 个候选上可以跑一个 1 亿参数的 DIN;召回在百万候选上连两次 embedding 查表 + 一次内积都得抠成本。

多路召回

单一召回策略一定会漏东西。协同过滤漏冷启动物品;内容召回漏惊喜发现;实时信号漏长期兴趣。所以工业系统普遍并行跑 3-5 路召回再融合结果。

通道一:双塔深度召回

双塔架构是现代召回的主力。用户塔在请求时实时跑,物品塔离线跑、把所有物品向量灌进 ANN 索引(Faiss 或 HNSW)。线上每次召回就是一次 ANN 查询,5-10 ms。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import torch
import torch.nn as nn


class TwoTowerRecall(nn.Module):
    """双塔模型,物品向量离线索引。"""

    def __init__(self, user_dim: int, item_dim: int, hidden=(256, 128)):
        super().__init__()
        self.user_tower = self._tower(user_dim, hidden)
        self.item_tower = self._tower(item_dim, hidden)

    @staticmethod
    def _tower(in_dim, hidden):
        layers, prev = [], in_dim
        for h in hidden:
            layers += [nn.Linear(prev, h), nn.ReLU(), nn.BatchNorm1d(h)]
            prev = h
        return nn.Sequential(*layers)

    def recall(self, user_features, ann_index, top_k=1000):
        with torch.no_grad():
            user_emb = self.user_tower(user_features)
        # ann_index 例如是离线物品向量构建的 Faiss IVF-PQ 索引
        _, top_ids = ann_index.search(user_emb.cpu().numpy(), top_k)
        return top_ids

两个工程上的细节。其一,损失函数很关键:带 logQ 修正的 batch 内 sampled softmax 已经成为标配(YouTube 团队 RecSys 2019 那篇)——目的是抵消热门物品在 batch 中过采样带来的偏差。其二,物品索引必须随 embedding 漂移定期重建:高频品类小时级、其他场景天级。

通道二:图召回

图召回通过多跳遍历找物品:用户 A 喜欢物品 X 和 Y;用户 B 喜欢 Y 和 Z;那么 Z 就是 A 的候选。这能发现纯 embedding 相似度漏掉的关联。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from collections import defaultdict


class GraphRecall:
    """基于共同用户的物品 Jaccard 相似度做物品到物品的召回。"""

    def __init__(self, interaction_graph):
        self.graph = interaction_graph
        self.item_similarity = self._compute_similarity()

    def _compute_similarity(self):
        sim = defaultdict(dict)
        items = [n for n in self.graph.nodes() if self.graph.nodes[n]["type"] == "item"]
        for a in items:
            users_a = set(self.graph.neighbors(a))
            for b in items:
                if a == b:
                    continue
                users_b = set(self.graph.neighbors(b))
                inter, union = len(users_a & users_b), len(users_a | users_b)
                if union:
                    sim[a][b] = inter / union
        return sim

    def recall(self, user_id, top_k=1000):
        seen = [n for n in self.graph.neighbors(user_id)
                if self.graph.nodes[n]["type"] == "item"]
        scores = defaultdict(float)
        for item in seen:
            for neighbour, w in self.item_similarity.get(item, {}).items():
                scores[neighbour] += w
        return [i for i, _ in sorted(scores.items(), key=lambda x: -x[1])[:top_k]]

通道三:实时行为召回

这是一条捕捉用户此刻正在做什么的通道。如果用户刚刚连点了同品类的三个物品,下一刷就应该立刻反映出来,而不是等到第二天的离线训练。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from collections import defaultdict, deque
from datetime import datetime, timedelta

import numpy as np


class RealTimeBehaviorRecall:
    """基于最近 30 分钟行为的召回,时间衰减 + 行为加权。"""

    def __init__(self, window_minutes=30):
        self.window = timedelta(minutes=window_minutes)
        self.behaviours = defaultdict(deque)

    def add_behavior(self, user_id, item_id, action_type, ts):
        self.behaviours[user_id].append({"item": item_id, "action": action_type, "ts": ts})
        cutoff = ts - self.window
        while self.behaviours[user_id] and self.behaviours[user_id][0]["ts"] < cutoff:
            self.behaviours[user_id].popleft()

    def recall(self, user_id, top_k=500):
        weights = {"view": 1.0, "click": 2.0, "purchase": 5.0}
        scores = defaultdict(float)
        now = datetime.now()
        for b in self.behaviours.get(user_id, []):
            age_min = (now - b["ts"]).total_seconds() / 60
            recency = np.exp(-age_min / 10)
            scores[b["item"]] += recency * weights.get(b["action"], 1.0)
        return [i for i, _ in sorted(scores.items(), key=lambda x: -x[1])[:top_k]]

多路融合

每条通道返回一个有序列表。融合时用基于排名的融合而不是原始分数(不同通道的分数没有可比性):

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed


class MultiChannelRecall:
    def __init__(self, channels, weights=None):
        self.channels = channels
        self.weights = weights or {c.name: 1.0 for c in channels}

    def recall(self, user_id, context, target=2000):
        results = {}
        with ThreadPoolExecutor(max_workers=len(self.channels)) as pool:
            futures = {pool.submit(c.recall, user_id, context): c.name for c in self.channels}
            for fut in as_completed(futures):
                name = futures[fut]
                try:
                    results[name] = fut.result(timeout=25)
                except Exception as exc:
                    logger.error("channel %s failed: %s", name, exc)

        scores = defaultdict(float)
        for name, items in results.items():
            w = self.weights.get(name, 1.0)
            for rank, item in enumerate(items):
                scores[item] += w / (rank + 1)  # 倒数排名融合
        return [i for i, _ in sorted(scores.items(), key=lambda x: -x[1])[:target]]

这就是搜索引擎里熟知的倒数排名融合(RRF)——对量纲差异很鲁棒。每条通道 25 ms 的超时是硬约束:慢的通道直接丢,绝不阻塞。


排序:粗排与精排

粗排

粗排把数千候选裁到数百,模型必须便宜。它的目的是廉价地剔除明显不对的候选——不是给出完美排序。两种方案最常见:

  • 浅层双塔,物品侧离线算(和召回类似但带更丰富的特征)。
  • XGBoost 排序模型,跑在简单特征上(热度、CTR、用户/物品基础统计)。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
import xgboost as xgb


class CoarseRanker:
    def __init__(self):
        self.model = xgb.XGBRanker(
            objective="rank:pairwise",
            tree_method="hist",
            max_depth=4,
            n_estimators=50,
        )

    def fit(self, X, y, group):
        self.model.fit(X, y, group=group)

    def predict(self, X):
        return self.model.predict(X)

一个常见错误是把粗排做得太好。如果它选出来的 200 已经基本就是精排会选的 200,那精排就没价值了。经验值是粗排 top-200 与精排的重合率维持在 0.7 左右——能筛掉明显垃圾,又给精排留下区分空间。

精排:Wide & Deep、DeepFM、DIN

精排在小候选集上跑重模型。生产环境里 CTR 预估有三大架构。

Wide & Deep(Google 2016)把记忆能力(线性模型 + 交叉特征)和泛化能力(深度 MLP + embedding)结合起来:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
class WideDeepRanking(nn.Module):
    """Google Wide & Deep:线性记忆 + 深度泛化。"""

    def __init__(self, wide_dim, embed_dims, deep_hidden):
        super().__init__()
        self.wide = nn.Linear(wide_dim, 1)
        self.embeddings = nn.ModuleDict(
            {name: nn.Embedding(vocab, dim) for name, (vocab, dim) in embed_dims.items()}
        )
        deep_in = sum(dim for _, dim in embed_dims.values())
        layers, prev = [], deep_in
        for h in deep_hidden:
            layers += [nn.Linear(prev, h), nn.ReLU(), nn.BatchNorm1d(h), nn.Dropout(0.2)]
            prev = h
        layers.append(nn.Linear(prev, 1))
        self.deep = nn.Sequential(*layers)

    def forward(self, wide_feats, sparse_ids, dense_feats):
        wide_out = self.wide(wide_feats)
        emb = [self.embeddings[name](ids) for name, ids in sparse_ids.items()]
        deep_out = self.deep(torch.cat(emb + [dense_feats], dim=1))
        return wide_out + deep_out

DeepFM(华为 2017)用因子分解机自动学两阶特征交叉,省掉了人工编排 wide 侧交叉特征的工作。如果你不想花精力人工调交叉特征,这是最合理的默认选择。

DIN(Deep Interest Network,阿里 2018)在用户行为序列上加了注意力。它不再把历史行为简单平均,而是对每个候选动态地把注意力放在与候选相似的历史行为上

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class DIN(nn.Module):
    """Deep Interest Network:在用户行为序列上做注意力。"""

    def __init__(self, item_dim, user_dim, hidden=(128, 64)):
        super().__init__()
        self.attention = nn.Sequential(
            nn.Linear(item_dim * 4, 36), nn.ReLU(), nn.Linear(36, 1)
        )
        in_dim = item_dim + user_dim + item_dim
        layers, prev = [], in_dim
        for h in hidden:
            layers += [nn.Linear(prev, h), nn.ReLU(), nn.Dropout(0.2)]
            prev = h
        layers.append(nn.Linear(prev, 1))
        self.mlp = nn.Sequential(*layers)

    def forward(self, candidate, history, user_profile):
        cand_exp = candidate.unsqueeze(1).expand_as(history)
        attn_in = torch.cat(
            [history, cand_exp, history - cand_exp, history * cand_exp], dim=2
        )
        weights = torch.softmax(self.attention(attn_in).squeeze(-1), dim=1)
        weighted = (history * weights.unsqueeze(-1)).sum(dim=1)
        return self.mlp(torch.cat([candidate, weighted, user_profile], dim=1))

注意力这个 trick 很关键:一个买过 50 本书、横跨 5 个品类的用户,并不存在一个"平均兴趣"——他只有按品类分别成立的兴趣。DIN 就是用注意力把这种按候选分别激活的兴趣解锁出来。


重排序

重排序是业务逻辑遇到算法输出的地方。三种模式几乎是每个生产系统的标配。

多样性(MMR)

纯 CTR 优化产生的列表会高度同质——用户点了第一个就走人。**MMR(Maximal Marginal Relevance)**贪心选择"既相关又与已选项多样"的物品:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import numpy as np


class DiversityReranker:
    def __init__(self, lambda_div=0.3):
        self.lam = lambda_div

    def rerank(self, items, scores, features, top_k=20):
        selected, remaining = [], list(zip(items, scores, features))
        while len(selected) < top_k and remaining:
            best_idx, best_obj = None, -np.inf
            for idx, (it, sc, ft) in enumerate(remaining):
                if selected:
                    diversity = min(self._dist(ft, s_ft) for _, _, s_ft in selected)
                else:
                    diversity = 1.0
                obj = (1 - self.lam) * sc + self.lam * diversity
                if obj > best_obj:
                    best_obj, best_idx = obj, idx
            selected.append(remaining.pop(best_idx))
        return [it for it, _, _ in selected]

    @staticmethod
    def _dist(a, b):
        va, vb = np.array(list(a.values())), np.array(list(b.values()))
        return 1 - va @ vb / (np.linalg.norm(va) * np.linalg.norm(vb) + 1e-8)

多样性权重(典型取 0.2-0.3)本身就是个 A/B 参数。太低 feed 单调;太高 CTR 下降,因为相关性被牺牲了。

业务规则

硬约束放这里,不要放进 ML 模型。下架过滤、合规要求、运营加权——这些是确定性规则,写成代码远比写成特征更容易审计。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class BusinessRulesReranker:
    def __init__(self, rules):
        self.rules = rules

    def rerank(self, items, scores, metadata):
        out = []
        for item, score in zip(items, scores):
            meta, adj, ok = metadata.get(item, {}), score, True
            for rule in self.rules:
                if not rule.check(item, meta):
                    ok = False
                    break
                adj += rule.score_adjustment(item, meta)
            if ok:
                out.append((item, adj))
        out.sort(key=lambda x: -x[1])
        return [i for i, _ in out]

时效性加权

新闻、视频、短内容场景里,新鲜度本身就是一个重要特征。指数衰减给最近内容一个有界的加权,既不会被旧内容压住,也不会让新内容彻底盖过老内容:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
from datetime import datetime
import numpy as np


class FreshnessReranker:
    def __init__(self, decay_hours=24, max_boost=0.3):
        self.decay, self.max_boost = decay_hours, max_boost

    def rerank(self, items, scores, timestamps):
        now, out = datetime.now(), []
        for item, score in zip(items, scores):
            ts = timestamps.get(item)
            if ts:
                age_h = (now - ts).total_seconds() / 3600
                boost = self.max_boost * np.exp(-age_h / self.decay)
                out.append((item, score * (1 + boost)))
            else:
                out.append((item, score))
        out.sort(key=lambda x: -x[1])
        return [i for i, _ in out]

特征仓库

特征仓库架构:离线批处理路径与在线实时路径共享同一份特征定义

特征仓库是成熟推荐系统里最重要、却又最常被最后才动手建的基础设施。它的核心职责是消除训练-服务偏差:保证一个特征在离线训练时计算的语义,与它在线上服务时计算的语义完全一致

架构上是两条路径共享一份特征定义:

  • 离线路径:Spark/Flink 跑在数据湖上,把特征物化成 Parquet 文件,喂给训练流水线。
  • 在线路径:Flink 消费 Kafka 事件流,把聚合特征写入 Redis,线上 p99 < 5 ms 取出。

两条路径执行的是同一份特征定义(通常是 SQL 或 YAML)。改特征时两边一起改。如果没有这个纪律,迟早会发生这样的事:训练时用的特征语义和线上服务时不一致,AUC 静悄悄掉一两个点,等业务指标发现已经晚了。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import json


class FeatureStore:
    """Redis 后端的在线特征仓库,支持批量取。"""

    def __init__(self, redis_client, ttl=3600):
        self.redis, self.ttl = redis_client, ttl

    def set(self, entity, eid, name, value):
        self.redis.setex(f"{entity}:{eid}:{name}", self.ttl, json.dumps(value))

    def batch_get(self, entity, eids, names):
        keys = [f"{entity}:{e}:{n}" for e in eids for n in names]
        values = self.redis.mget(keys)
        n = len(names)
        return [
            [json.loads(v) if v else None for v in values[i:i + n]]
            for i in range(0, len(values), n)
        ]

值得了解的开源/商业方案:Feast(最流行的开源),Tecton(商业),阿里内部的 Feathr。底层都遵循同样的"离线 + 在线双路径"模式。

时间特征的循环编码

一个看起来很小、却影响很大的细节。23 点和 0 点在时间上相邻,但线性模型会把它们看成相差 23 个单位。用 (sin, cos) 编码可以让模型把它们看成邻居:

1
2
3
4
5
6
7
8
9
def temporal_features(ts):
    dt = datetime.fromtimestamp(ts)
    return {
        "hour_sin": np.sin(2 * np.pi * dt.hour / 24),
        "hour_cos": np.cos(2 * np.pi * dt.hour / 24),
        "dow_sin":  np.sin(2 * np.pi * dt.weekday() / 7),
        "dow_cos":  np.cos(2 * np.pi * dt.weekday() / 7),
        "is_weekend": int(dt.weekday() >= 5),
    }

A/B 测试框架

A/B 测试结果:14 天对照组与实验组 CTR 趋势、置信带、显著性达到时点,以及各指标的提升分解

A/B 测试是你发现"离线 AUC 涨 3% 的模型,上线后实际跌 1.5%“的唯一手段——这种事比想象中常见得多。三个属性必须保证:

  • 一致性哈希分流:同一个用户必须始终被分到同一个分桶。中途换组既毁体验,又毁统计有效性。
  • 预先注册的指标:包括"护栏指标”(延迟、错误率、营收),即使主指标涨了,护栏破了也不能上线。
  • 事先做功效分析:开始实验前就要算清楚需要多少样本量。看到一个"看起来不错"的中间结果就提前停止,假阳率会爆炸式增长。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import numpy as np
from collections import defaultdict
from scipy.stats import norm


class ABTestFramework:
    """生产级 A/B 框架:一致性分流 + 比例 z 检验。"""

    def __init__(self):
        self.experiments, self.events = {}, defaultdict(list)

    def create(self, exp_id, variants_split):  # 例如 {"control": 50, "v1": 50}
        self.experiments[exp_id] = variants_split

    def assign(self, user_id, exp_id):
        h = hash(f"{user_id}:{exp_id}") % 100
        cum = 0
        for variant, split in self.experiments[exp_id].items():
            cum += split
            if h < cum:
                return variant
        return "control"

    def log(self, user_id, exp_id, variant, event):
        self.events[exp_id].append({"user": user_id, "variant": variant, "event": event})

    def analyse(self, exp_id):
        stats = defaultdict(lambda: {"impr": 0, "click": 0})
        for ev in self.events[exp_id]:
            if ev["event"] == "impression":
                stats[ev["variant"]]["impr"] += 1
            elif ev["event"] == "click":
                stats[ev["variant"]]["click"] += 1

        out, ctrl = {}, stats.get("control")
        for v, s in stats.items():
            ctr = s["click"] / max(s["impr"], 1)
            out[v] = {"ctr": ctr, **s}

        if ctrl:
            for v, s in out.items():
                if v == "control":
                    continue
                p_pool = (ctrl["click"] + s["click"]) / (ctrl["impr"] + s["impr"])
                se = np.sqrt(p_pool * (1 - p_pool) * (1 / ctrl["impr"] + 1 / s["impr"]))
                z = (s["ctr"] - out["control"]["ctr"]) / (se + 1e-8)
                s["lift"] = (s["ctr"] - out["control"]["ctr"]) / (out["control"]["ctr"] + 1e-8)
                s["p_value"] = 2 * (1 - norm.cdf(abs(z)))
                s["significant"] = s["p_value"] < 0.05
        return out

**A/B 实验该跑多久?**至少要满足三个条件:(1) 覆盖一个完整的星期周期(一般两周);(2) 达到功效分析算出的样本量;(3) 越过新奇效应(用户对新东西天然有点击冲动)。两周是最常见的答案,少于一周的实验都要打个问号。

常见陷阱:变体之间的网络效应(实验组用户通过共享内容影响对照组)、SUTVA 假设违反、不同用户群的异质处理效应、累积效应(实验组提升了长期留存但短期 CTR 下降)。这些问题的根本解法是分层实验平台——这也是为什么 Google、Meta、字节都自己搭了一套实验平台。


持续训练

持续训练流水线:四种重训练触发条件、四阶段部署、监控反馈闭环

模型会衰减。用户行为漂移、物品池变化、季节性切换。上个月还是 SOTA 的模型,下个月不重训就会变成包袱。训练流水线必须自动跑,触发条件包括:

  • 定时调度——精排日级,增量更新小时级,最易变特征近实时。
  • 漂移检测——重要特征 PSI(Population Stability Index)超过 0.2 就触发,不等定时调度。
  • 指标衰减——离线 AUC 相对上一个 checkpoint 下降超过 2%。
  • 代码变更——新的特征定义或模型结构。

训练的输出不是已部署的模型,而是模型注册中心里的一个带元数据的 artifact:版本号、训练数据窗口、离线指标、血缘信息。部署是一个独立的、有门禁的步骤。

部署:影子 → 金丝雀 → A/B → 全量

新模型不可能从注册中心一步到 100% 流量。标准的阶梯式发布:

  1. 影子流量——0% 流量,但模型并行运行,预测结果记日志。这能在不影响用户的前提下抓出延迟回归、schema 不一致、服务 bug。
  2. 金丝雀——1-10% 流量跑 1-24 小时。护栏指标破线就自动回滚。
  3. A/B 测试——50% 流量跑 1-2 周做正式统计验证。
  4. 全量——100% 流量。

自动回滚没得商量。判断标准很硬:p95 延迟超 SLO、错误率超 1%、CTR 跌幅超 5%——任意一条触发就回滚并寻呼人工。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
class CanaryDeployment:
    def deploy(self, version, initial_pct=10):
        if not self.validate_offline(version):
            raise ValueError("offline validation failed")
        canary = self.deploy_canary(version, traffic=initial_pct)
        metrics = self.monitor(canary, minutes=60)
        if self.healthy(metrics):
            self.rollout(canary, target=100)
        else:
            self.rollback(canary)
            self.alert("canary failed", metrics)

    @staticmethod
    def healthy(m):
        return (m["p95_latency"] < 100
                and m["error_rate"] < 0.01
                and m.get("ctr_delta", 0) > -0.05)

服务基础设施

服务基础设施:API 网关、负载均衡、推荐编排器,下游召回/排序/特征/缓存服务

服务栈分四层,每一层都是无状态、可水平扩展的:

  • API 网关 + 负载均衡(Nginx、Envoy 或云 LB):处理 TLS、鉴权、限流、路由。
  • 推荐编排器:无状态服务,串联整个漏斗,依次调用召回、排序、重排,再合并结果。
  • 后端服务:召回服务背后是 Faiss/HNSW;排序服务跑 TF-Serving 或 Triton 在 GPU 上;特征服务用 Redis(热数据)+ HBase(冷数据)。
  • 多层缓存:特征缓存、向量缓存、整次预测缓存。预测缓存命中率 30-50% 是常态,相应地节省同样比例的计算成本。

性能优化

三种技术叠加可以拿到 5-10 倍加速,且几乎不损失质量:

量化(FP32 → INT8):CPU 上 2-4 倍,GPU 上小幅提升。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
import torch.quantization as quant

def quantize(model, calibration_loader):
    model.eval()
    model.qconfig = quant.get_default_qconfig("fbgemm")
    quant.prepare(model, inplace=True)
    with torch.no_grad():
        for batch in calibration_loader:
            model(batch)
    return quant.convert(model, inplace=False)

知识蒸馏:用一个小学生模型模仿大老师模型。学生不仅学硬标签,还学老师输出的软概率——后者携带了"物品之间相对优劣"的信息。

1
2
3
4
5
6
7
8
import torch.nn.functional as F

def distill_loss(student_logits, teacher_logits, labels, T=3.0, alpha=0.7):
    s_soft = F.log_softmax(student_logits / T, dim=1)
    t_soft = F.softmax(teacher_logits / T, dim=1)
    soft = F.kl_div(s_soft, t_soft, reduction="batchmean") * (T ** 2)
    hard = F.cross_entropy(student_logits, labels)
    return alpha * soft + (1 - alpha) * hard

预测缓存:针对长尾的重复请求。TTL 5 分钟是个不错的默认值——足够摊薄计算成本,又能反映新行为。

标准配方是先蒸馏,再剪枝,最后量化。这个顺序能让每一步都保留前一步的质量收益。

监控

三类指标,全部带告警:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
class RecommendationMonitor:
    def __init__(self, metrics):
        self.metrics = metrics

    def log(self, user_id, scores, latency_ms):
        self.metrics.histogram("pred.latency", latency_ms)
        self.metrics.histogram("pred.score_mean", float(np.mean(scores)))

    def check(self):
        recent = self.metrics.recent("pred.latency", minutes=5)
        if recent["p95"] > 150:
            self.alert("high_latency", recent)
        if self.metrics.error_rate(minutes=5) > 0.05:
            self.alert("high_error_rate")
        scores = self.metrics.recent("pred.score_mean", minutes=30)
        baseline = self.metrics.baseline("pred.score_mean")
        if abs(scores["mean"] - baseline["mean"]) > 2 * baseline["std"]:
            self.alert("distribution_shift", scores)

最隐蔽、也最有价值的告警是第三条——预测分布漂移。如果平均预测 CTR 突然偏离基线两个标准差,上游一定出了问题:某个特征损坏、某个 embedding 索引过期、某个模型悄悄发了错版本。等业务指标动起来再发现,已经损失一个小时了;分布监控能在分钟级把这种问题揪出来。


团队分工

生产推荐团队的角色分工,以及各阶段的主要负责方

生产级推荐系统对一个人来说太大,对完全独立的小团队来说又耦合太深。实践中有效的角色边界是这样的:

  • 算法工程师:负责模型代码、特征设计、A/B 实验。召回、排序、重排模型都他们写。
  • 数据工程师:负责 ETL 流水线、样本生成、特征仓库的离线侧。他们是数据质量问题的防火墙。
  • MLOps / 平台工程师:负责训练基础设施、模型注册中心、CI/CD、服务运行时。让"上线一个新模型"从一个月变成一天。
  • SRE / 基础设施:负责延迟 SLO、容量规划、故障响应。凌晨三点被叫起来的就是他们。
  • 分析师 / 研究:负责长期评估、因果推断、排序诊断。他们抓"指标看着挺好但营收不动"这种问题。
  • 产品:负责业务 KPI 和内容政策。

图右侧的矩阵列出了各阶段的主要负责方。规律是:每个阶段都至少有两个负责方——因为每个阶段同时有"模型质量"和"运维"两个维度。


工业级框架

阿里 EasyRec(开源):端到端框架,包含特征工程、预置模型(Wide & Deep、DeepFM、DIN、MMoE)、PAI/MaxCompute 训练、PAI-EAS 部署。如果你已经在阿里云上,这是最快上到生产级 baseline 的路径。

Meta TorchRec:TorchRec 是开源的、支撑 Meta 内部推荐栈的库。强项是分片 embedding 表——这是大规模推荐训练里最难的分布式系统问题。

字节跳动 Monolith(开源):面向十亿参数级在线学习设计。核心是无碰撞 embedding 哈希表和异步训练——能从生产日志近实时地更新模型。TikTok 推荐栈的一部分就在它上面。

YouTube 两阶段系统:经典论文是 Covington 等 2016 那篇——双塔深度候选生成 + 深度排序。后续演进里最有影响力的是 2019 年 sampled softmax 那篇,但两阶段的骨架仍然是大多数团队的模板。


常见问题 Q&A

召回路数应该有多少?

从三路开始:协同过滤、双塔深度、实时行为。专门通道(图、内容、地理、社交)只在离线分析显示"现有通道遗漏了哪些好物品"时才加。超过十路,时间和成本基本花在管道上而不是质量上了。

粗排到精排的合理收缩比是多少?

通常 10:1(2,000 → 200 → 20)。要端到端监控 recall@K:粗排太激进会丢掉精排本来能挑出来的好物品;粗排太松又浪费精排的算力预算。

精排该多复杂?

从 Wide & Deep 或 DeepFM 起步。只有在测出"现有模型没充分利用序列信息"(典型表现:历史丰富的用户预测分平淡)之后,才加 DIN 这种基于注意力的序列建模。每提升一档复杂度,都要让它服务成本的增加被指标提升 justify。

重排该用模型还是规则?

混合用。硬约束(合规、库存、黑名单)用确定性规则——可审计。软优化(多样性、新鲜度、探索)用学习型重排。两者并存是常态。

量化、剪枝、蒸馏怎么选?

量化单位投入产出比最高(CPU 上 2-4 倍)。蒸馏在你需要的不只是更快、还要更小模型时是正解。剪枝最脆弱——能用,但需要小心地重训。推荐顺序:蒸馏 → 剪枝 → 量化。

如何处理新用户?

多级降级:(1) 全新用户给热门和趋势内容;(2) 知道一点画像后给基于画像的内容推荐;(3) 用 bandit 风格的探索在 3-10 次交互内积累信号;(4) ~50 次交互后切到完整的个性化模型。详见第 14 篇关于元学习的部分。

什么时候下线一个模型?

满足两个条件:(a) 后继模型在主指标上 A/B 测试胜出,没有任何护栏指标回退;(b) 新模型的运维成本可接受。无论如何,旧模型要保留 30 天可重新部署的状态,以应对滞后回归。


总结

本文把工业级推荐系统的完整栈搭起来了:

  • 三个平面——数据、训练、服务,平面之间接口清晰
  • 四级漏斗——召回、粗排、精排、重排,把数亿用户塞进 100 ms 预算
  • 多路召回 + RRF 融合,因为没有单一通道能覆盖全部质量
  • Wide & Deep、DeepFM、DIN 作为精排的生产级架构
  • 特征仓库用一份特征定义同时驱动离线训练和在线服务,消除偏差
  • A/B 测试做一致性分流、z 检验、预先注册护栏指标
  • 持续训练由调度、漂移、指标衰减触发
  • 金丝雀发布配合护栏指标自动回滚
  • 服务基础设施——网关、编排器、GPU 模型服务、Redis 特征仓库、预测缓存
  • 团队分工清晰映射到流水线各阶段

工业实践里最有价值的一条经验也是最朴素的一条:从小处起步,凡事度量,由 A/B 决定留哪个。一个用热门 + 简单双塔召回 + DeepFM 精排 + 严格实验纪律的流水线,会跑赢一个没有 A/B 框架就上线的奇异 GNN。本文里讲的所有框架都不是竞争优势的来源——它们支撑起的那套迭代闭环才是。


系列导航

本文是推荐系统 16 篇系列的第 16 篇(完结)。感谢你一路读到这里。

上一篇
第十五篇:实时推荐与在线学习所有文章

参考文献

  • Covington, P., Adams, J., Sargin, E. “Deep Neural Networks for YouTube Recommendations.” RecSys 2016. paper
  • Yi, X., et al. “Sampling-Bias-Corrected Neural Modeling for Large Corpus Item Recommendations.” RecSys 2019.(“双塔 + logQ 修正"那篇)
  • Cheng, H., et al. “Wide & Deep Learning for Recommender Systems.” DLRS 2016. arXiv:1606.07792
  • Guo, H., et al. “DeepFM: A Factorization-Machine based Neural Network for CTR Prediction.” IJCAI 2017. arXiv:1703.04247
  • Zhou, G., et al. “Deep Interest Network for Click-Through Rate Prediction.” KDD 2018. arXiv:1706.06978
  • Liu, Z., et al. “Monolith: Real Time Recommendation System With Collisionless Embedding Table.” 2022. arXiv:2209.07663
  • 阿里 EasyRec:github.com/alibaba/EasyRec
  • TorchRec:github.com/pytorch/torchrec
  • Feast(开源特征仓库):feast.dev

Liked this piece?

Follow on GitHub for the next one — usually one a week.

GitHub