系列 · 推荐系统 · 第 7 篇

推荐系统(七)—— 图神经网络与社交推荐

图神经网络推荐系统全面解析:从消息传递的直觉出发,深入 GCN、GAT、GraphSAGE,再到工业级 PinSage、LightGCN、NGCF、社交推荐与图采样、冷启动处理。配 7 张图与完整 PyTorch 实现。

当 Netflix 决定下一步推荐什么内容时,它并不会孤立地看待你的观看历史。在幕后,其实存在一张复杂的关系网络:电影之间共享演员、用户之间口味重叠、评分信息在整个目录中层层传递。“图”这个视角并非比喻——每一个交互矩阵本质上就是一张图,而将其当作图来处理,能够解锁那些扁平化的用户/物品嵌入所无法表达的丰富信息。

图神经网络(Graph Neural Networks, GNN)正是让我们能在这种图结构上进行推理的利器。它不再孤立地为每个用户和物品学习表示,而是主张:你的表征由你所处的社交圈共同塑造。这一理念催生了 Pinterest 的十亿节点模型 PinSage、设计简洁却在协同过滤任务上超越复杂基线的 LightGCN,以及融合“你自己看了什么”与“你朋友看了什么”的社交推荐系统。

本文将从直觉出发,带你一览该领域的全貌。我们会从用户-物品二部图开始,逐步构建消息传递框架,然后深入剖析 GCN、GAT、GraphSAGE、PinSage、LightGCN、NGCF、社交 GNN,以及让这一切在大规模场景下依然可行的采样技巧。每个模型都配有可运行的 PyTorch 代码。


推荐系统(七)—— 图神经网络与社交推荐 — 章节概览图

你将学到什么#

  • 推荐数据为何天然具有图结构,以及这能带来什么优势
  • GNN 的核心构建模块——GCN、GAT、GraphSAGE——及其适用场景
  • 工业级模型(PinSage、LightGCN、NGCF)如何将 GNN 投入生产
  • 社交信号如何带来 5–15% 的效果提升,以及需要警惕的失效模式
  • 小批量邻居采样技术,让 GNN 能从 MovieLens 扩展到 Pinterest 的规模
  • 一种通过归纳式聚合干净利落地解决冷启动用户问题的方法

