RxJava Завершение бесконечных потоков

Я изучаю реактивное программирование и RxJava. Это весело, но я застрял на проблеме, на которую не могу найти ответа. Мой основной вопрос: каков соответствующий реактивно способ завершить бесконечно работающий Observable? Я также приветствую критику и лучшие практики в отношении моего кода.

В качестве упражнения я пишу хвостовую утилиту файла журнала. Поток строк в файле журнала представлен Observable<String>. Чтобы BufferedReader продолжал читать текст, добавленный в файл, я игнорирую обычную проверку завершения reader.readLine() == null и вместо этого интерпретирую ее как означающую, что мой поток должен засыпать и ждать следующего текста регистратора.

Но хотя я могу завершить работу Observer с помощью takeUntil, мне нужно найти чистый способ завершить работу бесконечно работающего наблюдателя файлов. Я могу написать свой собственный terminateWatcher метод / поле, но это нарушает инкапсуляцию Observable / Observer - и я хотел бы оставаться максимально строгим в отношении реактивной парадигмы.

Вот код Observable<String>:

public class FileWatcher implements OnSubscribeFunc<String> {
    private Path path = . . .;

    @Override
    // The <? super String> generic is pointless but required by the compiler
    public Subscription onSubscribe(Observer<? super String> observer) {
        try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
            String newLine = "";
            while (!Thread.interrupted()) {  // How do I terminate this reactively?
                if ((newLine = reader.readLine()) != null)
                    observer.onNext(newLine);
                else
                    try {
                        // Wait for more text
                        Thread.sleep(250);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
            }
            observer.onCompleted();
        } catch (Exception e) {
            observer.onError(e);
        }

        return null;  // Not sure what Subscription I should return
    }
}

Вот код Observer, который печатает новые строки по мере их поступления:

public static void main(String... args) {
    . . .
    Observable<String> lines = Observable.create(createWatcher(file));
    lines = lines.takeWhile(new Func1<String, Boolean>() {
        @Override
        public Boolean call(String line) {
            // Predicate for which to continue processing
            return !line.contains("shutdown");
        }
    }).subscribeOn(Schedulers.threadPoolForIO())
            .observeOn(Schedulers.currentThread());
    // Seems like I should use subscribeOn() and observeOn(), but they
    // make my tailer terminate without reading any text.

    Subscription subscription = lines.subscribe(new Action1<String>() {
        @Override
        public void call(String line) {
             System.out.printf("%20s\t%s\n", file, line);
        }
    });
}

Мои два вопроса:

  1. Что такое реактивно-согласованный способ завершить бесконечно работающий поток?
  2. Какие еще ошибки в моем коде заставляют вас плакать? :)

person MonkeyWithDarts    schedule 18.12.2013    source источник


Ответы (1)


Поскольку вы запускаете просмотр файлов, когда запрашивается подписка, имеет смысл прекратить его, когда подписка закончится, то есть когда Subscription закрыт. Это устраняет один из недостатков в комментариях к коду: вы возвращаете Subscription, который говорит FileWatcher прекратить просмотр файла. Затем замените условие цикла таким образом, чтобы оно проверяло, была ли отменена подписка, вместо проверки того, был ли прерван текущий поток.

Проблема в том, что это не очень помогает, если ваш onSubscribe() метод никогда не возвращается. Возможно, ваш FileWatcher должен требовать, чтобы планировщик был указан, и чтение происходит в этом планировщике.

person Mike Strobel    schedule 18.12.2013
comment
Таким образом, планировщик может определять, как часто следует читать файл журнала; затем, когда планировщик запускает FileWatcher, FileWatcher вытягивает строки до readLine() == null. Затем Observer может отказаться от подписки, как обычно, или использовать takeUntil() для автоматического отказа от подписки. Мне придется изучить эти методы, чтобы убедиться, что они работают, но идея мне нравится. - person MonkeyWithDarts; 18.12.2013