Как выполнить блокирующий код в неблокирующем обработчике в Undertow?

Как описано в отдельном вопросе, при использовании Undertow вся обработка должно выполняться в выделенном пуле рабочих потоков, который выглядит следующим образом:

public class Start {

  public static void main(String[] args) {
    Undertow server = Undertow.builder()
        .addListener(8080, "localhost")
        .setHandler(new HttpHandler() {
          public void handleRequest(HttpServerExchange exchange)
              throws Exception {
            if (exchange.isInIoThread()) {
              exchange.dispatch(this);
              return;
            }
            exchange.getResponseHeaders()
                    .put(Headers.CONTENT_TYPE, "text/plain");
            exchange.getResponseSender()
                    .send("Hello World");
          }
        })
        .build();
    server.start();
  }
}

Я понимаю, что BlockingHandler можно использовать для явного указания Undertow запланировать запрос в выделенном пуле потоков для блокировки запросов. Мы могли бы адаптировать приведенный выше пример, заключив HttpHandler в экземпляр BlockingHandler, например так:

        .setHandler(new BlockingHandler(new HttpHandler() {

Это будет работать для вызовов, которые, как мы знаем, всегда блокируются.

Однако, если какой-то код большую часть времени неблокирует, но иногда требует блокирующего вызова, как превратить этот блокирующий вызов в неблокирующий? Например, если запрошенное значение присутствует в кеше, следующий код не будет блокироваться (он просто извлекается из какого-то Map<>), но если его нет, его необходимо получить из базы данных.

public class Start {

  public static void main(String[] args) {
    Undertow server = Undertow.builder()
        .addListener(8080, "localhost")
        .setHandler(new HttpHandler() {
          public void handleRequest(HttpServerExchange exchange)
              throws Exception {
            if (exchange.isInIoThread()) {
              exchange.dispatch(this);
              return;
            }
            if (valueIsPresentInCache(exchange)) {
              return valueFromCache;  // non-blocking
            } else {
              return fetchValueFromDatabase(); // blocking!!!
            }
          }
        })
        .build();
    server.start();
  }
}

Согласно документам, является методом HttpServerExchange.startBlocking(), но, согласно JavaDoc, если вам действительно не нужно использовать входной поток, этот вызов по-прежнему является блокирующим.

Вызов этого метода переводит обмен в режим блокировки и создает объект BlockingHttpExchange для хранения потоков. Когда биржа находится в блокирующем режиме, становятся доступными методы входного потока, за исключением того, что в настоящее время нет существенной разницы между блокирующим и неблокирующим режимами.

Как можно превратить этот блокирующий вызов в неблокирующий?


person Danilo Radenovic    schedule 05.07.2019    source источник


Ответы (1)


Правильный способ - фактически реализовать логику в потоке ввода-вывода, если он не блокируется. В противном случае делегируйте запрос выделенному потоку, например:

public class Example {

  public static void main(String[] args) {
    Undertow server = Undertow.builder()
        .addListener(8080, "localhost")
        .setHandler(new HttpHandler() {
          public void handleRequest(HttpServerExchange exchange)
              throws Exception {

            if (valueIsPresentInCache(exchange)) {
              getValueFromCache();  // non-blocking, can be done from IO thread           
            } else {

              if (exchange.isInIoThread()) {
                exchange.dispatch(this);
                // we return immediately, otherwise this request will be
                // handled both in IO thread and a Worker thread, throwing
                // an exception
                return;
              }

              fetchValueFromDatabase(); // blocking!!!

            }
          }
        })
        .build();
    server.start();
  }
}
person Danilo Radenovic    schedule 22.08.2019