Как отправить пользовательское сообщение пользовательскому пользователю с помощью весеннего веб-сокета?

Я новичок в spring websocket . Я хочу отправить изменения продукта клиентам. Для этого я хочу сделать это следующим образом: Клиент создает сокетное соединение и подписывается на пункт назначения:

var socket = new SockJS('/websocket');
var stompClient = Stomp.over(socket);

stompClient.connect({}, function (frame) {
    stompClient.subscribe('/product/changes', function (scoredata) {
        // We received product changes
    });
});
//Send Ajax request and say server I want to know product with id=5 changes.
sendAjaxRequest(5);

Я настроил весеннее приложение следующим образом:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/product/");
        registry.setApplicationDestinationPrefixes("/app");
    }
}

Теперь мне нужен следующий метод:

@RestController
public class ProductController {

    @GetMapping("product-{id}")
    public void startSubscribe(@PathVariable("id") Long id) {

        // register current websocket session with product id and 
        // then with convertAndSendToUser send changes to current user.
    }

}

Как мне это реализовать?


person Morteza Malvandi    schedule 19.02.2019    source источник


Ответы (3)


Мой вопрос в первую очередь будет заключаться в том, почему вы пытаетесь отправить http-запрос на остальной контроллер, когда вы успешно интегрировали веб-сокеты с stomp? Если я правильно понимаю ваш вариант использования, я могу придумать три решения atm.

Решение 1 (идентификатор продукта сеанса сокета)

Вы можете отправить свой запрос напрямую с вашего клиента на сервер через открытое соединение через веб-сокет. Затем Spring может определить, какой сеанс Websocket выполнил вызов, и вы можете реализовать свою бизнес-логику. Вам нужно активировать другого брокера с именем «/queue» и указать префикс для целевого пользователя, который необходим, когда подписка не предназначена для трансляции. На стороне клиента также необходимо изменить путь подписки. Наконец, вы должны создать класс с комментарием @Controller, который содержит ваши сопоставления сообщений для получения сообщений от подключенного клиента.

Конфигурация сервера

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").withSockJS();
    }
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableSimpleBroker("/queue", "/product");  // <- added "/queue"
        registry.setApplicationDestinationPrefixes("/app");
        registry.setUserDestinationPrefix("/user");
    }
}

Контроллер сервера

@Controller
public class WebSocketContoller{
    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    @MessageMapping("/product/register")
    public void register(@Payload Long productId, @Header("simpSessionId") String sessionId) {
        // register current websocket session with product id and 
        // then with convertAndSendToUser send changes to current user.

        // Example of how to send a message to the user using the sessionId
        String response = "This could also be one of your product objects of type Product";
        SimpMessageHeaderAccessor headerAccessor = SimpMessageHeaderAccessor.create(SimpMessageType.MESSAGE);
        headerAccessor.setSessionId(sessionId);
        headerAccessor.setLeaveMutable(true);

        messagingTemplate.convertAndSendToUser(sessionId,"/queue/product/changes", response, headerAccessor.getMessageHeaders());
    }
}

Изменение клиентской подписки

stompClient.subscribe('/user/queue/product/changes', function (scoredata) {
    // We received product changes
});

Для получения подробной информации вы также можете проверить этот ответ: https://stackoverflow.com/a/26288475/11133168


Решение 2 (идентификатор основного продукта)

Однако, если вы действительно хотите рассмотреть возможность использования остаточного контроллера, чтобы начать регистрацию вашего процесса, или если он просто не соответствует вашим требованиям, вам следует просмотреть ссылку ниже. Spring также может отслеживать активные сеансы веб-сокетов и их пользователей через открытый компонент SimpUserRegistry. Однако вам потребуется настроить собственный адаптер ChannelInterceptor для входного канала вашего клиента, в зависимости от безопасности ваших приложений, чтобы определить пользователя. Проверьте этот ответ для получения подробной информации и примеров кода: https://stackoverflow.com/a/45359294/11133168


Решение 3 (темы с идентификаторами продуктов)

Вы также можете подписаться на определенную тему идентификатора продукта, поэтому вам даже не нужно знать, какой пользователь хочет получать уведомления об изменениях для конкретного продукта.

Изменение клиентской подписки

//e.g if you want to be notified about changes for products with id 5 
stompClient.subscribe('/product/changes/5', function (scoredata) {
    // We received product changes
});

Пример службы сервера

@Service
public class WebSocketProductService{

    @Autowired
    private SimpMessagingTemplate simpMessagingTemplate;

    // This would be the method which should inform your clients about specific product     
    // changes, instead of the String parameters a Product object should be used instead, 
    // you have to call this method yourself on product changes or schedule it or sth.
    public void sendProductChange(String product, String productId) {
        this.simpMessagingTemplate.convertAndSend("/product/changes/"+productId, product);
    }
}

Контроллер сервера

Требуется, если вы хотите управлять списком подписок на идентификаторы продуктов. Как объяснялось в решении 1, вам нужен класс с аннотацией @Controller, который содержит метод с аннотацией @SubscribeMapping. Этот метод вызывается, если клиент пытается подписаться на указанный путь.

