Я изучаю агенты F# (MailboxProcessor
).
Я имею дело с довольно нестандартной проблемой.
- У меня есть один агент (
dataSource
), который является источником потоковых данных. Данные должны быть обработаны массивом агентов (dataProcessor
). Мы можем рассматриватьdataProcessor
как своего рода устройство слежения. - Данные могут поступать быстрее, чем скорость, с которой
dataProcessor
может обрабатывать ввод. - Это нормально иметь некоторую задержку. Тем не менее, я должен убедиться, что агент остается на вершине своей работы и не попадает под устаревшие наблюдения.
Я изучаю способы решения этой проблемы.
Первой идеей является реализация стека ( ЛИФО) в dataSource
. dataSource
отправит последнее доступное наблюдение, когда dataProcessor
станет доступным для получения и обработки данных. Это решение может работать, но может оказаться сложным, так как dataProcessor
может потребоваться заблокировать и повторно активировать; и сообщить о своем статусе dataSource
, что приводит к проблемам с двусторонней связью. Эта проблема может сводиться к blocking queue
в проблема потребитель-производитель, но я не уверен..
Вторая идея заключается в том, чтобы dataProcessor
позаботился о сортировке сообщений. В этой архитектуре dataSource
будет просто публиковать обновления в очереди dataProcessor
. dataProcessor
будет использовать Scan
для получения последних данных, доступных в его очереди. Это может быть путь. Однако я не уверен, что в текущем дизайне MailboxProcessor
можно очистить очередь сообщений, удалив старые устаревшие. Кроме того, здесь написано, что:
К сожалению, функция TryScan в текущей версии F# не работает по двум причинам. Во-первых, все дело в том, чтобы указать тайм-аут, но реализация фактически не соблюдает его. В частности, нерелевантные сообщения сбрасывают таймер. Во-вторых, как и в случае с другой функцией сканирования, очередь сообщений проверяется под замком, который предотвращает отправку сообщений любым другим потокам на время сканирования, которое может быть сколь угодно большим. Следовательно, сама функция TryScan имеет тенденцию блокировать параллельные системы и даже может создавать взаимоблокировки, поскольку код вызывающего объекта оценивается внутри блокировки (например, отправка аргумента функции в Scan или TryScan может привести к блокировке агента, когда код под блокировкой блокирует ожидание). приобрести замок, под которым он уже находится).
Получение последнего наблюдения может быть проблемой. Автор этого поста, @Jon Harrop, предполагает, что
Мне удалось спроектировать вокруг него, и получившаяся архитектура была на самом деле лучше. По сути, я с нетерпением
Receive
все сообщения и фильтрую, используя свою собственную локальную очередь.
Эту идею, безусловно, стоит изучить, но прежде чем приступить к экспериментам с кодом, я хотел бы получить некоторые советы о том, как я могу структурировать свое решение.
Спасибо.
TryScan
в F # 3.1.1, и она была исправлена. - person J D   schedule 04.02.2014