Работать с двумя отдельными экземплярами Redis с помощью sidekiq?

Добрый день,

У меня есть два отдельных, но связанных приложения. У них обоих должны быть свои фоновые очереди (читай: отдельные процессы Sidekiq и Redis). Однако я хотел бы иногда иметь возможность помещать задания в очередь app2 из app1.

С точки зрения простой очереди/передачи это было бы легко сделать, если бы app1 не имел существующего стека Sidekiq/Redis:

# In a process, far far away

# Configure client 
Sidekiq.configure_client do |config|
  config.redis = { :url => 'redis://redis.example.com:7372/12', :namespace => 'mynamespace' }
end

# Push jobs without class definition 
Sidekiq::Client.push('class' => 'Example::Workers::Trace', 'args' => ['hello!'])

# Push jobs overriding default's 
Sidekiq::Client.push('queue' => 'example', 'retry' => 3, 'class' =>     'Example::Workers::Trace', 'args' => ['hello!'])

Однако, учитывая, что я уже вызвал Sidekiq.configure_client и Sidekiq.configure_server из app1, вероятно, здесь есть промежуточный шаг, когда что-то должно произойти.

Очевидно, я мог бы просто взять код сериализации и нормализации прямо из Sidekiq и вручную добавить в очередь Redis app2, но это кажется ненадежным решением. Я хотел бы иметь возможность использовать функциональность Client.push.

Я полагаю, что моим идеальным решением было бы что-то вроде:

SidekiqTWO.configure_client { remote connection..... } SidekiqTWO::Client.push(job....)

Или даже:

$redis_remote = remote_connection.....

Sidekiq::Client.push(job, $redis_remote)

Очевидно, немного шутливо, но это мой идеальный вариант использования.

Спасибо!


person Brandon    schedule 07.02.2013    source источник
comment
Я предлагаю вознаграждение в 200 баллов за это — меня интересует это в более общем плане с точки зрения разрешения одному клиенту sidekiq «циклически перебирать» сообщения в два разных экземпляра Redis для целей высокой доступности и отказоустойчивости.   -  person Kevin Bedell    schedule 08.11.2013


Ответы (3)


Во-первых, Согласно Часто задаваемые вопросы, «Формат сообщения Sidekiq довольно прост и стабильен: это просто хэш в формате JSON». Акцент мой: я не думаю, что отправка JSON в sidekiq слишком сложна для выполнения. Особенно, когда вам нужен детальный контроль над тем, на какой экземпляр Redis вы отправляете задания, как в ситуации с OP, я, вероятно, просто написал бы небольшую оболочку, которая позволила бы мне указать экземпляр Redis вместе с поставленным в очередь заданием.

Для более общей ситуации Кевина Беделла с циклическим перебором заданий в экземплярах Redis я полагаю, что вы не хотите контролировать, какой экземпляр Redis используется — вы просто хотите поставить в очередь и иметь распределение будет управляться автоматически. Похоже, что пока что это запросил только один человек, и они придумали решение, которое использует Redis::Distributed:

datastore_config = YAML.load(ERB.new(File.read(File.join(Rails.root, "config", "redis.yml"))).result)

datastore_config = datastore_config["defaults"].merge(datastore_config[::Rails.env])

if datastore_config[:host].is_a?(Array)
  if datastore_config[:host].length == 1
    datastore_config[:host] = datastore_config[:host].first
  else
    datastore_config = datastore_config[:host].map do |host|
      host_has_port = host =~ /:\d+\z/

      if host_has_port
        "redis://#{host}/#{datastore_config[:db] || 0}"
      else
        "redis://#{host}:#{datastore_config[:port] || 6379}/#{datastore_config[:db] || 0}"
      end
    end
  end
end

Sidekiq.configure_server do |config|
  config.redis = ::ConnectionPool.new(:size => Sidekiq.options[:concurrency] + 2, :timeout => 2) do
    redis = if datastore_config.is_a? Array
      Redis::Distributed.new(datastore_config)
    else
      Redis.new(datastore_config)
    end

    Redis::Namespace.new('resque', :redis => redis)
  end
end

Еще одна вещь, которую следует учитывать в своем стремлении добиться высокой доступности и отказоустойчивости, — это получить Sidekiq Pro, который включает в себя функции обеспечения надежности: Клиент Pro может выдерживать временные сбои Redis. Он будет локально ставить задания в очередь в случае ошибки и пытаться доставить эти задания после восстановления подключения». Поскольку sidekiq в любом случае предназначен для фоновых процессов, короткая задержка в случае сбоя экземпляра Redis не должна повлиять на ваше приложение. Если один из двух ваших экземпляров Redis выйдет из строя и вы используете циклический перебор, вы все равно потеряете некоторые задания, если не используете эту функцию.

person carols10cents    schedule 10.11.2013
comment
Спасибо за этот хорошо изученный ответ! - person Kevin Bedell; 10.11.2013

