Как правильно читать Flux ‹DataBuffer› и преобразовывать его в один inputStream

Я использую WebClient и пользовательский BodyExtractorclass для своего приложения с весенней загрузкой

WebClient webLCient = WebClient.create();
webClient.get()
   .uri(url, params)
   .accept(MediaType.APPLICATION.XML)
   .exchange()
   .flatMap(response -> {
     return response.body(new BodyExtractor());
   })

BodyExtractor.java

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  body.map(dataBuffer -> {
    try {
      JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
      Unmarshaller unmarshaller = jc.createUnmarshaller();

      return (T) unmarshaller.unmarshal(dataBuffer.asInputStream())
    } catch(Exception e){
       return null;
    }
  }).next();
}

Приведенный выше код работает с небольшой полезной нагрузкой, но не с большой полезной нагрузкой, я думаю, это потому, что я читаю только одно значение потока с next, и я не уверен, как объединить и прочитать все dataBuffer.

Я новичок в реакторе, поэтому не знаю многих трюков с флюсом / моно.


person Bk Santiago    schedule 28.09.2017    source источник


Ответы (6)


Это действительно не так сложно, как предполагают другие ответы.

Единственный способ передать данные без буферизации в памяти - использовать канал, как предложил @ jin-kwon. Однако это можно сделать очень просто, используя Spring BodyExtractors и DataBufferUtils служебные классы.

Пример:

private InputStream readAsInputStream(String url) throws IOException {
    PipedOutputStream osPipe = new PipedOutputStream();
    PipedInputStream isPipe = new PipedInputStream(osPipe);

    ClientResponse response = webClient.get().uri(url)
        .accept(MediaType.APPLICATION.XML)
        .exchange()
        .block();
    final int statusCode = response.rawStatusCode();
    // check HTTP status code, can throw exception if needed
    // ....

    Flux<DataBuffer> body = response.body(BodyExtractors.toDataBuffers())
        .doOnError(t -> {
            log.error("Error reading body.", t);
            // close pipe to force InputStream to error,
            // otherwise the returned InputStream will hang forever if an error occurs
            try(isPipe) {
              //no-op
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);
            }
        })
        .doFinally(s -> {
            try(osPipe) {
              //no-op
            } catch (IOException ioe) {
                log.error("Error closing streams", ioe);
            }
        });

    DataBufferUtils.write(body, osPipe)
        .subscribe(DataBufferUtils.releaseConsumer());

    return isPipe;
}

Если вам не нужно проверять код ответа или генерировать исключение для кода состояния сбоя, вы можете пропустить вызов block() и промежуточную переменную ClientResponse, используя

flatMap(r -> r.body(BodyExtractors.toDataBuffers()))

вместо.

person user1585916    schedule 07.10.2019
comment
выглядит многообещающим и простым, вероятно, это правильный ответ для обработки больших запросов. Я попробую это, если у меня будет время. - person Bk Santiago; 15.10.2019
comment
Я должен добавить, что я согласен с ранее высказанным @abhijit-sarkar комментарий, что WebClient не лучший инструмент для этой работы. Хотя это можно сделать (как я продемонстрировал), это не самый эффективный способ сделать это. Если вам нужен только InputStream, вам лучше использовать синхронный клиент, такой как java.net.http.HttpClient. Если вы застряли с WebClient, то я считаю, что мое решение - лучший вариант. - person user1585916; 24.10.2019
comment
Похоже, что если ошибки нет, isPipe никогда не закрывается - person Abhijit Sarkar; 26.07.2020
comment
@AbhijitSarkar, закрывать isPipe здесь незачем. Ответственность за это несет вызывающая сторона этого readAsInputStream() метода (т. Е. Использовать InputStream в блоке try-with-resources). doFinally() здесь гарантирует, что osPipe всегда будет закрыто, и это вызовет вызов приемника (isPipe) о достижении конца ввода и возвратит -1 / EOF при следующем чтении. - person user1585916; 29.07.2020
comment
Измените PipedInputSteam на PipedInputStream и MediaType.APPLICATION.XML на MediaType.APPLICATION_XML. Я избавился от кода состояния, поэтому мне нужно использовать flatMapMany(r -> r.body(BodyExtractors.toDataBuffers())) вместо flatMap(r -> r.body(BodyExtractors.toDataBuffers())) - person Juan; 24.08.2020
comment
Попробуйте использовать ресурсы для переменной, объявленной вне блока try, требуется минимальная версия Java 9 - person John Masiello; 25.09.2020
comment
Не работал с Java 8 с реактором-ядром 3.3.9.RELEASE. PipedInputStream и PipedOutputStream содержат только 0 без завершения. Вешает мой unmarshaller в call unmarshaller.unmarshal (isPipe). Фактически, в моем отладчике doFinally никогда не вызывается, что подозрительно. - person John Masiello; 26.09.2020
comment
не получилось получить java.io.IOException pipe not connected, любая подсказка по этому поводу - person Pandit Biradar; 08.07.2021

