系列 · 推荐系统 · 第 8 篇

推荐系统(八)—— 知识图谱增强推荐系统

知识图谱增强推荐系统全面解析:知识图谱嵌入(TransE/TransR/DistMult),RippleNet 涟漪传播,KGCN 图卷积,KGAT 注意力网络,CKE 多信号融合,附完整 PyTorch 实现。

在流媒体平台搜索《蝙蝠侠:黑暗骑士》时,系统不仅记录你观看了这部电影,还知道克里斯蒂安·贝尔饰演了蝙蝠侠、克里斯托弗·诺兰是导演、这部电影属于蝙蝠侠三部曲,并且与许多烧脑动作片有相似的电影基因。这种丰富的语义网络就是 知识图谱(KG)——一个由实体(电影、演员、导演、类型)通过带类型的关系(acted_indirected_bypart_of)连接而成的结构化网络。

为什么这对推荐系统很重要?因为纯协同过滤有一个致命缺陷——它只能推荐已有交互历史的物品。一部刚上线、播放量为零的新电影,在协同过滤中是完全“隐形”的;但若该电影的导演是你喜欢的诺兰,知识图谱在第一天就能发现这一关联。知识图谱将推荐从简单的模式匹配提升到 语义推理 的层面。


推荐系统(八)—— 知识图谱增强推荐系统 — 章节概览图

你将学到什么#

  • 知识图谱的定义及如何用三元组编码现实世界中的事实
  • 知识图谱嵌入方法(如 TransE、TransR、DistMult)如何用向量表示实体和关系
  • 传播类方法:RippleNet 模拟水波扩散,传播用户偏好
  • 图卷积类方法:KGCNKGAT 从知识图谱邻居中学习物品表示
  • 嵌入融合:CKE 整合协同信号、结构信号和文本信号
  • 基于路径推理的可解释推荐方法
  • 每种主流方法都提供完整的 PyTorch 实现代码