前置知识#

  • 熟悉 Python 和 PyTorch(张量、nn.Module、训练循环)
  • 了解基本的嵌入和协同过滤知识(第 2 篇
  • 熟悉矩阵表示法;无需谱图理论——我们将从零开始亲手推导所有内容

为什么推荐系统要用 GNN?#

推荐数据本身就是一张图#

用户-物品二部图:高亮的边显示 Alice 与 Bob 共享两部电影,构成隐式的协同信号

随便观察任何一个推荐系统,仔细审视它的交互矩阵:行代表用户,列代表物品,每个非零项都是一次点击、观看、购买或评分。把这个矩阵重新绘制为一张 二部图

  • 节点 是用户和物品。
  • 连接用户与其交互过的所有物品。
  • 协同信号 隐藏在图的拓扑结构中:交互过相似物品的用户,会通过这些共享物品被隐式地连接起来。

如上图所示,Alice 和 Bob 从未直接互动,但高亮的边揭示了他们共同看过两部电影。GNN 可以跨跳传播这一信号:“Bob 喜欢《银翼杀手》,而 Bob 通过共享电影在图上与 Alice 相似,因此 Alice 很可能也喜欢《银翼杀手》。” 矩阵分解从不会提出这样的问题——它只看到单个单元格。

传统方法遗漏了什么?#

方法它捕捉了什么它遗漏了什么
矩阵分解用户/物品的潜在因子,点积得分每个嵌入独立学习;没有“邻居的邻居”的概念
神经协同过滤 / 自编码器非线性交互仍然是逐对处理;无法沿图结构传播信息
GNN节点嵌入由其 $L$ 跳邻域共同塑造(最终会遇到扩展性和过平滑问题——我们后面会解决)

核心观点:传统方法孤立地学习嵌入;GNN 在上下文中学习嵌入。

GNN 能带来什么?#

  • 显式的图建模 —— 直接在拓扑结构上操作。
  • 邻域聚合 —— 协同信号作为架构的副产品自然传播。
  • 多跳推理 —— 堆叠两层就能捕捉“喜欢与你喜欢的物品相似的物品的用户”。
  • 归纳式变体(GraphSAGE)—— 无需重新训练即可为全新用户/物品生成嵌入。
  • 异构图 —— 能融合社交关系、知识图谱、属性等信息。

图神经网络基础#

图的基本概念(快速回顾)#

一个图 $G=(V,E)$ 包含顶点集合 $V=\{v_1,\dots,v_n\}$ 和边集合 $E\subseteq V\times V$邻接矩阵 $A\in\{0,1\}^{n\times n}$$v_i$$v_j$ 之间有边时满足 $A_{ij}=1$

推荐系统中常见的三种图类型如下:

类型描述示例
二部图两种节点类型;边仅存在于不同类型之间用户 ↔ 物品
加权图边权重表示交互强度评分、点击次数
属性图节点携带特征向量人口统计信息、物品类别

一张图讲清消息传递#

消息传递的三步:邻居生成消息 → 目标节点聚合 → 更新自身状态

无论包装得多复杂,所有 GNN 本质上都是同一个三步流程的变体:

  1. 消息生成。每个邻居计算要发送的内容。
  2. 消息聚合。目标节点汇总收到的消息(求和、取均值、注意力加权等)。
  3. 状态更新。目标节点将聚合结果与自身先前状态融合。
$$\mathbf{m}_v^{(l)} = \mathrm{AGGREGATE}^{(l)}\!\bigl(\{\mathbf{h}_u^{(l-1)} : u\in\mathcal{N}(v)\}\bigr)$$ $$\mathbf{h}_v^{(l)} = \mathrm{UPDATE}^{(l)}\!\bigl(\mathbf{h}_v^{(l-1)},\; \mathbf{m}_v^{(l)}\bigr)$$

用大白话说就是:“看看邻居知道什么,总结一下,再和自己已有的知识融合。” 堆叠 $L$ 层后,每个节点就能感知到其 $L$ 跳范围内的邻居。

电话游戏类比。想象一场电话游戏,每个人同时向所有朋友耳语。一轮之后,你知道了朋友的想法;两轮之后,你知道了朋友的朋友的想法。GNN 正是用可学习、可微分的函数实现了这一过程。


图卷积网络(GCN)#

推荐系统(七)—— 图神经网络与社交推荐 — 章节小结图

核心思想#

GCN(Kipf & Welling, 2017)在图上定义了一种类似卷积的操作。正如 CNN 从局部像素邻域聚合信息一样,GCN 从节点的图邻域聚合特征向量。

GCN 层#

$$\mathbf{H}^{(l+1)} = \sigma\!\Bigl(\tilde{D}^{-1/2}\, \tilde{A}\, \tilde{D}^{-1/2}\, \mathbf{H}^{(l)}\, \mathbf{W}^{(l)}\Bigr)$$

看起来很复杂,但拆解后就很清晰:

符号含义
$\tilde{A} = A + I$添加自环后的邻接矩阵(每个节点也听取自己的意见)
$\tilde{D}$$\tilde{A}$ 的度矩阵
$\tilde{D}^{-1/2}\tilde{A}\tilde{D}^{-1/2}$对称归一化——防止高度数节点主导信息流
$\mathbf{H}^{(l)}$$l$ 层的节点特征矩阵(每行是一个节点的嵌入)
$\mathbf{W}^{(l)}$可学习的权重矩阵
$\sigma$非线性激活函数,通常为 ReLU
$$\mathbf{h}_v^{(l+1)} = \sigma\!\left(\sum_{u\in\mathcal{N}(v)\cup\{v\}} \frac{1}{\sqrt{d_v\,d_u}}\; \mathbf{h}_u^{(l)}\, \mathbf{W}^{(l)}\right)$$
  1. 收集 所有邻居(包括自己)的嵌入。
  2. 归一化 每个贡献项,按 $1/\sqrt{d_v\cdot d_u}$ 缩放——热门节点的影响被削弱,避免喧宾夺主。
  3. 变换 使用共享权重矩阵 $\mathbf{W}$ 进行线性变换。
  4. 激活 应用 ReLU。

实现:GCN 层#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.utils import add_self_loops, degree

class GCNLayer(nn.Module):
    """单层图卷积。"""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.linear = nn.Linear(in_channels, out_channels)

    def forward(self, x, edge_index):
        # 1. 添加自环,让节点也关注自身
        edge_index, _ = add_self_loops(edge_index, num_nodes=x.size(0))

        # 2. 计算对称归一化系数
        row, col = edge_index
        deg = degree(col, x.size(0), dtype=x.dtype)
        deg_inv_sqrt = deg.pow(-0.5)
        deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0
        norm = deg_inv_sqrt[row] * deg_inv_sqrt[col]

        # 3. 线性变换
        x = self.linear(x)

        # 4. 使用 scatter-add 聚合归一化后的邻居特征
        out = torch.zeros_like(x)
        out.scatter_add_(0, col.unsqueeze(1).expand_as(x), norm.unsqueeze(1) * x[row])
        return out

多层 GCN#

堆叠层数可以扩展感受野。两层覆盖两跳邻域,三层覆盖三跳。但需警惕 过平滑:层数过多会导致所有节点的嵌入趋于一致。在推荐任务中,2 到 3 层通常是最佳选择。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class GCN(nn.Module):
    """多层 GCN。"""

    def __init__(self, num_nodes, in_channels, hidden_channels,
                 out_channels, num_layers=2, dropout=0.5):
        super().__init__()
        self.num_layers = num_layers
        self.dropout = dropout

        # 如果节点没有原始特征,用 ID 嵌入代替
        self.embedding = None
        if in_channels == 0:
            self.embedding = nn.Embedding(num_nodes, hidden_channels)
            in_channels = hidden_channels

        self.convs = nn.ModuleList()
        if num_layers == 1:
            self.convs.append(GCNLayer(in_channels, out_channels))
        else:
            self.convs.append(GCNLayer(in_channels, hidden_channels))
            for _ in range(num_layers - 2):
                self.convs.append(GCNLayer(hidden_channels, hidden_channels))
            self.convs.append(GCNLayer(hidden_channels, out_channels))

    def forward(self, x, edge_index):
        if self.embedding is not None:
            x = self.embedding.weight
        for i, conv in enumerate(self.convs):
            x = conv(x, edge_index)
            if i < self.num_layers - 1:
                x = F.relu(x)
                x = F.dropout(x, p=self.dropout, training=self.training)
        return x

图注意力网络(GAT)#

为何需要注意力?#

GCN 对所有邻居一视同仁(最多通过度归一化调整)。但并非所有邻居都同等重要——你最好朋友的电影品味显然比某个泛泛之交更有参考价值。GAT(Veličković et al., 2018)通过为每条边学习 自适应的注意力权重 来解决这个问题。

GAT 如何工作#

$$e_{ij} = \mathrm{LeakyReLU}\!\bigl(\mathbf{a}^\top [\mathbf{W}\mathbf{h}_i \,\|\, \mathbf{W}\mathbf{h}_j]\bigr)$$ $$\alpha_{ij} = \frac{\exp(e_{ij})}{\sum_{k\in\mathcal{N}(i)} \exp(e_{ik})}$$ $$\mathbf{h}_i^{(l+1)} = \sigma\!\left(\sum_{j\in\mathcal{N}(i)\cup\{i\}} \alpha_{ij}\, \mathbf{W}^{(l)}\, \mathbf{h}_j^{(l)}\right)$$

通俗地说:“对每个邻居,学习它对我有多相关,然后取它们特征的加权平均。”

多头注意力#

$$\mathbf{h}_i^{(l+1)} = \big\|_{k=1}^{K}\; \sigma\!\left(\sum_{j\in\mathcal{N}(i)\cup\{i\}} \alpha_{ij}^{(k)}\, \mathbf{W}^{(k)}\, \mathbf{h}_j^{(l)}\right)$$

实现:GAT 层#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import MessagePassing
from torch_geometric.utils import add_self_loops, softmax

class GATLayer(MessagePassing):
    """带多头注意力的 GAT 层。"""

    def __init__(self, in_channels, out_channels, heads=1,
                 dropout=0.0, concat=True):
        super().__init__(aggr='add', node_dim=0)
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.heads = heads
        self.concat = concat
        self.dropout = dropout

        self.lin = nn.Linear(in_channels, heads * out_channels, bias=False)
        self.att = nn.Parameter(torch.empty(1, heads, 2 * out_channels))
        nn.init.xavier_uniform_(self.lin.weight)
        nn.init.xavier_uniform_(self.att)

    def forward(self, x, edge_index):
        edge_index, _ = add_self_loops(edge_index, num_nodes=x.size(0))
        # 投影到多头空间:[N, heads, out_channels]
        x = self.lin(x).view(-1, self.heads, self.out_channels)
        out = self.propagate(edge_index, x=x)
        return out.view(-1, self.heads * self.out_channels) if self.concat else out.mean(dim=1)

    def message(self, x_i, x_j, index):
        # x_i 是目标节点特征,x_j 是源节点特征
        alpha = (torch.cat([x_i, x_j], dim=-1) * self.att).sum(dim=-1)
        alpha = F.leaky_relu(alpha, negative_slope=0.2)
        alpha = softmax(alpha, index)  # 按目标节点归一化
        alpha = F.dropout(alpha, p=self.dropout, training=self.training)
        return x_j * alpha.unsqueeze(-1)

GraphSAGE:可扩展的归纳学习#

GCN 的问题#

GCN 是 直推式 的:它需要在训练时看到整张图,且无法为之后出现的新节点生成嵌入。如果明天有新用户注册,GCN 必须重新训练才能处理。

GraphSAGE(Hamilton et al., 2017)通过 归纳学习 解决了这一问题。它不记忆每个节点的固定嵌入,而是学习 如何聚合邻居信息。当新节点出现时,GraphSAGE 对其邻居运行相同的聚合逻辑,即时生成嵌入。

采样与聚合#

GraphSAGE 有两个关键思想:

  1. 邻居采样。不用所有邻居(开销太大),每跳固定采样 $k$ 个。
  2. 可学习的聚合函数。对采样的邻居应用可训练的函数(均值、LSTM、最大池化)。
$$\mathbf{h}_{\mathcal{N}(v)}^{(l)} = \mathrm{AGGREGATE}^{(l)}\!\bigl(\{\mathbf{h}_u^{(l-1)} : u\in\mathcal{N}_{\text{sampled}}(v)\}\bigr)$$ $$\mathbf{h}_v^{(l)} = \sigma\!\bigl(\mathbf{W}^{(l)}\cdot[\mathbf{h}_v^{(l-1)} \,\|\, \mathbf{h}_{\mathcal{N}(v)}^{(l)}]\bigr)$$

通俗地说:“采一些邻居,总结它们的特征,拼上自己的特征,再过一个线性层。”

聚合器工作原理
Mean邻居嵌入取平均:$\tfrac{1}{\lvert\mathcal{N}\rvert}\sum_u \mathbf{h}_u$
Max-pool共享 MLP 后逐元素取最大:$\max(\sigma(\mathbf{W}_{\text{pool}}\mathbf{h}_u + \mathbf{b}))$
LSTM将邻居视为序列输入 LSTM(对顺序敏感)

三者快速对比#

GCN vs GAT vs GraphSAGE:等权 / 学习注意力 / 采样后聚合

GCN 给所有邻居相同的(度归一化后的)话语权。GAT 学习谁更重要,并放大其权重。GraphSAGE 则选取一部分邻居,确保整个系统在超大图上依然高效。

实现:GraphSAGE 层#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import MessagePassing
from torch_geometric.utils import add_self_loops

class SAGEConv(MessagePassing):
    """采用均值聚合的 GraphSAGE 层。"""

    def __init__(self, in_channels, out_channels, normalize=False):
        super().__init__(aggr='mean')
        self.normalize = normalize
        self.lin_self = nn.Linear(in_channels, out_channels)   # 自身特征
        self.lin_neigh = nn.Linear(in_channels, out_channels)  # 邻居特征

    def forward(self, x, edge_index):
        edge_index, _ = add_self_loops(edge_index, num_nodes=x.size(0))
        neigh_agg = self.propagate(edge_index, x=x)
        out = self.lin_self(x) + self.lin_neigh(neigh_agg)
        return F.normalize(out, p=2, dim=1) if self.normalize else out

    def message(self, x_j):
        return x_j

能够为全新节点生成嵌入,是 GraphSAGE 在工业界广受欢迎的最主要原因。Pinterest 的 PinSage(下一节)正是它的直系后代。


PinSage:Pinterest 的十亿级推荐系统#

PinSage(Ying et al., 2018)是 Pinterest 的生产级 GNN,运行在 30 亿节点和 180 亿条边 上。三大创新使其成为可能:

随机游走采样#

PinSage 不再均匀采样邻居,而是从每个节点出发执行短随机游走,并保留访问频率最高的 top-$k$ 邻居。这聚焦于 最重要 的邻居,而非仅仅是最近的邻居。

重要性加权聚合#

$$\mathbf{h}_v^{(l)} = \sigma\!\Bigl(\mathbf{W}^{(l)}\cdot \bigl[\mathbf{h}_v^{(l-1)} \,\big\|\, \mathrm{AGG}\bigl(\{\alpha_{uv}\, \mathbf{h}_u^{(l-1)} : u\in\mathcal{N}_{\text{top-}k}(v)\}\bigr)\bigr]\Bigr)$$

困难负样本挖掘#

训练时,PinSage 会采样 困难负样本——那些得分很高但并非真实正样本的物品。这迫使模型做出更精细的区分,而非简单地将明显无关的物品与正样本分开。

实现:PinSage 卷积#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import torch
import torch.nn as nn
import numpy as np
from collections import Counter, defaultdict

class PinSageConv(nn.Module):
    """带重要性加权聚合的 PinSage 卷积模块。"""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.aggregator = nn.Sequential(
            nn.Linear(in_channels, out_channels),
            nn.ReLU(),
            nn.Linear(out_channels, out_channels),
        )
        self.combine = nn.Linear(in_channels + out_channels, out_channels)

    def forward(self, x, edge_index, importance_weights=None):
        row, col = edge_index
        neighbor_feats = x[col]
        if importance_weights is not None:
            neighbor_feats = neighbor_feats * importance_weights.unsqueeze(1)

        aggregated = torch.zeros(x.size(0), x.size(1), device=x.device)
        aggregated.scatter_add_(
            0, row.unsqueeze(1).expand_as(neighbor_feats), neighbor_feats
        )
        aggregated = self.aggregator(aggregated)
        return self.combine(torch.cat([x, aggregated], dim=1))

def compute_random_walk_importance(adj_list, num_nodes, num_walks=10,
                                   walk_length=5, top_k=10):
    """通过短随机游走计算邻居重要性。"""
    visit_counts = Counter()
    for start in range(num_nodes):
        if start not in adj_list:
            continue
        for _ in range(num_walks):
            current = start
            for _ in range(walk_length):
                neighbors = adj_list.get(current, [])
                if not neighbors:
                    break
                current = np.random.choice(neighbors)
                visit_counts[(start, current)] += 1

    per_source = defaultdict(list)
    for (src, tgt), cnt in visit_counts.items():
        per_source[src].append((tgt, cnt))

    scores = {}
    for src, neighbors in per_source.items():
        neighbors.sort(key=lambda t: t[1], reverse=True)
        top = neighbors[:top_k]
        total = sum(c for _, c in top)
        for tgt, cnt in top:
            scores[(src, tgt)] = cnt / total if total > 0 else 0
    return scores

LightGCN:少即是多#

一个惊人的发现#

LightGCN(He et al., 2020)提出了一个大胆的问题:“如果我们把 GCN 中的所有东西都去掉——没有可学习的权重矩阵,没有非线性激活,没有自环——只保留邻域聚合,会怎样?”

答案是,在协同过滤任务上,效果反而更好。图结构本身已包含足够信号,而标准 GCN 中的特征变换和激活函数实际上 有害,因为它们引入了不必要的复杂性和过拟合风险。

标准 GCN 保留自环、权重、ReLU;LightGCN 只保留聚合和层组合

架构#

$$\mathbf{e}_u^{(l+1)} = \sum_{i\in\mathcal{N}(u)} \frac{1}{\sqrt{|\mathcal{N}(u)|\cdot|\mathcal{N}(i)|}}\; \mathbf{e}_i^{(l)}$$ $$\mathbf{e}_i^{(l+1)} = \sum_{u\in\mathcal{N}(i)} \frac{1}{\sqrt{|\mathcal{N}(u)|\cdot|\mathcal{N}(i)|}}\; \mathbf{e}_u^{(l)}$$

通俗地说:“对邻居的嵌入取平均(按度归一化)。没有激活,没有权重矩阵。这就是一层。”

$$\mathbf{e}_u = \sum_{l=0}^{L} \alpha_l\, \mathbf{e}_u^{(l)}, \qquad \mathbf{e}_i = \sum_{l=0}^{L} \alpha_l\, \mathbf{e}_i^{(l)}$$

其中 $\alpha_l = \tfrac{1}{L+1}$ (等权重)。第 0 层是原始嵌入,第 1 层是一跳,第 2 层是两跳,依此类推。组合它们能得到多尺度表示,而无需锁定单一深度。

为何有效?#

  • 平滑效应。聚合使相似节点的嵌入更接近——这正是协同过滤想要的。
  • 层组合。混合各层能平衡直接信号(第 0 层)与高阶协同信号(深层),并缓解过平滑。
  • 简洁即正则。参数更少,意味着在稀疏交互数据上过拟合风险更低。

实现:LightGCN#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import torch
import torch.nn as nn
from torch_geometric.nn import MessagePassing
from torch_geometric.utils import degree

class LightGCNLayer(MessagePassing):
    """LightGCN 的一层:归一化邻居聚合,仅此而已。"""

    def __init__(self):
        super().__init__(aggr='add')

    def forward(self, x, edge_index):
        row, col = edge_index
        deg = degree(col, x.size(0), dtype=x.dtype)
        deg_inv_sqrt = deg.pow(-0.5)
        deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0
        norm = deg_inv_sqrt[row] * deg_inv_sqrt[col]
        return self.propagate(edge_index, x=x, norm=norm)

    def message(self, x_j, norm):
        return norm.unsqueeze(1) * x_j

class LightGCN(nn.Module):
    """完整的 LightGCN 协同过滤模型。"""

    def __init__(self, num_users, num_items, embedding_dim=64, num_layers=3):
        super().__init__()
        self.num_users = num_users
        self.num_items = num_items
        self.num_layers = num_layers

        self.user_embedding = nn.Embedding(num_users, embedding_dim)
        self.item_embedding = nn.Embedding(num_items, embedding_dim)
        nn.init.normal_(self.user_embedding.weight, std=0.1)
        nn.init.normal_(self.item_embedding.weight, std=0.1)

        self.convs = nn.ModuleList([LightGCNLayer() for _ in range(num_layers)])

    def forward(self, edge_index):
        x = torch.cat([self.user_embedding.weight, self.item_embedding.weight], dim=0)
        layer_embeddings = [x]  # 第 0 层 = 原始嵌入
        for conv in self.convs:
            x = conv(x, edge_index)
            layer_embeddings.append(x)

        # 各层等权组合
        final = torch.stack(layer_embeddings, dim=0).mean(dim=0)
        return final[:self.num_users], final[self.num_users:]

    def predict(self, user_emb, item_emb, user_ids, item_ids):
        return (user_emb[user_ids] * item_emb[item_ids]).sum(dim=1)

NGCF:神经图协同过滤#

与 LightGCN 的区别#

NGCF(Wang et al., 2019)采取了相反的哲学:在消息传递中显式加入特征变换和用户-物品交互项,应能产生更丰富的嵌入。LightGCN 做减法,NGCF 则做加法。

消息构造#

$$\mathbf{m}_{u\leftarrow i} = \frac{1}{\sqrt{|\mathcal{N}(u)|\cdot|\mathcal{N}(i)|}}\;\bigl(\mathbf{W}_1\, \mathbf{e}_i^{(l)} + \mathbf{W}_2\, (\mathbf{e}_i^{(l)} \odot \mathbf{e}_u^{(l)})\bigr)$$

其中 $\odot$ (逐元素乘积)捕捉用户和物品特征的交互。如果用户在“动作”维度上得分高,物品在同一维度也高,乘积会放大该信号。

$$\mathbf{e}_u^{(l+1)} = \mathrm{LeakyReLU}\!\bigl(\mathbf{m}_{u\leftarrow u} + \sum_{i\in\mathcal{N}(u)} \mathbf{m}_{u\leftarrow i}\bigr)$$

实现:NGCF#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import MessagePassing
from torch_geometric.utils import degree

class NGCFLayer(MessagePassing):
    """带特征变换与交互的 NGCF 层。"""

    def __init__(self, in_channels, out_channels, dropout=0.0):
        super().__init__(aggr='add')
        self.W1 = nn.Linear(in_channels, out_channels, bias=False)
        self.W2 = nn.Linear(in_channels, out_channels, bias=False)
        self.W_self = nn.Linear(in_channels, out_channels, bias=False)
        self.dropout = dropout

    def forward(self, x, edge_index):
        row, col = edge_index
        deg = degree(col, x.size(0), dtype=x.dtype)
        deg_inv_sqrt = deg.pow(-0.5)
        deg_inv_sqrt[deg_inv_sqrt == float('inf')] = 0
        norm = deg_inv_sqrt[row] * deg_inv_sqrt[col]

        self_emb = self.W_self(x)
        neighbor_emb = self.propagate(edge_index, x=x, norm=norm)
        out = F.leaky_relu(self_emb + neighbor_emb, negative_slope=0.2)
        return F.dropout(out, p=self.dropout, training=self.training)

    def message(self, x_i, x_j, norm):
        # W1 * 邻居 + W2 * (用户 . 物品) —— 特征交互
        msg = self.W1(x_j) + self.W2(x_i * x_j)
        return norm.unsqueeze(1) * msg

class NGCF(nn.Module):
    """神经图协同过滤。"""

    def __init__(self, num_users, num_items, embedding_dim=64,
                 num_layers=3, dropout=0.1):
        super().__init__()
        self.num_users = num_users
        self.num_items = num_items
        self.user_embedding = nn.Embedding(num_users, embedding_dim)
        self.item_embedding = nn.Embedding(num_items, embedding_dim)
        nn.init.normal_(self.user_embedding.weight, std=0.1)
        nn.init.normal_(self.item_embedding.weight, std=0.1)

        self.convs = nn.ModuleList([
            NGCFLayer(embedding_dim, embedding_dim, dropout)
            for _ in range(num_layers)
        ])

    def forward(self, edge_index):
        x = torch.cat([self.user_embedding.weight, self.item_embedding.weight], dim=0)
        for conv in self.convs:
            x = conv(x, edge_index)
        return x[:self.num_users], x[self.num_users:]

LightGCN vs. NGCF:该如何选择?#

LightGCNNGCF
参数量仅有嵌入嵌入 + 权重矩阵
性能在稀疏数据上通常更好在特征丰富时表现更佳
过拟合风险较高(参数更多)
训练速度较慢
推荐用于大多数 CF 任务特征密集的任务

默认选择:LightGCN。只有在拥有强用户/物品特征且数据集足够大以支撑额外参数时,才考虑 NGCF。


为何要堆叠层数?多跳聚合#

多跳邻域:目标用户先从交互过的物品收集信息,再从相似用户收集信息,最后从候选物品收集信息

一旦画出每一跳的传播路径,推荐系统为何关心深度就显而易见了:

  • 第 1 层 —— 目标用户从其交互过的物品中收集特征。
  • 第 2 层 —— 信号到达相似用户(经典的协同过滤跳)。
  • 第 3 层 —— 信号到达那些相似用户喜欢的物品:即候选推荐。

两层已能捕捉“喜欢相似物品的用户”这一模式,这是矩阵分解无法表达的。三层在稀疏图上偶尔有帮助,但容易引发过平滑——若想获得深度而不崩溃,可采用 LightGCN 式的层组合。


社交推荐#

核心思想#

朋友会影响你购买、观看和收听的内容。社交推荐在用户-物品图中加入了 社交边(好友、关注、信任链接)。其假设是:社交相连的用户往往偏好相似。

图结构#

社交推荐图包含两种边类型:

  • 交互边 $(u_i, i_j)$ :用户 $u_i$ 与物品 $i_j$ 有交互。
  • 社交边 $(u_i, u_j)$ :用户 $u_i$$u_j$ 是好友。

两种机制#

同质性(Homophily)。朋友因共享背景和文化而兴趣相似。

社会影响(Social influence)。朋友因相互影响而随时间变得更相似。

对模型而言,两者都归结为同一处方:沿社交边传播偏好。

实现:Social GCN#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch_geometric.nn import MessagePassing

class SocialGCNLayer(MessagePassing):
    """同时处理用户-物品边和社交边的 GCN 层。"""

    def __init__(self, in_channels, out_channels):
        super().__init__(aggr='mean')
        self.linear = nn.Linear(in_channels, out_channels)

    def forward(self, x, edge_index_ui, edge_index_social, num_users):
        x_ui = self.propagate(edge_index_ui, x=x)
        x_social = self.propagate(edge_index_social, x=x[:num_users])
        x_combined = x_ui.clone()
        x_combined[:num_users] = x_combined[:num_users] + x_social
        return self.linear(x_combined)

class SocialGCN(nn.Module):
    """基于 GCN 的社交推荐模型。"""

    def __init__(self, num_users, num_items, embedding_dim=64, num_layers=2):
        super().__init__()
        self.num_users = num_users
        self.user_embedding = nn.Embedding(num_users, embedding_dim)
        self.item_embedding = nn.Embedding(num_items, embedding_dim)
        self.convs = nn.ModuleList([
            SocialGCNLayer(embedding_dim, embedding_dim)
            for _ in range(num_layers)
        ])

    def forward(self, edge_index_ui, edge_index_social):
        x = torch.cat([self.user_embedding.weight, self.item_embedding.weight], dim=0)
        for conv in self.convs:
            x = conv(x, edge_index_ui, edge_index_social, self.num_users)
            x = F.relu(x)
        return x[:self.num_users], x[self.num_users:]

实证研究表明,当社交信号有效时,NDCG 通常能提升 5–15%——这在生产环境中意义重大。失效模式则是:噪声大的社交关系(如随机 Facebook 好友,无共同品味)反而有害。可通过注意力机制降低无关社交边的权重,或在训练前根据交互重叠度过滤社交图。


图采样实现可扩展性#

问题#

全批量 GNN 训练意味着每次前向传播都要将 整张图 加载到内存,并为 所有节点 计算嵌入。这对 MovieLens(10 万次交互)可行,但对 Pinterest(180 亿条边)则完全不可能。

完整图 vs 采样小批:选定一个目标节点,采样少数一跳邻居,再采样少数二跳邻居,只有这些节点参与一次前向传播

采样策略#

策略工作原理示例
邻居采样每层为每个节点采样固定数量的邻居GraphSAGE:第 1 跳 10 个,第 2 跳 5 个
节点采样采样子集节点,使用其邻域FastGCN
子图采样采样连通子图Cluster-GCN, GraphSAINT
随机游走采样用随机游走找到重要邻居PinSage

邻居采样是主力:限制每个节点的感受野,使内存占用可预测;每轮重新采样还能减少方差。

实现:邻居采样器#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import numpy as np
import torch

class NeighborSampler:
    """GraphSAGE 风格的邻居采样器,用于小批量训练。"""

    def __init__(self, edge_index, num_nodes, num_neighbors=(10, 5)):
        """初始化采样器。
        
        Args:
            num_neighbors: (第一跳采样数, 第二跳采样数, …)
        """
        self.num_nodes = num_nodes
        self.num_neighbors = num_neighbors

        self.adj = {i: [] for i in range(num_nodes)}
        row, col = edge_index.cpu().numpy()
        for r, c in zip(row, col):
            self.adj[r].append(c)

    def sample(self, target_nodes):
        """为目标节点采样多跳邻居。"""
        current_nodes = set(target_nodes.tolist())
        all_layers = []

        for num_nbrs in self.num_neighbors:
            edges, next_nodes = [], set()
            for node in current_nodes:
                neighbors = self.adj[node]
                if not neighbors:
                    continue
                if len(neighbors) > num_nbrs:
                    sampled = np.random.choice(
                        neighbors, num_nbrs, replace=False
                    ).tolist()
                else:
                    sampled = neighbors
                for nbr in sampled:
                    edges.append([nbr, node])
                    next_nodes.add(nbr)

            if edges:
                all_layers.append(torch.tensor(edges).t().contiguous())
            else:
                all_layers.append(torch.empty((2, 0), dtype=torch.long))
            current_nodes = next_nodes
        return all_layers

训练技巧#

BPR 损失(贝叶斯个性化排序)#

$$\mathcal{L}_{\text{BPR}} = -\sum_{(u,i,j)} \ln\, \sigma(\hat{r}_{ui} - \hat{r}_{uj}) + \lambda \|\Theta\|^2$$

通俗地说:“对每个用户 $u$ ,让正样本物品 $i$ (交互过)的得分高于负样本物品 $j$ (未交互)。sigmoid 和 log 使其成为光滑、可微的目标。”

  • $(u,i,j)$ = (用户, 正物品, 负物品) 三元组
  • $\hat{r}_{ui} = \mathbf{e}_u^\top \mathbf{e}_i$ = 点积得分
  • $\lambda \|\Theta\|^2$ = L2 正则化

负采样策略#

策略描述何时使用
均匀采样随机选择未交互物品基线
基于流行度按物品流行度比例采样应对长尾问题
困难负样本选择高分负样本后期精调

训练循环#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import torch
import torch.nn as nn
import torch.optim as optim

class BPRLoss(nn.Module):
    """带 L2 正则化的 BPR 损失。"""

    def __init__(self, reg_lambda=1e-4):
        super().__init__()
        self.reg_lambda = reg_lambda

    def forward(self, pos_scores, neg_scores, *embeddings):
        bpr = -torch.log(torch.sigmoid(pos_scores - neg_scores) + 1e-10).mean()
        reg = self.reg_lambda * sum(emb.norm(2).pow(2) for emb in embeddings)
        return bpr + reg

def train_lightgcn(model, edge_index, train_pairs, num_items,
                   num_epochs=100, lr=0.001, batch_size=1024):
    """LightGCN 的训练循环。train_pairs: [user, pos_item] 张量。"""
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion = BPRLoss()

    for epoch in range(num_epochs):
        model.train()
        perm = torch.randperm(len(train_pairs))
        total_loss, num_batches = 0.0, 0

        for start in range(0, len(train_pairs), batch_size):
            batch = train_pairs[perm[start:start + batch_size]]
            user_ids, pos_items = batch[:, 0], batch[:, 1]
            neg_items = torch.randint(0, num_items, (len(batch),))

            optimizer.zero_grad()
            user_emb, item_emb = model(edge_index)

            pos_scores = (user_emb[user_ids] * item_emb[pos_items]).sum(1)
            neg_scores = (user_emb[user_ids] * item_emb[neg_items]).sum(1)

            loss = criterion(pos_scores, neg_scores,
                             user_emb[user_ids],
                             item_emb[pos_items],
                             item_emb[neg_items])
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
            num_batches += 1

        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/num_batches:.4f}")

