Завершаемые фьючерсы Java 8 для всех типов данных

У меня есть 3 CompletableFutures, все 3 возвращают разные типы данных.

Я хочу создать объект результата, который представляет собой композицию результата, возвращаемого всеми тремя фьючерсами.

Итак, мой текущий рабочий код выглядит так:

public ClassD getResultClassD() {

    ClassD resultClass = new ClassD();
    CompletableFuture<ClassA> classAFuture = CompletableFuture.supplyAsync(() -> service.getClassA() );
    CompletableFuture<ClassB> classBFuture = CompletableFuture.supplyAsync(() -> service.getClassB() );
    CompletableFuture<ClassC> classCFuture = CompletableFuture.supplyAsync(() -> service.getClassC() );

    CompletableFuture.allOf(classAFuture, classBFuture, classCFuture)
                     .thenAcceptAsync(it -> {
                        ClassA classA = classAFuture.join();
                        if (classA != null) {
                            resultClass.setClassA(classA);
                        }

                        ClassB classB = classBFuture.join();
                        if (classB != null) {
                            resultClass.setClassB(classB);
                        }

                        ClassC classC = classCFuture.join();
                        if (classC != null) {
                            resultClass.setClassC(classC);
                        }

                     });

    return resultClass;
}

Мои вопросы:

  1. Я предполагаю, что, поскольку я использую allOf и thenAcceptAsync, этот вызов не будет блокирующим. Я правильно понимаю?

  2. Это правильный способ иметь дело с несколькими фьючерсами, возвращающими разные типы результатов?

  3. Правильно ли строить ClassD объект в thenAcceptAsync?

  4. Уместно ли использовать метод join или getNow в лямбда-выражении thenAcceptAsync?

person Anand Sunderraman    schedule 27.02.2017    source источник


Ответы (4)


Ваша попытка идет в правильном, но неверном направлении. Ваш метод getResultClassD() возвращает уже созданный объект типа ClassD, для которого произвольный поток вызовет методы изменения, не обращая внимания на вызывающий getResultClassD(). Это может вызвать состояние гонки, если методы модификации не являются потокобезопасными сами по себе, кроме того, вызывающий никогда не узнает, когда экземпляр ClassD действительно готов к использованию.

Правильным решением будет:

public CompletableFuture<ClassD> getResultClassD() {

    CompletableFuture<ClassA> classAFuture
        = CompletableFuture.supplyAsync(() -> service.getClassA() );
    CompletableFuture<ClassB> classBFuture
        = CompletableFuture.supplyAsync(() -> service.getClassB() );
    CompletableFuture<ClassC> classCFuture
        = CompletableFuture.supplyAsync(() -> service.getClassC() );

    return CompletableFuture.allOf(classAFuture, classBFuture, classCFuture)
         .thenApplyAsync(dummy -> {
            ClassD resultClass = new ClassD();

            ClassA classA = classAFuture.join();
            if (classA != null) {
                resultClass.setClassA(classA);
            }

            ClassB classB = classBFuture.join();
            if (classB != null) {
                resultClass.setClassB(classB);
            }

            ClassC classC = classCFuture.join();
            if (classC != null) {
                resultClass.setClassC(classC);
            }

            return resultClass;
         });
}

Теперь вызывающий getResultClassD() может использовать возвращенный CompletableFuture для запроса состояния выполнения или действий, зависящих от цепочки, или использовать join() для получения результата после завершения операции.

Чтобы ответить на другие вопросы, да, эта операция является асинхронной, и использование join() в лямбда-выражениях уместно. join был создан именно потому, что Future.get(), который, как объявлено, выбрасывает проверенные исключения, делает использование этих лямбда-выражений излишне сложным.

Обратите внимание, что null тесты полезны только в том случае, если эти service.getClassX() действительно могут возвращать null. Если один из вызовов службы завершается с ошибкой из-за исключения, вся операция (представленная CompletableFuture<ClassD>) будет завершена в исключительном порядке.

person Holger    schedule 27.02.2017
comment
спасибо за подробный ответ. Мое единственное продолжение вашего ответа заключается в том, что thenApplyAsync имеет тип возвращаемого значения CompletableFuture ‹Void›, как это будет работать здесь и как можно вызвать этот метод и получить результат - person Anand Sunderraman; 27.02.2017
comment
Нет, это тип возврата allOf, который равен CompletableFuture<Void>, поэтому переданная функция thenApplyAsync получает Void в качестве входных данных (параметр dummy вместо dummy ->, вы также можете написать (Void dummy) ->). Затем функция переводит ввод Void (фактически игнорируя его) в результат ClassD, поэтому результатом thenApplyAsync будет CompletableFuture<ClassD>. - person Holger; 27.02.2017
comment
@Holger Я пошел по тому же пути, что и вы, но я использовал Optional.ofNullable в вызовах службы, так что вы можете иметь cCFuture.join().ifPresent(class::SetStuff) - person Ash; 27.02.2017
comment
@Ash: почти всегда есть несколько способов сделать это ... - person Holger; 27.02.2017
comment
@Holger благодарит за объяснение фиктивного параметра. Имеет смысл - person Anand Sunderraman; 27.02.2017

