db.collection.watch()

On this page本页内容

Definition定义

db.collection.watch(pipeline, options)

mongo Shell Method

This page documents the mongo shell method, and does not refer to the MongoDB Node.js driver (or any other driver) method. 本页记录了mongo shell方法,未提及MongoDB Node.js驱动程序(或任何其他驱动程序)方法。For corresponding MongoDB driver API, refer to your specific MongoDB driver documentation instead.有关相应的MongoDB驱动程序API,请参阅特定的MongoDB驱动程序文档。

For replica sets and sharded clusters only仅适用于副本集和分片群集

Opens a change stream cursor on the collection.在集合上打开更改流游标

Parameter参数Type类型Description描述
pipeline array

Aggregation pipeline聚合管道 consisting of one or more of the following aggregation stages:由以下一个或多个聚合阶段组成:

Specify a pipeline to filter/modify the change events output.指定用于筛选/修改更改事件输出的管道。

Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event’s _id field.从MongoDB 4.2开始,如果变更流聚合管道修改事件的_id字段,则变更流将引发异常。

options document

Optional.可选。Additional options that modify the behavior of watch().修改watch()行为的其他选项。

You must pass an empty array [] to the pipeline parameter if you are not specifying a pipeline but are passing the options document.如果未指定管道,但要传递options文档,则必须向pipeline参数传递空数组[]

The options document can contain the following fields and values:options文档可以包含以下字段和值:

Field字段Type类型Description描述
resumeAfter document

Optional.可选。Directs watch to attempt resuming notifications starting after the operation specified in the resume token.指示watch尝试在恢复令牌中指定的操作之后开始恢复通知。

Each change stream event document includes a resume token as the _id field. 每个变更流事件文档都包含一个恢复令牌作为_id字段。Pass the entire _id field of the change event document that represents the operation you want to resume after.传递更改事件文档的整个_id字段,该字段表示要在之后继续的操作。

resumeAfter is mutually exclusive with startAfter and startAtOperationTime.resumeAfterstartAfterstartAtOperationTime是互斥的。

startAfter document

Optional.可选。Directs watch to attempt starting a new change stream after the operation specified in the resume token. 指示watch在resume令牌中指定的操作之后尝试启动新的更改流。Allows notifications to resume after an invalidate event.允许在发生失效事件后恢复通知。

Each change stream event document includes a resume token as the _id field. 每个变更流事件文档都包含一个恢复令牌作为_id字段。Pass the entire _id field of the change event document that represents the operation you want to resume after.传递更改事件文档的整个_id字段,该字段表示要在之后继续的操作。

startAfter is mutually exclusive with resumeAfter and startAtOperationTime.startAfterresumeAfterstartAtOperationTime是互斥的。

New in version 4.2.版本4.2中的新功能。

fullDocument string

Optional.可选。By default, watch() returns the delta of those fields modified by an update operation, instead of the entire updated document.默认情况下,watch()返回更新操作修改的字段的增量,而不是整个更新的文档。

Set fullDocument to "updateLookup" to direct watch() to look up the most current majority-committed version of the updated document. fullDocument设置为"updateLookup"以指向watch()查找更新文档的最新多数提交版本。watch() returns a fullDocument field with the document lookup in addition to the updateDescription delta.watch()返回一个fullDocument字段,除了updateDescriptiondelta之外,还包含文档查找。

batchSize int

Optional.可选。Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster.指定在MongoDB集群的每批响应中返回的最大更改事件数。

Has the same functionality as cursor.batchSize().具有与cursor.batchSize()相同的功能。

maxAwaitTimeMS int

Optional.可选。The maximum amount of time in milliseconds the server waits for new data changes to report to the change stream cursor before returning an empty batch.服务器在返回空批之前等待新数据更改报告到更改流游标的最长时间(毫秒)。

Defaults to 1000 milliseconds.默认值为1000毫秒。

collation document

Optional.可选。Pass a collation document to specify a collation for the change stream cursor.传递排序规则文档以指定更改流游标的排序规则

