Необъяснимое отсутствие повышения производительности при использовании RxJava Observables в веб-приложениях.

Я выполняю некоторые тесты, чтобы оценить, есть ли реальное преимущество в использовании реактивного API, основанного на Observables, вместо блокирующих традиционных.

Весь пример доступен на Gitug.

Удивительно, но результаты показывают, что результаты пропускной способности следующие:

  • Лучшее: службы REST, которые возвращают Callable/DeferredResult, заключающие в себе блокирующие операции.

  • Не так уж плохо: блокировка служб REST.

  • Худшее: службы REST, которые возвращают DeferredResult, результат которого устанавливается наблюдаемым объектом RxJava.

Это мое весеннее веб-приложение:

Приложение:

@SpringBootApplication
public class SpringNioRestApplication {

   @Bean
    public ThreadPoolTaskExecutor executor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(20);
        return executor;
    }

    public static void main(String[] args) {
        SpringApplication.run(SpringNioRestApplication.class, args);
    }
}

Контроллер синхронизации:

@RestController("SyncRestController")
@Api(value="", description="Synchronous data controller")
public class SyncRestController {

    @Autowired
    private DataService dataService;

    @RequestMapping(value="/sync/data", method=RequestMethod.GET, produces="application/json")
    @ApiOperation(value = "Gets data", notes="Gets data synchronously")
    @ApiResponses(value={@ApiResponse(code=200, message="OK")})
    public List<Data> getData(){
        return dataService.loadData();
    }
}

AsyncController: с необработанными конечными точками Callable и Observable.

@RestController
@Api(value="", description="Synchronous data controller")
public class AsyncRestController {

    @Autowired
    private DataService dataService;

    private Scheduler scheduler;

    @Autowired
    private TaskExecutor executor;

     @PostConstruct
    protected void initializeScheduler(){
        scheduler = Schedulers.from(executor);
    }

    @RequestMapping(value="/async/data", method=RequestMethod.GET, produces="application/json")
    @ApiOperation(value = "Gets data", notes="Gets data asynchronously")
    @ApiResponses(value={@ApiResponse(code=200, message="OK")})
    public Callable<List<Data>> getData(){
        return ( () -> {return dataService.loadData();} );
    }

    @RequestMapping(value="/observable/data", method=RequestMethod.GET, produces="application/json")
     @ApiOperation(value = "Gets data through Observable", notes="Gets data asynchronously through Observable")
     @ApiResponses(value={@ApiResponse(code=200, message="OK")})
     public DeferredResult<List<Data>> getDataObservable(){
         DeferredResult<List<Data>> dr = new DeferredResult<List<Data>>();
         Observable<List<Data>> dataObservable = dataService.loadDataObservable();
         dataObservable.subscribeOn(scheduler).subscribe( dr::setResult, dr::setErrorResult);
         return dr;
     }
}

DataServiceImpl

@Service
public class DataServiceImpl implements DataService{

    @Override
    public List<Data> loadData() {
        return generateData();
    }

    @Override
    public Observable<List<Data>> loadDataObservable() {
        return Observable.create( s -> {
            List<Data> dataList = generateData();
            s.onNext(dataList);
            s.onCompleted();
        });
    }

    private List<Data> generateData(){
        List<Data> dataList = new ArrayList<Data>();
        for (int i = 0; i < 20; i++) {
            Data data = new Data("key"+i, "value"+i);
            dataList.add(data);
        }
        //Processing time simulation
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return dataList;
    }
}

Я установил задержку Thread.sleep(500), чтобы увеличить время отклика службы.

Вот результаты нагрузочных тестов:

Асинхронизация с Callable: 700 об/с, без ошибок.

>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/async/data    
...
Requests: 0, requests per second: 0, mean latency: 0 ms
Requests: 2839, requests per second: 568, mean latency: 500 ms
Requests: 6337, requests per second: 700, mean latency: 500 ms
Requests: 9836, requests per second: 700, mean latency: 500 ms
...
Completed requests:  41337
Total errors:        0
Total time:          60.002348360999996 s
Requests per second: 689
Total time:          60.002348360999996 s

Блокировка: около 404 запросов в секунду, но выдает ошибки

>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/sync/data    
...
Requests: 7683, requests per second: 400, mean latency: 7420 ms
Requests: 9683, requests per second: 400, mean latency: 9570 ms
Requests: 11680, requests per second: 399, mean latency: 11720 ms
Requests: 13699, requests per second: 404, mean latency: 13760 ms
...
Percentage of the requests served within a certain time
  50%      8868 ms
  90%      22434 ms
  95%      24103 ms
  99%      25351 ms
 100%      26055 ms (longest request)

 100%      26055 ms (longest request)

   -1:   7559 errors
