Я пытаюсь использовать API запланированных сообщений hornetq core-api, чтобы доставить сообщение через 30 секунд, очередь не долговечна и определена в файле конфигурации hornetq.
val message:String ={...} //some string
val clientMessage:ClientMessage =session.createMessage(false)
clientMessage.getBodyBuffer.writeString(message)
clientMessage.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + 30000)
//expecting to deliver the message after 30 seconds
producer.send(MyQueue,clientMessage)
однако, когда я смотрю журналы, мне кажется, что сообщение отправляется и приходит в ту же секунду. я должен определить что-нибудь еще? я что-то пропустил ?
Добавление кода:
class HornetQMessageTest extends FunSuite with ShouldMatchers {
test("scheduleMessage") {
def createServerLocator: ServerLocator = {
var map = new java.util.HashMap[String, Object]
map.put("host", "127.0.0.1")
map.put("port", "5445")
val transConf = new TransportConfiguration(classOf[NettyConnectorFactory].getName,map)
val locator = HornetQClient.createServerLocatorWithoutHA(transConf)
locator.setConfirmationWindowSize(1024^2)//confirmationWindowSize
locator.setBlockOnDurableSend(false)
locator.setBlockOnNonDurableSend(false)
locator.setClientFailureCheckPeriod(5000) //keepAlivePing
locator.setConnectionTTL(10000) //connection TTL
locator
}
val serverLocator: ServerLocator = createServerLocator
val sessionFactory: ClientSessionFactory = serverLocator.createSessionFactory()
val receiverSession = sessionFactory.createSession(true, true, 0)
val senderSession =sessionFactory.createSession(true, true, 0)
val queue = "atestq"
def closeHorentQClient() {
receiverSession.close()
senderSession.close()
sessionFactory.close()
serverLocator.close()
}
senderSession should not be null
receiverSession should not be null
val query: QueueQuery = senderSession.queueQuery(new SimpleString(queue))
if (query == null || !query.isExists) senderSession.createQueue(queue,queue,false)
val producer: ClientProducer = senderSession.createProducer(queue)
val message = senderSession.createMessage(false)
message.getBodyBuffer().writeString("This is my string........")
val deliverytime = System.currentTimeMillis() + 30000
message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, deliverytime)
println("Message Sent "+deliverytime)
senderSession.start()
producer.send(message)
receiverSession.start()
val consumer = receiverSession.createConsumer(queue)
val message2 = consumer.receive(50000)
val messageBody = message2.getBodyBuffer().readString()
println("received message: "+messageBody +" after" + ((System.currentTimeMillis()-deliverytime)/1000)+" seconds ")
message2 should not be null
System.currentTimeMillis() should be >= deliverytime
assert(messageBody == "This is my string........")
message2.acknowledge()
// Make sure no more messages
closeHorentQClient()
} }
результаты теста:
Message Sent 1392198959686
received message: This is my string........ after-29 seconds
1392198929758 was not greater than or equal to 1392198959686