Как объединить несколько ответов CompletionStage типа List (для меня) или какой-либо другой в java

Я пытаюсь создать несколько CompletionStage типа List, например. CompletionStage<List<Car>>. И в конце я хочу объединить все ответы типа <List<Car>> в один список в одном CompletionStage.

CompletionStage<List<Car>> completionStageOne= carClientOne.getCarList();
CompletionStage<List<Car>> completionStageTwo= carClientTwo.getCarList();
CompletionStage<List<Car>> completionStageThree= carClientThree.getCarList();

Итак, предположим, у меня есть 3 разных сервиса, которые дадут мне другой список автомобилей, как в форме ответа CompletionStage<List<Car>>.

Сейчас пытаюсь их объединить и составить один общий список машин и тут у меня возникает проблема. Я использую приведенный ниже код, чтобы объединить результат

CompletionStage<List<Car>> completionStageOneTwo = completionStageOne
.thenCombine(completionStageTwo,(x, y) -> Stream.concat(x.stream(), y.stream()).collect(Collectors.toList()));

//above will work but if I add the third one then it will not. 

CompletionStage<List<Car>> completionStageFinal = completionStageOneTwo
.thenCombine(completionStageThree,(x, y) -> Stream.concat(x.stream(), y.stream()).collect(Collectors.toList())); 

и в конце я делаю

List<Car> finalList = completionStageFinal.toCompletableFuture().get();

Так что я делаю неправильно? Как я могу совместить эти три? Я что-то блокирую?

Примечание. Я уже проверил этот ответ Хольгера, но не смог понять, как использовать concat.


person user2367130    schedule 28.08.2018    source источник
comment
У вас есть трассировка стека в вашей консоли?   -  person Tony    schedule 28.08.2018
comment
Что вы имеете в виду под «[…] если я добавлю третий, он не будет [работать]»   -  person Didier L    schedule 28.08.2018
comment
@DidierL Итак, когда я это делаю, CompletionStage<List<Car>> completionStageOneTwo = completionStageOne .thenCombine(completionStageTwo,(x, y) -> Stream.concat(x.stream(), y.stream()).collect(Collectors.toList())); он объединяет два результата в один CompletionStage типа List‹Car›, но когда я пытаюсь объединить CompletionStageThree с completionStageOneTwo (который результат комбинации завершенияStageOne и завершенияStageTwo ), это не работает.   -  person user2367130    schedule 29.08.2018
comment
Это дает исключение SocketTimeOutException. java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: 30,000 milliseconds timeout on connection   -  person user2367130    schedule 29.08.2018


Ответы (2)


Позвольте мне показать вам пример. Я покажу, как использовать CompletableFuture.AllOf(...), которые позволяют ожидать всех фьючерсов.

    // create promises to get cars
    CompletableFuture<List<String>> cars1 = CompletableFuture.completedFuture(Arrays.asList("BMW", "Alfa"));
    CompletableFuture<List<String>> cars2 = CompletableFuture.completedFuture(Collections.singletonList("WV"));
    CompletableFuture<List<String>> cars3 = CompletableFuture.completedFuture(Collections.singletonList("FIAT"));

    // collect promises just for convenience
    List<CompletableFuture<List<String>>> allFutures = Arrays.asList(cars1, cars2, cars3);

    // wait until all cars will be obtained
    CompletableFuture<List<String>> listCompletableFuture =
            CompletableFuture.allOf(cars1, cars2, cars3)
            .thenApply(avoid -> allFutures  //start to collect them
                    .stream()
                    .flatMap(f -> f.join().stream()) //get List from feature. Here these cars has been obtained, therefore non blocking
                    .collect(Collectors.toList())
    );

    // there are here
    listCompletableFuture.join().forEach(System.out::println);

Выход:

BMW
Alfa
WV
FIAT
person Bukharov Sergey    schedule 28.08.2018

На самом деле я столкнулся с той же проблемой, и у меня есть пример того, как я использовал несколько завершаемых фьючерсов для создания объекта.

https://github.com/te21wals/CompletableFuturesDemo

Это немного более «предприимчиво», чем необходимо, но вы должны понять идею.

public static void main (String[] args){
    CompletableFuture<ClassA> classAfuture = futureProvider.retrieveClassA();
    CompletableFuture<ClassB> classBfuture = futureProvider.retrieveClassB();
    CompletableFuture<ClassC> classCfuture = futureProvider.retrieveClassC();

    System.out.println("starting completable futures ...");
    long startTime = System.nanoTime();

    ABCData ABCData = CompletableFuture.allOf(classAfuture, classBfuture, classCfuture)
            .thenApply(ignored ->
                    combineFunction.combind(
                            classAfuture.join(),
                            classBfuture.join(),
                            classCfuture.join())
            ).join();

    long endTime = System.nanoTime();
    long duration = (endTime - startTime);
    System.out.println("completable futures are complete...");
    System.out.println("duration:\t" + Duration.ofNanos(duration).toString());
    System.out.println("result:\t" + ABCData);
} 

Приятным дополнительным преимуществом, которое вы получаете, является то, что общий запрос занимает время самого длинного отдельного запроса.

person Thomas Walsh    schedule 12.09.2018