Change Streams变更流
Change streams变更流 let you listen for updates to documents in a given model's collection, or even documents in an entire database. 允许您监听给定模型集合中的文档,甚至整个数据库中的文档的更新。Unlike middleware, change streams are a MongoDB server construct, which means they pick up changes from anywhere. 与中间件不同,变更流是MongoDB服务器结构,这意味着它们可以从任何地方获取变更。Even if you update a document from a MongoDB GUI, your Mongoose change stream will be notified.即使您从MongoDB GUI更新文档,也会通知您的Mongoose变更流。
The watch()
function creates a change stream. watch()
函数创建一个变更流。Change streams emit a 文档更新时,变更流会发出'data'
event when a document is updated.'data'
事件。
const Person = mongoose.model('Person', new mongoose.Schema({ name: String }));
// Create a change stream. The 'change' event gets emitted when there's a change in the database. Print what the change stream emits.创建变更流。当数据库发生更改时,会发出“change”事件。打印变更流发出的内容。
Person.watch().
on('change', data => console.log(data));
// Insert a doc, will trigger the change stream handler above插入一个文档,将触发上面的变更流处理程序
await Person.create({ name: 'Axl Rose' });
The above script will print output that looks like:上面的脚本将打印如下输出:
{
_id: {
_data: '8262408DAC000000012B022C0100296E5A10042890851837DB4792BE6B235E8B85489F46645F6964006462408DAC6F5C42FF5EE087A20004'
},
operationType: 'insert',
clusterTime: new Timestamp({ t: 1648397740, i: 1 }),
fullDocument: {
_id: new ObjectId("62408dac6f5c42ff5ee087a2"),
name: 'Axl Rose',
__v: 0
},
ns: { db: 'test', coll: 'people' },
documentKey: { _id: new ObjectId("62408dac6f5c42ff5ee087a2") }
}
Note that you must be connected to a MongoDB replica set or sharded cluster to use change streams. 请注意,您必须连接到MongoDB副本集或分片集群才能使用变更流。If you try to call 如果您尝试在连接到独立的MongoDB服务器时调用watch()
when connected to a standalone MongoDB server, you'll get the below error.watch()
,您将得到以下错误。
MongoServerError: The $changeStream stage is only supported on replica sets
If you're using 如果您在生产中使用watch()
in production, we recommend using MongoDB Atlas. watch()
,我们建议您使用MongoDB Atlas。For local development, we recommend mongodb-memory-server or run-rs to start a replica set locally.对于本地开发,我们建议使用mongodb内存服务器或run-rs在本地启动副本集。
Iterating using next()
使用next()
进行迭代
next()
If you want to iterate through a change stream in a AWS Lambda function, do not use event emitters to listen to the change stream. 如果您想在AWS Lambda函数中迭代变更流,请不要使用事件发射器来侦听变更流。You need to make sure you close your change stream when your Lambda function is done executing, because your change stream may end up in an inconsistent state if Lambda stops your container while the change stream is pulling data from MongoDB.您需要确保在Lambda函数执行完毕后关闭变更流,因为如果Lambda在变更流从MongoDB提取数据时停止容器,则变更流可能会处于不一致的状态。
Change streams also have a 变更流还有一个next()
function that lets you explicitly wait for the next change to come in. next()
函数,可以让您明确地等待下一个变更的到来。Use 使用resumeAfter
to track where the last change stream left off, and add a timeout to make sure your handler doesn't wait forever if no changes come in.resumeAfter
跟踪最后一个变更流停止的位置,并添加一个超时,以确保在没有更改的情况下处理程序不会永远等待。
let resumeAfter = undefined;
exports.handler = async(event, context) => {
// add this so that we can re-use any static/global variables between function calls if Lambda happens to re-use existing containers for the invocation.添加这一点,这样,如果Lambda碰巧重用现有容器进行调用,我们就可以在函数调用之间重用任何静态/全局变量。
context.callbackWaitsForEmptyEventLoop = false;
await connectToDatabase();
const changeStream = await Country.watch([], { resumeAfter });
// Change stream `next()` will wait forever if there are no changes. 如果没有更改,变更流`next()`将永远等待。So make sure to stop listening to the change stream after a fixed period of time.因此,请确保在一段固定的时间后停止收听变更流。
const timeoutPromise = new Promise(resolve => setTimeout(() => resolve(false), 1000));
let doc = null;
while (doc = await Promise.race([changeStream.next(), timeoutPromise])) {
console.log('Got', doc);
}
// `resumeToken` tells you where the change stream left off, so next function instance can pick up any changes that happened in the meantime.告诉变更流停止的位置,以便下一个函数实例可以拾取在此期间发生的任何更改。
resumeAfter = changeStream.resumeToken;
await changeStream.close();
};