Прочитайте файл асинхронно с RxJava 2

Я пытаюсь создать Observable<ByteBuffer> или Flowable<ByteBuffer>, которые будут читать файл асинхронно (или, по крайней мере, я ищу лучшую производительность, которую Java может дать мне в этом случае использования - чтение файла).

Я хочу прочитать его по частям (каждый раз заполняя новый ByteBuffer), потому что у меня недостаточно оперативной памяти, чтобы полностью хранить его в памяти, и мне нужна обработка обратного давления (поскольку ByteBuffer должны обрабатываться один за другим, поэтому я не не хочу, чтобы ввод-вывод переполнял вычисления).

Я новичок в реактивном программировании, как и в RxJava. Так что, возможно, уже существуют какие-то библиотеки, которые делают именно то, что я хочу? (уже искал, но не нашел)

Если это не так, может ли кто-нибудь сказать мне, как сделать то, что я хочу, пожалуйста?


person Anthony O.    schedule 09.02.2017    source источник
comment
Возможно, вы захотите рассмотреть возможность использования файлов с отображением памяти, где вы отображаете весь файл как ByteBuffer, а ОС позаботится о подкачке неиспользуемых его частей, даже если сам файл не помещается в память.   -  person akarnokd    schedule 10.02.2017


Ответы (2)


Создать издателя и соответствовать спецификации реактивных потоков — непростая задача https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#спецификация

Для этого вы можете попробовать библиотеку https://github.com/cqfn/rio:

Publisher<ByteBuffer> pub = new File(Path.of("/tmp/file")).content();

Если вы хотите реализовать его самостоятельно, вы можете проверить источники реактивной Channel реализации: https://github.com/cqfn/rio/blob/master/src/main/java/org/cqfn/rio/channel/ReadableChannelPublisher.java

person Kirill    schedule 05.03.2021

Вы можете использовать Vert.x, чтобы сделать это как один из доступных вариантов. Пример кода приведен ниже:

import io.vertx.core.file.OpenOptions;
import io.vertx.rxjava.core.Vertx;

--

public static void main(String[] args) {
    Vertx vertx = Vertx.vertx();
    vertx.fileSystem()
            .openObservable("PATH-TO-FILE", new OpenOptions())
            .flatMap(asyncFile -> asyncFile.toObservable())
            .subscribe(buffer -> System.out.println(buffer.length() + "\n\n"), 
                       e-> e.printStackTrace(), 
                       () -> vertx.close());

}
person Pavan Kumar    schedule 10.02.2017
comment
Ответ, который не требует дополнительной зависимости, был бы полезен. - person Rikki Gibson; 11.12.2017