Как включить мост JMS в WebSphere MQ в синхронный вызов с использованием шаблона запрос-ответ?

Я просто имею дело с новым для меня сценарием, который, я думаю, может быть общим для некоторых :)..

В соответствии с требованиями мне нужно создать пользовательский интерфейс, похожий на синхронную онлайн-транзакцию для вызова веб-службы, которая фактически делегирует вызов серии IBM MQ с использованием асинхронного моста JMS-MQ.

Клиент вызывает веб-службу, после чего его сообщение должно быть опубликовано в очереди JMS на сервере приложений, которое будет доставлено в WebSphere MQ, а затем после обработки ответ будет доставлен обратно на сервер приложений в конечной точке очереди FIXED JMS.

Требование относится к этой транзакции, которая должна истечь по тайм-ауту в случае, если WebSphere MQ не доставляет ответ в течение определенного периода времени, после чего веб-служба должна отправить клиенту сигнал тайм-аута и проигнорировать эту транзакцию.

Набросок проблемы следующий.

Мне нужно заблокировать запрос в веб-службе до тех пор, пока не придет ответ или не истечет время ожидания.

Чем я ищу некоторую открытую библиотеку, чтобы помочь мне в этой задаче. Или единственное решение — блокировать поток и продолжать собирать ответы? Может быть, я мог бы реализовать какой-нибудь блок со слушателем, чтобы получать уведомления о получении ответа?

Небольшое обсуждение было бы очень полезно для меня сейчас, чтобы попытаться прояснить свои идеи по этому поводу. Какие-либо предложения?

У меня есть набросок, который, надеюсь, поможет прояснить картину ;)

альтернативный текст


person groo    schedule 09.12.2010    source источник


Ответы (2)


Эй, спасибо за публикацию собственного решения!

Да, Receive() с тайм-аутом — самый элегантный способ в этом случае.

Остерегайтесь того, что происходит с сообщениями, которые не прочитаны из-за тайм-аута. Если ваш клиент снова получит доступ к той же очереди, он может получить устаревшее сообщение.

Следите за тем, чтобы сообщения с тайм-аутом своевременно удалялись (если не по другой причине, то чтобы не забивать очередь необработанными сообщениями).

Вы можете легко сделать это либо с помощью кода (установив время жизни на генераторе сообщений), либо на сервере Websphere MQ (с использованием очередей, срок действия которых истекает автоматически).

Последнее проще, если вы не можете/не хотите изменять MQ-сторону кода. Это то, что я бы сделал :)

person Juffo-Wup    schedule 10.01.2011
comment
Да, у нас есть очередь ошибок, в которой сообщения с истекшим временем ожидания пересылаются, и с ними обрабатывается другой фрагмент кода. - person groo; 11.01.2011

после нескольких дней кодирования я нашел решение для этого. Я использую стандартный EJB3 с аннотациями JAX-WS и стандартным JMS.

Ниже приведен код, который я написал для удовлетворения требований. Это сеансовый компонент без сохранения состояния с транзакцией, управляемой компонентом (BMT), поскольку использование стандартной транзакции, управляемой контейнером (CMT), вызывало какое-то зависание, я полагаю, потому что я пытался поместить оба взаимодействия JMS в ту же транзакцию, что и в тот же метод, поэтому обратите внимание, что мне приходилось начинать и заканчивать транзакции для каждого взаимодействия с очередями JMS. Я использую weblogic для этого решения. И я также закодировал MDB, который в основном потребляет сообщение из конечной точки очереди jms/Pergunta и помещает ответное сообщение в очередь jms/Resposta. Я сделал это, чтобы имитировать ожидаемое поведение на стороне MQ этой проблемы. На самом деле в реальном сценарии у нас, вероятно, было бы какое-то приложение COBOL на мейнфрейме или даже другое приложение Java, обрабатывающее сообщения и помещающее ответ в очередь ответов.

Если кому-то нужно попробовать этот код, в основном все, что вам нужно, это иметь контейнер J2EE5 и настроить 2 очереди с именами jndi: jms/Pergunta и jms/Resposta.

Код EJB/веб-сервиса:

@Stateless
@TransactionManagement(TransactionManagementType.BEAN)
@WebService(name="DJOWebService")
public class DJOSessionBeanWS implements DJOSessionBeanWSLocal {

    Logger log = Logger.getLogger(DJOSessionBeanWS.class.getName());

    @Resource
    SessionContext ejbContext;

    // Defines the JMS connection factory.
    public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";

    // Defines request queue
    public final static String QUEUE_PERG = "jms/Pergunta";

    // Defines response queue
    public final static String QUEUE_RESP = "jms/Resposta";


    Context ctx;
    QueueConnectionFactory qconFactory;

    /**
     * Default constructor. 
     */
    public DJOSessionBeanWS() {
        log.info("Construtor DJOSessionBeanWS");
    }

