asyncio yield from concurrent.futures.Future исполнителя

У меня есть функция long_task, которая выполняет тяжелые вычисления с привязкой к процессору, и я хочу сделать ее асинхронной, используя новую структуру asyncio. Результирующая функция long_task_async использует ProcessPoolExecutor для передачи работы другому процессу, чтобы не ограничиваться GIL.

Проблема в том, что по какой-то причине экземпляр concurrent.futures.Future возвращается из ProcessPoolExecutor.submit при возврате из возвращает TypeError. Это по дизайну? Эти фьючерсы несовместимы с классом asyncio.Future? Что может быть обходным путем?

Я также заметил, что генераторы не поддаются выборке, поэтому отправка курутины в ProcessPoolExecutor не удастся. Есть ли какое-нибудь чистое решение для этого?

import asyncio
from concurrent.futures import ProcessPoolExecutor

@asyncio.coroutine
def long_task():
    yield from asyncio.sleep(4)
    return "completed"

@asyncio.coroutine
def long_task_async():
    with ProcessPoolExecutor(1) as ex:
        return (yield from ex.submit(long_task)) #TypeError: 'Future' object is not iterable
                                                 # long_task is a generator, can't be pickled

loop = asyncio.get_event_loop()

@asyncio.coroutine
def main():
    n = yield from long_task_async()
    print( n )

loop.run_until_complete(main())

person krdx    schedule 17.03.2014    source источник


Ответы (2)


Вы хотите использовать loop.run_in_executor, который использует исполнитель concurrent.futures, но сопоставляет возвращаемое значение с asyncio будущее.

Первоначальный asyncio PEP предполагает, что concurrent.futures.Future может когда-нибудь развить метод __iter__, чтобы его можно было использовать с yield from, но на данный момент библиотека спроектирована так, чтобы требовать только поддержку yield from и ничего больше. (Иначе какой-то код не работал бы в версии 3.3.)

person Eevee    schedule 17.03.2014

Мы можем обернуть concurrent.futures.Future в asyncio.future, вызвав asyncio.wrap_future(Future). Я попробовал это с кодом ниже. Работает отлично

from asyncio import coroutine
import asyncio
from concurrent import futures


def do_something():
    ls = []
    for i in range(1, 1000000):
        if i % 133333 == 0:
            ls.append(i)
    return ls


@coroutine
def method():
    with futures.ProcessPoolExecutor(max_workers=10) as executor:
        job = executor.submit(do_something)
        return (yield from asyncio.wrap_future(job))

@coroutine
def call_method():
    result = yield from method()
    print(result)


def main():
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(call_method())
    finally:
        loop.close()


if __name__ == '__main__':
    main()
person Nihal Sharma    schedule 25.07.2015