Выполнить задание потока данных из App Engine

Я относительно новичок в технологии GCP. В настоящее время я занимаюсь POC для создания запланированного задания потока данных, которое загружает (вставляет) данные из облачного хранилища Google в BigQuery. Прочитав несколько руководств и документации, я пришел к следующему:

  1. Сначала я создаю задание потока данных, которое читает файл avro и загружает его в BigQuery. Этот поток данных был протестирован и хорошо зарекомендовал себя.

    (self.pipeline
         | output_table + ': read table ' >> ReadFromAvro(storage_input_path)
         | output_table + ': filter columns' >> beam.Map(self.__filter_columns, columns=columns)
         | output_table + ': write to BigQuery' >> beam.Write(
            beam.io.BigQuerySink(output_table,               
       create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,                               
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)))
    
  2. Чтобы создать запланированное задание, я затем создал простую веб-службу, как показано ниже:

    import logging
    from flask import Flask
    from common.tableLoader import TableLoader
    from ingestion import IngestionToBigQuery
    from common.configReader import ConfigReader
    app = Flask(__name__)
    @app.route('/')
    def hello():
         """Return a friendly HTTP greeting."""
        logging.getLogger().setLevel(logging.INFO)
        config = ConfigReader('columbus-config')  # TODO read from args
        tables = TableLoader('experience')
        ingestor = IngestionToBigQuery(config.configuration, tables.list_of_tables)
        ingestor.ingest_table()
        return 'Hello World!'```
    
  3. Я также создал app.yaml:

     runtime: python
     env: flex
     entrypoint: gunicorn -b :$PORT recsys_data_pipeline.main:app
     threadsafe: yes
     runtime_config:
        python_version: 2
        resources:
        memory_gb: 2.0
    

Затем я развернул его с помощью этой команды gcloud app deploy, но получил следующие ошибки:

default[20170417t173837]  ERROR:root:The gcloud tool was not found.
default[20170417t173837]  Traceback (most recent call last):    
File "/env/local/lib/python2.7/site-packages/apache_beam/internal/gcp/auth.py", line 109, in _refresh      ['gcloud', 'auth', 'print-access-token'], stdout=processes.PIPE)    
File "/env/local/lib/python2.7/site-packages/apache_beam/utils/processes.py", line 52, in Popen      return subprocess.Popen(*args, **kwargs)    
File "/usr/lib/python2.7/subprocess.py", line 710, in __init__      errread, errwrite)    File "/usr/lib/python2.7/subprocess.py", line 1335, in _execute_child      raise child_exception  OSError: [Errno 2] No such file or directory

Из сообщения выше я обнаружил, что ошибка исходила от apache_beam auth.py class, в частности, от следующей функции:

def _refresh(self, http_request):
   """Gets an access token using the gcloud client."""
   try:
     gcloud_process = processes.Popen(['gcloud', 'auth', 'print-access-token'], stdout=processes.PIPE)
   except OSError as exn:
     logging.error('The gcloud tool was not found.', exc_info=True)
     raise AuthenticationException('The gcloud tool was not found: %s' % exn)
  output, _ = gcloud_process.communicate()
  self.access_token = output.strip()

который был вызван, когда учетные данные (service_acount_name и service_acount_key не указаны:

if google_cloud_options.service_account_name:
      if not google_cloud_options.service_account_key_file:
        raise AuthenticationException(
            'key file not provided for service account.')
      if not os.path.exists(google_cloud_options.service_account_key_file):
        raise AuthenticationException(
            'Specified service account key file does not exist.')

else:
      try:
        credentials = _GCloudWrapperCredentials(user_agent)
        # Check if we are able to get an access token. If not fallback to
        # application default credentials.
        credentials.get_access_token()
        return credentials

Итак, у меня есть два вопроса:

  1. Есть ли способ «прикрепить» учетные данные (service_acount_name и service_acount_key) где-нибудь в моем коде или в файле конфигурации (например: в app.yaml)?
  2. Каковы лучшие практики для запуска задания потока данных из движка приложений Google?

Большое спасибо, любые предложения и комментарии будут действительно полезны!




Ответы (1)


Взгляните на официальный пример этого на https://github.com/amygdala/gae-dataflow.

person jkff    schedule 18.04.2017
comment
привет @jkff, спасибо за ответ. Я попробовал выполнить шаг за шагом, приведенный в приведенной выше ссылке, но все равно получаю ту же ошибку при развертывании app.yaml. ERROR:root:The gcloud tool was not found. - person bohr; 19.04.2017
comment
Используете ли вы custom среду выполнения в своем app.yaml и есть ли у вас Dockerfile из примера Эми рядом с ней в каталоге? - person jkff; 20.04.2017
comment
Я использую среду выполнения custom в своем app.yaml. Я также создал Dockerfile, такой же, как в примере Эми. - person bohr; 20.04.2017
comment
@bohr - в приведенном выше примере вы используете runtime: python. Чтобы дважды проверить, изменили ли вы это значение на runtime: custom как сделано здесь < / а>? Вы должны увидеть вывод сборки dockerfile (включая установку gcloud) как часть процесса развертывания. - person Amy U.; 20.04.2017
comment
привет @ AmyU, наконец-то мне удалось отправить работу. Благодаря предоставленной ссылке. Я решил создать новый проект и начать перенимать ваш пример. Спасибо - person bohr; 21.04.2017