前置知识#

  • 掌握 Python 和 PyTorch 基础(张量、nn.Module、训练循环)
  • 了解图神经网络(本系列第 7 篇
  • 熟悉嵌入概念(本系列第 5 篇

知识图谱基础#

什么是知识图谱?#

电影知识图谱:电影、人物、类型与系列通过带类型的关系相连

知识图谱用 (头实体,关系,尾实体) 的三元组形式存储事实。每个三元组表达一个原子事实:

  • (黑暗骑士, directed_by, 诺兰)
  • (黑暗骑士, starred, 贝尔)
  • (黑暗骑士, has_genre, 动作)
  • (贝尔, acted_in, 致命魔术)

形式化定义上,知识图谱是一个集合 $\mathcal{G} = \{(h, r, t)\}$ ,其中 $h \in \mathcal{E}$头实体$r \in \mathcal{R}$关系类型$t \in \mathcal{E}$尾实体$\mathcal{E}$ 表示所有实体的集合,$\mathcal{R}$ 表示所有关系类型的集合。

类比一下:可以把知识图谱看成一个维基百科级别的事实数据库,只不过它不是用文字描述,而是用图结构存储。每个词条是节点,词条之间的链接是带标签的边——标签说明了它们为什么相关。

现实中的知识图谱#

知识图谱实体数量事实数量使用方
Freebase3900 万19 亿Google (已废弃)
Wikidata1 亿+14 亿+Wikipedia、 Google
DBpedia600 万5.8 亿学术研究
Amazon Product Graph数十亿数万亿亚马逊推荐

推荐系统中的知识图谱结构#

推荐系统用的知识图谱通常是异构图,包含多种实体和关系类型。

实体类型

  • 用户:$U = \{u_1, u_2, \ldots, u_m\}$
  • 物品:$I = \{i_1, i_2, \ldots, i_n\}$
  • 属性:类型、演员、导演、品牌……

关系类型

  • 用户—物品:(用户, 交互, 物品)
  • 物品—属性:(电影, has_genre, 动作)(电影, directed_by, 诺兰)
  • 属性—属性:(诺兰, 合作过, 贝尔)

知识图谱嵌入#

在把知识图谱输入推荐模型之前,需要将实体和关系转化为稠密向量。关键问题来了:如何训练这些嵌入,让它们保留图谱的结构信息?

TransE:头向量加上关系向量约等于尾向量;训练时把正样本拉近、把负样本推到边界之外

$$\mathcal{L} = \sum_{(h,r,t) \in \mathcal{G}}\; \sum_{(h',r,t') \notin \mathcal{G}} \bigl[\gamma + \|\mathbf{h} + \mathbf{r} - \mathbf{t}\|_2 - \|\mathbf{h}' + \mathbf{r} - \mathbf{t}'\|_2\bigr]_+$$

通俗解释:把合法三元组拉近(让 $\mathbf{h} + \mathbf{r} \approx \mathbf{t}$ ),把非法三元组推远。间隔 $\gamma$ 决定了分离的程度。上图右侧展示了这一点:绿色点(合法尾实体)被拉进间隔圆内,灰色负样本被推到圆外。

方法评分函数适用场景
TransE$\mid\mathbf{h} + \mathbf{r} - \mathbf{t}\mid$简单的 1-to-1 关系
TransR$\mid\mathbf{h} M_r + \mathbf{r} - \mathbf{t} M_r\mid$不同关系需要不同向量空间
DistMult$\mathbf{h}^T \text{diag}(\mathbf{r})\, \mathbf{t}$对称关系
ComplEx$\text{Re}(\mathbf{h}^T \text{diag}(\mathbf{r})\, \bar{\mathbf{t}})$非对称关系

TransE 类比:想象地图上的城市。“巴黎” + “向东飞 2000 公里” 应该落在 “莫斯科” 附近;“巴黎” + “向南飞 1500 公里” 应该落在 “阿尔及尔” 附近。关系向量就像地图上的位移。

实现:构建知识图谱#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import torch
import torch.nn as nn
from collections import defaultdict
from typing import List, Tuple

class KnowledgeGraph:
    """用于推荐系统的简易知识图谱。"""

    def __init__(self):
        self.entities = {}                   # entity_name -> entity_id
        self.relations = {}                  # relation_name -> relation_id
        self.triples = []                    # [(h_id, r_id, t_id), ...]
        self.entity_adj = defaultdict(list)  # entity_id -> [(r_id, t_id), ...]

    def add_entity(self, name: str) -> int:
        if name not in self.entities:
            self.entities[name] = len(self.entities)
        return self.entities[name]

    def add_relation(self, name: str) -> int:
        if name not in self.relations:
            self.relations[name] = len(self.relations)
        return self.relations[name]

    def add_triple(self, head: str, relation: str, tail: str):
        h_id = self.add_entity(head)
        r_id = self.add_relation(relation)
        t_id = self.add_entity(tail)
        self.triples.append((h_id, r_id, t_id))
        self.entity_adj[h_id].append((r_id, t_id))

    def get_neighbors(self, entity_id: int) -> List[Tuple[int, int]]:
        return self.entity_adj[entity_id]

# 构建一个小型电影知识图谱
kg = KnowledgeGraph()
kg.add_triple("黑暗骑士", "directed_by", "诺兰")
kg.add_triple("黑暗骑士", "starred", "贝尔")
kg.add_triple("黑暗骑士", "has_genre", "动作")
kg.add_triple("黑暗骑士", "has_genre", "犯罪")
kg.add_triple("盗梦空间", "directed_by", "诺兰")
kg.add_triple("盗梦空间", "starred", "迪卡普里奥")
kg.add_triple("盗梦空间", "has_genre", "科幻")
kg.add_triple("致命魔术", "directed_by", "诺兰")
kg.add_triple("致命魔术", "starred", "贝尔")
kg.add_triple("致命魔术", "has_genre", "剧情")

# 查看图谱
dk_id = kg.entities["黑暗骑士"]
print(f"黑暗骑士的邻居: {kg.get_neighbors(dk_id)}")
# (directed_by, 诺兰), (starred, 贝尔), (has_genre, 动作), (has_genre, 犯罪)

为什么知识图谱能提升推荐效果#

知识图谱解决了纯协同过滤(CF)难以处理的四个核心问题。

协同过滤常见的四种失效模式及知识图谱的应对方式

冷启动#

问题
一部新电影没有任何交互记录,协同过滤模型完全无法感知它的存在。

KG 解法
即使在上线第一天,新电影也有属性信息(导演、演员、类型),这些属性让它与知识图谱中的其他节点建立了连接。如果你喜欢诺兰的其他作品, KG 可以直接推荐他的新片,无需依赖任何交互数据。

数据稀疏#

问题
大多数用户只与极少数物品发生交互,交互矩阵中 99% 以上都是零,模型缺乏足够的信号进行学习。

KG 解法
知识图谱通过稠密的语义连接填补了空白。即使两部电影没有共同用户,它们也可能共享导演、类型或制作公司,这种语义关联本身就提供了有价值的信息。

可解释性#

问题
“喜欢 X 的用户也喜欢 Y”这样的推荐理由不够直观,用户难以理解。

KG 解法
“推荐《盗梦空间》,因为它的导演是克里斯托弗·诺兰,而你给他的另一部作品《黑暗骑士》打了 5 星。”知识图谱能够生成具体、清晰且可追溯的推理路径。

多样性#

问题
协同过滤容易陷入“信息茧房”,反复推荐相似的内容。

KG 解法
不同的关系路径会带来不同类型的推荐结果。沿着 same_director 推荐的结果和沿着 same_genresame_actor 推荐的结果完全不同,自然提升了推荐列表的多样性。

具体案例:看完《黑暗骑士》之后#

基于小型知识图谱的推理过程:从一次观影到四个候选推荐

上图展示了从用户观看《黑暗骑士》到生成四部候选电影的显式路径:

  • 盗梦空间——通过路径 黑暗骑士 -> directed_by -> 诺兰 -> directed -> 盗梦空间
  • 致命魔术——通过诺兰和贝尔两条路径同时支撑(双重信号,置信度更高)
  • 侠影之谜——通过共享主演和共享类型
  • 系统不仅能为每部候选电影生成不同的排序,还能提供对应的解释。这是纯协同过滤无法做到的。

KG 增强方法分类#

类别思路代表方法
传播类用户偏好沿知识图谱扩散RippleNet
图卷积类聚合知识图谱邻居信息学习物品嵌入KGCN、 KGAT
嵌入类联合学习用户、物品和知识图谱实体的嵌入CKE
路径类基于多跳路径推理,增强可解释性KPRN、 PathRec

RippleNet:偏好传播的涟漪效应#

推荐系统(八)—— 知识图谱增强推荐系统 — 章节小结图

RippleNet:用户偏好从历史物品出发,沿一跳、二跳邻居向外扩散

涟漪的类比#

往池塘里扔一颗石子,水波会一圈圈向外扩散。 RippleNet (Wang 等, CIKM 2018)借鉴了这一现象:当用户与某个物品交互时,这种偏好会沿着知识图谱向外传播,激活越来越远的相关实体。

工作原理#

假设用户 $u$ 的历史物品集合为 $V_u = \{v_1, v_2, \ldots\}$

  1. 第 0 跳: 用户的历史物品作为初始偏好集 $\mathcal{S}_u^0 = V_u$
  2. 第 1 跳: 找到通过任意关系与 $\mathcal{S}_u^0$ 相连的所有实体,构成 $\mathcal{S}_u^1$
  3. 第 2 跳: 再找到与 $\mathcal{S}_u^1$ 相连的所有实体,构成 $\mathcal{S}_u^2$
  4. 聚合: 对每一跳的实体用注意力机制加权,得到增强后的用户嵌入。
$$p_i^h = \text{softmax}(\mathbf{v}^T \mathbf{R}_i\, \mathbf{t})$$

简单来说:“这个 KG 实体和当前评分的物品有多相关?用关系矩阵 $\mathbf{R}$ 衡量它们的兼容性。”

$$\mathbf{o}_u^h = \sum_{(h,r,t) \in \mathcal{S}_u^h} p_i^h\, \mathbf{t}$$ $$\mathbf{u} = \sum_{h=0}^{H} \alpha_h\, \mathbf{o}_u^h$$

举个例子: 用户观看了《黑暗骑士》:

  • 第 0 跳:{黑暗骑士}
  • 第 1 跳:{诺兰、贝尔、动作、犯罪}
  • 第 2 跳:{盗梦空间、致命魔术、侠影之谜、迪卡普里奥、剧情……}

通过涟漪传播发现:用户可能喜欢《盗梦空间》(通过诺兰相连)和《致命魔术》(通过诺兰和贝尔同时相连——双重支撑,权重更高)。

实现: RippleNet#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
import torch
import torch.nn as nn
import torch.nn.functional as F

class RippleNet(nn.Module):
    """RippleNet:在知识图谱上传播用户偏好。"""

    def __init__(self, num_users, num_items, num_entities, num_relations,
                 embedding_dim=64, n_hop=2, n_memory=32):
        super().__init__()
        self.n_hop = n_hop
        self.n_memory = n_memory
        self.embedding_dim = embedding_dim

        self.user_emb = nn.Embedding(num_users, embedding_dim)
        self.item_emb = nn.Embedding(num_items, embedding_dim)
        self.entity_emb = nn.Embedding(num_entities, embedding_dim)
        self.relation_emb = nn.Embedding(num_relations, embedding_dim)

        self.hop_weights = nn.Parameter(torch.ones(n_hop + 1) / (n_hop + 1))

        for emb in [self.user_emb, self.item_emb,
                    self.entity_emb, self.relation_emb]:
            nn.init.xavier_uniform_(emb.weight)

    def forward(self, user_ids, item_ids, ripple_sets):
        """
        Args:
            user_ids: [batch_size]
            item_ids: [batch_size]
            ripple_sets: ripple_sets[i][h] 是用户 i 在第 h 跳的
                         (relation_id, tail_id) 列表。
        """
        user_base = self.user_emb(user_ids)        # [B, d]
        item_vec = self.item_emb(item_ids)          # [B, d]

        user_enhanced = self._propagate(user_base, item_vec, ripple_sets)
        scores = (user_enhanced * item_vec).sum(dim=1)
        return scores

    def _propagate(self, user_base, item_vec, ripple_sets):
        batch_size = user_base.size(0)
        hop_memories = [user_base]

        for hop in range(self.n_hop):
            hop_embs = []
            for i in range(batch_size):
                rs = ripple_sets[i][hop] if hop < len(ripple_sets[i]) else []
                if not rs:
                    hop_embs.append(torch.zeros(self.embedding_dim))
                    continue

                rs = rs[:self.n_memory]                  # 限制 memory
                rels = torch.LongTensor([r for r, _ in rs])
                tails = torch.LongTensor([t for _, t in rs])

                rel_e = self.relation_emb(rels)          # [K, d]
                tail_e = self.entity_emb(tails)          # [K, d]

                # 用候选物品对每个 KG 邻居打分
                scores = (item_vec[i].unsqueeze(0) * rel_e * tail_e).sum(1)
                probs = F.softmax(scores, dim=0)         # [K]
                hop_embs.append((probs.unsqueeze(1) * tail_e).sum(0))

            hop_memories.append(torch.stack(hop_embs))

        # 多跳加权融合
        stacked = torch.stack(hop_memories, dim=1)       # [B, H+1, d]
        weights = F.softmax(self.hop_weights, dim=0)
        return (weights.unsqueeze(0).unsqueeze(2) * stacked).sum(dim=1)

    @staticmethod
    def build_ripple_sets(user_items, kg_adj, max_hops=2):
        """通过 BFS 为单个用户构建 ripple sets。"""
        ripple_sets = []
        current = set(user_items)
        for _ in range(max_hops):
            next_hop = []
            for entity in current:
                for r_id, t_id in kg_adj.get(entity, []):
                    next_hop.append((r_id, t_id))
            ripple_sets.append(next_hop)
            current = {t for _, t in next_hop}
        return ripple_sets

KGCN:知识图谱卷积网络#

换个思路#

RippleNet 把用户偏好沿着知识图谱(KG)向外传播,而 KGCN (Wang 等, WWW 2019)反过来操作:通过聚合每个物品在 KG 中的邻居信息,构建更高质量的物品表示

类比一下: RippleNet 的问题是:“从你喜欢的东西出发,知识图谱里还有什么?” 而 KGCN 的问题是:“对于每个候选物品,它的 KG 邻域能告诉我们什么?”

KGCN 的工作原理#

假设物品 $i$ 在 KG 中的邻居集合是 $\mathcal{N}_i$ , KGCN 的计算步骤如下:

  1. 采样:从 KG 中抽取 $K$ 个邻居。
  2. 打分:用学习到的关系感知注意力机制为每个邻居分配权重。
  3. 聚合:对加权后的邻居嵌入求和。
  4. 融合:将聚合结果与物品自身的嵌入结合起来。
$$\mathbf{e}_{\mathcal{N}_i}^{(l)} = \sum_{(r,e) \in \mathcal{N}_i} \pi_r(i, e)\; \mathbf{e}_e^{(l-1)}$$ $$\mathbf{e}_i^{(l)} = \sigma\!\bigl(\mathbf{W}^{(l)}[\mathbf{e}_i^{(l-1)} \,\|\, \mathbf{e}_{\mathcal{N}_i}^{(l)}] + \mathbf{b}^{(l)}\bigr)$$

通俗解释: “看看这个物品在 KG 中连接了哪些节点(演员、导演、类型)。根据每种关系的重要性给连接打分,最后把邻域信息和物品自身嵌入混合起来。”

实现: KGCN#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import torch
import torch.nn as nn
import torch.nn.functional as F

class KGCNLayer(nn.Module):
    """单层 KGCN,支持关系感知聚合。"""

    def __init__(self, embedding_dim, num_relations):
        super().__init__()
        self.embedding_dim = embedding_dim

        # 关系变换矩阵
        self.rel_transforms = nn.ModuleList([
            nn.Linear(embedding_dim, embedding_dim, bias=False)
            for _ in range(num_relations)
        ])
        # 注意力评分模块
        self.attn = nn.ModuleList([
            nn.Linear(embedding_dim, 1)
            for _ in range(num_relations)
        ])
        # 融合模块
        self.combine = nn.Linear(embedding_dim * 2, embedding_dim)

    def forward(self, item_embs, neighbors, relation_embs):
        num_items = item_embs.size(0)
        aggregated = []

        for i in range(num_items):
            nbrs = neighbors.get(i, [])
            if not nbrs:
                aggregated.append(torch.zeros(self.embedding_dim))
                continue

            nbr_embs, scores = [], []
            for r_id, n_id in nbrs:
                # 对邻居嵌入进行关系变换
                transformed = self.rel_transforms[r_id](item_embs[n_id])
                nbr_embs.append(transformed)
                # 计算兼容性得分
                compatibility = item_embs[i] + relation_embs[r_id]
                scores.append(self.attn[r_id](compatibility))

            nbr_embs = torch.stack(nbr_embs)
            scores = torch.cat(scores)
            weights = F.softmax(scores, dim=0)
            # 加权聚合邻居嵌入
            aggregated.append((weights.unsqueeze(1) * nbr_embs).sum(0))

        aggregated = torch.stack(aggregated)
        combined = torch.cat([item_embs, aggregated], dim=1)
        return self.combine(combined)

class KGCN(nn.Module):
    """基于知识图谱卷积网络的推荐模型。"""

    def __init__(self, num_users, num_items, num_entities, num_relations,
                 embedding_dim=64, n_layers=2):
        super().__init__()
        self.num_users = num_users
        self.num_items = num_items

        # 用户、实体、关系嵌入
        self.user_emb = nn.Embedding(num_users, embedding_dim)
        self.entity_emb = nn.Embedding(num_entities, embedding_dim)
        self.relation_emb = nn.Embedding(num_relations, embedding_dim)

        # 多层 KGCN
        self.layers = nn.ModuleList([
            KGCNLayer(embedding_dim, num_relations)
            for _ in range(n_layers)
        ])

        # 初始化嵌入
        for emb in [self.user_emb, self.entity_emb, self.relation_emb]:
            nn.init.xavier_uniform_(emb.weight)

    def forward(self, user_ids, item_ids, item_neighbors):
        user_vec = self.user_emb(user_ids)
        item_vec = self.entity_emb.weight[:self.num_items]
        rel_vec = self.relation_emb.weight

        for idx, layer in enumerate(self.layers):
            if idx < len(item_neighbors):
                item_vec = layer(item_vec, item_neighbors[idx], rel_vec)
                item_vec = F.relu(item_vec)

        item_final = item_vec[item_ids]
        return (user_vec * item_final).sum(dim=1)

KGAT:知识图谱注意力网络#

KGAT:注意力权重直观展示物品的哪些知识图谱邻居对用户最重要

KGAT 的改进点#

$$\mathcal{G}_\text{CKG} = \underbrace{\{(u, \text{interact}, i)\}}_{\text{用户-物品边}} \;\cup\; \underbrace{\{(h, r, t)\}}_{\text{KG 边}}$$

为什么重要? 在 KGCN 中,用户嵌入和物品嵌入位于不同的空间;而在 KGAT 中,它们都作为同一张图中的节点,因此协同信号和语义信号能够自然地共同传播。

注意力机制#

$$\pi(e, r, e') = \frac{\exp\!\bigl(\text{LeakyReLU}(\mathbf{a}^T [\mathbf{e}_e \,\|\, \mathbf{e}_r \,\|\, \mathbf{e}_{e'}])\bigr)}{\sum_{(r'', e'') \in \mathcal{N}(e)} \exp\!\bigl(\text{LeakyReLU}(\mathbf{a}^T [\mathbf{e}_e \,\|\, \mathbf{e}_{r''} \,\|\, \mathbf{e}_{e''}])\bigr)}$$

