Сервер MySQL ушел во время работы Laravel Queue

Я пытаюсь импортировать данные из файла CSV в базу данных, используя очередь laravel. Эти файлы CSV огромны и содержат около 500 тыс. строк.

Я где-то узнал, что при использовании очереди laravel нам не нужно думать о времени ожидания соединения, но это не похоже на правду. Может быть, я был неправ.

Пожалуйста, проверьте мой код работы, если в этих методах что-то не так. Я использую "League\Csv" для чтения файла CSV.

public function __construct($data,$error_arr,$error_row_numbers) {  
       $this->data = $data;
       $this->error_arr = $error_arr;
       $this->error_row_numbers = $error_row_numbers;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        $offset = $this->data['offset']; 
        $limit = $this->data['limit'];

        $filename = $this->data['file_name'];
        $service = new Service();
        $table = 'committees';
        $dbase = new Committee();

        //map_data have array about which csv column
        //should be inserted in which column of database
        $map_data = $this->data['map_data'];

    //get all columns name of a table
        $db_header_obj = new Committee();
        $db_header = $db_header_obj->getTableColumns();

        $csv_file_path = storage_path('app/files/committee/').$filename;
        if (!ini_get("auto_detect_line_endings")) {
            ini_set("auto_detect_line_endings", TRUE);
        }
        $csv = Reader::createFromPath($csv_file_path, 'r');

        $csv->setOutputBOM(Reader::BOM_UTF8);
        $csv->addStreamFilter('convert.iconv.ISO-8859-15/UTF-8');

            $csv->setHeaderOffset(0); 
            $csv_header = $csv->getHeader();    



            $rec_arr = array();
            $records = array();
            $records_arr = array();

                    $stmt = (new Statement())
                    ->offset($offset)
                    ->limit($limit)
                    ;

                    $records = $stmt->process($csv);

                    foreach ($records as $record) 
                    {
                        $rec_arr[] = array_values($record);
                    }

                    //trim index if the value of an array is empty
                    $records_arr = $service->trimArray($rec_arr);

                    if(count($records_arr)>0)
                    {
                        foreach($records_arr as $ck => $cv){


                            $committee_arr = array();
                            foreach ($map_data as $mk => $mv) {
                                if(isset($mv)){
                                    $data_type = $service->getDatabaseColumnType($table,$mv);
                                    //if data is one of datetime data type
                                    //then format the csv data to mysql datetime format
                                    if($data_type == 'date' || $data_type == 'datetime' || $data_type == 'timestamp'){
                                        $datetime =  (array)$cv[$mk];
                                        $dt = array_shift($datetime);
                                        $dt = date('Y-m-d h:i:s', strtotime($dt));
                                        $committee_arr[$mv] = $dt;
                                    }else{
                                        $committee_arr[$mv] = $cv[$mk];
                                    }  

                                }
                            }

                            $error_encountered = false;


                            DB::beginTransaction();

                            if(!empty($committee_arr['com_id'])){

                                try{
                                        $committee_row = Committee::updateOrCreate(
                                            ['com_id' => $committee_arr['com_id']],
                                            $committee_arr
                                        );
                                        if ($committee_row->wasRecentlyCreated === true) {
                                            $committee_row->created_by = $this->data['user_id'];
                                        }else{
                                            $committee_row->updated_by = $this->data['user_id'];
                                        }
                                        $committee_row->save();
                                    } catch (\Exception $e) {
                                        $error_encountered = true;
                                         $this->error_arr[] = $e->getMessage();
                                         $this->error_row_numbers[] = $this->data['row_value']; 
                                    }

                            }


                            DB::commit();
                            //just to keep track which row is currently processing
                            //so that user can be notified in which row of csv
                            //there is an error
                            $this->data['row_value'] = $this->data['row_value'] + 1;

                        }

                        //offset just to start fectch next chunk of data from csv
                        $this->data['offset'] = $offset + $limit;

            //Call to same job but with increased offset value
                        $committeeInsertJob = (new StoreCommittee($this->data,$this->error_arr,$this->error_row_numbers))->delay(Carbon::now()->addSeconds(3)); 
                        dispatch($committeeInsertJob);

                    }else{

                        //Store activity just to keep track of activity
                        $activity = new Activity();
                        $activity->url = $this->data['url'];
                        $activity->action = 'store';
                        $activity->description = $table;
                        $activity->user_id = $this->data['user_id'];
                        $activity->created_at = date('Y-m-d H:i:s');
                        $activity->save();

                        $arr_data = [
                                'filename' => $filename,
                                'user_name' => $this->data['user_name'],
                                'error' => $this->error_arr,
                                'error_row_numbers' => $this->error_row_numbers
                            ];
                        //Notify user that the job is complete
                        Mail::to($this->data['user_email'])->send(new CSVImportJobComplete($arr_data));

                    }



            if (!ini_get("auto_detect_line_endings")) {
                ini_set("auto_detect_line_endings", FALSE);
            }
        }