Requests: 31193, requests per second: 689, mean latency: 14350 ms
Errors: 1534, accumulated errors: 7559, 24.2% of total requests

Асинхронность с Observable: не более 20 запросов в секунду, ошибки появляются раньше

>>loadtest -c 15 -t 60 --rps 700 http://localhost:8080/observable/data
Requests: 0, requests per second: 0, mean latency: 0 ms
Requests: 90, requests per second: 18, mean latency: 2250 ms
Requests: 187, requests per second: 20, mean latency: 6770 ms
Requests: 265, requests per second: 16, mean latency: 11870 ms
Requests: 2872, requests per second: 521, mean latency: 1560 ms
Errors: 2518, accumulated errors: 2518, 87.7% of total requests
Requests: 6373, requests per second: 700, mean latency: 1590 ms
Errors: 3401, accumulated errors: 5919, 92.9% of total requests 

Observable выполняется с corePoolSize равным 10, но увеличение его до 50 также ничего не улучшило.

Какое может быть объяснение?

ОБНОВЛЕНИЕ: по предложению akarnokd я внес следующие изменения. Перешел из Object.create в Object.fromCallable в сервисе и повторно использовал планировщик в контроллере, но все равно получаю те же результаты.


person codependent    schedule 15.12.2015    source источник
comment
Не могли бы вы использовать Observable.fromCallable вместо Observable.create? Ваше использование create кажется странным. Кроме того, Thread.sleep не гарантирует точного количества сна, но зависит от ОС. В getVideoInfoAsync вы создаете оболочку планировщика снова и снова без необходимости.   -  person akarnokd    schedule 15.12.2015
comment
Привет akarnokd, спасибо за ваш комментарий. Пара вещей, что не так с использованием Observable.create? Также я не понимаю, что вы имеете в виду, создавая оболочку планировщика снова и снова. Чтобы реализовать это, я следовал тому, что видел здесь, в dzone.   -  person codependent    schedule 15.12.2015
comment
Вы не вызываете s.onCompleted() для начала, но отсутствие обработки отказа от подписки также может быть проблематичным. Кроме того, вы должны увидеть, в чем заключается сбой, который также может указывать на источник потери производительности. У вас есть TaskExecutor в качестве поля-члена, но затем вы оборачиваете его Scheduler.wrap для каждого вызова getVideoInfoAsync, что, как я предполагаю, происходит сотни раз в секунду.   -  person akarnokd    schedule 15.12.2015
comment
Ммм, я также добавил s.onCompleted() в исходный пример, но это тоже не улучшилось. Единственная ошибка, которую я вижу в консоли, это o.a.c.c.C.[Tomcat].[localhost] : Exception Processing ErrorPage[errorCode=0, location=/error] java.lang.IllegalStateException: Cannot forward after response has been committed   -  person codependent    schedule 15.12.2015
comment
Я бы изменил наблюдаемое на Observable.just и вернул бы какой-нибудь имитированный объект, чтобы исключить любые недостатки с RxJava. Поскольку RxJava не работает с сетью, я предполагаю, что у вас есть фреймворк, который делает это вместо этого. Судя по сообщению об ошибке, возможно, эта платформа неправильно настроена, устарела или просто содержит ошибки.   -  person akarnokd    schedule 16.12.2015


Ответы (1)


Проблема была вызвана ошибкой программирования в какой-то момент. На самом деле пример в вопросе работает отлично.

Одно предупреждение, чтобы у других не было проблем: остерегайтесь использования Observable.just(func), func фактически вызывается при создании Observable. Таким образом, любой Thread.sleep, размещенный там, заблокирует вызывающий поток.

@Override
public Observable<List<Data>> loadDataObservable() {
    return Observable.just(generateData()).delay(500, TimeUnit.MILLISECONDS);
}

private List<Data> generateData(){
    List<Data> dataList = new ArrayList<Data>();
    for (int i = 0; i < 20; i++) {
        Data data = new Data("key"+i, "value"+i);
        dataList.add(data);
    }
    return dataList;
}

Я начал обсуждение в группе RxJava Google, где мне помогли разобраться .

person codependent    schedule 16.12.2015
comment
Итак, каков конечный результат? - person Rekin; 16.12.2015
comment
В результате производительность Observable немного выше, чем при использовании только Callable. - person codependent; 16.12.2015