MailboxProcessor, работающий по логике LIFO.

Я изучаю агенты F# (MailboxProcessor).

Я имею дело с довольно нестандартной проблемой.

  • У меня есть один агент (dataSource), который является источником потоковых данных. Данные должны быть обработаны массивом агентов (dataProcessor). Мы можем рассматривать dataProcessor как своего рода устройство слежения.
  • Данные могут поступать быстрее, чем скорость, с которой dataProcessor может обрабатывать ввод.
  • Это нормально иметь некоторую задержку. Тем не менее, я должен убедиться, что агент остается на вершине своей работы и не попадает под устаревшие наблюдения.

Я изучаю способы решения этой проблемы.

Первой идеей является реализация стека ( ЛИФО) в dataSource. dataSource отправит последнее доступное наблюдение, когда dataProcessor станет доступным для получения и обработки данных. Это решение может работать, но может оказаться сложным, так как dataProcessor может потребоваться заблокировать и повторно активировать; и сообщить о своем статусе dataSource, что приводит к проблемам с двусторонней связью. Эта проблема может сводиться к blocking queue в проблема потребитель-производитель, но я не уверен..

Вторая идея заключается в том, чтобы dataProcessor позаботился о сортировке сообщений. В этой архитектуре dataSource будет просто публиковать обновления в очереди dataProcessor. dataProcessor будет использовать Scan для получения последних данных, доступных в его очереди. Это может быть путь. Однако я не уверен, что в текущем дизайне MailboxProcessor можно очистить очередь сообщений, удалив старые устаревшие. Кроме того, здесь написано, что:

К сожалению, функция TryScan в текущей версии F# не работает по двум причинам. Во-первых, все дело в том, чтобы указать тайм-аут, но реализация фактически не соблюдает его. В частности, нерелевантные сообщения сбрасывают таймер. Во-вторых, как и в случае с другой функцией сканирования, очередь сообщений проверяется под замком, который предотвращает отправку сообщений любым другим потокам на время сканирования, которое может быть сколь угодно большим. Следовательно, сама функция TryScan имеет тенденцию блокировать параллельные системы и даже может создавать взаимоблокировки, поскольку код вызывающего объекта оценивается внутри блокировки (например, отправка аргумента функции в Scan или TryScan может привести к блокировке агента, когда код под блокировкой блокирует ожидание). приобрести замок, под которым он уже находится).

Получение последнего наблюдения может быть проблемой. Автор этого поста, @Jon Harrop, предполагает, что

Мне удалось спроектировать вокруг него, и получившаяся архитектура была на самом деле лучше. По сути, я с нетерпением Receive все сообщения и фильтрую, используя свою собственную локальную очередь.

Эту идею, безусловно, стоит изучить, но прежде чем приступить к экспериментам с кодом, я хотел бы получить некоторые советы о том, как я могу структурировать свое решение.

Спасибо.


person NoIdeaHowToFixThis    schedule 29.01.2014    source источник
comment
FWIW, я только что проверил ошибку тайм-аута TryScan в F # 3.1.1, и она была исправлена.   -  person J D    schedule 04.02.2014


Ответы (3)


tl;dr Я бы попробовал следующее: взять реализацию почтового ящика из FSharp.Actor или записи в блоге Зака ​​Брея, заменить ConcurrentQueue на ConcurrentStack (плюс добавить некоторую логику с ограниченной емкостью) и использовать этот измененный агент в качестве диспетчера для передачи сообщения от источника данных к армии процессоров данных, реализованных как обычные MBP или акторы.

tl;dr2 Если воркеры — это дефицитный и медленный ресурс и нам нужно обработать сообщение, которое является последним на момент готовности воркера, то вместо этого все сводится к агенту со стеком очереди (с некоторой логикой ограниченной емкости) плюс BlockingQueue рабочих процессов. Диспетчер удаляет готовый рабочий процесс из очереди, затем извлекает сообщение из стека и отправляет это сообщение рабочему процессу. После того, как задание выполнено, рабочий ставит себя в очередь, когда становится готовым (например, до let! msg = inbox.Receive()). Поток-потребитель-диспетчер затем блокируется до тех пор, пока какой-либо рабочий процесс не будет готов, в то время как поток-производитель обновляет ограниченный стек. (ограниченный стек можно сделать с помощью массива + смещения + размера внутри блокировки, ниже слишком сложно)

Подробности

MailBoxProcessor предназначен только для одного потребителя. Это даже прокомментировано в исходном коде MBP здесь (ищите слово "ДРАКОНЫ" :) )

