У меня есть библиотека, которую использует клиент, и они передают объект DataRequest
, в котором есть userid
, timeout
и некоторые другие поля. Теперь я использую этот объект DataRequest
для создания URL-адреса, а затем я делаю HTTP-вызов, используя RestTemplate
, и моя служба возвращает ответ JSON, который я использую для создания объекта DataResponse
и возвращаю им этот объект DataResponse
.
Ниже представлен мой класс DataClient
, который клиент использует, передав ему объект DataRequest
. Я использую значение тайм-аута, переданное клиентом в DataRequest
, для тайм-аута запроса, если он занимает слишком много времени в методе getSyncData
.
public class DataClient implements Client {
private RestTemplate restTemplate = new RestTemplate();
// first executor
private ExecutorService service = Executors.newFixedThreadPool(15);
@Override
public DataResponse getSyncData(DataRequest key) {
DataResponse response = null;
Future<DataResponse> responseFuture = null;
try {
responseFuture = getAsyncData(key);
response = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException ex) {
response = new DataResponse(DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
responseFuture.cancel(true);
// logging exception here
}
return response;
}
@Override
public Future<DataResponse> getAsyncData(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, restTemplate);
Future<DataResponse> future = service.submit(task);
return future;
}
}
DataFetcherTask
класс:
public class DataFetcherTask implements Callable<DataResponse> {
private DataRequest key;
private RestTemplate restTemplate;
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public DataResponse call() throws Exception {
// In a nutshell below is what I am doing here.
// 1. Make an url using DataRequest key.
// 2. And then execute the url RestTemplate.
// 3. Make a DataResponse object and return it.
// I am calling this whole logic in call method as LogicA
}
}
На данный момент мой класс DataFetcherTask
отвечает за один ключ DataRequest
, как показано выше.
Постановка проблемы:-
Сейчас у меня небольшое изменение дизайна. Клиент передаст объект DataRequest
(например, keyA) в мою библиотеку, а затем я сделаю новый http-вызов другой службе (чего я не делаю в своем текущем дизайне), используя идентификатор пользователя, присутствующий в объекте DataRequest
(keyA), который даст мне список идентификаторов пользователей, поэтому я буду использовать эти идентификаторы пользователей и создам несколько других объектов DataRequest
(keyB, keyC, keyD), по одному для каждого идентификатора пользователя, возвращаемого в ответе. И тогда у меня будет объект List<DataRequest>
, который будет иметь объект keyB, keyC и keyD DataRequest
. Максимальный элемент в List<DataRequest>
будет равен трем, вот и все.
Теперь для каждого из этих DataRequest
объектов в List<DataRequest>
я хочу выполнить описанный выше метод DataFetcherTask.call
параллельно, а затем сделать List<DataResponse>
, добавив каждый DataResponse
для каждого ключа. Так что у меня будет три параллельных вызова DataFetcherTask.call
. Идея этого параллельного вызова состоит в том, чтобы получить данные для всех этих максимальных трех ключей в одном глобальном значении времени ожидания.
Итак, мое предложение: класс DataFetcherTask
вернет объект List<DataResponse>
вместо DataResponse
, а затем изменится и сигнатура методов getSyncData
и getAsyncData
. Итак, вот алгоритм:
- Используйте объект DataRequest, переданный клиентом, чтобы сделать
List<DataRequest>
, вызвав другую службу HTTP. - Сделайте параллельный вызов для каждого
DataRequest
в методеList<DataRequest>
toDataFetcherTask.call
и верните клиентуList<DataResponse>
объект вместоDataResponse
.
Таким образом, я могу применить тот же глобальный тайм-аут на шаге 1, а также на шаге 2. Если любой из вышеперечисленных шагов требует времени, мы просто прервем тайм-аут в методе getSyncData
.
DataFetcherTask
класс после изменения конструкции:
public class DataFetcherTask implements Callable<List<DataResponse>> {
private DataRequest key;
private RestTemplate restTemplate;
// second executor here
private ExecutorService executorService = Executors.newFixedThreadPool(10);
public DataFetcherTask(DataRequest key, RestTemplate restTemplate) {
this.key = key;
this.restTemplate = restTemplate;
}
@Override
public List<DataResponse> call() throws Exception {
List<DataRequest> keys = generateKeys();
CompletionService<DataResponse> comp = new ExecutorCompletionService<>(executorService);
int count = 0;
for (final DataRequest key : keys) {
comp.submit(new Callable<DataResponse>() {
@Override
public DataResponse call() throws Exception {
return performDataRequest(key);
}
});
}
List<DataResponse> responseList = new ArrayList<DataResponse>();
while (count-- > 0) {
Future<DataResponse> future = comp.take();
responseList.add(future.get());
}
return responseList;
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
private DataResponse performDataRequest(DataRequest key) {
// This will have all LogicA code here which is shown in my original design.
// everything as it is same..
}
}
Теперь мой вопрос -
- Это должно быть так? Каков правильный дизайн для решения этой проблемы? Я имею в виду, что использование метода
call
в другом методеcall
выглядит странно? - Нужно ли нам иметь двух исполнителей, как в моем коде? Есть ли лучший способ решить эту проблему или какое-либо упрощение/изменение дизайна, которое мы можем здесь сделать?
Я упростил код, чтобы понять, что я пытаюсь сделать.
ExecutorService
для всех моих клиентов в моем текущем сценарии? Кроме того, где вы видите, что у меня есть ожидающий поток, из-за которого я должен исследоватьForkJoinPool
? Я имею в виду, почему вы рекомендовалиForkJoinPool
здесь? - person john   schedule 16.12.2015static
.Future.get()
— блокирующая операция. Представьте, что у вас есть много потоков, выполняющих это в том же пуле, что и тот, который выполняет реальную работу. Вы можете дойти до того, что все потоки в пуле будут заблокированы. - person Peter Lawrey   schedule 16.12.2015CompletableFuture
предлагает мощную основу для объединения задач в цепочку. Если нет, тоListenableFuture
в Guava предоставляет аналогичную функциональность, но без использования лямбда-выражений Java 8. - person Andrew Rueckert   schedule 19.12.2015ListenableFuture
, но я никогда не использовал его раньше, поэтому не знаю, как его можно использовать в моем примере. - person john   schedule 19.12.2015Guava's ListenableFuture
. Я не уверен, как я могу использовать его здесь? Любой пример мне очень поможет. - person john   schedule 24.12.2015getSyncData(...)
. Вам нужна такая опция в вашей библиотеке? - person dezhik   schedule 25.12.2015if one request fails at server side, my code will return partial response I mean success for other requests and failure for the one failed right
. Я не вижу у вас текущей реализацииperformDataRequest(DataRequest key)
, но в вашем посте на CodeReview было что-то вроде этого. ‹br/›В моем предыдущем комментарии я говорил о возможности возврата частичного результата даже после глобального тайм-аута. - person dezhik   schedule 25.12.2015