聚合框架
问题
MongoDB 的聚合框架(Aggregation Pipeline)是什么?如何使用 Pipeline 阶段完成复杂数据分析?
答案
聚合管道概念
MongoDB 的聚合框架采用 管道(Pipeline) 模式:数据像在流水线上一样依次流过多个 阶段(Stage),每个阶段对数据进行一种变换,最终输出结果。
db.orders.aggregate([
{ $match: { status: "paid" } }, // 阶段1:过滤
{ $group: { _id: "$category", total: { $sum: "$amount" } } }, // 阶段2:分组聚合
{ $sort: { total: -1 } }, // 阶段3:排序
{ $limit: 10 } // 阶段4:取前10
]);
聚合管道是 MongoDB 版的 SELECT ... FROM ... WHERE ... GROUP BY ... ORDER BY,但更灵活 —— 可以任意组合阶段,同一个阶段可以出现多次。
核心阶段详解
$match — 过滤
等价于 SQL 的 WHERE。应尽早使用 $match,它能利用索引,减少后续阶段处理的数据量。
{ $match: { status: "paid", amount: { $gte: 100 } } }
$match 放在管道最前面时可以利用索引。如果放在 $group 或 $project 之后,已经改变了文档结构,无法再使用原始索引。
$group — 分组聚合
等价于 SQL 的 GROUP BY。_id 是分组键,可以是单字段、多字段或表达式。
// 按类别统计:总金额、平均金额、订单数
{
$group: {
_id: "$category", // 分组键
totalAmount: { $sum: "$amount" }, // SUM
avgAmount: { $avg: "$amount" }, // AVG
orderCount: { $count: {} }, // COUNT(5.0+)
maxAmount: { $max: "$amount" }, // MAX
minAmount: { $min: "$amount" }, // MIN
allProducts: { $push: "$product" }, // 收集所有值到数组
uniqueProducts: { $addToSet: "$product" }, // 收集去重值
firstOrder: { $first: "$orderId" }, // 每组第一条
lastOrder: { $last: "$orderId" } // 每组最后一条
}
}
// 多字段分组
{
$group: {
_id: { category: "$category", year: { $year: "$createdAt" } },
total: { $sum: "$amount" }
}
}
// _id: null 表示不分组,对所有文档聚合
{
$group: {
_id: null,
totalRevenue: { $sum: "$amount" },
totalOrders: { $count: {} }
}
}
$project — 投影/字段变换
等价于 SQL 的 SELECT。可以包含/排除字段、重命名、计算新字段。
{
$project: {
_id: 0, // 排除 _id
name: 1, // 包含 name
fullName: { $concat: ["$firstName", " ", "$lastName"] }, // 字符串拼接
discountedPrice: { $multiply: ["$price", 0.8] }, // 计算
year: { $year: "$createdAt" }, // 提取年份
status: { // 条件表达式
$cond: {
if: { $gte: ["$amount", 1000] },
then: "大额",
else: "普通"
}
}
}
}
$sort — 排序
{ $sort: { totalAmount: -1, name: 1 } } // 金额降序,名称升序
排序操作默认最多使用 100 MB 内存。超过时需要添加 { allowDiskUse: true } 参数允许使用磁盘临时文件。
$lookup — 关联查询
等价于 SQL 的 LEFT JOIN。跨集合关联。
// 基础用法:等值匹配
{
$lookup: {
from: "products", // 关联的集合
localField: "productId", // 当前集合的字段
foreignField: "_id", // 关联集合的字段
as: "productDetail" // 输出的数组字段名
}
}
// 结果:productDetail 是一个数组(即使只有一个匹配)
// 配合 $unwind 展开为单个对象
{ $lookup: { from: "products", localField: "productId", foreignField: "_id", as: "product" } },
{ $unwind: "$product" } // 数组 → 单个文档
// 高级用法:子管道(5.0+,更灵活)
{
$lookup: {
from: "orders",
let: { userId: "$_id" }, // 传递当前文档的变量
pipeline: [ // 在关联集合上执行子管道
{ $match: {
$expr: { $eq: ["$customerId", "$$userId"] } // $$ 引用 let 变量
}},
{ $match: { status: "paid" } },
{ $group: { _id: null, totalSpent: { $sum: "$amount" } } }
],
as: "orderStats"
}
}
$lookup 类似嵌套循环 JOIN,没有哈希 JOIN 或排序合并 JOIN 优化。对于大数据集,性能不如关系型数据库的 JOIN。如果频繁需要关联,应考虑将数据嵌入而非引用。确保关联字段有索引。
$unwind — 展开数组
将数组字段拆分为多个文档,每个文档包含数组中的一个元素。
// 原始文档
{ _id: 1, name: "Alice", tags: ["a", "b", "c"] }
// $unwind 后
{ $unwind: "$tags" }
// → { _id: 1, name: "Alice", tags: "a" }
// → { _id: 1, name: "Alice", tags: "b" }
// → { _id: 1, name: "Alice", tags: "c" }
// 保留空数组/缺失字段的文档
{
$unwind: {
path: "$tags",
preserveNullAndEmptyArrays: true // 默认 false(丢弃)
}
}
典型用途:先 $unwind 展开数组,再 $group 按数组元素统计。
set — 添加新字段
$addFields 和 $set(4.2+ 别名)向文档添加新字段或覆盖已有字段,保留其他所有字段:
{
$addFields: {
totalPrice: { $multiply: ["$price", "$quantity"] },
discounted: { $multiply: ["$price", "$quantity", 0.9] }
}
}
与 $project 的区别:$project 只输出指定的字段,$addFields 保留所有原字段。
$facet — 多管道分支
在一次聚合中执行 多条独立的管道,返回多个结果集。适合同时计算不同维度的统计。
db.orders.aggregate([
{
$facet: {
// 分支1:按状态统计
byStatus: [
{ $group: { _id: "$status", count: { $count: {} } } }
],
// 分支2:按月份统计
byMonth: [
{ $group: {
_id: { $month: "$createdAt" },
total: { $sum: "$amount" }
}},
{ $sort: { "_id": 1 } }
],
// 分支3:总计
summary: [
{ $group: {
_id: null,
totalOrders: { $count: {} },
totalRevenue: { $sum: "$amount" }
}}
]
}
}
]);
其他常用阶段
| 阶段 | 作用 | 等价 SQL |
|---|---|---|
$skip | 跳过文档 | OFFSET |
$limit | 限制数量 | LIMIT |
$count | 计数 | SELECT COUNT(*) |
$out | 输出到新集合 | CREATE TABLE ... AS SELECT |
$merge | 合并到已有集合 | INSERT ... ON CONFLICT UPDATE |
$bucket | 分桶统计 | CASE WHEN ... THEN |
$replaceRoot | 替换根文档 | — |
$unionWith | 合并另一个集合 | UNION ALL |
实战示例
示例1:电商销售分析
// 统计每个类别的月度销售额 Top 5
db.orders.aggregate([
// 1. 只看已支付订单
{ $match: { status: "paid", createdAt: { $gte: ISODate("2024-01-01") } } },
// 2. 展开订单项
{ $unwind: "$items" },
// 3. 按类别+月份分组统计
{ $group: {
_id: {
category: "$items.category",
month: { $dateToString: { format: "%Y-%m", date: "$createdAt" } }
},
revenue: { $sum: { $multiply: ["$items.price", "$items.qty"] } },
orderCount: { $count: {} }
}},
// 4. 按收入降序
{ $sort: { revenue: -1 } },
// 5. 整理输出格式
{ $project: {
_id: 0,
category: "$_id.category",
month: "$_id.month",
revenue: { $round: ["$revenue", 2] },
orderCount: 1
}}
]);
示例2:用户活跃度分析
// 统计用户留存:注册后 7 天内是否有活跃操作
db.users.aggregate([
// 关联活动日志
{ $lookup: {
from: "activities",
let: { uid: "$_id", regDate: "$registeredAt" },
pipeline: [
{ $match: {
$expr: {
$and: [
{ $eq: ["$userId", "$$uid"] },
{ $lte: [
"$timestamp",
{ $dateAdd: { startDate: "$$regDate", unit: "day", amount: 7 } }
]}
]
}
}},
{ $count: "activityCount" }
],
as: "weekActivity"
}},
// 判断是否留存
{ $addFields: {
isRetained: {
$cond: {
if: { $gt: [{ $size: "$weekActivity" }, 0] },
then: true,
else: false
}
}
}},
// 按注册月份和留存状态统计
{ $group: {
_id: {
month: { $dateToString: { format: "%Y-%m", date: "$registeredAt" } },
retained: "$isRetained"
},
count: { $count: {} }
}}
]);
聚合管道优化
| 优化手段 | 说明 |
|---|---|
$match 前置 | 放在管道最前面,利用索引过滤 |
$project 前置 | 尽早去除不需要的字段,减少内存消耗 |
| 索引覆盖 | $match 和 $sort 使用的字段要有索引 |
$limit + $sort | MongoDB 会自动合并为内存内的 Top-N 排序 |
避免 $lookup | 尽量通过 Schema 设计(嵌入)避免跨集合关联 |
allowDiskUse | 大数据量聚合时允许使用磁盘临时存储 |
常见面试问题
Q1: 聚合管道和 MapReduce 有什么区别?
答案:
| 维度 | 聚合管道 | MapReduce |
|---|---|---|
| 性能 | 使用 C++ 实现,性能更好 | JavaScript 引擎执行,性能差 |
| 易用性 | 声明式阶段组合,直观 | 需要编写 map/reduce 函数 |
| 灵活性 | 内置丰富操作符 | 可以写任意 JS 逻辑 |
| 索引利用 | $match/$sort 可用索引 | 不能使用索引 |
| 推荐程度 | 官方推荐 | 5.0+ 已废弃 |
MongoDB 5.0 以后已经完全推荐使用聚合管道,MapReduce 已被标记为废弃。
Q2: $lookup 的性能如何?如何优化?
答案:
$lookup 的实现类似嵌套循环 JOIN,性能受限于:
- 外层文档数量 × 内层查询次数
- 没有哈希 JOIN 等高级优化
优化方法:
- 确保
foreignField有索引(最关键) $match前置,减少需要关联的文档数量- 使用子管道(pipeline 形式)在关联时就过滤和投影,减少返回数据量
- 考虑将频繁关联的数据嵌入到文档中,从根本上消除
$lookup
Q3: 聚合管道的内存限制是多少?如何突破?
答案:
- 单个管道阶段最多使用 100 MB 内存
- 超过时会报错:
"Sort exceeded memory limit" - 设置
{ allowDiskUse: true }允许使用磁盘临时文件:
db.orders.aggregate([...], { allowDiskUse: true });
但使用磁盘会显著降低性能,更好的做法是通过 $match 提前过滤、$project 减少字段来控制内存用量。
Q4: 如何用聚合管道实现 SQL 的 HAVING?
答案:
SQL:
SELECT category, SUM(amount) as total
FROM orders
GROUP BY category
HAVING SUM(amount) > 10000
MongoDB 等价写法 — 在 $group 后再加一个 $match:
db.orders.aggregate([
{ $group: { _id: "$category", total: { $sum: "$amount" } } },
{ $match: { total: { $gt: 10000 } } } // 等价于 HAVING
]);