Контроллер загрузки Spring, возвращающий Mono из scala.concurrent.Future

Я запускаю систему акторов Akka внутри приложения Spring Boot. У меня работает группа Актеров.

Из своего класса контроллера я вызываю свой класс обслуживания, который, используя шаблон запроса «Актер», отправляет сообщение актору и ожидает ответа. Ниже приведен код метода обслуживания:

public Mono<Future<SportEventDetailed>> getEventBySportAndLeagueId(Integer sportId, Integer leagueId) {
    final ActorSelection actorSelection = bootstrapAkka.getActorSystem().actorSelection("/user/some/path");
    final ActorMessage message = new ActorMessage()

    final CompletionStage<Future<SportEventDetails>> futureCompletionStage = actorSelection.resolveOne(Duration.ofSeconds(2))
            .thenApplyAsync(actorRef ->
                        Patterns.ask(actorRef, message, 1000)
                        .map(v1 -> (SportEventDetails) v1, ExecutionContext.global())
                )
                .whenCompleteAsync((sportEventDetailsFuture, throwable) -> {
                    // Here sportEventDetailsFuture is of type scala.concurrent.Future
                    sportEventDetailsFuture.onComplete(v1 -> {
                        final SportEventDetails eventDetails = v1.get();
                        log.info("Thread: {} | v1.get - onComplete - SED: {}", Thread.currentThread(), eventDetails);
                        return eventDetails;
                    }, ExecutionContext.global());
                });

    return Mono.fromCompletionStage(futureCompletionStage);
}

Хотя код контроллера так же прост, как

@GetMapping(path = "{sportId}/{leagueId}")
public Mono<Future<SportEventDetails>> getEventsBySportAndLeagueId(@PathVariable("sportId") Integer sportId, @PathVariable("leagueId") Integer leagueId) {
    return eventService.getEventBySportAndLeagueId(sportId, leagueId);
}

Когда клиент вызывает эту конечную точку, он получает либо {"success":true,"failure":false}, либо null (в виде строки).

Я подозреваю, что проблема для ответа null заключается в том, что scala.concurrent.Future не завершен до того, как ответ будет отправлен клиенту, но я не понимаю, почему он не завершился вовремя, потому что я предполагаю, что Mono будет ждать завершения в будущем

Проблема здесь в том, что Patterns.ask возвращает scala.concurrent.Future<SportEventDetails>, и я не смог найти способ преобразовать scala Future в Java CompletableFuture<SportEventDetails> или CompletionStage<SportEventDetails>.

Итак, мой вопрос: как я могу вернуть клиенту json-представление SportEventDetails при использовании модели Akka Patterns.ask (...)?


person dazito    schedule 23.03.2020    source источник


Ответы (1)


Future, Mono и CompletionStage - три реализации одной и той же концепции, значение, которое может быть здесь, а может и не быть. Вам понадобится способ преобразовать их в один и тот же тип, а затем способ «сгладить» вложенный тип. Mono.fromCompletionStage - это такой метод, который превращает CompletionStage в Mono.

Проще всего будет полностью избежать Future и сглаживания:

В более поздних версиях Java (2.5.19 или новее): есть ask перегрузки, требующие java.time.Duration тайм-аут, вы получите возвращаемое значение CompletionStage<SportEventDetail>. Также есть ask перегрузки, которые принимают ActorSelection, поэтому вам не нужно сначала разрешать, а затем спрашивать, когда разрешение завершится:

CompletionStage<SportEventDetail> futureSportEventDetails = 
  Patterns.ask(selection, message, Duration.ofSeconds(3))
return Mono.fromCompletionStage(futureSportEventDetails);

В более старых версиях Akka (я думаю, 2.4.2 и новее) вы сможете найти похожие подписи в akka.pattern.PatternsCS.

Если вы используете еще более старую версию и не можете обновить ее, вам, вероятно, придется предоставить свой собственный метод преобразования с Future<T> на CompletionStage<T> или Mono<T>, который регистрирует onComplete прослушиватель в будущем и завершает экземпляр целевого типа.

person johanandren    schedule 26.03.2020