Подгород. Асинхронный ответ с другим потоком

Я пытаюсь понять, как создать настоящий асинхронный http-сервер с помощью Undertow. Как отправить ответ асинхронно, если у меня есть другой поток, который уже обрабатывает запрос?
Я написал такой код:

    Undertow server = Undertow.builder()
            .addHttpListener(8080, "localhost")
            .setHandler(exchange -> {
                CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                }).thenAccept(string -> {
                    exchange.getResponseHeaders()
                            .put(Headers.CONTENT_TYPE, "text/plain");
                    exchange.getResponseSender().send("Hello World");
                    exchange.endExchange();
                }).exceptionally(throwable -> {
                    System.out.println(throwable.toString());
                    return null;
                });
            }).build();
    server.start();

но этот ответ сервера 200 без данных и в журналах

java.lang.IllegalStateException: UT000127: ответ уже отправлен

Когда я использую метод io.undertow.server.HttpServerExchange#dispatch(java.lang.Runnable) следующим образом:

    Undertow server = Undertow.builder()
            .addHttpListener(8080, "localhost")
            .setHandler(exchange -> {

                exchange.dispatch(() -> {

                    CompletableFuture.runAsync(() -> {
                        try {
                            Thread.sleep(100);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
                    }).thenAccept(string -> {
                        exchange.getResponseHeaders()
                                .put(Headers.CONTENT_TYPE,"text/plain");
                        exchange.getResponseSender().send("Hello World");
                        exchange.endExchange();
                    }).exceptionally(throwable -> {
                        System.out.println(throwable.toString());
                        return null;
                    });

                });
            }).build();
    server.start();

конечно, ответ «Hello World», как и ожидалось, но сервер создает новый поток для каждого запроса!

(jvisualvm после 10 параллельных запросов) jvisualvm после 10 параллельных запросов


person QIvan    schedule 24.11.2017    source источник
comment
P.S. Я запускаю его в основном методе.   -  person QIvan    schedule 25.11.2017


Ответы (1)


подводное течение не поддерживает этот путь,

я создаю новый проект для его решения:

https://github.com/hank-whu/undertow-async

package io.undertow.async.pingpong;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

import io.undertow.async.handler.AsyncHttpHandler;
import io.undertow.async.io.PooledByteBufferInputStream;
import io.undertow.async.io.PooledByteBufferOutputStream;
import io.undertow.connector.ByteBufferPool;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.StatusCodes;

public class PingPongAsyncHttpHandler extends AsyncHttpHandler {

    @Override
    protected void handleAsyncRequest(HttpServerExchange exchange, PooledByteBufferInputStream content)
            throws Exception {

        CompletableFuture//
                .completedFuture(content)// init
                .thenApplyAsync(this::readBytesAndClose)// read
                .thenApplyAsync(bytes -> {// write
                    ByteBufferPool byteBufferPool = exchange.getConnection().getByteBufferPool();
                    PooledByteBufferOutputStream output = new PooledByteBufferOutputStream(byteBufferPool);
                    write(output, bytes);
                    return output;
                })//
                .thenAcceptAsync(output -> send(exchange, StatusCodes.OK, output));
    }

    private byte[] readBytesAndClose(PooledByteBufferInputStream content) {
        try {
            byte[] bytes = new byte[content.available()];
            content.read(bytes);
            return bytes;
        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {// must close it
                content.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void write(PooledByteBufferOutputStream output, byte[] bytes) {
        try {
            output.write("asycn response: ");
            output.write(bytes);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

}
person Kai Han    schedule 18.12.2017