Perform Incremental Map-Reduce

Aggregation Pipeline as Alternative

Aggregation pipeline provides better performance and a more coherent interface than map-reduce.

Various map-reduce operations can be rewritten using aggregation pipeline operators, such as $group, $merge, etc. For map-reduce operations that require custom functionality, MongoDB provides the $accumulator and $function aggregation operators starting in version 4.4.

The example below includes an aggregation pipeline alternative that does not require custom functions.

For examples that use custom aggregation functions, see Map-Reduce to Aggregation Pipeline.

To perform map-reduce operations, MongoDB provides the mapReduce command and, in the mongo shell, the db.collection.mapReduce() wrapper method.

If the map-reduce data set is constantly growing, you may want to perform an incremental map-reduce rather than performing the map-reduce operation over the entire data set each time.

To perform incremental map-reduce:

  1. Run a map-reduce job over the current collection and output the result to a separate collection.
  2. When you have more data to process, run subsequent map-reduce jobs with:
    • the query parameter that specifies conditions that match only the new documents.
    • the out parameter that specifies the reduce action to merge the new results into the existing output collection.
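The two-step procedure can be modeled outside MongoDB in plain JavaScript. The sketch below is a hypothetical in-memory model, not the server's implementation: `mapReduceInMemory`, the `query` predicate, and the `outMode` values `"replace"` and `"reduce"` are illustrative names that only mimic the query and out options described above. It counts sessions per user to show that a first run followed by an incremental run with reduce output matches a single run over all documents.

```javascript
// Hypothetical in-memory model of map-reduce with "replace" and "reduce" output modes.
// It only mimics the query and out options described above; it is not MongoDB's implementation.
function mapReduceInMemory(docs, map, reduce, { query = () => true, outMode = "replace", output = new Map() } = {}) {
  // Map phase: collect every emitted value under its key.
  const grouped = new Map();
  for (const doc of docs.filter(query)) {
    map(doc, (key, value) => {
      if (!grouped.has(key)) grouped.set(key, []);
      grouped.get(key).push(value);
    });
  }
  // "replace" starts from an empty output; "reduce" starts from a copy of the existing output.
  const result = outMode === "reduce" ? new Map(output) : new Map();
  for (const [key, values] of grouped) {
    const reduced = reduce(key, values);
    // In "reduce" mode, re-reduce the new value with the stored value for the same key.
    result.set(key, result.has(key) ? reduce(key, [result.get(key), reduced]) : reduced);
  }
  return result;
}

// Usage: count sessions per user for day 1, then add day 2 incrementally.
const emitOnePerUser = (doc, emit) => emit(doc.userid, 1);
const sum = (key, values) => values.reduce((a, b) => a + b, 0);
const sessions = [
  { userid: "a", day: 1 }, { userid: "b", day: 1 }, { userid: "a", day: 1 },
  { userid: "a", day: 2 }, { userid: "c", day: 2 },
];

let stats = mapReduceInMemory(sessions.filter((d) => d.day === 1), emitOnePerUser, sum);
// Later run: the query selects only the new documents, and the results are reduced into `stats`.
stats = mapReduceInMemory(sessions, emitOnePerUser, sum, {
  query: (d) => d.day === 2,
  outMode: "reduce",
  output: stats,
});
console.log(Object.fromEntries(stats)); // { a: 3, b: 1, c: 1 }
```

Because the stored value is passed back into `reduce`, this approach only yields correct totals when the reduce function accepts its own output as input.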

Consider the following example where you schedule a map-reduce operation on a usersessions collection to run at the end of each day.

Data Setup

The usersessions collection contains documents that log users’ sessions each day, for example:

db.usersessions.insertMany([
   { userid: "a", ts: ISODate('2020-03-03 14:17:00'), length: 95 },
   { userid: "b", ts: ISODate('2020-03-03 14:23:00'), length: 110 },
   { userid: "c", ts: ISODate('2020-03-03 15:02:00'), length: 120 },
   { userid: "d", ts: ISODate('2020-03-03 16:45:00'), length: 45 },
   { userid: "a", ts: ISODate('2020-03-04 11:05:00'), length: 105 },
   { userid: "b", ts: ISODate('2020-03-04 13:14:00'), length: 120 },
   { userid: "c", ts: ISODate('2020-03-04 17:00:00'), length: 130 },
   { userid: "d", ts: ISODate('2020-03-04 15:37:00'), length: 65 }
])

Initial Map-Reduce of Current Collection

Run the first map-reduce operation as follows:

  1. Define the map function that maps the userid to an object that contains the fields total_time, count, and avg_time:

    var mapFunction = function() {
        var key = this.userid;
        var value = { total_time: this.length, count: 1, avg_time: 0 };
    
        emit( key, value );
    };
  2. Define the corresponding reduce function with two arguments, key and values, to calculate the total time and the count. The key corresponds to the userid, and values is an array whose elements correspond to the individual objects mapped to the userid in the mapFunction.

    var reduceFunction = function(key, values) {
    
       var reducedObject = { total_time: 0, count:0, avg_time:0 };
    
       values.forEach(function(value) {
          reducedObject.total_time += value.total_time;
          reducedObject.count += value.count;
       });
    
       return reducedObject;
    };
  3. Define the finalize function with two arguments, key and reducedValue. The function modifies the reducedValue document to set the avg_time field and returns the modified document.

    var finalizeFunction = function(key, reducedValue) {
    
       if (reducedValue.count > 0)
          reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
    
       return reducedValue;
    };
  4. Perform map-reduce on the usersessions collection using the mapFunction, the reduceFunction, and the finalizeFunction functions. Output the results to a collection session_stats. If the session_stats collection already exists, the operation will replace the contents:

    db.usersessions.mapReduce(
       mapFunction,
       reduceFunction,
       {
         out: "session_stats",
         finalize: finalizeFunction
       }
    )
  5. Query the session_stats collection to verify the results:

    db.session_stats.find().sort( { _id: 1 } )

    The operation returns the following documents:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
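Incremental map-reduce depends on the reduce function being safe to apply to its own output: MongoDB can call reduce more than once for the same key, and the reduce output action feeds stored results back into it. The check below runs in Node outside the mongo shell; it copies reduceFunction and finalizeFunction from the steps above and confirms that reducing partial results gives the same totals as reducing all values at once. The input values are what mapFunction emits for user "a" in the sample data.

```javascript
// Copies of reduceFunction and finalizeFunction from the steps above (plain JavaScript).
var reduceFunction = function(key, values) {
  var reducedObject = { total_time: 0, count: 0, avg_time: 0 };
  values.forEach(function(value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
  });
  return reducedObject;
};

var finalizeFunction = function(key, reducedValue) {
  if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
  return reducedValue;
};

// Values mapFunction emits for user "a" on 2020-03-03 and 2020-03-04.
var mapped = [
  { total_time: 95, count: 1, avg_time: 0 },
  { total_time: 105, count: 1, avg_time: 0 },
];

// Reduce all values at once ...
var allAtOnce = finalizeFunction("a", reduceFunction("a", mapped));
// ... or reduce each value separately, then re-reduce the partial results.
var partials = mapped.map(function(v) { return reduceFunction("a", [v]); });
var reReduced = finalizeFunction("a", reduceFunction("a", partials));

console.log(allAtOnce); // { total_time: 200, count: 2, avg_time: 100 }
console.log(reReduced); // { total_time: 200, count: 2, avg_time: 100 }
```

Keeping only sums in reduce and computing avg_time in finalize is what makes re-reducing safe; an average cannot be combined correctly without the counts behind it.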

Subsequent Incremental Map-Reduce

Later, as the usersessions collection grows, you can run additional map-reduce operations. For example, add new documents to the usersessions collection:

db.usersessions.insertMany([
   { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
   { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
   { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
   { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
])

At the end of the day, perform incremental map-reduce on the usersessions collection, but use the query field to select only the new documents. Output the results to the session_stats collection, and reduce its existing contents with the results of the incremental map-reduce:

db.usersessions.mapReduce(
   mapFunction,
   reduceFunction,
   {
     query: { ts: { $gte: ISODate('2020-03-05 00:00:00') } },
     out: { reduce: "session_stats" },
     finalize: finalizeFunction
   }
);

Query the session_stats collection to verify the results:

db.session_stats.find().sort( { _id: 1 } )

The operation returns the following documents:

{ "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
{ "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
{ "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
{ "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
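The arithmetic behind these results can be replayed in Node with copies of reduceFunction and finalizeFunction. The `existing` and `newDay` objects below simply restate the values shown on this page (they are not read from a server), and the sketch assumes, consistent with the avg_time values above, that finalize is applied to the merged value.

```javascript
// Copies of reduceFunction and finalizeFunction from the initial map-reduce steps.
var reduceFunction = function(key, values) {
  var reducedObject = { total_time: 0, count: 0, avg_time: 0 };
  values.forEach(function(value) {
    reducedObject.total_time += value.total_time;
    reducedObject.count += value.count;
  });
  return reducedObject;
};
var finalizeFunction = function(key, reducedValue) {
  if (reducedValue.count > 0)
    reducedValue.avg_time = reducedValue.total_time / reducedValue.count;
  return reducedValue;
};

// Values stored in session_stats after the first run.
var existing = {
  a: { total_time: 200, count: 2, avg_time: 100 },
  b: { total_time: 230, count: 2, avg_time: 115 },
  c: { total_time: 250, count: 2, avg_time: 125 },
  d: { total_time: 110, count: 2, avg_time: 55 },
};
// Session lengths inserted on 2020-03-05 (one new session per user).
var newDay = { a: 130, b: 40, c: 110, d: 100 };

var merged = {};
Object.keys(existing).forEach(function(userid) {
  var fresh = reduceFunction(userid, [{ total_time: newDay[userid], count: 1, avg_time: 0 }]);
  // Re-reduce the stored value with the new one; reduce ignores the stored avg_time,
  // so finalize recomputes it from the combined total_time and count.
  merged[userid] = finalizeFunction(userid, reduceFunction(userid, [existing[userid], fresh]));
});

console.log(merged.a); // { total_time: 330, count: 3, avg_time: 110 }
console.log(merged.b); // { total_time: 270, count: 3, avg_time: 90 }
```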

Aggregation Alternative

Prerequisite: Reset the collection to its original state:

db.usersessions.drop();

db.usersessions.insertMany([
   { userid: "a", ts: ISODate('2020-03-03 14:17:00'), length: 95 },
   { userid: "b", ts: ISODate('2020-03-03 14:23:00'), length: 110 },
   { userid: "c", ts: ISODate('2020-03-03 15:02:00'), length: 120 },
   { userid: "d", ts: ISODate('2020-03-03 16:45:00'), length: 45 },
   { userid: "a", ts: ISODate('2020-03-04 11:05:00'), length: 105 },
   { userid: "b", ts: ISODate('2020-03-04 13:14:00'), length: 120 },
   { userid: "c", ts: ISODate('2020-03-04 17:00:00'), length: 130 },
   { userid: "d", ts: ISODate('2020-03-04 15:37:00'), length: 65 }
])

Using the available aggregation pipeline operators, you can rewrite the map-reduce example without defining custom functions:

db.usersessions.aggregate([
   { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
   { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
   { $merge: {
      into: "session_stats_agg",
      whenMatched: [ { $set: {
         "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
         "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
         "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] },  { $add: [ "$value.count", "$$new.value.count" ] } ] }
      } } ],
      whenNotMatched: "insert"
   }}
])
  1. The $group stage groups by the userid and calculates:

    • The total_time using the $sum operator
    • The count using the $sum operator
    • The avg_time using the $avg operator

    The stage outputs the following documents:

    { "_id" : "c", "total_time" : 250, "count" : 2, "avg_time" : 125 }
    { "_id" : "d", "total_time" : 110, "count" : 2, "avg_time" : 55 }
    { "_id" : "a", "total_time" : 200, "count" : 2, "avg_time" : 100 }
    { "_id" : "b", "total_time" : 230, "count" : 2, "avg_time" : 115 }
  2. The $project stage reshapes the output documents to mirror the map-reduce output, which has two fields, _id and value. The stage is optional if you do not need to mirror the _id and value structure:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
  3. The $merge stage outputs the results to a session_stats_agg collection. If an existing document has the same _id as the new result, the operation applies the specified pipeline to calculate the total_time, count, and avg_time from the result and the existing document. If there is no existing document with the same _id in the session_stats_agg, the operation inserts the document.
  4. Query the session_stats_agg collection to verify the results:

    db.session_stats_agg.find().sort( { _id: 1 } )

    The operation returns the following documents:

    { "_id" : "a", "value" : { "total_time" : 200, "count" : 2, "avg_time" : 100 } }
    { "_id" : "b", "value" : { "total_time" : 230, "count" : 2, "avg_time" : 115 } }
    { "_id" : "c", "value" : { "total_time" : 250, "count" : 2, "avg_time" : 125 } }
    { "_id" : "d", "value" : { "total_time" : 110, "count" : 2, "avg_time" : 55 } }
  5. Add new documents to the usersessions collection:

    db.usersessions.insertMany([
       { userid: "a", ts: ISODate('2020-03-05 14:17:00'), length: 130 },
       { userid: "b", ts: ISODate('2020-03-05 14:23:00'), length: 40 },
       { userid: "c", ts: ISODate('2020-03-05 15:02:00'), length: 110 },
       { userid: "d", ts: ISODate('2020-03-05 16:45:00'), length: 100 }
    ])
  6. Add a $match stage at the start of the pipeline to specify the date filter:

    db.usersessions.aggregate([
       { $match: { ts: { $gte: ISODate('2020-03-05 00:00:00') } } },
       { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
       { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
       { $merge: {
          into: "session_stats_agg",
          whenMatched: [ { $set: {
             "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
             "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
             "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] },  { $add: [ "$value.count", "$$new.value.count" ] } ] }
          } } ],
          whenNotMatched: "insert"
       }}
    ])
  7. Query the session_stats_agg collection to verify the results:

    db.session_stats_agg.find().sort( { _id: 1 } )

    The operation returns the following documents:

    { "_id" : "a", "value" : { "total_time" : 330, "count" : 3, "avg_time" : 110 } }
    { "_id" : "b", "value" : { "total_time" : 270, "count" : 3, "avg_time" : 90 } }
    { "_id" : "c", "value" : { "total_time" : 360, "count" : 3, "avg_time" : 120 } }
    { "_id" : "d", "value" : { "total_time" : 210, "count" : 3, "avg_time" : 70 } }
  8. Optional. To avoid modifying the aggregation pipeline’s $match date condition each time you run it, you can wrap the aggregation in a helper function:

    updateSessionStats = function(startDate) {
       db.usersessions.aggregate([
          { $match: { ts: { $gte: startDate } } },
          { $group: { _id: "$userid", total_time: { $sum: "$length" }, count: { $sum: 1 }, avg_time: { $avg: "$length" } } },
          { $project: { value: { total_time: "$total_time", count: "$count", avg_time: "$avg_time" } } },
          { $merge: {
             into: "session_stats_agg",
             whenMatched: [ { $set: {
                "value.total_time": { $add: [ "$value.total_time", "$$new.value.total_time" ] },
                "value.count": { $add: [ "$value.count", "$$new.value.count" ] },
                "value.avg_time": { $divide: [ { $add: [ "$value.total_time", "$$new.value.total_time" ] },  { $add: [ "$value.count", "$$new.value.count" ] } ] }
             } } ],
             whenNotMatched: "insert"
          }}
       ]);
    };

    Then, to run it, pass the start date to the updateSessionStats() function:

    updateSessionStats(ISODate('2020-03-05 00:00:00'))
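The whenMatched pipeline recomputes avg_time from the combined total_time and count instead of averaging the stored and new averages, because the two groups can contain different numbers of sessions. The function below is a hypothetical stand-in (`combineStats` is not a MongoDB API) that mirrors the three $set expressions so the arithmetic can be checked in Node; `existing` plays the role of `$value` and `incoming` the role of `$$new.value`.

```javascript
// Hypothetical helper mirroring the whenMatched $set expressions.
function combineStats(existing, incoming) {
  // Expressions in one $set stage all read the original "$value" fields,
  // which is why the avg_time expression re-adds the totals and counts itself.
  return {
    total_time: existing.total_time + incoming.total_time,
    count: existing.count + incoming.count,
    avg_time: (existing.total_time + incoming.total_time) / (existing.count + incoming.count),
  };
}

// User "a": stored stats after the first run, and the $group result for 2020-03-05.
var stored = { total_time: 200, count: 2, avg_time: 100 };
var fromNewDay = { total_time: 130, count: 1, avg_time: 130 };

var combined = combineStats(stored, fromNewDay);
console.log(combined); // { total_time: 330, count: 3, avg_time: 110 }

// Averaging the two averages would over-weight the single new session.
var naiveAvg = (stored.avg_time + fromNewDay.avg_time) / 2;
console.log(naiveAvg); // 115
```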