Клиент Paho Android разрывает соединение при отображении сообщения

Я пытаюсь использовать базовый Eclipse Paho MQTT-клиент версии 1.1.0 для подключения к CloudAMQP Экземпляр RabbitMQ, подпишитесь на тему и получайте сообщения (которые я отправляю из консоли веб-администратора).

Это хорошо работает, если приложение отправляет все полезные данные сообщения в вывод журнала.

Если приложение добавляет сообщение в TextView, оно появляется, но соединение немедленно разрывается, и сообщения больше не принимаются.

Полный проект доступен на GitHub. Ниже приведен простой пример.

Существует клиент MQTT Paho на основе служб, но я подумал, что для очень простых приложений базовый клиент должен иметь возможность получать и отображать сообщения в пользовательском интерфейсе приложения Android.

...

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;

public class MainActivity extends AppCompatActivity implements MqttCallback {

    private static final String TAG = "main";
    private Connection connection;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        Toolbar toolbar = (Toolbar) findViewById(R.id.toolbar);
        setSupportActionBar(toolbar);

        configureUI();
    }

    private Button buttonConnect;
    private TextView messageWindow;


    private void configureUI() {
        buttonConnect = (Button) findViewById(R.id.buttonConnect);
        messageWindow = (TextView) findViewById(R.id.messageWindow);

        buttonConnect.setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View v) {
                String s = "***";
                String d = "test";
                String u = "***";
                String p = "***";

                if (connection != null && connection.isConnected()) {
                    connection.disconnect();
                    connection = null;
                    messageWindow.setText(String.format("Disconnected from server %s",
                            new Object[]{s}));
                    return;
                }

                messageWindow.setText(String.format("Connecting to server %s as user %s",
                        new Object[]{s, u}));

                connection = new Connection(MainActivity.this, MainActivity.this, s, u, p);
                connection.connect();

                if (connection.isConnected()) {
                    messageWindow.append("\n\n");
                    messageWindow.append(String.format("Connected, listening for messages from topic %s",
                            new Object[]{d}));
                    connection.subscribe(d);
                }
            }
        });
    }

    @Override
    public void connectionLost(Throwable cause) {
        Log.e(TAG, "connectionLost" + cause.getMessage());
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        String msg = new String(message.getPayload());
        Log.i(TAG, "Message Arrived: " + msg);
        // messageWindow.append(msg);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        Log.i(TAG, "Delivery Complete!");
    }

    class Connection {
        private static final String TAG = "conn";
        private static final String protocol = "tcp://";
        private static final int port = 1883;
        private static final int version = MqttConnectOptions.MQTT_VERSION_3_1_1;
        private static final int keepAliveSeconds = 20 * 60;

        private final Context context;
        private MqttClient client;

        private final String server;
        private final String user;
        private final String pass;

        private final MqttConnectOptions options = new MqttConnectOptions();

        public Connection(Context ctx, MqttCallback mqttCallback, String server, String user, String pass) {
            this.context = ctx;
            this.server = server;
            this.user = user;
            this.pass = pass;

            MqttClientPersistence memPer = new MemoryPersistence();
            try {
                String url = protocol + server + ":" + port;
                client = new MqttClient(url, MqttClient.generateClientId(), memPer);
                client.setCallback(mqttCallback);
            } catch (MqttException e) {
                e.printStackTrace();
            }

            options.setUserName(user + ":" + user);
            options.setPassword(pass.toCharArray());
            options.setMqttVersion(version);
            options.setKeepAliveInterval(keepAliveSeconds);
        }

        void connect() {
            Log.i(TAG, "buttonConnect");
            try {
                client.connect(options);
            } catch (MqttException ex) {
                Log.e(TAG, "Connection attempt failed with reason code = " + ex.getReasonCode() + ":" + ex.getCause());
            }
        }

        public boolean isConnected() {
            return client.isConnected();
        }

        public void disconnect() {
            try {
                client.disconnect();
            } catch (MqttException e) {
                Log.e(TAG, "Disconnect failed with reason code = " + e.getReasonCode());
            }
        }

        void subscribe(String dest) {
            try {
                client.subscribe(dest);
            } catch (MqttException e) {
                Log.e(TAG, "Subscribe failed with reason code = " + e.getReasonCode());
            }
        }
    }
}

person mjn    schedule 02.08.2016    source источник
comment
соединение принимается сразу после отображения сообщения. Вы имеете в виду, что соединение немедленно разрывается ...   -  person hardillb    schedule 02.08.2016
comment
@hardillb, я исправил это, большое спасибо, что заметили!   -  person mjn    schedule 02.08.2016


Ответы (1)


Я предполагаю, что это связано с тем, что вы пытаетесь обновить TextView из потока пользовательского интерфейса без пользовательского интерфейса.

Попробуйте заключить messageWindow.append(msg); в runOnUiThread вызов.

public void messageArrived(String topic, MqttMessage message) throws Exception {
    String msg = new String(message.getPayload());
    Log.i(TAG, "Message Arrived: " + msg);
    runOnUiThread(new Runnable(){
       public void run() {
           messageWindow.append(msg);
       }
    });
}
person hardillb    schedule 02.08.2016
comment
Это решило проблему, большое спасибо! (Я думал, что перенос всего соединения в AsyncTask может помочь) - person mjn; 02.08.2016