Как я могу организовать код для общения с устройством последовательного порта?

Я пишу приложение .Net, которому нужно будет взаимодействовать с устройством последовательного порта. Устройство в основном представляет собой передатчик для некоторых буквенно-цифровых пейджеров старой школы. Иногда моему приложению нужно открыть последовательный порт и отправить сообщение передатчику.

Я знаю протокол для разговора с устройством. Это что-то вроде «болтливого» протокола. Отправить команду... дождаться конкретного ответа... отправить другую команду... дождаться другого определенного ответа... отправить фактическое сообщение... дождаться "принятого" ответа. Я могу заставить это работать с некоторым действительно хакерским кодом, включающим серию вызовов метода Write(...) для объекта SerialPort с промежуточными вызовами Thread.Sleep.

Конечно, я не хочу делать это, полагаясь на Thread.Sleep в ожидании ответа устройства. Кажется, что фреймворк Reactive Extensions должен подходить для такого типа вещей, но у меня проблемы с этим. Я начал с этого, но быстро заблудился и не знал, куда идти дальше, и имеет ли это вообще смысл:

var receivedData = Observable.FromEventPattern<SerialDataReceivedEventArgs>(serialPort, "DataReceived");
receivedData
    .Where(d => d.EventArgs.EventType == SerialData.Chars)
    .Subscribe(args =>
            {
                var response = serialPort.ReadExisting();
                // Now what?
            });

Во-первых, как запустить эту штуку с помощью первого вызова serialPort.Write()? Тогда как мне связать их вместе, проверяя ожидаемый ответ перед выполнением следующего вызова Write()? И, конечно же, если я не получу ожидаемого ответа, я бы хотел вырваться и создать исключение или что-то в этом роде. Я даже лаю правильное дерево с Rx, или есть другой шаблон, который лучше подходит для этого? Спасибо!


person Kevin Kuebler    schedule 08.09.2011    source источник


Ответы (3)


Rx — это абстракция над сценариями «передача данных из источника данных». В вашем случае вы смоделировали метод «чтения» последовательного порта как наблюдаемый Rx, и это необходимо сочетать с методом записи последовательного порта. Одним из возможных решений может быть что-то вроде приведенного ниже, хотя для этого могут потребоваться некоторые другие модификации в зависимости от конкретных потребностей вашего приложения.

            var serialPort = new System.IO.Ports.SerialPort("COM1");
            serialPort.Open();
            var receivedData = Observable.FromEvent<SerialDataReceivedEventArgs>(serialPort, "DataReceived")
                               .Where(d => d.EventArgs.EventType == SerialData.Chars)
                               .Select(_ => serialPort.ReadExisting());
            var replay = new ReplaySubject<string>();
            receivedData.Subscribe(replay);
            var commands = (new List<string>() { "Init", "Hello", "Done" });
            commands.Select((s, i) =>
            {
                serialPort.Write(s);
                var read = replay.Skip(i).First();
                //Validate s command against read response
                return true;//To some value to indicate success or failure
            }).ToList();
person Ankur    schedule 08.09.2011
comment
Я только немного понимаю это, но я подумал, что попытаюсь реализовать его, чтобы увидеть, работает ли он, и, если да, проработаю его, чтобы улучшить свое понимание. Однако в полученном Data.Subscribe(replay); линия. Компилятор говорит, что не может преобразовать из «System.Reactive.Subjects.ReplaySubject‹string›» в «System.IObserver‹System.Reactive.EventPattern‹System.IO.Ports.SerialDataReceivedEventArgs›› - person Kevin Kuebler; 09.09.2011
comment
У меня была ошибка в предыдущей строке, я ее исправил, и теперь код компилируется. И вроде работает, хотя сработало только один раз. Запуская его более одного раза, приложение всегда зависало, и я никогда не получал хорошего ответа от устройства. Не уверен, что там происходит - я закрываю и удаляю объект SerialPort в конце метода. Но это, наверное, отдельная тема. В целом это кажется хорошим подходом. Спасибо! - person Kevin Kuebler; 09.09.2011
comment
У меня было запущено шпионское приложение для последовательного порта, которое, как я полагаю, могло вызвать зависания. Теперь код работает, хотя я все еще использовал вызов Thread.Sleep() сразу после вызова serialPort.Write(s). Кажется, требуется совсем немного времени, чтобы обработать команду и отправить ответ, что имеет смысл. Если я правильно понимаю, событие DataReceived может запускаться несколько раз, прежде чем все данные будут получены, что, вероятно, и является проблемой. - person Kevin Kuebler; 09.09.2011
comment
Да, событие «Полученные данные» будет срабатывать всякий раз, когда некоторые данные будут доступны, и это могут быть частичные данные в соответствии с протоколом устройства последовательного порта, поэтому вам может потребоваться выполнить некоторую агрегацию наблюдаемых данных, поэтому создайте полное сообщение. - person Ankur; 09.09.2011
comment
На самом деле, я, кажется, могу пройти часть пути, установив свойство NewLine для serialPort и вызвав serialPort.ReadLine() вместо ReadExisting(). Но это работает только частично, потому что, если я удалю Thread.Sleep после записи, я все равно потеряю некоторые ответы. Любые идеи о том, как настроить это так, чтобы сон не нужен? Просто кажется, что это немного взломано. Но в остальном код работает, так что еще раз спасибо! - person Kevin Kuebler; 09.09.2011

