TL; DR
Я хочу предварительно загрузить набор данных в планировщик Dask Distributed при его запуске.
Фон
Я использую Dask для запросов в режиме реального времени с набором данных меньшего размера, чем объем памяти. Поскольку это в реальном времени, важно, чтобы рабочие могли доверять тому, что у планировщика всегда есть определенные наборы данных, даже сразу после запуска. Рабочие всегда хранят в памяти весь набор данных.
Традиционно я делал это, подключая клиента, рассеивая df и публикуя набор данных:
df = dd.read_parquet('df.parq')
df = client.persist(df)
client.publish_dataset(flights=dfa)
Но это оставляет возможность перезапуска планировщика и того, что набор данных не будет загружен.
Я знаю, что вы можете использовать --preload
для выполнения скрипта при запуске, например:
dask-scheduler --preload=scheduler-startup.py
И что код котельной плиты выглядит так:
from distributed.diagnostics.plugin import SchedulerPlugin
class MyPlugin(SchedulerPlugin):
def add_worker(self, scheduler=None, worker=None, **kwargs):
print("Added a new worker at", worker)
def dask_setup(scheduler):
plugin = MyPlugin()
scheduler.add_plugin(plugin)
Но как я могу убедить планировщик загрузить мой набор данных без использования внешнего клиента?
Теоретически я мог бы отказаться от подпроцесса, который запускает предварительно заполненного клиента, но он кажется не совсем идеальным :)
Обычный клиент при запуске планировщика
Попытка подключиться в качестве клиента при запуске планировщика:
from distributed.diagnostics.plugin import SchedulerPlugin
from dask.distributed import Client
class MyPlugin(SchedulerPlugin):
def add_worker(self, scheduler=None, worker=None, **kwargs):
print("Added a new worker at", worker)
def dask_setup(scheduler):
c = Client(scheduler.address)
df = dd.read_parquet('df.parq')
df = c.persist(df)
c.publish_dataset(flights=dfa)
Зависает на c = Client(scheduler.address)
и требует принудительного отключения (kill -9
)