On this page本页内容
New in version 3.6.版本3.6中的新功能。
Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. 更改流允许应用程序访问实时数据更改,而不必承担跟踪oplog的复杂性和风险。Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.应用程序可以使用变更流订阅单个集合、数据库或整个部署上的所有数据变更,并立即对其做出反应。因为变更流使用聚合框架,所以应用程序还可以筛选特定的变更或随意转换通知。
Change streams are available for replica sets and sharded clusters:更改流可用于副本集和分片群集:
The replica sets and sharded clusters must use the WiredTiger storage engine. 副本集和分片集群必须使用WiredTiger存储引擎。Change streams can also be used on deployments that employ MongoDB’s encryption-at-rest feature.变更流还可以用于使用MongoDB的静态加密功能的部署。
The replica sets and sharded clusters must use replica set protocol version 1 (副本集和分片集群必须使用副本集协议版本1(pv1
).pv1
)。
Starting in MongoDB 4.2, change streams are available regardless of the 从MongoDB 4.2开始,无论支持"majority"
read concern support; that is, read concern majority
support can be either enabled (default) or disabled to use change streams."majority"
读关注点,都可以使用更改流;也就是说,可以启用(默认)或禁用读关注majority
支持来使用更改流。
In MongoDB 4.0 and earlier, change streams are available only if 在MongoDB 4.0及更早版本中,只有在启用了"majority"
read concern support is enabled (default)."majority"
读关注点支持(默认)的情况下,更改流才可用。
You can open change streams against:您可以针对以下内容打开变更流:
| |
| |
|
Change Stream Examples更改流示例
The examples on this page use the MongoDB drivers to illustrate how to open a change stream cursor for a collection and work with the change stream cursor.本页上的示例使用MongoDB驱动程序来说明如何为集合打开变更流游标并使用变更流游标。
To open a change stream:要打开变更流,请执行以下操作:
mongos
.mongos
发出打开变更流操作。The following example opens a change stream for a collection and iterates over the cursor to retrieve the change stream documents. 以下示例打开集合的更改流,并在游标上迭代以检索更改流文档。[1]
The Python examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的Python示例假设您已连接到MongoDB副本集,并访问了包含inventory
collection.inventory
集合的数据库。
The Java examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的Java示例假设您已连接到MongoDB副本集,并访问了包含inventory
collection.inventory
集合的数据库。
The Node.js examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的Nodejs示例假设您已连接到MongoDB副本集,并访问了包含inventory
collection.inventory
集合的数据库。
The following example uses stream to process the change events.下面的示例使用流来处理更改事件。
Alternatively, you can also use iterator to process the change events:或者,也可以使用迭代器处理更改事件:
The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
The C examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
The C# examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
The examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
The Go examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an 下面的Go示例假设您已连接到MongoDB副本集,并访问了包含inventory
collection.inventory
集合的数据库。
The Swift (Sync) examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
The Swift (Async) examples below assume that you have connected to a MongoDB replica set and have accessed a database that contains an inventory
collection.
To retrieve the data change event from the cursor, iterate the change stream cursor. 要从游标中检索数据更改事件,请迭代更改流游标。For information on the change stream event, see Change Events.有关更改流事件的信息,请参阅更改事件。
While the connection to the MongoDB deployment remains open, the cursor remains open until one of the following occurs:当与MongoDB部署的连接保持打开时,游标保持打开状态,直到出现以下情况之一:
Note
The lifecycle of an unclosed cursor is language-dependent.未关闭游标的生命周期取决于语言。
[1] | startAtOperationTime to open the cursor at a particular point in time. startAtOperationTime 以在特定时间点打开游标。 |
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置变更流时,可以通过提供以下一个或多个管道阶段的数组来控制变更流输出:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置变更流时,可以通过提供以下一个或多个管道阶段的数组来控制变更流输出:
$addFields
$match
$project
$replaceRoot
$replaceWith
(Available starting in MongoDB 4.2)$redact
$set
(Available starting in MongoDB 4.2)$unset
(Available starting in MongoDB 4.2)The pipeline
list includes a single $match
stage that filters any operations where the username
is alice
, or operations where the operationType
is delete
.
Passing the pipeline
to the watch()
method directs the change stream to return notifications after passing them through the specified pipeline
.
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
$addFields
$match
$project
$replaceRoot
$replaceWith
$redact
$set
$unset
The following example uses stream to process the change events.
Alternatively, you can also use iterator to process the change events:或者,也可以使用迭代器处理更改事件:
You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:在配置变更流时,可以通过提供以下一个或多个管道阶段的数组来控制变更流输出:
$addFields
$match
$project
$replaceRoot
$replaceWith
(Available starting in MongoDB 4.2)$redact
$set
(Available starting in MongoDB 4.2)$unset
(Available starting in MongoDB 4.2)You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
$addFields
$match
$project
$replaceRoot
$replaceWith
(Available starting in MongoDB 4.2)$redact
$set
(Available starting in MongoDB 4.2)$unset
(Available starting in MongoDB 4.2)You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
$addFields
$match
$project
$replaceRoot
$replaceWith
(Available starting in MongoDB 4.2)$redact
$set
(Available starting in MongoDB 4.2)$unset
(Available starting in MongoDB 4.2)You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
$addFields
$match
$project
$replaceRoot
$replaceWith
(Available starting in MongoDB 4.2)$redact
$set
(Available starting in MongoDB 4.2)$unset
(Available starting in MongoDB 4.2)You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
$addFields
$match
$project
$replaceRoot
$replaceWith
(Available starting in MongoDB 4.2)$redact
$set
(Available starting in MongoDB 4.2)$unset
(Available starting in MongoDB 4.2)You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
$addFields
$match
$project
$replaceRoot
$replaceWith
(Available starting in MongoDB 4.2)$redact
$set
(Available starting in MongoDB 4.2)$unset
(Available starting in MongoDB 4.2)You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
$addFields
$match
$project
$replaceRoot
$replaceWith
(Available starting in MongoDB 4.2)$redact
$set
(Available starting in MongoDB 4.2)$unset
(Available starting in MongoDB 4.2)You can control change stream output by providing an array of one or more of the following pipeline stages when configuring the change stream:
$addFields
$match
$project
$replaceRoot
$replaceWith
(Available starting in MongoDB 4.2)$redact
$set
(Available starting in MongoDB 4.2)$unset
(Available starting in MongoDB 4.2)Tip
The _id field of the change stream event document act as the resume token. 变更流事件文档的_id
字段用作恢复令牌。Do not use the pipeline to modify or remove the change stream event’s 不要使用管道修改或删除更改流事件的_id
field._id
字段。
Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event’s _id field.从MongoDB 4.2开始,如果变更流聚合管道修改事件的_id
字段,则变更流将引发异常。
See Change Events for more information on the change stream response document format.有关变更流响应文档格式的更多信息,请参阅变更事件。
By default, change streams only return the delta of fields during the update operation. 默认情况下,变更流仅在更新操作期间返回字段的增量。However, you can configure the change stream to return the most current majority-committed version of the updated document.但是,您可以将更改流配置为返回更新文档的最新多数提交版本。
To return the most current majority-committed version of the updated document, pass full_document='updateLookup'
to the db.collection.watch()
method.
In the example below, all update operations notifications include a full_document
field that represents the current version of the document affected by the update operation.
To return the most current majority-committed version of the updated document, pass 要返回更新文档的最新多数提交版本,请向FullDocument.UPDATE_LOOKUP
to the db.collection.watch.fullDocument()
method.db.collection.watch.fullDocument()
方法传递FullDocument.UPDATE_LOOKUP
。
In the example below, all update operations notifications include a 在下面的示例中,所有更新操作通知都包含一个FullDocument
field that represents the current version of the document affected by the update operation.FullDocument
字段,该字段表示受更新操作影响的文档的当前版本。
To return the most current majority-committed version of the updated document, pass { fullDocument: 'updateLookup' }
to the collection.watch()
method.
In the example below, all update operations notifications include a fullDocument
field that represents the current
version of the document affected by the update operation.
The following example uses stream to process the change events.
Alternatively, you can also use iterator to process the change events:
To return the most current majority-committed version of the updated document, pass "fullDocument' => \MongoDB\Operation\ChangeStreamCommand::FULL_DOCUMENT_UPDATE_LOOKUP"
to the watch()
method.
In the example below, all update operations notifications include a fullDocument
field that represents the current
version of the document affected by the update operation.
To return the most current majority-committed version of the updated document, pass full_document='updateLookup'
to the db.collection.watch()
method.
In the example below, all update operations notifications include a `full_document
field that represents the current
version of the document affected by the update operation.
To return the most current majority-committed version of the updated document, pass the "fullDocument"
option with the "updateLookup"
value to the mongoc_collection_watch
method.
In the example below, all update operations notifications include a fullDocument
field that represents the current
version of the document affected by the update operation.
To return the most current majority-committed version of the updated document, pass "FullDocument = ChangeStreamFullDocumentOption.UpdateLookup"
to the collection.Watch()
method.
In the example below, all update operations notifications include a FullDocument
field that represents the current
version of the document affected by the update operation.
To return the most current majority-committed version of the updated document, pass full_document: 'updateLookup'
to the watch()
method.
In the example below, all update operations notifications include a full_document
field that represents the current
version of the document affected by the update operation.
To return the most current majority-committed version of the updated document, SetFullDocument(options.UpdateLookup)
change stream option.
To return the most current majority-committed version of the updated document, pass options:
ChangeStreamOptions(fullDocument: .updateLookup)
to the watch()
method.
To return the most current majority-committed version of the updated document, pass options:
ChangeStreamOptions(fullDocument: .updateLookup)
to the watch()
method.
Note
If there are one or more majority-committed operations that modified the updated document after the update operation but before the lookup, the full document returned may differ significantly from the document at the time of the update operation.如果有一个或多个多数提交的操作在更新操作之后但在查找之前修改了更新的文档,则返回的完整文档可能与更新操作时的文档存在显著差异。
However, the deltas included in the change stream document always correctly describe the watched collection changes that applied to that change stream event.但是,变更流文档中包含的增量始终正确地描述了应用于该变更流事件的监视集合变更。
See Change Events for more information on the change stream response document format.有关变更流响应文档格式的更多信息,请参阅变更事件。
Change streams are resumable by specifying a resume token to either resumeAfter or startAfter when opening the cursor.通过在打开游标时为resumeAfter
或startAfter
指定恢复令牌,可以恢复更改流。
resumeAfter
for Change StreamsresumeAfter
¶You can resume a change stream after a specific event by passing a resume token to 通过在打开游标时向resumeAfter
when opening the cursor. resumeAfter
传递恢复令牌,可以在特定事件之后恢复更改流。For the resume token, use the _id
value of the change stream event document. See Resume Tokens for more information on the resume token.
Important重要
resumeAfter
to resume a change stream after an invalidate event (for example, a collection drop or rename) closes the stream. Starting in MongoDB 4.2, you can use startAfter to start a new change stream after an invalidate event.You can use the 您可以使用resume_after
modifier to resume notifications after the operation specified in the resume token. resume_after
修饰符在恢复口令中指定的操作之后恢复通知。The resume_after
modifier takes a value that must resolve to a resume token, e.g. resume_token
in the example below.resume_after
修饰符采用必须解析为恢复口令的值,例如下面示例中的resume_token
。
You can use the 可以使用resumeAfter()
method to resume notifications after the operation specified in the resume token. resumeAfter()
方法在resume令牌中指定的操作之后恢复通知。The resumeAfter()
method takes a value that must resolve to a resume token, e.g. resumeToken
in the example below.resumeAfter()
方法接受一个必须解析为恢复口令的值,例如下面示例中的resumeToken
。
You can use the resumeAfter
option to resume notifications after the operation specified in the resume token. The resumeAfter
option takes a value that must resolve to a resume token, e.g. resumeToken
in the example below.
You can use the 您可以使用resumeAfter
option to resume notifications after the operation specified in the resume token. resumeAfter
选项在恢复口令中指定的操作之后恢复通知。The resumeAfter
option takes a value that must resolve to a resume token, e.g. $resumeToken
in the example below.resumeAfter
选项的值必须解析为恢复口令,例如下面示例中的$resumeToken
。
You can use the resume_after
modifier to resume notifications after the operation specified in the resume token. The resume_after
modifier takes a value that must resolve to a resume token, e.g. resume_token
in the example below.
In the example below, the resumeAfter
option is appended to the stream options to recreate the stream after it has been destroyed. Passing the _id
to the change stream attempts to resume notifications starting after the operation specified.
In the example below, the resumeToken
is retrieved from the last change stream document and passed to the Watch()
method as an option. Passing the resumeToken
to the Watch()
method directs the change stream to attempt to resume notifications starting after the operation specified in the resume token.
You can use the 您可以使用resume_after
modifier to resume notifications after the operation specified in the resume token. resume_after
修饰符在恢复口令中指定的操作之后恢复通知。The resume_after
modifier takes a value that must resolve to a resume token, e.g. resume_token
in the example below.
You can use ChangeStreamOptions.SetResumeAfter to specify the resume token for the change stream. If the resumeAfter option is set, the change stream resumes notifications after the operation specified in the resume token. The SetResumeAfter
takes a value that must resolve to a resume token, e.g. resumeToken
in the example below.
You can use the resumeAfter
option to resume notifications after the operation specified in the resume token. The resumeAfter
option takes a value that must resolve to a resume token, e.g. resumeToken
in the example below.
You can use the resumeAfter
option to resume notifications after the operation specified in the resume token. The resumeAfter
option takes a value that must resolve to a resume token, e.g. resumeToken
in the example below.
startAfter
for Change StreamsstartAfter
¶New in version 4.2.版本4.2中的新功能。
You can start a new change stream after a specific event by passing a resume token to 通过在打开游标时向startAfter
when opening the cursor. startAfter
传递恢复令牌,可以在特定事件之后启动新的更改流。Unlike resumeAfter, 与startAfter
can resume notifications after an invalidate event by creating a new change stream. resumeAfter
不同,startAfter
可以通过创建新的更改流在失效事件后恢复通知。For the resume token, use the 对于恢复令牌,使用变更流事件文档的__id
value of the change stream event document. id
值。See Resume Tokens for more information on the resume token.有关恢复令牌的更多信息,请参阅恢复令牌。
Important重要
The 变更流事件文档的_id
value of a change stream event document acts as the resume token:_id
值充当恢复令牌:
The resume token 恢复令牌_data
type depends on the MongoDB versions and, in some cases, the feature compatibility version (fcv) at the time of the change stream’s opening/resumption (i.e. a change in fcv value does not affect the resume tokens for already opened change streams):_data
类型取决于MongoDB版本,在某些情况下,还取决于更改流打开/恢复时的功能兼容性版本(fcv)(即fcv值的更改不会影响已打开更改流的恢复令牌):
MongoDB Version | _data Type_data 类型 | |
---|---|---|
MongoDB 4.2 and later | “4.2” or “4.0” | Hex-encoded string (v1 ) |
MongoDB 4.0.7 and later | “4.0” or “3.6” | Hex-encoded string (v1 ) |
MongoDB 4.0.6 and earlier | “4.0” | Hex-encoded string (v0 ) |
MongoDB 4.0.6 and earlier | “3.6” | BinData |
MongoDB 3.6 | “3.6” | BinData |
With hex-encoded string resume tokens, you can compare and sort the resume tokens.使用十六进制编码的字符串恢复令牌,可以对恢复令牌进行比较和排序。
Regardless of the fcv value, a 4.0 deployment can use either BinData resume tokens or hex string resume tokens to resume a change stream. 无论fcv值是多少,4.0部署都可以使用BinData恢复令牌或十六进制字符串恢复令牌来恢复更改流。As such, a 4.0 deployment can use a resume token from a change stream opened on a collection from a 3.6 deployment.因此,4.0部署可以使用在3.6部署的集合上打开的更改流中的恢复令牌。
New resume token formats introduced in a MongoDB version cannot be consumed by earlier MongoDB versions.MongoDB版本中引入的新简历令牌格式不能被早期的MongoDB版本使用。
Tip
Starting in MongoDB 4.2, change streams will throw an exception if the change stream aggregation pipeline modifies an event’s _id field.从MongoDB 4.2开始,如果变更流聚合管道修改事件的_id
字段,则变更流将引发异常。
Change streams can benefit architectures with reliant business systems, informing downstream systems once data changes are durable. 变更流可以使具有可靠业务系统的体系结构受益,一旦数据变更持久,就会通知下游系统。For example, change streams can save time for developers when implementing Extract, Transform, and Load (ETL) services, cross-platform synchronization, collaboration functionality, and notification services.例如,在实现提取、转换和加载(ETL)服务、跨平台同步、协作功能和通知服务时,变更流可以为开发人员节省时间。
For deployments enforcing Authentication and authorization:对于强制执行身份验证和授权的部署:
changeStream
and find
actions on the corresponding collection.changeStream
权限以及相应集合上find
操作的权限。
changeStream
and find
actions on all non-system
collections in a database.changeStream
权限以及数据库中所有非系统集合上的find
操作的权限。
changeStream
and find
actions on all non-system
collections for all databases in the deployment.changeStream
的权限以及在部署中所有数据库的所有非系统集合上find
操作的权限。
Change streams only notify on data changes that have persisted to a majority of data-bearing members in the replica set. 更改流仅在副本集中的大多数数据承载成员都保留了数据更改时发出通知。This ensures that notifications are triggered only by majority-committed changes that are durable in failure scenarios.这确保了通知仅由大多数提交的更改触发,这些更改在故障场景中是持久的。
For example, consider a 3-member replica set with a change stream cursor opened against the primary. 例如,考虑一个带有primary打开的更改流游标的3个成员副本集。If a client issues an insert operation, the change stream only notifies the application of the data change once that insert has persisted to a majority of data-bearing members.如果客户机发出插入操作,更改流仅在插入持续存在于大多数数据承载成员之后通知应用程序数据更改。
If an operation is associated with a transaction, the change event document includes the 如果操作与事务相关,则变更事件文档包括txnNumber
and the lsid
.txnNumber
和lsid
。
Starting in MongoDB 4.2, change streams use 从MongoDB 4.2开始,除非提供明确的排序规则,否则更改流使用simple
binary comparisons unless an explicit collation is provided. simple
的二进制比较。In earlier versions, change streams opened on a single collection (在早期版本中,在单个集合(db.collection.watch()
) would inherit that collection’s default collation.db.collection.watch()
)上打开的更改流将继承该集合的默认排序规则。