评估指标#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import numpy as np
import torch

def evaluate_topk(model, edge_index, test_user_items, train_user_items, k=10):
    """计算 Recall@K 和 NDCG@K。"""
    model.eval()
    recalls, ndcgs = [], []

    with torch.no_grad():
        user_emb, item_emb = model(edge_index)
        for user_id, test_items in test_user_items.items():
            scores = (user_emb[user_id] * item_emb).sum(dim=1)

            # 排除训练集中的物品
            train_items = train_user_items.get(user_id, [])
            scores[train_items] = -float('inf')

            _, top_k = torch.topk(scores, k)
            top_k = set(top_k.cpu().numpy())

            hits = len(top_k & set(test_items))
            recalls.append(hits / len(test_items) if test_items else 0)

            # NDCG:奖励排名靠前的命中
            dcg = sum(1.0 / np.log2(idx + 2)
                      for idx, item in enumerate(top_k) if item in test_items)
            idcg = sum(1.0 / np.log2(idx + 2)
                       for idx in range(min(k, len(test_items))))
            ndcgs.append(dcg / idcg if idcg > 0 else 0)

    return np.mean(recalls), np.mean(ndcgs)

冷启动:图模型大显身手之处#

矩阵分解无法为新用户提供 embedding;GraphSAGE 风格的聚合通过一次交互就能生成 embedding

