Вопросы по теме 'apache-kafka-streams'
Ошибка dir io состояния Kafka-streams
Ниже выдается ошибка после прохождения потока в течение определенного времени? Я не могу найти, кто отвечает за создание файла .sst?
Env:
Kafka версии 0.10.0-cp1
Scala 2.11.8...
2729 просмотров
schedule
18.01.2024
Kafka KStreams — тайм-ауты обработки
Я пытаюсь использовать <KStream>.process() с TimeWindows.of("name", 30000) , чтобы объединить некоторые значения KTable и отправить их дальше. Кажется, что 30 секунд превышают интервал времени ожидания потребителя, после которого...
8664 просмотров
schedule
27.01.2023
Kafka KStream — использование AbstractProcessor с окном
Я надеюсь сгруппировать оконные пакеты вывода из KStream и записать их во вторичное хранилище.
Я ожидал, что .punctuate() будет вызываться примерно каждые 30 секунд. То, что я получил вместо этого, сохранено здесь .
(Исходный файл состоял из...
1675 просмотров
schedule
18.09.2022
kafka KStream - топология для подсчета за n секунд
У меня есть поток объектов JSON, которые я ввожу в хэш из нескольких значений. Я надеюсь считать по ключу с n-секундными (10? 60?) интервалами и использовать эти значения для анализа шаблонов.
Моя топология: K->aggregateByKey(n...
456 просмотров
schedule
16.02.2023
Почему я не вижу результатов метода сокращения Kafka Streams?
Учитывая следующий код:
KStream<String, Custom> stream =
builder.stream(Serdes.String(), customSerde, "test_in");
stream
.groupByKey(Serdes.String(), customSerde)
.reduce(new CustomReducer(), "reduction_state")...
3114 просмотров
schedule
08.01.2023
Как агрегировать KStream в список фиксированного размера?
Аналогично, но немного иначе, чем этот вопрос: окна пакетной обработки KStream , я хочу группировать сообщения из KStream перед тем, как отправить его потребителям.
Однако это нажатие должно быть запланировано не на фиксированное временное...
709 просмотров
schedule
24.01.2024
Как использовать фьючерсы с Kafka Streams
У меня есть кластер kafka, из которого я использую две темы, и присоединяюсь к нему. В результате объединения я делаю некоторые манипуляции с базой данных. Все операции с БД асинхронны, поэтому они возвращают мне Future (scala.concurrent.Future, но...
1589 просмотров
schedule
30.06.2023
Проблемы с приложением Kafka Streams в версии 0.10.2.0
У меня было существующее приложение Kafka Streams, которое отлично работало с 0.10.1.1. Обновлена новая библиотека Kafka Streams 0.10.2.0 вместе с новым брокером (хотя новая библиотека обратно совместима с 0.10.1.1). Быстрый фон
У меня есть...
1689 просмотров
schedule
23.11.2023
Выборы лидера Kafka привели к краху Kafka Streams
У меня есть приложение Kafka Streams, которое потребляет и производит кластер Kafka с 3 брокерами и коэффициентом репликации 3. За исключением тем смещения потребителя (50 разделов), все остальные темы имеют только по одному разделу.
Когда брокеры...
2723 просмотров
schedule
06.03.2022
Пример использования Kafka Streams
Я создаю простое приложение, которое ниже по порядку -
1) Считывает сообщения от удаленного IBM MQ (устаревшая система работает только с IBM MQ)
2) Записывает эти сообщения в Kafka Topic
3) Читает эти сообщения из той же темы Kafka и вызывает...
4165 просмотров
schedule
04.03.2022
Исключение KafkaStreams serde
я играю с Kafka и потоковой технологией; Я создал собственный сериализатор и десериализатор для KStream, который я буду использовать для получения сообщений из данной темы.
Теперь проблема в том, что я создаю serde таким образом:...
6817 просмотров
schedule
24.12.2022
Присоединение к KTable с KStream, и в выходной теме ничего не приходит
Я покинул KStream с помощью KTable, но не вижу вывода в тему вывода:
val stringSerde: Serde[String] = Serdes.String()
val longSerde: Serde[java.lang.Long] = Serdes.Long()
val genericRecordSerde: Serde[GenericRecord] = new GenericAvroSerde()...
1267 просмотров
schedule
07.01.2023
Приложение Kafka Streams НЕ дает сбой, когда кластер Kafka выходит из строя
У меня запущено приложение Kafka Streams (0.10.2.1). Когда я выключаю кластер Kafka, приложение потоков продолжает ждать следующего сообщения, когда кластер будет восстановлен, он возобновит прием сообщений. Пока кластер не работает, приложение...
722 просмотров
schedule
02.06.2023
Spring Kafka - Источник событий - Пример того, как запросить состояние некоторой сущности с помощью Kafka + KafkaStreams API
Я использую Kafka для реализации архитектуры, основанной на поиске событий.
Допустим, я храню события в формате JSON:
{"name": "ProductAdded", "productId":"1", quantity=3, dateAdded="2017-04-04" }
Я хотел бы реализовать запрос, чтобы...
3947 просмотров
schedule
01.06.2022
Исключения при запуске толстой банки потоков кафки
Я пробую пример подсчета слов для изучения потоков кафки. Ниже приведен используемый код. Я создал толстую банку из проекта и начал создавать сообщения в теме word-count-input1 и получать вывод из word-count-output1 . Но когда я запускаю толстую...
1293 просмотров
schedule
04.07.2022
Проблема с фильтрацией потока Kafka
Я пытаюсь запустить базовое приложение из следующего примера:
https://github.com/confluentinc/examples/blob/3.3.x/kafka-streams/src/main/scala/io/confluent/examples/streams/MapFunctionScalaExample.scala
Однако я получаю исключение в этой...
603 просмотров
schedule
12.05.2023
Присоединение к потокам Kafka, содержащим объекты хэш-карты Java
В настоящее время я работаю над созданием конвейера данных. Я читаю из базы данных sql 2 таблицы, и мне нужно сохранить их в денормализованном формате в хранилище данных OLAP после объединения их в поток с использованием потоков Kafka.
Вместо...
984 просмотров
schedule
07.04.2023
Что мне следует использовать: Kafka Stream или потребительский API Kafka или Kafka connect
Я хотел бы знать, что для меня лучше всего: поток Kafka или потребительский API Kafka или подключение Kafka?
Я хочу прочитать данные из темы, затем обработать и записать в базу данных. Итак, я написал потребителей, но я чувствую, что могу написать...
1864 просмотров
schedule
19.01.2024
Метод Kafka streams.allMetadata () возвращает пустой список
Итак, я пытаюсь получить интерактивные запросы, работающие с потоками Kafka. У меня Zookeeper и Kafka работают локально (в Windows). Где я использую C: \ temp в качестве папки для хранения, как для Zookeeper, так и для Kafka.
Я настроил тему вот...
1995 просмотров
schedule
04.03.2022
Запись соединения KStream-KTable в KTable: как синхронизировать соединение с записью ktable?
У меня проблема с тем, как ведет себя следующая топология:
String topic = config.topic();
KTable<UUID, MyData> myTable = topology.builder().table(UUIDSerdes.get(), GsonSerdes.get(MyData.class), topic);
// Receive a stream of various...
1672 просмотров
schedule
08.05.2023