В настоящее время я работаю над проектом, в котором мы используем Event Store в качестве хранилища для записи наших событий. Добавление событий работает хорошо, проблема возникает, когда мы хотим прослушать эти события.
Для прослушивания событий хранилища событий мы используем официальный клиент Event Store Node.js npm i @eventstore/db-client
.
Мы создали постоянную подписку в пользовательском интерфейсе администратора. Чтобы подключиться к этой подписке, мы используем eventStoreClient.connectToPersistentSubscription
, и он подключается правильно.
Проблема в том, что наши события воспроизводятся для event. На самом деле они застревают в списке припаркованных сообщений.
Является ли воспроизведение событий нормальным поведением? Как хранилище событий может запоминать события, переданные клиенту Node.js?
Примечание. В настоящее время события воспроизводятся, и наша проекция построена правильно, но события воспроизводятся вечно.
Код, который мы используем для прослушивания событий
const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>('$ce-precontrol', 'my-group', {
bufferSize: 10,
}, {
})
for await (const e of stream) {
const { event, commitPosition, link } = e
// console.log(event, commitPosition, link)
if (!event) {
this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, { event, commitPosition, link })
return
}
// We should map the event instead of casting it.
// Every field of an event which are instance of classes won't be correctly
// unserialized, we need to re-instatiate those classes.
const eventBody = event.data as Event
console.log(`${eventBody.fpsProjectId} ----- ${eventBody.kind}`)
this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`, { event, commitPosition, link })
try {
await this.eventHandler.dispatch(eventBody)
this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`, { event, commitPosition, link })
await stream.ack(event.id)
// this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`, { event, commitPosition, link })
} catch (error) {
await stream.nack(RETRY, error.message, event.id)
this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`, { error })
}
}
Журналы нашего хранилища событий
{"@t":"2021-06-23T15:51:48.3692381Z","@mt":"SLOW BUS MSG [{bus}]: {message} - {elapsed}ms. Handler: {handler}.","@l":"Debug","bus":"PersistentSubscriptionsBus","message":"PersistentSubscriptionTimerTick","elapsed":1981,"handler":"PersistentSubscriptionService","SourceContext":"EventStore.Core.Bus.InMemoryBus","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:49.3743720Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746323Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746694Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:54.3993484Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:51:55.4072471Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:51:56.4114240Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:57.4147394Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:58.4250298Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4330048Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4333048Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4335573Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:04.4664585Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:05.4775447Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:06.4824377Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:07.4876752Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:08.4934409Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:09.5068553Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069364Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069701Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:14.5376026Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:15.5472390Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:16.5521045Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:17.5623183Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:18.5650711Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:19.5781601Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782624Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782980Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:24.6146992Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:25.6217820Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:26.6296129Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:27.6418730Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:28.6438550Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:29.6514819Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515344Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515586Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:34.6962266Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:35.6984264Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:36.7013623Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:37.7038263Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:38.7111554Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7205755Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206264Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206490Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:44.7380921Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:45.7465243Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:46.7523226Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:47.7535593Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:48.7656883Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:49.7718183Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718754Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718986Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:54.8063912Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:55.8160008Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:56.8185175Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:57.8277609Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:58.8294945Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:59.8368339Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369266Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369451Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:00.4461267Z","@mt":"SLOW BUS MSG [{bus}]: {message} - {elapsed}ms. Handler: {handler}.","@l":"Debug","bus":"PersistentSubscriptionsBus","message":"PersistentSubscriptionTimerTick","elapsed":609,"handler":"PersistentSubscriptionService","SourceContext":"EventStore.Core.Bus.InMemoryBus","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:05.4787174Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:53:06.4825158Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:07.4891830Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:08.4988531Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:09.5058824Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:53:10.5174611Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175070Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175221Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:15.5455199Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:16.5574829Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:53:17.5670153Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:18.5712367Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:19.5807126Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:20.5878979Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:20.5879796Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:20.5880027Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
Конфигурация постоянной подписки
РЕДАКТИРОВАТЬ: мы изменили наш скрипт, чтобы подтвердить событие linkto вместо связанного события, поскольку $ce-precontrol — это поток linkto.
Наши события не повторяются навсегда.
const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>('$ce-precontrol', 'my-group', {
bufferSize: 10,
}, {
})
for await (const e of stream) {
const { event, commitPosition, link } = e
// console.log(event, commitPosition, link)
if (!event) {
this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, { event, commitPosition, link })
return
}
// We should map the event instead of casting it.
// Every field of an event which are instance of classes won't be correctly
// unserialized, we need to re-instatiate those classes.
const eventBody = event.data as Event
console.log(`${eventBody.fpsProjectId} ----- ${eventBody.kind}`)
this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`, { event, commitPosition, link })
try {
await this.eventHandler.dispatch(eventBody)
this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`, { event, commitPosition, link })
await stream.ack(link?.id || '')
this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`, { event, commitPosition, link })
} catch (error) {
await stream.nack(RETRY, error.message, link?.id || '')
this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`, { error })
}
}