Публикации по теме 'reactive-streams'


Реактивные потоки в JavaScript: упрощение асинхронной обработки данных с помощью генераторов и…
Что такое реактивные потоки? Реактивные потоки — это парадигма программирования, которая обеспечивает способ обработки асинхронных потоков данных декларативным и компонуемым способом. Цель реактивных потоков — упростить обработку асинхронных данных, предоставив согласованный и стандартизированный способ работы с потоками данных, которые могут создаваться с течением времени. В JavaScript реактивные потоки часто используются для обработки систем, управляемых событиями, таких как..

Вопросы по теме 'reactive-streams'

Project Reactor: подождите, пока вещатель закончит
Есть Broadcaster, который принимает строки и добавляет их в StringBuilder. Я хочу это проверить. Мне нужно использовать Thread#sleep , чтобы ждать, пока вещатель закончит обработку строк. Я хочу удалить sleep . Попытался использовать...
1634 просмотров

Фильтр и группировка Akka Streams по набору ключей
У меня есть поток case class Msg(keys: Seq[Char], value: String) Теперь я хочу отфильтровать подмножество ключей, например. val filterKeys = Set[Char]('k','f','c') и Filter(k.exists(filterKeys.contains))) И затем разделите их так, чтобы...
2771 просмотров
schedule 11.12.2022

Потоки Akka и границы транзакций
Я все еще понимаю концепции потоков Akka и пытаюсь понять, как сопоставить их со сценариями, когда у нас есть набор элементов, которые необходимо обрабатывать атомарно. Допустим, у нас есть заказ на покупку, состоящий из нескольких элементов, и нам...
381 просмотров

Можно ли вообще использовать процессор Reactive Streams в качестве шины событий?
Я начал изучать реактивные потоки, потому что мне было интересно узнать об этой новой тенденции использования RxJava в качестве замены более традиционных шин событий. Этот пост в блоге представляет собой типичное описание того, как готово. Если я...
351 просмотров
schedule 01.03.2023

Преобразование бесконечного потока конечных потоков в бесконечный поток — Reactive X
Как этого добиться в Reactive x (в идеале с примерами в RxJava или RxJs)? a |-a-------------------a-----------a-----------a---- s1 |-x-x-x-x-x-x -| (subscribe) s2 |-x-x-x-x-x-| (subscribe) s2...
541 просмотров
schedule 06.10.2022

Как правильно читать Flux ‹DataBuffer› и преобразовывать его в один inputStream
Я использую WebClient и пользовательский BodyExtractor class для своего приложения с весенней загрузкой WebClient webLCient = WebClient.create(); webClient.get() .uri(url, params) .accept(MediaType.APPLICATION.XML) .exchange()...
38714 просмотров

Несколько приемников в одном потоке
У меня есть такой поток и два стока, но одновременно используется только один: Source.fromElements(1, 2, 3) .via(flow) .runWith(sink1) or Source.fromElements(1, 2, 3) .via(flow) .runWith(sink2) Можно настроить, какой приемник мы...
3831 просмотров
schedule 21.09.2023

Использование ForkJoinPool.commonPool() вместо планировщика вычислений RxJava
В настоящее время я реализую небольшое подмножество API RxJava2 для личного проекта. У меня есть API на основе слушателя, и я начал писать код, чтобы обернуть мой слушатель и реализовать Flowable.create() : public class EventBus { private...
323 просмотров
schedule 28.12.2022

Как обновить Mono ‹Bar› на основе условия в Mono ‹Foo›?
У меня есть приведенный ниже код из ответа, приведенного в этом [ stackoverflow question] bars = bars.flatMap(bar -> findByBarId(bar.getBarId()) .flatMap(foo -> { bar.setIsInFoo(true);...
685 просмотров
schedule 26.04.2022

Как отправить электронную почту в ответ на весенний веб-флюс
Я хотел бы оставаться полностью реактивным в моем новом весеннем приложении. Поэтому я использую web-flux/reactor и ReactiveRepository с MongoDB. Вы знаете, как реактивно интегрировать java-mail в технический стек? Любые альтернативы?
2239 просмотров

Как конвертировать Reactor Flux‹String› в InputStream
Учитывая, что у меня есть Flux<String> неизвестного размера, как я могу преобразовать его в InputStream , который ожидает другая библиотека? Например, с помощью WebClient я могу добиться этого, используя этот подход....
6233 просмотров

Reactor 3.x: есть ли оператор для дросселя Latest (conlate)?
В rx-java 2.x есть оператор с именем throttleLatest , который объединяет входящие события на основе заданного времени: https://github.com/ReactiveX/RxJava/pull/5979 Есть ли аналогичный оператор в Reactor 3? Или можно добиться такого же...
92 просмотров

Объединение нескольких потоков
Я переношу свой проект с Spring на Ktor и решил заменить реализацию реактивных потоков, которая изначально была Reactor, на RxJava 2. Хотя я столкнулся с некоторой проблемой при попытке объединить несколько потоков в один в конце реактивного...
733 просмотров
schedule 22.12.2022

Использование Flux.buffer реактора для пакетной обработки работает только для одного элемента
Я пытаюсь использовать Flux.buffer() для пакетной загрузки из базы данных. Вариант использования заключается в том, что загрузка записей из БД может быть «взрывной», и я хотел бы ввести небольшой буфер для группировки загрузок, где это возможно....
1983 просмотров

Разрешено только одно подключение получателя абонента
Может ли кто-нибудь помочь мне с ошибкой Only one connection receive subscriber allowed. ? Я посмотрел ответ Олега Докуки , но мне это не помогло. Я упростил код для демонстрационных целей. В моем фактическом коде я получаю объемный запрос...
225 просмотров

Издатель реактивных потоков из тела ответа веб-клиента Vertx
Я пытаюсь написать оболочку для Vertx веб-клиента для загрузки тело ответа от сервера, использующего Publisher из реактивных потоков: import org.reactivestreams.Publisher; import io.vertx.reactivex.ext.web.client.WebClient; interface Storage...
475 просмотров

LiveDataReactiveStreams: как обрабатывать ошибки при использовании fromPublisher()
Я пытаюсь преобразовать Flowable в LiveData с помощью Reactive Streams. Просмотрел кучу статей, но до сих пор не понял концепции обработки состояний ошибок. В настоящее время я нахожусь на начальном этапе изучения RxJava . Было бы очень...
327 просмотров

Как добавить автоинкрементные идентификаторы в наблюдаемый массив с динамическим размером?
Я использую callbags, но логика та же, что и в RxJS. Я делаю наблюдаемый список, который можно добавлять и удалять. Я хочу иметь автоинкрементные идентификаторы. Я не знаю правильной логики для прикрепления идентификаторов к элементам. Вот...
57 просмотров

Как объединить Mono и Flux для создания одного объекта?
Я хочу создать один объект, состоящий из Mono и Flux. Допустим, есть 2 сервиса getPersonalInfo и getFriendsInfo . Person необходимы обе службы для создания объекта. При сжатии используется только первый элемент объекта friends , поскольку...
43 просмотров