Что такое реактивные потоки?

Реактивные потоки — это парадигма программирования, которая обеспечивает способ обработки асинхронных потоков данных декларативным и компонуемым способом. Цель реактивных потоков — упростить обработку асинхронных данных, предоставив согласованный и стандартизированный способ работы с потоками данных, которые могут создаваться с течением времени.

В JavaScript реактивные потоки часто используются для обработки систем, управляемых событиями, таких как пользовательский ввод, сетевые запросы или данные датчиков, которые могут выдавать данные с течением времени. Используя реактивные потоки, разработчики могут легко обрабатывать и манипулировать этими данными декларативным и компонуемым способом, что упрощает рассмотрение сложных систем.

Создание реактивных потоков с помощью генераторов

Одним из хороших способов создания реактивных потоков в JavaScript является использование генераторов. Генераторы — это функция, представленная в ES6, которая позволяет вам определить функцию, которую можно приостановить и возобновить в любое время. Когда генератор приостанавливается с помощью ключевого слова yield, он может дождаться поступления новых данных, а затем возобновить выполнение, когда новые данные станут доступны. Это делает генераторы идеальными для работы с асинхронными потоками данных, поскольку их можно приостанавливать и возобновлять по мере поступления новых данных.

Вот пример того, как вы можете использовать генераторы для создания простого реактивного потока в JavaScript:

function* reactiveStream() {
  while (true) {
    const value = yield;
    console.log(value);
  }
}

const stream = reactiveStream();
stream.next(); // start the generator

stream.next(1);
stream.next(2);
stream.next(3);
stream.next(4);

В этом примере мы определяем функцию-генератор reactiveStream, которая прослушивает значения с помощью ключевого слова yield. Затем мы создаем экземпляр генератора, используя const stream = reactiveStream();, и запускаем его, используя stream.next();.

Операции над потоками

Давайте создадим простую карту и операторы фильтрации для нашей реализации реактивных потоков, а также выразим ее в виде класса, чтобы она выглядела более наглядно.

class ReactiveStream {
  constructor() {
    this.listeners = [];
    this.generator = this._createGenerator();
    this.generator.next();
  }

  _createGenerator() {
    return function* () {
      while (true) {
        const value = yield;
        this.listeners.forEach((listener) => listener(value));
      }
    }.bind(this)();
  }

  subscribe(listener) {
    this.listeners.push(listener);
  }

  unsubscribe(listener) {
    this.listeners = this.listeners.filter((l) => l !== listener);
  }

  next(value) {
    this.generator.next(value);
  }

  map(fn) {
    const source = this;
    const stream = new ReactiveStream();
    source.subscribe((value) => stream.next(fn(value)));
    return stream;
  }

  filter(predicate) {
    const source = this;
    const stream = new ReactiveStream();
    source.subscribe((value) => {
      if (predicate(value)) {
        stream.next(value);
      }
    });
    return stream;
  }
}

Мы определяем две функции double и even, которые будут использоваться как преобразования для реактивного потока.

Затем мы создаем два новых потока, mappedStream и filteredStream, применяя операторы map и filter к исходному stream. Оператор map применяет функцию double к каждому значению, выдаваемому исходным потоком, а оператор filter выдает только те значения, которые удовлетворяют функции even.

Мы подписываемся на каждый из новых потоков с помощью метода subscribe и предоставляем функцию, которая будет вызываться каждый раз при генерации значения. Затем мы передаем некоторые значения в исходный поток, используя stream.next(), и наблюдаем за выводом в консоли.

const stream1 = new ReactiveStream();
const stream2 = new ReactiveStream();

const double = map(x => x * 2);
const even = filter(x => x % 2 === 0);

const mappedStream = stream1.map(double);
const filteredStream = stream2.filter(even);

mappedStream.subscribe(console.log)
filteredStream.subscribe(console.log)

setTimeout(() => {
  stream1.next(1);
}, 1000);

setTimeout(() => {
  stream2.next(2);
}, 1500);

setTimeout(() => {
  stream2.next(3);
}, 1500);

// console.log outputs 2 and 2

Слияние реактивных потоков

Еще одним полезным оператором в реактивных потоках является оператор merge, который позволяет объединять несколько потоков в один поток. Это может быть полезно, когда у вас есть несколько источников данных, которые вы хотите объединить (направить) в один поток.

Вот пример того, как вы можете использовать оператор merge для объединения двух или более реактивных потоков в JavaScript:

class ReactiveStream {
//...
static merge(...streams) {
    const mergedStream = new ReactiveStream();
    streams.forEach((stream) => {
      stream.subscribe((value) => mergedStream.next(value));
    });
    return mergedStream;
  }
}

const mergedStream = ReactiveStream.merge(mappedStream, filteredStream);
mergedStream.subscribe(console.log)

// console.log outputs 2 and 2

В этом примере мы добавляем static метод merge к классу ReactiveStream, который позволяет нам объединять несколько потоков в один поток. Метод merge возвращает новый ReactiveStream, который выдает все значения, выдаваемые входными потоками.

Заключение

Reactive Streams — это мощная парадигма программирования, которая значительно упрощает работу с асинхронными потоками данных в JavaScript. Используя генераторы и операторы, такие как map, filter и merge, разработчики могут легко обрабатывать и манипулировать асинхронными данными декларативным и компонуемым способом.

Хотя примеры в этой статье предоставляют базовый обзор реактивных потоков в JavaScript, существует множество других функций и методов, которые можно использовать для создания более продвинутых и мощных систем реактивных потоков. Используя реактивные потоки, разработчики могут создавать управляемые событиями системы, более удобные в сопровождении, масштабируемые и надежные.

При работе с реактивными потоками важно помнить, что система хороша ровно настолько, насколько хорошо ее самое слабое звено. Это означает, что если одна часть системы не реагирует, может пострадать вся система. Поэтому важно убедиться, что все части системы спроектированы так, чтобы быть реактивными и обрабатывать асинхронные данные согласованным и предсказуемым образом.

В заключение, реактивные потоки — это мощная парадигма программирования, которая может значительно упростить работу с асинхронными потоками данных в JavaScript. Используя генераторы и операторы, такие как map, filter и merge, разработчики могут легко обрабатывать и манипулировать асинхронными данными декларативным и компонуемым способом, а также создавать более удобные в сопровождении, масштабируемые и надежные системы, управляемые событиями.

Хотя генераторы — это мощный инструмент для работы с реактивными потоками в JavaScript, поначалу работать с ними может быть немного сложно. Если вы новичок в генераторах и ключевом слове yield, не волнуйтесь — вы не одиноки! Даже опытные разработчики JavaScript могут сначала сбить с толку генераторы, особенно при работе со сложными асинхронными потоками данных.

Но не бойтесь, существует множество доступных ресурсов, которые помогут вам освоить генераторы и реактивные потоки. Предпочитаете ли вы чтение документации, просмотр учебных пособий или погружение в примеры кода, существует множество вариантов, которые помогут вам освоить эту мощную технику.

Поэтому не пугайтесь генераторов и реактивных потоков — немного практики и терпения, и вы тоже сможете использовать мощь этих инструментов для создания более качественных и реактивных приложений.