Установка kubectl на рабочих потоках dataflowrunner

Я хочу загрузить данные в свой кластер elasticsearch, работающий на kubernetes. Данные, которые у меня есть, находятся в Bigquery, и я хочу использовать поток данных (python) для загрузки данных. Версия apache-beam для python, похоже, не имеет эластичного поискового приемника. Я написал свой собственный модуль записи elasticsearch в потоке данных, но мне нужно перенаправить мой порт elasticsearch из кластера kubernetes. Поэтому мне нужно установить google-cloud-sdk и kubectl, чтобы я мог перенаправить порт, записать свои данные и потом закрыть его. Мой код работает нормально, когда я запускаю задание локально, но я не могу установить google-cloud-sdk и kubectl на рабочих.

Мой код работает нормально, когда я запускаю задание локально, но я не могу установить google-cloud-sdk и kubectl на рабочих.

это команды, которые вызываются в setup.py в subprocces.Popen

['export', 'CLOUD_SDK_REPO="cloud-sdk-$(lsb_release', '-c', '-s)"'],
['echo', '"deb', 'https://packages.cloud.google.com/apt', '$CLOUD_SDK_REPO', 'main"', '|', 'sudo', 'tee', '-a', '/etc/apt/sources.list.d/google-cloud-sdk.list'],
['sudo', 'rm', '/etc/apt/sources.list.d/partner.list'],
['sudo', 'apt-get', 'install', 'google-cloud-sdk', 'kubectl']

это мой метод переадресации портов службы elasticsearch в start_bundle

def _open_connection(self):
    tries = 0
    connected = False
    while tries <= 3 and not connected:
        tries += 1
        try:
            res = requests.get('http://{0}:{1}'.format(self.host, self.port))
            connected = (res.status_code == 200)
        except Exception as e:
            logging.warning(e)
            subprocess.check_call('gcloud container clusters get-credentials {0}'.format(ES_CLUSTER_NAME), shell=True)
            try:
                subprocess.check_call('kubectl version', shell=True)
            except exception as ee:
                logging.warning(ee)
                subprocess.check_call('gcloud components install kubectl', shell=True)
            subprocess.call('kubectl port-forward elasticsearch-0 {0}:{0} & disown'.format(self.port), shell=True)
            time.sleep(3)
    return connected

Я ожидаю, что эти команды (я пробовал варианты) установят необходимые пакеты на каждый рабочий процесс, но установка продолжает давать сбой.


person Georges Lorré    schedule 07.01.2019    source источник


Ответы (1)


Я исправил эту проблему, пропустив переадресацию портов и вместо этого реализовав внутренний балансировщик нагрузки на моем порту elasticsearch. Таким образом, мои рабочие потоки данных могут напрямую подключаться к внутреннему IP-адресу для записи данных.

person Georges Lorré    schedule 13.02.2019