Я шел по тому же пути, что и @Holger в своем ответе, но заключил Service Calls в Optional, что привело к более чистому коду на этапе thenApplyAsync.

CompletableFuture<Optional<ClassA>> classAFuture
    = CompletableFuture.supplyAsync(() -> Optional.ofNullable(service.getClassA())));

CompletableFuture<Optional<ClassB>> classBFuture
    = CompletableFuture.supplyAsync(() -> Optional.ofNullable(service.getClassB()));

CompletableFuture<Optional<ClassC>> classCFuture
    = CompletableFuture.supplyAsync(() -> Optional.ofNullable(service.getClassC()));

return CompletableFuture.allOf(classAFuture, classBFuture, classCFuture)
     .thenApplyAsync(dummy -> {
        ClassD resultClass = new ClassD();

        classAFuture.join().ifPresent(resultClass::setClassA)
        classBFuture.join().ifPresent(resultClass::setClassB)
        classCFuture.join().ifPresent(resultClass::setClassC)

        return resultClass;
     });
person Ash    schedule 27.02.2017
comment
да, использование опций - правильный способ думать об этом - person Anand Sunderraman; 27.02.2017

Раньше я сталкивался с чем-то похожим и создал небольшую демонстрацию, чтобы показать, как я решил эту проблему.

Концепция похожа на @Holger, за исключением того, что я использовал функцию для объединения каждого отдельного будущего.

https://github.com/te21wals/CompletableFuturesDemo

По сути:

    public class CombindFunctionImpl implement CombindFunction {
    public ABCData combind (ClassA a, ClassB b, ClassC c) {
        return new ABCData(a, b, c);
   }
}

...

    public class FutureProvider {
public CompletableFuture<ClassA> retrieveClassA() {
    return CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new ClassA();
    });
}

public CompletableFuture<ClassB> retrieveClassB() {
    return CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(2000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new ClassB();
    });
}
public CompletableFuture<ClassC> retrieveClassC() {
    return CompletableFuture.supplyAsync(() -> {
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new ClassC();
    });
}
}

......

public static void main (String[] args){
    CompletableFuture<ClassA> classAfuture = futureProvider.retrieveClassA();
    CompletableFuture<ClassB> classBfuture = futureProvider.retrieveClassB();
    CompletableFuture<ClassC> classCfuture = futureProvider.retrieveClassC();

    System.out.println("starting completable futures ...");
    long startTime = System.nanoTime();

    ABCData ABCData = CompletableFuture.allOf(classAfuture, classBfuture, classCfuture)
            .thenApplyAsync(ignored ->
                    combineFunction.combind(
                            classAfuture.join(),
                            classBfuture.join(),
                            classCfuture.join())
            ).join();

    long endTime = System.nanoTime();
    long duration = (endTime - startTime);
    System.out.println("completable futures are complete...");
    System.out.println("duration:\t" + Duration.ofNanos(duration).toString());
    System.out.println("result:\t" + ABCData);
}
person Thomas Walsh    schedule 09.09.2018

Другой способ справиться с этим, если вы не хотите объявлять столько переменных, - это использовать thenCombine или thenCombineAsync, чтобы связать ваши фьючерсы вместе.

public CompletableFuture<ClassD> getResultClassD()
{
  return CompletableFuture.supplyAsync(ClassD::new)
    .thenCombine(CompletableFuture.supplyAsync(service::getClassA), (d, a) -> {
      d.setClassA(a);
      return d;
    })
    .thenCombine(CompletableFuture.supplyAsync(service::getClassB), (d, b) -> {
      d.setClassB(b);
      return d;
    })
    .thenCombine(CompletableFuture.supplyAsync(service::getClassC), (d, c) -> {
      d.setClassC(c);
      return d;
    });
}

Геттеры по-прежнему будут запускаться асинхронно, а результаты будут выполняться по порядку. По сути, это еще один вариант синтаксиса для получения того же результата.

person Adam R    schedule 02.03.2017
comment
Вероятно, это хороший подход, если нужно объединить только 2 или 3 фьючерса. Однако вам, вероятно, следует начать с CompletableFuture.completedFuture(new ClassD()), поскольку создание экземпляра, вероятно, не стоит запускать асинхронно. Фактически, вы даже можете создать его экземпляр в thenApply() в первом будущем. - person Didier L; 07.03.2017