Map-Reduce Examples映射-减少示例

On this page本页内容

In the mongo shell, the db.collection.mapReduce() method is a wrapper around the mapReduce command. mongo shell中,db.collection.mapReduce()方法是mapReduce命令的包装器。The following examples use the db.collection.mapReduce() method:以下示例使用db.collection.mapReduce()方法:

Aggregation Pipeline as Alternative聚合管道作为替代方案

Aggregation pipeline provides better performance and a simpler interface than map-reduce, and map-reduce expressions can be rewritten using aggregation pipeline operators such as $group, $merge, and others.聚合管道提供了比map-reduce更好的性能和更简单的接口,并且可以使用聚合管道运算符(如$group$merge等)重写map-reduce表达式。

For map-reduce expressions that require custom functionality, MongoDB provides the $accumulator and $function aggregation operators starting in version 4.4. These operators provide the ability to define custom aggregation expressions in JavaScript.对于需要自定义功能的map-reduce表达式,MongoDB从4.4版开始提供$accumulator$function聚合运算符。这些运算符提供了在JavaScript中定义自定义聚合表达式的能力。

The examples in this section include aggregation pipeline alternatives without custom aggregation expressions. 本节中的示例包括没有自定义聚合表达式的聚合管道备选方案。For alternatives that use custom expressions, see Map-Reduce to Aggregation Pipeline Translation Examples.有关使用自定义表达式的替代方案,请参阅对聚合管道转换Map-Reduce示例

Create a sample collection orders with these documents:使用以下文档创建orders示例:

