跳到主要内容

聚合框架

问题

MongoDB 的聚合框架(Aggregation Pipeline)是什么?如何使用 Pipeline 阶段完成复杂数据分析?

答案

聚合管道概念

MongoDB 的聚合框架采用 管道(Pipeline) 模式:数据像在流水线上一样依次流过多个 阶段(Stage),每个阶段对数据进行一种变换,最终输出结果。

db.orders.aggregate([
{ $match: { status: "paid" } }, // 阶段1:过滤
{ $group: { _id: "$category", total: { $sum: "$amount" } } }, // 阶段2:分组聚合
{ $sort: { total: -1 } }, // 阶段3:排序
{ $limit: 10 } // 阶段4:取前10
]);
聚合管道 vs SQL

聚合管道是 MongoDB 版的 SELECT ... FROM ... WHERE ... GROUP BY ... ORDER BY,但更灵活 —— 可以任意组合阶段,同一个阶段可以出现多次。

核心阶段详解

$match — 过滤

等价于 SQL 的 WHERE应尽早使用 $match,它能利用索引,减少后续阶段处理的数据量。

{ $match: { status: "paid", amount: { $gte: 100 } } }
性能关键

$match 放在管道最前面时可以利用索引。如果放在 $group$project 之后,已经改变了文档结构,无法再使用原始索引

$group — 分组聚合

等价于 SQL 的 GROUP BY_id 是分组键,可以是单字段、多字段或表达式。

// 按类别统计:总金额、平均金额、订单数
{
$group: {
_id: "$category", // 分组键
totalAmount: { $sum: "$amount" }, // SUM
avgAmount: { $avg: "$amount" }, // AVG
orderCount: { $count: {} }, // COUNT(5.0+)
maxAmount: { $max: "$amount" }, // MAX
minAmount: { $min: "$amount" }, // MIN
allProducts: { $push: "$product" }, // 收集所有值到数组
uniqueProducts: { $addToSet: "$product" }, // 收集去重值
firstOrder: { $first: "$orderId" }, // 每组第一条
lastOrder: { $last: "$orderId" } // 每组最后一条
}
}

// 多字段分组
{
$group: {
_id: { category: "$category", year: { $year: "$createdAt" } },
total: { $sum: "$amount" }
}
}

// _id: null 表示不分组,对所有文档聚合
{
$group: {
_id: null,
totalRevenue: { $sum: "$amount" },
totalOrders: { $count: {} }
}
}

$project — 投影/字段变换

等价于 SQL 的 SELECT。可以包含/排除字段、重命名、计算新字段。

{
$project: {
_id: 0, // 排除 _id
name: 1, // 包含 name
fullName: { $concat: ["$firstName", " ", "$lastName"] }, // 字符串拼接
discountedPrice: { $multiply: ["$price", 0.8] }, // 计算
year: { $year: "$createdAt" }, // 提取年份
status: { // 条件表达式
$cond: {
if: { $gte: ["$amount", 1000] },
then: "大额",
else: "普通"
}
}
}
}

$sort — 排序

{ $sort: { totalAmount: -1, name: 1 } }  // 金额降序,名称升序
内存限制

排序操作默认最多使用 100 MB 内存。超过时需要添加 { allowDiskUse: true } 参数允许使用磁盘临时文件。

$lookup — 关联查询

等价于 SQL 的 LEFT JOIN。跨集合关联。

// 基础用法:等值匹配
{
$lookup: {
from: "products", // 关联的集合
localField: "productId", // 当前集合的字段
foreignField: "_id", // 关联集合的字段
as: "productDetail" // 输出的数组字段名
}
}
// 结果:productDetail 是一个数组(即使只有一个匹配)

// 配合 $unwind 展开为单个对象
{ $lookup: { from: "products", localField: "productId", foreignField: "_id", as: "product" } },
{ $unwind: "$product" } // 数组 → 单个文档

// 高级用法:子管道(5.0+,更灵活)
{
$lookup: {
from: "orders",
let: { userId: "$_id" }, // 传递当前文档的变量
pipeline: [ // 在关联集合上执行子管道
{ $match: {
$expr: { $eq: ["$customerId", "$$userId"] } // $$ 引用 let 变量
}},
{ $match: { status: "paid" } },
{ $group: { _id: null, totalSpent: { $sum: "$amount" } } }
],
as: "orderStats"
}
}
$lookup 性能

$lookup 类似嵌套循环 JOIN,没有哈希 JOIN 或排序合并 JOIN 优化。对于大数据集,性能不如关系型数据库的 JOIN。如果频繁需要关联,应考虑将数据嵌入而非引用。确保关联字段有索引。

$unwind — 展开数组

将数组字段拆分为多个文档,每个文档包含数组中的一个元素。

// 原始文档
{ _id: 1, name: "Alice", tags: ["a", "b", "c"] }

// $unwind 后
{ $unwind: "$tags" }
// → { _id: 1, name: "Alice", tags: "a" }
// → { _id: 1, name: "Alice", tags: "b" }
// → { _id: 1, name: "Alice", tags: "c" }

// 保留空数组/缺失字段的文档
{
$unwind: {
path: "$tags",
preserveNullAndEmptyArrays: true // 默认 false(丢弃)
}
}

典型用途:先 $unwind 展开数组,再 $group 按数组元素统计。

addFields/addFields / set — 添加新字段

$addFields$set(4.2+ 别名)向文档添加新字段或覆盖已有字段,保留其他所有字段:

