использование последовательного порта в python3 asyncio

я пытаюсь и до сих пор не могу использовать python asyncio для доступа к последовательному порту.

Я был бы очень признателен за любые советы по использованию новой асинхронной структуры Python на простом fd.

Ваше здоровье!

Джеймс


person time4tea    schedule 09.02.2014    source источник
comment
Я думаю, вам нужно будет создать свой собственный транспорт и протокол для чтения/записи на tty. Взгляните на исходный код и попробуйте адаптировать сокеты/транспорт подпроцесса/протокол для вашего варианта использования.   -  person gawel    schedule 10.02.2014
comment
да, это то, на что это похоже, но также похоже, что весь цикл событий должен быть перенастроен, так как он начинает создавать пару сокетов, что здесь не имеет значения. немного озадачен тем, что кажется, что он полностью не поддерживается (я имею в виду, что чтение/запись на tty — это самый простой вариант использования асинхронности, верно?)   -  person time4tea    schedule 10.02.2014
comment
Ага. не должно быть так трудно сделать. и ты ошибаешься. сокет не требуется для использования цикла событий. вы можете по крайней мере использовать подпроцесс вместо этого   -  person gawel    schedule 11.02.2014
comment
вы можете попробовать loop.connect_write_pipe()/loop.connect_read_pipe() подключить fd, как показано в примере async stdio   -  person jfs    schedule 05.05.2014


Ответы (8)


Другой способ использования FD

import asyncio
import serial

s = serial.Serial('/dev/pts/13', 9600)


def test_serial():
    '''
    read a line and print.
    '''
    text = ""
    msg = s.read().decode()
    while (msg != '\n'):
        text += msg
        msg = s.read().decode()
    print(text)
    loop.call_soon(s.write, "ok\n".encode())

loop = asyncio.get_event_loop()
loop.add_reader(s, test_serial)
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    loop.close()
person RodixPy    schedule 13.01.2015
comment
Мой терминал /dev/pts/... закрывается при первом нажатии клавиши, и я всегда получаю - SerialException('read failed: {}'.format(e)) serial.serialutil.SerialException: read failed: device reports readiness to read but returned no data (device disconnected or multiple access on port?) . Почему? - person jaromrax; 25.03.2021

pySerial получает прямую asyncio поддержку. Сейчас он находится в экспериментальном состоянии, но работает так, как я ожидал.

Пример взят из документации:

class Output(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        print('port opened', transport)
        transport.serial.rts = False
        transport.write(b'hello world\n')

    def data_received(self, data):
        print('data received', repr(data))
        self.transport.close()

    def connection_lost(self, exc):
        print('port closed')
        asyncio.get_event_loop().stop()

loop = asyncio.get_event_loop()
coro = serial.aio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
person Günther Jena    schedule 06.09.2016
comment
Очевидно, эта поддержка была удалена из pyserial и вместо этого поддерживается отдельным пакетом pyserial-asyncio. См. pySerial API — документацию по pySerial 3.3 и Краткое введение — документация pySerial-asyncio 0.4. - person Craig McQueen; 29.05.2017

Другой вариант — написать весь свой последовательный материал с блокирующими вызовами, а затем запустить его в другом потоке с помощью run_in_executor:

import asyncio
import concurrent

from serial import Serial

# Normal serial blocking reads
# This could also do any processing required on the data
def get_byte():
    return s.read(1)

# Runs blocking function in executor, yielding the result
@asyncio.coroutine
def get_byte_async():
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        res = yield from loop.run_in_executor(executor, get_byte)
        return res

def get_and_print():
    b = yield from get_byte_async()
    print (b)

s = Serial("COM11", 19200, timeout=10)
loop = asyncio.get_event_loop()
loop.run_until_complete(get_and_print())
person emorris    schedule 13.01.2015

Вот рабочий пример с использованием pyserial-asyncio:

from asyncio import get_event_loop
from serial_asyncio import open_serial_connection

async def run():
    reader, writer = await open_serial_connection(url='/dev/ttyS0', baudrate=115200)
    while True:
        line = await reader.readline()
        print(str(line, 'utf-8'))

loop = get_event_loop()
loop.run_until_complete(run())
person pdenes    schedule 29.01.2020

Всем спасибо за предложения, в конце концов я решил проблему немного другим способом и использовал хорошо поддерживаемые соединения сокетов в asyncio, но затем использовал ser2net (http://sourceforge.net/projects/ser2net/) для доступа к последовательным портам.

На настройку ушло около 10 секунд, и это означает, что код Python теперь также может обрабатывать доступ к удаленным последовательным портам.

person time4tea    schedule 26.05.2014
comment
Интересное решение. Хотя это, возможно, не является жизнеспособным решением, если вам нужно делать такие вещи, как изменение скорости передачи данных, обработка ошибок кадрирования, обработка аппаратных линий квитирования последовательного порта (DTR, DSR, DCD и т. д.). - person Craig McQueen; 26.05.2017

Некоторое время назад я написал класс AsyncFile, интерфейс проще, чем низкоуровневые протоколы.

Исходный код находится здесь: https://github.com/l04m33/pyx/blob/dbaf121ab7bb9bbf04616a7285bcaba757682d03/pyx/io.py#L20

class AsyncFile:
    """A local file class for use with the ``asyncio`` module.
    ``loop`` should be the event loop in use.
    ``filename`` is the name of the file to be opened.
    ``fileobj`` should be a regular file-like object.
    ``mode`` is the open mode accepted by built-in function ``open``.
    If ``filename`` is specified, the named file will be opened. And if
    ``fileobj`` is specified, that file object will be used directly. You
    cannot specify both ``filename`` and ``fileobj``.
    This class can be used in a ``with`` statement.
    """

    DEFAULT_BLOCK_SIZE = 8192

    def __init__(self, loop=None, filename=None,
                 fileobj=None, mode='rb'):
        if (filename is None and fileobj is None) or \
                (filename is not None and fileobj is not None):
            raise RuntimeError('Confilicting arguments')

        if filename is not None:
            if 'b' not in mode:
                raise RuntimeError('Only binary mode is supported')
            fileobj = open(filename, mode=mode)
        elif 'b' not in fileobj.mode:
            raise RuntimeError('Only binary mode is supported')

        fl = fcntl.fcntl(fileobj, fcntl.F_GETFL)
        if fcntl.fcntl(fileobj, fcntl.F_SETFL, fl | os.O_NONBLOCK) != 0:
            if filename is not None:
                fileobj.close()
            errcode = ctypes.get_errno()
            raise OSError((errcode, errno.errorcode[errcode]))

        self._fileobj = fileobj

        if loop is None:
            loop = asyncio.get_event_loop()
        self._loop = loop
        self._rbuffer = bytearray()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def fileno(self):
        return self._fileobj.fileno()

    def seek(self, offset, whence=None):
        if whence is None:
            return self._fileobj.seek(offset)
        else:
            return self._fileobj.seek(offset, whence)

    def tell(self):
        return self._fileobj.tell()

    def _read_ready(self, future, n, total):
        if future.cancelled():
            self._loop.remove_reader(self._fileobj.fileno())
            return

        try:
            res = self._fileobj.read(n)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as exc:
            self._loop.remove_reader(self._fileobj.fileno())
            future.set_exception(exc)
            return

        if not res:     # EOF
            self._loop.remove_reader(self._fileobj.fileno())
            future.set_result(bytes(self._rbuffer))
            return

        self._rbuffer.extend(res)

        if total > 0:
            more_to_go = total - len(self._rbuffer)
            if more_to_go <= 0:  # enough
                res, self._rbuffer = self._rbuffer[:n], self._rbuffer[n:]
                self._loop.remove_reader(self._fileobj.fileno())
                future.set_result(bytes(res))
            else:
                more_to_go = min(self.DEFAULT_BLOCK_SIZE, more_to_go)
                self._loop.add_reader(self._fileobj.fileno(),
                                      self._read_ready,
                                      future, more_to_go, total)
        else:   # total < 0
            # This callback is still registered with total < 0,
            # nothing to do here
            pass

    @asyncio.coroutine
    def read(self, n=-1):
        future = asyncio.Future(loop=self._loop)

        if n == 0:
            future.set_result(b'')
        else:
            try:
                res = self._fileobj.read(n)
            except (BlockingIOError, InterruptedError):
                if n < 0:
                    self._rbuffer.clear()
                    self._loop.add_reader(self._fileobj.fileno(),
                                          self._read_ready,
                                          future, self.DEFAULT_BLOCK_SIZE, n)
                else:
                    self._rbuffer.clear()
                    read_block_size = min(self.DEFAULT_BLOCK_SIZE, n)
                    self._loop.add_reader(self._fileobj.fileno(),
                                          self._read_ready,
                                          future, read_block_size, n)
            except Exception as exc:
                future.set_exception(exc)
            else:
                future.set_result(res)

        return future

    def _write_ready(self, future, data, written):
        if future.cancelled():
            self._loop.remove_writer(self._fileobj.fileno())
            return

        try:
            res = self._fileobj.write(data)
        except (BlockingIOError, InterruptedError):
            return
        except Exception as exc:
            self._loop.remove_writer(self._fileobj.fileno())
            future.set_exception(exc)
            return

        if res < len(data):
            data = data[res:]
            self._loop.add_writer(self._fileobj.fileno(),
                                  self._write_ready,
                                  future, data, written + res)
        else:
            self._loop.remove_writer(self._fileobj.fileno())
            future.set_result(written + res)

    @asyncio.coroutine
    def write(self, data):
        future = asyncio.Future(loop=self._loop)

        if len(data) == 0:
            future.set_result(0)
        else:
            try:
                res = self._fileobj.write(data)
            except (BlockingIOError, InterruptedError):
                self._loop.add_writer(self._fileobj.fileno(),
                                      self._write_ready,
                                      future, data, 0)
            except Exception as exc:
                future.set_exception(exc)
            else:
                future.set_result(res)

        return future

    def stat(self):
        return os.stat(self._fileobj.fileno(), follow_symlinks=True)

    def close(self):
        self._loop.remove_reader(self._fileobj.fileno())
        self._loop.remove_writer(self._fileobj.fileno())
        self._fileobj.close()
person l04m33    schedule 20.05.2015
comment
Можете ли вы привести краткий пример кода, который использует этот класс для последовательного порта с asyncio? - person Craig McQueen; 26.05.2017

Вот моя попытка последовательного порта asyncio. Этот интерфейс позволяет обернуть экземпляр serial.Serial в класс AIOSerial, который затем позволяет вам выполнять await AIOSerial.readline() и await AIOSerial.write(data) и не использовать обратные вызовы в стиле asyncio.Protocol().

import asyncio
import sys

import serial


class AIOSerial:
    def __init__(self, serial, ioloop=None):
        self._serial = serial
        # Asynchronous I/O requires non-blocking devices
        self._serial.timeout = 0
        self._serial.write_timeout = 0

        if ioloop is not None:
            self.loop = ioloop
        else:
            self.loop = asyncio.get_event_loop()
        self.loop.add_reader(self._serial.fd, self._on_read)
        self._rbuf = b''
        self._rbytes = 0
        self._wbuf = b''
        self._rfuture = None
        self._delimiter = None

    def _on_read(self):
        data = self._serial.read(4096)
        self._rbuf += data
        self._rbytes = len(self._rbuf)
        self._check_pending_read()

    def _on_write(self):
        written = self._serial.write(self._wbuf)
        self._wbuf = self._wbuf[written:]
        if not self._wbuf:
            self.loop.remove_writer(self._serial.fd)

    def _check_pending_read(self):
        future = self._rfuture
        if future is not None:
            # get data from buffer
            pos = self._rbuf.find(self._delimiter)
            if pos > -1:
                ret = self._rbuf[:(pos+len(self._delimiter))]
                self._rbuf = self._rbuf[(pos+len(self._delimiter)):]
                self._delimiter = self._rfuture = None
                future.set_result(ret)
                return future

    async def read_until(self, delimiter=b'\n'):
        while self._delimiter:
            await self._rfuture

        self._delimiter = delimiter
        self._rfuture = asyncio.Future()
        #future = self._check_pending_read()
        return await self._rfuture

    async def readline(self):
        return await self.read_until()

    async def write(self, data):
        need_add_writer = not self._wbuf

        self._wbuf = self._wbuf + data
        if need_add_writer:
            self.loop.add_writer(self._serial.fd, self._on_write)
        return len(data)

Пример использования:

async def go_serial():
    ser = serial.Serial(sys.argv[1], 9600) #, rtscts=True, dsrdtr=True)
    print(ser)
    aser = AIOSerial(ser)

    written = await aser.write(b'test 1\n')
    print('written', written)
    data = await aser.readline()
    print('got from readline', data)

    while True:
        await aser.write(b'.\n')
        data = await aser.readline()
        print('GOT!', data)
        await asyncio.sleep(2.78)

async def main():
    for n in range(120):
        await asyncio.sleep(1)
        print('n=%d' % n)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    asyncio.ensure_future(go_serial())
    loop.run_until_complete(main())

Это устанавливает последовательный порт и две асинхронные задачи: go_serial и main. Main просто работает в течение 120 секунд, а затем цикл завершается. go_serial записывает и читает последовательный порт, ожидая ответа на каждую отправленную строку.

Затем чтение и запись в последовательный порт выполняются с помощью await aser.write(b'blah') и await aser.readline() (или await aser.read_until(b'\r\n'), если вам нужен другой разделитель).

Обратите внимание, что на самом деле он не готов к работе, так как хотелось бы иметь некоторые ограничения на объем буфера.

Чтобы проверить это, я моделирую последовательный порт с помощью следующего сценария, который выводит имя созданного pty, которое затем является параметром для верхнего примера.

#!/usr/bin/python3
import fcntl
import time
import os
import errno
import pty


chars = []
ser, s = pty.openpty()
oldflags = fcntl.fcntl(ser, fcntl.F_GETFL)
# make the PTY non-blocking
fcntl.fcntl(ser, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)

print('Created: %s' % os.ttyname(s))


while True:
    time.sleep(0.1)
    c = None
    try:
        c = os.read(ser, 10)
    except OSError as err:
        if err.errno == errno.EAGAIN or err.errno == errno.EWOULDBLOCK:
            c = None
        else:
            raise
    if c:
        chars.append(c)

    data = b''.join(chars)
    if b'\n' in data:
        one, data = data.split(b'\n', 1)
        b = b'%.6f\n' % time.time()
        os.write(ser, b)
        print(one)
    chars = [data]
person hruske    schedule 09.06.2018

Попробуйте использовать aioserial.

Вот пример:

import aioserial
import asyncio


async def read_and_print(aioserial_instance: aioserial.AioSerial):
    while True:
        print((await aioserial_instance.read_async()).decode(errors='ignore'), end='', flush=True)


aioserial_com1: aioserial.AioSerial = aioserial.AioSerial(port='COM1')

asyncio.run(read_and_print(aioserial_com1))

Модератору,

Ответ похож на этот один, но не дублируется.

person changyuheng    schedule 25.09.2018