一个新用户出现,仅有一条交互记录。矩阵分解无能为力——它没有该用户的行,添加一行意味着重新训练。而像 GraphSAGE 这样的归纳式 GNN 则不同:只需对其拥有的任意邻居运行相同的聚合函数即可。即使只有一条交互,也能生成有意义的嵌入,因为聚合会拉入我们对该物品其他欣赏者的全部认知。

处理冷启动用户的实用方案:

  1. 归纳式聚合。使用 GraphSAGE/PinSage 式模型,任何至少有一个邻居的节点都能免费获得嵌入。
  2. 特征初始化。在聚合前,用侧信息嵌入(人口统计、设备、注册渠道)初始化冷启动节点。
  3. 相似用户均值兜底。在用户交互不足时,将其嵌入与共享相同侧信息的用户均值混合。
  4. 元学习(MAML 式)。若资源允许,可训练模型从少量交互中快速适应新用户。

完整示例:在 MovieLens 上运行 LightGCN#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import torch
import numpy as np
from sklearn.model_selection import train_test_split

# --- 1. 构造模拟的 MovieLens 数据 ---
num_users, num_items = 1000, 2000
num_interactions = 10000

users = np.random.randint(0, num_users, num_interactions)
items = np.random.randint(0, num_items, num_interactions)
train_u, test_u, train_i, test_i = train_test_split(
    users, items, test_size=0.2, random_state=42
)

