Реактивный подход к использованию временной карты поиска

У меня есть код, который я хотел бы сделать более реактивным, используя RxJava или Proj. Реактор. Я немного борюсь с этим.

Имея список CatalogRequests, некоторые из которых могут быть для одного и того же CatalogItem, но с другим условием (NEW против различных уровней USED), я хочу:

  1. Получить отдельные элементы каталога по их itemId. Данные для всех условий хранятся в одной строке, поэтому мне нужно сделать запрос на один и тот же элемент только один раз.

  2. Когда у меня есть отдельные элементы, просмотрите мой список запросов и выполните некоторую логику, которая преобразует эти запросы с помощью отдельного элемента каталога, который я только что просмотрел.

    class CatalogRequest {
        String itemId;
        Integer condition;
    }
    
    //class does not look like this, simplified for the question
    class CatalogItem {
         Map<Condition, Money> usedPrices;
         Map<Condition, DateTime> availableDate;
    }
    
    //non reactive code
    public List<CatalogResponse> fetchResponses(List<CatalogRequest> requests) {
    List<CatalogItem> items = requests.stream().map(CatalogRequest::getItemId)
        .distinct()
        .collect(toList());
    
    List<Optional<CatalogItem>> cachedResults = items.stream()
        .map (this::fetchItem)
        .map (CompletableFuture::join)
        .filter(Optional::isPresent)
        .collect(toList());
    
    List<CatalogResponse> responses = new ArrayList<>();
    requests.forEach( request -> 
        {
             CatalogResponse response = transformResponse(request, cachedResults.get(request.getItemId());
             responses.add(response);
        });
    }
    

person Maude    schedule 21.03.2017    source источник
comment
ну, это кажется очень легко конвертируемым в Observable вместо потоков, большинство операторов одинаковы (карта, отдельный, фильтр), но вопрос в том, почему вы пытаетесь сделать его более «реактивным», где здесь ваш асинхронный код ? вроде все как в коллекциях памяти, так что не так с потоками?   -  person yosriz    schedule 21.03.2017
comment
Пожалуйста, не засоряйте StackOverflow ответами, зачем вам это.   -  person Maude    schedule 21.03.2017


Ответы (1)


Ваш алгоритм и вариант использования, ИМХО, плохо адаптированы к шаблону реактивного программирования. Это связано с тем, что он «объединяет» информацию из двух коллекций (запросов и кэшированных ответов) и может сделать это только после обработки всех данных, в отличие от обработки, которая может выполняться постепенно, элемент за элементом.

Это даже не просто решить, комбинируя такие операторы, как zip, потому что данные фильтруются и не гарантируется, что они будут в том же порядке или количестве элементов.

(кстати, есть проблема с cachedResults.get(request.getItemId()), я думаю, поскольку это список, а не карта, нет гарантии, что идентификатор элемента запроса правильно сопоставляется с индексом списка).

Таким образом, преобразование первых шагов fetchResponses достаточно просто, но не последнего шага.

Для справки, вот как я могу перенести часть этого кода:

Возможно, вы могли бы изменить fetchItem(), чтобы вернуть Mono (например, используя Mono.fromCallable(), в зависимости от того, что на самом деле делает выборка). Обратите внимание, что если fetch использует ExecutorService внутри, вам необходимо убедиться, что ваш Mono также выполняется аналогичным образом в другом контексте потоковой передачи, например, с использованием .subscribeOn(Schedulers.elastic())...

Затем вы можете начать перенос кода в fetchResponses. Я не думаю, что начать с набора CatalogRequests — это проблема, если вы собрали их в памяти без особых задержек. Если нет, вы всегда можете также перенести родительский код, чтобы вместо этого вызывать метод с Flux<CatalogRequest> и асинхронно реагировать на каждый запрос.

Список можно преобразовать в Flux с помощью Flux.fromIterable(requests), а затем отфильтровать дубликаты с помощью .distinct().

Вот изменение: пока не собирайте, а продолжайте создавать свою асинхронную последовательность.

Выполните .flatMap(this::fetchItem) для асинхронного извлечения строк. Возможно, вам даже не понадобится этот метод для возврата Optional, поскольку пустой поток/моно будет эквивалентен и будет игнорироваться flatMap.

Но на данный момент вы как бы застряли с возвратом к .collectList().block() и повторным использованием императивного кода.

Если бы вы могли найти декларативный способ выполнения той же задачи (подумайте об использовании одного Stream), все было бы гораздо более адаптируемым к реактивному программированию.

person Simon Baslé    schedule 22.03.2017