Медленная подписка на темы MQ. Распараллеливание не улучшает производительность

Я выполняю ряд подписок с подстановочными знаками (например, /A/# и /B/#). Каждая подписка (см. createSubscriber(topic) ниже) приводит к появлению около 1000 тем и занимает около 10 секунд. Является ли 10 секунд разумным временем отклика? Мне он кажется медленным, но мне не с чем его сравнить.

Учитывая приведенный ниже код;

public class JMSClientSubscriber implements Runnable {

    TopicConnection           topicCon;
    Properties                properties;
    List<MyListener>          listeners;
    JmsTopicConnectionFactory jcf;
    boolean                   connected, alive;

    public JMSClientSubscriber() throws JMSException {
            properties = Properties.getInstance();
            listeners = new LinkedList<>();
            jcf = FLOWConnectionFactory.getTopicFactory(properties, Location.CLIENT);
            connected = false;
            alive = true;
    }

    @Override
    public void run() {
            try {
                    connect();
                    while (alive) {
                            Thread.sleep(1000);
                    }
                    disconnect();
            } catch (Exception e) {
                    e.printStackTrace();
            }
    }

    void connect() throws Exception {
            connected = false;
            topicCon = jcf.createTopicConnection();

            topicCon.setExceptionListener(new ExceptionListener() {
                    @Override public void onException(JMSException arg0) {
                            disconnect();
                            try {
                                    Thread.sleep(1000);
                                    connect();
                            } catch (Exception e) {
                                    e.printStackTrace();
                            }
                    }
            });

            topicCon.start();

            for (MyListener listener: listeners) { 
                    Thread t = new Thread() {
                            @Override public void run() {
                                    TopicSession topicSes;
                                    try {
                                            topicSes = topicCon.createTopicSession(false, Session.DUPS_OK_ACKNOWLEDGE);
                                            Topic topic = topicSes.createTopic(listener.exampleMessage.getTopicSubscription());
                                            System.out.println(new Date() + " Subscribing to " + topic);
    /* THIS TAKES 10 SECONDS! */            TopicSubscriber topicSub = topicSes.createSubscriber(topic);
                                            System.out.println(new Date() + " Subscription finished " + topic);
                                            topicSub.setMessageListener(listener);
                                    } catch (Exception e) {
                                            e.printStackTrace();
                                    }
                            }
                    };
                    t.start();
            }
            connected = true;
    }

    void disconnect() {
            try {
                    connected = false;
                    if (topicCon != null) topicCon.close();
            } catch (JMSException e) {}    
    }

    public void stop() { alive = false; }

    public class MyListener implements MessageListener {           
            Class<? extends FlowMessage>       expectedClass;
            FlowMessage                        exampleMessage;

            public MyListener(Class<? extends FlowMessage> expectedClass) throws Exception {
                    this.expectedClass = expectedClass;
                    exampleMessage = expectedClass.newInstance();
                    listeners.add(this);
            }

            @Override
            public void onMessage(javax.jms.Message arg0) {
                    BytesMessage bm = (BytesMessage) arg0;

                    try {
                            byte bytes[] = new byte[(int) bm.getBodyLength()];
                            bm.readBytes(bytes);
                            FlowMessage flowMessage = exampleMessage.newInstance(bytes);
                            System.out.println(new Date() + "[" + bm.getJMSDestination() + "] " + flowMessage.toString());

                    } catch (Exception e) {
                            e.printStackTrace();
                    }
            }
    }


    public static void main(String[] args) throws Exception {
            Properties properties = Properties.newInstance(new File("D:\\cc_views\\D253570_ALL_FLOW_DEV\\DealingRoom\\FLOW\\src\\cfg\\flow.properties"));
            LogManager.getLogManager().readConfiguration(new FileInputStream(properties.getPropertyAsFile("logging.properties")));

            /* Thread per connection */
            for (Class<FlowMessage> clazz: new Class[] { KondorCpty.class, KondorPair.class }) {
                    JMSClientSubscriber s = new JMSClientSubscriber();
                    s.new MyListener(clazz);
                    new Thread(s).start();
            }

            /* Thread per session */
            JMSClientSubscriber s = new JMSClientSubscriber();
            s.new MyListener(KondorCpty.class);
            s.new MyListener(KondorPair.class);
            new Thread(s).start();

    }

}

main в этом коде выполняет два теста;

Одно соединение + несколько потоков/сеансов

Tue Sep 13 10:18:50 2016 Subscribing to topic://DRS/OW/Cpty/#
Tue Sep 13 10:18:50 2016 Subscribing to topic://DRS/OW/Pair/#
Tue Sep 13 10:19:00 2016 Subscription finished topic://DRS/OW/Cpty/#
Tue Sep 13 10:19:07 2016 Subscription finished topic://DRS/OW/Pair/#
Tue Sep 13 10:19:08 2016[topic://DRS/OW/Pair/RONGBP] KondorPair 

Многопоточное соединение + один сеанс на поток/соединение

Tue Sep 13 10:22:42 2016 Subscribing to topic://DRS/OW/Pair/#
Tue Sep 13 10:22:42 2016 Subscribing to topic://DRS/OW/Cpty/#
Tue Sep 13 10:22:52 2016 Subscription finished topic://DRS/OW/Cpty/#
Tue Sep 13 10:23:00 2016 Subscription finished topic://DRS/OW/Pair/#
Tue Sep 13 10:23:00 2016[topic://DRS/OW/Pair/RONGBP] KondorPair

Оба теста одинаковы по времени и поведению.

  • Подписка на ~1000 тем занимает ~10 секунд
  • Подписки, кажется, выполняются последовательно, даже если они находятся в разных потоках.
  • Обновления тем появляются только после завершения ВСЕХ подписок.
  • Наличие TopicConnection.start() до или после Subscriptions не влияет на производительность или время поступления первого обновления Topic.

Итак, как мне ускорить это?


person lafual    schedule 12.09.2016    source источник


Ответы (2)


Обратите внимание на следующее:

1) При каждом вызове метода createSession (очереди или темы) данные передаются как от клиента, так и от администратора очередей для настройки среды сеанса JMS. Вы используете удаленное соединение, что означает, что задействован поток данных по сети.

2) Вызов метода createSubscriber включает в себя создание объекта подписки, временной очереди помимо поиска темы, проверки полномочий и т. д. в конце диспетчера очередей.

Можете ли вы показать нам, как вы распараллеливаете соединения/сеансы?

Согласно спецификациям JMS, сеанс не должен быть разделен между потоками. Я бы выделил по одному треду для каждого подписчика, где тред

1) Создает соединение JMS

