pcntl запускает один и тот же код несколько раз, требуется помощь

Я использую pcntl, чтобы ускорить довольно тяжелый PHP-скрипт CLI, который состоит в основном из класса, который отвечает за отправку всех автоматических электронных писем в моем приложении.

Моя цель заключается в следующем: я хочу назначить каждый процесс определенной задаче в рамках цикла foreach, реализация, которую я использовал, показана в примере кода ниже.

Проблема в том, что как только вы разветвляете процесс, он выполняется асинхронно, а также получает копию родительского стека процессов. В моем случае происходит то, что одна задача просто выполняется несколько раз. Мой вопрос в том, как я могу сделать этот сценарий умнее, чтобы избежать такого поведения ?.

Код:

/**
@description this is the main procedure of this class, it iteratates over the relevant tasks and sends the emails using the SendGrid wrapper class
@see SendGridWrapper
@return void
*/
public function execute(){
    if(!isset($this->tasks)){
        throw new exception("Please call getRelevantTasks() prior to trying to execute anything");
    }
$proccesses = array();
foreach($this->tasks as $myTask){
    $pid = pcntl_fork();
    if($pid){
        $proccesses[] = $pid;
    }
    else if($pid == -1){
        die('FORK FAILED, STATUS -1');
    }
    else{   
        if(isset($myTask['recipient_model'])){

            $this->currentModel = $myTask['recipient_model'];
            $lang = $myTask['lang'];
            $classPath = self::$modelsDir . $myTask['recipient_model'] . '.php';
            $className = $myTask['recipient_model'];
            if(!class_exists($myTask['recipient_model'] )){
                require_once(dirname(__FILE__) . '/../' .  $classPath); 
            }
            else if(isset($recipientFetcher)){
                unset($this->model);
                unset($this->mailingList);
                unset($this->substitutionList);
            }
            $this->model = null;
            $this->mailingList = null;
            $this->substitutionList = null;
            $this->model = new $className($myTask['lang']);
            $addresses = $this->model->getMailRecipients();
            if(empty($addresses) || sizeof($addresses) == 0){
                continue;
            }
            $this->model->prepare();
            $this->substitutionList = $this->model->getDynamicParams();

        }
        else{
            throw new exception('No recipient model was found');
        }

        foreach($addresses as $myMail){
            $this->mailingList[$myMail['personal_email']] = $myMail['contact_name'];
        }
        $templatePath = dirname(__FILE__) . '/../'; 
        $templatePath .= $lang ? self::$templatesDirEn . $myTask['html_email_path'] : self::$templatesDirHe . $myTask['html_email_path'];
        $html = file_get_contents($templatePath);
        $this->sendMail($html, $myTask['task_schedule_id']);
        echo "model:" . $myTask['recipient_model'];
        echo $this->log;
        $this->log = "";
        die("\r\n Child proccess has been executed successfully\r\n");  

    }   
}
if($pid){
    foreach($proccesses as $key => $val){
         pcntl_waitpid($val, $status, WUNTRACED);
    }
}           

}

Заранее спасибо, Олег.


person Oleg Belousov    schedule 05.05.2013    source источник


Ответы (1)


Введение

Я вижу, что вы пытаетесь отправлять письма $this->sendMail($html, $myTask['task_schedule_id']);, и я считаю, что использовать несколько процессов для этой задачи - плохая идея. Вам следует подумать об использовании очереди сообщений для этой задачи, потому что электронные письма могут быть очень медленными.

Используйте систему очередей

Вы должны использовать Gearman, ZeroMQ или Beanstalkd для этой задачи. Использование в худшем случае Реализуйте собственную простую очередь сообщений с memcached.

Вот типичный пример Gearman: https://stackoverflow.com/questions/13855907/when-to-send-auto-email-instants-on-button-click-or-later

Быстрое исправление

Удалите весь этот код и поместите его в функцию с именем execute_worker, где вы можете передать ей задачу.

// Break Task to groups
$tasks = array_chunk(range("A", "Z"), 10);
foreach($tasks as $task) {
    $pid = pcntl_fork();
    if ($pid == - 1) {
        throw new ErrorException('FORK FAILED, STATUS -1');
        break;
    }

    if ($pid == 0) {
        execute_worker($task); // In Child
        exit(); // In Child
    }
}