@Controller
public class WebSocketContoller{
    @SubscribeMapping("/product/changes/{productId}")
    public void productIdSubscription(@DestinationVariable Long productId) {
        //Manage your product id subscription list e.g.
    }
}
person FlorianDe    schedule 01.03.2019
comment
Пользователь может слушать разные продукты во время сеанса. - person Morteza Malvandi; 01.03.2019
comment
Почему мы должны указывать sessionid два раза? В первом параметре, а также в заголовках: messageTemplate.convertAndSendToUser(sessionId,/queue/product/changes, response, headerAccessor.getMessageHeaders()) - person GabrielBB; 05.09.2019

Если вы хотите отправлять пользователям обновления продукта только тогда, когда они запрашивают их, вы можете использовать обычные HTTP-запросы. Но я понимаю, что вы хотите push-уведомления на основе пользовательской бизнес-логики. Вы также должны реализовать Spring Security для аутентификации ваших пользователей.


Решение

Я предлагаю добавить эту бизнес-логику в ваш бэкэнд, используя таблицу user_product_updates( user_id, product_id) — каждая строка соответствует product_id, на которую пользователь с user_id хочет подписаться на обновления:

@GetMapping("product-{id}")
public void startSubscribe(@PathVariable("id") Long id) {
    // Save this custom setting into your models
}

Теперь вы можете запустить запланированное серверное задание (которое может быть заданием cron в зависимости от бизнес-логики ваших push-уведомлений) для отправки обновлений на ваши пользователи:

@Autowired 
org.springframework.messaging.simp.SimpMessagingTemplate simpMessagingTemplate;   

@Scheduled(cron = "0 0 1 * * ?") // based on your business logic (say daily at 12:01 am)
public void scheduleTaskUsingCronExpression() {
   // loop through user_product_updates table and construct "data"
   // username is from your spring security username (principal.getName())
   simpMessagingTemplate.convertAndSendToUser(username, "/queue/products", data);
}

В будущем вы можете добавить несколько кешей для их оптимизации (особенно для получения информации о продукте от product_id), чтобы все работало гладко.


Сводка

Ваша конфигурация веб-сокета:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/websocket").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.setApplicationDestinationPrefixes("/app")
            .setUserDestinationPrefix("/user")
            .enableSimpleBroker("/topic", "/queue", "/product");
    }
}

Ваш слушатель во внешнем приложении может выглядеть так:

that.stompClient.subscribe("/user/queue/products", (message) => {
    if (message.body) {
      // We received product changes
    }
});

Пользователи будут регистрироваться для получения обновлений продукта:

@GetMapping("product-{id}")
public void startSubscribe(@PathVariable("id") Long id) {
    // Save to your persistence module
    // (that the particular user wants updates from such-and-such products)
}

Задание внутреннего планировщика будет отправлять обновления по мере их доступности:

@Scheduled(cron = "0 0 1 * * ?") // based on your business logic
public void scheduleTaskUsingCronExpression() {
   // loop through user_product_updates table and construct "data"
   // username is from your spring security username (principal.getName())
   template.convertAndSendToUser(username, "/queue/products", data);
}
person kukkuz    schedule 02.03.2019
comment
спасибо за ваш ответ и некоторые примечания к вашему ответу: 1- этот метод имеет нагрузку на обработку, 2- он имеет задержку, 3- он должен реализовать безопасность 4- еще раз спасибо за ваш ответ :). - person Morteza Malvandi; 02.03.2019
comment
если ваш трафик или пользовательская база огромны, тогда есть нагрузка...convertAndSendToUser(username, "/queue/products", data) не требует весенней безопасности, она просто питается от нее... вы можете очень хорошо иметь имя пользователя веб-сокета от stompclient и отправлять пользовательские обновления.. :) - person kukkuz; 02.03.2019

Spring документация — хорошая отправная точка для изучения концепций веб-сокетов. Для отправки клиенту вы можете использовать SimpMessageSendingOperations.

@Autowired
private SimpMessageSendingOperations messageSendingOperations;

Из метода контроллера сообщение может быть отправлено примерно следующим образом:

messageSendingOperations.convertAndSendToUser(websocketUserId, "/product/changes", messageObject);
person Shah Minul Amin    schedule 19.02.2019
comment
Это может быть принципом текущей аутентификации (поддерживаемой безопасностью Spring). Вероятно, вам также придется работать с настройкой безопасности веб-сокета. docs.spring.io/spring- безопасность/сайт/документы/4.0.x/ссылка/html/ - person Shah Minul Amin; 19.02.2019
comment
Когда соединение с сокетом установлено, оно хэширует идентификатор. Как я могу получить к нему доступ? - person Morteza Malvandi; 19.02.2019
comment
Одним из способов получения информации о заголовке сообщения является использование SimpMessageHeaderAccessor (docs.spring.io/spring/docs/current/javadoc-api/org/). У него есть метод для получения текущего идентификатора сеанса. Вы можете ввести SimpMessageHeaderAccessor в свой метод контроллера, т.е. public void startSubscribe (SimpMessageHeaderAccessor headerAccessor, @PathVariable (id) Long id) - person Shah Minul Amin; 19.02.2019
comment
Методы getSessionId() и getSubscriptionId() возвращают null. - person Morteza Malvandi; 19.02.2019