Я столкнулся с очень своеобразной и недокументированной проблемой с управляемой виртуальной машиной GAE и очередями задач. Я понимаю, что служба управляемых виртуальных машин находится в стадии бета-тестирования, поэтому этот вопрос, возможно, не будет актуален всегда, но сейчас он определенно вызывает у меня много головной боли.
Основным признаком проблемы является то, что при определенных (мне не совсем известных) обстоятельствах я вижу следующую ошибку/трассировку:
File "/home/vmagent/my_app/some_file.py", line 265, in some_ndb_tasklet
res = yield some_task.add_async('some-task-queue-name')
File "/home/vmagent/python_vm_runtime/google/appengine/ext/ndb/tasklets.py", line 472, in _on_rpc_completion
result = rpc.get_result()
File "/home/vmagent/python_vm_runtime/google/appengine/api/apiproxy_stub_map.py", line 613, in get_result
return self.__get_result_hook(self)
File "/home/vmagent/python_vm_runtime/google/appengine/api/taskqueue/taskqueue.py", line 1948, in ResultHook
rpc.check_success()
File "/home/vmagent/python_vm_runtime/google/appengine/api/apiproxy_stub_map.py", line 579, in check_success
self.__rpc.CheckSuccess()
File "/home/vmagent/python_vm_runtime/google/appengine/ext/vmruntime/vmstub.py", line 312, in _WaitImpl
raise self._ErrorException(*_DEFAULT_EXCEPTION)
RPCFailedError: The remote RPC to the application server failed for call taskqueue.BulkAdd().
Я просмотрел свой локальный SDK App Engine, чтобы отследить это, и я могу добраться до последней строки трассировки, но google/appengine/ext/vmruntime/
вообще не существует на моей машине, поэтому я понятия не имею, что происходит в vmstub.py
. Глядя на локальный код, some_task.add_async('the-queue')
запускает RPC и ждет его завершения, но эта ошибка не то, что ожидает except apiproxy_errors.ApplicationError, e:
в строке 1949 taskqueue.py...
Код, выдающий ошибку, выглядит примерно так:
@ndb.tasklet
def kickoff_tasks(batch_of_payloads):
for task_payload in batch_of_payloads:
# task_payload is a dict
task = taskqueue.Task(
url='/the/handler/url',
params=payload)
res = yield task.add_async('some-valid-task-queue-name')
Другие вещи, которые стоит отметить:
- сам этот код выполняется в обработчике задач, запущенном другой задачей.
- Я впервые увидел эту ошибку до того, как реализовал пакетную обработку, и предположил, что проблема в том, что я добавил слишком много задач из обработчика задач.
- В некоторых случаях я могу успешно выполнить это с размером пакета 100, но в других он постоянно терпит неудачу (в зависимости от данных в полезной нагрузке) на 100, а иногда успешно на размере пакета 50.
- Сами полезные нагрузки задачи включают в себя пакеты элементов и настроены так, чтобы быть достаточно маленькими, чтобы соответствовать задаче. App Engine объявляет максимальный размер задачи 100 КБ, поэтому сейчас я ограничиваю размер полезной нагрузки до 90 000 байт. Уменьшение размера еще больше, похоже, не помогает.
- Я также пытался реализовать экспоненциальную отсрочку, чтобы повторить метод
kickoff_tasks
при появлении этой ошибки, но кажется, что после возникновения ошибки я вообще не могу добавлять какие-либо другие задачи из того же обработчика (т.е. я не могу запустить задачу «продолжить с того места, где вы остановились», мне просто нужно позволить этой задаче потерпеть неудачу и перезапустить ее)
Итак, мой вопрос: что на самом деле вызывает эту ошибку? Как я могу избежать этого или исправить это, чтобы я правильно справлялся с этим?