Ошибка при экспорте кадра данных dask в csv

В моем кадре данных dask около 120 мм строк и 4 столбца:

df_final.dtypes

cust_id        int64
score            float64
total_qty        float64
update_score    float64
dtype: object

и я делаю эту операцию на ноутбуках jupyter, подключенных к Linux-машине:

%time df_final.to_csv('/path/claritin-files-*.csv')

и выдает эту ошибку:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-24-46468ae45023> in <module>()
----> 1 get_ipython().magic(u"time df_final.to_csv('path/claritin-files-*.csv')")

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s)
   2334         magic_name, _, magic_arg_s = arg_s.partition(' ')
   2335         magic_name = magic_name.lstrip(prefilter.ESC_MAGIC)
-> 2336         return self.run_line_magic(magic_name, magic_arg_s)
   2337 
   2338     #-------------------------------------------------------------------------

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line)
   2255                 kwargs['local_ns'] = sys._getframe(stack_depth).f_locals
   2256             with self.builtin_trap:
-> 2257                 result = fn(*args,**kwargs)
   2258             return result
   2259 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns)

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k)
    191     **# but it's overkill for just that one bit of state.**
    192     def magic_deco(arg):
--> 193         call = lambda f, *a, **k: f(*a, **k)
    194 
    195         if callable(arg):

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns)
   1161         if mode=='eval':
   1162             st = clock2()
-> 1163             out = eval(code, glob, local_ns)
   1164             end = clock2()
   1165         else:

<timed eval> in <module>()

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/core.pyc in to_csv(self, filename, **kwargs)
    936         """ See dd.to_csv docstring for more information """
    937         from .io import to_csv
--> 938         return to_csv(self, filename, **kwargs)
    939 
    940     def to_delayed(self):

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.pyc in to_csv(df, filename, name_function, compression, compute, get, **kwargs)
    411     if compute:
    412         from dask import compute
--> 413         compute(*values, get=get)
    414     else:
    415         return values

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs)
    177         dsk = merge(var.dask for var in variables)
    178     keys = [var._keys() for var in variables]
--> 179     results = get(dsk, keys, **kwargs)
    180 
    181     results_iter = iter(results)

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/threaded.pyc in get(dsk, result, cache, num_workers, **kwargs)
     74     results = get_async(pool.apply_async, len(pool._pool), dsk, result,
     75                         cache=cache, get_id=_thread_get_id,
---> 76                         **kwargs)
     77 
     78     # Cleanup pools associated to dead threads

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.pyc in get_async(apply_async, num_workers, dsk, result, cache, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, dumps, loads, **kwargs)
    491                     _execute_task(task, data)  # Re-execute locally
    492                 else:
--> 493                     raise(remote_exception(res, tb))
    494             state['cache'][key] = res
    495             finish_task(dsk, key, state, results, keyorder.get)

**ValueError: invalid literal for long() with base 10: 'total_qty'**

Traceback
---------
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 268, in execute_task
    result = _execute_task(task, data)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task
    return func(*args2)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 55, in pandas_read_text
    coerce_dtypes(df, dtypes)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 83, in coerce_dtypes
    df[c] = df[c].astype(dtypes[c])
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 3054, in astype
    raise_on_error=raise_on_error, **kwargs)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3189, in astype
    return self.apply('astype', dtype=dtype, **kwargs)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3056, in apply
    applied = getattr(b, f)(**kwargs)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 461, in astype
    values=values, **kwargs)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 504, in _astype
    values = _astype_nansafe(values.ravel(), dtype, copy=True)
  File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/types/cast.py", line 534, in _astype_nansafe
    return lib.astype_intsafe(arr.ravel(), dtype).reshape(arr.shape)
  File "pandas/lib.pyx", line 980, in pandas.lib.astype_intsafe (pandas/lib.c:17409)
  File "pandas/src/util.pxd", line 93, in util.set_value_at_unsafe (pandas/lib.c:72777)

У меня есть пара вопросов:

1) Прежде всего, этот экспорт работал нормально в пятницу, он выдал 100 файлов csv (так как он имеет 100 разделов), которые я позже агрегировал. Итак, что сегодня не так — что-нибудь из журнала ошибок?

2) Может быть, это вопрос к создателям этого пакета, каков наиболее эффективный по времени способ получить извлечение csv из кадра данных dask такого размера, поскольку в последний раз это занимало от 1,5 до 2 часов. работал.

Я не использую распределенный dask, и это на одном ядре кластера Linux.


person ML_Passion    schedule 21.02.2017    source источник


Ответы (1)


Эта ошибка, вероятно, имеет мало общего с to_csv и больше связана с чем-то еще в ваших вычислениях. Вызов df.to_csv был первым разом, когда вы заставили вычисление прокрутить все данные.

Учитывая ошибку, я действительно подозреваю, что это не работает в read_csv. Dask.dataframe прочитал первые несколько сотен килобайт вашего первого файла, чтобы угадать типы данных, но, похоже, угадал неправильно. Возможно, вы захотите попробовать явно указать dtypes в вызове read_csv.

Что касается второго вопроса о быстрой записи в CSV, мой первый ответ будет «использовать вместо этого Parquet или HDF5». Они намного быстрее и точнее почти во всех отношениях.

person MRocklin    schedule 21.02.2017
comment
Спасибо !!, да, я подозревал это раньше, так как я читаю кадр данных из формата csv. Не уверен, почему он не читает его должным образом. Что касается вашего предложения по 2-му вопросу, это для чтения и записи в формате паркета (я знаком с паркетом). - person ML_Passion; 22.02.2017
comment
Распространенной причиной является то, что в целочисленном столбце есть некоторые пропущенные значения, поэтому pandas решает, что ему нужно использовать float на полпути. Я не понимаю вашего комментария по поводу паркета. - person MRocklin; 22.02.2017
comment
Я имел в виду, когда вы сказали использовать паркет или HDF5, вы имели в виду чтение файлов паркета для преобразования в кадры данных dask, а затем запись в формат паркета вместо формата csv? можно ли экспортировать файлы csv быстрее (мой фрейм данных составляет 130 мм x 4 столбца), если я использую dask, распределенный по кластеру машин? - person ML_Passion; 22.02.2017
comment
Другая проблема заключается в том, что пакет fastparquet плохо работает с Python 2.7.11. Я прокомментировал журнал проблем для fastparquet, и Мартин Дюрант сказал, что выпустит новую версию пакета. - person ML_Passion; 22.02.2017