Netty: Как разделить ChannelHandleContext между слабо связанными сервисами

Работая над клиент-серверным приложением, где на стороне сервера у меня есть клиентский сервисный компонент, который перехватывает все запросы сокетов от клиентов и после сканирования сообщения направляет его в разные службы через шину обмена сообщениями, поэтому они слабо связаны.

Что-то вроде этого:

Проблема заключается в том, что после того, как службы выполняются с бизнес-логикой, необходимо отправить ответ обратно клиенту, но через «Службу обработчика запросов», поэтому все ответные сообщения от всех служб проходят через один компонент службы, т.е. «Служба обработчика запросов», поскольку я не нет ссылки на объект ChannelHandleContext с другими службами для отправки ответа обратно клиенту.

Я думал о централизованной распределенной службе кэширования (например, memcached) для хранения сопоставления, подобного этому (идентификатор пользователя -> контекст сеанса (который является ChannelHandleContext из netty), но позже я понял, что не могу этого сделать, поскольку для этого требуется сериализация и этот объект не поддерживает это, поэтому у меня нет других вариантов, кроме как отправить ответ обратно в «Службу обработчика запросов» для обратной связи с клиентом.

Каков правильный подход в этом случае, чтобы я мог легко масштабировать свое приложение прямо сейчас, это действительно большая проблема дизайна.


person Dinesh Kumar    schedule 24.01.2016    source источник
comment
Почему вы откатили полезную правку, сделанную почти два года назад?   -  person Cody Gray    schedule 27.08.2017


Ответы (1)


Вы можете решить эту проблему, сохраняя Queue<Promise> для каждого вышестоящего сервера, который помечает промисы как завершенные, а в результате — ответ.

Решение проблемы будет выглядеть как этот ответ, но нам нужно отредактировать некоторую поддержку конвейерной обработки в потоке, поэтому несколько маршруты обрабатываются корректно.

Предполагая, что ваш вариант использования выглядит примерно так:

+------------+                                         +------------+
|            |                                         |            |
|   client   |+--------+                        +----->|  upstream  |
|     1      |         |                        |      |      1     |
+------------+         |                        |      +------------+
                       |                        |
+------------+         |       +----------+     |      +------------+
|            |         |       |   this   |     |      |            |
|   client   +---------+------>|  server  |-----+----->|  upstream  |
|     2      |         |       |          |     |      |      2     |
+------------+         |       +----------+     |      +------------+
                       |                        |
+------------+         |                        |      +------------+
|            |         |                        |      |            |
|   client   +---------+                        +----->|  upstream  |
|     3      |                                         |      3     |
+------------+                                         +------------+

При реализации этого необходимо убедиться, что соблюдены следующие пункты:

  • Если мы разрешаем конвейерную обработку запросов (рекомендуется для повышения производительности), нам нужен либо способ идентификации запросов с использованием, например, порядковых номеров, либо нам нужно дефрагментировать пакеты, чтобы они поступали в правильном порядке.
  • Нам нужен способ отправить клиенту ответ о тайм-ауте, если тайм-аут восходящего потока

Мы собираемся предположить, что протокол не имеет порядковых номеров пакетов, так как это самый сложный случай.

Предполагая, что «Служба обработчика запросов» была написана с учетом поддержки большого количества запросов, у нее есть методы, которые можно вызывать с некоторым обратным вызовом.

При условии, что ваша «Служба обработчика запросов» имеет реализацию API, которая выглядит следующим образом:

public interface RequestHandlerService {
    public default Future<Response> callMethod(Command cmd) {
        return callMethod(cmd, promise);
    }
    public Future<Response> callMethod(Command cmd, Promise promise);
}

public interface ServerSelector {
    public RequestHandlerService selectNextServer(Command cmd);
}

public interface Command {
    // ....
}

public interface Response {
    // ....
}

Приведенная выше система достаточно универсальна, чтобы ее можно было использовать почти для всех типов API, метод, который можно быстро реализовать, даже если система выше, которая не поддерживает передачу класса обратного вызова, как показано на этот ответ, который делает это для соединения Netty.

Базовый скелет обработчика

Мы можем сделать следующий обработчик для обработки запросов от нашего клиента:

public class UpstreamDispachHandler extends SimpleInboundHandler<String> {
    private final static int MAX_PIPELINED_REQUESTS = 32;
    private final ServerSelector servers;
    private final ArrayDeque<Future<Response>> messageList = 
                  new ArrayDeque<>(MAX_PIPELINED_REQUESTS);
    protected ChannelHandlerContext ctx;

    public UpstreamDispachHandler (ServerSelector servers) {
        this.servers = servers;
    }

    public void channelRegistered(ChannelHandlerContext ctx) {
        this.ctx = ctx;
    }

    // The following is called messageReceived(ChannelHandlerContext, Command) in Netty 5.0
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Command msg) {
        if(messageList.size() >= MAX_PIPELINED_REQUESTS) {
            // Fast fail if the max requests is exceeded
            ctx.writeAndFlush(new FailedResponse(msg));
            return;
        }
        RequestHandlerService nextServer = servers.selectNextServer(msg);
        if (nextServer == null) {
             // Fast fail if the max requests is exceeded
            ctx.writeAndFlush(new FailedResponse(msg));
            return;
        }
        sendCommandUpstream(msg, nextServer);
    }
}