Starting in MongoDB 4.2, defaults to simple binary comparison if omitted. 从MongoDB 4.2开始,如果省略,默认为simple的二进制比较。In earlier versions, change streams opened on a single collection would inherit the collection’s default collation.在早期版本中,在单个集合上打开的更改流将继承该集合的默认排序规则。

startAtOperationTime Timestamp

Optional.可选。The starting point for the change stream. 变更流的起点。If the specified starting point is in the past, it must be in the time range of the oplog. 如果指定的起点在过去,则它必须在oplog的时间范围内。To check the time range of the oplog, see rs.printReplicationInfo().要检查oplog的时间范围,请参阅rs.printReplicationInfo()

startAtOperationTime is mutually exclusive with resumeAfter and startAfter.startAtOperationTimeresumeAfterstartAfter是互斥的。

New in version 4.0.版本4.0中的新功能。

Returns:返回:A cursor that remains open as long as a connection to the MongoDB deployment remains open and the collection exists. 只要与MongoDB部署的连接保持打开且集合存在,就会保持打开状态的游标See Change Events for examples of change event documents.有关更改事件文档的示例,请参阅更改事件

See also参阅

db.watch() and Mongo.watch()

Availability可利用性

Deployment部署

db.collection.watch() is available for replica set and sharded cluster deployments :可用于副本集和分片群集部署:

Storage Engine存储引擎

You can only use db.collection.watch() with the Wired Tiger storage engine.只能将db.collection.watch()Wired Tiger存储引擎一起使用。

Read Concern majority Support阅读关注majority支持

Starting in MongoDB 4.2, change streams are available regardless of the "majority" read concern support; that is, read concern majority support can be either enabled (default) or disabled to use change streams.从MongoDB 4.2开始,无论支持"majority"读关注点,都可以使用更改流;也就是说,可以启用(默认)或禁用读关注majority支持来使用更改流。

In MongoDB 4.0 and earlier, change streams are available only if "majority" read concern support is enabled (default).在MongoDB 4.0及更早版本中,只有在启用了"majority"读关注点支持(默认)的情况下,更改流才可用。

Behavior行为

Resumability可恢复性

Unlike the MongoDB drivers, the mongo shell does not automatically attempt to resume a change stream cursor after an error. 与MongoDB驱动程序不同,mongo shell不会在出现错误后自动尝试恢复更改流游标。The MongoDB drivers make one attempt to automatically resume a change stream cursor after certain errors.MongoDB驱动程序尝试在出现某些错误后自动恢复更改流游标。

db.collection.watch() uses information stored in the oplog to produce the change event description and generate a resume token associated to that operation. 使用oplog中存储的信息生成更改事件描述,并生成与该操作关联的恢复令牌。If the operation identified by the resume token passed to the resumeAfter or startAfter option has already dropped off the oplog, db.collection.watch() cannot resume the change stream.如果传递给resumeAfterstartAfter选项的resume令牌所标识的操作已从oplog中删除,则db.collection.watch()无法恢复更改流。

See Resume a Change Stream for more information on resuming a change stream.有关恢复更改流的更多信息,请参阅恢复更改流

Note

  • You cannot use resumeAfter to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. 失效事件(例如,集合删除或重命名)关闭更改流后,不能使用resumeAfter恢复更改流。Starting in MongoDB 4.2, you can use startAfter to start a new change stream after an invalidate event.从MongoDB 4.2开始,可以使用startAfter失效事件后启动新的更改流。
  • If the deployment is a sharded cluster, a shard removal may cause an open change stream cursor to close, and the closed change stream cursor may not be fully resumable.如果部署是分片集群,则分片移除可能会导致打开的更改流光标关闭,而关闭的更改流光标可能无法完全恢复。

Resume Token恢复令牌

