Соединитель Mule AMQP не может повторно поставить сообщение в очередь

Я использую конечную точку AMQP, и она работает (с одной оговоркой, упомянутой здесь [Возможная ошибка с сообществом Mule AMQP transport 3.6.2) в других местах моего приложения Mule.

Я пытаюсь создать надежную очередь SMTP, но возникают проблемы.

Я сделал предположение, что если SMTP-коннектор не сможет работать, он выдаст исключение, поэтому, установив для подтверждения сообщения значение MANUAL и перехватив SMTP-исключения и отклонив (и повторно поставив в очередь) сообщение в стратегии исключения, система должна иметь дело с SMTP. отключения.

Мой код (слегка упрощенный) выглядит так:

<amqp:connector name="connector.amqp.mule.default" ackMode="MANUAL" doc:name="AMQP Connector" validateConnections="true" host="${amiab-rabbitmq-hostname}" port="${amiab-rabbitmq-portnumber}" password="${amiab-rabbitmq-password}" username="${amiab-rabbitmq-username}"/>
<smtp:connector name="smtpConnector" contentType="text/html" doc:name="SMTPConnector"/>
<flow name="utility-smtpFlow">
  <amqp:inbound-endpoint exchangeName="AMQP.DEFAULT.EXCHANGE" queueName="utilitySMTP" exchangeType="direct"  responseTimeout="10000"  doc:name="AMQP-0-9" queueDurable="true"/>
    <logger message="Recevied SMTP Message from Queue" level="INFO" doc:name="Logger"/>
    <smtp:outbound-endpoint host="${smtp-hostname}" user="${smtp-username}" password="{smtp-password}" to="#[flowVars.smtpTo]" subject="Testing Message" responseTimeout="10000" doc:name="SMTP"/>
    <amqp:acknowledge-message doc:name="AMQP-0-9 Acknowledge Message"/>
    <catch-exception-strategy doc:name="Catch Exception Strategy">
        <logger message="SMTP Error, Requeuing" level="INFO" doc:name="Logger"/>
        <amqp:reject-message requeue="true" doc:name="AMQP-0-9 Reject Message"/>
    </catch-exception-strategy>

When my SMTP connector throws an exception, the exception strategy is correctly instigated, but then AMQP throws an error as shown below:

INFO  2015-06-18 16:32:22,383 [HTTP_Listener_AMIAB.worker.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default outbound transformer: org.mule.transport.amqp.internal.transformer.ObjectToAmqpMessage
INFO  2015-06-18 16:32:22,383 [HTTP_Listener_AMIAB.worker.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default response transformer: org.mule.transport.amqp.internal.transformer.ObjectToAmqpMessage
INFO  2015-06-18 16:32:22,387 [HTTP_Listener_AMIAB.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'connector.amqp.mule.default.dispatcher.936721322'. Object is: Dispatcher
INFO  2015-06-18 16:32:22,389 [HTTP_Listener_AMIAB.worker.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'connector.amqp.mule.default.dispatcher.936721322'. Object is: Dispatcher
INFO  2015-06-18 16:32:22,446 [[amiab-esb-utility].utility-smtpFlow.stage1.02] org.mule.api.processor.LoggerMessageProcessor: Recevied SMTP Message from Queue
INFO  2015-06-18 16:32:22,457 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.transport.service.DefaultTransportServiceDescriptor: Loading default outbound transformer: org.mule.transport.email.transformers.ObjectToMimeMessage
INFO  2015-06-18 16:32:22,481 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.lifecycle.AbstractLifecycleManager: Initialising: 'smtpConnector.dispatcher.404056326'. Object is: SmtpMessageDispatcher
ERROR 2015-06-18 16:32:22,568 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.exception.CatchMessagingExceptionStrategy: 
********************************************************************************
Message               : Unable to connect to mail transport.
Code                  : MULE_ERROR--2
--------------------------------------------------------------------------------
Exception stack is:
1. mail.lyrical.co.uk (java.net.UnknownHostException)
  java.net.AbstractPlainSocketImpl:178 (null)
2. Unknown SMTP host: mail.lyrical.co.uk (javax.mail.MessagingException)
  com.sun.mail.smtp.SMTPTransport:1704 (http://java.sun.com/j2ee/sdk_1.3/techdocs/api/javax/mail/MessagingException.html)
3. Unable to connect to mail transport. (org.mule.api.endpoint.EndpointException)
  org.mule.transport.email.SmtpMessageDispatcher:67 (http://www.mulesoft.org/docs/site/current3/apidocs/org/mule/api/endpoint/EndpointException.html)
--------------------------------------------------------------------------------
Root Exception stack trace:
java.net.UnknownHostException: mail.lyrical.co.uk
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:178)
    at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    + 3 more (set debug level logging or '-Dmule.verbose.exceptions=true' for everything)
********************************************************************************

INFO  2015-06-17 15:25:47,190 [[amiab-esb-utility].smtpConnector.dispatcher.01] org.mule.api.processor.LoggerMessageProcessor: SMTP Error, Requeuing
WARN  2015-06-17 15:25:47,193 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MessageReceiverConsumer: Received shutdown signal for consumer tag: amq.ctag-NZ96mnj_oscnpJKAPy16ng, the message receiver will try to restart.
com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=90)
    at com.rabbitmq.client.impl.ChannelN.asyncShutdown(ChannelN.java:478) ~[?:?]
    at com.rabbitmq.client.impl.ChannelN.processAsync(ChannelN.java:315) ~[?:?]
    at com.rabbitmq.client.impl.AMQChannel.handleCompleteInboundCommand(AMQChannel.java:144) ~[?:?]
    at com.rabbitmq.client.impl.AMQChannel.handleFrame(AMQChannel.java:91) ~[?:?]
    at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550) ~[?:?]
    at java.lang.Thread.run(Thread.java:745) [?:1.7.0_75]
INFO  2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.lifecycle.AbstractLifecycleManager: Stopping: 'null'. Object is: MultiChannelMessageSubReceiver
INFO  2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Connecting clusterizable message receiver
INFO  2015-06-17 15:25:47,195 [amqpReceiver.01] org.mule.lifecycle.AbstractLifecycleManager: Starting: 'null'. Object is: MultiChannelMessageSubReceiver
INFO  2015-06-17 15:25:47,196 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Starting clusterizable message receiver
INFO  2015-06-17 15:25:47,203 [amqpReceiver.01] org.mule.transport.amqp.internal.endpoint.receiver.MultiChannelMessageSubReceiver: Started subscription: amq.ctag-sCCRCnOWKyREyip26pSYDA on channel: AMQChannel(amqp://[email protected]:5672/,3)

Я пропустил что-то очевидное, или я неправильно понимаю, как это следует использовать? Очень благодарен, если кто-нибудь может указать мне в правильном направлении.


person Nich Overend    schedule 17.06.2015    source источник
comment
Не могли бы вы показать немного больше строк журнала, например. из строки «Полученное SMTP-сообщение из очереди»? Если бы вы могли добавить простой <logger level="INFO" /> сразу после него, мы бы увидели свойства сообщения (включая тег доставки). Спасибо.   -  person David Dossot    schedule 17.06.2015
comment
Спасибо, Дэвид. Я отредактировал журнал ошибок, чтобы показать полный журнал.   -  person Nich Overend    schedule 18.06.2015
comment
У меня есть теория, согласно которой acknowledge-message вызывается даже в случае исключения. Не могли бы вы добавить регистратор прямо перед ним, чтобы проверить, верна ли эта теория?   -  person David Dossot    schedule 18.06.2015
comment
Ты прав! Звонят сразу. Это проблема отсека для резьбы. Он подтверждает сообщение немедленно, прежде чем компонент SMTP сможет начать работу и выдать исключение (в новом потоке). Компонент SMTP не работает в режиме запрос-ответ и задерживает поток до его завершения. Любые предложения, чтобы обойти это?   -  person Nich Overend    schedule 18.06.2015
comment
Понятно! Примените стратегию синхронной обработки к основному потоку, и все будет работать как надо, за исключением того, что он повторно ставится в очередь, а затем немедленно повторно доставляется в цикле. Можно ли повторно поставить в очередь инструкцию для брокера AMQP удерживать сообщение в течение определенного периода времени перед повторной доставкой?   -  person Nich Overend    schedule 19.06.2015
comment
Гах! Вы комментируете быстрее, чем я успеваю печатать ответы :$   -  person David Dossot    schedule 19.06.2015
comment
Семантика повторной доставки AMQP не позволяет управлять повторной доставкой со стороны клиента. Это должно быть настроено на вашем брокере AMQP.   -  person David Dossot    schedule 19.06.2015


Ответы (1)


Как вы отметили в своем комментарии, smtp:outbound-endpoint является конечной точкой one-way, ее операция отправки выполняется в другом потоке, поэтому она происходит параллельно с выполнением потока. Поэтому amqp:acknowledge-message вызывается в любом случае, независимо от успеха или неудачи операции smtp:outbound-endpoint.

Вы можете обойти эту проблему, установив processingStrategy="synchronous" для потока: это заставит все взаимодействия с конечной точкой выполняться во входящем потоке.

Бесстыдная вилка: это подробно обсуждается в главе 11.2 книги «Мул в действии», второе издание.

person David Dossot    schedule 18.06.2015
comment
Спасибо, Дэвид, это именно то, откуда я это взял. ;) - person Nich Overend; 19.06.2015