Если вы отправляете свои данные в MBP, то только один поток может взять их из внутренней очереди или стека. В вашем конкретном случае я бы использовал ConcurrentStack напрямую или лучше завернутый в BlockingCollection:

  • Это позволит многим одновременным потребителям
  • Это очень быстро и безопасно для потоков
  • BlockingCollection имеет свойство BoundedCapacity, позволяющее ограничить размер коллекции. Он выдает Add, но вы можете поймать его или использовать TryAdd. Если A является основным стеком, а B — резервным, то TryAdd к A, при ошибке Add к B и поменяйте местами два с помощью Interlocked.Exchange, затем обработайте необходимые сообщения в A, очистите его, создайте новый резерв или используйте три стека, если обработка A может занять больше времени. B может снова стать полным; таким образом вы не блокируете и не теряете никаких сообщений, но можете отбрасывать ненужные контролируемым способом.

BlockingCollection имеет такие методы, как AddToAny/TakeFromAny, которые работают с массивами BlockingCollection. Это может помочь, например:

  • dataSource создает сообщения для BlockingCollection с реализацией ConcurrentStack (BCCS).
  • другой поток получает сообщения от BCCS и отправляет их в массив обрабатывающих BCCS. Вы сказали, что данных много. Вы можете пожертвовать одним потоком, чтобы блокировать и отправлять ваши сообщения на неопределенный срок.
  • каждый агент обработки имеет свой собственный BCCS или реализован как агент/исполнитель/MBP, которому диспетчер отправляет сообщения. В вашем случае вам нужно отправить сообщение только одному процессорному агенту, поэтому вы можете хранить агенты обработки в циклическом буфере, чтобы всегда отправлять сообщение наименее недавно используемому процессору.

