On this page本页内容
Aggregation Pipeline as Alternative聚合管道作为替代方案
Aggregation pipeline provides better performance and a more coherent interface than map-reduce.聚合管道提供了比map-reduce更好的性能和更一致的接口。
Various map-reduce operations can be rewritten using aggregation pipeline operators, such as 可以使用聚合管道运算符重写各种map-reduce操作,例如$group
, $merge
, etc.$group
、$merge
等。For map-reduce operations that require custom functionality, MongoDB provides the 对于需要自定义功能的map reduce操作,MongoDB从4.4版开始提供$accumulator
and $function
aggregation operators starting in version 4.4.$accumulator
和$function
聚合操作符。
The example below includes aggregation pipeline alternative without requiring custom function.下面的示例包括不需要自定义函数的聚合管道替代方案。
For examples using the custom aggregation function, see Map-Reduce to Aggregation Pipeline.有关使用自定义聚合函数的示例,请参见Map-Reduce到聚合管道。
To perform map-reduce operations, MongoDB provides the 为了执行map-reduce操作,MongoDB提供了mapReduce
command and, in the mongo
shell, the db.collection.mapReduce()
wrapper method.mapReduce
命令,并且在mongo
shell中提供db.collection.mapReduce()
包装方法。
If the map-reduce data set is constantly growing, you may want to perform an incremental map-reduce rather than performing the map-reduce operation over the entire data set each time.如果map-reduce数据集不断增长,则可能需要执行增量map-reduce,而不是每次对整个数据集执行map-reduce操作。
To perform incremental map-reduce:要执行增量映射缩减,请执行以下操作:
query
parameter that specifies conditions that match only the new documents.query
参数。out
parameter that specifies the reduce
action to merge the new results into the existing output collection.out
参数,指定将新结果合并到现有输出集合的reduce
操作。Consider the following example where you schedule a map-reduce operation on a 考虑以下示例,其中您计划在每天结束时对usersessions
collection to run at the end of each day.usersessions
集合运行map-reduce操作。
The usersessions
collection contains documents that log users’ sessions each day, for example:usersessions
集合包含记录用户每天会话的文档,例如:
Run the first map-reduce operation as follows:运行第一个map-reduce操作,如下所示:
userid
to an object that contains the fields total_time
, count
, and avg_time
:userid
映射到包含字段total_time
、count
和avg_time
的对象:
key
and values
to calculate the total time and the count.key
和value
定义相应的reduce函数,以计算总时间和计数。key
corresponds to the userid
, and the values
is an array whose elements corresponds to the individual objects mapped to the userid
in the mapFunction
.key
对应于userid
,values
是一个数组,其元素对应于mapFunction
中映射到userid
的各个对象。
key
and reducedValue
.key
和reducedValue
定义finalize函数。reducedValue
document to add another field average
and returns the modified document.reducedValue
文档以添加另一个字段average
,并返回修改后的文档。
usersessions
collection using the mapFunction
, the reduceFunction
, and the finalizeFunction
functions. Output the results to a collection session_stats
. If the session_stats
collection already exists, the operation will replace the contents:
session_stats
collection to verify the results:
The operation returns the following document:操作返回以下文档:
Later, as the usersessions
collection grows, you can run additional map-reduce operations. For example, add new documents to the usersessions
collection:
At the end of the day, perform incremental map-reduce on the usersessions
collection, but use the query
field to select only the new documents. Output the results to the collection session_stats
, but reduce
the contents with the results of the incremental map-reduce:
Query the session_stats
collection to verify the results:
The operation returns the following document:操作返回以下文档:
Prereq: Set up the collection to its original state:前提条件:将集合设置为其原始状态:
Using the available aggregation pipeline operators, you can rewrite the map-reduce example without defining custom functions:使用可用的聚合管道操作符,可以重写map reduce示例,而无需定义自定义函数:
$group
groups by the userid
and calculates:
total_time
using the $sum
operatorcount
using the $sum
operatoravg_time
using the $avg
operatorThe operation returns the following documents:该操作返回以下文档:
$project
stage reshapes the output document to mirror the map-reduce’s output to have two fields _id
and value
. The stage is optional if you do not need to mirror the _id
and value
structure.
$merge
stage outputs the results to a session_stats_agg
collection. If an existing document has the same _id
as the new result, the operation applies the specified pipeline to calculate the total_time, count, and avg_time from the result and the existing document. If there is no existing document with the same _id
in the session_stats_agg
, the operation inserts the document.session_stats_agg
collection to verify the results:
The operation returns the following document:操作返回以下文档:
usersessions
collection:
$match
stage at the start of the pipeline to specify the date filter:
session_stats_agg
collection to verify the results:
The operation returns the following document:操作返回以下文档:
$match
date condition each time you run, you can define wrap the aggregation in a helper function:
Then, to run, you would just pass in the start date to the 然后,要运行,只需将开始日期传递给updateSessionStats()
function:updateSessionStats()
函数: