推荐系统(九)—— 多任务学习与多目标优化

线上推荐从来不是优化一个数。本文从样本选择偏差、负迁移、梯度冲突这些真实痛点出发,把 Shared-Bottom、ESMM、MMoE、PLE 四种工业级架构讲透,配上完整 PyTorch 代码、损失平衡策略与可上线的训练流水线。

打开淘宝刷一会,你看到的每一张商品图背后,模型都在同时回答好几个问题:用户会点击吗?点了会加购吗?加购了会下单吗?下单了会退货吗?买了会写好评吗?这些都是不同的任务,分布、稀疏度、商业含义各不相同;但它们又彼此牵连——肯点的人更可能转化,转化的人更可能复购,但 CTR 高的吸睛缩略图也可能拉低观看时长。

多任务学习(Multi-Task Learning,MTL)就是工业界给这个局面的统一答案。与其每个目标训一个模型再人工拼分,不如让一张神经网络把所有目标都吐出来,让共享表示同时服务多个 Head。难的不是画架构图,而是要让这些 Head 在共享参数上配合而不是打架

本文是这套体系的"思维模型 + 可运行代码"。我们会过完工业上真正会用到的四种架构——Shared-Bottom、ESMM、MMoE、PLE——再讲清楚为什么它们各自必要:负迁移、梯度冲突、样本选择偏差是怎么把 Naive 方案撕碎的,不确定性加权、GradNorm、Pareto 又是怎么把缝缝补补到能上线的。

你将学到

  • 推荐为什么本质上是多目标问题,强行单目标会发生什么
  • CVR 预估里的样本选择偏差和 ESMM 的链式法则解法
  • 四种架构 Shared-Bottom、ESMM、MMoE、PLE 各自适合什么场景
  • 损失平衡:不确定性加权、GradNorm、Pareto 前沿
  • 一套可以直接搬进项目的 PyTorch 训练流水线

