On this page本页内容
Aggregation Pipeline as Alternative聚合管道作为替代方案
Aggregation pipeline provides better performance and a more coherent interface than map-reduce.聚合管道提供了比map-reduce更好的性能和更一致的接口。
Various map-reduce operations can be rewritten using aggregation pipeline operators, such as 可以使用聚合管道运算符重写各种map-reduce操作,例如$group, $merge, etc.$group、$merge等。For map-reduce operations that require custom functionality, MongoDB provides the 对于需要自定义功能的map reduce操作,MongoDB从4.4版开始提供$accumulator and $function aggregation operators starting in version 4.4.$accumulator和$function聚合操作符。
The example below includes aggregation pipeline alternative without requiring custom function.下面的示例包括不需要自定义函数的聚合管道替代方案。
For examples using the custom aggregation function, see Map-Reduce to Aggregation Pipeline.有关使用自定义聚合函数的示例,请参见Map-Reduce到聚合管道。
To perform map-reduce operations, MongoDB provides the 为了执行map-reduce操作,MongoDB提供了mapReduce command and, in the mongo shell, the db.collection.mapReduce() wrapper method.mapReduce命令,并且在mongoshell中提供db.collection.mapReduce()包装方法。
If the map-reduce data set is constantly growing, you may want to perform an incremental map-reduce rather than performing the map-reduce operation over the entire data set each time.如果map-reduce数据集不断增长,则可能需要执行增量map-reduce,而不是每次对整个数据集执行map-reduce操作。
To perform incremental map-reduce:要执行增量映射缩减,请执行以下操作:
query parameter that specifies conditions that match only the new documents.query参数。out parameter that specifies the reduce action to merge the new results into the existing output collection.out参数,指定将新结果合并到现有输出集合的reduce操作。Consider the following example where you schedule a map-reduce operation on a 考虑以下示例,其中您计划在每天结束时对usersessions collection to run at the end of each day.usersessions集合运行map-reduce操作。
The usersessions collection contains documents that log users’ sessions each day, for example:usersessions集合包含记录用户每天会话的文档,例如:
Run the first map-reduce operation as follows:运行第一个map-reduce操作,如下所示:
userid to an object that contains the fields total_time, count, and avg_time:userid映射到包含字段total_time、count和avg_time的对象:
key and values to calculate the total time and the count.key和value定义相应的reduce函数,以计算总时间和计数。key corresponds to the userid, and the values is an array whose elements corresponds to the individual objects mapped to the userid in the mapFunction.key对应于userid,values是一个数组,其元素对应于mapFunction中映射到userid的各个对象。
key and reducedValue.key和reducedValue定义finalize函数。reducedValue document to add another field average and returns the modified document.reducedValue文档以添加另一个字段average,并返回修改后的文档。
usersessions collection using the mapFunction, the reduceFunction, and the finalizeFunction functions. Output the results to a collection session_stats. If the session_stats collection already exists, the operation will replace the contents:
session_stats collection to verify the results:
The operation returns the following document:操作返回以下文档:
Later, as the usersessions collection grows, you can run additional map-reduce operations. For example, add new documents to the usersessions collection:
At the end of the day, perform incremental map-reduce on the usersessions collection, but use the query field to select only the new documents. Output the results to the collection session_stats, but reduce the contents with the results of the incremental map-reduce:
Query the session_stats collection to verify the results:
The operation returns the following document:操作返回以下文档:
Prereq: Set up the collection to its original state:前提条件:将集合设置为其原始状态:
Using the available aggregation pipeline operators, you can rewrite the map-reduce example without defining custom functions:使用可用的聚合管道操作符,可以重写map reduce示例,而无需定义自定义函数:
$group groups by the userid and calculates:
total_time using the $sum operatorcount using the $sum operatoravg_time using the $avg operatorThe operation returns the following documents:该操作返回以下文档:
$project stage reshapes the output document to mirror the map-reduce’s output to have two fields _id and value. The stage is optional if you do not need to mirror the _id and value structure.
$merge stage outputs the results to a session_stats_agg collection. If an existing document has the same _id as the new result, the operation applies the specified pipeline to calculate the total_time, count, and avg_time from the result and the existing document. If there is no existing document with the same _id in the session_stats_agg, the operation inserts the document.session_stats_agg collection to verify the results:
The operation returns the following document:操作返回以下文档:
usersessions collection:
$match stage at the start of the pipeline to specify the date filter:
session_stats_agg collection to verify the results:
The operation returns the following document:操作返回以下文档:
$match date condition each time you run, you can define wrap the aggregation in a helper function:
Then, to run, you would just pass in the start date to the 然后,要运行,只需将开始日期传递给updateSessionStats() function:updateSessionStats()函数: