Присоединение к потокам Kafka, содержащим объекты хэш-карты Java

В настоящее время я работаю над созданием конвейера данных. Я читаю из базы данных sql 2 таблицы, и мне нужно сохранить их в денормализованном формате в хранилище данных OLAP после объединения их в поток с использованием потоков Kafka.

Вместо того, чтобы иметь отдельную тему для каждой таблицы, у меня есть две таблицы, в которых данные вставляются в одну тему.

Я конвертирую строку в хэш-карту, а затем с помощью сериализатора байтов преобразую эту информацию в массив байтов и отправляю в темы, поэтому вся информация в строке хранится в одном объекте. Код для которого:

ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutput out = null;
byte[] yourBytes = null;
try {
     out = new ObjectOutputStream(bos);
     out.writeObject(record);
     // here record is the row hashmap
     out.flush();
     yourBytes = bos.toByteArray();
}
catch (IOException ex) {
    // ignore close exception
}

В приложении потоковой обработки я десериализую массив байтов обратно в хэш-карту и фильтрую записи в два отдельных потока, каждый для одной таблицы.

Итак, мои записи на этапе обработки после десериализации массива байтов обратно в объект hashmap, записи выглядят следующим образом, где одна запись для каждого потока, относящегося к каждой таблице, показана ниже:

(key,{meta = "PRODUCTS",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, PRODUCTID=57})

(key,{meta = "BRAND", BRANDNAME="ABC", BRANDID=16, PRODUCTID=57, BRANDCATEGORY = "Electronics"})

Теперь мне нужно объединить данные в двух потоках, где каждое значение представляет собой хэш-карту, и объединить ключ PRODUCTID, который является общим полем для обеих таблиц, и, наконец, сгенерировать одну хеш-карту для каждой строки и перетащите этот поток в тему.

Таким образом, объединенные записи будут выглядеть следующим образом:

(key,{meta = "JOINEDTABLE",PRODUCTNAME=ONE, ISACTIVE=1, METATABLENAME=PRODUCT, PRODUCTSUBCATEGORYID=16, BRANDNAME="ABC", BRANDID=16, PRODUCTID=57,BRANDCATEGORY = "Electronics"})

можно ли это сделать с помощью потоков Kafka, и если да, то как?


person Anmol    schedule 30.08.2017    source источник


Ответы (1)


Если вы хотите присоединиться к Kafka Streams, вам нужно извлечь атрибут соединения и установить его в качестве ключа для сообщения:

KStream streamOfTable1 = ...
streamOfTable1.selectKey(/*extract productId and set as key*/).to("newTopic1");

KStream streamOfTable2 = ...
streamOfTable2.selectKey(/*extract productId and set as key*/).to("newTopic2");

KTable table1 = builder.table("newTopic1");
KTable table2 = builder.table("newTopic2");

table1.join(table2, ...).to("resultTopic");

Дополнительные сведения см. В документации: http://docs.confluent.io/current/streams/developer-guide.html#joining

Я действительно предполагал, что соединение KTable-KTable - это то, что вам нужно. Обратите внимание, что вам нужно создать «newTopic1» и «newTopic2» вручную, и что у обоих должно быть одинаковое количество разделов. (см. http://docs.confluent.io/current/streams/developer-guide.html#user-topics)

Также ознакомьтесь с другими доступными типами соединений, если соединения KTable-KTable вам не нужны.

person Matthias J. Sax    schedule 30.08.2017
comment
Спасибо @Matthias, это сработало. Я использовал для этого KTables. - person Anmol; 01.09.2017