Вероятно, есть много способов сделать это. Одна из возможностей — использовать PipedInputStream и PipedOutputStream.
Это работает следующим образом: вы связываете поток вывода с потоком ввода, так что все, что вы пишете в поток вывода, может быть прочитано из связанного потока ввода, тем самым создавая канал между ними двумя.
PipedInputStream in = new PipedInputStream();
PipedOutputStream out = PipedOutputStream(in);
Однако есть одно предостережение, согласно документации конвейерных потоков, процесс записи и процесс чтения должны происходить в отдельных потоках, иначе мы можем вызвать взаимоблокировку.
Итак, возвращаясь к нашему сценарию с реактивным потоком, мы можем создать конвейер (как упоминалось выше) и подписаться на объект Flux
, а данные, которые вы получаете от него, вы записываете в конвейерный выходной поток. Что бы вы там ни написали, будет доступно для чтения на другой стороне канала, в соответствующем входном потоке. Этот входной поток — тот, которым вы можете поделиться со своим нереактивным методом.
Нам просто нужно быть особенно осторожными, чтобы подписаться на Flux в отдельном потоке, например. subscribeOn(Schedulers.elastic())
.
Вот очень простая реализация такого подписчика:
class PipedStreamSubscriber extends BaseSubscriber<byte[]> {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final PipedInputStream in;
private PipedOutputStream out;
PipedStreamSubscriber(PipedInputStream in) {
Objects.requireNonNull(in, "The input stream must not be null");
this.in = in;
}
@Override
protected void hookOnSubscribe(Subscription subscription) {
//change if you want to control back-pressure
super.hookOnSubscribe(subscription);
try {
this.out = new PipedOutputStream(in);
} catch (IOException e) {
//TODO throw a contextual exception here
throw new RuntimeException(e);
}
}
@Override
protected void hookOnNext(byte[] payload) {
try {
out.write(payload);
} catch (IOException e) {
//TODO throw a contextual exception here
throw new RuntimeException(e);
}
}
@Override
protected void hookOnComplete() {
close();
}
@Override
protected void hookOnError(Throwable error) {
//TODO handle the error or at least log it
logger.error("Failure processing stream", error);
close();
}
@Override
protected void hookOnCancel() {
close();
}
private void close() {
try {
if (out != null) {
out.close();
}
} catch (IOException e) {
//probably just ignore this one or simply log it
}
}
}
И с помощью этого подписчика я мог определить очень простой служебный метод, который превращал Flux<byte[]
в InputStream
, примерно следующим образом:
static InputStream createInputStream(Flux<byte[]> flux) {
PipedInputStream in = new PipedInputStream();
flux.subscribeOn(Schedulers.elastic())
.subscribe(new PipedStreamSubscriber(in));
return in;
}
Обратите внимание, что я был очень осторожен, чтобы закрыть выходной поток, когда поток завершен, когда возникает ошибка или подписка отменяется, иначе мы рискуем заблокироваться на стороне чтения, ожидая поступления дополнительных входных данных. Закрытие выходного потока — это то, что сигнализирует об окончании входного потока на другой стороне канала.
И теперь этот InputStream можно использовать так же, как любой обычный поток, и поэтому вы можете передать его своему нереактивному методу, например.
Flux<byte[]> jedi = Flux.just("Luke\n", "Obi-Wan\n", "Yoda\n").map(String::getBytes);
try (InputStream in = createInputStream(jedi)) {
byte[] data = new byte[5];
int size = 0;
while ((size = in.read(data)) > 0) {
System.out.printf("%s", new String(data, 0, size));
}
}
Приведенный выше код дает:
Luke
Obi-Wan
Yoda
person
Edwin Dalorzo
schedule
11.08.2018