У меня настроен EventHub в Azure, а также группа потребителей для чтения данных. Несколько дней работал нормально. Внезапно я вижу задержку входящих данных (около 3 дней). Я использую службу Windows для использования данных на моем сервере. У меня около 500 входящих сообщений в минуту. Может ли кто-нибудь помочь мне понять это?
Получение данных из EventHub задерживается
Ответы (1)
Возможно, вы обрабатываете их элементы слишком медленно. Поэтому работа, которую нужно сделать, растет, и вы будете отставать.
Чтобы получить некоторое представление о том, где вы находитесь в потоке событий, вы можете использовать такой код:
private void LogProgressRecord(PartitionContext context)
{
if (namespaceManager == null)
return;
var currentSeqNo = context.Lease.SequenceNumber;
var lastSeqNo = namespaceManager.GetEventHubPartition(context.EventHubPath, context.ConsumerGroupName, context.Lease.PartitionId).EndSequenceNumber;
var delta = lastSeqNo - currentSeqNo;
logWriter.Write(
$"Last processed seqnr for partition {context.Lease.PartitionId}: {currentSeqNo} of {lastSeqNo} in consumergroup '{context.ConsumerGroupName}' (lag: {delta})",
EventLevel.Informational);
}
namespaceManager строится следующим образом:
namespaceManager = NamespaceManager.CreateFromConnectionString("Endpoint=sb://xxx.servicebus.windows.net/;SharedAccessKeyName=yyy;SharedAccessKey=zzz");
Я вызываю этот метод регистрации в методе CloseAsync
:
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
LogProgressRecord(context);
return Task.CompletedTask;
}
logWriter
— это просто класс ведения журнала, который я использовал для записи информации в хранилище BLOB-объектов.
Теперь он выводит сообщения типа
Последний обработанный порядковый номер для раздела 3: 32780931 из 32823804 в потребительской группе «телеметрия» (отставание: 42873)
поэтому, когда задержка очень велика, вы можете обрабатывать события, которые произошли давно. В этом случае вам необходимо увеличить/уменьшить масштаб вашего процессора.
Если вы заметили задержку, вы должны измерить, сколько времени требуется для обработки заданного количества элементов. Затем вы можете попытаться оптимизировать производительность и посмотреть, улучшится ли она. Мы сделали это так:
public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
{
try
{
stopwatch.Restart();
// process items here
stopwatch.Stop();
await CheckPointAsync(context);
logWriter.Write(
$"Processed {events.Count()} events in {stopwatch.ElapsedMilliseconds}ms using partition {context.Lease.PartitionId} in consumergroup {context.ConsumerGroupName}.",
EventLevel.Informational);
}
}