Starting in version 4.4, MongoDB adds the $accumulator and $function aggregation operators. These operators let users define custom aggregation expressions. Using these operators, map-reduce expressions can be approximately rewritten as shown in the following table.
Note
Various map-reduce expressions can be rewritten using aggregation pipeline operators, such as $group, $merge, etc., without requiring custom functions.
For examples, see Map-Reduce Examples.
The table is only an approximate translation. For instance, the table shows an approximate translation of mapFunction using the $project stage.
However, the mapFunction logic may require additional stages, for example when the logic iterates over an array. In that case, the aggregation pipeline includes an $unwind stage and a $project stage.
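As a rough illustration of the array case, the sketch below pairs a map function that iterates over an array with the stages that approximate it. The `items` array and its `sku` field are assumed names used only for this example. `$literal` is used so that `$project` outputs the constant 1 instead of reading `1` as a field-inclusion flag.

```javascript
// Hypothetical map function: emits one (sku, 1) pair per element of `items`.
const mapFunction = function () {
  this.items.forEach(function (item) {
    emit(item.sku, 1);
  });
};

// Approximate pipeline equivalent of the map function above.
const mapStages = [
  // Output one document per element of the `items` array.
  { $unwind: "$items" },
  // Shape the emitted key/value pair; $literal keeps 1 as a constant value.
  { $project: { emits: { k: "$items.sku", v: { $literal: 1 } } } }
];

// In mongosh, these stages would sit at the start of the pipeline passed to
// db.collection.aggregate(), ahead of the $group stage.
```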
The emits field in $project may be named something else. The field name emits was chosen for visual comparison.
Map-Reduce | Aggregation Pipeline |
---|---|
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: <collection> } ) |
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }} } },
{ $out: <collection> } ] ) |
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { replace: <collection>, db:<db> } } ) |
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }} } },
{ $out: { db: <db>, coll: <collection> } } ] ) |
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { merge: <collection>, db: <db> } } ) |
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }} } },
{ $merge: {
into: { db: <db>, coll: <collection> },
on: "_id",
whenMatched: "replace",
whenNotMatched: "insert" } } ] ) |
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { reduce: <collection>, db: <db> } } ) |
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }} } },
{ $merge: {
into: { db: <db>, coll: <collection> },
on: "_id",
whenMatched: [
{ $project: {
value: { $function: {
body: <reduceFunction>,
args: [
"$_id",
[ "$value", "$$new.value" ] ],
lang: "js" } } } } ],
whenNotMatched: "insert" } } ] ) |
db.collection.mapReduce(
<mapFunction>,
<reduceFunction>,
{
query: <queryFilter>,
sort: <sortOrder>,
limit: <number>,
finalize: <finalizeFunction>,
out: { inline: 1 } } ) |
db.collection.aggregate( [
{ $match: <queryFilter> },
{ $sort: <sortOrder> },
{ $limit: <number> },
{ $project: { emits: { k: <expression>, v: <expression> } } },
{ $unwind: "$emits" },
{ $group: {
_id: "$emits.k",
value: { $accumulator: {
init: <initCode>,
accumulate: <reduceFunction>,
accumulateArgs: [ "$emits.v" ],
merge: <reduceFunction>,
finalize: <finalizeFunction>,
lang: "js" }} } } ] ) |
Various map-reduce expressions can be rewritten using aggregation pipeline operators, such as $group, $merge, etc., without requiring custom functions. However, for illustrative purposes, the following examples provide both alternatives.
The following map-reduce operation on the orders collection groups by cust_id and calculates the sum of the price for each cust_id:
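A minimal sketch of such an operation is shown below. The function names and the output collection name `map_reduce_example` are assumptions made for illustration. The `db` guard only lets the snippet load outside mongosh and is not needed in the shell itself.

```javascript
// Map: emit one (cust_id, price) pair per order document.
const mapFunction1 = function () {
  emit(this.cust_id, this.price);
};

// Reduce: sum all prices emitted for one cust_id.
const reduceFunction1 = function (keyCustId, valuesPrices) {
  let total = 0;
  for (let idx = 0; idx < valuesPrices.length; idx++) {
    total += valuesPrices[idx];
  }
  return total;
};

// Run the operation only where a mongosh-style `db` handle exists.
if (typeof db !== "undefined") {
  db.orders.mapReduce(mapFunction1, reduceFunction1, { out: "map_reduce_example" });
}
```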
Alternative 1: (Recommended) You can rewrite the operation into an aggregation pipeline without translating the map-reduce function to equivalent pipeline stages:
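One way this rewrite could look is sketched here, using only the built-in `$group` and `$sum` operators. The output collection name `agg_alternative_1` is an assumed name.

```javascript
const pipeline1 = [
  // Group by cust_id and sum the price field with the built-in $sum operator.
  { $group: { _id: "$cust_id", value: { $sum: "$price" } } },
  // Write the grouped results to a collection (name is illustrative).
  { $out: "agg_alternative_1" }
];

// Run the pipeline only where a mongosh-style `db` handle exists.
if (typeof db !== "undefined") {
  db.orders.aggregate(pipeline1);
}
```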
Alternative 2: (For illustrative purposes only) The following aggregation pipeline provides a translation of the various map-reduce functions, using $accumulator to define custom functions:
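The translation might be sketched as follows. The callback names (`initTotal`, `addPrice`, `mergeTotals`) and the output field name `value` are assumptions. The `emit`, `key`, and `value` field names and the `agg_alternative_2` collection follow the description of the stages.

```javascript
// Accumulator callbacks, named so each can be read on its own.
const initTotal = function () { return 0; };
const addPrice = function (state, value) { return state + value; };
const mergeTotals = function (state1, state2) { return state1 + state2; };

const pipeline2 = [
  // Equivalent of the map function: emit (cust_id, price).
  { $project: { emit: { key: "$cust_id", value: "$price" } } },
  // Equivalent of the reduce function: sum the emitted values per key.
  { $group: {
      _id: "$emit.key",
      value: { $accumulator: {
        init: initTotal,
        initArgs: [],
        accumulate: addPrice,
        accumulateArgs: ["$emit.value"],
        merge: mergeTotals,
        lang: "js"
      } }
  } },
  { $out: "agg_alternative_2" }
];

// Run the pipeline only where a mongosh-style `db` handle exists.
if (typeof db !== "undefined") {
  db.orders.aggregate(pipeline2);
}
```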
The $project stage outputs documents with an emit field. The emit field is a document with the fields:
- key, which contains the cust_id value for the document
- value, which contains the price value for the document
The $group stage uses the $accumulator operator to add the emitted values.
The $out stage writes the output to the collection agg_alternative_2. Alternatively, you could use $merge instead of $out.
The following map-reduce operation on the orders collection groups by the items.sku field and calculates the number of orders and the total quantity ordered for each sku. The operation then calculates the average quantity per order for each sku value and merges the results into the output collection.
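A sketch of what this operation could look like follows. The function names, the output collection name `map_reduce_example2`, and the `ord_date` query filter are assumptions. The filter mirrors the date condition that the pipeline translation of this example applies.

```javascript
// Map: emit one (sku, { count, qty }) pair per element of the items array.
const mapFunction2 = function () {
  for (let idx = 0; idx < this.items.length; idx++) {
    const key = this.items[idx].sku;
    const value = { count: 1, qty: this.items[idx].qty };
    emit(key, value);
  }
};

// Reduce: add up the counts and quantities emitted for one sku.
const reduceFunction2 = function (keySKU, countObjVals) {
  const reducedVal = { count: 0, qty: 0 };
  for (let idx = 0; idx < countObjVals.length; idx++) {
    reducedVal.count += countObjVals[idx].count;
    reducedVal.qty += countObjVals[idx].qty;
  }
  return reducedVal;
};

// Finalize: derive the average quantity per order from the reduced totals.
const finalizeFunction2 = function (key, reducedVal) {
  reducedVal.avg = reducedVal.qty / reducedVal.count;
  return reducedVal;
};

// Run the operation only where a mongosh-style `db` handle exists.
if (typeof db !== "undefined") {
  db.orders.mapReduce(mapFunction2, reduceFunction2, {
    query: { ord_date: { $gte: new Date("2020-03-01") } },
    finalize: finalizeFunction2,
    out: { merge: "map_reduce_example2" }
  });
}
```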
Alternative 1: (Recommended) You can rewrite the operation into an aggregation pipeline without translating the map-reduce function to equivalent pipeline stages:
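The following sketch shows one possible rewrite with built-in operators only. It assumes that the number of orders per sku can be counted by collecting distinct order `_id` values. The intermediate field name `orders_ids` and the output collection name `agg_alternative_3` are illustrative.

```javascript
const pipeline3 = [
  // Keep only orders placed on or after the cutoff date.
  { $match: { ord_date: { $gte: new Date("2020-03-01") } } },
  // Output one document per element of the items array.
  { $unwind: "$items" },
  // Per sku: total quantity and the set of distinct order ids.
  { $group: {
      _id: "$items.sku",
      qty: { $sum: "$items.qty" },
      orders_ids: { $addToSet: "$_id" }
  } },
  // Shape the result like the map-reduce output: count, qty, and avg.
  { $project: {
      value: {
        count: { $size: "$orders_ids" },
        qty: "$qty",
        avg: { $divide: ["$qty", { $size: "$orders_ids" }] }
      }
  } },
  // Merge into the output collection, replacing documents with the same _id.
  { $merge: {
      into: "agg_alternative_3",
      on: "_id",
      whenMatched: "replace",
      whenNotMatched: "insert"
  } }
];

// Run the pipeline only where a mongosh-style `db` handle exists.
if (typeof db !== "undefined") {
  db.orders.aggregate(pipeline3);
}
```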
Alternative 2: (For illustrative purposes only) The following aggregation pipeline provides a translation of the various map-reduce functions, using $accumulator to define custom functions:
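A sketch of this translation is given below. The callback names and the output field name `value` are assumptions. The stage order, the `emit` field layout, and the `agg_alternative_4` collection follow the description of the stages.

```javascript
// Accumulator callbacks, named so each can be read on its own.
const initState = function () { return { count: 0, qty: 0 }; };
const addItem = function (state, value) {
  state.count += value.count;
  state.qty += value.qty;
  return state;
};
const mergeStates = function (state1, state2) {
  return { count: state1.count + state2.count, qty: state1.qty + state2.qty };
};
const addAverage = function (state) {
  state.avg = state.qty / state.count;
  return state;
};

const pipeline4 = [
  { $match: { ord_date: { $gte: new Date("2020-03-01") } } },
  { $unwind: "$items" },
  // Equivalent of the map function: emit (sku, { count, qty }).
  { $project: { emit: {
      key: "$items.sku",
      value: { count: { $literal: 1 }, qty: "$items.qty" }
  } } },
  // Equivalent of the reduce and finalize functions.
  { $group: {
      _id: "$emit.key",
      value: { $accumulator: {
        init: initState,
        initArgs: [],
        accumulate: addItem,
        accumulateArgs: ["$emit.value"],
        merge: mergeStates,
        finalize: addAverage,
        lang: "js"
      } }
  } },
  { $merge: {
      into: "agg_alternative_4",
      on: "_id",
      whenMatched: "replace",
      whenNotMatched: "insert"
  } }
];

// Run the pipeline only where a mongosh-style `db` handle exists.
if (typeof db !== "undefined") {
  db.orders.aggregate(pipeline4);
}
```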
The $match stage selects only those documents with ord_date greater than or equal to new Date("2020-03-01").
The $unwind stage breaks down the document by the items array field to output a document for each array element.
The $project stage outputs documents with an emit field. The emit field is a document with the fields:
- key, which contains the items.sku value
- value, which contains a document with the qty value and a count value
The $group stage uses the $accumulator operator to add the emitted count and qty and calculate the avg field.
The $merge stage writes the output to the collection agg_alternative_4. If an existing document has the same key _id as the new result, the operation overwrites the existing document. If there is no existing document with the same key, the operation inserts the document.
See also