Как получить результаты задач, когда они завершены, а не после того, как все закончились в Dask?

У меня есть фреймворк dask, и я хочу вычислить некоторые независимые задачи. Некоторые задачи выполняются быстрее других, но я получаю результат каждой задачи после завершения более длительных задач.

Я создал локального клиента и использую client.compute() для отправки задач. Затем я использую future.result(), чтобы получить результат каждой задачи.

Я использую потоки, чтобы одновременно запрашивать результаты и измерять время вычисления каждого результата следующим образом:

def get_result(future,i):
    t0 = time.time()
    print("calculating result", i)
    result = future.result()
    print("result {} took {}".format(i, time.time() - t0))

client = Client()
df = dd.read_csv(path_to_csv)

future1 = client.compute(df[df.x > 200])
future2 = client.compute(df[df.x > 500])

threading.Thread(target=get_result, args=[future1,1]).start()
threading.Thread(target=get_result, args=[future2,2]).start()

Я ожидаю, что вывод приведенного выше кода будет примерно таким:

calculating result 1
calculating result 2
result 2 took 10
result 1 took 46

Так как первая задача крупнее.

Но вместо этого я получил оба одновременно

calculating result 1
calculating result 2
result 2 took 46.3046760559082
result 1 took 46.477620363235474

Я предполагаю, что это потому, что future2 фактически выполняет вычисления в фоновом режиме и завершается до future1, но ожидает завершения future1 для возврата.

Есть ли способ получить результат future2 на момент его завершения?


person Diego Rodriguez    schedule 05.04.2019    source источник


Ответы (1)


Вам не нужно заставлять потоки использовать фьючерсы асинхронным образом - они уже изначально асинхронны и отслеживают их статус в фоновом режиме. Если вы хотите получать результаты в том порядке, в котором они были готовы, вы должны использовать as_completed.

Однако для вашей конкретной ситуации вы можете просто просмотреть панель управления (или использовать df.visulalize()), чтобы понять происходящие вычисления. Оба фьючерса зависят от чтения CSV, и эта единственная задача потребуется, прежде чем любая из них сможет работать - и, вероятно, занимает большую часть времени. Без сканирования всех данных Даск не знает, какие строки имеют какое значение x.

person mdurant    schedule 05.04.2019