Как работает постоянная подписка в магазине событий?

В настоящее время я работаю над проектом, в котором мы используем Event Store в качестве хранилища для записи наших событий. Добавление событий работает хорошо, проблема возникает, когда мы хотим прослушать эти события.

Для прослушивания событий хранилища событий мы используем официальный клиент Event Store Node.js npm i @eventstore/db-client.

Мы создали постоянную подписку в пользовательском интерфейсе администратора. Чтобы подключиться к этой подписке, мы используем eventStoreClient.connectToPersistentSubscription, и он подключается правильно.

Проблема в том, что наши события воспроизводятся для event. На самом деле они застревают в списке припаркованных сообщений.

Является ли воспроизведение событий нормальным поведением? Как хранилище событий может запоминать события, переданные клиенту Node.js?

Примечание. В настоящее время события воспроизводятся, и наша проекция построена правильно, но события воспроизводятся вечно.

Код, который мы используем для прослушивания событий

const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>('$ce-precontrol', 'my-group', {
            bufferSize: 10,
        }, {

        })

        for await (const e of stream) {
            const { event, commitPosition, link } = e
            // console.log(event, commitPosition, link)

            if (!event) {
                this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, { event, commitPosition, link })
                return
            }

            // We should map the event instead of casting it.
            // Every field of an event which are instance of classes won't be correctly
            // unserialized, we need to re-instatiate those classes.
            const eventBody = event.data as Event
            console.log(`${eventBody.fpsProjectId}  -----  ${eventBody.kind}`)

            this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`, { event, commitPosition, link })

            try {
                await this.eventHandler.dispatch(eventBody)
                this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`, { event, commitPosition, link })
                await stream.ack(event.id)
                // this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`, { event, commitPosition, link })
            } catch (error) {
                await stream.nack(RETRY, error.message, event.id)
                this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`, { error })
            }
        }

Журналы нашего хранилища событий

{"@t":"2021-06-23T15:51:48.3692381Z","@mt":"SLOW BUS MSG [{bus}]: {message} - {elapsed}ms. Handler: {handler}.","@l":"Debug","bus":"PersistentSubscriptionsBus","message":"PersistentSubscriptionTimerTick","elapsed":1981,"handler":"PersistentSubscriptionService","SourceContext":"EventStore.Core.Bus.InMemoryBus","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:49.3743720Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746323Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:49.3746694Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:51:54.3993484Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:51:55.4072471Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:51:56.4114240Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:57.4147394Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:58.4250298Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4330048Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4333048Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:51:59.4335573Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:04.4664585Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:05.4775447Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:06.4824377Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:07.4876752Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:08.4934409Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:09.5068553Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069364Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:09.5069701Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:14.5376026Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:15.5472390Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:16.5521045Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:17.5623183Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:18.5650711Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:19.5781601Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782624Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:19.5782980Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:24.6146992Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":9}
{"@t":"2021-06-23T15:52:25.6217820Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:26.6296129Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:27.6418730Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:28.6438550Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:29.6514819Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515344Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:29.6515586Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:34.6962266Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:35.6984264Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:36.7013623Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:37.7038263Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:38.7111554Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7205755Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206264Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:39.7206490Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:44.7380921Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:45.7465243Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:46.7523226Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:52:47.7535593Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:48.7656883Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:49.7718183Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718754Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:49.7718986Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:54.8063912Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:55.8160008Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:56.8185175Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:52:57.8277609Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:52:58.8294945Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:52:59.8368339Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369266Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:52:59.8369451Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:00.4461267Z","@mt":"SLOW BUS MSG [{bus}]: {message} - {elapsed}ms. Handler: {handler}.","@l":"Debug","bus":"PersistentSubscriptionsBus","message":"PersistentSubscriptionTimerTick","elapsed":609,"handler":"PersistentSubscriptionService","SourceContext":"EventStore.Core.Bus.InMemoryBus","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:05.4787174Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":24}
{"@t":"2021-06-23T15:53:06.4825158Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:07.4891830Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:08.4988531Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:09.5058824Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:53:10.5174611Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175070Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:10.5175221Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:15.5455199Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":40,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:16.5574829Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":41,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":47}
{"@t":"2021-06-23T15:53:17.5670153Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":42,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":49}
{"@t":"2021-06-23T15:53:18.5712367Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":43,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:19.5807126Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":44,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":4}
{"@t":"2021-06-23T15:53:20.5878979Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":45,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:20.5879796Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":46,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}
{"@t":"2021-06-23T15:53:20.5880027Z","@mt":"Retrying message {subscriptionId} {stream}/{eventNumber}","@l":"Debug","subscriptionId":"$ce-precontrol::my-group","stream":"$ce-precontrol","eventNumber":47,"SourceContext":"EventStore.Core.Services.PersistentSubscription.PersistentSubscription","ProcessId":1,"ThreadId":44}

Конфигурация постоянной подписки

введите здесь описание изображения

РЕДАКТИРОВАТЬ: мы изменили наш скрипт, чтобы подтвердить событие linkto вместо связанного события, поскольку $ce-precontrol — это поток linkto.

Наши события не повторяются навсегда.

const stream = this.eventStoreClient.connectToPersistentSubscription<JSONEventType>('$ce-precontrol', 'my-group', {
            bufferSize: 10,
        }, {

        })

for await (const e of stream) {
            const { event, commitPosition, link } = e
            // console.log(event, commitPosition, link)

            if (!event) {
                this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, { event, commitPosition, link })
                return
            }

            // We should map the event instead of casting it.
            // Every field of an event which are instance of classes won't be correctly
            // unserialized, we need to re-instatiate those classes.
            const eventBody = event.data as Event
            console.log(`${eventBody.fpsProjectId}  -----  ${eventBody.kind}`)

            this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`, { event, commitPosition, link })

            try {
                await this.eventHandler.dispatch(eventBody)
                this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`, { event, commitPosition, link })
                await stream.ack(link?.id || '')
                this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`, { event, commitPosition, link })
            } catch (error) {
                await stream.nack(RETRY, error.message, link?.id || '')
                this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`, { error })
            }
        }


person botf    schedule 23.06.2021    source источник
comment
Кажется, что ваш диспетчер бросает, и подписка находится в бесконечном цикле повторных попыток.   -  person Alexey Zimarev    schedule 23.06.2021
comment
Спасибо за быстрый ответ :). Я помню, что проверял эту возможность, и отправка работала. Мне нужно проверить это завтра (парное программирование и мой коллега не зафиксировали) журналы моего приложения.   -  person botf    schedule 23.06.2021
comment
Привет, @AlexeyZimarev, диспетчер вроде не падает. Однако мы используем параметр Resolve Link Tos и подтверждаем не ссылку, а связанное событие. Поскольку мы подтверждали неправильное событие, у них истекал тайм-аут и повторялись попытки до тех пор, пока не было достигнуто максимальное количество повторных попыток.   -  person botf    schedule 24.06.2021
comment
Правильно, я упустил тот факт, что вы использовали поток категорий, а не разрешающие ссылки. Рад, что в итоге все получилось :)   -  person Alexey Zimarev    schedule 24.06.2021


Ответы (1)


Мои события были повторены навсегда. Я слушал поток, сделанный из ссылки на. Я настроил постоянную подписку для разрешения ссылки, поэтому, когда я обращался к event.id, я получал связанный идентификатор события.

Я использовал этот связанный идентификатор события для подтверждения события вместо вызова подтверждения со ссылкой на идентификатор события.

Поскольку мои события не были подтверждены, они истекли по тайм-ауту и ​​повторялись до тех пор, пока не было достигнуто максимальное количество повторных попыток.


for await (const e of stream) {
            const { event, commitPosition, link } = e
            // console.log(event, commitPosition, link)

            if (!event) {
                this.logger.warn(`The event with commit position ${commitPosition} is undefined.`, { event, commitPosition, link })
                return
            }

            // We should map the event instead of casting it.
            // Every field of an event which are instance of classes won't be correctly
            // unserialized, we need to re-instatiate those classes.
            const eventBody = event.data as Event
            console.log(`${eventBody.fpsProjectId}  -----  ${eventBody.kind}`)

            this.logger.info(`Dispatching an event ${eventBody.kind} ${eventBody.fpsProjectId}`, { event, commitPosition, link })

            try {
                await this.eventHandler.dispatch(eventBody)
                this.logger.info(`Event ${eventBody.kind} ${eventBody.fpsProjectId} dispatched successfully.`, { event, commitPosition, link })
                // Using the link to event id to ack
                await stream.ack(link?.id || '')
                this.logger.info(`Event ${eventBody.fpsProjectId} ${eventBody.kind} was acknowledged successfully.`, { event, commitPosition, link })
            } catch (error) {
                await stream.nack(RETRY, error.message, link?.id || '')
                this.logger.error(`Something went wrong while dispatching an event. Error message: ${error.message}`, { error })
            }
        }
person botf    schedule 24.06.2021