выполнение метода параллельно из метода вызова

У меня есть библиотека, которую использует клиент, и они передают объект DataRequest, в котором есть userid, timeout и некоторые другие поля. Теперь я использую этот объект DataRequest для создания URL-адреса, а затем я делаю HTTP-вызов, используя RestTemplate, и моя служба возвращает ответ JSON, который я использую для создания объекта DataResponse и возвращаю им этот объект DataResponse.

Ниже представлен мой класс DataClient, который клиент использует, передав ему объект DataRequest. Я использую значение тайм-аута, переданное клиентом в DataRequest, для тайм-аута запроса, если он занимает слишком много времени в методе getSyncData.

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // first executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public DataResponse getSyncData(DataRequest key) {
        DataResponse response = null;
        Future<DataResponse> responseFuture = null;

        try {
            responseFuture = getAsyncData(key);
            response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException ex) {
            response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
            responseFuture.cancel(true);
            // logging exception here               
        }

        return response;
    }   

    @Override
    public Future<DataResponse> getAsyncData(DataRequest key) {
        DataFetcherTask task = new DataFetcherTask(key, restTemplate);
        Future<DataResponse> future = service.submit(task);

        return future;
    }
}

DataFetcherTask класс:

public class DataFetcherTask implements Callable<DataResponse> {

    private DataRequest key;
    private RestTemplate restTemplate;

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() throws Exception {
        // In a nutshell below is what I am doing here. 
        // 1. Make an url using DataRequest key.
        // 2. And then execute the url RestTemplate.
        // 3. Make a DataResponse object and return it.

        // I am calling this whole logic in call method as LogicA
    }
}

На данный момент мой класс DataFetcherTask отвечает за один ключ DataRequest, как показано выше.

Постановка проблемы:-

Сейчас у меня небольшое изменение дизайна. Клиент передаст объект DataRequest (например, keyA) в мою библиотеку, а затем я сделаю новый http-вызов другой службе (чего я не делаю в своем текущем дизайне), используя идентификатор пользователя, присутствующий в объекте DataRequest (keyA), который даст мне список идентификаторов пользователей, поэтому я буду использовать эти идентификаторы пользователей и создам несколько других объектов DataRequest (keyB, keyC, keyD), по одному для каждого идентификатора пользователя, возвращаемого в ответе. И тогда у меня будет объект List<DataRequest>, который будет иметь объект keyB, keyC и keyD DataRequest. Максимальный элемент в List<DataRequest> будет равен трем, вот и все.

Теперь для каждого из этих DataRequest объектов в List<DataRequest> я хочу выполнить описанный выше метод DataFetcherTask.call параллельно, а затем сделать List<DataResponse>, добавив каждый DataResponse для каждого ключа. Так что у меня будет три параллельных вызова DataFetcherTask.call. Идея этого параллельного вызова состоит в том, чтобы получить данные для всех этих максимальных трех ключей в одном глобальном значении времени ожидания.

Итак, мое предложение: класс DataFetcherTask вернет объект List<DataResponse> вместо DataResponse, а затем изменится и сигнатура методов getSyncData и getAsyncData. Итак, вот алгоритм:

  • Используйте объект DataRequest, переданный клиентом, чтобы сделать List<DataRequest>, вызвав другую службу HTTP.
  • Сделайте параллельный вызов для каждого DataRequest в методе List<DataRequest> to DataFetcherTask.call и верните клиенту List<DataResponse> объект вместо DataResponse.

Таким образом, я могу применить тот же глобальный тайм-аут на шаге 1, а также на шаге 2. Если любой из вышеперечисленных шагов требует времени, мы просто прервем тайм-аут в методе getSyncData.

DataFetcherTask класс после изменения конструкции:

public class DataFetcherTask implements Callable<List<DataResponse>> {

    private DataRequest key;
    private RestTemplate restTemplate;
    // second executor here
    private ExecutorService executorService = Executors.newFixedThreadPool(10);

