Вопросы по теме 'apache-kafka-streams'

Ошибка dir io состояния Kafka-streams
Ниже выдается ошибка после прохождения потока в течение определенного времени? Я не могу найти, кто отвечает за создание файла .sst? Env: Kafka версии 0.10.0-cp1 Scala 2.11.8...
2729 просмотров

Kafka KStreams — тайм-ауты обработки
Я пытаюсь использовать <KStream>.process() с TimeWindows.of("name", 30000) , чтобы объединить некоторые значения KTable и отправить их дальше. Кажется, что 30 секунд превышают интервал времени ожидания потребителя, после которого...
8664 просмотров
schedule 27.01.2023

Kafka KStream — использование AbstractProcessor с окном
Я надеюсь сгруппировать оконные пакеты вывода из KStream и записать их во вторичное хранилище. Я ожидал, что .punctuate() будет вызываться примерно каждые 30 секунд. То, что я получил вместо этого, сохранено здесь . (Исходный файл состоял из...
1675 просмотров
schedule 18.09.2022

kafka KStream - топология для подсчета за n секунд
У меня есть поток объектов JSON, которые я ввожу в хэш из нескольких значений. Я надеюсь считать по ключу с n-секундными (10? 60?) интервалами и использовать эти значения для анализа шаблонов. Моя топология: K->aggregateByKey(n...
456 просмотров
schedule 16.02.2023

Почему я не вижу результатов метода сокращения Kafka Streams?
Учитывая следующий код: KStream<String, Custom> stream = builder.stream(Serdes.String(), customSerde, "test_in"); stream .groupByKey(Serdes.String(), customSerde) .reduce(new CustomReducer(), "reduction_state")...
3114 просмотров

Как агрегировать KStream в список фиксированного размера?
Аналогично, но немного иначе, чем этот вопрос: окна пакетной обработки KStream , я хочу группировать сообщения из KStream перед тем, как отправить его потребителям. Однако это нажатие должно быть запланировано не на фиксированное временное...
709 просмотров
schedule 24.01.2024

Как использовать фьючерсы с Kafka Streams
У меня есть кластер kafka, из которого я использую две темы, и присоединяюсь к нему. В результате объединения я делаю некоторые манипуляции с базой данных. Все операции с БД асинхронны, поэтому они возвращают мне Future (scala.concurrent.Future, но...
1589 просмотров
schedule 30.06.2023

Проблемы с приложением Kafka Streams в версии 0.10.2.0
У меня было существующее приложение Kafka Streams, которое отлично работало с 0.10.1.1. Обновлена ​​новая библиотека Kafka Streams 0.10.2.0 вместе с новым брокером (хотя новая библиотека обратно совместима с 0.10.1.1). Быстрый фон У меня есть...
1689 просмотров
schedule 23.11.2023

Выборы лидера Kafka привели к краху Kafka Streams
У меня есть приложение Kafka Streams, которое потребляет и производит кластер Kafka с 3 брокерами и коэффициентом репликации 3. За исключением тем смещения потребителя (50 разделов), все остальные темы имеют только по одному разделу. Когда брокеры...
2723 просмотров
schedule 06.03.2022

Пример использования Kafka Streams
Я создаю простое приложение, которое ниже по порядку - 1) Считывает сообщения от удаленного IBM MQ (устаревшая система работает только с IBM MQ) 2) Записывает эти сообщения в Kafka Topic 3) Читает эти сообщения из той же темы Kafka и вызывает...
4165 просмотров
schedule 04.03.2022

Исключение KafkaStreams serde
я играю с Kafka и потоковой технологией; Я создал собственный сериализатор и десериализатор для KStream, который я буду использовать для получения сообщений из данной темы. Теперь проблема в том, что я создаю serde таким образом:...
6817 просмотров
schedule 24.12.2022

Присоединение к KTable с KStream, и в выходной теме ничего не приходит
Я покинул KStream с помощью KTable, но не вижу вывода в тему вывода: val stringSerde: Serde[String] = Serdes.String() val longSerde: Serde[java.lang.Long] = Serdes.Long() val genericRecordSerde: Serde[GenericRecord] = new GenericAvroSerde()...
1267 просмотров
schedule 07.01.2023

Приложение Kafka Streams НЕ дает сбой, когда кластер Kafka выходит из строя
У меня запущено приложение Kafka Streams (0.10.2.1). Когда я выключаю кластер Kafka, приложение потоков продолжает ждать следующего сообщения, когда кластер будет восстановлен, он возобновит прием сообщений. Пока кластер не работает, приложение...
722 просмотров
schedule 02.06.2023

Spring Kafka - Источник событий - Пример того, как запросить состояние некоторой сущности с помощью Kafka + KafkaStreams API
Я использую Kafka для реализации архитектуры, основанной на поиске событий. Допустим, я храню события в формате JSON: {"name": "ProductAdded", "productId":"1", quantity=3, dateAdded="2017-04-04" } Я хотел бы реализовать запрос, чтобы...
3947 просмотров

Исключения при запуске толстой банки потоков кафки
Я пробую пример подсчета слов для изучения потоков кафки. Ниже приведен используемый код. Я создал толстую банку из проекта и начал создавать сообщения в теме word-count-input1 и получать вывод из word-count-output1 . Но когда я запускаю толстую...
1293 просмотров
schedule 04.07.2022

Проблема с фильтрацией потока Kafka
Я пытаюсь запустить базовое приложение из следующего примера: https://github.com/confluentinc/examples/blob/3.3.x/kafka-streams/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala Однако я получаю исключение в этой...
603 просмотров

Присоединение к потокам Kafka, содержащим объекты хэш-карты Java
В настоящее время я работаю над созданием конвейера данных. Я читаю из базы данных sql 2 таблицы, и мне нужно сохранить их в денормализованном формате в хранилище данных OLAP после объединения их в поток с использованием потоков Kafka. Вместо...
984 просмотров

Что мне следует использовать: Kafka Stream или потребительский API Kafka или Kafka connect
Я хотел бы знать, что для меня лучше всего: поток Kafka или потребительский API Kafka или подключение Kafka? Я хочу прочитать данные из темы, затем обработать и записать в базу данных. Итак, я написал потребителей, но я чувствую, что могу написать...
1864 просмотров

Метод Kafka streams.allMetadata () возвращает пустой список
Итак, я пытаюсь получить интерактивные запросы, работающие с потоками Kafka. У меня Zookeeper и Kafka работают локально (в Windows). Где я использую C: \ temp в качестве папки для хранения, как для Zookeeper, так и для Kafka. Я настроил тему вот...
1995 просмотров

Запись соединения KStream-KTable в KTable: как синхронизировать соединение с записью ktable?
У меня проблема с тем, как ведет себя следующая топология: String topic = config.topic(); KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic); // Receive a stream of various...
1672 просмотров
schedule 08.05.2023