Ошибка: From (laravel.log внутри хранилища)

[2019-04-05 07:13:23] local.ERROR: PDOStatement::execute(): MySQL server has gone away (SQL: insert into `jobs` (`queue`, `attempts`, `reserved_at`, `available_at`, `created_at`, `payload`) values (default, 0, , 1554448406, 1554448403, ....................................................(long list)

Откуда: командный терминал

$ php artisan queue:work --tries=3
[2019-04-05 07:09:11][1] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:09:33][1] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:09:36][2] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:09:58][2] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:10:01][3] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:10:23][3] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:10:26][4] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:10:48][4] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:10:51][5] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:11:13][5] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:11:17][6] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:11:40][6] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:11:43][7] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:12:05][7] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:12:08][8] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:12:31][8] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:12:34][9] Processing: App\Jobs\StoreCommittee
[2019-04-05 07:12:57][9] Processed:  App\Jobs\StoreCommittee
[2019-04-05 07:13:00][10] Processing: App\Jobs\StoreCommittee

dell@DESKTOP-UQ2 MINGW64 /d/wamp64/www/project(master)
$
(it stops without any error or failed notifications)

Есть ли что-нибудь, что я могу улучшить в своей логике работы? Как я могу справиться со всем этим обрывом соединения, максимальным тайм-аутом или какими-то другими вещами? Я не думаю, что увеличение времени ожидания является решением. Поскольку нельзя гарантировать, что он будет завершен в течение этого фиксированного времени.

Вместо этого есть способ закрыть соединение и снова подключиться между каждой рабочей очередью?


person Aayush Dahal    schedule 06.04.2019    source источник
comment
Похоже, вы пытаетесь использовать один запрос, чтобы сжать все 500 тыс. записей в MySQL. Это правда? Если да, то подход совершенно неправильный. Вы должны подготовить оператор один раз для вставки одной записи. В цикле вы связываете значения и выполняете оператор. У вас также есть транзакция, но, насколько я могу судить, выполняется только 1 запрос. Это означает, что транзакция бессмысленна, потому что если у вас есть только 1 запрос, InnoDB работает в режиме автоматической фиксации (каждый запрос является отдельной транзакцией).   -  person Mjh    schedule 06.04.2019
comment
@mjh в настоящее время я использую кусок из 250 строк, это означает, что этот цикл foreach($records_arr as $ck => $cv){ будет выполняться 250 раз, а затем вызывать то же задание со значением смещения 250. Не могли бы вы предложить какой-либо другой способ, если это не подходит для этого?   -  person Aayush Dahal    schedule 07.04.2019


Ответы (1)


Попытка решения

Вы проанализировали файл CSV и попытались отправить все содержимое с помощью одного запроса. MySQL содержит переменные, которые не позволяют ему принимать слишком большие запросы. Он называется max_allowed_packet.

Причина, по которой вы это сделали, была производительность. Однако вы можете столкнуться с одной из многих переменных, связанных с сетью/MySQL, при работе с запросом, который слишком велик с точки зрения количества данных.

Улучшенные критерии решения

  • Чистый запрос, чтобы было видно, что делается
  • Быстрый запрос, поэтому можно отправить много данных для быстрой записи
  • Не нажимайте значения ограничивающих переменных, таких как max_packet_size.

Решение

  1. Заявление готовил ровно один раз. Подготовленные операторы используются один раз, выполняются несколько раз

  2. Разобрать CSV и прокрутить записи

  3. Привяжите значения к подготовленным операторам и выполните их по мере прохождения цикла.

  4. Чтобы все было быстрее, используйте транзакции. Оборачивать каждые 1000 записей в транзакцию. Это позволит вам писать простые запросы на вставку, но они будут быстрыми, потому что MySQL будет мультиплексировать записи.

Вы используете Laravel, поэтому описанные выше шаги очень просты

Псевдокод с использованием laravel

$csv = collect([]); // This is the array holding your CSV records

// Split the array into chunks. Let's assume you want to insert 1000 records in one attempt

$chunk_count = ceil($csv->count() / 1000);

$csv->chunk($chunk_count)->map(function($chunk) {
    \DB::beginTransaction();

    // Create a record 
    $chunk->map(function($data) {
        StoreCommittee::create($data);
    });

    \DB::commit();
});
person Mjh    schedule 08.04.2019