Параллельное создание объектов с помощью RxJava

Я написал микросервис Spring Boot, используя RxJava (агрегированный сервис) для реализации следующего упрощенного варианта использования. В целом, когда преподаватель загружает документ с содержанием курса, необходимо создать и сохранить набор вопросов.

  • Пользователь загружает документ в систему.
  • Система вызывает службу документов для преобразования документа в текст.
  • Затем он вызывает другую службу генерации вопросов для создания набора вопросов с учетом приведенного выше текстового содержимого.
  • Наконец, эти вопросы публикуются в базовом микросервисе CRUD для сохранения.

Когда пользователь загружает документ, из него создается множество вопросов (может быть сотни или около того). Проблема здесь в том, что я публикую вопросы по одному последовательно, чтобы служба CRUD могла их сохранить. Это резко замедляет работу из-за интенсивных сетевых вызовов ввода-вывода, поэтому для завершения всего процесса требуется около 20 секунд. Вот текущий код, если все вопросы сформулированы.

questions.flatMapIterable(list -> list).flatMap(q -> createQuestion(q)).toList();

private Observable<QuestionDTO> createQuestion(QuestionDTO question) {
    return Observable.<QuestionDTO> create(sub -> {
        QuestionDTO questionCreated = restTemplate.postForEntity(QUESTIONSERVICE_API,
                new org.springframework.http.HttpEntity<QuestionDTO>(question), QuestionDTO.class).getBody();
        sub.onNext(questionCreated);
        sub.onCompleted();
    }).doOnNext(s -> log.debug("Question was created successfully."))
            .doOnError(e -> log.error("An ERROR occurred while creating a question: " + e.getMessage()));
}

Теперь мое требование — размещать все вопросы параллельно в сервисе CRUD и объединять результаты по завершению. Также обратите внимание, что служба CRUD будет принимать только один объект вопроса за раз, и это нельзя изменить. Я знаю, что могу использовать для этой цели оператор Observable.zip, но понятия не имею, как его применять в этом контексте, поскольку фактическое количество вопросов заранее не определено. Как изменить код в строке 1, чтобы повысить производительность приложения. Любая помощь приветствуется.


person Ravindra Ranwala    schedule 13.04.2017    source источник
comment
stackoverflow.com/a/42823151/7045114   -  person Maksim Ostrovidov    schedule 14.04.2017
comment
Не используйте create, вместо этого используйте fromCallable/defer. Если вы используете оператор создания не по назначению, вы убьете обратное давление (rx1)   -  person Hans Wurst    schedule 16.04.2017


Ответы (1)


По умолчанию наблюдатели в flatMap работают с тем же планировщиком, на который вы подписались. Чтобы запустить ваши createQuestion наблюдаемые объекты параллельно, вы должны подписаться на них в планировщике вычислений.

questions.flatMapIterable(list -> list)
        .flatMap(q -> createQuestion(q).subscribeOn(Schedulers.computation()))
        .toList();

Прочтите эту статью, чтобы получить полное объяснение.

person Lamorak    schedule 13.04.2017
comment
Большое спасибо за ответ. Я просмотрел эту статью, пробуя примеры, некоторые из них не работали должным образом. В любом случае, у меня теперь другой вопрос. createQuestion выполняется асинхронно. Ранее он вызывался в одном потоке. С новой модификацией он будет вызываться в параллельных потоках. В обоих случаях для каждого вопроса будет отдельная ветка, посвященная творчеству за обложками. Будет ли огромный выигрыш из-за этого изменения? Если да, то как? - person Ravindra Ranwala; 13.04.2017
comment
@RavindraRanwala Итак, flatMap работает, создавая новый Observable для каждого полученного элемента и объединяя эти наблюдаемые в один. Итак, если ваш элемент испускается быстро (это ваш список), а ваш createQuestion работает очень медленно. Тогда ваш createQuestion будет работать параллельно. - person Phoenix Wang; 13.04.2017
comment
@PhoenixWang Спасибо, я попробую и буду держать вас в курсе прироста производительности. Это займет несколько дней, так как я далеко от своего офиса. - person Ravindra Ranwala; 13.04.2017
comment
Schedulers.computation() здесь не подходит, так как он блокирует потоки вычислений в задачах ввода-вывода. Должно быть .flatMap(q -> createQuestion(q).subscribeOn(Schedulers.io()), N), где N — количество наблюдаемых, на которые flatMap подпишется в любой момент времени. По сути, это предел параллелизма для этой операции - значение по умолчанию 128 (ИМХО) слишком велико для большинства случаев использования. - person Tassos Bassoukos; 13.04.2017
comment
Спасибо, я попробую и дам вам знать. Это займет некоторое время, так как я далеко от офиса. Спасибо, если вы можете объяснить мне, как это улучшает производительность моего приложения? - person Ravindra Ranwala; 14.04.2017
comment
Как правильно обрабатывать ошибку, возникающую внутри метода createQuestion? - person Almighty; 10.04.2019
comment
Это довольно широкий вопрос, возможно, вы можете сформировать его как совершенно новый вопрос и предоставить более подробную информацию о вашем случае. - person Lamorak; 11.04.2019