Что-то вроде этого:

            (data stream produces 'T)
                |
            [dispatcher's BCSC]
                |
            (a dispatcher thread consumes 'T  and pushes to processors, manages capacity of BCCS and LRU queue)
                 |                               |
            [processor1's BCCS/Actor/MBP] ... [processorN's BCCS/Actor/MBP]
                 |                               |
               (process)                         (process)

Вместо ConcurrentStack вы можете прочитать о куче. структура данных. Если вам нужны ваши последние сообщения по какому-либо свойству сообщений, например. отметке времени, а не по порядку, в котором они поступают в стек (например, если могут быть задержки в пути и порядке прибытия ‹> порядок создания), вы можете получить последнее сообщение с помощью кучи.

Если вам все еще нужна семантика/API агентов, вы можете прочитать несколько источников в дополнение к ссылкам Дейва и каким-то образом адаптировать реализацию для нескольких одновременных потребителей:

  • интересная статья Зака ​​Брея об эффективной реализации Актеров. Там вам нужно заменить (под комментарием // Might want to schedule this call on another thread.) строку execute true на строку async { execute true } |> Async.Start или аналогичную, потому что в противном случае производящий поток будет потреблять поток - не очень хорошо для одного быстрого производителя. Однако для описанного выше диспетчера это как раз то, что нужно.

  • FSharp.Actor (он же Fakka) разработка ветка и исходный код FSharp MPB (первая ссылка выше) здесь могут быть очень полезны для деталей реализации. Библиотека FSharp.Actors была заморожена в течение нескольких месяцев, но в ветке разработки есть некоторая активность.

  • Не следует пропускать обсуждение Fakka в группах Google в этом контексте.

У меня есть несколько похожий вариант использования, и за последние два дня я исследовал все, что мог найти в F# Agents/Actors. Этот ответ — своего рода TODO для себя, чтобы попробовать эти идеи, половина из которых родилась во время его написания.

person V.B.    schedule 30.01.2014
comment
Большое спасибо за вклад. У меня вопрос: если потребители реализованы как обычные MBP, из-за неопределенности в доставке сообщений, поступающих от диспетчера, они все равно могут запускаться и обрабатывать наблюдения, которые не являются последними доступными, не так ли? В моем проекте я должен постараться обеспечить, чтобы процессоры работали с самыми свежими наблюдениями. - person NoIdeaHowToFixThis; 30.01.2014
comment
В этой схеме диспетчер гарантирует, что последнее доступное сообщение всегда будет отправлено первым. Однако диспетчер будет довольно быстро отправлять сообщения, а рабочие могут накапливать старые значения... Я должен исправить текст, он не достигает вашей цели, как написано изначально. Если вы не можете удалить какое-либо сообщение, но должны сначала обработать последнее, то рабочие процессы должны быть реализованы как диспетчер со стеком вместо очереди. Если вы можете отбрасывать не самые последние сообщения, тогда работайте непосредственно в диспетчере (где убедитесь, что сообщение и получение находятся в разных потоках, если вы используете реализацию из сообщения в блоге). - person V.B.; 30.01.2014
comment
... Я не совсем понимаю ваш вариант использования. В моем тексте, вероятно, нет действенного ответа на ваш вопрос, но есть некоторые строительные блоки и ссылки на другие полезные. - person V.B.; 30.01.2014
comment
Да, я понимаю. Я новичок в F#, и у меня очень маленький опыт работы с актерами. Я получил отличную пищу для размышлений от вас и @7sharp9. Ваша идея использовать кучу тоже очень заманчива. Попробую над чем-нибудь поработать и поэкспериментировать. Надеюсь, мы сможем поддерживать связь через это, я был бы рад увидеть, как вы продолжите реализацию. - person NoIdeaHowToFixThis; 30.01.2014
comment
Я тоже не эксперт и изучаю это на собственном горьком опыте :) Перечитав ваш вопрос, возникла еще одна мысль: если количество воркеров фиксировано и вам нужно запустить воркера с последним сообщением, как только воркер станет доступен, вы могли бы используйте SemaphoreSlim в диспетчере с ограничением, равным количеству рабочих, плюс некоторую логику для передачи сообщения освобожденному рабочему - person V.B.; 30.01.2014
comment
Привет, я пытаюсь реализовать класс агентов, проверяющий мою идею. Моя функция Post работает. Теперь пришло время поработать над PostAndReply. Я просмотрел код #ZachBray, и его функция PostAndReply не является достаточно общей. С другой стороны, код @7sharp9, где он использует специальную реализацию, основанную на TaskCompletionSource. Есть ли альтернативы этому подходу? Доступна ли реализация оригинального MailBoxProcessor? Спасибо! - person NoIdeaHowToFixThis; 03.02.2014
comment
Первая ссылка, с Драконами :) У FSharp.Actor немного другая, но общая реализация. Также проверьте это: github.com/rogeralsing/Pigeon, его разработка сейчас очень активна. - person V.B.; 03.02.2014
comment
Я думал, что Powerpack F# — это дополнительная библиотека к стандартному ядру. Я изучу материал. Большое спасибо. - person NoIdeaHowToFixThis; 03.02.2014

Похоже, вам может понадобиться деструктивная версия сканирования процессора почтовых ящиков, я реализовал это с помощью TPL Dataflow в серии блогов, которая может вас заинтересовать.

Мой блог в настоящее время закрыт на техническое обслуживание, но я могу указать вам на сообщения в формате уценки.

Часть 1
Часть 2
Часть 3

Вы также можете проверить код на github.

Я также писал о проблемах со сканированием в моем скрытом ужасе пост

Надеюсь, это поможет...

person 7sharp9    schedule 29.01.2014
comment
Привет. Это действительно интересное чтение. Я пройдусь по материалу. Кстати, у меня проблема с форматированием {% codeblock lang:fsharp %} и других тегов. Есть ли трюк, чтобы получить правильный макет? Большое спасибо. - person NoIdeaHowToFixThis; 29.01.2014
comment
Он отформатирован с помощью жидкого плагина, github просто показывает его со стандартным процессором разметки. Я перенесу их в свой текущий блог, просто пока не было времени. Вы также можете найти части 1 и 2 на обратном пути: http://web.archive.org/web/20131126185321/http://moiraesoftware.com/blog/2012/01/22/FSharp-Dataflow-agents-I/ http://web.archive.org/web/20131127003757/http://moiraesoftware.com/blog/2012/01/30/FSharp-Dataflow-agents-II/ - person 7sharp9; 29.01.2014
comment
@7sharp9 Спасибо за прямые читаемые ссылки! Вчера я попробовал кеш Google :) Я протестировал DataFlowAgent на том же тесте, что и в статье Зака ​​Брея, и DFA в ~ 4 раза медленнее, чем стандартный MBP на моей машине. Он тратит 72% на DataflowBlock.ReceiveAsync/DataflowBlock.Post, которые находятся внутри System.Threading.Tasks.Dataflow, что и объясняет падение скорости в 4 раза. STTD медленный или что? Фрагмент здесь: fssnip.net/lt - person V.B.; 30.01.2014

Самое простое решение — жадно поедать все сообщения в папке «Входящие» по мере их поступления и удалять все, кроме самых последних. Легко сделать с помощью TryReceive:

let rec readLatestLoop oldMsg =
  async { let! newMsg = inbox.TryReceive 0
          match newMsg with
          | None -> oldMsg
          | Some newMsg -> return! readLatestLoop newMsg }
let readLatest() =
  async { let! msg = inbox.Receive()
          return! readLatestLoop msg }

Столкнувшись с той же проблемой, я разработал более сложное и эффективное решение, которое я назвал отменяемой потоковой передачей и описал в статье журнала F# здесь. Идея состоит в том, чтобы начать обработку сообщений, а затем отменить эту обработку, если они заменены. Это значительно улучшает параллелизм, если выполняется значительная обработка.

person J D    schedule 04.02.2014