Публикации по теме 'reactive-streams'
Реактивные потоки в JavaScript: упрощение асинхронной обработки данных с помощью генераторов и…
Что такое реактивные потоки?
Реактивные потоки — это парадигма программирования, которая обеспечивает способ обработки асинхронных потоков данных декларативным и компонуемым способом. Цель реактивных потоков — упростить обработку асинхронных данных, предоставив согласованный и стандартизированный способ работы с потоками данных, которые могут создаваться с течением времени.
В JavaScript реактивные потоки часто используются для обработки систем, управляемых событиями, таких как..
Вопросы по теме 'reactive-streams'
Project Reactor: подождите, пока вещатель закончит
Есть Broadcaster, который принимает строки и добавляет их в StringBuilder.
Я хочу это проверить.
Мне нужно использовать Thread#sleep , чтобы ждать, пока вещатель закончит обработку строк. Я хочу удалить sleep .
Попытался использовать...
1634 просмотров
schedule
11.07.2022
Фильтр и группировка Akka Streams по набору ключей
У меня есть поток
case class Msg(keys: Seq[Char], value: String)
Теперь я хочу отфильтровать подмножество ключей, например. val filterKeys = Set[Char]('k','f','c') и Filter(k.exists(filterKeys.contains))) И затем разделите их так, чтобы...
2771 просмотров
schedule
11.12.2022
Потоки Akka и границы транзакций
Я все еще понимаю концепции потоков Akka и пытаюсь понять, как сопоставить их со сценариями, когда у нас есть набор элементов, которые необходимо обрабатывать атомарно. Допустим, у нас есть заказ на покупку, состоящий из нескольких элементов, и нам...
381 просмотров
schedule
21.04.2022
Можно ли вообще использовать процессор Reactive Streams в качестве шины событий?
Я начал изучать реактивные потоки, потому что мне было интересно узнать об этой новой тенденции использования RxJava в качестве замены более традиционных шин событий. Этот пост в блоге представляет собой типичное описание того, как готово. Если я...
351 просмотров
schedule
01.03.2023
Преобразование бесконечного потока конечных потоков в бесконечный поток — Reactive X
Как этого добиться в Reactive x (в идеале с примерами в RxJava или RxJs)?
a |-a-------------------a-----------a-----------a----
s1 |-x-x-x-x-x-x -| (subscribe)
s2 |-x-x-x-x-x-| (subscribe)
s2...
541 просмотров
schedule
06.10.2022
Как правильно читать Flux ‹DataBuffer› и преобразовывать его в один inputStream
Я использую WebClient и пользовательский BodyExtractor class для своего приложения с весенней загрузкой
WebClient webLCient = WebClient.create();
webClient.get()
.uri(url, params)
.accept(MediaType.APPLICATION.XML)
.exchange()...
38714 просмотров
schedule
31.03.2023
Несколько приемников в одном потоке
У меня есть такой поток и два стока, но одновременно используется только один:
Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink1)
or
Source.fromElements(1, 2, 3)
.via(flow)
.runWith(sink2)
Можно настроить, какой приемник мы...
3831 просмотров
schedule
21.09.2023
Использование ForkJoinPool.commonPool() вместо планировщика вычислений RxJava
В настоящее время я реализую небольшое подмножество API RxJava2 для личного проекта. У меня есть API на основе слушателя, и я начал писать код, чтобы обернуть мой слушатель и реализовать Flowable.create() :
public class EventBus {
private...
323 просмотров
schedule
28.12.2022
Как обновить Mono ‹Bar› на основе условия в Mono ‹Foo›?
У меня есть приведенный ниже код из ответа, приведенного в этом [ stackoverflow question]
bars = bars.flatMap(bar -> findByBarId(bar.getBarId())
.flatMap(foo -> {
bar.setIsInFoo(true);...
685 просмотров
schedule
26.04.2022
Как отправить электронную почту в ответ на весенний веб-флюс
Я хотел бы оставаться полностью реактивным в моем новом весеннем приложении. Поэтому я использую web-flux/reactor и ReactiveRepository с MongoDB.
Вы знаете, как реактивно интегрировать java-mail в технический стек? Любые альтернативы?
2239 просмотров
schedule
15.08.2022
Как конвертировать Reactor Flux‹String› в InputStream
Учитывая, что у меня есть Flux<String> неизвестного размера, как я могу преобразовать его в InputStream , который ожидает другая библиотека?
Например, с помощью WebClient я могу добиться этого, используя этот подход....
6233 просмотров
schedule
29.06.2023
Reactor 3.x: есть ли оператор для дросселя Latest (conlate)?
В rx-java 2.x есть оператор с именем throttleLatest , который объединяет входящие события на основе заданного времени:
https://github.com/ReactiveX/RxJava/pull/5979
Есть ли аналогичный оператор в Reactor 3? Или можно добиться такого же...
92 просмотров
schedule
02.03.2023
Объединение нескольких потоков
Я переношу свой проект с Spring на Ktor и решил заменить реализацию реактивных потоков, которая изначально была Reactor, на RxJava 2. Хотя я столкнулся с некоторой проблемой при попытке объединить несколько потоков в один в конце реактивного...
733 просмотров
schedule
22.12.2022
Использование Flux.buffer реактора для пакетной обработки работает только для одного элемента
Я пытаюсь использовать Flux.buffer() для пакетной загрузки из базы данных.
Вариант использования заключается в том, что загрузка записей из БД может быть «взрывной», и я хотел бы ввести небольшой буфер для группировки загрузок, где это возможно....
1983 просмотров
schedule
24.05.2023
Разрешено только одно подключение получателя абонента
Может ли кто-нибудь помочь мне с ошибкой Only one connection receive subscriber allowed. ?
Я посмотрел ответ Олега Докуки , но мне это не помогло.
Я упростил код для демонстрационных целей. В моем фактическом коде я получаю объемный запрос...
225 просмотров
schedule
27.11.2022
Издатель реактивных потоков из тела ответа веб-клиента Vertx
Я пытаюсь написать оболочку для Vertx веб-клиента для загрузки тело ответа от сервера, использующего Publisher из реактивных потоков:
import org.reactivestreams.Publisher;
import io.vertx.reactivex.ext.web.client.WebClient;
interface Storage...
475 просмотров
schedule
14.05.2022
LiveDataReactiveStreams: как обрабатывать ошибки при использовании fromPublisher()
Я пытаюсь преобразовать Flowable в LiveData с помощью Reactive Streams. Просмотрел кучу статей, но до сих пор не понял концепции обработки состояний ошибок. В настоящее время я нахожусь на начальном этапе изучения RxJava . Было бы очень...
327 просмотров
schedule
02.01.2023
Как добавить автоинкрементные идентификаторы в наблюдаемый массив с динамическим размером?
Я использую callbags, но логика та же, что и в RxJS.
Я делаю наблюдаемый список, который можно добавлять и удалять. Я хочу иметь автоинкрементные идентификаторы. Я не знаю правильной логики для прикрепления идентификаторов к элементам.
Вот...
57 просмотров
schedule
28.01.2023
Как объединить Mono и Flux для создания одного объекта?
Я хочу создать один объект, состоящий из Mono и Flux. Допустим, есть 2 сервиса getPersonalInfo и getFriendsInfo . Person необходимы обе службы для создания объекта. При сжатии используется только первый элемент объекта friends , поскольку...
43 просмотров
schedule
07.07.2023