# 二部图边索引(无向)
edge_src = np.concatenate([train_u, train_i + num_users])
edge_dst = np.concatenate([train_i + num_users, train_u])
edge_index = torch.tensor([edge_src, edge_dst], dtype=torch.long)

# --- 2. 训练 LightGCN 模型 ---
model = LightGCN(num_users, num_items, embedding_dim=64, num_layers=3)
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = BPRLoss()

train_pairs = torch.tensor(np.stack([train_u, train_i], axis=1), dtype=torch.long)

for epoch in range(50):
    model.train()
    perm = torch.randperm(len(train_pairs))
    for start in range(0, len(train_pairs), 512):
        batch = train_pairs[perm[start:start + 512]]
        user_ids, pos_items = batch[:, 0], batch[:, 1]
        neg_items = torch.randint(0, num_items, (len(batch),))

        optimizer.zero_grad()
        user_emb, item_emb = model(edge_index)
        pos_scores = (user_emb[user_ids] * item_emb[pos_items]).sum(1)
        neg_scores = (user_emb[user_ids] * item_emb[neg_items]).sum(1)
        loss = criterion(pos_scores, neg_scores,
                         user_emb[user_ids],
                         item_emb[pos_items],
                         item_emb[neg_items])
        loss.backward()
        optimizer.step()

    if (epoch + 1) % 10 == 0:
        print(f"第 {epoch+1} 轮,损失:{loss.item():.4f}")

