Итак, я пытаюсь использовать реактивы для перекомпоновки сообщений, идентифицированных по идентификатору, и у меня возникла проблема с прекращением окончательного наблюдаемого. У меня есть класс сообщения, который состоит из идентификатора, общего размера, полезной нагрузки, номера фрагмента и типа и имеет следующий код на стороне клиента:
Мне нужно рассчитать количество сообщений, которые нужно принять во время выполнения
(from messages in
(from messageArgs in Receive select Serializer.Deserialize<Message>(new MemoryStream(Encoding.UTF8.GetBytes(messageArgs.Message))))
group messages by messages.Id into grouped select grouped)
.Subscribe(g =>
{
var cache = new List<Message>();
g.TakeWhile((int) Math.Ceiling(MaxPayload/g.First().Size) < cache.Count)
.Subscribe(cache.Add,
_ => { /* Rebuild Message Parts From Cache */ });
});
Сначала я создаю сгруппированные наблюдаемые фильтрующие сообщения по их уникальному идентификатору, а затем я пытаюсь кэшировать все сообщения в каждой группе, пока не соберу их все, затем я сортирую их и объединяю. Вышеприведенное, похоже, блокирует g.First().
Мне нужен способ рассчитать число, которое нужно взять из первого (или любого) сообщения, которое приходит, однако у меня возникают трудности с этим. Любая помощь?