Автоматическое добавление набора данных в планировщик Dask при запуске

TL; DR
Я хочу предварительно загрузить набор данных в планировщик Dask Distributed при его запуске.

Фон
Я использую Dask для запросов в режиме реального времени с набором данных меньшего размера, чем объем памяти. Поскольку это в реальном времени, важно, чтобы рабочие могли доверять тому, что у планировщика всегда есть определенные наборы данных, даже сразу после запуска. Рабочие всегда хранят в памяти весь набор данных.

Традиционно я делал это, подключая клиента, рассеивая df и публикуя набор данных:

df = dd.read_parquet('df.parq')
df = client.persist(df)
client.publish_dataset(flights=dfa)

Но это оставляет возможность перезапуска планировщика и того, что набор данных не будет загружен.

Я знаю, что вы можете использовать --preload для выполнения скрипта при запуске, например:

dask-scheduler --preload=scheduler-startup.py

И что код котельной плиты выглядит так:

from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    def add_worker(self, scheduler=None, worker=None, **kwargs):
        print("Added a new worker at", worker)

def dask_setup(scheduler):
    plugin = MyPlugin()
    scheduler.add_plugin(plugin)

Но как я могу убедить планировщик загрузить мой набор данных без использования внешнего клиента?

Теоретически я мог бы отказаться от подпроцесса, который запускает предварительно заполненного клиента, но он кажется не совсем идеальным :)

Обычный клиент при запуске планировщика
Попытка подключиться в качестве клиента при запуске планировщика:

from distributed.diagnostics.plugin import SchedulerPlugin
from dask.distributed import Client

class MyPlugin(SchedulerPlugin):
    def add_worker(self, scheduler=None, worker=None, **kwargs):
        print("Added a new worker at", worker)

def dask_setup(scheduler):
    c = Client(scheduler.address)
    df = dd.read_parquet('df.parq')
    df = c.persist(df)
    c.publish_dataset(flights=dfa)

Зависает на c = Client(scheduler.address) и требует принудительного отключения (kill -9)


person Niklas B    schedule 28.09.2017    source источник
comment
Что произойдет, если вы поместите клиентский код, который вы указали, в свой сценарий запуска?   -  person MRocklin    schedule 28.09.2017
comment
Он зависает на неопределенный срок (вероятно, в рекурсивном цикле, когда клиент пытается подключиться к планировщику, который еще не запущен). Зависший процесс не может быть нажат Ctrl-C, но должен быть убит -9   -  person Niklas B    schedule 28.09.2017


Ответы (2)


Вы можете добавить свой клиентский код в асинхронную функцию, которая запускается в цикле событий. Это позволит завершить сценарий предварительной загрузки, дать возможность планировщику запуститься, а затем запустить ваш клиентский код. Вам может понадобиться что-то вроде следующего:

async def f(scheduler):
    client =  await Client(scheduler.address)
    df = dd.read_parquet(...)
    await client.publish_dataset(flights=df)

def dask_setup(scheduler):
    scheduler.loop.add_callback(f, scheduler)
person MRocklin    schedule 29.09.2017

Ответ @ MRocklin направил меня на правильный путь, однако мне нужно было перейти в другой поток:

from concurrent.futures import ThreadPoolExecutor

def load_dataset():
    client = Client('127.0.0.1:8786')
    df = dd.read_parquet(...)
    df = client.persist(df)
    client.publish_dataset(flights=df)

async def f(scheduler):
    executor = ThreadPoolExecutor(max_workers=1)
    executor.submit(load_dataset)

def dask_setup(scheduler):
    scheduler.loop.add_callback(f, scheduler)

Обратной стороной является то, что он не мешает рабочим подключаться во время загрузки данных, но я думаю, что этим придется управлять на стороне рабочего (повторите попытку, если набор данных недоступен).

person Niklas B    schedule 02.10.2017