    public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public List<DataResponse> call() throws Exception {
        List<DataRequest> keys = generateKeys();
        CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);

        int count = 0;
        for (final DataRequest key : keys) {
            comp.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    return performDataRequest(key);
                }
            });
        }

        List<DataResponse> responseList = new ArrayList<DataResponse>();
        while (count-- > 0) {
            Future<DataResponse> future = comp.take();
            responseList.add(future.get());
        }
        return responseList;
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys() {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in contructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }       

    private DataResponse performDataRequest(DataRequest key) {
        // This will have all LogicA code here which is shown in my original design.
        // everything as it is same..
    }
}

Теперь мой вопрос -

  • Это должно быть так? Каков правильный дизайн для решения этой проблемы? Я имею в виду, что использование метода call в другом методе call выглядит странно?
  • Нужно ли нам иметь двух исполнителей, как в моем коде? Есть ли лучший способ решить эту проблему или какое-либо упрощение/изменение дизайна, которое мы можем здесь сделать?

Я упростил код, чтобы понять, что я пытаюсь сделать.


person john    schedule 16.12.2015    source источник
comment
Вызов метода для коллекции может разумно вызывать метод для каждого элемента. Это довольно распространенный шаблон. У вас может быть один глобальный ExecutorService для всех клиентов, хотя использование ForkJoinPool может быть лучшим выбором, поскольку вы хотите, чтобы ожидающий поток выполнял некоторую работу во время ожидания.   -  person Peter Lawrey    schedule 16.12.2015
comment
Итак, как я могу иметь глобальные ExecutorService для всех моих клиентов в моем текущем сценарии? Кроме того, где вы видите, что у меня есть ожидающий поток, из-за которого я должен исследовать ForkJoinPool? Я имею в виду, почему вы рекомендовали ForkJoinPool здесь?   -  person john    schedule 16.12.2015
comment
Вы делаете поле глобальным, делая его static. Future.get() — блокирующая операция. Представьте, что у вас есть много потоков, выполняющих это в том же пуле, что и тот, который выполняет реальную работу. Вы можете дойти до того, что все потоки в пуле будут заблокированы.   -  person Peter Lawrey    schedule 16.12.2015
comment
Какую версию Java вы используете? Если доступна Java 8, CompletableFuture предлагает мощную основу для объединения задач в цепочку. Если нет, то ListenableFuture в Guava предоставляет аналогичную функциональность, но без использования лямбда-выражений Java 8.   -  person Andrew Rueckert    schedule 19.12.2015
comment
Я нахожусь на Java 7 на данный момент. Моей компании потребуется некоторое время, чтобы начать использовать Java 8. :( ... Ох, я вижу, я могу использовать ListenableFuture, но я никогда не использовал его раньше, поэтому не знаю, как его можно использовать в моем примере.   -  person john    schedule 19.12.2015
comment
@AndrewRueckert Если возможно, вы можете привести пример с Guava's ListenableFuture. Я не уверен, как я могу использовать его здесь? Любой пример мне очень поможет.   -  person john    schedule 24.12.2015
comment
Иногда гораздо лучше дать частичный результат, чем ничего, если один из нескольких запросов не удался (или не завершился вовремя), как это может случиться в getSyncData(...). Вам нужна такая опция в вашей библиотеке?   -  person dezhik    schedule 25.12.2015
comment
@dezhik Да, я так думаю, но я предполагаю, что в моем коде, если один запрос завершится ошибкой на стороне сервера, мой код вернет частичный ответ. Я имею в виду успех для других запросов и неудачу для одного, верно? нет? Но на стороне клиента, если он занимает больше, чем глобальный тайм-аут, он истечет все. Это то, что происходит прямо сейчас в моем коде.   -  person john    schedule 25.12.2015
comment
if one request fails at server side, my code will return partial response I mean success for other requests and failure for the one failed right. Я не вижу у вас текущей реализации performDataRequest(DataRequest key), но в вашем посте на CodeReview было что-то вроде этого. ‹br/›В моем предыдущем комментарии я говорил о возможности возврата частичного результата даже после глобального тайм-аута.   -  person dezhik    schedule 25.12.2015
comment
Да, что у меня было в ЧР, то же самое и здесь. Просто хотел сделать это проще, поэтому я удалил код здесь. И похоже, что я узнал о фреймворке ForkJoin после того, как Питер/Стефан предложил мне. Можем ли мы вернуть частичный ответ в случае глобального тайм-аута?   -  person john    schedule 25.12.2015
comment
Проверьте эту библиотеку github.com/Netflix/Hystrix.   -  person mommcilo    schedule 26.12.2015


Ответы (2)


Как уже упоминалось в комментариях к вашему вопросу, вы можете использовать платформу Java ForkJoin. Это сэкономит вам дополнительный пул потоков в вашем DataFetcherTask.

Вам просто нужно использовать ForkJoinPool в DataClient и преобразовать DataFetcherTask в RecursiveTask (один из подтипов ForkJoinTask). Это позволяет легко выполнять другие подзадачи параллельно.

Итак, после этих модификаций ваш код будет выглядеть примерно так:

Задача сборщика данных

DataFetcherTask теперь является RecursiveTask, который сначала генерирует ключи и вызывает подзадачи для каждого сгенерированного ключа. Эти подзадачи выполняются в том же ForkJoinPool, что и родительская задача.

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {

  private final DataRequest key;
  private final RestTemplate restTemplate;

  public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
      this.key = key;
      this.restTemplate = restTemplate;
  }

  @Override
  protected List<DataResponse> compute() {
    // Create subtasks for the key and invoke them
    List<DataRequestTask> requestTasks = requestTasks(generateKeys());
    invokeAll(requestTasks);

    // All tasks are finished if invokeAll() returns.
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
    for (DataRequestTask task : requestTasks) {
      try {
        responseList.add(task.get());
      } catch (InterruptedException | ExecutionException e) {
        // TODO - Handle exception properly
        Thread.currentThread().interrupt();
        return Collections.emptyList();
      }
    }

    return responseList;
  }

  private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
    List<DataRequestTask> tasks = new ArrayList<>(keys.size());
    for (DataRequest key : keys) {
      tasks.add(new DataRequestTask(key));
    }

    return tasks;
  }

  // In this method I am making a HTTP call to another service
  // and then I will make List<DataRequest> accordingly.
  private List<DataRequest> generateKeys() {
      List<DataRequest> keys = new ArrayList<>();
      // use key object which is passed in contructor to make HTTP call to another service
      // and then make List of DataRequest object and return keys.
      return keys;
  }

  /** Inner class for the subtasks. */
  private static class DataRequestTask extends RecursiveTask<DataResponse> {

    private final DataRequest request;

    public DataRequestTask(DataRequest request) {
      this.request = request;
    }

    @Override
    protected DataResponse compute() {
      return performDataRequest(this.request);
    }

    private DataResponse performDataRequest(DataRequest key) {
      // This will have all LogicA code here which is shown in my original design.
      // everything as it is same..
      return new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK);
    }
  }

}