{
$addFields: {
totalPrice: { $multiply: ["$price", "$quantity"] },
discounted: { $multiply: ["$price", "$quantity", 0.9] }
}
}

$project 的区别:$project 只输出指定的字段,$addFields 保留所有原字段。

$facet — 多管道分支

在一次聚合中执行 多条独立的管道,返回多个结果集。适合同时计算不同维度的统计。

db.orders.aggregate([
{
$facet: {
// 分支1:按状态统计
byStatus: [
{ $group: { _id: "$status", count: { $count: {} } } }
],
// 分支2:按月份统计
byMonth: [
{ $group: {
_id: { $month: "$createdAt" },
total: { $sum: "$amount" }
}},
{ $sort: { "_id": 1 } }
],
// 分支3:总计
summary: [
{ $group: {
_id: null,
totalOrders: { $count: {} },
totalRevenue: { $sum: "$amount" }
}}
]
}
}
]);

其他常用阶段

阶段作用等价 SQL
$skip跳过文档OFFSET
$limit限制数量LIMIT
$count计数SELECT COUNT(*)
$out输出到新集合CREATE TABLE ... AS SELECT
$merge合并到已有集合INSERT ... ON CONFLICT UPDATE
$bucket分桶统计CASE WHEN ... THEN
$replaceRoot替换根文档
$unionWith合并另一个集合UNION ALL

实战示例

示例1:电商销售分析

// 统计每个类别的月度销售额 Top 5
db.orders.aggregate([
// 1. 只看已支付订单
{ $match: { status: "paid", createdAt: { $gte: ISODate("2024-01-01") } } },

// 2. 展开订单项
{ $unwind: "$items" },

// 3. 按类别+月份分组统计
{ $group: {
_id: {
category: "$items.category",
month: { $dateToString: { format: "%Y-%m", date: "$createdAt" } }
},
revenue: { $sum: { $multiply: ["$items.price", "$items.qty"] } },
orderCount: { $count: {} }
}},

// 4. 按收入降序
{ $sort: { revenue: -1 } },

// 5. 整理输出格式
{ $project: {
_id: 0,
category: "$_id.category",
month: "$_id.month",
revenue: { $round: ["$revenue", 2] },
orderCount: 1
}}
]);

示例2:用户活跃度分析

// 统计用户留存:注册后 7 天内是否有活跃操作
db.users.aggregate([
// 关联活动日志
{ $lookup: {
from: "activities",
let: { uid: "$_id", regDate: "$registeredAt" },
pipeline: [
{ $match: {
$expr: {
$and: [
{ $eq: ["$userId", "$$uid"] },
{ $lte: [
"$timestamp",
{ $dateAdd: { startDate: "$$regDate", unit: "day", amount: 7 } }
]}
]
}
}},
{ $count: "activityCount" }
],
as: "weekActivity"
}},

// 判断是否留存
{ $addFields: {
isRetained: {
$cond: {
if: { $gt: [{ $size: "$weekActivity" }, 0] },
then: true,
else: false
}
}
}},

// 按注册月份和留存状态统计
{ $group: {
_id: {
month: { $dateToString: { format: "%Y-%m", date: "$registeredAt" } },
retained: "$isRetained"
},
count: { $count: {} }
}}
]);

聚合管道优化

优化手段说明
$match 前置放在管道最前面,利用索引过滤
$project 前置尽早去除不需要的字段,减少内存消耗
索引覆盖$match$sort 使用的字段要有索引
$limit + $sortMongoDB 会自动合并为内存内的 Top-N 排序
避免 $lookup尽量通过 Schema 设计(嵌入)避免跨集合关联
allowDiskUse大数据量聚合时允许使用磁盘临时存储

常见面试问题

Q1: 聚合管道和 MapReduce 有什么区别?

答案

维度聚合管道MapReduce
性能使用 C++ 实现,性能更好JavaScript 引擎执行,性能差
易用性声明式阶段组合,直观需要编写 map/reduce 函数
灵活性内置丰富操作符可以写任意 JS 逻辑
索引利用$match/$sort 可用索引不能使用索引
推荐程度官方推荐5.0+ 已废弃

MongoDB 5.0 以后已经完全推荐使用聚合管道,MapReduce 已被标记为废弃。

Q2: $lookup 的性能如何?如何优化?

答案

$lookup 的实现类似嵌套循环 JOIN,性能受限于:

  • 外层文档数量 × 内层查询次数
  • 没有哈希 JOIN 等高级优化

优化方法

  1. 确保 foreignField 有索引(最关键)
  2. $match 前置,减少需要关联的文档数量
  3. 使用子管道(pipeline 形式)在关联时就过滤和投影,减少返回数据量
  4. 考虑将频繁关联的数据嵌入到文档中,从根本上消除 $lookup

Q3: 聚合管道的内存限制是多少?如何突破?

答案

  • 单个管道阶段最多使用 100 MB 内存
  • 超过时会报错:"Sort exceeded memory limit"
  • 设置 { allowDiskUse: true } 允许使用磁盘临时文件:
db.orders.aggregate([...], { allowDiskUse: true });

但使用磁盘会显著降低性能,更好的做法是通过 $match 提前过滤、$project 减少字段来控制内存用量。

Q4: 如何用聚合管道实现 SQL 的 HAVING?

答案

SQL:

SELECT category, SUM(amount) as total
FROM orders
GROUP BY category
HAVING SUM(amount) > 10000

MongoDB 等价写法 — 在 $group 后再加一个 $match

db.orders.aggregate([
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
{ $match: { total: { $gt: 10000 } } } // 等价于 HAVING
]);

相关链接