Я пытаюсь написать скрипт, который будет загружать не более N
файлов одновременно через HTTP.
Ранее я использовал AnyEvent::Worker::Pool для управления пулом задач BLOCKING. Я также использовал AnyEvent:: HTTP в сочетании с AnyEvent->condvar для индивидуального управления НЕБЛОКИРУЕМЫМИ загрузками.
Я подумал, что должно быть довольно просто объединить два подхода, чтобы AnyEvent->condvar заставляет AnyEvent::HTTP::http_get выглядеть БЛОКИРУЕМЫМ с точки зрения AnyEvent::Worker::Pool.
Однако я получаю некоторые ошибки, которых не понимаю, предположительно из-за деталей реализации AnyEvent::Worker. Вот действительно урезанная версия скрипта, демонстрирующая проблему:
use EV;
use AnyEvent 5;
use AnyEvent::Worker::Pool;
use AnyEvent::HTTP;
use 5.10.0;
use strict;
my $pool_size = 2;
my $num_jobs = 7;
# Create a pool of $pool_size workers
my $workers = AnyEvent::Worker::Pool->new($pool_size, sub {
my ($job) = @_;
eval {
my $cv = AnyEvent->condvar;
print "worker starting download [$job] ...\n";
http_get 'http://download.thinkbroadband.com/5MB.zip', sub {
my ($data, $headers) = @_;
if ($headers->{Status} =~ /^2/) {
print "download [$job] succeeded.\n";
} else {
print "download [$job] failed.\n";
}
$cv->send; # notification of download complete/exit.
};
$cv->recv; # wait for download to complete/exit before returning to pool
}; if ($@) {
print "worker payload error: $@\n";
}
return 1;
});
# dispatch the full list of downloads
my ($need,$done) = ($num_jobs, 0);
for my $job (0 .. ($need - 1)) {
print "dispatching job $job...\n";
$workers->do($job, sub {
print "worker [$job] payload threw exception: $@\n" if $@;
print "worker [$job] payload completed successfully!\n" unless $@;
EV::unloop if ++$done == $need;
});
}
EV::loop; # wait here for all downloads to complete
print "We're done!\n"; # some useful code to follow here...
Демонстрационный вывод выглядит следующим образом:
user@host:~$ ./test.pl
dispatching job 0...
dispatching job 1...
dispatching job 2...
dispatching job 3...
dispatching job 4...
dispatching job 5...
dispatching job 6...
worker starting download [0] ...
worker starting download [1] ...
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
EV: error in callback (ignoring): unexpected eof at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 46
worker [6] payload threw exception: no worker connection
EV: error in callback (ignoring): no worker connection at /usr/local/share/perl/5.14.2/AnyEvent/Worker/Pool.pm, line 60
^C
user@host:~$
user@host:~$
user@host:~$ download [1] failed.
unable to write results: Broken pipe at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 139.
...caught at /usr/local/share/perl/5.14.2/AnyEvent/Worker.pm line 145.
Почему AnyEvent::HTTP?
В моем реальном сценарии я использую гораздо больше возможностей AnyEvent::HTTP
; в частности, я комбинирую обратный вызов on_body
с Term::StatusBar
, чтобы показать индикатор выполнения для конечного пользователя скрипта; кроме того, я стратегически «приостанавливаю» обратный вызов on_body
, чтобы поддерживать скорость передачи, равную или меньшую, чем скорость, предварительно определенная конечным пользователем.
Пожалуйста, не стесняйтесь предлагать альтернативу с этими функциями (или простой способ взломать их!)
Почему AnyEvent::Worker::Pool?
Я уже был с ним знаком. Приветствуются альтернативные предложения.
Почему EV?
Это быстро. Опять же, альтернативные предложения приветствуются.
EV
,AnyEvent
илиAnyEvent::HTTP
используют потоки Perl. У вас есть доказательства того, что они это делают? - person David-SkyMesh   schedule 15.12.2013AnyEvent
— это абстракция цикла событий, не зависящая от реализации. В этом случае я используюEV
(libev), который представляет собой цикл событий, основанный на парадигме однопоточного мультиплексирования событий. - person David-SkyMesh   schedule 15.12.2013N
MAX рабочих. - person David-SkyMesh   schedule 15.12.2013