Использование потоков

Вы также можете использовать Worker или Тема в PHP с pThreads для ускорения обработки.

  • Простой в использовании, быстрый в освоении Threading API для PHP5.3 +
  • Выполнять все предопределенные и объявленные пользователем методы и функции асинхронно
  • Включена готовая синхронизация, ориентированная на среду PHP
  • Да! Поддержка Windows

Простой проект

file_get_contents считается медленным по сравнению с curl и нигде не приближается к мощности curl_multi_init, которая позволяет обрабатывать несколько дескрипторов cURL параллельно.

Видеть:

Наша цель - реализовать нашу собственную версию Multi file_get_contents

Пример нескольких file_get_contents

// My Storage
$s = new Storage();

// Threads Storage
$ts = array();

// Total Threads same as total pages
$pages = 100;

// Porpulate Threads (Don't Start Yet)
$i = 0;
while($i ++ < $pages) {
    $ts[] = new Process($s, $i);
}

// Start the timer
$start = microtime(true);

// Lets start all our Threads
foreach($ts as $t) {
    $t->start();
}

// Wait for all threads to compleate
foreach($ts as $t) {
    $t->join();
}

printf("\n\nFound %s in %d pages", number_format($s->total), $pages);
printf("\nFinished %0.3f sec", microtime(true) - $start);

Вывод