print("训练完成!")

常见问题#

为何 GNN 在推荐任务上优于矩阵分解?#

矩阵分解独立学习每个嵌入——它从不问“我的邻居长什么样?”。GNN 则通过图显式传播协同信号,因此相似用户(通过共享物品连接)会因架构本身而获得相似嵌入。多层 GNN 能捕捉更高阶的模式(如“喜欢与你喜欢的物品相似的物品的用户”),这是单次点积无法表达的。

GCN vs. GAT vs. GraphSAGE —— 何时用哪个?#

GCNGATGraphSAGE
优势简单、高效自适应邻居加权可扩展、支持新节点
劣势需要全图、权重固定大邻域计算昂贵采样引入方差
最适合小/中型静态图异构边大型、动态图

经验法则:协同过滤任务首选 LightGCN(简化版 GCN)。需要归纳学习时选 GraphSAGE;当不同邻居重要性差异很大时选 GAT。

GNN 应该用几层?#

推荐任务中通常用 2 到 3 层:

  • 1 层 —— 仅直接邻居;错过协同模式。
  • 2 层 —— 朋友的朋友;大多数任务的最佳点。
  • 3 层 —— 三跳模式;略有增益,但需警惕过平滑。
  • 4+ 层 —— 过平滑主导;嵌入变得难以区分。