Клиент данных

DataClient не сильно изменится, за исключением нового пула потоков:

public class DataClient implements Client {

  private final RestTemplate restTemplate = new RestTemplate();
  // Replace the ExecutorService with a ForkJoinPool
  private final ForkJoinPool service = new ForkJoinPool(15);

  @Override
  public List<DataResponse> getSyncData(DataRequest key) {
      List<DataResponse> responsList = null;
      Future<List<DataResponse>> responseFuture = null;

      try {
          responseFuture = getAsyncData(key);
          responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
      } catch (TimeoutException | ExecutionException | InterruptedException ex) {
          responsList = Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR));
          responseFuture.cancel(true);
          // logging exception here
      }

      return responsList;
  }

  @Override
  public Future<List<DataResponse>> getAsyncData(DataRequest key) {
      DataFetcherTask task = new DataFetcherTask(key, this.restTemplate);
      return this.service.submit(task);
  }
}

Как только вы перейдете на Java8, вы можете подумать об изменении реализации на CompletableFutures. Тогда это будет выглядеть примерно так:

DataClientCF

public class DataClientCF {

  private final RestTemplate restTemplate = new RestTemplate();
  private final ExecutorService executor = Executors.newFixedThreadPool(15);

  public List<DataResponse> getData(DataRequest initialKey) {
    return CompletableFuture.supplyAsync(() -> generateKeys(initialKey), this.executor)
      .thenApply(requests -> requests.stream().map(this::supplyRequestAsync).collect(Collectors.toList()))
      .thenApply(responseFutures -> responseFutures.stream().map(future -> future.join()).collect(Collectors.toList()))
      .exceptionally(t -> { throw new RuntimeException(t); })
      .join();
  }

