Асинхронная отправка электронной почты Project Reactor с повторной попыткой при ошибке

Мне нужно отправить некоторые данные после регистрации пользователя. Я хочу сделать первую попытку в основном потоке, но если есть какие-либо ошибки, я хочу повторить попытку 5 раз с интервалом в 10 минут.

@Override
public void sendRegisterInfo(MailData data) {
    Mono.just(data)
        .doOnNext(this::send)
        .doOnError(ex -> logger.warn("Main queue {}", ex.getMessage()))
        .doOnSuccess(d -> logger.info("Send mail to {}", d.getRecipient()))
        .onErrorResume(ex -> retryQueue(data))
        .subscribe();
}

private Mono<MailData> retryQueue(MailData data) {
    return Mono.just(data)
               .delayElement(Duration.of(10, ChronoUnit.MINUTES))
               .doOnNext(this::send)
               .doOnError(ex -> logger.warn("Retry queue {}", ex.getMessage()))
               .doOnSuccess(d -> logger.info("Send mail to {}", d.getRecipient()))
               .retry(5)
               .subscribe();
}

Оно работает. Но у меня есть вопросы:

  1. Правильно ли я сделал операцию в doOnNext функции?
  2. Правильно ли использовать delayElement для задержки между казнями?
  3. Поток заблокирован при ожидании задержки?
  4. И как лучше всего повторить попытку ошибки и сделать задержку между ними?

person Алексей Романов    schedule 01.06.2017    source источник


Ответы (1)


  1. doOnXXX для регистрации в порядке. Но для фактической обработки элементов вы должны предпочесть использовать flatMap, а не doOnNext (при условии, что ваша обработка асинхронна / может быть преобразована в возврат _4 _ / _ 5_).

  2. Это правильно. Другой способ - перевернуть код и начать с Flux.interval, но здесь delayElement лучше IMO.

  3. Задержка выполняется в отдельном потоке / планировщике (по умолчанию Schedulers.parallel()), поэтому основной поток не блокируется.

  4. На самом деле в аддоне reactor-extra есть конструктор Retry, посвященный этому типу использования: https://github.com/reactor/reactor-addons/blob/master/reactor-extra/src/main/java/reactor/retry/Retry.java < / а>

person Simon Baslé    schedule 05.06.2017
comment
Почему нельзя выполнять асинхронную обработку операторов doOn **? Я слышал это заявление раньше, но не было четкого обоснования того, почему это не является хорошей практикой. - person Claudiu Guja; 09.08.2018
comment
вы, по сути, нарушаете реактивную цепочку, и вы не получите ни уведомления об успешном завершении процесса , ни об ошибке. Основная последовательность может завершиться раньше, создавая впечатление, что все прошло нормально, только для асинхронного процесса тихо завершиться в углу через секунду. Это нормально только в сценарии «выстрелил и забыл», но обычно вы хотите, по крайней мере, повторно подключиться к сигналам завершения / ошибки. - person Simon Baslé; 09.08.2018