Я не считаю, что RX отлично подходит для такого рода последовательной связи. В общем, RX, кажется, больше относится к односторонним потокам данных, а не к протоколам туда и обратно. Для последовательной связи, подобной этой, я написал класс для последовательного порта, который использует WaitHandles для ожидания ответа на команды. Общая структура такова:

Приложение вызывает метод для запуска асинхронной операции для отправки последовательности команд. Это запустит поток (я полагаю, из пула потоков), который отправляет каждую команду по очереди. Как только команда отправлена, операция ожидает ответа от WaitHandle (или истечет время ожидания и повторит попытку или завершит операцию с ошибкой). Когда ответ обработан, сигнализируется получение WaitHandle и отправляется следующая команда.

Событие последовательного приема (которое запускается в фоновых потоках при каждом поступлении данных) формирует пакеты данных. Когда полный пакет получен, проверьте, была ли отправлена ​​команда. Если это так, сигнализируйте потоку отправки нового ответа и ждите другого WaitHandle, чтобы разрешить обработку ответа (что может быть важно, чтобы получатель не испортил данные ответа).

EDIT: Добавлен пример (несколько большой), показывающий два основных метода отправки и получения.

Не показано свойство Me.Receiver, которое имеет тип ISerialReceiver и отвечает за построение пакетов данных, но не определяет, являются ли данные правильным ответом. Также не показаны CheckResponse и ProcessIncoming, которые представляют собой два абстрактных метода, переопределяемых производными классами для определения, является ли ответ только что отправленной командой, и для обработки «незапрошенных» входящих пакетов соответственно.

''' <summary>This field is used by <see cref="SendCommand" /> to wait for a
''' response after sending data.  It is set by <see cref="ReceiveData" />
''' when <see cref="ISerialReceiver.ProcessResponseByte">ProcessResponseByte</see>
''' on the <see cref="Receiver" /> returns true.</summary>
''' <remarks></remarks>
Private ReceiveResponse As New System.Threading.AutoResetEvent(False)
''' <summary>This field is used by <see cref="ReceiveData" /> to wait for
''' the response to be processed after setting <see cref="ReceiveResponse" />.
''' It is set by <see cref="SendCommand" /> when <see cref="CheckResponse" />
''' returns, regardless of the return value.</summary>
''' <remarks></remarks>
Private ProcessResponse As New System.Threading.ManualResetEvent(True)
''' <summary>
''' This field is used by <see cref="SendCommand" /> and <see cref="ReceiveData" />
''' to determine when an incoming packet is a response packet or if it is
''' one of a continuous stream of incoming packets.
''' </summary>
''' <remarks></remarks>
Private responseSolicited As Boolean

''' <summary>
''' Handles the DataReceived event of the wrapped SerialPort.
''' </summary>
''' <param name="sender">The wrapped SerialPort that raised the event.
''' This parameter is ignored.</param>
''' <param name="e">The event args containing data for the event</param>
''' <remarks>This function will process all bytes according to the
''' <see cref="Receiver" /> and allow <see cref="SendCommand" /> to
''' continue or will call <see cref="ProcessIncoming" /> when a complete
''' packet is received.</remarks>
Private Sub ReceiveData(ByVal sender As Object, ByVal e As SerialDataReceivedEventArgs)
    If e.EventType <> SerialData.Chars Then Exit Sub
    Dim input() As Byte

    SyncLock _portLock
        If Not _port.IsOpen OrElse _port.BytesToRead = 0 Then Exit Sub
        input = New Byte(_port.BytesToRead - 1) {}
        _port.Read(input, 0, input.Length)
    End SyncLock

    'process the received data
    If input Is Nothing OrElse input.Length = 0 OrElse Me.Receiver Is Nothing Then Exit Sub

    Dim responseCompleted As Boolean

    For i As Integer = 0 To input.Length - 1
        responseCompleted = Me.Receiver.ProcessResponseByte(input(i))

        'process completed response
        If responseCompleted Then
            responseSolicited = False
            System.Threading.WaitHandle.SignalAndWait(ReceiveResponse, ProcessResponse)

            'the data is not a response to a command sent by the decoder
            If Not responseSolicited Then
                ProcessIncoming(Me.Receiver.GetResponseData())
            End If
        End If
    Next
