Соединение Rabbitmq внезапно закрывается на локальном хосте

У нас возникла проблема с внезапным закрытием соединения с rabbitmq в сценарии разработки / тестирования localhost. В нашей среде разработки у нас есть rabbitmq, установленный на каждом компьютере разработчиков с Windows 7, и мы подключаемся к нему с помощью java-клиента через библиотеку Spring AMQP. Некоторое время все работает нормально, но в какой-то момент соединение обрывается со следующим сообщением в журнале rabbitmq:

=WARNING REPORT==== 4-Jan-2016::14:39:37 ===
closing AMQP connection <0.3731.0> (127.0.0.1:50792 -> 127.0.0.1:5672):
connection_closed_abruptly

В клиентском журнале есть такое исключение:

04/01/2016 14:39:37.181 (AMQP Connection 127.0.0.1:5672) ERROR [CachingConnectionFactory] Channel shutdown: connection error
04/01/2016 14:39:38.188 (SimpleAsyncTaskExecutor-2) WARN  [SimpleMessageListenerContainer] Consumer raised exception, processing can restart if the connection factory supports it
com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.client.impl.AMQConnection.startShutdown(AMQConnection.java:723)
    at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:713)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:571)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Unrecognized Windows Sockets error: 0: recv failed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.read(SocketInputStream.java:152)
    at java.net.SocketInputStream.read(SocketInputStream.java:122)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:235)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:254)
    at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:139)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:536)
    ... 1 more

При запуске клиента мы создаем динамическую очередь и присоединяем к ней несколько потребителей. Это разрушение соединения приводит к удалению очереди, и любая последующая попытка воссоздать ее завершается неудачей за следующим исключением:

04/01/2016 14:39:38.239 (SimpleAsyncTaskExecutor-8) WARN  [BlockingQueueConsumer] Queue declaration failed; retries left=3
org.springframework.amqp.rabbit.listener.BlockingQueueConsumer$DeclarationException: Failed to declare queue(s):[MY_TEST_QU]
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:571)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:470)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1165)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:885)
    at com.rabbitmq.client.impl.ChannelN.queueDeclarePassive(ChannelN.java:61)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$CachedChannelInvocationHandler.invoke(CachingConnectionFactory.java:704)
    at com.sun.proxy.$Proxy77.queueDeclarePassive(Unknown Source)
    at org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations(BlockingQueueConsumer.java:550)
    ... 3 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_TEST_QU' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:361)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:226)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 12 more
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'MY_TEST_QU' in vhost '/', class-id=50, method-id=10)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:484)
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:321)
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144)
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91)
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:554)
    ... 1 more

У нас была эта проблема с rabbitmq 3.4.x и 3.5.x и Spring AMQP 1.3.x и 1.5.x. Интересно то, что этого никогда не происходит в нашей тестовой и производственной среде, где rabbitmq установлен на отдельном сервере.

Любая помощь горячо приветствуется.


person user3739116    schedule 04.01.2016    source источник


Ответы (1)


org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.attemptPassiveDeclarations() не работает, когда больше нет очереди для прослушивания.

Это не очередь recreation, это просто PassiveDeclarations. Итак, если вы отбрасываете эту «динамическую очередь» вручную, вам также следует воссоздать ее вручную.

Я имею в виду, что если вы не следуете RabbitAdmin bean-контракту, у вас нет выбора, если вы не воссоздаете его вручную. В этом случае вам не нужно беспокоиться о слушателе: он правильно переподключится, когда очередь вернется.

ОБНОВЛЕНИЕ

Начиная с версии 1.5 Spring AMQP предоставляет ListenerContainerConsumerFailedEvent, который выдается ровно после всех attemptPassiveDeclarations() попыток. Итак, вы можете поймать этот ApplicationEvent, stop() ваш контейнер слушателя, объявить queue обратно и start() контейнер снова.

Если ваша очередь не является bean-компонентом, она не будет повторно объявлена ​​автоматически.

person Artem Bilan    schedule 04.01.2016
comment
Я понятия не имею, почему соединение обрывается, но @artem говорит, что если вы добавите очередь как bean-компонент и добавите RabbitAdmin в контекст, он позаботится об автоматическом повторном объявлении очереди для вас при повторном подключении -учредил. См. справочное руководство. В качестве альтернативы не используйте очереди с автоматическим удалением или эксклюзивные очереди. - person Gary Russell; 05.01.2016
comment
Мы используем RabbitAdmin для объявления очереди, но она не является bean-компонентом. Вот код Queue tempQ = new Queue(myQueueName, false, true, true); amqpAdmin.declareQueue(tempQ); , и этот код запускается при запуске. - person user3739116; 05.01.2016
comment
Спасибо @Artem и @Gary. После некоторого покопания кажется, что проблема обрыва соединения с localhost в Windows 7 связана с тем, что на наших машинах включены как IPv4, так и IPv6, а также есть java.net.preferIPv4Stack=true, который требуется для чего-то еще в нашем приложении. Когда я удалил свойство java.net.preferIPv4Stack, проблема с подключением исчезла, но нам все равно нужно сохранить свойство. Таким образом, мы должны смириться с проблемой подключения и исправить проблему с доступом к динамической очереди. - person user3739116; 06.01.2016
comment
Поскольку наш контейнер слушателя - это bean-компонент, пишущий код для перехвата ApplicationEvent и остановки, объявление очереди и запуска немного ошеломляет. Мы пойдем с @ Gary, чтобы использовать автоматическое удаление и исключительное false в определении очереди. - person user3739116; 06.01.2016