Я являюсь членом команды Akka и хотел бы использовать этот вопрос, чтобы прояснить некоторые вещи о необработанных интерфейсах Reactive Streams. Надеюсь, вы найдете это полезным.
В частности, в ближайшее время мы опубликуем несколько сообщений в блоге команды Akka о создании пользовательских этапов, включая потоки, так что следите за этим.
Не используйте ActorPublisher / ActorSubscriber
Пожалуйста, не используйте ActorPublisher
и ActorSubscriber
. Они слишком низкоуровневые, и вы можете в конечном итоге реализовать их таким образом, чтобы нарушить спецификацию Reactive Streams. Это пережиток прошлого, и даже тогда они были всего лишь «режимом продвинутого пользователя». В настоящее время действительно нет причин использовать эти классы. Мы никогда не предоставляли способ создания потока, потому что сложность просто взрывоопасна, если он был представлен как «сырой» API акторов, который вы могли бы реализовать и получить все правила реализованы правильно.
Если вы действительно хотите реализовать необработанные интерфейсы ReactiveStreams, пожалуйста, используйте Спецификацию TCK, чтобы убедиться, что ваша реализация верна. Вы, вероятно, будете застигнуты врасплох некоторыми из более сложных угловых случаев, с которыми должен справиться Flow
(или, по терминологии RS, Processor
).
Большинство операций можно построить, не переходя на нижний уровень
Многие потоки вы сможете просто построить, построив из Flow[T]
и добавив к нему необходимые операции, как пример:
val newFlow: Flow[String, Int, NotUsed] = Flow[String].map(_.toInt)
Это многоразовое описание Flow.
Поскольку вы спрашиваете о режиме опытного пользователя, это самый мощный оператор в самом DSL: statefulFlatMapConcat
. Подавляющее большинство операций, работающих с элементами простого потока, можно выразить с его помощью: Flow.statefulMapConcat[T](f: () ⇒ (Out) ⇒ Iterable[T]): Repr[T]
.
Если вам нужны таймеры, вы можете zip
с Source.timer
и т. Д.
GraphStage - самый простой и безопасный API для создания настраиваемых этапов
Вместо этого сборка источников / потоков / приемников имеет собственный мощный и безопасный API: GraphStage
. Прочтите документацию о создании пользовательских GraphStages (они может быть Sink / Source / Flow или даже любой произвольной формы). Он обрабатывает все сложные правила Reactive Streams за вас, предоставляя вам полную свободу и безопасность типов при реализации ваших этапов (которые могут быть Flow).
Например, взято из документации GraphStage-реализация оператора filter(T => Boolean)
:
class Filter[A](p: A => Boolean) extends GraphStage[FlowShape[A, A]] {
val in = Inlet[A]("Filter.in")
val out = Outlet[A]("Filter.out")
val shape = FlowShape.of(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
if (p(elem)) push(out, elem)
else pull(in)
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
pull(in)
}
})
}
}
Он также обрабатывает асинхронные каналы и по умолчанию является плавким.
В дополнение к документации, эти сообщения в блоге подробно объясняют, почему этот API является Святым Граалем для создания пользовательских этапов любой формы:
person
Konrad 'ktoso' Malawski
schedule
24.08.2016
Flow
показывает, что такого метода не существует. У меня вопрос, существует ли он в другой форме / API. Спасибо - person Ori Popowski   schedule 24.08.2016