Получение данных из EventHub задерживается

У меня настроен EventHub в Azure, а также группа потребителей для чтения данных. Несколько дней работал нормально. Внезапно я вижу задержку входящих данных (около 3 дней). Я использую службу Windows для использования данных на моем сервере. У меня около 500 входящих сообщений в минуту. Может ли кто-нибудь помочь мне понять это?


person vishnu    schedule 28.11.2016    source источник
comment
Как вы читаете данные из eventhub? Используете ли вы экземпляр IEventProcessor?   -  person Peter Bons    schedule 28.11.2016
comment
@PeterBons Да, Питер, я использую экземпляр IEventProcessor.   -  person vishnu    schedule 28.11.2016


Ответы (1)


Возможно, вы обрабатываете их элементы слишком медленно. Поэтому работа, которую нужно сделать, растет, и вы будете отставать.

Чтобы получить некоторое представление о том, где вы находитесь в потоке событий, вы можете использовать такой код:

private void LogProgressRecord(PartitionContext context)
{
    if (namespaceManager == null)
        return;

    var currentSeqNo = context.Lease.SequenceNumber;
    var lastSeqNo = namespaceManager.GetEventHubPartition(context.EventHubPath, context.ConsumerGroupName, context.Lease.PartitionId).EndSequenceNumber;
    var delta = lastSeqNo - currentSeqNo;

    logWriter.Write(
            $"Last processed seqnr for partition {context.Lease.PartitionId}: {currentSeqNo} of {lastSeqNo} in consumergroup '{context.ConsumerGroupName}' (lag: {delta})",
            EventLevel.Informational);
}

namespaceManager строится следующим образом:

namespaceManager = NamespaceManager.CreateFromConnectionString("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=yyy;SharedAccessKey=zzz");

Я вызываю этот метод регистрации в методе CloseAsync:

public Task CloseAsync(PartitionContext context, CloseReason reason)
{
    LogProgressRecord(context);

    return Task.CompletedTask;
}

logWriter — это просто класс ведения журнала, который я использовал для записи информации в хранилище BLOB-объектов.

Теперь он выводит сообщения типа

Последний обработанный порядковый номер для раздела 3: 32780931 из 32823804 в потребительской группе «телеметрия» (отставание: 42873)

поэтому, когда задержка очень велика, вы можете обрабатывать события, которые произошли давно. В этом случае вам необходимо увеличить/уменьшить масштаб вашего процессора.

Если вы заметили задержку, вы должны измерить, сколько времени требуется для обработки заданного количества элементов. Затем вы можете попытаться оптимизировать производительность и посмотреть, улучшится ли она. Мы сделали это так:

public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
{
        try
        {
            stopwatch.Restart();

            // process items here

            stopwatch.Stop();

            await CheckPointAsync(context);

            logWriter.Write(
                $"Processed {events.Count()} events in {stopwatch.ElapsedMilliseconds}ms using partition {context.Lease.PartitionId} in consumergroup {context.ConsumerGroupName}.",
                EventLevel.Informational);
        }
}
person Peter Bons    schedule 28.11.2016
comment
Спасибо, Питер, за ваше драгоценное время. У меня нет дорогостоящей операции в процессоре данных. Я просто вставляю входящую запись в плоскую таблицу с помощью EF. Я только что проверил отставание, и оно составляет более 100000 для каждого раздела (есть 4 раздела). Можно ли запустить несколько экземпляров моей службы Windows и компенсировать отставание? - person vishnu; 28.11.2016
comment
Да, но обратите внимание, что в зависимости от базы данных может случиться так, что EF/БД просто не справится с нагрузкой. 500 сообщений в секунду это не так уж и много. Вы должны измерять время своих операций. Смотрите обновленный ответ. - person Peter Bons; 28.11.2016
comment
Да, может быть. Но у меня пакетное обновление 25. Может 25 маловато для него, проверю. Между тем, есть ли какие-либо ограничения на количество активных слушателей для группы потребителей? Потому что я также планировал сделать еще несколько экземпляров. В VS возникает фатальная ошибка, если я пытаюсь запустить уже работающую группу потребителей в течение нескольких минут. - person vishnu; 28.11.2016
comment
Возможно, массовая копия sql для 25 записей быстрее, чем использование EF. См. stackoverflow.com/questions/682015/ - person Peter Bons; 28.11.2016
comment
Это зависит от количества разделов. Один процесс обработки на раздел является оптимальным. К сожалению, вы не можете увеличить количество разделов после создания концентратора событий. - person Peter Bons; 28.11.2016
comment
Так что, наверное, лучше создать 4 экземпляра ради... правильно? - person vishnu; 28.11.2016
comment
Да, хотя я действительно рекомендую измерить и посмотреть, что лучше. Если EF является самым медленным фактором, вам может не понадобиться 4 экземпляра, если вы перейдете на необработанный sql или массовое копирование sql. - person Peter Bons; 28.11.2016
comment
Хорошо, я сначала попробую SqlBulkCopy. Если он не работает, он перейдет к нескольким экземплярам. Спасибо Питер.. :) - person vishnu; 28.11.2016