Как конвертировать Reactor Flux‹String› в InputStream

Учитывая, что у меня есть Flux<String> неизвестного размера, как я могу преобразовать его в InputStream, который ожидает другая библиотека?

Например, с помощью WebClient я могу добиться этого, используя этот подход.

WebClient.get('example.com').exchange.flatMap { it.bodyToMono(InputStreamResource::class.java) }.map { it.inputStream }

но я не могу понять, как сделать то же самое, когда у меня есть Flux<String> в качестве входа?


person Artem Yarulin    schedule 05.08.2018    source источник
comment
Откуда вы получаете Flux‹String›? Вы можете начать с просмотра DataBufferUtils, который может считывать ресурс в DataBuffer и в InputStream.   -  person Kevin Hussey    schedule 07.08.2018
comment
Я получил его из внешней очереди и впоследствии обработал. Да, я видел DataBufferUtils, но не смог понять, как преобразовать Flux‹String› в DataBuffer, а затем в InputStream. У вас есть пример? Спасибо   -  person Artem Yarulin    schedule 08.08.2018
comment
Является ли подпись API InputStream или Flux‹InputStream›? - можете ли вы расширить свой образец, включив в него полный код?   -  person Kevin Hussey    schedule 08.08.2018
comment
Советы можно найти здесь: github.com/entzik/reactive-spring-boot-examples/blob/master/src/ — но обычно чтение из InputStream блокирует/извлекает данные, а rx больше толкает данные вниз по течению.   -  person Kevin Hussey    schedule 08.08.2018


Ответы (4)


Вероятно, есть много способов сделать это. Одна из возможностей — использовать PipedInputStream и PipedOutputStream.

Это работает следующим образом: вы связываете поток вывода с потоком ввода, так что все, что вы пишете в поток вывода, может быть прочитано из связанного потока ввода, тем самым создавая канал между ними двумя.

PipedInputStream in = new PipedInputStream();
PipedOutputStream out = PipedOutputStream(in);

Однако есть одно предостережение, согласно документации конвейерных потоков, процесс записи и процесс чтения должны происходить в отдельных потоках, иначе мы можем вызвать взаимоблокировку.

Итак, возвращаясь к нашему сценарию с реактивным потоком, мы можем создать конвейер (как упоминалось выше) и подписаться на объект Flux, а данные, которые вы получаете от него, вы записываете в конвейерный выходной поток. Что бы вы там ни написали, будет доступно для чтения на другой стороне канала, в соответствующем входном потоке. Этот входной поток — тот, которым вы можете поделиться со своим нереактивным методом.

Нам просто нужно быть особенно осторожными, чтобы подписаться на Flux в отдельном потоке, например. subscribeOn(Schedulers.elastic()).

Вот очень простая реализация такого подписчика:

