Обработчик UDP с Reactor в Spring

Я хочу, чтобы мое приложение реагировало на события UDP, отправленные тысячами разных клиентов. Каждый клиент отправляет 1-10 пакетов UDP каждые 5-10 секунд. Каждый пакет будет и должен обрабатываться очень быстро (в основном в памяти и небольших вычислениях, помог с Redis, только случайные вызовы БД). Возврат данных вызывающим абонентам невозможен.

Я внедрил Reactor в Spring, как описано в вики. Затем я реализовал входящий канал UDP, как описано в их документах Spring Integration. Вот конфиг:

<int-ip:udp-inbound-channel-adapter id="receiverChannel"
                                    channel="stringConvert"
                                    port="9000"
                                    multicast="false"
                                    check-length="false"
                                    pool-size="10"
                                    lookup-host="false"
        />

<int:transformer id="convertChannel"
                 input-channel="stringConvert"
                 output-channel="toProcess"
                 ref="transformer"
                 method="transform"

        />

<int:service-activator input-channel="toProcess"
                       ref="accumulator"
                       method="accumulate"/>

<bean id="accumulator" class="hello.UDPAccumulator" />
<bean id="transformer" class="hello.UDPTransformer" />

И затем в UDPAccumulator я публикую это сообщение в реактор:

@Service
public class UDPAccumulator {

@Autowired
ReactorProducer producer;

public void accumulate(String quote) {
    producer.fireEvent(quote);

}

}

Является ли это «правильным» способом сделать это, учитывая, что мне нужна высокая производительность? Какова внутренняя работа int-ip:udp-inbound-channel-adapter и может ли он быть узким местом перед передачей сообщений в реактор? Я вижу, что у реактора есть некоторые классы и поддержка, связанные с TCP, но нет UDP. Любое предложение о том, как сделать это наилучшим образом, приветствуется!

Бонусный вопрос. Что, если сообщения приходят быстрее, чем отправляются в реактор? Будет хранилище сообщений Redis (внизу статьи) помочь? А что, если мой метод, который перемалывает эти пакеты в реакторе, работает медленно?


person Saša Šijak    schedule 03.11.2013    source источник


Ответы (2)


Поскольку у нас пока нет прямой поддержки UDP в Reactor, ваши абстракции для публикации событий в Reactor очень разумны. Но в своем «дополнительном вопросе» вы отмечаете, что существуют проблемы с пропускной способностью издателя/потребителя, которыми необходимо управлять каким-то специфичным для домена способом; там нет серебряной пули.

В вашем случае использования у меня действительно возникло бы искушение сказать Processor [1] может подойти лучше. Он обеспечивает гораздо более высокую общую пропускную способность для обработки данных, потому что он обходит динамическую диспетчеризацию на основе селектора, которая происходит в простом Reactor. Если вы не отправляете входящие события разным обработчикам на основе каких-то критериев темы, я бы посоветовал вам вместо этого взглянуть на это. С более высокой пропускной способностью вам придется немного меньше беспокоиться о том, чтобы потребители не отставали (если только ваш Consumer не делает что-то очень медленное, что ничто не может автоматически ускорить).

Но если вам действительно, действительно нужно управлять отставанием, я бы предложил разделить ваших производителей и потребителей с помощью Queue. Reactor имеет PersistentQueue абстракцию [2], которую вы можете использовать для публикации объекты и сохраняются на диске с помощью JavaChronicle [3], который затем можно слить в Consumer с помощью a Poller (javadoc выйдет Poller где-то на этой неделе, когда мы готовимся к версии 1.0... ранее он назывался Pipe [4]).

person Jon Brisbin    schedule 04.11.2013

Я не могу говорить с Reactor, но адаптер UDP имеет выделенный поток, который считывает необработанные пакеты и передает их TaskExecutor. Он делает это как можно скорее, чтобы вернуться к чтению следующего пакета.

По умолчанию TaskExecutor — это фиксированный пул потоков.

Reactor имеет DispatcherTaskExecutor, который можно внедрить в адаптер.

Один и тот же исполнитель задач используется для основного потока чтения и передачи обслуживания.

person Gary Russell    schedule 04.11.2013