Приведенный выше код объявляет конструктор и используемые нами переменные, мы используем ArrayDeque в качестве временного хранилища для нашего ответа, чтобы убедиться, что порядок сохранен.

Теперь мы определяем sendCommandUpstream(Command, RequestHandlerService) для добавления запроса в очередь и передачи его восходящему потоку.

private void sendCommandUpstream(Command cmd, RequestHandlerService nextServer) {
    synchronized(messageList) {
        messageList.add(nextServer.callMethod(cmd, ctx.executor().newPromise()
                    .addListener(f->recalculatePendingReplies())));
    }
}

Причина, по которой мы синхронизируем через messageList вместо использования Queue с поддержкой внутренней синхронизации, заключается в том, чтобы убедиться, что некоторые из наших будущих операций будут работать правильно.

Сейчас мы находимся в последней части клиент-серверной части программного обеспечения, следя за тем, чтобы ответы направлялись обратно в правильном порядке. Для этого мы просматриваемво главе Queue проверить, выполнено ли это Future сделано. Если голова готова, мы можем remove(), извлеките из него Response и отправьте обратно клиенту, убедившись, что мы flush() конвейер после отправки всех ответов. Из соображений производительности мы сбрасываем только в конце, а не после каждого пакета.

private void recalculatePendingReplies() {
    boolean hasSendMessage = false;
    boolean interruped = false;
    synchronized(messageList) { 
        Future<Response> elm = messageList.peek();
        while (elm != null && elm.isDone()) {
            elm.remove();
            Response result;
            try {
                while(true) {
                    try {
                        result = elm.get();
                        break;
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            } catch (ExecutionException e) {
                // Do something better with e
                e.printStackTrace();
                result = new FailedResponse();
            } catch (ExecutionException e) {
                // Task is cancelled
                result = new FailedResponse();
            }
            ctx.write(result);
            hasSendMessage = true;  
        }
        if(hasSendMessage)
            ctx.flush();
    }
    if(interrupted)
        Thread.currentThread().interrupt();
}

В приведенном выше коде используется несколько шаблонов проектирования, например, логический флаг используется для сохранения статуса прерывания потока вместо того, чтобы проглатывать исключение. Даже в случае возникновения InterruptedException (а такого быть не должно), оно обрабатывается должным образом.

Добавление поддержки тайм-аутов запросов

Иногда мы хотим реализовать поддержку тайм-аутов в нашем протоколе, например, если восходящий поток не отвечает в течение x времени. HashedWheelTimer идеально подходит для такого рода операций, поскольку его характеристика могут быть легко изменены в соответствии с шаблоном использования приложений.

Продолжительность тика

Как описано для «приблизительно», этот таймер не выполняет запланированную задачу TimerTask вовремя. HashedWheelTimer на каждом тике проверяет, есть ли какие-либо задачи TimerTask, отстающие от расписания, и выполняет их. Вы можете увеличить или уменьшить точность времени выполнения, указав меньшую или большую продолжительность тика в конструкторе. В большинстве сетевых приложений время ожидания ввода-вывода не обязательно должно быть точным. Таким образом, длительность тика по умолчанию составляет 100 миллисекунд, и в большинстве случаев вам не нужно пробовать разные конфигурации.

Тактов на колесо (размер колеса)

HashedWheelTimer поддерживает структуру данных, называемую «колесо». Проще говоря, колесо — это хэш-таблица TimerTasks, хэш-функция которой — «мертвая линия задачи». Количество тактов на колесо по умолчанию (т. е. размер колеса) равно 512. Вы можете указать большее значение, если собираетесь запланировать много тайм-аутов.

Мы определяем следующий набор констант в верхней части нашей программы:

private final static long TICK_DURATION = 100;
private final static int WHEEL_SIZE = 128;
private final static long DEFAULT_TASK_TIMEOUT = TICK_DURATION * DEFAULT_TASK_TIMEOUT;
private final static HashedWheelTimer timer = new HashedWheelTimer(
              Executors.defaultThreadFactory(),
              TICK_DURATION,
              TimeUnit.MILLISECONDS,
              WHEEL_SIZE);

Затем мы модифицируем наш метод sendCommandUpstream, чтобы запланировать тайм-аут в таймере, чтобы задачи cancel(). Мы можем сделать это с помощью следующего:

private void sendCommandUpstream(Command cmd, RequestHandlerService nextServer) {
    synchronized(messageList) {
        Future<Response> r = nextServer.callMethod(cmd, ctx.executor().newPromise()
                              .addListener(f->recalculatePendingReplies()));
        messageList.add(r);
        timer.newTimeout((Timeout timeout) -> {
            if (!r.isDone()) 
                r.cancel(true);
        }, DEFAULT_TASK_TIMEOUT, TimeUnit.MILLISECONDS);
    }
}
person Ferrybig    schedule 25.01.2016