End Sub

''' <summary>
''' Sends a data command through the serial port.
''' </summary>
''' <param name="data">The data to be sent out the port</param>
''' <returns>The data received from the port or null if the operation
''' was cancelled.</returns>
''' <remarks>This function relies on the Receiver 
''' <see cref="ISerialReceiver.GetResponseData">GetResponseData</see> and 
''' the overriden <see cref="CheckResponse" /> to determine what response 
''' was received and if it was the correct response for the command.
''' <seealso cref="CheckResponse" /></remarks>
''' <exception cref="TimeoutException">The operation timed out.  The packet
''' was sent <see cref="MaxTries" /> times and no correct response was received.</exception>
''' <exception cref="ObjectDisposedException">The SerialTransceiver was disposed before
''' calling this method.</exception>
Private Function SendCommand(ByVal data() As Byte, ByVal ignoreCancelled As Boolean) As Byte()
    CheckDisposed()
    If data Is Nothing Then Return Nothing

    'make a copy of the data to ensure that it does not change during sending
    Dim sendData(data.Length - 1) As Byte
    Array.Copy(data, sendData, data.Length)

    Dim sendTries As Integer = 0
    Dim responseReceived As Boolean
    Dim responseData() As Byte = Nothing
    ReceiveResponse.Reset()
    ProcessResponse.Reset()
    While sendTries < MaxTries AndAlso Not responseReceived AndAlso _
          (ignoreCancelled OrElse Not Me.IsCancelled)
        'send the command data
        sendTries += 1
        If Not Me.WriteData(sendData) Then Return Nothing

        If Me.Receiver IsNot Nothing Then
            'wait for Timeout milliseconds for a response.  If no response is received
            'then waitone will return false.  If a response is received, the AutoResetEvent
            'will be triggered by the SerialDataReceived function to return true.
            If ReceiveResponse.WaitOne(Timeout, False) Then
                Try
                    'get the data that was just received
                    responseData = Me.Receiver.GetResponseData()
                    'check to see if it is the correct response
                    responseReceived = CheckResponse(sendData, responseData)
                    If responseReceived Then responseSolicited = True
                Finally
                    'allow the serial receive function to continue checking bytes
                    'regardless of if this function throws an error
                    ProcessResponse.Set()
                End Try
            End If
        Else
            'when there is no Receiver, assume that there is no response to
            'data sent from the transceiver through this method.
            responseReceived = True
        End If
    End While

    If Not ignoreCancelled AndAlso Me.IsCancelled Then
        'operation was cancelled, return nothing
        Return Nothing
    ElseIf Not responseReceived AndAlso sendTries >= MaxTries Then
        'operation timed out, throw an exception
        Throw New TimeoutException(My.Resources.SerialMaxTriesReached)
    Else
        'operation completed successfully, return the data
        Return responseData
    End If
End Function
person Gideon Engelberth    schedule 09.09.2011
comment
Спасибо за ответ. В целом я неплохо разбираюсь в C#, но, честно говоря, у меня не так много опыта в таком асинхронном программировании. Вот почему я надеялся, что Rx может несколько упростить это. :-) Но я думаю, что понимаю, о чем вы говорите: протокол туда и обратно. У вас есть конкретные примеры кода, которыми вы могли бы поделиться, как этот шаблон реализован? Спасибо! - person Kevin Kuebler; 09.09.2011
comment
@Kevin Keubler: см. отредактированный пост. Это просто функции отправки/получения, так что вам придется добавить запуск операции aync поверх SendCommand. Я использовал System.ComponentModel.AsyncOperationManager для запуска метода в фоновом потоке с отчетами о ходе выполнения. - person Gideon Engelberth; 09.09.2011
comment
Спасибо за этот ответ. RX не нацелен на возврат туда и обратно, но давайте попробуем хотя бы поэкспериментировать. Давайте сделаем наблюдаемый поток (ну, их последовательность в случае TCP-клиента, которому, возможно, придется иногда переподключаться, stackoverflow.com/questions/18978523/), наблюдайте с помощью EventLoopScheduler для отдельного потока, мы читаем плохо сгруппированные группы символов, мы -группа с помощью Buffer(). В общем, нет нити/WaitHandle, и поэтому, возможно, мы сможем обрабатывать в методе Observer основной более высокий уровень протокола за меньшее количество строк. Есть предположения ? - person Stéphane Gourichon; 15.04.2015

Рабочий процесс отправки и получения можно легко организовать, объединив Rx и async/await. Я написал об этом в блоге по адресу http://kerry.lothrop.de/serial-rx/.

person kwl    schedule 17.04.2015