Реактивные потоки

Спецификации Reactive Streams позволяют нам создавать компоненты приложения, которые обмениваются данными асинхронно, сохраняя при этом безопасность с помощью механизмов обратного давления.

Реактивные потоки реализуются во всех современных языках программирования, таких как JavaScript и Scala.

Давайте рассмотрим один упрощенный пример, который соединяет производителя с подписчиком, чтобы увидеть, как идет поток.

val publisher = Subject[Int]

val subscriber = publisher.subscribe(message => println(message))


publisher.onNext(5)

Здесь мы создали Subject [Int], чтобы мы могли вызвать для него .onNext, который отправляет сообщение (Int) в поток. На другом конце канала у нас есть подписчик, который подписывается на издателя и укажите, что каждое новое сообщение будет обрабатываться функцией.

Как видите, нам нужны две части, издатель и подписчик, и нам нужно соединить их вместе.

Операции над потоком

Иногда нам не нужно обрабатывать каждое отдельное сообщение, поэтому мы можем фильтровать канал.

val subscriber = publisher
  .filter(_ % 2 == 0)
  .subscribe(message => println(message))

В других случаях нам нужно применить преобразования, поэтому мы сопоставляем потоки.

case class EvenNumber(n: Int)
val subscriber = publisher
  .filter(_ % 2 == 0)
  .map(EvenNumber)
  .subscribe(message => println(message))

К настоящему времени мы, возможно, поняли, что поток имеет операции, аналогичные тем, которые мы можем найти в любой коллекции. Это мощная функция, поскольку нам не нужно изучать новый API, мы можем работать с тем, что мы уже знаем.

Одна маленькая проблема

Наша проблема в том, что для создания полного конвейера нам нужно соединить все части вместе. Это означает, что при создании подписчика нам нужен уже созданный издатель, и у нас должен быть к нему доступ.

val subscriber = publisher.subscribe(message => println(message))

Подписчик должен знать об издателе, поэтому нам нужно передать издателя везде, где должен быть создан подписчик.

Общая конечная точка

Мы хотим создать механизм, позволяющий избежать упомянутой проблемы. Идея решить эту проблему совсем не сложна. На самом деле он исходит от современной системы Apache Kafka.

Kafka использует подход на основе темы для разделения потоков. Мы можем поместить сообщение в тему A, так что только те, кто слушает тему A, получат и обработают сообщение.

С другой стороны, издатели и подписчики полностью разделены. Они ничего не знают друг о друге. Они используют центральное место для публикации сообщений и подписки на них.

Строительство MiniKaf

MiniKaf - это библиотека, которую мы создали для решения данной проблемы. Он предоставляет очень чистый API, поддерживаемый минимальным набором типов.

Давайте посмотрим на некоторые из основных компонентов.

Во-первых, у нас есть интерфейс для каждого события / сообщения, опубликованного в системе.

trait Event[A] {
  def value: A
}

Затем у нас есть механизм для преобразования наших типов в Event, прежде чем они будут отправлены в канал.

@typeclass trait ToEvent[A] {
  def event(a: A): Event[A]
}

И, конечно же, способ преобразования по умолчанию.

object ToEvent {

  implicit def toEvent[A]: ToEvent[A] = new ToEvent[A] {
    override def event(a: A) = E(a)
  }
}

Или мы можем использовать специфический для пользователя способ выполнить преобразование с помощью функции event.

object ToEvent {

  def event[A](a: A)(f: A => Event[A]): Event[A] = f(a)

  implicit def toEvent[A]: ToEvent[A] = new ToEvent[A] {
    override def event(a: A) = E(a)
  }
}

Теперь мы готовы начать отправлять сообщения.

val publisher = Publisher()
publisher.publish(5)
publisher.publish("hello")

Но ждать! Как мы потребляем эти сообщения?

Получение сообщения - это так же просто, как подписка на тип сообщения, которое вы хотите использовать.

val subscriber = Subscriber()
subscriber.subscribe[Int](e => println((e.topic, e.value)))

Здесь мы подписались на сообщения типа Int. В следующий раз, когда Int будет отправлен через поток, функция e => println((e.topic, e.value)) будет выполнена. Обратите внимание, что e - это не Int, а EventRecord, где topic - это "Int", а value - это фактическое сообщение, отправленное издателем.

Самое важное, на что следует обратить внимание, это то, что мы не подключили подписчика к издателю. Мы только что создали подписчика и указали подписку. Это тот же уровень развязки, что и у Kafka, но мы полагаемся на реактивные потоки для базовой реализации.

Выводы

Реактивные потоки распространяются на огромный набор платформ, позволяя им обмениваться данными с помощью асинхронных каналов, которые являются эластичными, надежными и управляются сообщениями со встроенным обратным давлением. Однако мы думаем, что они предназначены не для прямого использования, а через наши собственные API поверх них.

Требуется помощь

MiniKaf находится в самом начале своего существования, в настоящее время. Мы выполняем некоторую небезопасную работу во время выполнения, которая может быть заменена механизмом времени компиляции (возможно, с использованием Shapeless). Кроме того, API не является полным, и некоторые элементы отсутствуют. Любая помощь будет оценена по достоинству.

Хакерский полдень - это то, с чего хакеры начинают свои дни. Мы часть семьи @AMI. Сейчас мы принимаем заявки и рады обсуждать рекламные и спонсорские возможности.

Чтобы узнать больше, прочтите нашу страницу о нас, поставьте лайк / напишите нам в Facebook или просто tweet / DM @HackerNoon.

Если вам понравился этот рассказ, мы рекомендуем прочитать наши Последние технические истории и Современные технические истории. До следующего раза не воспринимайте реалии мира как должное!