Как говорит carols10cents, это довольно просто, но поскольку мне всегда нравится инкапсулировать возможности и иметь возможность повторно использовать их в других проектах, я обновил идею из блог от Hotel Tonight. Это следующее решение улучшает Hotel Tonight's, которое не выдерживает предварительный загрузчик Rails 4.1 и Spring.

В настоящее время я добавляю следующие файлы в lib/remote_sidekiq/:

remote_sidekiq.rb

class RemoteSidekiq
  class_attribute :redis_pool
end

remote_sidekiq_worker.rb

require 'sidekiq'
require 'sidekiq/client'

module RemoteSidekiqWorker
  def client
    pool = RemoteSidekiq.redis_pool || Thread.current[:sidekiq_via_pool] || Sidekiq.redis_pool
    Sidekiq::Client.new(pool)
  end

  def push(worker_name, attrs = [], queue_name = "default")
    client.push('args' => attrs, 'class' => worker_name, 'queue' => queue_name)
  end
end

Вам нужно создать инициализатор, который устанавливает redis_pool

config/initializers/remote_sidekiq.rb

url = ENV.fetch("REDISCLOUD_URL")
namespace = 'primary'

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url))

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis }

РЕДАКТИРОВАНИЕ Алекса:

В никогда версиях sidekiq вместо строк:

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url))

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis }

использовать строки:

redis_remote_options = {
  namespace: "yournamespace",
  url: ENV.fetch("REDISCLOUD_URL")
}

RemoteSidekiq.redis_pool = Sidekiq::RedisConnection.create(redis_remote_options)

Затем вы можете просто установить модуль include RemoteSidekiqWorker в любом месте. Дело сделано!

**** ДЛЯ БОЛЬШИХ ПОМЕЩЕНИЙ ****

Добавление моделей RemoteWorker дает дополнительные преимущества:

  1. Вы можете повторно использовать RemoteWorkers везде, включая систему, которая имеет доступ к целевым помощникам. Это прозрачно для вызывающего абонента. Чтобы использовать форму «RemoteWorkers» в целевой системе sidekiq, просто не используйте инициализатор, поскольку по умолчанию он будет использовать локальный клиент Sidekiq.
  2. Использование RemoteWorkers гарантирует, что всегда отправляются правильные аргументы (код = документация)
  3. Масштабирование путем создания более сложных архитектур Sidekiq прозрачно для вызывающей стороны.

Вот пример RemoteWorker

class RemoteTraceWorker
  include RemoteSidekiqWorker
  include ActiveModel::Model

  attr_accessor :message

  validates :message, presence: true

  def perform_async
    if valid?
      push(worker_name, worker_args)
    else
      raise ActiveModel::StrictValidationFailed, errors.full_messages
    end
  end

  private

  def worker_name
    :TraceWorker.to_s
  end

  def worker_args
    [message]
  end
end
person ARO    schedule 27.04.2015
comment
Красивый ответ - person Aleks; 29.11.2019

Я столкнулся с этим и столкнулся с некоторыми проблемами, потому что я использую ActiveJob, что усложняет чтение сообщений из очереди.

Основываясь на ответе ARO, вам все равно понадобится установка redis_pool:

remote_sidekiq.rb

class RemoteSidekiq
  class_attribute :redis_pool
end

config/initializers/remote_sidekiq.rb

url = ENV.fetch("REDISCLOUD_URL")
namespace = 'primary'

redis = Redis::Namespace.new(namespace, redis: Redis.new(url: url))

RemoteSidekiq.redis_pool = ConnectionPool.new(size: ENV['MAX_THREADS'] || 6) { redis }

Теперь вместо воркера создадим адаптер ActiveJob для постановки запроса в очередь:

lib/active_job/queue_adapters/remote_sidekiq_adapter.rb

require 'sidekiq'

module ActiveJob
  module QueueAdapters
    class RemoteSidekiqAdapter
      def enqueue(job)
        #Sidekiq::Client does not support symbols as keys
        job.provider_job_id = client.push \
          "class"   => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper,
          "wrapped" => job.class.to_s,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ]
      end

      def enqueue_at(job, timestamp)
        job.provider_job_id = client.push \
          "class"   => ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper,
          "wrapped" => job.class.to_s,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ],
          "at"      => timestamp
      end

      def client
        @client ||= ::Sidekiq::Client.new(RemoteSidekiq.redis_pool)
      end
    end
  end
end

Теперь я могу использовать адаптер для очереди событий:

require 'active_job/queue_adapters/remote_sidekiq_adapter'

class RemoteJob < ActiveJob::Base
  self.queue_adapter = :remote_sidekiq

  queue_as :default

  def perform(_event_name, _data)
    fail "
      This job should not run here; intended to hook into
      ActiveJob and run in another system
    "
  end
end

Теперь я могу поставить задание в очередь, используя обычный API ActiveJob. Какое бы приложение ни считывало это из очереди, для выполнения действия должно быть доступно соответствующее RemoteJob.

person Wheeyls    schedule 16.10.2016