Выполняется ли doFinally в том же потоке в Reactor

Выполняется ли doFinally в одном потоке? Будет ли ниже код блокировать основной поток?

mono
.map(fileName -> asyncDownloadFile(fileName, folderName))
.doFinally(v -> {
    FileUtils.cleanDirectory(folderName); // this method is blocking
});

если да, то как лучше всего выполнить cleanDirectory в отдельном потоке в doFinally?


person user3908406    schedule 29.10.2020    source источник


Ответы (2)


Оберните блокирующий вызов в Runnable и запустите его на отдельном thread:

Runnable task = () -> {FileUtils.cleanDirectory(folderName)};

Mono<Object> cleanDirPromise = Mono.fromRunnable(task);

mono
.map(fileName -> asyncDownloadFile(fileName, folderName))
.doFinally(v -> {
    cleanDirPromise.subscribeOn(Schedulers.parallel()).subscribe();
});

Примечание. По сути, это будет вызов типа «запустил и забыл», когда вас не волнует результат cleanDirPromise.

person Prashant Pandey    schedule 29.10.2020
comment
Schedulers.parallel() создает фиксированный пул рабочих потоков, количество ядер равно количеству запущенной системы. Schedulers.boundedElastic() - лучший выбор creates new worker pools as needed and reuses idle ones. Worker pools that stay idle for too long (the default is 60s) are also disposed. ... This is a better choice for I/O blocking work. Schedulers.boundedElastic() is a handy way to give a blocking process its own thread so that it does not tie up other resources. взято из документации projectreactor.io/docs/core/release / reference / # планировщик - person Toerktumlare; 30.10.2020
comment
Schedulers.parallel() следует использовать для задач с высокой интенсивностью использования ЦП, которые должны выполнять вычисления на нескольких ядрах. - person Toerktumlare; 30.10.2020
comment
и subscribe следует удалить, так как нет причин для подписки в середине приложения. - person Toerktumlare; 30.10.2020
comment
@ThomasAndolf doFinally не подписывается на внутренний поток. Внешняя подписка не имеет ничего общего с подпиской на внутренний поток. Что касается планировщика, это не было частью вопроса. Но да, любые блокирующие задачи нужно запускать на эластике. - person Prashant Pandey; 30.10.2020
comment
doFinally возвращает Mono, поэтому подписка не требуется projectreactor.io/docs/core/release/api/reactor/core/publisher/. - person Toerktumlare; 30.10.2020
comment
@ThomasAndolf: каждый оператор Reactor возвращает новый поток, как и doFinally. Внутренний поток должен запускаться путем явной подписки на него. Вы можете попробовать запустить фиктивный пример. doOnNext также возвращает поток, но если вы хотите выполнить асинхронную операцию в реактивном конвейере внутри doOnNext, вам все равно нужно подписаться на него. Сравните это с flatMap, которая неявно подписывается на внутренний поток. doFinally не заботится о том, что вы делаете внутри него - вы также можете запустить в нем CompletableFuture. - person Prashant Pandey; 30.10.2020

Для этого лучше использовать оператор .then():

mono
    .map(fileName -> asyncDownloadFile(fileName, folderName))
    .then()
    .flatMap(
        Mono.fromRunnable(() -> FileUtils.cleanDirectory(folderName))
            .subscribeOn(Schedulers.boundedElastic())
    )
    ...

Оператор then() гарантирует, что cleanDirectory будет выполняться после asyncDownloadFile, а также позволяет построить один конвейер и обрабатывать ошибки.

person Yauhen Balykin    schedule 29.10.2020
comment
Также нужно добавить свой обработчик ошибки, но лучше запускать новую подписку в doFinally. - person Yauhen Balykin; 02.11.2020