Есть ли способ сохранить коллекцию сообщений размером до 1 МБ и записать результат в файл JSON/CSV?

У меня есть блокирующая очередь, которая продолжает получать сообщения через какое-то приложение, теперь в приложении asp.net я попытался использовать очередь и записать вывод в файл CSV/JSON.

Здесь я хочу хранить сообщения размером до 1 МБ, которые поступают из очереди блокировки, а затем записывают их, теперь снова удерживают данные для 1 МБ и снова записывают... и так далее.

В приведенном ниже коде я использую буфер system.reactive и могу хранить количество наблюдаемых и записывать в JSON, но есть ли способ изменить размер наблюдаемых?

class Program
{
    private static readonly BlockingCollection<Message> MessagesBlockingCollection = new BlockingCollection<Message>();

    private static void Producer()
    {
        int ctr = 1;
        while (ctr <= 11)
        {
            MessagesBlockingCollection.Add(new Message { Id = ctr, Name = $"Name-{ctr}" });
            Thread.Sleep(1000);
            ctr++;
        }
    }

    private static void Consumer()
    {
        var observable = MessagesBlockingCollection.GetConsumingEnumerable().ToObservable();

        var bufferedNumberStream = observable.BufferWithThrottle(TimeSpan.FromSeconds(60), 5)
                                    .Subscribe(ts =>
                                    {
                                        WriteToFile(ts.ToList());
                                    });
    }

    private static void WriteToFile(List<Message> listToWrite)
    {
        using (StreamWriter outFile = System.IO.File.CreateText(Path.Combine(@"C:\TEMP", $"{DateTime.Now.ToString("yyyyMMddHHmmssfff")}.json")))
        {
            outFile.Write(JsonConvert.SerializeObject(listToWrite));
        }
    }

    static void Main(string[] args)
    {
        var producer = Task.Factory.StartNew(() => Producer());
        var consumer = Task.Factory.StartNew(() => Consumer());
        Console.Read();
    }
}

Реактивный метод расширения,

public static IObservable<IList<TSource>> BufferWithThrottle<TSource>(this IObservable<TSource> source,
                                                                            TimeSpan threshold, int noOfStream)
    {
        return Observable.Create<IList<TSource>>((obs) =>
        {
            return source.GroupByUntil(_ => true,
                                       g => g.Throttle(threshold).Select(_ => Unit.Default)
                                             .Merge(g.Buffer(noOfStream).Select(_ => Unit.Default)))
                         .SelectMany(i => i.ToList())
                         .Subscribe(obs);
        });
    }

Класс сообщения,

public class Message
{
    public int Id { get; set; }
    public string Name { get; set; }
}

person user584018    schedule 08.01.2019    source источник
comment
Почему вы хотите буферизовать данные в Rx? Если вам необходимо буферизовать запись, можете ли вы буферизовать с помощью Буферизованныйпоток?   -  person Aron    schedule 09.01.2019
comment
@Aron, я пробовал кое-что, используя 'Rx Buffer, но смог сделать ТОЛЬКО с количеством наблюдаемых, а не с размером, пожалуйста, посмотрите этот вопрос, stackoverflow.com/questions/54104055/< /а>   -  person user584018    schedule 09.01.2019
comment
Вопрос отредактирован   -  person user584018    schedule 09.01.2019
comment
Я не говорил использовать буфер Rx. Я сказал, что вы используете BufferedStream. Вы просто обертываете свой FileStream с помощью BufferedStream. Сообщение не имеет размера. Только сериализованная форма...   -  person Aron    schedule 09.01.2019
comment
Хорошо, значит, мы не можем определить наблюдаемый размер сообщения? Нет Rx способа?   -  person user584018    schedule 09.01.2019
comment
Я имею в виду, что это бессмысленный вопрос в .net (ничего общего с Rx). Вы используете управляемую память. В качестве альтернативы есть 101 различных ответов на размер сообщения.   -  person Aron    schedule 09.01.2019
comment
@Enigmativity, пожалуйста, снова откройте этот вопрос Sharp" title="буфер с размером потока вместо реактивного расширения количества потоков c Sharp">stackoverflow.com/questions/54104055/, я удалю этот. Спасибо!   -  person user584018    schedule 09.01.2019