class PipedStreamSubscriber extends BaseSubscriber<byte[]> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final PipedInputStream in;
    private PipedOutputStream out;

    PipedStreamSubscriber(PipedInputStream in) {
        Objects.requireNonNull(in, "The input stream must not be null");
        this.in = in;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        //change if you want to control back-pressure
        super.hookOnSubscribe(subscription);
        try {
            this.out = new PipedOutputStream(in);
        } catch (IOException e) {
            //TODO throw a contextual exception here
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void hookOnNext(byte[] payload) {
        try {
            out.write(payload);
        } catch (IOException e) {
            //TODO throw a contextual exception here
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void hookOnComplete() {
        close();
    }

    @Override
    protected void hookOnError(Throwable error) {
        //TODO handle the error or at least log it
        logger.error("Failure processing stream", error);
        close();
    }

    @Override
    protected void hookOnCancel() {
        close();
    }

    private void close() {
        try {
            if (out != null) {
                out.close();
            }
        } catch (IOException e) {
            //probably just ignore this one or simply  log it
        }
    }
}

И с помощью этого подписчика я мог определить очень простой служебный метод, который превращал Flux<byte[] в InputStream, примерно следующим образом:

static InputStream createInputStream(Flux<byte[]> flux) {

    PipedInputStream in = new PipedInputStream();
    flux.subscribeOn(Schedulers.elastic())
        .subscribe(new PipedStreamSubscriber(in));

    return in;
}

Обратите внимание, что я был очень осторожен, чтобы закрыть выходной поток, когда поток завершен, когда возникает ошибка или подписка отменяется, иначе мы рискуем заблокироваться на стороне чтения, ожидая поступления дополнительных входных данных. Закрытие выходного потока — это то, что сигнализирует об окончании входного потока на другой стороне канала.

И теперь этот InputStream можно использовать так же, как любой обычный поток, и поэтому вы можете передать его своему нереактивному методу, например.

Flux<byte[]> jedi = Flux.just("Luke\n", "Obi-Wan\n", "Yoda\n").map(String::getBytes);

try (InputStream in = createInputStream(jedi)) {
    byte[] data = new byte[5];
    int size = 0;
    while ((size = in.read(data)) > 0) {
        System.out.printf("%s", new String(data, 0, size));
    }
} 

Приведенный выше код дает:

Luke
Obi-Wan
Yoda
person Edwin Dalorzo    schedule 11.08.2018
comment
Не могли бы вы добавить некоторую информацию о потреблении памяти и задержке? Например, если бы я использовал это решение для файла размером 100 МБ, был бы файл полностью загружен в память? Или ваш System.out.printf начнет выводить начало, как только начнется поток? Спасибо! - person Nicolas Raoul; 13.02.2019

Ответ Эдвина не помог мне, поскольку ошибки в восходящем потоке были проглочены подписчиком и не распространились на потребителя InputStream. Тем не менее, вдохновленный ответом Эдвина, я нашел другое решение. Вот пример использования Flux<ByteArray> и передачи его как InputStream вниз по течению. Пример включает расшифровку, чтобы подчеркнуть возможность манипулирования OutputStream даже после того, как Flux<ByteStream> был полностью использован, что в конечном итоге приводит к ошибке, которая распространяется вниз по течению.

fun decryptAndGetInputStream(flux: Flux<ByteArray>, cipher: Cipher): Flux<InputStream> {
    val inputStream = PipedInputStream()
    val outputStream = PipedOutputStream(inputStream)
    val isStreamEmitted = AtomicBoolean(false)
    
    return flux.handle<InputStream> { byteArray, sink ->
        try {
            outputStream.write(cipher.update(byteArray))
            // emit the input stream as soon as we get the first chunk of bytes
            // make sure we do it only once
            if (!isStreamEmitted.getAndSet(true)) {
                sink.next(inputStream)
            }
        } catch (e: Exception) {
            // catch all errors to pass them to the sink
            sink.error(e)
        }
    }.doOnComplete { 
        // here we have a last chance to throw an error  
        outputStream.write(cipher.doFinal())
    }.doOnTerminate {
        // error thrown here won't get propagated downstream
        // since this callback is triggered after flux's completion 
        outputStream.flush()
        outputStream.close()
    }
}

Загвоздка здесь в том, чтобы использовать оператор handle для создания Flux, который генерирует не более одного элемента. В отличие от Mono, Flux не будет прекращено сразу после первого выброса. Хотя он больше не будет выдавать элементы, он остается открытым для выдачи возможной ошибки, которая возникает после первого выброса.

Ниже приведен пример использования Flux<InputStream> и преобразования его в Mono.

fun decryptAndGetProcessingResult(flux: Flux<ByteArray>, cipher: Cipher): Mono<Result> =
    decryptAndGetInputStream(flux, cipher)
        // the following operator gets called at most once
        .flatMap { inputStream ->
            // wrap the blocking operation into mono
            // subscribed on another thread to avoid deadlocks
            Mono.fromCallable { 
                processInputStream(inputStream)
            }.subscribeOn(Schedulers.elastic())
        // to get mono out of flux we implement reduce operator
        // although it gets never called
        }.reduce { t, _ -> t }

Еще одним преимуществом здесь является то, что поток, потребляющий InputStream, не будет блокироваться до тех пор, пока не будет доступен первый фрагмент данных.

person Jan Volf    schedule 26.09.2020

Вы можете преобразовать Flux<String> известного размера в Mono<byte[]>, который, в свою очередь, можно использовать для формирования InputStream. Проверьте это (на Java):

Flux<String> stringFlux = ...;
stringFlux.collect(() -> new ByteArrayOutputStream(),
                   (baos, str) -> {
                       try {
                           baos.write(str.getBytes());
                       } catch (IOException e) {
                           // do nothing
                       }
                   })
          .map(baos -> new ByteArrayInputStream(baos.toByteArray()))
          .map(inputStream -> ... // call other library);

Для этого требуется холодный Flux<T>, так как collect() будет запущен после завершения Flux. Для Flux<T> неизвестного размера (и предполагая, что каждый String является автономным объектом), все становится еще проще:

Flux<String> stringFlux = ...;
stringFlux.map(str -> new ByteArrayInputStream(str.getBytes()))
          .map(inputStream -> ... // call other library);
person MuratOzkan    schedule 08.08.2018
comment
Flux.collect запустится только при завершении. Если это популярный издатель, это не сработает. Поскольку вы не знаете, когда вызывается завершение, так как оно реагирует на входящий JMS - person Kevin Hussey; 08.08.2018
comment
Не заметил, что вы использовали очередь. Тогда будет ли каждая строка, поступающая из Flux, представлять собой InputStream для передачи в эту библиотеку? - person MuratOzkan; 08.08.2018

Вы можете уменьшить Flux<DataBuffer> до Mono<DataBuffer>, а затем перевести на InputStream.

Пример кода для загрузки файла в GridFs в WebFlux:

    private GridFsTemplate gridFsTemplate;

    public Mono<String> storeFile(FilePart filePart) {
        HttpHeaders headers = filePart.headers();
        String contentType = Objects.requireNonNull(headers.getContentType()).toString();

        return filePart.content()
                .reduce(DataBuffer::write).map(DataBuffer::asInputStream)
                .map(input -> gridFsTemplate.store(input, filePart.filename(), contentType))
                .map(ObjectId::toHexString);
    }
person Lin CS    schedule 26.04.2019