Создание потока из актера в Akka Streams

Можно создавать источники и приемники из акторов, используя методы Source.actorPublisher() и Sink.actorSubscriber() соответственно. Но можно ли создать Flow из актера?

Концептуально, похоже, нет веских причин не делать этого, учитывая, что он реализует как черты ActorPublisher, так и ActorSubscriber, но, к сожалению, у объекта Flow нет метода для этого. В этом отличном сообщении блога это сделано в более ранней версии Akka Streams, поэтому вопрос в том, возможно ли это и в последней (2.4.9) версии.


person Ori Popowski    schedule 24.08.2016    source источник
comment
Хммм ... Я предлагаю попробовать, и если это не сработает, обновите свой вопрос.   -  person hveiga    schedule 24.08.2016
comment
Нет возможности сделать это. Может быть, я не совсем понял, но беглый взгляд на методы объекта Flow показывает, что такого метода не существует. У меня вопрос, существует ли он в другой форме / API. Спасибо   -  person Ori Popowski    schedule 24.08.2016


Ответы (3)


Я являюсь членом команды Akka и хотел бы использовать этот вопрос, чтобы прояснить некоторые вещи о необработанных интерфейсах Reactive Streams. Надеюсь, вы найдете это полезным.

В частности, в ближайшее время мы опубликуем несколько сообщений в блоге команды Akka о создании пользовательских этапов, включая потоки, так что следите за этим.

Не используйте ActorPublisher / ActorSubscriber

Пожалуйста, не используйте ActorPublisher и ActorSubscriber. Они слишком низкоуровневые, и вы можете в конечном итоге реализовать их таким образом, чтобы нарушить спецификацию Reactive Streams. Это пережиток прошлого, и даже тогда они были всего лишь «режимом продвинутого пользователя». В настоящее время действительно нет причин использовать эти классы. Мы никогда не предоставляли способ создания потока, потому что сложность просто взрывоопасна, если он был представлен как «сырой» API акторов, который вы могли бы реализовать и получить все правила реализованы правильно.

Если вы действительно хотите реализовать необработанные интерфейсы ReactiveStreams, пожалуйста, используйте Спецификацию TCK, чтобы убедиться, что ваша реализация верна. Вы, вероятно, будете застигнуты врасплох некоторыми из более сложных угловых случаев, с которыми должен справиться Flow (или, по терминологии RS, Processor).

Большинство операций можно построить, не переходя на нижний уровень

Многие потоки вы сможете просто построить, построив из Flow[T] и добавив к нему необходимые операции, как пример:

val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)

Это многоразовое описание Flow.

Поскольку вы спрашиваете о режиме опытного пользователя, это самый мощный оператор в самом DSL: statefulFlatMapConcat. Подавляющее большинство операций, работающих с элементами простого потока, можно выразить с его помощью: Flow.statefulMapConcat[T](f: () ⇒ (Out) ⇒ Iterable[T]): Repr[T].

Если вам нужны таймеры, вы можете zip с Source.timer и т. Д.

GraphStage - самый простой и безопасный API для создания настраиваемых этапов

Вместо этого сборка источников / потоков / приемников имеет собственный мощный и безопасный API: GraphStage. Прочтите документацию о создании пользовательских GraphStages (они может быть Sink / Source / Flow или даже любой произвольной формы). Он обрабатывает все сложные правила Reactive Streams за вас, предоставляя вам полную свободу и безопасность типов при реализации ваших этапов (которые могут быть Flow).

Например, взято из документации GraphStage-реализация оператора filter(T => Boolean):

class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {

  val in = Inlet[A]("Filter.in")
  val out = Outlet[A]("Filter.out")

  val shape = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          val elem = grab(in)
          if (p(elem)) push(out, elem)
          else pull(in)
        }
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          pull(in)
        }
      })
    }
}

Он также обрабатывает асинхронные каналы и по умолчанию является плавким.

В дополнение к документации, эти сообщения в блоге подробно объясняют, почему этот API является Святым Граалем для создания пользовательских этапов любой формы:

person Konrad 'ktoso' Malawski    schedule 24.08.2016
comment
Рекомендация_не использует ActorPublisher и ActorSubscriber. .. нарушение спецификации реактивных потоков_ предназначено для применения к созданию потока из актора? или также для создания источника из актера. Поскольку акторы кажутся естественным способом создания источников: doc.akka.io/docs/akka/2.4/scala/stream/. Тем не менее, вам интересно, разумна ли эта нестандартная реализация, чтобы не перегружать актера? См. Ниже актер, содержащий буфер. - person SemanticBeeng; 04.03.2017
comment
Просто используйте GraphStage для реализации исходного кода, он будет быстрее и производительнее ;-) Ссылка на единственный метод, который Akka Streams должен напрямую связать между собой, немного вводит в заблуждение, поскольку у нас есть полная страница, объясняющая, как реализовать настраиваемые этапы: doc.akka.io/docs/akka/2.4/ scala / stream / stream-customize.html Как я уже упоминал, использование GraphStage даст много преимуществ: производительность, плавность, отлаживаемость и т. д. Если у вас уже есть издатель, вы можете просто использовать его с Akka, хотя , Конечно. - person Konrad 'ktoso' Malawski; 05.03.2017