php a.php
3:01:37: 3548 #START    {"page":1}
3:01:37: 7064 #START    {"page":2}
3:01:37: 10908 #START   {"page":3}
3:01:37: 10424 #START   {"page":4}
3:01:37: 11472 #START   {"page":5}
3:01:37: 3876 #START    {"page":6}
3:01:37: 7276 #START    {"page":7}
3:01:37: 11484 #START   {"page":8}
3:01:37: 932 #START     {"page":9}
3:01:37: 11492 #START   {"page":10}
3:01:37: 11500 #START   {"page":11}
3:01:37: 11508 #START   {"page":12}
3:01:37: 11504 #START   {"page":13}
3:01:37: 11512 #START   {"page":14}
3:01:37: 11516 #START   {"page":15}
3:01:37: 11520 #START   {"page":16}
3:01:37: 11524 #START   {"page":17}
3:01:37: 11528 #START   {"page":18}
3:01:37: 10816 #START   {"page":19}
3:01:37: 7280 #START    {"page":20}
3:01:37: 11556 #START   {"page":21}
3:01:37: 11560 #START   {"page":22}
3:01:37: 11564 #START   {"page":23}
3:01:37: 11612 #START   {"page":24}
3:01:37: 11616 #START   {"page":25}
3:01:37: 11600 #START   {"page":26}
3:01:37: 11608 #START   {"page":27}
3:01:37: 11568 #START   {"page":28}
3:01:37: 11452 #START   {"page":29}
3:01:38: 11624 #START   {"page":30}
3:01:38: 11628 #START   {"page":31}
3:01:38: 11632 #START   {"page":32}
3:01:38: 11636 #START   {"page":33}
3:01:38: 11644 #START   {"page":34}
3:01:38: 11648 #START   {"page":35}
3:01:38: 11652 #START   {"page":36}
3:01:38: 11656 #START   {"page":37}
3:01:38: 11660 #START   {"page":38}
3:01:38: 11664 #START   {"page":39}
3:01:38: 11668 #START   {"page":40}
3:01:38: 11672 #START   {"page":41}
3:01:38: 11676 #START   {"page":42}
3:01:38: 11680 #START   {"page":43}
3:01:38: 11684 #START   {"page":44}
3:01:38: 11688 #START   {"page":45}
3:01:38: 11692 #START   {"page":46}
3:01:38: 11696 #START   {"page":47}
3:01:38: 11700 #START   {"page":48}
3:01:38: 11704 #START   {"page":49}
3:01:38: 11712 #START   {"page":50}
3:01:38: 11708 #START   {"page":51}
3:01:38: 11716 #START   {"page":52}
3:01:38: 11720 #START   {"page":53}
3:01:38: 11724 #START   {"page":54}
3:01:38: 11728 #START   {"page":55}
3:01:38: 11732 #START   {"page":56}
3:01:38: 11736 #START   {"page":57}
3:01:38: 11740 #START   {"page":58}
3:01:38: 11744 #START   {"page":59}
3:01:38: 11748 #START   {"page":60}
3:01:38: 11752 #START   {"page":61}
3:01:38: 11756 #START   {"page":62}
3:01:38: 11760 #START   {"page":63}
3:01:38: 11764 #START   {"page":64}
3:01:38: 11768 #START   {"page":65}
3:01:38: 11772 #START   {"page":66}
3:01:38: 11776 #START   {"page":67}
3:01:38: 11780 #START   {"page":68}
3:01:38: 11784 #START   {"page":69}
3:01:38: 11788 #START   {"page":70}
3:01:38: 11792 #START   {"page":71}
3:01:38: 11796 #START   {"page":72}
3:01:38: 11800 #START   {"page":73}
3:01:38: 11804 #START   {"page":74}
3:01:38: 11808 #START   {"page":75}
3:01:38: 11812 #START   {"page":76}
3:01:38: 11816 #START   {"page":77}
3:01:38: 11820 #START   {"page":78}
3:01:38: 11824 #START   {"page":79}
3:01:38: 11828 #START   {"page":80}
3:01:38: 11832 #START   {"page":81}
3:01:38: 11836 #START   {"page":82}
3:01:38: 11840 #START   {"page":83}
3:01:38: 11844 #START   {"page":84}
3:01:38: 11848 #START   {"page":85}
3:01:38: 11852 #START   {"page":86}
3:01:38: 11856 #START   {"page":87}
3:01:38: 11860 #START   {"page":88}
3:01:38: 11864 #START   {"page":89}
3:01:38: 11868 #START   {"page":90}
3:01:38: 11872 #START   {"page":91}
3:01:38: 11876 #START   {"page":92}
3:01:38: 11880 #START   {"page":93}
3:01:38: 11884 #START   {"page":94}
3:01:38: 11888 #START   {"page":95}
3:01:38: 11892 #START   {"page":96}
3:01:38: 11896 #START   {"page":97}
3:01:38: 11900 #START   {"page":98}
3:01:38: 11904 #START   {"page":99}
3:01:38: 11908 #START   {"page":100}
3:01:38: 11508 #END             {"page":12,"byte":1141,"count":155839}
3:01:38: 10424 #END             {"page":4,"byte":1201,"count":553595}
3:01:38: 11516 #END             {"page":15,"byte":1204,"count":119612}
3:01:38: 3548 #END              {"page":1,"byte":1208,"count":6737525}
3:01:38: 11484 #END             {"page":8,"byte":1160,"count":257021}
3:01:38: 11472 #END             {"page":5,"byte":1175,"count":446411}
3:01:38: 10908 #END             {"page":3,"byte":1222,"count":787301}
3:01:38: 11492 #END             {"page":10,"byte":1175,"count":193958}
3:01:38: 11504 #END             {"page":13,"byte":1130,"count":141450}
3:01:38: 11528 #END             {"page":18,"byte":1102,"count":95511}
3:01:38: 11524 #END             {"page":17,"byte":1147,"count":102727}
3:01:38: 11560 #END             {"page":22,"byte":1111,"count":73536}
3:01:38: 11556 #END             {"page":21,"byte":1101,"count":78097}
3:01:38: 11500 #END             {"page":11,"byte":1201,"count":172820}
3:01:38: 932 #END               {"page":9,"byte":1159,"count":222922}
3:01:38: 11520 #END             {"page":16,"byte":1135,"count":110510}
3:01:38: 7064 #END              {"page":2,"byte":1165,"count":1264444}
3:01:38: 11512 #END             {"page":14,"byte":1123,"count":129721}
3:01:38: 11612 #END             {"page":24,"byte":1115,"count":65012}
3:01:38: 11600 #END             {"page":26,"byte":1134,"count":58928}
3:01:38: 7276 #END              {"page":7,"byte":1189,"count":301469}
3:01:38: 10816 #END             {"page":19,"byte":1120,"count":89609}
3:01:38: 11616 #END             {"page":25,"byte":1052,"count":61793}
3:01:38: 3876 #END              {"page":6,"byte":1188,"count":362101}
3:01:38: 7280 #END              {"page":20,"byte":1079,"count":83632}
3:01:38: 11564 #END             {"page":23,"byte":1076,"count":68909}
3:01:38: 11632 #END             {"page":32,"byte":1095,"count":44013}
3:01:38: 11652 #END             {"page":36,"byte":1042,"count":37185}
3:01:38: 11452 #END             {"page":29,"byte":1097,"count":50532}
3:01:38: 11636 #END             {"page":33,"byte":1097,"count":42148}
3:01:38: 11644 #END             {"page":34,"byte":1124,"count":40236}
3:01:38: 11664 #END             {"page":39,"byte":1078,"count":32792}
3:01:38: 11668 #END             {"page":40,"byte":1017,"count":31487}
3:01:38: 11608 #END             {"page":27,"byte":1117,"count":55561}
3:01:38: 11628 #END             {"page":31,"byte":1076,"count":46133}
3:01:38: 11624 #END             {"page":30,"byte":1111,"count":48265}
3:01:38: 11568 #END             {"page":28,"byte":1076,"count":52851}
3:01:38: 11656 #END             {"page":37,"byte":1068,"count":35590}
3:01:38: 11688 #END             {"page":45,"byte":1062,"count":26060}
3:01:38: 11680 #END             {"page":43,"byte":1081,"count":28013}
3:01:38: 11672 #END             {"page":41,"byte":1086,"count":30320}
3:01:38: 11724 #END             {"page":54,"byte":1060,"count":19900}
3:01:38: 11716 #END             {"page":52,"byte":1069,"count":21079}
3:01:38: 11732 #END             {"page":56,"byte":1038,"count":18748}
3:01:38: 11692 #END             {"page":46,"byte":1033,"count":25230}
3:01:38: 11696 #END             {"page":47,"byte":1098,"count":24469}
3:01:38: 11728 #END             {"page":55,"byte":1003,"count":19353}
3:01:38: 11648 #END             {"page":35,"byte":1105,"count":38651}
3:01:38: 11660 #END             {"page":38,"byte":1075,"count":34037}
3:01:38: 11700 #END             {"page":48,"byte":1059,"count":23725}
3:01:39: 11720 #END             {"page":53,"byte":1028,"count":20463}
3:01:39: 11704 #END             {"page":49,"byte":1006,"count":22966}
3:01:39: 11712 #END             {"page":50,"byte":988,"count":22369}
3:01:39: 11676 #END             {"page":42,"byte":1113,"count":29144}
3:01:39: 11748 #END             {"page":60,"byte":1054,"count":17002}
3:01:39: 11684 #END             {"page":44,"byte":1041,"count":26999}
3:01:39: 11756 #END             {"page":62,"byte":1024,"count":16165}
3:01:39: 11760 #END             {"page":63,"byte":1036,"count":15814}
3:01:39: 11740 #END             {"page":58,"byte":1075,"count":17833}
3:01:39: 11736 #END             {"page":57,"byte":1064,"count":18293}
3:01:39: 11752 #END             {"page":61,"byte":1077,"count":16607}
3:01:39: 11708 #END             {"page":51,"byte":1045,"count":21668}
3:01:39: 11768 #END             {"page":65,"byte":1041,"count":15021}
3:01:39: 11764 #END             {"page":64,"byte":1063,"count":15405}
3:01:39: 11744 #END             {"page":59,"byte":1052,"count":17394}
3:01:39: 11800 #END             {"page":73,"byte":1025,"count":12361}
3:01:39: 11792 #END             {"page":71,"byte":1053,"count":13051}
3:01:39: 11796 #END             {"page":72,"byte":1092,"count":12721}
3:01:39: 11784 #END             {"page":69,"byte":1031,"count":13677}
3:01:39: 11780 #END             {"page":68,"byte":1019,"count":13967}
3:01:39: 11772 #END             {"page":66,"byte":1068,"count":14644}
3:01:39: 11816 #END             {"page":77,"byte":1045,"count":11185}
3:01:39: 11804 #END             {"page":74,"byte":1062,"count":12071}
3:01:39: 11824 #END             {"page":79,"byte":1047,"count":10719}
3:01:39: 11820 #END             {"page":78,"byte":1035,"count":10940}
3:01:39: 11788 #END             {"page":70,"byte":987,"count":13354}
3:01:39: 11776 #END             {"page":67,"byte":1036,"count":14278}
3:01:39: 11828 #END             {"page":80,"byte":1013,"count":10519}
3:01:39: 11832 #END             {"page":81,"byte":1052,"count":10318}
3:01:39: 11812 #END             {"page":76,"byte":991,"count":11465}
3:01:39: 11808 #END             {"page":75,"byte":1043,"count":11769}
3:01:39: 11860 #END             {"page":88,"byte":1018,"count":8991}
3:01:39: 11852 #END             {"page":86,"byte":971,"count":9362}
3:01:39: 11868 #END             {"page":90,"byte":1006,"count":8641}
3:01:39: 11840 #END             {"page":83,"byte":1026,"count":9922}
3:01:39: 11872 #END             {"page":91,"byte":980,"count":8464}
3:01:39: 11892 #END             {"page":96,"byte":936,"count":7727}
3:01:39: 11836 #END             {"page":82,"byte":1052,"count":10117}
3:01:39: 11844 #END             {"page":84,"byte":973,"count":9739}
3:01:39: 11864 #END             {"page":89,"byte":1033,"count":8821}
3:01:39: 11856 #END             {"page":87,"byte":994,"count":9169}
3:01:39: 11848 #END             {"page":85,"byte":1040,"count":9544}
3:01:39: 11896 #END             {"page":97,"byte":988,"count":7562}
3:01:39: 11876 #END             {"page":92,"byte":1003,"count":8294}
3:01:39: 11888 #END             {"page":95,"byte":995,"count":7860}
3:01:39: 11880 #END             {"page":93,"byte":1052,"count":8143}
3:01:39: 11900 #END             {"page":98,"byte":977,"count":7418}
3:01:39: 11904 #END             {"page":99,"byte":999,"count":7270}
3:01:39: 11884 #END             {"page":94,"byte":931,"count":8002}
3:01:39: 11908 #END             {"page":100,"byte":977,"count":7144}