简单解释: “对每个邻居,拼接实体、关系和邻居的嵌入,通过一个可学习的打分函数,再用 softmax 归一化,告诉模型哪些连接更重要。” 上图清晰展示了这一点:通向“诺兰”和“迪卡普里奥”的边很粗(高注意力),而通向“2010”(上映年份)的边很细。

$$\mathbf{e}_e^{(l)} = \sigma\!\bigl(\mathbf{W}^{(l)}[\mathbf{e}_e^{(l-1)} \,\|\, \mathbf{e}_{\mathcal{N}(e)}^{(l-1)}] + \mathbf{b}^{(l)}\bigr)$$

多头注意力#

$$\mathbf{e}_{\mathcal{N}(e)} = \big\|_{k=1}^{K}\; \sum_{(r,e') \in \mathcal{N}(e)} \pi^{(k)}(e, r, e')\, \mathbf{e}_{e'}^{(k)}$$

实现: KGAT#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import torch
import torch.nn as nn
import torch.nn.functional as F

class KGATLayer(nn.Module):
    """单层 KGAT,支持关系感知注意力。"""

    def __init__(self, embedding_dim, num_heads=1, dropout=0.1):
        super().__init__()
        self.embedding_dim = embedding_dim
        self.num_heads = num_heads
        self.dropout = dropout

        # 在 [entity || relation || neighbor] 上打分
        self.attention = nn.Linear(embedding_dim * 3, num_heads)
        self.W_v = nn.Linear(embedding_dim, embedding_dim)
        self.output = nn.Linear(embedding_dim, embedding_dim)

    def forward(self, entity_embs, relation_embs, neighbors):
        outputs = []
        for i in range(entity_embs.size(0)):
            nbrs = neighbors.get(i, [])
            if not nbrs:
                outputs.append(entity_embs[i])
                continue

            scores, values = [], []
            for r_id, n_id in nbrs:
                combined = torch.cat([
                    entity_embs[i], relation_embs[r_id], entity_embs[n_id]
                ])
                scores.append(self.attention(combined))
                values.append(self.W_v(entity_embs[n_id]))

            scores = F.softmax(torch.stack(scores), dim=0)  # [K, heads]
            scores = F.dropout(scores, p=self.dropout, training=self.training)
            values = torch.stack(values)                     # [K, d]

            aggregated = (scores.mean(dim=1, keepdim=True) * values).sum(0)
            out = entity_embs[i] + aggregated
            outputs.append(F.relu(self.output(out)))

        return torch.stack(outputs)

class KGAT(nn.Module):
    """用于推荐的知识图谱注意力网络。"""

    def __init__(self, num_users, num_items, num_entities, num_relations,
                 embedding_dim=64, n_layers=2, num_heads=2, dropout=0.1):
        super().__init__()
        self.num_users = num_users

        self.entity_emb = nn.Embedding(num_entities, embedding_dim)
        self.user_emb = nn.Embedding(num_users, embedding_dim)
        self.relation_emb = nn.Embedding(num_relations, embedding_dim)

        self.layers = nn.ModuleList([
            KGATLayer(embedding_dim, num_heads, dropout)
            for _ in range(n_layers)
        ])

        for emb in [self.entity_emb, self.user_emb, self.relation_emb]:
            nn.init.xavier_uniform_(emb.weight)

    def forward(self, user_ids, item_ids, ckg_neighbors):
        # 合并嵌入表:[users | entities]
        all_embs = self.entity_emb.weight.clone()
        all_embs[:self.num_users] = self.user_emb.weight
        rel_embs = self.relation_emb.weight

        for idx, layer in enumerate(self.layers):
            if idx < len(ckg_neighbors):
                all_embs = layer(all_embs, rel_embs, ckg_neighbors[idx])

        user_final = all_embs[user_ids]
        item_final = all_embs[self.num_users + item_ids]
        return (user_final * item_final).sum(dim=1)

CKE:协同知识库嵌入#

多信号融合的思路#

$$\mathbf{i} = \mathbf{i}_\text{CF} + \mathbf{i}_\text{KG} + \mathbf{i}_\text{text}$$
分量来源学习方法
$\mathbf{i}_\text{CF}$用户-物品交互矩阵分解
$\mathbf{i}_\text{KG}$知识图谱结构TransR 嵌入
$\mathbf{i}_\text{text}$物品描述文本CNN 文本编码

为什么需要三种信号? 每种信号捕捉其他信号无法覆盖的信息:

  • CF 知道用户喜欢什么,但不知道为什么喜欢
  • KG 理解语义关系,但不了解用户行为。
  • 文本能捕捉结构化三元组难以表达的细腻描述

联合训练#

$$ \mathcal{L} = \mathcal{L}_\text{CF} + \lambda_1 \mathcal{L}_\text{KG} + \lambda_2 \mathcal{L}_\text{text} + \lambda_3 \mathcal{L}_\text{reg} $$

CF 损失基于标准矩阵分解公式 $\hat{r}_{ui} = \mathbf{u}^T \mathbf{i}$ 。 KG 损失采用 TransR 方法:拉近合法三元组,推远非法三元组。

实现: CKE#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import torch
import torch.nn as nn
import torch.nn.functional as F

class TransR(nn.Module):
    """TransR:每个关系自带一套实体投影矩阵。"""

    def __init__(self, num_entities, num_relations, entity_dim, relation_dim):
        super().__init__()
        self.entity_emb = nn.Embedding(num_entities, entity_dim)
        self.relation_emb = nn.Embedding(num_relations, relation_dim)
        self.proj_matrices = nn.Embedding(num_relations,
                                          entity_dim * relation_dim)
        self.entity_dim = entity_dim
        self.relation_dim = relation_dim

        for emb in [self.entity_emb, self.relation_emb, self.proj_matrices]:
            nn.init.xavier_uniform_(emb.weight)

    def forward(self, heads, relations, tails,
                neg_heads=None, neg_tails=None):
        h = self.entity_emb(heads)
        r = self.relation_emb(relations)
        t = self.entity_emb(tails)

        proj = self.proj_matrices(relations).view(
            -1, self.entity_dim, self.relation_dim
        )
        h_proj = torch.bmm(h.unsqueeze(1), proj).squeeze(1)
        t_proj = torch.bmm(t.unsqueeze(1), proj).squeeze(1)

        pos_score = (h_proj + r - t_proj).norm(p=2, dim=1) ** 2

        if neg_heads is not None and neg_tails is not None:
            nh = self.entity_emb(neg_heads)
            nt = self.entity_emb(neg_tails)
            nh_proj = torch.bmm(nh.unsqueeze(1), proj).squeeze(1)
            nt_proj = torch.bmm(nt.unsqueeze(1), proj).squeeze(1)
            neg_score = (nh_proj + r - nt_proj).norm(p=2, dim=1) ** 2
            return F.relu(1.0 + pos_score - neg_score).mean()

        return pos_score

class CKE(nn.Module):
    """协同知识库嵌入模型。"""

    def __init__(self, num_users, num_items, num_entities, num_relations,
                 embedding_dim=64, relation_dim=32,
                 kg_lambda=0.1, reg_lambda=0.01):
        super().__init__()
        self.num_entities = num_entities
        self.kg_lambda = kg_lambda
        self.reg_lambda = reg_lambda

        # CF 分量
        self.user_emb = nn.Embedding(num_users, embedding_dim)
        self.item_cf_emb = nn.Embedding(num_items, embedding_dim)

        # KG 分量(TransR)
        self.transr = TransR(num_entities, num_relations,
                             embedding_dim, relation_dim)

        # 将 KG 嵌入投影到 CF 空间
        self.kg_proj = nn.Linear(embedding_dim, embedding_dim)

        nn.init.xavier_uniform_(self.user_emb.weight)
        nn.init.xavier_uniform_(self.item_cf_emb.weight)

    def forward(self, user_ids, item_ids):
        user_vec = self.user_emb(user_ids)
        item_cf = self.item_cf_emb(item_ids)
        item_kg = self.kg_proj(self.transr.entity_emb(item_ids))

        item_combined = item_cf + item_kg
        return (user_vec * item_combined).sum(dim=1)

    def compute_loss(self, user_ids, item_ids, ratings,
                     kg_heads=None, kg_rels=None, kg_tails=None):
        # CF 损失
        preds = self.forward(user_ids, item_ids)
        cf_loss = F.mse_loss(preds, ratings.float())

        # KG 损失
        kg_loss = torch.tensor(0.0)
        if kg_heads is not None:
            neg_heads = torch.randint(0, self.num_entities, kg_heads.shape)
            neg_tails = torch.randint(0, self.num_entities, kg_tails.shape)
            kg_loss = self.transr(kg_heads, kg_rels, kg_tails,
                                  neg_heads, neg_tails)

        # 正则化
        reg = (self.user_emb.weight.norm(2) ** 2 +
               self.item_cf_emb.weight.norm(2) ** 2)

        return cf_loss + self.kg_lambda * kg_loss + self.reg_lambda * reg

基于路径的可解释推荐#

为什么路径重要#

前面提到的方法学习的是嵌入——一种稠密、强大但不透明的向量。基于路径的推理另辟蹊径:从用户的历史行为出发,沿着知识图谱找到通往候选物品的具体路径,并将这些路径同时用作特征和解释。

示例路径: 用户给《黑暗骑士》打了高分 -> 《黑暗骑士》 directed_by 诺兰 -> 诺兰 directed 《盗梦空间》 -> 推荐《盗梦空间》

这条路径不仅是模型的一个特征,还能直接作为人类可读的解释。

实现:多跳路径推理#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import deque

class PathReasoning(nn.Module):
    """寻找并打分知识图谱路径,用于可解释推荐。"""

    def __init__(self, num_entities, num_relations, embedding_dim=64):
        super().__init__()
        self.entity_emb = nn.Embedding(num_entities, embedding_dim)
        self.relation_emb = nn.Embedding(num_relations, embedding_dim)

        self.path_scorer = nn.Sequential(
            nn.Linear(embedding_dim * 3, embedding_dim),
            nn.ReLU(),
            nn.Linear(embedding_dim, 1),
        )

        nn.init.xavier_uniform_(self.entity_emb.weight)
        nn.init.xavier_uniform_(self.relation_emb.weight)

    def find_paths(self, kg_adj, start, end, max_hops=3, max_paths=10):
        """使用 BFS 查找从起点到终点的路径。"""
        paths = []
        queue = deque([(start, [], [])])
        visited = set()

        while queue and len(paths) < max_paths:
            current, rel_path, ent_path = queue.popleft()

            if current == end and rel_path:
                paths.append((rel_path, ent_path + [end]))
                continue
            if len(rel_path) >= max_hops:
                continue

            for r_id, t_id in kg_adj.get(current, []):
                if (current, r_id, t_id) not in visited:
                    visited.add((current, r_id, t_id))
                    queue.append((t_id,
                                  rel_path + [r_id],
                                  ent_path + [current]))

        return paths[:max_paths]

    def score_path(self, path):
        """用嵌入对单条路径打分。"""
        rel_ids, ent_ids = path
        if not rel_ids:
            return torch.tensor(0.0)

        start = self.entity_emb(torch.tensor([ent_ids[0]]))
        rels = self.relation_emb(torch.tensor(rel_ids)).mean(0, keepdim=True)
        end = self.entity_emb(torch.tensor([ent_ids[-1]]))

        combined = torch.cat([start, rels, end], dim=1)
        return self.path_scorer(combined).squeeze()

    def recommend_with_explanations(self, user_items, kg_adj,
                                    candidates, max_hops=3):
        """返回 [(item_id, score, best_path)],按分数倒序排列。"""
        results = []
        for item_id in candidates:
            best_score = float('-inf')
            best_path = None
            for src_item in user_items:
                for path in self.find_paths(kg_adj, src_item, item_id,
                                            max_hops=max_hops):
                    score = self.score_path(path).item()
                    if score > best_score:
                        best_score = score
                        best_path = path
            if best_path is not None:
                results.append((item_id, best_score, best_path))

        results.sort(key=lambda x: x[1], reverse=True)
        return results

加 KG 与不加 KG:效果对比#

MovieLens-1M 上的定量提升,以及推荐列表的定性对比

左图展示了 KGAT 在所有标准指标上都比纯 BPR 基线有两位数的相对提升。特别是在冷启动 Recall@20 上,提升幅度更大。这正是协同过滤最难应对的场景。

右图直观地体现了差距。不加 KG 时,模型只能依赖热度,推荐的全是无关的热门大片;那部刚上线的诺兰新片完全不会出现,因为还没人看过。加上 KG 后,模型推荐的电影在主题上与用户兴趣高度相关。而且,由于图谱已经知道“诺兰是导演”,新片上线当天就能被推荐出来。


工程实践要点#

知识图谱的构建#

来源类型示例
Wikidata / DBpedia公开结构化 KG电影元数据、公司信息
产品目录内部数据库品牌、类目、规格
NER + 关系抽取非结构化文本评论、描述

实体对齐#

推荐系统中的物品需要与知识图谱中的实体建立关联。常用方法包括:

  1. 精确字符串匹配——速度快,但容易受格式变化影响。
  2. 模糊匹配——能处理拼写错误和缩写问题。
  3. 嵌入相似度——学习两边的嵌入表示,通过近邻匹配完成对齐。
  4. 人工标注——质量最高,但成本也高。

训练策略#

策略使用场景
联合训练数据量充足时,同时优化 KG 和推荐目标
预训练 + 微调KG 数据规模远大于交互数据时
多任务学习希望多个任务共享特征表示时

跳数的选择#

  • 1 跳: 只用直接属性,速度快,但信息较浅。
  • 2 跳: 大多数任务的最佳选择,能够捕捉“同导演”或“同演员”等模式。
  • 3 跳: 路径更丰富,但噪声增加,需要用注意力机制过滤。
  • 4 跳及以上: 收益递减,信噪比显著下降。

完整训练流程#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import torch
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import numpy as np

class KGRecDataset(Dataset):
    """KG 增强推荐的数据集"""

    def __init__(self, user_ids, item_ids, ratings):
        self.user_ids = torch.LongTensor(user_ids)
        self.item_ids = torch.LongTensor(item_ids)
        self.ratings = torch.FloatTensor(ratings)

    def __len__(self):
        return len(self.user_ids)

    def __getitem__(self, idx):
        return {
            'user_id': self.user_ids[idx],
            'item_id': self.item_ids[idx],
            'rating': self.ratings[idx],
        }

def train_cke(model, train_loader, val_loader, kg_triples=None,
              num_epochs=10, lr=0.001):
    """训练 CKE 模型,可选加入 KG 损失"""
    optimizer = optim.Adam(model.parameters(), lr=lr)
    best_val = float('inf')

    for epoch in range(num_epochs):
        model.train()
        train_loss = 0

        for batch in train_loader:
            optimizer.zero_grad()
            loss = model.compute_loss(
                batch['user_id'], batch['item_id'], batch['rating'],
                *kg_triples if kg_triples else (None, None, None)
            )
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        model.eval()
        val_loss = 0
        with torch.no_grad():
            for batch in val_loader:
                preds = model(batch['user_id'], batch['item_id'])
                val_loss += F.mse_loss(preds, batch['rating'].float()).item()

        train_loss /= len(train_loader)
        val_loss /= len(val_loader)
        print(f"Epoch {epoch+1}/{num_epochs} | "
              f"Train: {train_loss:.4f} | Val: {val_loss:.4f}")

        if val_loss < best_val:
            best_val = val_loss
            torch.save(model.state_dict(), 'best_kg_model.pt')

if __name__ == "__main__":
    num_users, num_items = 1000, 500
    num_entities, num_relations = 2000, 10

    train_data = KGRecDataset(
        np.random.randint(0, num_users, 8000),
        np.random.randint(0, num_items, 8000),
        np.random.uniform(1, 5, 8000),
    )
    val_data = KGRecDataset(
        np.random.randint(0, num_users, 2000),
        np.random.randint(0, num_items, 2000),
        np.random.uniform(1, 5, 2000),
    )

    model = CKE(num_users, num_items, num_entities, num_relations)
    train_cke(
        model,
        DataLoader(train_data, batch_size=64, shuffle=True),
        DataLoader(val_data, batch_size=64),
        num_epochs=10,
    )

常见问题#

知识图谱如何解决冷启动问题?#

新物品即使没有交互记录,它在知识图谱中仍然有属性信息(比如类型、导演、演员)。这些属性将它与其他有交互历史的物品关联起来。如果用户喜欢诺兰的电影,系统可以在他新片上映当天直接推荐,完全依赖 KG 的连接关系。

RippleNet 和 KGCN 有什么区别?#

RippleNet 是以用户为中心的:从用户的历史行为出发,沿着知识图谱向外扩散。
KGCN 是以物品为中心的:通过聚合物品的知识图谱邻居来构建增强的物品表示。
实际应用中, KGCN 更容易扩展,因为物品的邻居比用户的偏好波动更稳定。

什么时候该用 KGAT 而不是 KGCN?#

当你需要在一个统一图中建模协同信号和语义信号的交互时, KGAT 是更好的选择。它的协同知识图谱(CKG)将用户-物品边和知识图谱边合并,注意力机制可以同时学习两种边的信息;而 KGCN 则分开处理。

CKE 和基于图的方法相比如何?#

CKE 更简单、更快——它预先学习知识图谱嵌入,然后通过加法融合。基于图的方法(如 KGCN、 KGAT)更强大,因为它们在推理时会通过图传播信息,但计算成本也更高。工程实践中,通常先用 CKE 快速得到一个基线结果,再升级到 KGAT 追求更高的精度。

知识图谱能提升推荐多样性吗?#

当然可以。不同类型的关系会带来不同的连接路径。例如,same_directorsame_genresame_actor 的推荐结果可能完全不同。通过探索多种关系路径,系统自然能够提供多样化的推荐,避免陷入信息茧房。

如何处理噪声大或不完整的知识图谱?#

  1. 注意力机制(如 KGAT)会自动降低噪声连接的权重。
  2. TransR 嵌入在训练过程中学会忽略不一致的三元组。
  3. 数据增强:通过关系推理预测缺失的边。
  4. 多任务学习:共享信息,增强模型对缺失数据的鲁棒性。

计算瓶颈在哪里?#

最大的瓶颈是大规模知识图谱(百万级实体)上的邻居聚合。常见优化方法包括:

  • 邻居采样:每个节点最多取 $K$ 个邻居。
  • 分层聚合:先聚合属性,再聚合物品。
  • 小批量训练:结合子图采样。
  • 预计算知识图谱嵌入:采用 CKE 的方式。

知识图谱能否实现可解释推荐?#

这是知识图谱的一大优势。基于路径的方法可以直接生成解释,比如:“推荐电影 X,因为它和你高分评价的电影 Z 都是导演 Y 的作品。” 这种解释具体、易读且基于事实,远胜过“与你相似的用户也喜欢这个”。

KG 增强推荐的最新趋势是什么?#

最近的研究方向包括:

  1. 时序知识图谱:建模偏好和事实随时间的变化。
  2. 多模态知识图谱:结合结构化知识与图像、文本等多模态信息。
  3. 基于 Transformer 的知识图谱方法:用自注意力机制替代 GCN 的聚合操作。
  4. 大规模预训练知识图谱嵌入:从基础模型中获取知识图谱表示。
  5. LLM + KG 混合方法:利用大语言模型从文本中抽取知识图谱,并在图上进行推理。

总结#

  • 知识图谱为推荐系统引入语义理解,突破了纯协同信号的局限。
  • 冷启动是知识图谱的核心应用场景:即使物品没有任何交互历史,也能生成推荐。
  • RippleNet 沿知识图谱传播用户偏好,像水波一样向外扩散。
  • KGCN 和 KGAT 通过聚合知识图谱邻居信息,构建更丰富的物品表示。
  • CKE 融合三种信号(协同、结构、文本),生成统一的物品嵌入。
  • 基于路径的推理提供可解释性——每条推荐都有具体、易读的理由。
  • 2 跳是最佳选择;跳数过多会引入噪声,收益反而下降。

本文是推荐系统系列的第 8 篇,共 16 篇。

本系列

推荐系统 16 篇

  1. 01 推荐系统(一)—— 入门与基础概念
  2. 02 推荐系统(二)—— 协同过滤与矩阵分解
  3. 03 推荐系统(三)—— 深度学习基础模型
  4. 04 推荐系统(四)—— CTR 预估与点击率建模
  5. 05 推荐系统(五)—— Embedding 表示学习
  6. 06 推荐系统(六)—— 序列推荐与会话建模
  7. 07 推荐系统(七)—— 图神经网络与社交推荐
  8. 08 推荐系统(八)—— 知识图谱增强推荐系统 当前
  9. 09 推荐系统(九)—— 多任务学习与多目标优化
  10. 10 推荐系统(十)—— 深度兴趣网络与注意力机制
  11. 11 推荐系统(十一)—— 对比学习与自监督学习
  12. 12 推荐系统(十二)—— 大语言模型与推荐系统
  13. 13 推荐系统(十三)—— 公平性、去偏与可解释性
  14. 14 推荐系统(十四)—— 跨域推荐与冷启动解决方案
  15. 15 推荐系统(十五)—— 实时推荐与在线学习
  16. 16 推荐系统(十六)—— 工业级架构与最佳实践

读有所得?

GitHub 关注我 → 新文周更

GitHub