Решение Конрада демонстрирует, как создать настраиваемую сцену, в которой используются Актеры, но в большинстве случаев я думаю, что это немного излишне.

Обычно у вас есть актер, способный ответить на вопросы:

val actorRef : ActorRef = ???

type Input = ???
type Output = ???

val queryActor : Input => Future[Output] = 
  (actorRef ? _) andThen (_.mapTo[Output])

Это можно легко использовать с базовым Flow функция, которая принимает максимальное количество одновременных запросов:

val actorQueryFlow : Int => Flow[Input, Output, _] =
  (parallelism) => Flow[Input].mapAsync[Output](parallelism)(queryActor)

Теперь actorQueryFlow можно интегрировать в любой поток ...

person Ramón J Romero y Vigil    schedule 31.08.2016
comment
Я действительно согласен, я должен найти время, чтобы исправить свой ответ ... не стесняйтесь редактировать его, если у вас есть время! Следует объяснить оба способа, ваш как рекомендуемый. - person Konrad 'ktoso' Malawski; 27.09.2016
comment
@ Konrad'ktoso'Malawski Я благодарен вам за проверку моего ответа. Также спасибо за всю работу над аккой. Вы, ребята, делаете действительно классные вещи. - person Ramón J Romero y Vigil; 27.09.2016

Вот построение решения с использованием этапа графика. Актер должен подтверждать все сообщения, чтобы иметь противодействие. Актер уведомляется, когда поток завершается неудачно / завершается, а поток терпит неудачу, когда актор завершается. Это может быть полезно, если вы не хотите использовать ask, например когда не каждое входное сообщение имеет соответствующее выходное сообщение.

import akka.actor.{ActorRef, Status, Terminated}
import akka.stream._
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}

object ActorRefBackpressureFlowStage {
  case object StreamInit
  case object StreamAck
  case object StreamCompleted
  case class StreamFailed(ex: Throwable)
  case class StreamElementIn[A](element: A)
  case class StreamElementOut[A](element: A)
}

/**
  * Sends the elements of the stream to the given `ActorRef` that sends back back-pressure signal.
  * First element is always `StreamInit`, then stream is waiting for acknowledgement message
  * `ackMessage` from the given actor which means that it is ready to process
  * elements. It also requires `ackMessage` message after each stream element
  * to make backpressure work. Stream elements are wrapped inside `StreamElementIn(elem)` messages.
  *
  * The target actor can emit elements at any time by sending a `StreamElementOut(elem)` message, which will
  * be emitted downstream when there is demand.
  *
  * If the target actor terminates the stage will fail with a WatchedActorTerminatedException.
  * When the stream is completed successfully a `StreamCompleted` message
  * will be sent to the destination actor.
  * When the stream is completed with failure a `StreamFailed(ex)` message will be send to the destination actor.
  */
class ActorRefBackpressureFlowStage[In, Out](private val flowActor: ActorRef) extends GraphStage[FlowShape[In, Out]] {

  import ActorRefBackpressureFlowStage._

  val in: Inlet[In] = Inlet("ActorFlowIn")
  val out: Outlet[Out] = Outlet("ActorFlowOut")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {

    private lazy val self = getStageActor {
      case (_, StreamAck) =>
        if(firstPullReceived) {
          if (!isClosed(in) && !hasBeenPulled(in)) {
            pull(in)
          }
        } else {
          pullOnFirstPullReceived = true
        }

      case (_, StreamElementOut(elemOut)) =>
        val elem = elemOut.asInstanceOf[Out]
        emit(out, elem)

      case (_, Terminated(targetRef)) =>
        failStage(new WatchedActorTerminatedException("ActorRefBackpressureFlowStage", targetRef))

      case (actorRef, unexpected) =>
        failStage(new IllegalStateException(s"Unexpected message: `$unexpected` received from actor `$actorRef`."))
    }
    var firstPullReceived: Boolean = false
    var pullOnFirstPullReceived: Boolean = false

    override def preStart(): Unit = {
      //initialize stage actor and watch flow actor.
      self.watch(flowActor)
      tellFlowActor(StreamInit)
    }

    setHandler(in, new InHandler {

      override def onPush(): Unit = {
        val elementIn = grab(in)
        tellFlowActor(StreamElementIn(elementIn))
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        tellFlowActor(StreamFailed(ex))
        super.onUpstreamFailure(ex)
      }

      override def onUpstreamFinish(): Unit = {
        tellFlowActor(StreamCompleted)
        super.onUpstreamFinish()
      }
    })

    setHandler(out, new OutHandler {
      override def onPull(): Unit = {
        if(!firstPullReceived) {
          firstPullReceived = true
          if(pullOnFirstPullReceived) {
            if (!isClosed(in) && !hasBeenPulled(in)) {
              pull(in)
            }
          }
        }

      }

      override def onDownstreamFinish(): Unit = {
        tellFlowActor(StreamCompleted)
        super.onDownstreamFinish()
      }
    })

    private def tellFlowActor(message: Any): Unit = {
      flowActor.tell(message, self.ref)
    }

  }

  override def shape: FlowShape[In, Out] = FlowShape(in, out)

}
person Meeuw    schedule 19.08.2018