Found 14,075,927 in 100 pages
Finished 1.489 sec

Затраченное время

Found 14,075,927 in 100 pages
Finished 1.489 sec

Используемые классы

class Process extends Thread {

    public function __construct($storage, $page) {
        $this->storage = $storage;
        $this->page = $page;
        // $this->start();
    }

    public function run() {
        $format = "%s: %1u %s\t%s\n";
        $formatTime = "g:i:s";
        $sleep = mt_rand(0, 1); // Just for Demo

        printf($format, date($formatTime), $this->getThreadId(), "#START", "{\"page\":$this->page}");

        // Do something useful
        $data = file_get_contents(sprintf("http://api.stackoverflow.com/1.1/tags?pagesize=100&page=%s", $this->page));

        // Decode the Data from API
        $json = json_decode(gzdecode($data));

        // Lets Build A profile
        $profile = array();
        $profile['page'] = $this->page;
        $profile['byte'] = strlen($data);

        // Do more work
        $profile['count'] = array_sum(array_map(function ($v) {
            return $v->count;
        }, $json->tags));

        $this->storage->total = bcadd($this->storage->total, $profile['count']);
        // Print Information
        printf($format, date($formatTime), $this->getThreadId(), "#END\t", json_encode($profile));
    }
}

class Storage extends Stackable {
    public $total = 0;

    public function run() {
    }
}

Заключение

file_get_contents только что получил 100 pages всего за 1.489 sec из-за моего дерьмового соединения. Yes так и было. Протестировал тот же код на моем реальном сервере, и мне потребовалось менее 0.939 sec, чтобы получить 200 страниц.

Ваше приложение может быть быстрее во многих отношениях, вам просто нужно использовать правильную технологию в нужном месте.

person Baba    schedule 08.05.2013