Слегка измененная версия ответа Bk Santiago использует reduce() вместо collect(). Очень похоже, но не требует дополнительного класса:

Джава:

body.reduce(new InputStream() {
    public int read() { return -1; }
  }, (s: InputStream, d: DataBuffer) -> new SequenceInputStream(s, d.asInputStream())
).flatMap(inputStream -> /* do something with single InputStream */

Или Котлин:

body.reduce(object : InputStream() {
  override fun read() = -1
}) { s: InputStream, d -> SequenceInputStream(s, d.asInputStream()) }
  .flatMap { inputStream -> /* do something with single InputStream */ }

Преимущество этого подхода перед использованием collect() в том, что вам просто не нужен другой класс для сбора информации.

Я создал новый пустой InputStream(), но если этот синтаксис сбивает с толку, вы также можете заменить его на ByteArrayInputStream("".toByteArray()), чтобы вместо этого создать пустой ByteArrayInputStream в качестве начального значения.

person samanime    schedule 14.08.2018
comment
Вместо new InputStream() { public int read() { return -1; } } можно использовать InputStream.nullInputStream() - person Saljack; 08.06.2020

Вот еще один вариант из других ответов. И это все еще плохо для памяти.

static Mono<InputStream> asStream(WebClient.ResponseSpec response) {
    return response.bodyToFlux(DataBuffer.class)
        .map(b -> b.asInputStream(true))
        .reduce(SequenceInputStream::new);
}

static void doSome(WebClient.ResponseSpec response) {
    asStream(response)
        .doOnNext(stream -> {
            // do some with stream
            // close the stream!!!
        })
        .block();
}
person Jin Kwon    schedule 13.10.2019
comment
Очень просто при работе с небольшими файлами. - person Jeremy; 01.12.2019
comment
@ Шины Я очень сомневаюсь в DataBuffer::asInputStream. См. asInputStream () - person Jin Kwon; 15.08.2020
comment
@JinKwon Ты прав. Мне интересно, почему я не вижу предупреждения Netty о невыпущенных буферах раньше - person Tires; 16.08.2020
comment
Будь осторожен. Если вы закроете SequenceInputStream (иначе вы получите невыпущенные ошибки буфера от Netty), это может очень легко вызвать StackoverflowError, если у вас большой файл или много маленьких буферов. - person Lakatos Gyula; 11.05.2021

Я смог заставить его работать, используя Flux#collect и SequenceInputStream

@Override
public Mono<T> extract(ClientHttpResponse response, BodyExtractor.Context context) {
  Flux<DataBuffer> body = response.getBody();
  return body.collect(InputStreamCollector::new, (t, dataBuffer)-> t.collectInputStream(dataBuffer.asInputStream))
    .map(inputStream -> {
      try {
        JaxBContext jc = JaxBContext.newInstance(SomeClass.class);
        Unmarshaller unmarshaller = jc.createUnmarshaller();

        return (T) unmarshaller.unmarshal(inputStream);
      } catch(Exception e){
        return null;
      }
  }).next();
}

InputStreamCollector.java

public class InputStreamCollector {
  private InputStream is;

  public void collectInputStream(InputStream is) {
    if (this.is == null) this.is = is;
    this.is = new SequenceInputStream(this.is, is);
  }

  public InputStream getInputStream() {
    return this.is;
  }
}
person Bk Santiago    schedule 28.09.2017
comment
почему вы пишете свой собственный BodyExtractor? WebFlux уже поддерживает Jaxb с Jaxb2XmlDecoder. - person Brian Clozel; 28.09.2017
comment
@BrianClozel мне нужно что-то настроить, чтобы он работал? bodyToMono, похоже, не забирает мои pojo. - person Bk Santiago; 29.09.2017
comment
Что InputStreamCollector? - person Abhijit Sarkar; 07.12.2017
comment
@AbhijitSarkar, пожалуйста, проверьте мое использование выше. - person Bk Santiago; 14.12.2017
comment
Интересно, но WebClient - неподходящий инструмент для этой работы. Вы реконструируете ответ InputStream, поэтому вы не получаете преимущества от использования WebClient. Лучше использовать обычный HTTP-клиент. - person Abhijit Sarkar; 14.12.2017
comment
Разве это решение не считывает все тело ответа в память? ByteBuffer хранит все данные в памяти, верно? Таким образом, результат InputStream будет таким же, как ByteArrayInputStream, поэтому это решение не обрабатывает большие данные. - person Ruslan Stelmachenko; 24.08.2018
comment
Я использую аналогичный подход. Две рекомендации, которые я бы добавил: 1. содержать ссылку на JAXBContext и 2. повторно использовать dataBuffer.asInputStream(true), так что возможное закрытие inputStream закроет также dataBuffer - person John Masiello; 25.09.2020

Есть гораздо более чистый способ сделать это, используя непосредственно базовую реакторную сеть HttpClient вместо использования WebClient. Иерархия композиции такая:

WebClient -uses-> HttpClient -uses-> TcpClient

Проще показать код, чем объяснить:

HttpClient.create()
    .get()
    .responseContent() // ByteBufFlux
    .aggregate() // ByteBufMono
    .asInputStream() // Mono<InputStream>
    .block() // We got an InputStream, yay!

Однако, как я уже отмечал, использование InputStream - это операция блокировки, которая сводит на нет цель использования неблокирующего HTTP-клиента, не говоря уже о агрегировании всего ответа. См. this для сравнения Java NIO и ввода-вывода.

person Abhijit Sarkar    schedule 27.07.2020

Можно использовать трубы.

static <R> Mono<R> pipeAndApply(
        final Publisher<DataBuffer> source, final Executor executor,
        final Function<? super ReadableByteChannel, ? extends R> function) {
    return using(Pipe::open,
                 p -> {
                     executor.execute(() -> write(source, p.sink())
                             .doFinally(s -> {
                                 try {
                                     p.sink().close();
                                 } catch (final IOException ioe) {
                                     log.error("failed to close pipe.sink", ioe);
                                     throw new RuntimeException(ioe);
                                 }
                             })
                             .subscribe(releaseConsumer()));
                     return just(function.apply(p.source()));
                 },
                 p -> {
                     try {
                         p.source().close();
                     } catch (final IOException ioe) {
                         log.error("failed to close pipe.source", ioe);
                         throw new RuntimeException(ioe);
                     }
                 });
}

Или используя CompletableFuture,

static <R> Mono<R> pipeAndApply(
        final Publisher<DataBuffer> source,
        final Function<? super ReadableByteChannel, ? extends R> function) {
    return using(Pipe::open,
                 p -> fromFuture(supplyAsync(() -> function.apply(p.source())))
                         .doFirst(() -> write(source, p.sink())
                                 .doFinally(s -> {
                                     try {
                                         p.sink().close();
                                     } catch (final IOException ioe) {
                                         log.error("failed to close pipe.sink", ioe);
                                         throw new RuntimeException(ioe);
                                     }
                                 })
                                 .subscribe(releaseConsumer())),
                 p -> {
                     try {
                         p.source().close();
                     } catch (final IOException ioe) {
                         log.error("failed to close pipe.source", ioe);
                         throw new RuntimeException(ioe);
                     }
                 });
}
person Jin Kwon    schedule 12.05.2019