Как реализовать асинхронные взаимозависимые отменяемые операции с акторами?

Я новичок в модели акторов, поэтому я думаю, что уже существуют устоявшиеся шаблоны для моего обычно выглядящего сценария с такими прекрасными составными абстракциями, как актеры и фьючерсы.

У меня есть асинхронные операции со следующими требованиями:

  • Они используют устаревшую систему, отправляя низкоуровневый запрос, а затем отслеживая состояние объекта с помощью опроса. Таким образом, результат фактической операции доступен только в отложенном виде, запрашивающие должны быть уведомлены, когда наблюдаемое состояние достигает желаемого состояния.
  • Эти операции могут выполняться только после завершения некоторых других операций, которые они должны ожидать параллельно.
  • Операции можно отменить. Конечно, уже отправленные низкоуровневые запросы нельзя отменить; отмена означает, что фактическая операция не выполняется после завершения операций, от которых мы зависим, и, конечно, это должно распространяться рекурсивно (если мы ждем зависимости и у нее есть несколько ожидающих операций, не запускайте их).

Я думаю о фьючерсах: первое требование можно решить, например. Akka's map/flatMap, второй с комбинатором traverse без процедурной поддержки зависимостей/зависимых. Но я не могу придумать решение для отмены; фьючерсы не могут быть отменены, и если они составлены, их компоненты недоступны. Как функционально инкапсулировать «отменить текущую операцию»? Поддерживает ли это какой-либо из фреймворков для Scala?


person thSoft    schedule 31.08.2011    source источник
comment
Почему этот вопрос получил только 1 голос, мне непонятно. И спустя почти 3 года до сих пор нет хорошего ответа.   -  person axiopisty    schedule 29.05.2014


Ответы (2)


Используйте прослушиватели: https://github.com/jboner/akka/blob/release-1.2/akka-actor/src/main/scala/akka/routing/Listeners.scala

Создайте Актера, который использует Слушателей для распространения состояния опроса на всех и всех слушателей. Затем вы можете использовать зацикливание передачи сообщений, чтобы повторно инициировать опрос.

class MyActor extends Actor with Listeners {

  override def preStart {
    self ! 'poll //Start looping on start
  }

  def receive = listenerManagement orElse {
    case 'poll => val result = pollYourExternalDude()
    gossip(result)
    self ! 'poll //Loop
  }
}

Затем вы можете остановить актера, используя либо остановку, либо отправку PoisonPill.

Это помогает?

person Viktor Klang    schedule 31.08.2011
comment
Спасибо за указание, тем временем я более глубоко изучил документы Akka и соответствующим образом отредактировал вопрос. Фьючерсы и их составление — это здорово, но все же не решает проблему отмены. Если мне придется поддерживать слушателей вручную, я выберу этот способ. - person thSoft; 08.09.2011
comment
Если вы используете только фьючерсы, у вас может быть фьючерс, который сигнализирует об отмене, поэтому, если он завершен, он отменен, а затем проверьте это, когда вы делаете свой расчет? - person Viktor Klang; 08.09.2011

ListenableFuture от Guava поддерживать отмену до уровня, когда связаны вместе (но не тогда, когда собраны из коллекции).

person thSoft    schedule 17.01.2012