Использование реактивов для объединения разрозненных сообщений

Итак, я пытаюсь использовать реактивы для перекомпоновки сообщений, идентифицированных по идентификатору, и у меня возникла проблема с прекращением окончательного наблюдаемого. У меня есть класс сообщения, который состоит из идентификатора, общего размера, полезной нагрузки, номера фрагмента и типа и имеет следующий код на стороне клиента:

Мне нужно рассчитать количество сообщений, которые нужно принять во время выполнения

(from messages in
   (from messageArgs in Receive select Serializer.Deserialize<Message>(new MemoryStream(Encoding.UTF8.GetBytes(messageArgs.Message))))
 group messages by messages.Id into grouped select grouped)
.Subscribe(g =>
{
    var cache = new List<Message>();
    g.TakeWhile((int) Math.Ceiling(MaxPayload/g.First().Size) < cache.Count)
      .Subscribe(cache.Add, 
    _ => { /* Rebuild Message Parts From Cache */ });
});

Сначала я создаю сгруппированные наблюдаемые фильтрующие сообщения по их уникальному идентификатору, а затем я пытаюсь кэшировать все сообщения в каждой группе, пока не соберу их все, затем я сортирую их и объединяю. Вышеприведенное, похоже, блокирует g.First().

Мне нужен способ рассчитать число, которое нужно взять из первого (или любого) сообщения, которое приходит, однако у меня возникают трудности с этим. Любая помощь?


person aslate    schedule 11.02.2011    source источник


Ответы (1)


First — блокирующий оператор (а как еще он может вернуть T, а не IObservable<T>?)

Я думаю, что использование Scan (который со временем создает агрегат) может быть тем, что вам нужно. Используя Scan, вы можете скрыть "состояние" реконструируемого сообщения в объекте "строитель".

MessageBuilder.IsComplete возвращает значение true, когда размер всех полученных им сообщений достигает MaxPayload (или любых других требований). Затем MessageBuilder.Build() возвращает восстановленное сообщение.

Я также переместил ваш код "создания сообщений" в SelectMany, который хранит созданные сообщения в монаде.

(Извините за переформатирование кода в методы расширения, мне трудно читать/писать смешанный синтаксис LINQ)

Receive
    .Select(messageArgs => Serializer.Deserialize<Message>(
        new MemoryStream(Encoding.UTF8.GetBytes(messageArgs.Message))))
    .GroupBy(message => message.Id)
    .SelectMany(group =>
    {
        // Use the builder to "add" message parts to
        return group.Scan(new MessageBuilder(), (builder, messagePart) =>
        {
            builder.AddPart(messagePart);

            return builder;
        })
        .SkipWhile(builder => !builder.IsComplete)
        .Select(builder => builder.Build());
    })
    .Subscribe(OnMessageReceived);
person Richard Szalay    schedule 11.02.2011
comment
Классное решение, всегда забываю про GroupBy - person Ana Betts; 12.02.2011
comment
Это действительно очень классное решение, мне, очевидно, нужно изучить широкий спектр возможностей, доступных в LINQ. Большое спасибо - person aslate; 14.02.2011