    @WebMethod(operationName = "processaMensagem")
    public String processaMensagem(String mensagemEntrada, String idUnica)
    {
        //gets UserTransaction reference as this is a BMT EJB.
        UserTransaction ut = ejbContext.getUserTransaction();
        try {

            ctx = new InitialContext();
            //get the factory before any transaction it is a weblogic resource.
            qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
            log.info("Got QueueConnectionFactory");
            ut.begin();
            QueueConnection qcon = qconFactory.createQueueConnection();
            QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue qs = (Queue) (new InitialContext().lookup("jms/Pergunta"));
            TextMessage message = qsession.createTextMessage("this is a request message");
            message.setJMSCorrelationID(idUnica);
            qsession.createSender(qs).send(message);
            ut.commit();
            qcon.close();
            //had to finish and start a new transaction, I decided also get new references for all JMS related objects, not sure if this is REALLY required
            ut.begin();
            QueueConnection queuecon = qconFactory.createQueueConnection();
            Queue qreceive = (Queue) (new InitialContext().lookup("jms/Resposta"));
            QueueSession queuesession = queuecon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            String messageSelector = "JMSCorrelationID = '" + idUnica + "'";
            //creates que receiver and sets a message selector to get only related message from the response queue.
                    QueueReceiver qr = queuesession.createReceiver(qreceive, messageSelector);
            queuecon.start();
            //sets the timeout to keep waiting for the response...
            TextMessage tresposta = (TextMessage) qr.receive(10000);
            if(tresposta != null)
            {
                ut.commit();
                queuecon.close();
                return(tresposta.toString());
            }
            else{
                //commints anyway.. does not have a response though 
                ut.commit();
                queuecon.close();
                log.info("null reply, returned by timeout..");
                return "Got no reponse message.";
            }



        } catch (Exception e) {
            log.severe("Unexpected error occurred ==>> " + e.getMessage());
            e.printStackTrace();
            try {
                ut.commit();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
            return "Error committing transaction after some other error executing ==> " + e.getMessage();
        } 

    }
}   

И это код для MDB, который издевается над стороной MQ этой проблемы. Во время моих тестов у меня был фрагмент Thread.sleep для имитации и проверки тайм-аута на стороне клиента для проверки решения, но в этой версии его нет.

/**
 * Mock to get message from request queue and publish a new one on the response queue.
 */
@MessageDriven(
        activationConfig = { @ActivationConfigProperty(
                propertyName = "destinationType", propertyValue = "javax.jms.Queue"
        ) }, 
        mappedName = "jms/Pergunta")
public class ConsomePerguntaPublicaRespostaMDB implements MessageListener {

    Logger log = Logger.getLogger(ConsomePerguntaPublicaRespostaMDB.class.getName());

    // Defines the JMS connection factory.
    public final static String JMS_FACTORY = "weblogic.jms.ConnectionFactory";

    // Define Queue de resposta
    public final static String QUEUE_RESP = "jms/Resposta";


    Context ctx;
    QueueConnectionFactory qconFactory;



    /**
     * Default constructor. 
     */
    public ConsomePerguntaPublicaRespostaMDB() {
        log.info("Executou construtor ConsomePerguntaPublicaRespostaMDB");
        try {
            ctx = new InitialContext();
        } catch (NamingException e) {
            e.printStackTrace();
        }
    }

    /**
     * @see MessageListener#onMessage(Message)
     */
    public void onMessage(Message message) {
        log.info("Recuperou mensagem da fila jms/FilaPergunta, executando ConsomePerguntaPublicaRespostaMDB.onMessage");
        TextMessage tm = (TextMessage) message;

        try {
            log.info("Mensagem recebida no onMessage ==>> " + tm.getText());

            //pega id da mensagem na fila de pergunta para setar corretamente na fila de resposta.
             String idMensagem = tm.getJMSCorrelationID();
             log.info("Id de mensagem que sera usada na resposta ==>> " + idMensagem);

            qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
            log.info("Inicializou contexto jndi e deu lookup na QueueConnectionFactory do weblogic com sucesso. Enviando mensagem");
            QueueConnection qcon = qconFactory.createQueueConnection();
            QueueSession qsession = qcon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = (Queue) (ctx.lookup("jms/Resposta"));
            TextMessage tmessage = qsession.createTextMessage("Mensagem jms para postar na fila de resposta...");
            tmessage.setJMSCorrelationID(idMensagem);
            qsession.createSender(queue).send(tmessage);
        } catch (JMSException e) {
            log.severe("Erro no onMessage ==>> " + e.getMessage());
            e.printStackTrace();
        }  catch (NamingException e) {
            log.severe("Erro no lookup ==>> " + e.getMessage());
            e.printStackTrace();
        }

    }

}

[]s

person groo    schedule 11.12.2010