The resume token _data type depends on the MongoDB versions and, in some cases, the feature compatibility version (fcv) at the time of the change stream’s opening/resumption (i.e. a change in fcv value does not affect the resume tokens for already opened change streams):恢复令牌_data类型取决于MongoDB版本,在某些情况下,还取决于更改流打开/恢复时的功能兼容性版本(fcv)(即fcv值的更改不会影响已打开更改流的恢复令牌):

MongoDB VersionMongoDB版本Feature Compatibility Version功能兼容性版本Resume Token _data Type恢复令牌_data类型
MongoDB 4.2 and laterMongoDB 4.2及更高版本 “4.2” or “4.0” Hex-encoded string (v1)
MongoDB 4.0.7 and laterMongoDB 4.0.7及更高版本 “4.0” or “3.6” Hex-encoded string (v1)
MongoDB 4.0.6 and earlierMongoDB 4.0.6及更早版本 “4.0” Hex-encoded string (v0)
MongoDB 4.0.6 and earlierMongoDB 4.0.6及更早版本 “3.6” BinData
MongoDB 3.6 “3.6” BinData

With hex-encoded string resume tokens, you can compare and sort the resume tokens.使用十六进制编码的字符串恢复令牌,可以对恢复令牌进行比较和排序。

Regardless of the fcv value, a 4.0 deployment can use either BinData resume tokens or hex string resume tokens to resume a change stream. 无论fcv值是多少,4.0部署都可以使用BinData恢复令牌或十六进制字符串恢复令牌来恢复更改流。As such, a 4.0 deployment can use a resume token from a change stream opened on a collection from a 3.6 deployment.因此,4.0部署可以使用在3.6部署的集合上打开的更改流中的恢复令牌。

New resume token formats introduced in a MongoDB version cannot be consumed by earlier MongoDB versions.早期MongoDB版本无法使用MongoDB版本中引入的新恢复令牌格式。

Full Document Lookup of Update Operations更新操作的完整文档查找

By default, the change stream cursor returns specific field changes/deltas for update operations. 默认情况下,更改流游标返回更新操作的特定字段更改/增量。You can also configure the change stream to look up and return the current majority-committed version of the changed document. 您还可以配置变更流,以查找并返回变更文档的当前多数提交版本。Depending on other write operations that may have occurred between the update and the lookup, the returned document may differ significantly from the document at the time of the update.根据更新和查找之间可能发生的其他写入操作,返回的文档可能与更新时的文档存在显著差异。

Depending on the number of changes applied during the update operation and the size of the full document, there is a risk that the size of the change event document for an update operation is greater than the 16MB BSON document limit. 根据更新操作期间应用的更改数量和完整文档的大小,存在更新操作的更改事件文档大小大于16MB BSON文档限制的风险。If this occurs, the server closes the change stream cursor and returns an error.如果发生这种情况,服务器将关闭更改流光标并返回错误。

Access Control访问控制

When running with access control, the user must have the find and changeStream privilege actions on the collection resource. 使用访问控制运行时,用户必须对集合资源具有findchangeStream权限操作。That is, a user must have a role that grants the following privilege:也就是说,用户必须具有授予以下权限角色

{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }

The built-in read role provides the appropriate privileges.内置read角色提供适当的权限。

Examples示例

Open a Change Stream开启变革之流

The following operation opens a change stream cursor against the data.sensors collection:以下操作将针对data.sensors集合打开更改流光标:

watchCursor = db.getSiblingDB("data").sensors.watch()

Iterate the cursor to check for new events. 迭代光标以检查新事件。Use the cursor.isExhausted() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:使用cursor.isExhausted()方法确保循环仅在更改流光标关闭且最新批处理中没有剩余对象时退出:

while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      printjson(watchCursor.next());
   }
}

For complete documentation on change stream output, see Change Events.有关更改流输出的完整文档,请参阅更改事件

Change Stream with Full Document Update Lookup具有完整文档更新查找的更改流

Set the fullDocument option to "updateLookup" to direct the change stream cursor to lookup the most current majority-committed version of the document associated to an update change stream event.fullDocument选项设置为"updateLookup",以指示更改流光标查找与更新更改流事件关联的文档的最新多数提交版本。