  private List<DataRequest> generateKeys(DataRequest key) {
    return new ArrayList<>();
  }

  private CompletableFuture<DataResponse> supplyRequestAsync(DataRequest key) {
    return CompletableFuture.supplyAsync(() -> new DataResponse(DataErrorEnum.OK, DataStatusEnum.OK), this.executor);
  }
}

Как упоминалось в комментариях, ListenableFutures в Guava будут обеспечивать аналогичную функциональность для Java7, но без лямбда-выражений они становятся неуклюжими.

person Stefan Ferstl    schedule 20.12.2015

Насколько я знаю, RestTemplate блокирует, об этом сказано в ForkJoinPool JavaDoc в ForkJoinTask:

Вычисления должны избегать синхронизированных методов или блоков и должны сводить к минимуму другую блокирующую синхронизацию, кроме присоединения к другим задачам или использования синхронизаторов, таких как Phaser, которые объявлены для взаимодействия с планированием fork/join. ...
Задачи также не должны выполнять блокирующий ввод-вывод,...

Звонок в звонок избыточен.
И вам не нужны два исполнителя. Также вы можете вернуть частичный результат в getSyncData(DataRequest key). Это можно сделать следующим образом:

DataClient.java

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    // first executor
    private ExecutorService service = Executors.newFixedThreadPool(15);

    @Override
    public List<DataResponse> getSyncData(DataRequest key) {
        List<DataResponse> responseList = null;
        DataFetcherResult response = null;
        try {
            response = getAsyncData(key);
            responseList = response.get(key.getTimeout(), key.getTimeoutUnit());
        } catch (TimeoutException ex) {
            response.cancel(true);
            responseList = response.getPartialResult();
        }
        return responseList;
    }

    @Override
    public DataFetcherResult getAsyncData(DataRequest key) {
        List<DataRequest> keys = generateKeys(key);
        final List<Future<DataResponse>> responseList = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(keys.size());//assume keys is not null
        for (final DataRequest _key : keys) {
            responseList.add(service.submit(new Callable<DataResponse>() {
                @Override
                public DataResponse call() throws Exception {
                    DataResponse response = null;
                    try {
                        response = performDataRequest(_key);
                    } finally {
                        latch.countDown();
                        return response;
                    }
                }
            }));
        }
        return new DataFetcherResult(responseList, latch);
    }

    // In this method I am making a HTTP call to another service
    // and then I will make List<DataRequest> accordingly.
    private List<DataRequest> generateKeys(DataRequest key) {
        List<DataRequest> keys = new ArrayList<>();
        // use key object which is passed in contructor to make HTTP call to another service
        // and then make List of DataRequest object and return keys.
        return keys;
    }

    private DataResponse performDataRequest(DataRequest key) {
        // This will have all LogicA code here which is shown in my original design.
        // everything as it is same..
        return null;
    }
}

DataFetcherResult.java

