Как мне справиться с этим вариантом использования с помощью EventMachine?

У меня есть приложение, которое реагирует на сообщения, отправленные клиентами. Одно сообщение — reload_credentials, которое приложение получает каждый раз, когда регистрируется новый клиент. Затем это сообщение подключается к базе данных PostgreSQL, выполняет запрос всех учетных данных, а затем сохраняет их в обычном хеше Ruby ( client_id => client_token ).

Некоторые другие сообщения, которые может получать приложение, — это start,stop,pause, которые используются для отслеживания времени некоторых сеансов. Я хочу сказать, что я представляю, как приложение работает следующим образом:

  • клиент отправляет сообщение
  • сообщение ставится в очередь
  • очередь обрабатывается

Но, например, я не хочу блокировать реактор. Кроме того, давайте представим, что у меня есть сообщение reload_credentials, следующее в очереди. Я не хочу, чтобы какое-либо другое сообщение из очереди обрабатывалось до тех пор, пока учетные данные не будут перезагружены из БД. Кроме того, пока я обрабатываю определенное сообщение (например, ожидаю завершения запроса учетных данных), я хочу разрешить постановку в очередь других сообщений.

Не могли бы вы направить меня к решению такой проблемы? Я думаю, что мне, возможно, придется использовать em-synchrony, но я не уверен.


person Geo    schedule 06.09.2012    source источник


Ответы (2)


Используйте один из драйверов Postgresql EM или EM.defer, чтобы не блокировать реактор.

Когда вы получаете сообщение «reload_credentials», просто переверните флаг, который приведет к тому, что все последующие сообщения будут поставлены в очередь. После завершения 'reload_credentials' обработайте все сообщения из очереди. После того, как очередь пуста, установите флаг, который заставляет сообщения обрабатываться по мере их получения.

Драйверы EM для Postgresql перечислены здесь: https://github.com/eventmachine/eventmachine/wiki/Protocol-Implementations

module Server
  def post_init
    @queue               = []
    @loading_credentials = false
  end

  def recieve_message(type, data)
    return @queue << [type, data] if @loading_credentials || [email protected]?
    return process_msg(type, data) unless :reload_credentials == type
    @loading_credentials = true
    reload_credentials do
      @loading_credentials = false
      process_queue
    end
  end

  def reload_credentials(&when_done)
    EM.defer( proc { query_and_load_credentials }, when_done )
  end


  def process_queue
    while (type, data = @queue.shift)
      process_msg(type, data)
    end
  end

  # lots of other methods
end

EM.start_server(HOST, PORT, Server)

Если вы хотите, чтобы все соединения помещались в очередь сообщений всякий раз, когда какое-либо соединение получает сообщение «reload_connections», вам придется координировать свои действия через собственный класс.

person simulacre    schedule 08.09.2012
comment
Но сообщение reload_credentials может быть получено несколько раз. Разве не должно быть 2 темы? Тот, который продолжает стоять в очереди, и тот, который обрабатывает? - person Geo; 09.09.2012
comment
Да, если reload_credentials получено во время обработки другого reload_credentials, оно будет поставлено в очередь, как и другие сообщения. - person simulacre; 10.09.2012
comment
Несколько сообщений reload_credentials должны обрабатываться так же, как и первое. Помещая reload_credentials в блок EM.defer, вы выполняете его в другом потоке. Пока ваш «обрабатывающий» код не блокирует, вы будете продолжать получать сообщения. Используйте библиотеки, совместимые с EM, чтобы убедиться, что вы не блокируете. В качестве альтернативы используйте EM.defer для обработки. - person simulacre; 10.09.2012

Я полагаю, что что-то вроде вашей текущей реализации:

    class Worker
      def initialize queue
        @queue = queue
        dequeue
      end

      def dequeue
        @queue.pop do |item|
          begin
            work_on item
          ensure
            dequeue
          end
        end
      end

      def work_on item
        case item.type
        when :reload_credentials
          # magic happens here
        else
          # more magic happens here
        end
      end
    end


    q = EM::Queue.new

    workers = Array.new(10) { Worker.new q }

Проблема выше, если я вас правильно понимаю, заключается в том, что вы не хотите, чтобы работники работали над новыми заданиями (заданиями, которые поступили раньше на временной шкале производителя), чем над любыми заданиями reload_credentials. Следующее должно обслуживать это (дополнительные слова предостережения в конце).

    class Worker
      def initialize queue
        @queue = queue
        dequeue
      end

      def dequeue
        @queue.pop do |item|
          begin
            work_on item
          ensure
            dequeue
          end
        end
      end

      def work_on item
        case item.type
        when :reload_credentials
          # magic happens here
        else
          # more magic happens here
        end
      end
    end

    class LockingDispatcher
      def initialize channel, queue
        @channel = channel
        @queue = queue

        @backlog = []
        @channel.subscribe method(:dispatch_with_locking)

        @locked = false
      end

      def dispatch_with_locking item
        if locked?
          @backlog << item
        else
          # You probably want to move the specialization here out into a method or
          # block that's passed into the constructor, to make the lockingdispatcher
          # more of a generic processor
          case item.type
          when :reload_credentials
            lock
            deferrable = CredentialReloader.new(item).start
            deferrable.callback { unlock }
            deferrable.errback  { unlock }
          else
            dispatch_without_locking item
          end
        end
      end

      def dispatch_without_locking item
        @queue << item
      end

      def locked?
        @locked
      end

      def lock
        @locked = true
      end

      def unlock
        @locked = false
        bl = @backlog.dup
        @backlog.clear
        bl.each { |item| dispatch_with_locking item }
      end

    end

    channel = EM::Channel.new
    queue = EM::Queue.new

    dispatcher = LockingDispatcher.new channel, queue

    workers = Array.new(10) { Worker.new queue }

Таким образом, входные данные для первой системы поступают на q, а в этой новой системе — на channel. queue по-прежнему используется для распределения работы между работниками, но queue не заполняется, пока выполняется операция обновления учетных данных. К сожалению, поскольку я не тратил больше времени, я не обобщил LockingDispatcher таким образом, чтобы он не был связан с типом элемента и кодом для отправки CredentialsReloader. Я оставлю это вам.

Вы должны отметить здесь, что, хотя это и служит тому, что я понимаю в отношении вашего первоначального запроса, как правило, лучше ослабить требования такого рода. Есть несколько нерешенных проблем, которые по существу не могут быть устранены без изменения этого требования:

  • Система не ожидает завершения выполнения заданий перед запуском заданий учетных данных.
  • Система будет очень плохо обрабатывать пакеты учетных данных — другие элементы, которые могут быть обработаны, не будут обрабатываться.
  • В случае ошибки в коде учетных данных невыполненная работа может заполнить оперативную память и привести к сбою. Простого тайм-аута может быть достаточно, чтобы избежать катастрофических последствий, если код можно прервать, а последующие сообщения достаточно обрабатываются, чтобы избежать дальнейших взаимоблокировок.

На самом деле похоже, что у вас есть какое-то представление об идентификаторе пользователя в системе. Если вы продумаете свои требования, вполне возможно, что вам нужно будет занести в журнал только те элементы, которые относятся к идентификатору пользователя, учетные данные которого находятся в состоянии обновления. Это другая проблема, связанная с другим видом диспетчеризации. Попробуйте хэш заблокированных невыполненных работ для этих пользователей с обратным вызовом при завершении учетных данных, чтобы слить эти невыполненные работы в рабочие процессы, или что-то подобное.

Удачи!

person raggi    schedule 09.09.2012