Как читать дельта-записи из MS SQL с помощью Nifi / HDF

У меня есть несколько таблиц в MS SQL, эти таблицы обновляются каждую секунду и запрос более или менее выглядит так

SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID 
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID}

Предположим, что запрос выбора внутреннего соединения дает 5 записей, как показано ниже.

Если запрос выполняется впервые, ${lastUpdateTime} и ${lastG_ID} установлен в 0, и он вернет менее 5 записей. После обработки записей запрос сохранит max(G_ID), то есть 5 и max(UpdateTime), то есть 1512010479, в etl_stat таблице.

 G_ID       UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
 1          1512010470  12591225      DUMMY_DATA       DUMMY_ID    
 2          1512096873  12591538      DUMMY_DATA       DUMMY_ID    
 3          1512096875  12591539      DUMMY_DATA       DUMMY_ID    
 4          1512010477  12591226      DUMMY_DATA       DUMMY_ID    
 5          1512010479  12591227      DUMMY_DATA       DUMMY_ID    

если таблица добавляет еще 5 новых записей, как показано ниже:

 G_ID       UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
 1          1512010470  12591225      DUMMY_DATA       DUMMY_ID    
 2          1512096873  12591538      DUMMY_DATA       DUMMY_ID    
 3          1512096875  12591539      DUMMY_DATA       DUMMY_ID    
 4          1512010477  12591226      DUMMY_DATA       DUMMY_ID    
 5          1512010479  12591227      DUMMY_DATA       DUMMY_ID 
 6          1512010480  12591230      DUMMY_DATA       DUMMY_ID 
 7          1512010485  12591231      DUMMY_DATA       DUMMY_ID 
 8          1512010490  12591232      DUMMY_DATA       DUMMY_ID 
 9          1512010493  12591233      DUMMY_DATA       DUMMY_ID 
 10         1512010500  12591234      DUMMY_DATA       DUMMY_ID 

Запрос сначала прочитает max(G_ID) и max(UpdateTime) из etl_stat table и будет кадрировать запрос следующим образом SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5, так что запрос вернет только 5 дельта-записей, как показано ниже.

G_ID        UpdateTime   ID            Name             T_NAME 
-------------------------------------------------------------------
 6          1512010480  12591230      DUMMY_DATA       DUMMY_ID 
 7          1512010485  12591231      DUMMY_DATA       DUMMY_ID 
 8          1512010490  12591232      DUMMY_DATA       DUMMY_ID 
 9          1512010493  12591233      DUMMY_DATA       DUMMY_ID 
 10         1512010500  12591234      DUMMY_DATA       DUMMY_ID 

Таким образом, каждый раз при запуске запроса он должен сначала прочитать max(G_ID) и max(UpdateTime) из etl_stat таблицы и сформировать запрос выбора внутреннего соединения, как показано выше, и получить изменения дельты.

КАК ЕСТЬ АРХИТЕКТУРА С ИСПОЛЬЗОВАНИЕМ SPARK SQL

Я реализовал приведенный выше вариант использования следующим образом:

1) Spark JDBC читает таблицу Phoenix, чтобы получить max(G_ID) и max(UpdateTime) из таблицы etl_stat.

2) Spark JDBC кадрирует запрос выбора внутреннего соединения следующим образом SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5

3) Spark JDBC выполняет шаг 2 запроса внутреннего соединения, считывает дельта-сообщения с сервера MS SQL, обрабатывает записи и вставляет в HBase.

4) После успешной вставки в HBase Spark обновляет таблицу etl_stat с последними G_ID, то есть 10 и UpdateTime, то есть 1512010500.

5) Это задание было запланировано для запуска cron каждые 1 минуту.

БЫТЬ АРХИТЕКТУРОЙ, ИСПОЛЬЗУЯ NIFI

Я хочу переместить этот вариант использования в Nifi, я хочу использовать NiFi для чтения записей из базы данных MS SQL и отправки этой записи в Kafka.

После успешной публикации в Kafka NiFi сохранит G_ID и UpdateTime в базе данных.

Как только сообщение попадет в Kafka, искровая потоковая передача будет читать сообщения из Kafka и сохранять в HBase с использованием существующей бизнес-логики.

При каждом запуске процессор Nifi должен кадрировать запрос внутреннего соединения, используя max(G_ID) и max(UpdateTime), чтобы получить дельта-записи и опубликовать их в Kafka.

Я новичок в Nifi / HDF. Мне нужна ваша помощь и руководство, чтобы реализовать это с помощью Nifi / HDF. Если у вас есть лучшее решение / архитектура для этого варианта использования, предложите.

Извините за такой длинный пост.


person nilesh1212    schedule 01.12.2017    source источник


Ответы (1)


Вы описываете то, что JDBC Kafka Connect Connector делает из коробки. Настройте файл конфигурации, загрузите его, готово. Выполнено. Kafka Connect является частью Apache Kafka. Нет необходимости в дополнительных инструментах и ​​технологиях.

Вы также можете подумать о правильном отслеживании изменений данных (CDC). Для проприетарных СУБД (Oracle, DB2, MS SQL и т. Д.) У вас есть коммерческие инструменты, такие как GoldenGate, Attunity, DBVisit и т. Д. Для СУБД с открытым исходным кодом (например, MySQL, PostgreSQL) вам следует обратиться к инструменту Debezium с открытым исходным кодом. Все эти инструменты CDC интегрируются напрямую с Kafka.

person Robin Moffatt    schedule 01.12.2017
comment
Спасибо за быстрый ответ, мне нужно решение с использованием Nifi / HDF для этого варианта использования. - person nilesh1212; 01.12.2017
comment
Если у кого-то есть решение с помощью NiFi, пожалуйста, помогите мне в этом. - person nilesh1212; 08.12.2017
comment
@daggett, пожалуйста, помогите мне в этом - person nilesh1212; 08.12.2017
comment
FWIW, мы (команда Debezium) также планируем добавить поддержку SQL Server. Если вам интересно, проблема, которую необходимо отслеживать, - это DBZ-40. Любой вклад (или помощь) в этом очень приветствуется. - person Gunnar; 26.01.2018