The following operation opens a change stream cursor against the data.sensors collection using the fullDocument : "updateLookup" option.下面的操作使用fullDocument : "updateLookup"选项打开data.sensors集合的变更流光标。

watchCursor = db.getSiblingDB("data").sensors.watch(
   [],
   { fullDocument : "updateLookup" }
)

Iterate the cursor to check for new events. 迭代光标以检查新事件。Use the cursor.isExhausted() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:使用cursor.isExhausted()方法确保循环仅在更改流光标关闭且最新批处理中没有剩余对象时退出:

while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      printjson(watchCursor.next());
   }
}

For any update operation, the change event returns the result of the document lookup in the fullDocument field.对于任何更新操作,更改事件都会在fullDocument字段中返回文档查找的结果。

For an example of the full document update output, see change stream update event.有关完整文档更新输出的示例,请参阅更改流更新事件

For complete documentation on change stream output, see Change Events.有关更改流输出的完整文档,请参阅更改事件

Change Stream with Aggregation Pipeline Filter使用聚合管道筛选器更改流

Note

Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event’s _id field.从MongoDB 4.2开始,如果变更流聚合管道修改事件的_id字段,则变更流将引发异常。

The following operation opens a change stream cursor against the data.sensors collection using an aggregation pipeline to filter only insert events:以下操作使用聚合管道针对data.sensors集合打开更改流光标,以仅筛选insert事件:

watchCursor = db.getSiblingDB("data").sensors.watch(
   [
      { $match : {"operationType" : "insert" } }
   ]
)

Iterate the cursor to check for new events. 迭代光标以检查新事件。Use the cursor.isExhausted() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:使用cursor.isExhausted()方法确保循环仅在更改流光标关闭且最新批处理中没有剩余对象时退出:

while (!watchCursor.isExhausted()){
   if (watchCursor.hasNext()){
      printjson(watchCursor.next());
   }
}

The change stream cursor only returns change events where the operationType is insert. 更改流游标仅返回operationTypeinsert的更改事件。For complete documentation on change stream output, see Change Events.有关更改流输出的完整文档,请参阅更改事件

Resuming a Change Stream恢复变更流

Every document returned by a change stream cursor includes a resume token as the _id field. 变更流游标返回的每个文档都包含一个resume令牌作为_id字段。To resume a change stream, pass the entire _id document of the change event you want to resume from to either the resumeAfter or startAfter option of watch().要恢复更改流,请将要从中恢复的更改事件的整个_id文档传递到watch()resumeAfterstartAfter选项。

The following operation resumes a change stream cursor against the data.sensors collection using a resume token. 下面的操作使用恢复令牌针对data.sensors集合恢复更改流光标。This assumes that the operation that generated the resume token has not rolled off the cluster’s oplog.这假设生成恢复令牌的操作没有从集群的oplog中滚出来。

let watchCursor = db.getSiblingDB("data").sensors.watch();
let firstChange;

while (!watchCursor.isExhausted()) {
   if (watchCursor.hasNext()) {
     firstChange = watchCursor.next();
     break;
   }
}

watchCursor.close();

let resumeToken = firstChange._id;

resumedWatchCursor = db.getSiblingDB("data").sensors.watch(
[],
   { resumeAfter : resumeToken }
)

Iterate the cursor to check for new events. 迭代光标以检查新事件。Use the cursor.isExhausted() method to ensure the loop only exits if the change stream cursor is closed and there are no objects remaining in the latest batch:使用cursor.isExhausted()方法确保循环仅在更改流光标关闭且最新批处理中没有剩余对象时退出:

while (!resumedWatchCursor.isExhausted()){
   if (resumedWatchCursor.hasNext()){
      printjson(watchCursor.next());
   }
}

See Resume a Change Stream for complete documentation on resuming a change stream.有关恢复更改流的完整文档,请参阅恢复更改流。