若想获得多尺度收益而不陷入深度陷阱,可采用 LightGCN 式的层组合。

如何处理冷启动用户?#

  1. GraphSAGE 式归纳。只要节点至少有一个邻居,聚合就能工作。
  2. 特征初始化。用侧信息(人口统计、设备)或相似用户均值初始化新用户嵌入。
  3. 元学习。采用 MAML 式方法,从少量交互中快速适应。
  4. 混合模型。结合 GNN 嵌入与基于内容的特征,确保冷启动节点仍有有用表示。

参见上方冷启动示意图了解归纳式处理。

计算成本如何?#

模型前向传播内存
GCN / LightGCN$O(L\cdot\lvert E\rvert\cdot d)$$O(\lvert V\rvert\cdot d + \lvert E\rvert)$
GAT$O(L\cdot\lvert E\rvert\cdot d\cdot H)$$O(\lvert V\rvert\cdot d\cdot H + \lvert E\rvert)$
GraphSAGE(采样)$O(L\cdot\lvert V\rvert\cdot k\cdot d)$$O(\lvert V\rvert\cdot d + k\cdot\lvert V\rvert)$

其中 $L$ = 层数,$|E|$ = 边数,$d$ = 嵌入维度,$H$ = 注意力头数,$k$ = 采样邻居数。对于十亿级边的图,采样(GraphSAGE, PinSage)或子图划分(Cluster-GCN)是必须的。

