Интеграция Spring Websocket с Kafka

Я пытаюсь отправить использованные данные Kafka во внешний интерфейс (JavaScript) через Spring-Websockets в проекте Spring MVC.

Чтобы установить связь между сервером и клиентом, у меня есть следующее.

Клиент (app.js)

function connect() {
    var socket = new SockJS('/kafka-data-websocket');
    stompClient = Stomp.over(socket);
    stompClient.connect({}, function (frame) {
        console.log('Connected: ' + frame);
        stompClient.send("/app/fetchData");
        stompClient.subscribe('/data/records', function (message) {
            console.log(JSON.parse(message.body).content);
        });
    });
}

Сервер (KafkaController.java)

@Controller
public class KafkaController {

    @MessageMapping("/fetchData")
    @SendTo("/data/records")
    public String fetchMetrics() {
        //...
    }
}

Чтобы использовать данные из определенной темы Kafka, я использую аннотацию @KafkaListener следующим образом:

public class KafkaReceiver {
    @KafkaListener(topics = "mytopic")
    public void receive(ConsumerRecord<?, ?> record) {
        MyRecord m = new MyRecord(new Long(record.offset()), record.key().toString(), record.value().toString());
           //...
    }
}

И у меня есть подходящий класс KafkaConfig со всеми необходимыми bean-компонентами (как описано здесь < / а>).

Как я могу отправлять данные из метода receive в fetchMetrics KafkaController (и, следовательно, в веб-сокет) при каждом входящем / потребляемом сообщении?


person Ricardo    schedule 26.09.2017    source источник
comment
вы нашли какое-нибудь решение для этого?   -  person Bhargav Patel    schedule 03.08.2018


Ответы (1)


Вы должны ввести SimpMessagingTemplate в KafkaReceiver и использовать его из метода receive():

 this.template.convertAndSend("/data/records", m);

См. Дополнительную информацию в Spring Framework Справочное руководство.

person Artem Bilan    schedule 26.09.2017
comment
Привет! большое спасибо за ответ. Я получил эту ошибку: SLF4J: Failed toString() invocation on an object of type [org.apache.kafka.clients.NodeApiVersions] java.lang.NullPointerException at org.apache.kafka.clients.NodeApiVersions.apiVersionToText(NodeApiVersions.java:167) Есть идеи? - person Ricardo; 02.10.2017