Как загружать строки после определенного идентификатора и отслеживать новые строки с помощью Kafka JDBC Connector?

Я использую Kafka JDBC Connector для импорта данных из базы данных MySQL в тему Kafka. С приведенными ниже параметрами я могу отслеживать новые строки, вставленные в данную таблицу.

name=test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:mysql://localhost:3306/test?user=root&password=asdf
table.whitelist=test_table

mode=incrementing
incrementing.column.name=id

topic.prefix=test-

Мне нужно загрузить все записи после определенного идентификатора из таблицы, а также отслеживать все новые вставленные записи. Как я могу этого добиться? Одним из решений может быть собственный запрос с фильтрацией, но я не уверен в запросе.


person korujzade    schedule 24.02.2017    source источник


Ответы (3)


Пользовательский запрос, вероятно, должен быть «выбрать * из таблицы, где id> X», где X - это конкретный идентификатор, который вы упомянули.

person Gwen Shapira    schedule 25.02.2017
comment
Просто обратите внимание, что если вы пойдете по этому пути, вы, вероятно, не сможете использовать инкрементный режим: docs.confluent.io/3.1.2/connect/connect-jdbc/docs/ - person dawsaw; 25.02.2017
comment
Документация говорит, что в качестве альтернативы указанный запрос может сам обрабатывать фильтрацию новых обновлений; однако обратите внимание, что отслеживание смещений выполняться не будет, поэтому запрос должен отслеживать смещения сам. Тогда как мы можем сделать отслеживание смещений в самом запросе? - person korujzade; 27.02.2017

Ну я не делал этого раньше. Но я все еще думаю, что это выполнимо, что, очевидно, потребует некоторого изменения кода. В методе JdbcSourceTask.start смещения загружаются с использованием приведенного ниже кода.

offsets = context.offsetStorageReader().offsets(partitions);

Здесь вы можете определить свое собственное смещение. Однако есть одна проблема. Это смещение будет загружаться каждый раз при перезапуске вашего коннектора вместо того, которое сохранено в теме. Кроме того, чтобы решить эту проблему, вы можете определить пользовательскую конфигурацию, как показано ниже.

 connector.firsttime=true

Затем то же самое можно использовать в методе запуска, как показано ниже:

String strIsFirstTime
    = config.getString(JdbcSourceTaskConfig.FIRST_TIME_CONFIG);
if("true".equals(strIsFirstTime)){
//load custom offset
//lStartingPosition is the value at which you want to start the processing.
Long lStartingPosition=Long.MAX_VALUE;
//partition is the relevant partiton of the table in question.
  offsets.put(partition, new TimestampIncrementingOffset(null,lStartingPosition).toMap()); 
}
else{
offsets = context.offsetStorageReader().offsets(partitions);
}

Однако не забудьте установить для этой настраиваемой конфигурации значение false при каждом перезапуске этого соединителя.

Дайте мне знать, если это работает.

person Manish Bansal    schedule 22.03.2017

Другой способ сделать это — создать представление для вашего пользовательского запроса и использовать фильтр в предикате.

create or replace view xyz as select * from table where id > X;


name=test
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10

connection.url=jdbc:mysql://localhost:3306/test?user=root&password=asdf
table.whitelist=test_table

mode=incrementing
incrementing.column.name=id

topic.prefix=test-

poll.interval.ms : 300000,
query: "select id from xyz"
person singhprad    schedule 15.08.2017