社交推荐是否总是有效?#

仅当社交关系真实反映共同偏好时才有效。实证显示,在信息丰富的社交图上,NDCG 可提升 5–15%。但噪声社交连接(如随机关注列表)反而有害。可使用注意力机制降低无关边的权重,或在训练前根据交互重叠度预过滤社交图。

GNN 能处理动态图吗?#

可以,但需调整:

  • 时间感知聚合 —— 按时间远近加权邻居。
  • 时间编码 —— 为边添加时间嵌入。
  • 增量更新 —— 在新边上微调而非重新训练。
  • 时序 GNN —— TGN 等原生支持流式图。
1
2
3
4
5
6
def temporal_aggregate(neighbor_embs, timestamps, current_time, decay_rate=0.1):
    """按时间近度给邻居加权"""
    time_diffs = current_time - timestamps
    weights = torch.exp(-decay_rate * time_diffs)
    weights = weights / weights.sum()
    return (neighbor_embs * weights.unsqueeze(1)).sum(dim=0)

如何防止过平滑?#

  1. 限制深度 至 2–3 层。
  2. 层组合(LightGCN 式)—— 各层等权,确保最终表示仍包含原始嵌入。
  3. 残差连接 —— $\mathbf{h}^{(l+1)} = \mathbf{h}^{(l)} + \mathrm{GNN}(\mathbf{h}^{(l)})$
  4. 边 dropout —— 训练时随机丢弃边,减少对结构的过度依赖。

总结#

  • 推荐数据天然具有图结构。GNN 通过邻域聚合传播协同信号,这是扁平嵌入无法做到的。
  • LightGCN 证明了简洁的力量。将 GCN 精简至仅剩聚合操作,在协同过滤任务上常胜过更复杂的架构。
  • GraphSAGE 实现了归纳学习。这对生产系统至关重要——新用户和物品不断涌现,这也是处理冷启动最干净的路径。
  • PinSage 展示了十亿级规模的可行性。随机游走采样和重要性加权使 GNN 在 Pinterest 规模下变得实用。
  • 社交信号可带来 5–15% 的提升——但前提是社交连接确实反映了共同偏好。
  • 2 到 3 层是最佳选择。更深易导致过平滑;层组合可在不牺牲深度的情况下避免崩溃。

本文是推荐系统系列的第 7 篇,共 16 篇。

本系列

推荐系统 16 篇

  1. 01 推荐系统(一)—— 入门与基础概念
  2. 02 推荐系统(二)—— 协同过滤与矩阵分解
  3. 03 推荐系统(三)—— 深度学习基础模型
  4. 04 推荐系统(四)—— CTR 预估与点击率建模
  5. 05 推荐系统(五)—— Embedding 表示学习
  6. 06 推荐系统(六)—— 序列推荐与会话建模
  7. 07 推荐系统(七)—— 图神经网络与社交推荐 当前
  8. 08 推荐系统(八)—— 知识图谱增强推荐系统
  9. 09 推荐系统(九)—— 多任务学习与多目标优化
  10. 10 推荐系统(十)—— 深度兴趣网络与注意力机制
  11. 11 推荐系统(十一)—— 对比学习与自监督学习
  12. 12 推荐系统(十二)—— 大语言模型与推荐系统
  13. 13 推荐系统(十三)—— 公平性、去偏与可解释性
  14. 14 推荐系统(十四)—— 跨域推荐与冷启动解决方案
  15. 15 推荐系统(十五)—— 实时推荐与在线学习
  16. 16 推荐系统(十六)—— 工业级架构与最佳实践

读有所得?

GitHub 关注我 → 新文周更

GitHub