Как я могу решить проблему сломанной трубы MQTT Client

Когда я пытаюсь опубликовать некоторые сообщения брокеру, я получаю следующую ошибку:

Exception in thread Thread-4:
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/threading.py", line 932, in _bootstrap_inner
    self.run()
  File "/usr/local/lib/python3.8/site-packages/opcua/client/client.py", line 66, in run
    self.client.open_secure_channel(renew=True)
  File "/usr/local/lib/python3.8/site-packages/opcua/client/client.py", line 325, in open_secure_channel
    result = self.uaclient.open_secure_channel(params)
  File "/usr/local/lib/python3.8/site-packages/opcua/client/ua_client.py", line 265, in open_secure_channel
    return self._uasocket.open_secure_channel(params)
  File "/usr/local/lib/python3.8/site-packages/opcua/client/ua_client.py", line 197, in open_secure_channel
    future = self._send_request(request, message_type=ua.MessageType.SecureOpen)
  File "/usr/local/lib/python3.8/site-packages/opcua/client/ua_client.py", line 72, in _send_request
    self._socket.write(msg)
  File "/usr/local/lib/python3.8/site-packages/opcua/common/utils.py", line 118, in write
    self.socket.sendall(data)
BrokenPipeError: [Errno 32] Broken pipe

я использую сетевой цикл после подключения клиента, и я пытался обработать ошибку, но это не работает, у меня всегда одна и та же проблема, моя функция вызывается много раз в секунду, каждый раз, когда я получаю уведомление об изменении данных от сервер. Может кто-нибудь помочь, пожалуйста? Вот следующий код, который вызывает ошибку:

def clientPub(self,top,msg):
    try: 
        cl=self.client
        if(self.connectionFlag !=1):
            cl.connect(self.broker,1883)
            cl.loop_start()            
        cl.publish(top,msg,2)
    except IOError as e: 
        if e.errno == errno.EPIPE:
            print("Trying again to publish the message ...")
            cl.publish(top,msg,2) 

вышеуказанная функция вызывается этой для каждого уведомления с сервера:

def datachange_notification(self, node, val, data):
    print("data recieved :",data.monitored_item.Value.SourceTimestamp,val,machine_number+".G3_"+node.nodeid.Identifier[3:])        
    valuesDict={"name":machine_number+".G3_"+node.nodeid.Identifier[3:],"datapoint":[[str(data.monitored_item.Value.SourceTimestamp),val,3]],"attributes":{"machine-type":"opcua"}}   
    finalvalue={"messageId":str(round(time.time()*1000)),"body":valuesDict}
    sentval=json.dumps(finalvalue)
    self.clientPub(self.topic,sentval)

жду вашего ответа, спасибо.


person RiyadhG    schedule 18.05.2020    source источник
comment
Трассировка стека отображается в пакете ua_client, а не в библиотеке MQTT.   -  person hardillb    schedule 19.05.2020
comment
да @Panagiotis, я заметил это вчера, когда пытался решить проблему, но до сих пор не нашел решения сломанной трубы. благодарю вас   -  person RiyadhG    schedule 19.05.2020


Ответы (1)


Это классическая проблема производителя-потребителя.

Вы создаете и храните данные здесь, в sentval, и в то же время пытаетесь использовать sentval, вызывая: self.clientPub(self.topic,sentval)

У меня также была аналогичная проблема, и я решил ее, используя поток производитель-потребитель с использованием блокировки

Это сработало отлично для меня!

person M.B. Explorer    schedule 26.05.2020
comment
На самом деле эта ошибка возникает только тогда, когда вы пытаетесь выполнить многопроцессорную обработку кода с ошибками. Неожиданно столкнуться с этой проблемой, когда ваш код правильный. - person M.B. Explorer; 27.05.2020