Интегрируйте spring -actor в существующее приложение Spring Framework 4 STOMP Over WebSocket

В моем приложении используется Spring Framework 4, включенный в модуль обмена сообщениями Spring (с ключевыми абстракциями из проекта Spring Integration, такими как Message, MessageChannel, MessageHandler и другие, которые могут служить основой для такой архитектуры обмена сообщениями).

Мое приложение использует Websocket и STOMP. Он поддерживает соединения (сеансы веб-сокетов) с большим количеством клиентов java-веб-сокетов, и одним из требований было использование либо akka, либо реактора.

Я хочу интегрировать пружинный реактор RingBufferAsyncTaskExecutor вместо ThreadPoolTaskExecutor в clientInboundChannelExecutor и clientOutboundChannelExecutor, чтобы повысить пропускную способность. По крайней мере, я определил этот подход как способ интеграции пружинного реактора в мое существующее приложение - это может быть неправильный подход.

Я смотрел на Reaction-si-quickstart, поскольку он демонстрирует, как использовать реактор с интеграцией Spring, и поскольку обмен сообщениями Spring в Spring Framework 4 включает ключевые абстракции из проекта Spring Integration. Я подумал, что это будет ближайшая ссылка.

Моя рабочая конфигурация java для веб-сокета имеет следующее объявление класса: открытый класс WebSocketConfig extends WebSocketMessageBrokerConfigurationSupport реализует WebSocketMessageBrokerConfigurer. WebSocketMessageBrokerConfigurationSupport расширяет AbstractMessageBrokerConfiguration.

В org.springframework.messaging.simp.config.AbstractMessageBrokerConfiguration я хотел попробовать настроить RingBufferAsyncTaskExecutor вместо ThreadPoolTaskExecutor

@Bean
public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
    TaskExecutorRegistration reg = getClientInboundChannelRegistration().getOrCreateTaskExecRegistration();
    ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
    executor.setThreadNamePrefix("clientInboundChannel-");
    return executor;
}

Когда я пытаюсь переопределить этот метод в WebSocketConfig «Метод getOrCreateTaskExecRegistration () из типа ChannelRegistration не отображается», потому что в AbstractMessageBrokerConfiguration он защищен ....

protected final ChannelRegistration getClientInboundChannelRegistration() {
    if (this.clientInboundChannelRegistration == null) {
        ChannelRegistration registration = new ChannelRegistration();
        configureClientInboundChannel(registration);
        this.clientInboundChannelRegistration = registration;
    }
    return this.clientInboundChannelRegistration;
}

Я не совсем понимаю иерархию WebSocketMessageBrokerConfigurationSupport или интерфейс WebSocketMessageBrokerConfigurer в моем WebSocketConfig. Я просто поигрался с переопределением того, что мне нужно, чтобы мои настройки работали.

Не уверен, что это актуально, но мне не нужен внешний брокер, потому что мое приложение на данный момент не отправляет никаких данных всем подключенным подписчикам и вряд ли будет работать в будущем. Связь с клиентами веб-сокетов java типа демона является двухточечной, но веб-сокет веб-интерфейса в браузере действительно использует подписку для получения данных в реальном времени, поэтому это удобная настройка (а не прямой канал весенней интеграции) и там, где есть четкие источники на как его настроить - все же я не уверен, что это самый эффективный дизайн приложения. Архитектура обмена сообщениями STOMP через WebSocket, описанная в справочной документации Spring-framework, была наиболее комплексным подходом, поскольку это мой первый весенний проект.

Можно ли повысить производительность за счет интеграции пружинного реактора в мое существующее приложение?

Или, если бы я попытался использовать вместо этого интеграцию Spring, это потребовало бы больших изменений, насколько я могу судить - также кажется нелогичным, что это было бы необходимо, учитывая, что Spring Framework 4, включенный в модуль обмена сообщениями Spring, был получен из интеграции Spring.

Как мне интегрировать пружинный реактор в мою стандартную платформу Spring 4 STOMP Over WebSocket Messaging Architecture?

Если настройка RingBufferAsyncTaskExecutor вместо ThreadPoolTaskExecutor в clientInboundChannelExecutor и clientOutboundChannelExecutor является правильным способом, как мне это сделать?


person justify    schedule 21.11.2014    source источник


Ответы (1)


На самом деле RingBufferAsyncTaskExecutor не ThreadPoolTaskExecutor, поэтому вы не можете использовать его таким образом.

Вы можете просто переопределить clientInbound(Outbound)Channel beans из своего AbstractWebSocketMessageBrokerConfigurer impl и просто использовать @EnableWebSocketMessageBroker:

@Configuration
@EnableWebSocketMessageBroker
@EnableReactor
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @autowired
    Environment reactorEnv;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry configurer) {
        configurer.setApplicationDestinationPrefixes("/app");
        configurer.enableSimpleBroker("/topic", "/queue");
    }

    @Bean
    public AbstractSubscribableChannel clientInboundChannel() {
        ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(new RingBufferAsyncTaskExecutor(this.reactorEnv));
        ChannelRegistration reg = getClientOutboundChannelRegistration();
        channel.setInterceptors(reg.getInterceptors());
        return channel;
    }

}

И обратите внимание на поддержку WebSocket в Интеграция Spring.

Кстати: укажите, пожалуйста, на ссылку для этого reactor-si-quickstart.

person Artem Bilan    schedule 21.11.2014
comment
Я нашел эту страницу после того, как разместил вопрос github.com/reactor/reactor/wiki/AsyncTaskExecutor кажется, что вам следует использовать реактор AsyncTaskExecutor вместо ThreadPoolExecutor, чтобы получить максимально возможную пропускную способность - мне придется когда-нибудь прочитать, как все эти интерфейсы связаны. Спасибо за пример, попробую. Ссылка: github.com/reactor/reactor-si-quickstart. - person justify; 22.11.2014