2) Создает сеанс JMS

3) Создает подписчика

4) Запускает ли JMS Connection.start() запуск доставки сообщений.

person Shashi    schedule 12.09.2016
comment
Привет, спасибо, что нашли время ответить. Я включил полный код. - person lafual; 13.09.2016
comment
Моим первым предложением было бы переместить темуCon.start() после создания подписчиков и настройки слушателей. Это гарантирует, что слушатели готовы получать сообщения, когда вызывается connection.start(), чтобы сообщить провайдеру обмена сообщениями о начале доставки сообщений. - person Shashi; 13.09.2016
comment
Я попробовал это. Без изменений. Тем не менее (а) для возврата каждой подписки требуется 10 секунд, и (б) первое обновление темы получено только после завершения окончательной подписки. (К вашему сведению, я использую JAR-файлы MQClient 7.5 с Java8). - person lafual; 13.09.2016
comment
Я попробую выполнить ваш код и обновить, когда у меня будет какой-то результат - person Shashi; 13.09.2016
comment
Привет, я думаю, что Threading приносит больше вреда, чем пользы. Сначала я выполнил TopicConnection.start(), затем createSubscriber в цикле for. Здесь обновления темы для первой подписки читаются ее MessageListener, в то время как выполняется следующая createSubscriber. - person lafual; 13.09.2016

Проблема была в onMessage. Вместо того, чтобы работать с сообщением здесь, я поместил сообщение на BlockingQueue. Затем несколько отдельных потоков опросили этот BlockingQueue. Это значительно повысило пропускную способность MessageListener и убрало проблему многопоточности с кода JMS/MQ.

person lafual    schedule 31.10.2016