Project Reactor: подождите, пока вещатель закончит

Есть Broadcaster, который принимает строки и добавляет их в StringBuilder.

Я хочу это проверить.

Мне нужно использовать Thread#sleep, чтобы ждать, пока вещатель закончит обработку строк. Я хочу удалить sleep.

Попытался использовать Control#debug() безуспешно.

public class BroadcasterUnitTest {

@Test
public void test() {
    //prepare
    Environment.initialize();
    Broadcaster<String> sink = Broadcaster.create(Environment.newDispatcher()); //run broadcaster in separate thread (dispatcher)
    StringBuilder sb = new StringBuilder();
    sink
            .observe(s -> sleep(100)) //long-time operation
            .consume(sb::append);

    //do
    sink.onNext("a");
    sink.onNext("b");

    //assert
    sleep(500);//wait while broadcaster finished (if comment this line then the test will fail)
    assertEquals("ab", sb.toString());
}

private void sleep(int millis) {
    try {
        Thread.sleep(millis);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
}

person Aleks Ya    schedule 14.11.2015    source источник


Ответы (1)


Я не знаком с Broadcaster (и, вероятно, он устарел, поскольку вопрос старый), но в целом могут быть полезны эти 3 способа:

  1. При тестировании Project-Reactor Fluxes и прочего вам, вероятно, лучше использовать их библиотеку тестирования, созданную специально для этого. Их ссылка и Документация Javadoc в этой части довольно хороша, и я просто скопирую пример, который говорит сам за себя:

    @Test
    public void testAppendBoomError() {
      Flux<String> source = Flux.just("foo", "bar"); 
      StepVerifier.create( 
        appendBoomError(source)) 
        .expectNext("foo") 
        .expectNext("bar")
        .expectErrorMessage("boom") 
        .verify(); 
    }
    
  2. Вы можете просто block() самостоятельно использовать Fluxes и Monos, а затем запускать проверки. И обратите внимание, что если возникает ошибка, это приведет к исключению. Но есть ощущение, что вам понадобится написать больше кода для некоторых случаев (например, проверка Flux выдала 2 элемента X & Y, а затем завершилась ошибкой), и тогда вы будете повторно реализовывать StepVerifier.

    @Test
    public void testFluxOrMono() {
      Flux<String> source = Flux.just(2, 3);
      List<Integer> result = source
            .flatMap(i -> multiplyBy2Async(i))
            .collectList()
            .block();
      // run your asserts on the list. Reminder: the order may not be what you expect because of the `flatMap`
      // Or with a Mono:
      Integer resultOfMono = Mono.just(5)
            .flatMap(i -> multiplyBy2Async(i))
            .map(i -> i * 4)
            .block();
      // run your asserts on the integer
    }
    
  3. Вы можете использовать общие решения для асинхронного тестирования, такие как CountDownLatch, но, опять же, не рекомендовали бы и доставляли вам проблемы в некоторые случаи. Например, если вы заранее не знаете количество приемников, вам нужно будет использовать что-то еще.

person Hossam El-Deen    schedule 17.04.2017