前置知识

  • PyTorch 基本用法(Module、forward、loss)
  • CTR 预估基础(参见第四篇
  • Embedding 层(参见第五篇

推荐系统为何天然是多目标

单目标优化是错的题

想一个餐厅推荐器。如果只优化点击,模型学到的就是浮夸图片+夸张标题,转化率跟着崩;只优化预订,推出来的全是连锁店,毫无发现感;只优化评分,全是没人订的米其林。诚实的目标是一

  • 用户会点击吗?(曝光)
  • 会真的去到店吗?(转化)
  • 去了满意吗?(满意度)
  • 之后会回头吗?(留存)

不同业务里这套配方完全一样:

业务常见目标
电商CTR、CVR、单次曝光 GMV、退货率、评价质量
短视频CTR、播放时长、点赞/分享率、关注率
广告CTR、CVR、CPA、用户生命周期价值

这些指标相关但不相同。MTL 模型的工作就是在利用相关性(用 CTR 信号教 CVR Head 理解兴趣)的同时,不让任何一个目标把别人压垮。

样本选择偏差:Naive CVR 为什么是坏的

下面这个细节是 ESMM 的全部动机。假设你想做一个转化预估模型:给定用户和商品,预测 $P(\text{购买} \mid \text{曝光})$。

  • 训练标签只在点击样本上存在——只有点击之后才能观察到购买。
  • 线上要给所有候选打分——包括用户从未点过的那些。

你在"被点击的曝光"这个切片上训,到线上却要预测整个曝光集合。这就是教科书级别的样本选择偏差:训练分布和线上分布不一致。

ESMM 的逃逸路径用的是概率链式法则:

$$P(\text{购买} \mid \text{曝光}) = P(\text{点击} \mid \text{曝光}) \cdot P(\text{购买} \mid \text{点击})$$

翻成人话:“看到一个商品后买它的概率,等于点它的概率乘以点了之后买的概率。” 第一项(CTR)和乘积(CTCVR)都可以在完整曝光空间上观测。我们直接监督这两个,CVR 就以"副产品"的形式被训出来——再也不用在偏差切片上学习了。

MTL 的甜头与代价

甜头

  • 样本利用率:稀疏任务(购买、关注)能蹭上密集任务(曝光、点击)的信号。
  • 隐式正则:跨任务共享参数,本身就是一种防过拟合的约束。
  • 推理便宜:一次前向算出排序需要的所有分数。
  • 泛化更好:联合训练倾向于学到跨目标稳健的表示。

代价

  • 负迁移:任务把共享参数往相反方向拽,每个 Head 都比单任务基线差。
  • 损失平衡:CTR 损失约 0.3,营收 MSE 上百,直接相加只会让回归任务把别人吃掉。
  • 架构设计:哪些层共享、哪些独立,几乎全靠经验。

下面四种架构本质上就是同一个问题的四种答案:共享多少?共享在哪?


架构一:Shared-Bottom

Shared-Bottom 架构示意:一条共享 MLP 主干分叉为多个任务塔

最朴素的起点。一个 MLP 主干算出表示 $h$,然后每个任务挂一个小塔:

$$h = f_{\text{shared}}(x), \qquad \hat{y}_k = f_k(h), \quad k=1,\dots,K$$

所有任务共用同一个 $h$。这是它唯一的设计假设,也是它唯一的失败模式。

实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import torch
import torch.nn as nn
import torch.nn.functional as F


class SharedBottomMTL(nn.Module):
    """Shared-Bottom:一根主干 + K 个任务塔。

    任务方向一致时简单好用;方向冲突时主干被两边拉扯,反而每个任务都比单任务基线差。
    """

    def __init__(self, input_dim, shared_hidden_dims, task_hidden_dims,
                 num_tasks, task_types):
        super().__init__()
        self.num_tasks = num_tasks
        self.task_types = task_types

        # ---- 共享主干
        shared, prev = [], input_dim
        for h in shared_hidden_dims:
            shared += [nn.Linear(prev, h), nn.BatchNorm1d(h),
                       nn.ReLU(), nn.Dropout(0.2)]
            prev = h
        self.shared_bottom = nn.Sequential(*shared)

        # ---- 每个任务独立的塔
        self.task_towers = nn.ModuleList()
        for k in range(num_tasks):
            layers, prev = [], shared_hidden_dims[-1]
            for h in task_hidden_dims:
                layers += [nn.Linear(prev, h), nn.BatchNorm1d(h),
                           nn.ReLU(), nn.Dropout(0.1)]
                prev = h
            layers.append(nn.Linear(prev, 1))
            if task_types[k] == 'binary':
                layers.append(nn.Sigmoid())
            self.task_towers.append(nn.Sequential(*layers))

    def forward(self, x):
        h = self.shared_bottom(x)
        return [tower(h) for tower in self.task_towers]


# 三个任务:CTR(二分类)、CVR(二分类)、Revenue(回归)
model = SharedBottomMTL(
    input_dim=128,
    shared_hidden_dims=[256, 128, 64],
    task_hidden_dims=[32, 16],
    num_tasks=3,
    task_types=['binary', 'binary', 'regression'],
)

x = torch.randn(32, 128)
ctr, cvr, rev = model(x)
print(ctr.shape, cvr.shape, rev.shape)  # 三个都是 (32, 1)

为什么最终会失效

举个例子:CTR 偏好"新颖、吸睛"的特征,CVR 偏好"稳定、可预期"的特征——两边对共享表示的诉求相反。主干被迫做妥协,结果两个任务都比单任务版本差。这就是负迁移,也是后面所有架构存在的全部理由。


架构二:ESMM(Entire Space Multi-Task Model)

ESMM 是阿里给 CVR 样本选择偏差开的方子。架构本身不复杂,关键在于监督信号放在哪

ESMM 架构:共享 Embedding 同时喂给 CTR 塔和 CVR 塔,CTCVR 是两者乘积,在全曝光空间上监督

思路

不要直接在"被点击的切片"上训 CVR。在完整曝光空间上训两件事:

  • pCTR = $P(\text{点击} \mid \text{曝光})$,监督信号是点击标签
  • pCTCVR = $P(\text{点击且购买} \mid \text{曝光})$,监督信号是 点击 AND 购买

CVR 通过乘法约束 $\text{pCTCVR} = \text{pCTR} \times \text{pCVR}$ 被隐式监督。反向传播会让 CVR 塔学到合理的概率,但它从头到尾都没有被强行去拟合一个有偏的标签。

实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
class ESMM(nn.Module):
    """Entire-Space Multi-Task Model。

    分解 P(购买|曝光) = P(点击|曝光) * P(购买|点击)。
    CTR 和 CTCVR 在完整曝光空间上训练;CVR 通过乘积间接监督,没有选择偏差。
    """

    def __init__(self, input_dim, hidden_dims):
        super().__init__()
        self.embedding = nn.Sequential(
            nn.Linear(input_dim, hidden_dims[0]),
            nn.ReLU(), nn.Dropout(0.2),
        )
        self.ctr_tower = self._tower(hidden_dims)
        self.cvr_tower = self._tower(hidden_dims)

    def _tower(self, dims):
        layers, prev = [], dims[0]
        for h in dims[1:]:
            layers += [nn.Linear(prev, h), nn.ReLU(), nn.Dropout(0.2)]
            prev = h
        layers += [nn.Linear(prev, 1), nn.Sigmoid()]
        return nn.Sequential(*layers)

    def forward(self, x):
        h = self.embedding(x)
        pctr = self.ctr_tower(h)            # P(点击 | 曝光)
        pcvr = self.cvr_tower(h)            # P(购买 | 点击)
        pctcvr = pctr * pcvr                # P(点击且购买 | 曝光)
        return pctr, pcvr, pctcvr


def esmm_loss(pctr, pcvr, pctcvr, click_label, buy_label):
    """ESMM 损失:

    - CTR 损失在所有曝光样本上(每条曝光都有点击标签)。
    - CTCVR 损失在所有曝光样本上("点击 AND 购买"对所有曝光都可观测)。
    - CVR 没有直接损失——梯度通过乘积反向传到 CVR 塔。
    """
    ctcvr_label = click_label * buy_label  # 仅"点击且购买"为 1
    loss_ctr = F.binary_cross_entropy(pctr, click_label.float())
    loss_ctcvr = F.binary_cross_entropy(pctcvr, ctcvr_label.float())
    return loss_ctr + loss_ctcvr


# 简化的训练步
model = ESMM(input_dim=128, hidden_dims=[256, 128, 64])
opt = torch.optim.Adam(model.parameters(), lr=1e-3)

x = torch.randn(32, 128)
click = torch.randint(0, 2, (32, 1)).float()
buy = torch.randint(0, 2, (32, 1)).float()  # 仅在 click=1 时有意义

pctr, pcvr, pctcvr = model(x)
loss = esmm_loss(pctr, pcvr, pctcvr, click, buy)
loss.backward(); opt.step()
print(f"loss = {loss.item():.4f}")

为什么有效

  • 没有偏差切片:CTR 和 CTCVR 都活在完整曝光空间里,CVR 从来不被要求拟合"只在点击后出现"的标签。
  • 数学一致:构造上 $\text{pCTCVR} = \text{pCTR} \times \text{pCVR}$,线上的三个分数不会互相矛盾。
  • 部署便宜:和"两个二分类 Head 的 Shared-Bottom"形状一样,排序流水线几乎不用动。

阿里原论文里这一招在淘宝把 CVR AUC 拉了 2-3 个点。对于一行架构改动来说非常可观。


架构三:MMoE(多门控混合专家)

Shared-Bottom 把所有任务挤进同一个瓶颈。MMoE 维护一池"专家子网络",让每个任务自己学一个软门控去挑专家。

MMoE 架构:一池专家 MLP,每个任务都有自己的门控网络

类比

办家庭聚会,要做三件事:做饭、布置、放音乐。你不再请一个全能选手(Shared-Bottom),而是请四个专家。每件事配一位经理(门控)来决定要听谁的话多一点。做饭经理多听大厨和侍酒师,布置经理多听花艺师和灯光师;但大厨可能也对摆盘有想法——经理可以混搭。

数学

任务 $k$ 的表示是所有 $n$ 个专家的门控加权和:

$$f_k(x) = \sum_{i=1}^{n} g_k^{(i)}(x) \cdot E_i(x), \qquad g_k(x) = \mathrm{softmax}(W_k x)$$

每个门控就是"线性层 + Softmax"的小网络。冲突的任务会自然学会指向不同的专家,相关的任务会学会共享同一批专家。

实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
class Expert(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim), nn.ReLU(), nn.Dropout(0.1),
            nn.Linear(hidden_dim, output_dim), nn.ReLU(),
        )

    def forward(self, x):
        return self.net(x)


class MMoE(nn.Module):
    """Multi-gate Mixture-of-Experts。

    一池 n 个专家;K 个任务,每个任务有自己的门控来对专家做软加权混合。
    冲突任务可以自动路由到不同专家——由模型自己学。
    """

    def __init__(self, input_dim, num_experts, expert_hidden_dim,
                 expert_output_dim, num_tasks, task_hidden_dims, task_types):
        super().__init__()
        self.num_tasks = num_tasks

        self.experts = nn.ModuleList([
            Expert(input_dim, expert_hidden_dim, expert_output_dim)
            for _ in range(num_experts)
        ])
        self.gates = nn.ModuleList([
            nn.Sequential(nn.Linear(input_dim, num_experts), nn.Softmax(dim=1))
            for _ in range(num_tasks)
        ])

        self.task_towers = nn.ModuleList()
        for k in range(num_tasks):
            layers, prev = [], expert_output_dim
            for h in task_hidden_dims:
                layers += [nn.Linear(prev, h), nn.ReLU(), nn.Dropout(0.1)]
                prev = h
            layers.append(nn.Linear(prev, 1))
            if task_types[k] == 'binary':
                layers.append(nn.Sigmoid())
            self.task_towers.append(nn.Sequential(*layers))

    def forward(self, x):
        # 把所有专家堆成一个张量:(B, n_experts, expert_dim)
        E = torch.stack([e(x) for e in self.experts], dim=1)

        outputs, gates_out = [], []
        for k in range(self.num_tasks):
            w = self.gates[k](x)               # (B, n_experts)
            gates_out.append(w)
            mix = (w.unsqueeze(2) * E).sum(1)  # (B, expert_dim)
            outputs.append(self.task_towers[k](mix))
        return outputs, gates_out


model = MMoE(
    input_dim=128, num_experts=4, expert_hidden_dim=64,
    expert_output_dim=32, num_tasks=3, task_hidden_dims=[16],
    task_types=['binary', 'binary', 'regression'],
)
x = torch.randn(32, 128)
outs, gates = model(x)

# 看路由:往往任务 1 和任务 3 偏好的专家子集就不一样
print("Task 1 gate weights:", gates[0][0].detach().round(decimals=2))
print("Task 3 gate weights:", gates[2][0].detach().round(decimals=2))

什么时候选 MMoE

  • 怀疑某些任务相互冲突,但说不清是哪些。
  • 想要一种架构能同时优雅处理"配合"与"对抗"两种情况。
  • 门控权重本身就是有用的可观测指标——画出来就知道任务实际共享了哪些专家。

架构四:PLE(渐进式分层提取)

PLE 是腾讯对 MMoE 的修补。即便有了门控,共享专家池仍然可能让一个任务的梯度污染到另一个任务依赖的特征——这就是著名的跷跷板现象:CTR 升多少,观看时长就掉多少。

PLE 架构:共享专家 + 任务特定专家,每个任务通过门控组合自己可见的专家

思路

把共享和私有显式拆开。每一层有:

  • 共享专家:所有任务可见,学跨任务的公共模式。
  • 任务特定专家:只属于一个任务,永远不会被别的任务的梯度碰到。

每个任务的门控只看得到共享专家加上自己的任务特定专家。叠几层就有了"渐进提取"——底层做通用特征,高层做任务特化。

实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class PLELayer(nn.Module):
    """单层 PLE:共享专家 + 任务特定专家 + 任务级门控。"""

    def __init__(self, input_dim, num_shared, num_per_task,
                 expert_hidden, expert_out, num_tasks):
        super().__init__()
        self.num_tasks = num_tasks
        self.shared_experts = nn.ModuleList([
            Expert(input_dim, expert_hidden, expert_out)
            for _ in range(num_shared)
        ])
        self.task_experts = nn.ModuleList([
            nn.ModuleList([
                Expert(input_dim, expert_hidden, expert_out)
                for _ in range(num_per_task)
            ])
            for _ in range(num_tasks)
        ])
        total = num_shared + num_per_task
        self.gates = nn.ModuleList([
            nn.Sequential(nn.Linear(input_dim, total), nn.Softmax(dim=1))
            for _ in range(num_tasks)
        ])

    def forward(self, x):
        shared = torch.stack([e(x) for e in self.shared_experts], dim=1)
        out = []
        for k in range(self.num_tasks):
            task_e = torch.stack([e(x) for e in self.task_experts[k]], dim=1)
            pool = torch.cat([shared, task_e], dim=1)
            w = self.gates[k](x).unsqueeze(2)
            out.append((w * pool).sum(1))
        return out


class PLE(nn.Module):
    """Progressive Layered Extraction:把共享和任务特定专家显式分开。"""

    def __init__(self, input_dim, num_layers, num_shared, num_per_task,
                 expert_hidden, expert_out, num_tasks,
                 task_hidden_dims, task_types):
        super().__init__()
        self.num_tasks = num_tasks
        self.layers = nn.ModuleList()
        prev = input_dim
        for _ in range(num_layers):
            self.layers.append(PLELayer(
                prev, num_shared, num_per_task,
                expert_hidden, expert_out, num_tasks,
            ))
            prev = expert_out

        self.task_towers = nn.ModuleList()
        for k in range(num_tasks):
            layers, p = [], expert_out
            for h in task_hidden_dims:
                layers += [nn.Linear(p, h), nn.ReLU(), nn.Dropout(0.1)]
                p = h
            layers.append(nn.Linear(p, 1))
            if task_types[k] == 'binary':
                layers.append(nn.Sigmoid())
            self.task_towers.append(nn.Sequential(*layers))

    def forward(self, x):
        # 每个任务沿着层栈各自携带表示往上走
        reps = [x] * self.num_tasks
        for layer in self.layers:
            # 每个任务把自己的当前表示送进下一层
            reps = [layer(reps[k])[k] for k in range(self.num_tasks)]
        return [self.task_towers[k](reps[k]) for k in range(self.num_tasks)]


model = PLE(
    input_dim=128, num_layers=2,
    num_shared=2, num_per_task=2,
    expert_hidden=64, expert_out=32,
    num_tasks=3, task_hidden_dims=[16],
    task_types=['binary', 'binary', 'regression'],
)
out = model(torch.randn(32, 128))
print([o.shape for o in out])

为什么这种"分家"有效

任务特定专家接收属于它自己的梯度。CTR 的梯度往一个 CVR 不喜欢的方向拽时,伤害被限制在共享专家里——CVR 的私有专家毫发无损。腾讯视频的报告里 PLE 在 MMoE 之上又拿了约 0.4% 的 AUC,并显著缓解了跷跷板。


怎么选架构

场景推荐理由
任务方向一致,工程资源有限Shared-Bottom简单、快、好的基线
CVR 预估,标签只在点击样本ESMM构造上消除选择偏差
不确定任务是否冲突MMoE门控自动学路由
已知有共享也有冲突PLE显式分家封顶负迁移

工程上比较稳的演进路径:先上 Shared-Bottom;如果 MTL 在某个任务上还不如对应的单任务基线,换 MMoE;观察到跷跷板,再换 PLE。


损失平衡:别让一个任务把别人吃掉

CTR 损失约 0.3,CVR 约 0.05,营收 MSE 上百。直接相加,回归任务就把别人灭了。损失平衡不是收尾的小修饰,而是"模型真的在学两个任务"和"在学一个半"的分水岭。

不确定性加权(Kendall et al., 2018)

把每个任务看作带噪声的回归/分类,每个任务有自己的方差 $\sigma_k^2$,并把方差当成可学习参数:

$$\mathcal{L}_{\text{total}} = \sum_k \frac{1}{2\sigma_k^2}\, \mathcal{L}_k + \log \sigma_k$$

难任务的 $\sigma_k$ 自然变大,对应权重就被压低;$\log \sigma_k$ 项防止 $\sigma_k$ 跑到无穷。下图左侧就是预期的演化:易任务的归一化权重慢慢长起来,难任务的慢慢退下去。

不确定性加权与 GradNorm:训练过程中权重和梯度范数如何被自动调整

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
class UncertaintyWeighting(nn.Module):
    """学习每个任务的 log-variance,自动压低难任务。"""

    def __init__(self, num_tasks):
        super().__init__()
        self.log_var = nn.Parameter(torch.zeros(num_tasks))  # = 2 log sigma

    def forward(self, task_losses):
        total = 0.0
        for k, loss in enumerate(task_losses):
            precision = torch.exp(-self.log_var[k])  # 1 / sigma^2
            total = total + 0.5 * precision * loss + 0.5 * self.log_var[k]
        return total

GradNorm

不确定性加权平衡的是损失GradNorm 平衡的是共享主干上的梯度范数。直觉上,谁的梯度范数最大,谁就在主导主干往哪走。GradNorm 调整任务权重让每个任务对主干的"拉力"差不多,并且可以偏向"学得慢"的任务。上图右侧展示了不平衡时各任务范数漂移开,加上 GradNorm 后被拉回到一个共同的速率。

Pareto 优化

任务真冲突时,不存在一个"最优"的加权方式——只有不同的取舍。把可达到的 (CTR-AUC, CVR-AUC) 点画出来,外缘就是Pareto 前沿:在这条线上你没法在不牺牲另一个指标的前提下改进任何一个。

CTR AUC 与 CVR AUC 之间的 Pareto 前沿,标出了三个典型工作点

线上要落在这条线上的哪一点?这其实不是数学题,是产品判断:业务现在更看重哪个指标?


为什么任务会打架:梯度冲突

讲了这么多架构,根本原因是什么?因为共享参数同时被往两个方向拽。挑一个共享权重 $\theta$:任务 A 说"加大",任务 B 说"减小",联合更新就是两个相反箭头的——通常比任何一边想要的步长都小,方向也是两边都不喜欢的。

共享参数上两个任务的梯度可能方向冲突;训练过程中梯度余弦相似度甚至会翻负

右侧那张图正是线上你应该去监控的指标:在共享主干上算两个任务损失的梯度余弦相似度。一旦它长期为负,模型就在打梯度战。这就是该考虑 PLE(拆专家)或 PCGrad(把冲突分量投影掉再走)的精确时刻。这个数算起来很便宜,比看汇总损失曲线信息量大得多。


工业参考点

公司架构场景提升
阿里ESMM淘宝 CVR较有偏 CVR 基线提升 ~2-3% AUC
GoogleMMoEYouTube 排序多目标全面优于 Shared-Bottom
腾讯PLE腾讯视频较 MMoE 再提 ~0.4% AUC,跷跷板缓解

数字来自原始论文;实际生产线还会继续迭代。


一份可以直接跑的训练流水线

把 MMoE 加上不确定性加权,串成完整的训练循环:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import numpy as np


class RecDataset(Dataset):
    def __init__(self, features, labels):
        self.x = torch.FloatTensor(features)
        self.y = {k: torch.FloatTensor(v) for k, v in labels.items()}
        self.tasks = list(labels.keys())

    def __len__(self):
        return len(self.x)

    def __getitem__(self, i):
        return {'x': self.x[i], **{t: self.y[t][i] for t in self.tasks}}


def train_epoch(model, loader, opt, balancer, tasks, device):
    model.train()
    running = {t: 0.0 for t in tasks}
    total = 0.0
    for batch in loader:
        x = batch['x'].to(device)
        y = {t: batch[t].to(device) for t in tasks}

        outs, _ = model(x)

        losses = []
        for k, t in enumerate(tasks):
            if t in ('ctr', 'cvr'):
                losses.append(F.binary_cross_entropy(outs[k].squeeze(), y[t]))
            else:
                losses.append(F.mse_loss(outs[k].squeeze(), y[t]))
            running[t] += losses[-1].item()

        loss = balancer(losses) if balancer else sum(losses)
        opt.zero_grad(); loss.backward(); opt.step()
        total += loss.item()

    n = len(loader)
    return total / n, {t: v / n for t, v in running.items()}


# ------ 跑起来
N = 10_000
features = np.random.randn(N, 128).astype(np.float32)
labels = {
    'ctr': np.random.randint(0, 2, N).astype(np.float32),
    'cvr': np.random.randint(0, 2, N).astype(np.float32),
    'revenue': (np.random.rand(N) * 100).astype(np.float32),
}
loader = DataLoader(RecDataset(features, labels), batch_size=64, shuffle=True)

model = MMoE(
    input_dim=128, num_experts=4, expert_hidden_dim=64,
    expert_output_dim=32, num_tasks=3, task_hidden_dims=[16],
    task_types=['binary', 'binary', 'regression'],
)
balancer = UncertaintyWeighting(num_tasks=3)
opt = optim.Adam(list(model.parameters()) + list(balancer.parameters()), lr=1e-3)

for epoch in range(5):
    loss, per_task = train_epoch(model, loader, opt, balancer,
                                 ['ctr', 'cvr', 'revenue'], 'cpu')
    print(f"epoch {epoch+1}: loss={loss:.4f} | "
          f"ctr={per_task['ctr']:.4f} | cvr={per_task['cvr']:.4f} | "
          f"rev={per_task['revenue']:.4f}")

FAQ

什么时候 MTL 真的值得这套复杂度?

任务之间确实共享底层的用户/物品结构、且至少有一个稀疏任务能蹭上密集任务的信号、且你在意推理成本时——值得。如果这些都不成立(任务彻底独立、推理预算宽裕),分开训反而更简单且常常更好。

MMoE / PLE 该用多少个专家?

经验:至少和任务数一样多;通常 2-3 个任务用 2-4 个专家,更多任务用 4-8 个。太少专家无法分化;太多则容易过拟合且门控会塌缩到少数几个专家上。

标签缺失怎么办?

掩码损失。每条样本只对有标签的任务贡献损失。CVR 这类稀疏标签和 CTR 这类密集标签共存时这是标准做法。

MTL 能帮冷启动吗?

能,间接地。密集任务(点击)塑造的表示,稀疏 Head(购买、关注)可以直接用。新用户只点过几次也能拿到合理的 CVR 预估,单任务 CVR 模型在这种情况下基本无话可说。

怎么调试 MTL 模型?

按"性价比"排序:

  1. 把每个 Head 和它的单任务基线对比。MTL 在某个 Head 上输了 → 有负迁移。
  2. 画门控权重(MMoE / PLE)。任务路由到不同专家了吗?符合直觉吗?
  3. 监控两个任务在共享主干上的梯度余弦相似度。长期为负 → 上 PLE 或 PCGrad。
  4. 做消融。砍一个任务、砍一个专家、砍损失平衡,谁砍掉效果反而更好,谁就是模型其实不需要的。

收尾

Shared-Bottom → ESMM → MMoE → PLE 这条架构演进线,本质上是对"任务在打架"承认得越来越彻底的过程。Shared-Bottom 假装它们不打。ESMM 绕开一个特定的偏差问题。MMoE 让模型自己决定共享在哪。PLE 把"共享 / 私有"做成显式的、结构性的隔离。把合适的架构和理性的损失平衡(先上不确定性加权)配在一起,盯紧梯度余弦相似度,一个能活在生产环境里的 MTL 系统就成形了。


系列导航

本文是推荐系统 16 篇系列的第 9 篇

上一篇下一篇
第八篇:知识图谱增强推荐系统所有文章第十篇:深度兴趣网络与注意力机制

Liked this piece?

Follow on GitHub for the next one — usually one a week.

GitHub