У меня есть несколько таблиц в MS SQL, эти таблицы обновляются каждую секунду и запрос более или менее выглядит так
SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID
WHERE table2.UpdateTime >= ${lastUpdateTime} AND table2.G_ID > ${lastID}
Предположим, что запрос выбора внутреннего соединения дает 5 записей, как показано ниже.
Если запрос выполняется впервые, ${lastUpdateTime}
и ${lastG_ID}
установлен в 0, и он вернет менее 5 записей. После обработки записей запрос сохранит max(G_ID)
, то есть 5 и max(UpdateTime)
, то есть 1512010479, в etl_stat
таблице.
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
если таблица добавляет еще 5 новых записей, как показано ниже:
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
1 1512010470 12591225 DUMMY_DATA DUMMY_ID
2 1512096873 12591538 DUMMY_DATA DUMMY_ID
3 1512096875 12591539 DUMMY_DATA DUMMY_ID
4 1512010477 12591226 DUMMY_DATA DUMMY_ID
5 1512010479 12591227 DUMMY_DATA DUMMY_ID
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
Запрос сначала прочитает max(G_ID)
и max(UpdateTime)
из etl_stat table
и будет кадрировать запрос следующим образом SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5
, так что запрос вернет только 5 дельта-записей, как показано ниже.
G_ID UpdateTime ID Name T_NAME
-------------------------------------------------------------------
6 1512010480 12591230 DUMMY_DATA DUMMY_ID
7 1512010485 12591231 DUMMY_DATA DUMMY_ID
8 1512010490 12591232 DUMMY_DATA DUMMY_ID
9 1512010493 12591233 DUMMY_DATA DUMMY_ID
10 1512010500 12591234 DUMMY_DATA DUMMY_ID
Таким образом, каждый раз при запуске запроса он должен сначала прочитать max(G_ID)
и max(UpdateTime)
из etl_stat
таблицы и сформировать запрос выбора внутреннего соединения, как показано выше, и получить изменения дельты.
КАК ЕСТЬ АРХИТЕКТУРА С ИСПОЛЬЗОВАНИЕМ SPARK SQL
Я реализовал приведенный выше вариант использования следующим образом:
1) Spark JDBC читает таблицу Phoenix, чтобы получить max(G_ID)
и max(UpdateTime)
из таблицы etl_stat
.
2) Spark JDBC кадрирует запрос выбора внутреннего соединения следующим образом SELECT G_ID,UpdateTime,ID,Name,T_NAME FROM TABLE1 AS table1 INNER JOIN TABLE2 AS table2 ON table1.IP=table2.ID WHERE table2.UpdateTime >= 1512010479 AND table2.G_ID > 5
3) Spark JDBC выполняет шаг 2 запроса внутреннего соединения, считывает дельта-сообщения с сервера MS SQL, обрабатывает записи и вставляет в HBase.
4) После успешной вставки в HBase Spark обновляет таблицу etl_stat
с последними G_ID
, то есть 10 и UpdateTime
, то есть 1512010500.
5) Это задание было запланировано для запуска cron каждые 1 минуту.
БЫТЬ АРХИТЕКТУРОЙ, ИСПОЛЬЗУЯ NIFI
Я хочу переместить этот вариант использования в Nifi, я хочу использовать NiFi для чтения записей из базы данных MS SQL и отправки этой записи в Kafka.
После успешной публикации в Kafka NiFi сохранит G_ID и UpdateTime в базе данных.
Как только сообщение попадет в Kafka, искровая потоковая передача будет читать сообщения из Kafka и сохранять в HBase с использованием существующей бизнес-логики.
При каждом запуске процессор Nifi должен кадрировать запрос внутреннего соединения, используя max(G_ID)
и max(UpdateTime)
, чтобы получить дельта-записи и опубликовать их в Kafka.
Я новичок в Nifi / HDF. Мне нужна ваша помощь и руководство, чтобы реализовать это с помощью Nifi / HDF. Если у вас есть лучшее решение / архитектура для этого варианта использования, предложите.
Извините за такой длинный пост.