Вы можете решить эту проблему, сохраняя Queue<Promise>
для каждого вышестоящего сервера, который помечает промисы как завершенные, а в результате — ответ.
Решение проблемы будет выглядеть как этот ответ, но нам нужно отредактировать некоторую поддержку конвейерной обработки в потоке, поэтому несколько маршруты обрабатываются корректно.
Предполагая, что ваш вариант использования выглядит примерно так:
+------------+ +------------+
| | | |
| client |+--------+ +----->| upstream |
| 1 | | | | 1 |
+------------+ | | +------------+
| |
+------------+ | +----------+ | +------------+
| | | | this | | | |
| client +---------+------>| server |-----+----->| upstream |
| 2 | | | | | | 2 |
+------------+ | +----------+ | +------------+
| |
+------------+ | | +------------+
| | | | | |
| client +---------+ +----->| upstream |
| 3 | | 3 |
+------------+ +------------+
При реализации этого необходимо убедиться, что соблюдены следующие пункты:
- Если мы разрешаем конвейерную обработку запросов (рекомендуется для повышения производительности), нам нужен либо способ идентификации запросов с использованием, например, порядковых номеров, либо нам нужно дефрагментировать пакеты, чтобы они поступали в правильном порядке.
- Нам нужен способ отправить клиенту ответ о тайм-ауте, если тайм-аут восходящего потока
Мы собираемся предположить, что протокол не имеет порядковых номеров пакетов, так как это самый сложный случай.
Предполагая, что «Служба обработчика запросов» была написана с учетом поддержки большого количества запросов, у нее есть методы, которые можно вызывать с некоторым обратным вызовом.
При условии, что ваша «Служба обработчика запросов» имеет реализацию API, которая выглядит следующим образом:
public interface RequestHandlerService {
public default Future<Response> callMethod(Command cmd) {
return callMethod(cmd, promise);
}
public Future<Response> callMethod(Command cmd, Promise promise);
}
public interface ServerSelector {
public RequestHandlerService selectNextServer(Command cmd);
}
public interface Command {
// ....
}
public interface Response {
// ....
}
Приведенная выше система достаточно универсальна, чтобы ее можно было использовать почти для всех типов API, метод, который можно быстро реализовать, даже если система выше, которая не поддерживает передачу класса обратного вызова, как показано на этот ответ, который делает это для соединения Netty.
Базовый скелет обработчика
Мы можем сделать следующий обработчик для обработки запросов от нашего клиента:
public class UpstreamDispachHandler extends SimpleInboundHandler<String> {
private final static int MAX_PIPELINED_REQUESTS = 32;
private final ServerSelector servers;
private final ArrayDeque<Future<Response>> messageList =
new ArrayDeque<>(MAX_PIPELINED_REQUESTS);
protected ChannelHandlerContext ctx;
public UpstreamDispachHandler (ServerSelector servers) {
this.servers = servers;
}
public void channelRegistered(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
// The following is called messageReceived(ChannelHandlerContext, Command) in Netty 5.0
@Override
public void channelRead0(ChannelHandlerContext ctx, Command msg) {
if(messageList.size() >= MAX_PIPELINED_REQUESTS) {
// Fast fail if the max requests is exceeded
ctx.writeAndFlush(new FailedResponse(msg));
return;
}
RequestHandlerService nextServer = servers.selectNextServer(msg);
if (nextServer == null) {
// Fast fail if the max requests is exceeded
ctx.writeAndFlush(new FailedResponse(msg));
return;
}
sendCommandUpstream(msg, nextServer);
}
}
Приведенный выше код объявляет конструктор и используемые нами переменные, мы используем ArrayDeque
в качестве временного хранилища для нашего ответа, чтобы убедиться, что порядок сохранен.
Теперь мы определяем sendCommandUpstream(Command, RequestHandlerService)
для добавления запроса в очередь и передачи его восходящему потоку.
private void sendCommandUpstream(Command cmd, RequestHandlerService nextServer) {
synchronized(messageList) {
messageList.add(nextServer.callMethod(cmd, ctx.executor().newPromise()
.addListener(f->recalculatePendingReplies())));
}
}
Причина, по которой мы синхронизируем через messageList
вместо использования Queue
с поддержкой внутренней синхронизации, заключается в том, чтобы убедиться, что некоторые из наших будущих операций будут работать правильно.
Сейчас мы находимся в последней части клиент-серверной части программного обеспечения, следя за тем, чтобы ответы направлялись обратно в правильном порядке. Для этого мы просматриваемво главе Queue
проверить, выполнено ли это Future
сделано. Если голова готова, мы можем remove()
, извлеките из него Response
и отправьте обратно клиенту, убедившись, что мы flush()
конвейер после отправки всех ответов. Из соображений производительности мы сбрасываем только в конце, а не после каждого пакета.
private void recalculatePendingReplies() {
boolean hasSendMessage = false;
boolean interruped = false;
synchronized(messageList) {
Future<Response> elm = messageList.peek();
while (elm != null && elm.isDone()) {
elm.remove();
Response result;
try {
while(true) {
try {
result = elm.get();
break;
} catch (InterruptedException e) {
interrupted = true;
}
}
} catch (ExecutionException e) {
// Do something better with e
e.printStackTrace();
result = new FailedResponse();
} catch (ExecutionException e) {
// Task is cancelled
result = new FailedResponse();
}
ctx.write(result);
hasSendMessage = true;
}
if(hasSendMessage)
ctx.flush();
}
if(interrupted)
Thread.currentThread().interrupt();
}
В приведенном выше коде используется несколько шаблонов проектирования, например, логический флаг используется для сохранения статуса прерывания потока вместо того, чтобы проглатывать исключение. Даже в случае возникновения InterruptedException (а такого быть не должно), оно обрабатывается должным образом.
Добавление поддержки тайм-аутов запросов
Иногда мы хотим реализовать поддержку тайм-аутов в нашем протоколе, например, если восходящий поток не отвечает в течение x времени. HashedWheelTimer
идеально подходит для такого рода операций, поскольку его характеристика могут быть легко изменены в соответствии с шаблоном использования приложений.
Продолжительность тика
Как описано для «приблизительно», этот таймер не выполняет запланированную задачу TimerTask вовремя. HashedWheelTimer на каждом тике проверяет, есть ли какие-либо задачи TimerTask, отстающие от расписания, и выполняет их. Вы можете увеличить или уменьшить точность времени выполнения, указав меньшую или большую продолжительность тика в конструкторе. В большинстве сетевых приложений время ожидания ввода-вывода не обязательно должно быть точным. Таким образом, длительность тика по умолчанию составляет 100 миллисекунд, и в большинстве случаев вам не нужно пробовать разные конфигурации.
Тактов на колесо (размер колеса)
HashedWheelTimer поддерживает структуру данных, называемую «колесо». Проще говоря, колесо — это хэш-таблица TimerTasks, хэш-функция которой — «мертвая линия задачи». Количество тактов на колесо по умолчанию (т. е. размер колеса) равно 512. Вы можете указать большее значение, если собираетесь запланировать много тайм-аутов.
Мы определяем следующий набор констант в верхней части нашей программы:
private final static long TICK_DURATION = 100;
private final static int WHEEL_SIZE = 128;
private final static long DEFAULT_TASK_TIMEOUT = TICK_DURATION * DEFAULT_TASK_TIMEOUT;
private final static HashedWheelTimer timer = new HashedWheelTimer(
Executors.defaultThreadFactory(),
TICK_DURATION,
TimeUnit.MILLISECONDS,
WHEEL_SIZE);
Затем мы модифицируем наш метод sendCommandUpstream
, чтобы запланировать тайм-аут в таймере, чтобы задачи cancel()
. Мы можем сделать это с помощью следующего:
private void sendCommandUpstream(Command cmd, RequestHandlerService nextServer) {
synchronized(messageList) {
Future<Response> r = nextServer.callMethod(cmd, ctx.executor().newPromise()
.addListener(f->recalculatePendingReplies()));
messageList.add(r);
timer.newTimeout((Timeout timeout) -> {
if (!r.isDone())
r.cancel(true);
}, DEFAULT_TASK_TIMEOUT, TimeUnit.MILLISECONDS);
}
}
person
Ferrybig
schedule
25.01.2016