Трансляция Dask недоступна во время вычисления графа

Я экспериментирую с Dask и хочу отправить поиск pandas.DataFrame на все рабочие узлы. К сожалению, это не удается:

TypeError: ("'Future' object is not subscriptable", 'occurred at index 0')

Когда вместо lookup['baz'].iloc[2] используется lookup.result()['foo'].iloc[2], он работает нормально, но: для больших экземпляров входного фрейма данных кажется, что он снова и снова застревает на from_pandas. Кроме того, кажется странным, что будущее нужно блокировать вручную (снова и снова для каждой строки в операции применения. Есть ли способ заблокировать будущее только один раз для каждого рабочего узла? Наивным улучшением могло бы быть использование map_partitions, но это возможно только в том случае, если количество разделов довольно мало.

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

df_first = pd.DataFrame({'foo':[1,2,3]})
df_second = pd.DataFrame({'bar':[1,2,3], 'baz':[1,2,3]})

df_first_scattered = client.scatter(df_first, broadcast=True)
df_second_dask = dd.from_pandas(df_second, npartitions=2)


def foo(row, lookup):
    # TODO some computation which relies on the lookup
    return lookup['foo'].iloc[2]

df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))
df_second_dask = df_second_dask.compute()
df_second_dask.head()

Фактически, эта наивная реализация dask кажется медленнее, чем простые панды для более крупных проблемных экземпляров. Я подозреваю, что низкая производительность связана с проблемой, поднятой выше.


person Georg Heiler    schedule 10.05.2019    source источник


Ответы (1)


Вместо этого:

df_second_dask['foo'] = df_second_dask.apply(lambda x: foo(x, df_first_scattered), axis = 1, meta=('baz', 'int64'))

Попробуйте вместо этого:

df_second_dask['foo'] = df_second_dask.apply(foo, args=[df_first_scattered], axis = 1, meta=('baz', 'int64'))

Раньше вы прятали будущее внутри лямбда-функции. Даску не удалось найти его, чтобы превратить в подходящую ценность. Вместо этого, когда мы передаем будущее в качестве правильного аргумента, Dask может определить его таким, какое оно есть, и дать вам правильное значение.

person MRocklin    schedule 11.05.2019