Вопросы по теме 'spring-kafka'

Получение только одной записи из многих, когда потребитель kafka впервые получает запись?
Я использую spring-kafka и spring-kafka-test версии 1.0.2.RELEASE. В одном из моих тестов мое приложение отправляет 100 записей подряд в один TopicPartion на экземпляре EmbeddedKafka с использованием KafkaTemplate и в основном с настройками...
3202 просмотров

Десериализация Spring-Kafka
Я пытаюсь создать потребителя кафки, который слушает определенную тему и обрабатывает потребляемое сообщение как JSON. Я попытался следовать подходу, указанному в весенних документах здесь , но не могу чтобы получать сообщения в формате JSON....
3541 просмотров

Доступ к потребителю из ConsumerRebalanceListener в Spring Kafka
Мне нужно использовать ConsumerRebalanceListener и обнаружить, что его можно зарегистрировать с помощью метода containerProperties.setConsumerRebalanceListener. Мне нужен экземпляр потребителя в прослушивателе балансировщика для получения позиций...
1572 просмотров
schedule 18.04.2023

spring-cloud-stream-kafka Потреблять только последние сообщения после запуска приложения
В нашем проекте мы используем spring-cloud-stream-binder-kafka версии 1.1.2 для интеграции с kafka. Недавно у нас была ситуация, когда одна из наших служб потребляла старые сообщения (уже использованные) из темы после запуска. В этой теме есть 2...
704 просмотров

KafkaProducerException при отправке сообщения в тему
Свойства Spring загрузки для производителя kafka: spring.kafka.bootstrap-servers=localhost:9092 spring.kafka.client-id=bam #spring.kafka.producer.acks= # Number of acknowledgments the producer requires the leader to have received before...
2336 просмотров

Включение @KafkaListener для получения имен переменных тем из файла application.yml
Я пытаюсь загрузить несколько тем в один @KafkaListener , но у меня возникают проблемы, так как я считаю, что он ищет постоянное значение, но инициализация переменной topics из файла application.yml вызывает какие-то проблемы, мне было интересно,...
11593 просмотров
schedule 21.04.2023

Spring Kafka - Источник событий - Пример того, как запросить состояние некоторой сущности с помощью Kafka + KafkaStreams API
Я использую Kafka для реализации архитектуры, основанной на поиске событий. Допустим, я храню события в формате JSON: {"name": "ProductAdded", "productId":"1", quantity=3, dateAdded="2017-04-04" } Я хотел бы реализовать запрос, чтобы...
3947 просмотров

@KafkaListener должен извлекать новые данные только при выполнении определенных условий. Если условие не выполняется, извлечение данных должно прекратиться до тех пор, пока условие не будет выполнено.
Вариант использования, над которым я работаю, заключается в том, что сообщение, полученное от KafkaListener, запускает асинхронный метод. Я хочу, чтобы этот метод Aysnc завершился и только после этого получил новое сообщение из очереди kafka. Любые...
160 просмотров
schedule 16.12.2022

не могу подключиться к кафке с внешней машины
Я начинаю с Apache Kafka, и у меня возникают проблемы, когда я пытаюсь подключиться к внешней машине. С приведенной ниже конфигурацией все работает нормально, если приложение и докер работают на одном компьютере. но когда я помещаю приложение на...
3444 просмотров
schedule 11.06.2022

auto-offset-reset = latest не работает в spring-kafka
У меня есть вариант использования, когда я хочу, чтобы потребитель всегда начинал с последнего смещения. Мне не нужно фиксировать смещения для этого потребителя. Этого невозможно достичь с помощью spring-kafka, поскольку новая группа потребителей...
2204 просмотров
schedule 12.04.2022

Kafka потребляет необработанные сообщения. Как позже переработать неработающие сообщения?
Мы реализуем Kafka Consumer с помощью Spring Kafka. Как я правильно понимаю, если обработка одного сообщения не удалась, есть возможность Все равно и просто ACK Сделайте некоторую обработку повторных попыток, используя RetryTemplate Если...
646 просмотров

Интеграция Spring Websocket с Kafka
Я пытаюсь отправить использованные данные Kafka во внешний интерфейс (JavaScript) через Spring-Websockets в проекте Spring MVC. Чтобы установить связь между сервером и клиентом, у меня есть следующее. Клиент (app.js) function connect() {...
4672 просмотров

Потребление сообщения avro с помощью spring-kafka, созданного с помощью spring-cloud-stream-kafka-binder
Я пытаюсь прочитать последнее сообщение, доступное в теме kafka, используя ConsumerSeekAware. Тип сообщения - Список объектов Avro. Я умею это делать успешно. Но когда во время десериализации это не удается. Сообщение было создано с использованием...
1055 просмотров

Spring Kafka: как отбросить сообщения, уже полученные опросом () после выполнения поиска ()?
Это дополнительный вопрос к чтению одного и того же сообщения несколько раз от Kafka . Если есть лучший способ задать этот вопрос, не публикуя новый вопрос, дайте мне знать. В этом посте Гэри упоминает «Но вы все равно сначала увидите более...
4523 просмотров
schedule 02.08.2023

Spring kafka Batch Listener - фиксировать смещения вручную в пакетном режиме
Я реализую пакетный прослушиватель spring kafka, который читает список сообщений из темы Kafka и отправляет данные в службу REST. Я хотел бы понять управление смещением в случае отказа службы REST, смещения для пакета не должны фиксироваться, а...
7083 просмотров
schedule 01.04.2022

Как восстановить из исключений, отправленных производителем.send () в Spring Cloud Stream
Мы испытали следующий сценарий: У нас есть кластер Kafka, состоящий из 3 узлов, каждая созданная тема имеет 3 раздела. Сообщение отправляется через MessageChannel.send() , создавая запись, скажем, для раздела 1. Брокер, действующий в...
1028 просмотров
schedule 03.04.2022

Как агрегировать данные ежечасно?
Всякий раз, когда пользователь добавляет в избранное какой-либо контент на нашем сайте, мы собираем события, и то, что мы планировали сделать, - это ежечасно фиксировать совокупное избранное контента и обновлять общее количество избранных в БД. Мы...
5404 просмотров

Как использовать Spring Integration 5 с Spring Boot 1.5.x
Я пытаюсь настроить проект, использующий Spring Boot 1.5.10 и Spring Integration. Кажется, что Spring Boot 1.5.x использует старую версию Spring Integration, которая является .xml#L160" rel="nofollow noreferrer">4.3.14 . Для проекта также...
456 просмотров

Приложение Spring Cloud Stream завершает работу после одного ввода мусора
У меня проблема с приложением облачного потока Spring, которое использует компонент KStream. Он прослушивает один вход и направляет сообщения на один выход после их обработки. Он ожидает поступления строки JSON и пытается преобразовать ее в Spring...
442 просмотров

Канал ошибок Spring Cloud Stream Kafka
Я пытаюсь настроить привязку для пересылки сообщений Kafka из Spring Integration errorChannel в настраиваемый канал (для централизованной обработки ошибок). Сообщения об ошибках отправляются на настроенный канал, но они прибывают как...
2013 просмотров