Запланированные сообщения Hornetq не доставляются вовремя

Я пытаюсь использовать API запланированных сообщений hornetq core-api, чтобы доставить сообщение через 30 секунд, очередь не долговечна и определена в файле конфигурации hornetq.

val message:String ={...} //some string
val clientMessage:ClientMessage =session.createMessage(false)
clientMessage.getBodyBuffer.writeString(message)
clientMessage.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + 30000) 
//expecting to deliver the message after 30 seconds
    producer.send(MyQueue,clientMessage)

однако, когда я смотрю журналы, мне кажется, что сообщение отправляется и приходит в ту же секунду. я должен определить что-нибудь еще? я что-то пропустил ?

Добавление кода:

class HornetQMessageTest extends FunSuite with ShouldMatchers {
test("scheduleMessage") {
    def createServerLocator: ServerLocator = {
      var map = new java.util.HashMap[String, Object]
      map.put("host",  "127.0.0.1")
      map.put("port", "5445")
      val transConf = new TransportConfiguration(classOf[NettyConnectorFactory].getName,map)
      val locator = HornetQClient.createServerLocatorWithoutHA(transConf)
      locator.setConfirmationWindowSize(1024^2)//confirmationWindowSize
      locator.setBlockOnDurableSend(false)
      locator.setBlockOnNonDurableSend(false)
      locator.setClientFailureCheckPeriod(5000) //keepAlivePing
      locator.setConnectionTTL(10000) //connection TTL
      locator
    }
    val serverLocator: ServerLocator = createServerLocator
    val sessionFactory: ClientSessionFactory = serverLocator.createSessionFactory()
    val receiverSession = sessionFactory.createSession(true, true, 0)
    val senderSession =sessionFactory.createSession(true, true, 0)
    val queue = "atestq"
    def closeHorentQClient() {
      receiverSession.close()
      senderSession.close()
      sessionFactory.close()
      serverLocator.close()
    }
    senderSession should not be null
    receiverSession should not be null
    val query: QueueQuery = senderSession.queueQuery(new SimpleString(queue))
    if (query == null || !query.isExists) senderSession.createQueue(queue,queue,false)
    val producer: ClientProducer = senderSession.createProducer(queue)
    val message = senderSession.createMessage(false)
      message.getBodyBuffer().writeString("This is my string........")

    val deliverytime = System.currentTimeMillis() + 30000
    message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, deliverytime)
    println("Message Sent "+deliverytime)
    senderSession.start()
    producer.send(message)
    receiverSession.start()
    val consumer = receiverSession.createConsumer(queue)
    val message2 = consumer.receive(50000)
    val messageBody = message2.getBodyBuffer().readString()
    println("received message: "+messageBody +" after" + ((System.currentTimeMillis()-deliverytime)/1000)+" seconds ")
    message2 should not be  null
    System.currentTimeMillis() should be >= deliverytime
    assert(messageBody == "This is my string........")

    message2.acknowledge()

    // Make sure no more messages
    closeHorentQClient()
}  }

результаты теста:

Message Sent 1392198959686
received message: This is my string........ after-29 seconds 

1392198929758 was not greater than or equal to 1392198959686

person igx    schedule 09.02.2014    source источник


Ответы (1)


Мы используем Scheduled Executor, а в некоторых операционных системах... scheduler.schedule (через 30 секунд) вызывает наш Runnable за меньшее время, чем вы предполагали. На самом деле я довольно часто видел, как эта проблема возникает в Windows.

Убедитесь, что вы понимаете, что время расписания основано на времени сервера (а не на времени клиента). Синхронизируйте время вашего клиента с вашими серверами.

Недавно мы внесли множество улучшений в версию 2.4.0, и я не думаю, что вы снова столкнетесь с этой проблемой, поскольку теперь мы проверяем время и больше не доверяем ScheduledExecutor.

если вы обновите свою ОС, вы не увидите этой проблемы ... обычно это проблема ядра в режиме реального времени и несоблюдение ожиданий.

Или если бы вы могли использовать 2.4.0, который больше не доверяет этому поведению.

Я попробовал этот код, используя 2.2.eap5 на MAC без патча на 2.4.0, и это сработало. который в значительной степени является вашим тестом, преобразованным в java.

   public void testSomething() throws Exception
   {
      // then we create a client as normal
      ClientSessionFactory sessionFactory = createSessionFactory(locator);
      ClientSession receiverSession = sessionFactory.createSession(true, true, 0);
      ClientSession senderSession = sessionFactory.createSession(true, true, 0);
      String queue = "atestq";

      ClientSession.QueueQuery query = senderSession.queueQuery(new SimpleString(queue));
      if (query == null || !query.isExists()) senderSession.createQueue(queue, queue, false);

      ClientProducer producer = senderSession.createProducer(queue);
      ClientMessage message = senderSession.createMessage(false);
      message.getBodyBuffer().writeString("This is my string........");

      long original = System.currentTimeMillis();
      long deliverytime = System.currentTimeMillis() + 30000;
      message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, deliverytime);
      System.out.println("Message Sent " + deliverytime);
      senderSession.start();
      producer.send(message);
      receiverSession.start();
      ClientConsumer consumer = receiverSession.createConsumer(queue);
      ClientMessage message2 = consumer.receive(50000);
      String messageBody = message2.getBodyBuffer().readString();
      System.out.println("received message: " + messageBody + " after" + ((System.currentTimeMillis() - original) / 1000) + " seconds ");
      assert (messageBody.equals("This is my string........"));

      message2.acknowledge();
   }

Я только что поднял запрос функции в HornetQ для этого.

person Clebert Suconic    schedule 11.02.2014
comment
Спасибо, я добавил тестовый код, который я использую, по-прежнему получая сообщение в то же время без задержки, как вы можете видеть в результатах. - person igx; 12.02.2014
comment
мы используем версию 2.3.0 - person igx; 12.02.2014
comment
Я думаю, что ответил сейчас... ваши данные о редактировании поста помогли мне понять, что вы видели.. - person Clebert Suconic; 12.02.2014
comment
обновил версию до 2.4.0.final, но все равно тот же результат, можете ли вы попробовать запустить тест, который я написал? может что-то не так с тестовым кодом? - person igx; 13.02.2014
comment
Вы запускаете это на другом сервере? Время на сервере такое же, как на клиенте? Планировщик основан на времени сервера, а не на клиентах. - person Clebert Suconic; 13.02.2014
comment
Я преобразовал ваш тест в Java, и он работал нормально... Я отредактировал свой ответ с вашим тестом. - person Clebert Suconic; 13.02.2014
comment
Единственное, что было неправильно, это использование неправильной переменной в последнем System.out. (ищите оригинал) System.out.println(получено сообщение: + messageBody + after + ((System.currentTimeMillis() - original) / 1000) + секунды); - person Clebert Suconic; 13.02.2014
comment
Я только что добавил абзац о времени сервера. Убедитесь, что ваше время синхронизировано. - person Clebert Suconic; 13.02.2014
comment
,Спасибо. хорошо понял . поэтому я должен получить синхронизацию часов между клиентом и сервером. есть ли в любом случае установка задержки, а не установка времени доставки? что-то вроде now() + 30000? - person igx; 14.02.2014
comment
если бы вы могли принять ответ, а также? (кажется вы только проголосовали) - person Clebert Suconic; 14.02.2014
comment
давайте продолжим обсуждение в чате - person Clebert Suconic; 14.02.2014