С кластером Kafka из 3 и кластером Zookeeper того же самого я поднял один узел распределенного коннектора. Этот узел успешно справился с одной задачей. Затем я поднял второй соединитель, он, похоже, работал, поскольку часть кода в задаче определенно выполнялась. Однако тогда, похоже, он не оставался в живых (хотя без ошибок, не оставшихся в живых наблюдалось отсутствие ожидаемой активности, в то время как первый соединитель продолжал работать правильно). Когда я вызываю URL http://localhost:8083/connectors/mqtt/tasks
на каждом узле соединителя, он сообщает мне, что у соединителя одна задача. Я ожидал, что это будут две задачи, по одной для каждого узла / рабочего. (В настоящее время в рабочей конфигурации указано tasks.max = 1
, но я также пробовал установить его на 3.
Когда я пытаюсь поднять третий разъем, я получаю сообщение об ошибке:
"POST /connectors HTTP/1.1" 500 90 5
(org.apache.kafka.connect.runtime.rest.RestServer:60)
ERROR IO error forwarding REST request:
(org.apache.kafka.connect.runtime.rest.RestServer:241)
java.net.ConnectException: Connection refused
Попытка снова вызвать метод POST коннектора из оболочки возвращает ошибку:
{"error_code":500,"message":"IO Error trying to forward REST request:
Connection refused"}
Я также пробовал обновиться до Apache Kafka 0.10.1.1, выпущенного сегодня. Я все еще вижу проблемы. Каждый соединитель работает в изолированных контейнерах Docker, определенных одним образом. Они должны быть идентичными.
Проблема может заключаться в том, что я пытаюсь запустить POST-запрос к http://localhost:8083/connectors
для каждого рабочего, когда мне нужно запустить его только один раз для одного рабочего, а затем задачи для этого коннектора будут автоматически распространяться среди других рабочих. Если это так, как мне заставить задачи распределяться? В настоящее время у меня установлено максимальное значение три, но, похоже, только один работает на одном воркере.
Обновлять
В конечном итоге у меня все заработало, используя, по сути, тот же подход, который предложил Юрий. Я дал каждому рабочему уникальный идентификатор группы, а затем дал каждой задаче соединителя одно и то же имя. Это позволило трем соединителям и их отдельным задачам совместно использовать одно смещение, так что в случае соединителей приемника сообщения, которые они получали от Kafka, не дублировались. Они в основном работают как автономные соединители, так как рабочие имеют разные идентификаторы групп и, следовательно, не могут взаимодействовать друг с другом.
Если рабочие коннекторы имеют одинаковый идентификатор группы, вы не можете добавить более одного коннектора с одинаковым именем. Если вы дадите соединителям разные имена, они будут иметь разные смещения и принимать повторяющиеся сообщения. Если у вас есть три воркера в одной группе, один коннектор и три задачи, теоретически у вас будет идеальная ситуация, когда задачи разделяют смещение, а воркеры следят за тем, чтобы задачи всегда выполнялись и хорошо распределялись (каждая задача использует уникальный набор перегородок). На практике структура коннектора не создает более одной задачи, даже если для параметра tasks.max установлено значение 3, а при использовании тематических задач имеется 25 разделов.
Если кто-нибудь знает, почему я наблюдаю такое поведение, дайте мне знать.