Как выполнить асинхронный входной поток HTTP-клиента, который не является входным потоком массива байтов?

Я использую Async Http Client для загрузки множества (возможно, больших) файлов из Интернета. .

В моем конкретном случае мне нужно отправить по InputStream байтов с этих URL-адресов загрузки в другую службу для анализа.

Наивным подходом было бы сделать это:

AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient(Dsl.config()
    .setMaxConnectionsPerHost(-1)
    .setMaxConnections(-1)
    .setPooledConnectionIdleTimeout(60 * 10 * 1000)
    .setConnectionTtl(6 * 60 * 1000)
    .setConnectTimeout(5 * 1000)
    .setRequestTimeout(5 * 60 * 1000)
    .setFollowRedirect(true)
    .setRealm(new Realm.Builder(username, password)
        .setNtlmDomain(domain)
        .setScheme(Realm.AuthScheme.NTLM)
        .build())
Response httpGetResponse = asyncHttpClient.prepareGet(url).execute().get();
return httpGetResponse.getResponseBodyAsStream();

Но в этом руководстве для асинхронных HTTP-запросов мы узнаем, что в отличие от http-клиента HTTP Components, асинхронный http-клиент будет загружать весь файл в память.

В моем случае это быстро вызовет OOM.

Итак, альтернатива такова:

Response httpGetResponse = asyncHttpClient.prepareGet(url).execute(new AsyncHandler<Response>() {
    private final Response.ResponseBuilder builder = new Response.ResponseBuilder();

    @Override
    public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
      bodyPart.getBodyByteBuffer(); // Each chunk of bytes will be fed into this method.
                                    // I need to write these bytes to the resuting input stream
                                    // without streaming them all into memory.
      return State.CONTINUE;
    }

    @Override
    public State onHeadersReceived(HttpHeaders headers) throws Exception {
      builder.accumulate(headers);
      return State.CONTINUE;
    }

    @Override
    public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
      builder.accumulate(responseStatus);
      return State.CONTINUE;
    }

    @Override
    public Response onCompleted() throws Exception {
      return builder.build();
    }

    @Override
    public void onThrowable(Throwable t) {

    }
  }).get();

Каков самый простой и чистый способ получить эти байты по мере их поступления во входной поток?

У меня есть две идеи:

1) Запишите входные данные в файл, затем выполните потоковую передачу файла или 2) Сразу же верните входной поток по конвейеру, и байты будут записаны в поток ввода по конвейеру по мере их получения.

У кого-нибудь есть рабочий пример, которым они могут поделиться с этим?


person Nicholas DiPiazza    schedule 18.05.2018    source источник


Ответы (1)


Я правильно предположил, что кто-то уже сделал это. На самом деле, после того, как я выполнил поиск по «асинхронному http-клиенту» и «конвейерному входному потоку», я нашел это в самом проекте:

https://github.com/AsyncHttpClient/async-http-client/blob/master/client/src/main/java/org/asynchttpclient/handler/BodyDeferringAsyncHandler.java

Применение:

  PipedInputStream pipedInputStream = new PipedInputStream();
  PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream);
  BodyDeferringAsyncHandler bodyDeferringAsyncHandler = new BodyDeferringAsyncHandler(pipedOutputStream);
  Future<Response> futureResponse = asyncHttpClient.prepareGet(url).execute(bodyDeferringAsyncHandler);
  Response response = bodyDeferringAsyncHandler.getResponse();
  if (response.getStatusCode() == 200) {
    return new BodyDeferringAsyncHandler.BodyDeferringInputStream(futureResponse,
        bodyDeferringAsyncHandler,
        pipedInputStream);
  } else {
    return null;
  }
person Nicholas DiPiazza    schedule 18.05.2018