public class DataFetcherResult implements Future<List<DataResponse>> {
    final List<Future<DataResponse>> futures;
    final CountDownLatch latch;

    public DataFetcherResult(List<Future<DataResponse>> futures, CountDownLatch latch) {
        this.futures = futures;
        this.latch = latch;
    }

    //non-blocking
    public List<DataResponse> getPartialResult() {
        List<DataResponse> result = new ArrayList<>(futures.size());
        for (Future<DataResponse> future : futures) {
            try {
                result.add(future.isDone() ? future.get() : null);
                //instead of null you can return new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
                //ExecutionException or CancellationException could be thrown, especially if DataFetcherResult was cancelled
                //you can handle them here and return DataResponse with corresponding DataErrorEnum and DataStatusEnum
            }
        }
        return result;
    }

    @Override
    public List<DataResponse> get() throws ExecutionException, InterruptedException {
        List<DataResponse> result = new ArrayList<>(futures.size());
        for (Future<DataResponse> future : futures) {
            result.add(future.get());
        }
        return result;
    }

    @Override
    public List<DataResponse> get(long timeout, TimeUnit timeUnit)
            throws ExecutionException, InterruptedException, TimeoutException {
        if (latch.await(timeout, timeUnit)) {
            return get();
        }
        throw new TimeoutException();//or getPartialResult()
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = true;
        for (Future<DataResponse> future : futures) {
            cancelled &= future.cancel(mayInterruptIfRunning);
        }
        return cancelled;
    }

    @Override
    public boolean isCancelled() {
        boolean cancelled = true;
        for (Future<DataResponse> future : futures) {
            cancelled &= future.isCancelled();
        }
        return cancelled;
    }

    @Override
    public boolean isDone() {
        boolean done = true;
        for (Future<DataResponse> future : futures) {
            done &= future.isDone();
        }
        return done;
    }

    //and etc.
}

Я написал это с помощью CountDownLatch, и это выглядит здорово, но обратите внимание, что есть нюанс. Вы можете ненадолго застрять в DataFetcherResult.get(long timeout, TimeUnit timeUnit), потому что CountDownLatch не синхронизировано с будущим состоянием. И может случиться так, что latch.getCount() == 0, но не все фьючерсы вернут future.isDone() == true одновременно. Потому что они уже передали latch.countDown(); внутри блока finally {} Callable, но не изменили внутренний state, который по-прежнему равен NEW.
Поэтому вызов get() внутри get(long timeout, TimeUnit timeUnit) может вызвать небольшую задержку.
Аналогичный случай был описан здесь.

Get с тайм-аутом DataFetcherResult.get(...) можно переписать с использованием фьючерсов future.get(long timeout, TimeUnit timeUnit), и вы можете удалить CountDownLatch из класса.

public List<DataResponse> get(long timeout, TimeUnit timeUnit)
        throws ExecutionException, InterruptedException{
    List<DataResponse> result = new ArrayList<>(futures.size());
    long timeoutMs = timeUnit.toMillis(timeout);
    boolean timeout = false;
    for (Future<DataResponse> future : futures) {
        long beforeGet = System.currentTimeMillis();
        try {
            if (!timeout && timeoutMs > 0) {
                result.add(future.get(timeoutMs, TimeUnit.MILLISECONDS));
                timeoutMs -= System.currentTimeMillis() - beforeGet;
            } else {
                if (future.isDone()) {
                    result.add(future.get());
                } else {
                    //result.add(new DataResponse(DataErrorEnum.NOT_READY, DataStatusEnum.ERROR)); ?
                }
            }
        } catch (TimeoutException e) {
            result.add(new DataResponse(DataErrorEnum.TIMEOUT, DataStatusEnum.ERROR));
            timeout = true;
        }
        //you can also handle ExecutionException or CancellationException here
    }

    return result;
}

Этот код был приведен в качестве примера, и его следует протестировать перед использованием в производстве, но он кажется законным :)

person dezhik    schedule 26.12.2015