Загрузите всего M файлов с помощью N одновременных асинхронных HTTP-клиентов, где M — большое, а N — настраиваемое

Я пытаюсь написать скрипт, который будет загружать не более N файлов одновременно через HTTP.

Ранее я использовал AnyEvent::Worker::Pool для управления пулом задач BLOCKING. Я также использовал AnyEvent:: HTTP в сочетании с AnyEvent->condvar для индивидуального управления НЕБЛОКИРУЕМЫМИ загрузками.

Я подумал, что должно быть довольно просто объединить два подхода, чтобы AnyEvent->condvar заставляет AnyEvent::HTTP::http_get выглядеть БЛОКИРУЕМЫМ с точки зрения AnyEvent::Worker::Pool.

Однако я получаю некоторые ошибки, которых не понимаю, предположительно из-за деталей реализации AnyEvent::Worker. Вот действительно урезанная версия скрипта, демонстрирующая проблему:

use EV;
use AnyEvent 5;
use AnyEvent::Worker::Pool;
use AnyEvent::HTTP;
use 5.10.0;
use strict;

my $pool_size = 2;
my $num_jobs  = 7;

# Create a pool of $pool_size workers
my $workers = AnyEvent::Worker::Pool->new($pool_size, sub {
  my ($job) = @_;
  eval {
    my $cv = AnyEvent->condvar;
    print "worker starting download [$job] ...\n";
    http_get 'http://download.thinkbroadband.com/5MB.zip', sub {
      my ($data, $headers) = @_;
      if ($headers->{Status} =~ /^2/) { 
        print "download [$job] succeeded.\n"; 
      } else { 
        print "download [$job] failed.\n"; 
      }
      $cv->send; # notification of download complete/exit.
    };

    $cv->recv; # wait for download to complete/exit before returning to pool
  }; if ($@) {
    print "worker payload error: $@\n";
  }
  return 1;
});

# dispatch the full list of downloads
my ($need,$done) = ($num_jobs, 0);
for my $job (0 .. ($need - 1)) {
  print "dispatching job $job...\n";
  $workers->do($job, sub {
    print "worker [$job] payload threw exception: $@\n" if $@;
    print "worker [$job] payload completed successfully!\n" unless $@;
    EV::unloop if ++$done == $need;
  });
}

EV::loop; # wait here for all downloads to complete
print "We're done!\n"; # some useful code to follow here...

Демонстрационный вывод выглядит следующим образом:

user@host:~$ ./test.pl
dispatching job 0...
dispatching job 1...
dispatching job 2...
dispatching job 3...
dispatching job 4...
dispatching job 5...
dispatching job 6...
worker starting download [0] ...
worker starting download [1] ...
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
worker [6] payload threw exception: no worker connection
EV: error in callback (ignoring): no worker connection at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 60

^C
user@host:~$
user@host:~$
user@host:~$ download [1] failed.
unable to write results: Broken pipe at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 139.
  ...caught at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 145.

Почему AnyEvent::HTTP?

В моем реальном сценарии я использую гораздо больше возможностей AnyEvent::HTTP; в частности, я комбинирую обратный вызов on_body с Term::StatusBar, чтобы показать индикатор выполнения для конечного пользователя скрипта; кроме того, я стратегически «приостанавливаю» обратный вызов on_body, чтобы поддерживать скорость передачи, равную или меньшую, чем скорость, предварительно определенная конечным пользователем.

Пожалуйста, не стесняйтесь предлагать альтернативу с этими функциями (или простой способ взломать их!)

Почему AnyEvent::Worker::Pool?

Я уже был с ним знаком. Приветствуются альтернативные предложения.

Почему EV?

Это быстро. Опять же, альтернативные предложения приветствуются.


person David-SkyMesh    schedule 15.12.2013    source источник
comment
Сценарии Perl, как правило, используют глобальные переменные для файловых дескрипторов. Это не потокобезопасно.   -  person David Knipe    schedule 15.12.2013
comment
@DavidKnipe Я не думаю, что EV, AnyEvent или AnyEvent::HTTP используют потоки Perl. У вас есть доказательства того, что они это делают?   -  person David-SkyMesh    schedule 15.12.2013
comment
Нет, я совсем не знаком с библиотеками, которые вы используете, но вопрос звучал как связанный с потоком. В общем, не обращайте на меня особого внимания :-)   -  person David Knipe    schedule 15.12.2013
comment
@ДэвидКнайп. Нет проблем :-) AnyEvent — это абстракция цикла событий, не зависящая от реализации. В этом случае я использую EV (libev), который представляет собой цикл событий, основанный на парадигме однопоточного мультиплексирования событий.   -  person David-SkyMesh    schedule 15.12.2013
comment
Вы говорите, что клиенты по очереди обрабатывают биты ответа от сервера? В этом случае все еще верно, что все они пытаются одновременно открыть дескрипторы файлов. Что было бы проблематично. Это предполагает, что они используют файловые дескрипторы и что файловые дескрипторы реализованы в традиционном стиле как глобальные переменные. И довольно бесполезно то, что Perl, кажется, не жалуется, если вы пытаетесь открыть дескриптор файла, который уже открыт. Но это Perl для вас :-)   -  person David Knipe    schedule 15.12.2013
comment
@DavidKnipe клиенты работают нормально одновременно ... это не 1995 год :) Эти однопоточные серверы / циклы, основанные на событиях, широко используются в Perl уже около 10 лет. Я просто пытаюсь управлять рабочими в пуле из N MAX рабочих.   -  person David-SkyMesh    schedule 15.12.2013


Ответы (1)


Вы не должны использовать AnyEvent::Worker::Poll для этой задачи.
И я рекомендую вам не использовать специальные функции цикла, такие как EV::loop EV::unloop. Это делает ваш код несовместимым с реализацией других циклов.

Ваш код может быть переписан так

use strict;
use AnyEvent;
use AnyEvent::HTTP;

my $pool_size = 2;
my $num_jobs  = 7;
my $cur_job = 0;

my $cv = AnyEvent->condvar;
$cv->begin();

for (1..($pool_size < $num_jobs ? $pool_size : $num_jobs)) {
    $cv->begin();
    make_job($cur_job++);
}

$cv->end();

sub make_job {
    my $job = shift;
    $num_jobs--;

    http_get 'http://download.thinkbroadband.com/5MB.zip', sub {
        my ($data, $headers) = @_;
        if ($headers->{Status} =~ /^2/) { 
            print "download [$job] succeeded.\n"; 
        } else { 
            print "download [$job] failed.\n"; 
        }

        if ($num_jobs > 0) {
            make_job($cur_job++);
        }
        else {
            $cv->end();
        }
    };
}

$cv->recv();
person Oleg G    schedule 15.12.2013
comment
Это намного проще, чем AnyEvent::Worker. Хорошо сделано. Вы хоть представляете, что с ним пошло не так? - person David-SkyMesh; 16.12.2013
comment
Я думаю, что вы не можете полагаться на один и тот же цикл событий в дочернем процессе. это не безопасно - person Oleg G; 16.12.2013
comment
Ой ну спасибо. Я даже не знал, что AnyEvent::Worker создал подпроцессы. Я думаю, это должно было быть очевидно, учитывая предупреждения о сокетах домена в выводе. :-/ - person David-SkyMesh; 16.12.2013
comment
В качестве примечания: AnyEvent::Fork (и связанные с ним модули), если это когда-либо понадобится, может создавать процессы, в которых можно использовать AnyEvent или другие циклы обработки событий. Для этой задачи приведенный выше ответ, конечно, более уместен. - person Remember Monica; 13.10.2014