Метод parseFrom() Protobuf Java зависает при получении сообщения от сервера MQTT

В настоящее время я пытаюсь публиковать и получать сообщения Protobuf через сервер Mosquitto MQQT. Я успешно публикую права на сервер. Однако, когда клиент получает его, метод parseFrom() зависает и больше не возвращается. Это очень похоже на эту, когда сообщение Protobuf отправляется через сокет, который никогда не закрывается.

Издатель:

MqttClient adapterClient = new MqttClient(broker, clientID);
SpecsMessage.Specs protoNotifyMessage = SpecsMessage.Specs.newBuilder()
                    .setNodeType("basic")
                    .setAddress(serverSocket.getInetAddress().getHostName())
                    .setPort(serverSocket.getLocalPort())
                    .build();
MqttMessage notifyMessage = new MqttMessage(protoNotifyMessage.toString().getBytes());
adapterClient.publish("availableNodes", notifyMessage);

Подписчик:

public class TestController implements MqttCallback {
  public void messageArrived(String topic, MqttMessage message){
                System.out.println("New node connected");     
                System.out.println("Payload: \n" + new String(message.getPayload()));                           
                SpecsMessage.Specs protoMessage = SpecsMessage.Specs.parseFrom(message.getPayload());
  }
}

Я не смог найти способ указать серверу MQQT правильный способ отправки сообщения.

Я также пытался использовать метод writeDelimitedFrom().

MqttClient adapterClient = new MqttClient(broker, clientID);
SpecsMessage.Specs protoNotifyMessage = SpecsMessage.Specs.newBuilder()
                    .setNodeType("basic")
                    .setAddress(serverSocket.getInetAddress().getHostName())
                    .setPort(serverSocket.getLocalPort())
                    .build();
ByteArrayOutputStream output = new ByteArrayOutputStream();
protoNotifyMessage.writeDelimitedTo(output);
MqttMessage notifyMessage = new MqttMessage(output.toByteArray());
adapterClient.publish("availableNodes", notifyMessage);

Однако сообщение неправильно преобразовано в byte[], вот как оно должно выглядеть:

nodeType: "basic"
address: "0.0.0.0"
port: 43101

И вот что я получаю:

basic0.0.0.0��

Есть ли способ заставить это работать, либо исправив метод отправки, либо решив проблему преобразования byte[]?


person joss-enet    schedule 02.04.2020    source источник
comment
Я сомневаюсь, что это проблема, но я был бы удивлен, если бы завис сам parseFrom, а не метод, предоставляющий данные. new String(payload) работает, однако. Работает ли это, если вы сохраняете полезную нагрузку в переменной, а затем используете ее как в вызовах println, так и в вызовах parseFrom?   -  person Andy Turner    schedule 02.04.2020
comment
Спасибо за ваше замечание. Я пробовал, но parseFrom() все еще зависает. Однако, как вы предположили, это указывает на то, что это не проблема связи.   -  person joss-enet    schedule 02.04.2020
comment
Вы также должны быть осторожны при использовании набора символов по умолчанию с getBytes() и new String: вы получите неожиданные результаты, если JVM издателя и подписчика используют разные наборы символов по умолчанию.   -  person Andy Turner    schedule 02.04.2020
comment
Я буду иметь это в виду, еще раз спасибо за ваше время!   -  person joss-enet    schedule 02.04.2020


Ответы (1)


Вы пытаетесь использовать parseFrom для синтаксического анализа прототипа в текстовом формате. parseFrom предназначен для разбора формата провода.

Вместо этого отправьте прото в формате wire - message.toByteArray().

(Если вы хотите анализировать текстовый формат, используйте TextFormat).

person Andy Turner    schedule 02.04.2020