Watch for Changes留意变化

You can watch for changes to a single collection, a database, or an entire deployment in MongoDB with Change Streams.您可以使用变更流监视MongoDB中单个集合、数据库或整个部署的变更。Open a change stream by calling the watch() method on a Collection, Db, or MongoClient object.通过对CollectionDbMongoClient对象调用watch()方法打开更改流。The change stream emits change event documents when they occur.变更流在发生变更事件文档时发出变更事件文档。

The watch() method optionally takes an aggregation pipeline which consists of an array of aggregation stages as the first parameter.watch()方法可以选择将包含聚合阶段数组的聚合管道作为第一个参数。The aggregation stages filter and transform the change events.聚合阶段过滤和转换更改事件。

In the following snippet, the $match stage matches all change event documents with a runtime value of less than 15, filtering all others out.在下面的代码段中,$match阶段匹配runtime值小于15的所有更改事件文档,过滤掉所有其他文档。

const pipeline = [ { $match: { runtime: { $lt: 15 } } } ];
const changeStream = collection.watch(pipeline);

The watch() method accepts an options object as the second parameter.watch()方法接受options对象作为第二个参数。Refer to the links at the end of this section for more information on the settings you can configure with this object.有关可使用此对象配置的设置的更多信息,请参阅本节末尾的链接。

The watch() method returns an instance of a ChangeStream.watch()方法返回ChangeStream的实例。You can read events from change streams by iterating over them or listening for events.您可以通过迭代更改流或侦听事件,从更改流中读取事件。Select the tab that corresponds to the way you want to read events from the change stream below.选择与要从下面的更改流中读取事件的方式相对应的选项卡。

Warning

Using a ChangeStream in EventEmitter and Iterator mode concurrently is not supported by the driver and causes an error.驱动程序不支持在EventEmitter和迭代器模式下同时使用ChangeStream,并会导致错误。This is to prevent undefined behavior, where the driver cannot guarantee which consumer receives documents first.这是为了防止未定义的行为,即驱动程序无法保证哪个使用者首先接收文档。

You can call methods on the ChangeStream object such as:您可以调用ChangeStream对象上的方法,例如:

  • hasNext() to check for remaining documents in the stream用于检查流中的剩余文档
  • next() to request the next document in the stream用于请求流中的下一个文档
  • close() to close the ChangeStream用于关闭更改流

You can attach listener functions to the ChangeStream object by calling the on() method. 可以通过调用on()方法将侦听器函数附加到ChangeStream对象。This method is inherited from the Javascript EventEmitter class. 此方法继承自Javascript EventEmitter类。Pass the string "change" as the first parameter and your callback function as the second parameter as shown below:将字符串"change"作为第一个参数传递,将回调函数作为第二个参数传递,如下所示:

changeStream.on("change", (changeEvent) => { /* your callback function */ });

The callback function triggers when a change event is emitted. 回调函数在发出更改事件时触发。You can specify logic in the callback to process the change event document when it is received.您可以在回调中指定逻辑,以便在收到更改事件文档时对其进行处理。

You can control the change stream by calling pause() to stop emitting events or resume() to continue to emit events.您可以通过调用pause()停止发送事件或调用resume()继续发送事件来控制更改流。

To stop processing change events, call the close() method on the ChangeStream instance. 要停止处理更改事件,请对ChangeStream实例调用close()方法。This closes the change stream and frees resources.这将关闭更改流并释放资源。

changeStream.close();

Visit the following resources for additional material on the classes and methods presented above:有关上述类和方法的更多资料,请访问以下资源:

The following example opens a change stream on the haikus collection in the insertDB database. 以下示例在insertDB数据库中的haikus集合上打开一个变更流。Let's create a listener function to receive and print change events that occur on the collection.让我们创建一个侦听器函数来接收和打印集合上发生的更改事件。

First, open the change stream on the collection and then define a callback on the change stream using the on() method. 首先,在集合上打开变更流,然后使用on()方法在变更流上定义回调。Once set, generate a change event by performing a change to the collection.设置后,通过对集合执行更改来生成更改事件。

To generate the change event on the collection, let's use insertOne() method to add a new document. 要在集合上生成更改事件,让我们使用insertOne()方法添加一个新文档。Since the insertOne() may run before the listener function can register, we use a timer, defined as simulateAsyncPause to wait 1 second before executing the insert.由于insertOne()可能在侦听器函数注册之前运行,因此我们使用一个定义为simulateAsyncPause的计时器在执行insert之前等待1秒。

