Отработка отказа Beanstalkd для очередей в Laravel

Кто-нибудь знает о надежном механизме отработки отказа для очередей Laravel?

В какой-то момент на моем сервере beanstalkd произошла какая-то ошибка (все еще выясняющая, что пошло не так), запускающая Pheanstalk_Exception_ConnectionException в драйвере очереди Laravel (4). В результате новые задания не могли быть помещены в очередь.

Я пытаюсь создать какой-то драйвер аварийного переключения для QueueInterface, который может принимать несколько экземпляров драйверов, поэтому я могу определить, например, драйвер «sync» или «redis» как очередь аварийного переключения. Затем, как только beanstalkd выйдет из строя, этот драйвер будет выполнять задания, и работа не будет потеряна.


person Joram van den Boezem    schedule 30.01.2015    source источник


Ответы (1)


Я просто нанесу вам небольшой удар по этому поводу и надеюсь, что это даст вам представление ... я думаю, что у всех разные потребности в очередях, так что это именно то, что я сделал.

Честно говоря, я просто вырезал часть своего кода, чтобы попытаться упростить то, что я сделал. Я думаю, попытайтесь найти в этом какой-то смысл.

В вашей конфигурации очереди:

'connections' => array(
    'beanstalkd' => array(
        'driver' => 'beanstalk_extended',
        'host'   => 'my.ip.address',
        'queue'  => 'default',
        'ttr'    => 60,
    ),
    'my_fallback' => array(
        'driver' => 'sync',
    ),
);

В ServiceProvider @ boot:

/**
 * Boot the Beanstalkd queue.
 */
protected function bootQueue()
{
    $this->app['queue']->extend('beanstalk_extended', function () {
        return new BeanstalkConnector;
    });

    $this->app->bindShared('queue.failer', function($app) {
        $config = $app['config']['queue.failed'];
        return new DatabaseFailedJobProvider($app['db'], $config['database'], $config['table']);
    });

    $this->app->bindShared('queue.worker', function($app) {
        return new Worker($app['queue'], $app['queue.failer'], $app['events']);
    });
}

Разъем:

<?php namespace App\Framework\Queue\Connectors;

use Illuminate\Queue\Connectors\ConnectorInterface;
use Pheanstalk_Pheanstalk as Pheanstalk;
use App\Framework\Queue\Beanstalk;

class BeanstalkConnector implements ConnectorInterface
{

/**
 * Establish a queue connection.
 *
 * @param  array $config
 * @return \Illuminate\Queue\QueueInterface
 */
    public function connect(array $config)
    {
        $pheanstalk = new Pheanstalk($config['host']);
        $bean = new Beanstalk($pheanstalk, $config['queue'], array_get($config, 'ttr', Pheanstalk::DEFAULT_TTR));
       return $bean;
    }
}

Затем внутри расширения Beanstalkd:

/**
 * Push a new job onto the queue.
 *
 * @param  string $job
 * @param  mixed $data
 * @param  string $queue
 * @return int
 */
public function push($job, $data = '', $queue = null)
{
    try {
        $queue = $this->getQueue($queue);
        $id = parent::push($job, $data, $queue);
        return $id;
    } catch (\Exception $e) {
        return \Queue::connection('my_fallback')->push($job, $data, $queue);
    }
}
person Ryan Hungate    schedule 28.02.2015
comment
Спасибо, Райан, скоро попробую .. Один вопрос, App\Framework\Queue\Beanstalk класс, расширяющий Illuminate\Queue\BeanstalkdQueue? - person Joram van den Boezem; 01.03.2015
comment
Да, сэр ... извините, я не сказал этого раньше. У меня также есть некоторая логика внутри этого класса, которая хранит «ключи» заданий, использующих Redis, поэтому я могу сказать, что Queue :: exists ('my.queue.job'); // правда или ложь - person Ryan Hungate; 02.03.2015