admin管理员组文章数量:814986
关于如何在一轮请求/响应中发布大量消息的任何建议?
如果我使用Promise.all
发布5万条消息,如下所示:
const pubsub = new PubSub({ projectId: PUBSUB_PROJECT_ID });
const topic = pubsub.topic(topicName, {
batching: {
maxMessages: 1000,
maxMilliseconds: 100,
},
});
const n = 50 * 1000;
const dataBufs: Buffer[] = [];
for (let i = 0; i < n; i++) {
const data = `message payload ${i}`;
const dataBuffer = Buffer.from(data);
dataBufs.push(dataBuffer);
}
const tasks = dataBufs.map((d, idx) =>
topic.publish(d).then((messageId) => {
console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
})
);
// publish messages concurrencly
await Promise.all(tasks);
// send response to front-end
res.json(data);
我将解决此问题:pubsub-emulator throw error and publisher throw "Retry total timeout exceeded before any response was received" when publish 50k messages
如果我用于循环和async/await
。问题不见了。
const n = 50 * 1000;
for (let i = 0; i < n; i++) {
const data = `message payload ${i}`;
const dataBuffer = Buffer.from(data);
const messageId = await topic.publish(dataBuffer)
console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${i}`)
}
// some logic ...
// send response to front-end
res.json(data);
但是由于async/await
,它将阻止后续逻辑的执行,直到发布所有消息为止。发布50k消息需要很长时间。
关于如何发布大量消息(约50k)而又不阻止后续逻辑执行的任何建议?我是否需要使用child_process
或类似bull的队列在后台发布大量消息,而不阻塞API的请求/响应工作流?这意味着我需要尽快响应前端,这50k消息应该是后台任务。
似乎@google/pubsub
库中有一个内存队列。我不确定是否应该再次使用另一个队列,例如bull。
[发布大量数据所需的时间取决于很多因素:
- 邮件大小。消息越大,发送消息所需的时间越长。
- 网络容量(无论发布商在何处运行,都与Google Cloud以及虚拟机本身(如果相关)之间的连接)。这为可传输的数据量设置了上限。看到限制在40MB / s范围内的较小虚拟机并非非常规。请注意,如果您通过Wifi进行测试,则限制可能甚至低于此限制。
- 线程数和CPU内核数。当必须运行大量异步回调时,安排它们运行的能力可能会受到计算机或运行时环境的并行容量的限制。
通常,尝试同时从一个发布者实例发送50,000个发布是不好的。以上因素很可能导致客户超负荷工作,并导致超过期限的错误。防止这种情况的最佳方法是限制一次可以发布的未完成消息的数量。一些库,例如Java support this natively。 Node.js库尚不支持此功能,但将来可能会支持。
同时,您希望保留未处理邮件数的计数器,并将其限制为客户端似乎能够处理的任何数量。从1000开始,然后根据结果从那里向上或向下进行计算。 semaphore是实现此行为的相当标准的方法。在您的情况下,代码如下所示:
var sem = require('semaphore')(1000);
var publishes = []
const tasks = dataBufs.map((d, idx) =>
sem.take(function() => {
publishes.push(topic.publish(d).then((messageId) => {
console.log(`[${new Date().toISOString()}] Message ${messageId} published. index: ${idx}`);
sem.leave();
}));
})
);
// Await the start of publishing all messages
await Promise.all(tasks);
// Await the actual publishes
await Promise.all(publishes);
本文标签: 关于如何在一轮请求响应中发布大量消息的任何建议
版权声明:本文标题:关于如何在一轮请求响应中发布大量消息的任何建议? 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/biancheng/1715172166a821921.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论