db.orders.insertMany([
   { _id: 1, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-01"), price: 25, items: [ { sku: "oranges", qty: 5, price: 2.5 }, { sku: "apples", qty: 5, price: 2.5 } ], status: "A" },
   { _id: 2, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-08"), price: 70, items: [ { sku: "oranges", qty: 8, price: 2.5 }, { sku: "chocolates", qty: 5, price: 10 } ], status: "A" },
   { _id: 3, cust_id: "Busby Bee", ord_date: new Date("2020-03-08"), price: 50, items: [ { sku: "oranges", qty: 10, price: 2.5 }, { sku: "pears", qty: 10, price: 2.5 } ], status: "A" },
   { _id: 4, cust_id: "Busby Bee", ord_date: new Date("2020-03-18"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
   { _id: 5, cust_id: "Busby Bee", ord_date: new Date("2020-03-19"), price: 50, items: [ { sku: "chocolates", qty: 5, price: 10 } ], status: "A"},
   { _id: 6, cust_id: "Cam Elot", ord_date: new Date("2020-03-19"), price: 35, items: [ { sku: "carrots", qty: 10, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" },
   { _id: 7, cust_id: "Cam Elot", ord_date: new Date("2020-03-20"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
   { _id: 8, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 75, items: [ { sku: "chocolates", qty: 5, price: 10 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" },
   { _id: 9, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 55, items: [ { sku: "carrots", qty: 5, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 }, { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
   { _id: 10, cust_id: "Don Quis", ord_date: new Date("2020-03-23"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }
])

Return the Total Price Per Customer返回每个客户的总价

Perform the map-reduce operation on the orders collection to group by the cust_id, and calculate the sum of the price for each cust_id:orders集合执行map-reduce操作以按cust_id分组,并计算每个cust_idprice总和:

  1. Define the map function to process each input document:定义映射函数以处理每个输入文档:

    • In the function, this refers to the document that the map-reduce operation is processing.在函数中,this指的是map-reduce操作正在处理的文档。
    • The function maps the price to the cust_id for each document and emits the cust_id and price.该函数将price映射到每个文档的cust_id,并发出cust_id和价格。
    var mapFunction1 = function() {
       emit(this.cust_id, this.price);
    };
  2. Define the corresponding reduce function with two arguments keyCustId and valuesPrices:使用两个参数keyCustIdvaluesprice定义相应的reduce函数:

    • The valuesPrices is an array whose elements are the price values emitted by the map function and grouped by keyCustId.valuesPrices是一个数组,其元素是map函数发出的价格值,并按keyCustId分组。
    • The function reduces the valuesPrice array to the sum of its elements.该函数将valuesPrice数组减少为其元素之和。
    var reduceFunction1 = function(keyCustId, valuesPrices) {
       return Array.sum(valuesPrices);
    };
  3. Perform map-reduce on all documents in the orders collection using the mapFunction1 map function and the reduceFunction1 reduce function:使用mapFunction1映射函数和reduceFunction1缩减函数对orders集合中的所有文档执行map-reduce:

    db.orders.mapReduce(
       mapFunction1,
       reduceFunction1,
       { out: "map_reduce_example" }
    )

    This operation outputs the results to a collection named map_reduce_example. 此操作将结果输出到名为map_reduce_example的集合。If the map_reduce_example collection already exists, the operation will replace the contents with the results of this map-reduce operation.如果map_reduce_example集合已存在,则该操作将使用此map-reduce操作的结果替换内容。

  4. Query the map_reduce_example collection to verify the results:查询map_reduce_example集合以验证结果:

    db.map_reduce_example.find().sort( { _id: 1 } )

    The operation returns these documents:该操作将返回以下文档:

    { "_id" : "Ant O. Knee", "value" : 95 }
    { "_id" : "Busby Bee", "value" : 125 }
    { "_id" : "Cam Elot", "value" : 60 }
    { "_id" : "Don Quis", "value" : 155 }

Aggregation Alternative聚合替代方案

Using the available aggregation pipeline operators, you can rewrite the map-reduce operation without defining custom functions:使用可用的聚合管道操作符,您可以重写map-reduce操作,而无需定义自定义函数:

db.orders.aggregate([
   { $group: { _id: "$cust_id", value: { $sum: "$price" } } },
   { $out: "agg_alternative_1" }
])
  1. The $group stage groups by the cust_id and calculates the value field using $sum. $group阶段按cust_id分组,并使用$sum计算value字段。The value field contains the total price for each cust_id.value字段包含每个cust_id的总价。

    This stage outputs these documents to the next stage:本阶段将这些文件输出到下一阶段:

    { "_id" : "Don Quis", "value" : 155 }
    { "_id" : "Ant O. Knee", "value" : 95 }
    { "_id" : "Cam Elot", "value" : 60 }
    { "_id" : "Busby Bee", "value" : 125 }
  2. Then, the $out writes the output to the collection agg_alternative_1. 然后,$out将输出写入集合agg_alternative_1Alternatively, you could use $merge instead of $out.或者,您可以使用$merge而不是$out
  3. Query the agg_alternative_1 collection to verify the results:查询agg_alternative_1集合以验证结果:

    db.agg_alternative_1.find().sort( { _id: 1 } )

    The operation returns these documents:该操作将返回以下文档:

    { "_id" : "Ant O. Knee", "value" : 95 }
    { "_id" : "Busby Bee", "value" : 125 }
    { "_id" : "Cam Elot", "value" : 60 }
    { "_id" : "Don Quis", "value" : 155 }

See also参阅

For an alternative that uses custom aggregation expressions, see Map-Reduce to Aggregation Pipeline Translation Examples.有关使用自定义聚合表达式的替代方法,请参阅映射-减少到聚合管道转换示例

Calculate Order and Total Quantity with Average Quantity Per Item使用每件商品的平均数量计算订单和总数量

In the following example, you will see a map-reduce operation on the orders collection for all documents that have an ord_date value greater than or equal to 2020-03-01.在以下示例中,您将看到orders集合上ord_date值大于或等于2020-03-01的所有文档的的map-reduce操作。

The operation in the example:示例中的操作如下:

  1. Groups by the item.sku field, and calculates the number of orders and the total quantity ordered for each sku.item.sku字段分组,并计算每个sku的订单数量和订购总量。
  2. Calculates the average quantity per order for each sku value and merges the results into the output collection.计算每个sku值的每个订单的平均数量,并将结果合并到输出集合中。

When merging results, if an existing document has the same key as the new result, the operation overwrites the existing document. 合并结果时,如果现有文档与新结果具有相同的键,则该操作将覆盖现有文档。If there is no existing document with the same key, the operation inserts the document.如果没有具有相同密钥的现有文档,则操作将插入该文档。

Example steps:示例步骤:

  1. Define the map function to process each input document:定义映射函数以处理每个输入文档:

    • In the function, this refers to the document that the map-reduce operation is processing.在函数中,this引用正在被处理的映射-减少操作的文档。
    • For each item, the function associates the sku with a new object value that contains the count of 1 and the item qty for the order and emits the sku (stored in the key) and the value.对于每个项目,函数将sku与一个新的对象value关联,该对象value包含订单数目count: 1和项目数量qty,并发出sku(存储在key中)和value
     var mapFunction2 = function() {
        for (var idx = 0; idx < this.items.length; idx++) {
           var key = this.items[idx].sku;
           var value = { count: 1, qty: this.items[idx].qty };
    
           emit(key, value);
        }
    };
  2. Define the corresponding reduce function with two arguments keySKU and countObjVals:使用两个参数keySKUcountObjVals定义相应的reduce函数:

    • countObjVals is an array whose elements are the objects mapped to the grouped keySKU values passed by map function to the reducer function.是一个数组,其元素是映射到由map函数传递给reducer函数的分组keySKU值的对象。
    • The function reduces the countObjVals array to a single object reducedValue that contains the count and the qty fields.该函数将countObjVals数组缩减为单个对象reducedValue,其中包含countqty字段。
    • In reducedVal, the count field contains the sum of the count fields from the individual array elements, and the qty field contains the sum of the qty fields from the individual array elements.reducedVal中,count字段包含单个数组元素的count字段之和,qty字段包含单个数组元素的qty字段之和。
    var reduceFunction2 = function(keySKU, countObjVals) {
       reducedVal = { count: 0, qty: 0 };
    
       for (var idx = 0; idx < countObjVals.length; idx++) {
           reducedVal.count += countObjVals[idx].count;
           reducedVal.qty += countObjVals[idx].qty;
       }
    
       return reducedVal;
    };
  3. Define a finalize function with two arguments key and reducedVal. 使用两个参数keyreducedVal定义finalize函数。The function modifies the reducedVal object to add a computed field named avg and returns the modified object:函数修改reducedVal对象以添加名为avg的计算字段,并返回修改后的对象:

    var finalizeFunction2 = function (key, reducedVal) {
      reducedVal.avg = reducedVal.qty/reducedVal.count;
      return reducedVal;
    };
  4. Perform the map-reduce operation on the orders collection using the mapFunction2, reduceFunction2, and finalizeFunction2 functions:使用mapFunction2reduceFunction2和finalizeFunction2函数对orders集合执行map reduce操作:

    db.orders.mapReduce(
       mapFunction2,
       reduceFunction2,
       {
         out: { merge: "map_reduce_example2" },
         query: { ord_date: { $gte: new Date("2020-03-01") } },
         finalize: finalizeFunction2
       }
     );

    This operation uses the query field to select only those documents with ord_date greater than or equal to new Date("2020-03-01"). 该操作使用query字段只选择ord_date大于或等于new Date("2020-03-01")的文档。Then it outputs the results to a collection map_reduce_example2.然后,它将结果输出到集合map_reduce_example2

    If the map_reduce_example2 collection already exists, the operation will merge the existing contents with the results of this map-reduce operation. That is, if an existing document has the same key as the new result, the operation overwrites the existing document. If there is no existing document with the same key, the operation inserts the document.

  5. Query the map_reduce_example2 collection to verify the results:

    db.map_reduce_example2.find().sort( { _id: 1 } )

    The operation returns these documents:

    { "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
    { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
    { "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
    { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
    { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }

Aggregation Alternative

Using the available aggregation pipeline operators, you can rewrite the map-reduce operation without defining custom functions:

db.orders.aggregate( [
   { $match: { ord_date: { $gte: new Date("2020-03-01") } } },
   { $unwind: "$items" },
   { $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } }  },
   { $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },
   { $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace",  whenNotMatched: "insert" } }
] )
  1. The $match stage selects only those documents with ord_date greater than or equal to new Date("2020-03-01").
  2. The $unwinds stage breaks down the document by the items array field to output a document for each array element. For example:例如:

    { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" }
    { "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" }
    { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" }
    { "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
    { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
    { "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
    ...
  3. The $group stage groups by the items.sku, calculating for each sku:

    • The qty field. The qty field contains the total qty ordered per each items.sku using $sum.
    • The orders_ids array. The orders_ids field contains an array of distinct order _id’s for the items.sku using $addToSet.
    { "_id" : "chocolates", "qty" : 15, "orders_ids" : [ 2, 5, 8 ] }
    { "_id" : "oranges", "qty" : 63, "orders_ids" : [ 4, 7, 3, 2, 9, 1, 10 ] }
    { "_id" : "carrots", "qty" : 15, "orders_ids" : [ 6, 9 ] }
    { "_id" : "apples", "qty" : 35, "orders_ids" : [ 9, 8, 1, 6 ] }
    { "_id" : "pears", "qty" : 10, "orders_ids" : [ 3 ] }
  4. The $project stage reshapes the output document to mirror the map-reduce’s output to have two fields _id and value. The $project sets:

    • the value.count to the size of the orders_ids array using $size.
    • the value.qty to the qty field of input document.
    • the value.avg to the average number of qty per order using $divide and $size.
    { "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
    { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
    { "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
    { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
    { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
  5. Finally, the $merge writes the output to the collection agg_alternative_3. If an existing document has the same key _id as the new result, the operation overwrites the existing document. If there is no existing document with the same key, the operation inserts the document.
  6. Query the agg_alternative_3 collection to verify the results:

    db.agg_alternative_3.find().sort( { _id: 1 } )

    The operation returns these documents:

    { "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
    { "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
    { "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
    { "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
    { "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }

See also参阅

For an alternative that uses custom aggregation expressions, see Map-Reduce to Aggregation Pipeline Translation Examples.