Второй и третий распределенные рабочие коннекторы Kafka не работают правильно

С кластером Kafka из 3 и кластером Zookeeper того же самого я поднял один узел распределенного коннектора. Этот узел успешно справился с одной задачей. Затем я поднял второй соединитель, он, похоже, работал, поскольку часть кода в задаче определенно выполнялась. Однако тогда, похоже, он не оставался в живых (хотя без ошибок, не оставшихся в живых наблюдалось отсутствие ожидаемой активности, в то время как первый соединитель продолжал работать правильно). Когда я вызываю URL http://localhost:8083/connectors/mqtt/tasks на каждом узле соединителя, он сообщает мне, что у соединителя одна задача. Я ожидал, что это будут две задачи, по одной для каждого узла / рабочего. (В настоящее время в рабочей конфигурации указано tasks.max = 1, но я также пробовал установить его на 3.

Когда я пытаюсь поднять третий разъем, я получаю сообщение об ошибке:

"POST /connectors HTTP/1.1" 500 90  5 
(org.apache.kafka.connect.runtime.rest.RestServer:60)

ERROR IO error forwarding REST request: 
(org.apache.kafka.connect.runtime.rest.RestServer:241) 
java.net.ConnectException: Connection refused

Попытка снова вызвать метод POST коннектора из оболочки возвращает ошибку:

 {"error_code":500,"message":"IO Error trying to forward REST request:
 Connection refused"}

Я также пробовал обновиться до Apache Kafka 0.10.1.1, выпущенного сегодня. Я все еще вижу проблемы. Каждый соединитель работает в изолированных контейнерах Docker, определенных одним образом. Они должны быть идентичными.

Проблема может заключаться в том, что я пытаюсь запустить POST-запрос к http://localhost:8083/connectors для каждого рабочего, когда мне нужно запустить его только один раз для одного рабочего, а затем задачи для этого коннектора будут автоматически распространяться среди других рабочих. Если это так, как мне заставить задачи распределяться? В настоящее время у меня установлено максимальное значение три, но, похоже, только один работает на одном воркере.

Обновлять

В конечном итоге у меня все заработало, используя, по сути, тот же подход, который предложил Юрий. Я дал каждому рабочему уникальный идентификатор группы, а затем дал каждой задаче соединителя одно и то же имя. Это позволило трем соединителям и их отдельным задачам совместно использовать одно смещение, так что в случае соединителей приемника сообщения, которые они получали от Kafka, не дублировались. Они в основном работают как автономные соединители, так как рабочие имеют разные идентификаторы групп и, следовательно, не могут взаимодействовать друг с другом.

Если рабочие коннекторы имеют одинаковый идентификатор группы, вы не можете добавить более одного коннектора с одинаковым именем. Если вы дадите соединителям разные имена, они будут иметь разные смещения и принимать повторяющиеся сообщения. Если у вас есть три воркера в одной группе, один коннектор и три задачи, теоретически у вас будет идеальная ситуация, когда задачи разделяют смещение, а воркеры следят за тем, чтобы задачи всегда выполнялись и хорошо распределялись (каждая задача использует уникальный набор перегородок). На практике структура коннектора не создает более одной задачи, даже если для параметра tasks.max установлено значение 3, а при использовании тематических задач имеется 25 разделов.

Если кто-нибудь знает, почему я наблюдаю такое поведение, дайте мне знать.


person LaserJesus    schedule 21.12.2016    source источник


Ответы (2)


Я столкнулся с подобной проблемой в той же ситуации, что и ваша.

  1. Task.max настроен для темы, и распределенные работники автоматически решают, какие узлы обрабатывают тему. Итак, если у вас есть 3 воркера в кластере и в вашей конфигурации темы указано task.max = 2, то только 2 из 3 воркеров будут обрабатывать тему. Теоретически, если один из рабочих выйдет из строя, третий должен взять на себя нагрузку. Но..
  2. Распределенный коннектор оказался очень ненадежным: как только вы добавляете \ удаляете какие-то узлы, кластер ломался, и все воркеры ничего не делали, а пытались выбрать лидера и потерпели неудачу. Единственный способ исправить это - перезапустить весь кластер и желательно всех рабочих одновременно.

Я выбрал другой путь - я использовал автономного воркера, и он мне очень понравился, потому что распределение нагрузки реализовано на уровне клиента Kafka, и после того, как какой-то воркер упал, кластер автоматически перенастраивается, и клиенты подключаются к незанятым темам.

PS. Может быть, и вам это будет полезно. Конфлюентный соединитель не допускает недопустимой полезной нагрузки, которая не соответствует схеме темы. Как только коннектор получает некорректное сообщение, он молча умирает. Единственный способ узнать это - проанализировать метрики.

person Yuri Tceretian    schedule 22.12.2016

Отправляю ответ на старый вопрос, поскольку Kafka Connect сильно изменилась за три года.

В последней версии (2.3.1) есть инкрементальный ребалансировка, которая значительно улучшает поведение Kafka Connect.

Также стоит отметить, что при настройке Kafka Connect rest.advertised.host.name должен быть установлен правильно, так как в противном случае вы увидите ошибки, в том числе указанную

{"error_code":500,"message":"IO Error trying to forward REST request: Connection refused"}

См. этот пост для более подробной информации.

person Robin Moffatt    schedule 22.11.2019