Подключение MQTT к ActiveMQ TOPIC занимает много времени в приемнике событий WSO2 CEP

Обновить

В исходном коде MQTTAdapterListener проекта carbon-analytics-common как это ссылка

run() будет спать поток, и я считаю, что это причина того, что соединение MQTT займет так много времени.

@Override
public void run() {
    while (!connectionSucceeded) {
        try {
            MQTTEventAdapterConstants.initialReconnectDuration = MQTTEventAdapterConstants.initialReconnectDuration
                    * MQTTEventAdapterConstants.reconnectionProgressionFactor;
            Thread.sleep(MQTTEventAdapterConstants.initialReconnectDuration);
            startListener();
            connectionSucceeded = true;
            log.info("MQTT Connection successful");
        } catch (InterruptedException e) {
            log.error("Interruption occurred while waiting for reconnection", e);
        } catch (MqttException e) {
            log.error("MQTT Exception occurred when starting listener", e);

        }

    }
}

InitialReconnectDuration и reconnectionProgressionFactor указаны ниже в MQTTEventAdapterConstants

public static int initialReconnectDuration = 10000;
public static final int reconnectionProgressionFactor = 2;

Если у меня 12 приемников с MQTT, 12-й будет спать 40960 секунд. Кажется, что нет возможности изменить эти две константы? Есть ли способ решить эту проблему? Почему соединение MQTT переводится в спящий поток таким образом?


Мы используем WSO2 CEP версии 4.0.0 и ActiveMQ версии 5.13.0. Развернутый приемник событий выглядит следующим образом

<?xml version="1.0" encoding="UTF-8"?>
<eventReceiver name="5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER"
    statistics="disable" trace="disable" xmlns="http://wso2.org/carbon/eventreceiver">
    <from eventAdapterType="mqtt">
        <property name="clientId">5718a6b1851cb3474c6f03c2</property>
        <property name="topic">PROBE_DATA_571844f5851cb3474c6f0391_57184581851cb3474c6f0394_Topic</property>
        <property name="cleanSession">false</property>
        <property name="url">tcp://127.0.0.1:1883</property>
    </from>
    <mapping customMapping="enable" type="json">
        <property>
            <from jsonPath="$.event.payloadData.dyna.Speed3"/>
            <to name="speed" type="double"/>
        </property>
    </mapping>
    <to streamName="5718a6b1851cb3474c6f03c2_PROBE_DATA_IS" version="1.0.0"/>
</eventReceiver>

ClientId - это случайный идентификатор объекта плана выполнения этого события во внутренней базе данных. Тема существует в ActiveMQ, и я уверен, что сообщение было добавлено в нее. Свойство чистого сеанса имеет значение false для длительной подписки.

Но приемнику всегда требуется много времени для подключения к ActiveMQ TOPIC, независимо от того, после перезапуска WSO2 CEP или развертывания нового приемника.

[2016-04-24 01:07:59,018]  INFO {org.wso2.carbon.event.processor.manager.core.internal.CarbonEventManagementService} -  Starting polling event receivers
[2016-04-24 01:07:59,019]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 56f66bcc7a22442328cd229a_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,020]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 5718a6b1851cb3474c6f03c2_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,021]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 57144fa0851cb33590672418_X_DATA_RECEIVER
[2016-04-24 01:07:59,022]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 5714b5f4851cb3338c26a1cc_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,022]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 571b9ef4851cb356e04ec90b_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,023]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 5714a305851cb3338c26a1c4_DELIVERY_ORDER_RECEI
VER
[2016-04-24 01:07:59,023]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 56f6b03e7a22440c30855015_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,024]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 56f665f27a22442328cd2299_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,024]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 5718a5d7851cb3474c6f03bf_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,025]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 56ef98137a22442e28360c0b_DELIVERY_ORDER_RECEI
VER
[2016-04-24 01:07:59,026]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 571747f2851cb3338c26a1da_PROBE_DATA_RECEIVER
[2016-04-24 01:07:59,026]  INFO {org.wso2.carbon.event.input.adapter.core.internal.InputAdapterRuntime} -  Connecting receiver 56f66c657a22442328cd229c_PROBE_DATA_RECEIVER
[2016-04-24 01:08:19,044]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:08:39,041]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:09:19,034]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:10:39,037]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:13:19,046]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:18:39,035]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:29:19,038]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 01:50:39,036]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 02:33:19,039]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 03:58:39,043]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 06:49:19,038]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful
[2016-04-24 12:30:39,040]  INFO {org.wso2.carbon.event.input.adapter.mqtt.internal.util.MQTTAdapterListener} -  MQTT Connection successful

Всего получателей 12, WSO2 начинает опрос в 01:07:59. Примерно через 10 минут только 6 из 12 получателей подключились успешно, для остальных потребуется около 10 или более часов.

Кто-нибудь знает, почему MQTT приемника WSO2 CEP так медленно подключается?


person Bruce    schedule 23.04.2016    source источник


Ответы (1)


Не существует готового способа настройки продолжительности повторного подключения и значений коэффициента прогрессирования. Но поскольку вы уже разобрались с кодом, вы можете исправить этот компонент (либо прочитать эти значения из файла конфигурации, либо настроить его через интерфейс адаптера или просто значения жесткого кода) и применить исправление как описан здесь, чтобы преодолеть это ограничение.

person Rajeev Sampath    schedule 26.04.2016
comment
Привет, Раджив, я очень признателен за твою прекрасную идею и ответ. Я проверил, что WSO2CEP, который я использовал, - это версия 4.0.0, а MQTT в его репозитории \ components \ plugins - org.wso2.carbon.event.input.adapter.mqtt_5.0.3.jar. Итак, я проверил исходный код тега v5.0.3 проекта carbon-analytics-common, изменил код и maven построил модуль org.wso2.carbon.event.input.adapter.mqtt. Наконец, у меня есть org.wso2.carbon.event.input.adapter.mqtt-5.0.3.jar! - person Bruce; 26.04.2016
comment
Но я все еще не могу понять, как исправить WSO2CEP. Патч WSO2 - это просто файл .jar, и поместите его в ‹PRODUCT_HOME› / repository / components / patches /? Я сделал это и перезапустил сервер WSO2CEP, но он не работает с параметром -DapplyPatches. - person Bruce; 26.04.2016
comment
Кстати, из-за имени файла, которое я создал, org.wso2.carbon.event.input.adapter.mqtt-5.0.3.jar, а в плагинах WSO2CEP - org.wso2.carbon.event.input.adapter. mqtt_5.0.3.jar. Поэтому я переименовал свой встроенный jar в org.wso2.carbon.event.input.adapter.mqtt_5.0.3.jar и поместил его в папку с патчами. - person Bruce; 26.04.2016
comment
вам нужно будет переименовать файл jar, аналогичный jar в папке плагинов. также создайте папку вроде patch0999 в ‹cep› / repository / component / patches и поместите свою банку в эту папку. затем перезапустите сервер, и он будет автоматически применен - person Rajeev Sampath; 26.04.2016
comment
если вы хотите проверить, правильно ли он применен, проверьте md5sum jar в patches / patch0999 и jar в плагинах после перезапуска сервера. - person Rajeev Sampath; 26.04.2016
comment
Спасибо за ответ, попробовал переименовать банку и положить в папку patch0001, работает отлично! Я также обнаружил, что WSO2 просто заменит банку в папку плагинов и создаст резервную копию исходного файла в patch0000, поэтому, если я просто заменю банку в папку плагинов, она также будет работать правильно. Еще раз спасибо. - person Bruce; 26.04.2016