We also use simulateAsyncPause after the insertion of the document to provide ample time for the listener function to receive the change event and for the callback to complete its execution before closing the ChangeStream instance using the close() method.在插入文档之后,我们还使用simulateAsyncPause为侦听器函数提供充足的时间来接收更改事件,并在使用close()方法关闭ChangeStream实例之前为回调函数提供充足的时间来完成其执行。

The timers used in this example are only necessary for this demonstration to make sure there is enough time to register listener and have the callback process the event before exiting.本示例中使用的计时器仅在本演示中是必要的,以确保有足够的时间注册侦听器,并在退出之前让回调处理事件。

Note

You can use this example to connect to an instance of MongoDB and interact with a database that contains sample data. 您可以使用此示例连接到MongoDB实例,并与包含示例数据的数据库交互。To learn more about connecting to your MongoDB instance and loading a sample dataset, see the Usage Examples guide.要了解有关连接到MongoDB实例并加载示例数据集的更多信息,请参阅用法示例指南

import { MongoClient } from "mongodb";
// Replace the uri string with your MongoDB deployment's connection string.用MongoDB部署的连接字符串替换uri字符串。 const uri = "<connection string uri>";
const client = new MongoClient(uri);
const simulateAsyncPause = () => new Promise(resolve => { setTimeout(() => resolve(), 1000); });
let changeStream; async function run() { try { await client.connect(); const database = client.db("insertDB"); const collection = database.collection("haikus");
// open a Change Stream on the "haikus" collection在“haikus”集合上打开更改流 changeStream = collection.watch();
// set up a listener when change events are emitted在发出更改事件时设置侦听器 changeStream.on("change", next => { // process any change event处理任何变更事件 console.log("received a change to the collection: \t", next); });
await simulateAsyncPause();
await collection.insertOne({ title: "Record of a Shriveled Datum", content: "No bytes, no problem. Just insert a document, in MongoDB", });
await simulateAsyncPause();
await changeStream.close(); console.log("closed the change stream"); } finally { await client.close(); } } run().catch(console.dir);
import { MongoClient } from "mongodb";

// Replace the uri string with your MongoDB deployment's connection string.用MongoDB部署的连接字符串替换uri字符串。 const uri = "<connection string uri>";
const client = new MongoClient(uri);
const simulateAsyncPause = () => new Promise(resolve => { setTimeout(() => resolve(), 1000); });
let changeStream; async function run() { try { await client.connect(); const database = client.db("insertDB"); const collection = database.collection("haikus");
// open a Change Stream on the "haikus" collection在“haikus”集合上打开更改流 changeStream = collection.watch();
// set up a listener when change events are emitted在发出更改事件时设置侦听器 changeStream.on("change", next => { // process any change event console.log("received a change to the collection: \t", next); });
await simulateAsyncPause();
await collection.insertOne({ title: "Record of a Shriveled Datum", content: "No bytes, no problem. Just insert a document, in MongoDB", });
await simulateAsyncPause();
await changeStream.close(); console.log("closed the change stream"); } finally { await client.close(); } } run().catch(console.dir);
Note
Identical Code Snippets相同的代码片段

The JavaScript and TypeScript code snippets above are identical. 上面的JavaScript和TypeScript代码片段是相同的。There are no TypeScript specific features of the driver relevant to this use case.驱动程序没有与此用例相关的特定于TypeScript的特性。

If you run the preceding example, you should see the following output:如果运行前面的示例,您应该会看到以下输出:

received a change to the collection:          {
  _id: { _data: '825EC...' },
  operationType: 'insert',
  clusterTime: new Timestamp { ... },
  fullDocument: { _id: new ObjectId(...), title: 'Record of a Shriveled Datum', content: 'No bytes, no problem. Just insert a document, in MongoDB' },
  ns: { db: 'insertDB', coll: 'haikus' },
  documentKey: { _id: new ObjectId(...) }
}
closed the change stream
Note
Receive Full Documents From Updates接收来自更新的完整文档

Change events that contain information on update operations only return the modified fields by default rather than the full updated document. 默认情况下,包含更新操作信息的更改事件只返回修改后的字段,而不是完整的更新文档。You can configure your change stream to also return the most current version of the document by setting the fullDocument field of the options object to "updateLookup" as follows:通过将options对象的fullDocument字段设置为"updateLookup",您可以将更改流配置为也返回文档的最新版本,如下所示:

const options = { fullDocument: "updateLookup" };
// This could be any pipeline.这可能是任何管道。
const pipeline = [];
const changeStream = collection.watch(pipeline, options);