я пытаюсь и до сих пор не могу использовать python asyncio для доступа к последовательному порту.
Я был бы очень признателен за любые советы по использованию новой асинхронной структуры Python на простом fd.
Ваше здоровье!
Джеймс
я пытаюсь и до сих пор не могу использовать python asyncio для доступа к последовательному порту.
Я был бы очень признателен за любые советы по использованию новой асинхронной структуры Python на простом fd.
Ваше здоровье!
Джеймс
Другой способ использования FD
import asyncio
import serial
s = serial.Serial('/dev/pts/13', 9600)
def test_serial():
'''
read a line and print.
'''
text = ""
msg = s.read().decode()
while (msg != '\n'):
text += msg
msg = s.read().decode()
print(text)
loop.call_soon(s.write, "ok\n".encode())
loop = asyncio.get_event_loop()
loop.add_reader(s, test_serial)
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.close()
SerialException('read failed: {}'.format(e)) serial.serialutil.SerialException: read failed: device reports readiness to read but returned no data (device disconnected or multiple access on port?)
. Почему?
- person jaromrax; 25.03.2021
pySerial получает прямую asyncio
поддержку. Сейчас он находится в экспериментальном состоянии, но работает так, как я ожидал.
Пример взят из документации:
class Output(asyncio.Protocol):
def connection_made(self, transport):
self.transport = transport
print('port opened', transport)
transport.serial.rts = False
transport.write(b'hello world\n')
def data_received(self, data):
print('data received', repr(data))
self.transport.close()
def connection_lost(self, exc):
print('port closed')
asyncio.get_event_loop().stop()
loop = asyncio.get_event_loop()
coro = serial.aio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=115200)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
pyserial
и вместо этого поддерживается отдельным пакетом pyserial-asyncio
. См. pySerial API — документацию по pySerial 3.3 и Краткое введение — документация pySerial-asyncio 0.4.
- person Craig McQueen; 29.05.2017
Другой вариант — написать весь свой последовательный материал с блокирующими вызовами, а затем запустить его в другом потоке с помощью run_in_executor:
import asyncio
import concurrent
from serial import Serial
# Normal serial blocking reads
# This could also do any processing required on the data
def get_byte():
return s.read(1)
# Runs blocking function in executor, yielding the result
@asyncio.coroutine
def get_byte_async():
with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
res = yield from loop.run_in_executor(executor, get_byte)
return res
def get_and_print():
b = yield from get_byte_async()
print (b)
s = Serial("COM11", 19200, timeout=10)
loop = asyncio.get_event_loop()
loop.run_until_complete(get_and_print())
Вот рабочий пример с использованием pyserial-asyncio:
from asyncio import get_event_loop
from serial_asyncio import open_serial_connection
async def run():
reader, writer = await open_serial_connection(url='/dev/ttyS0', baudrate=115200)
while True:
line = await reader.readline()
print(str(line, 'utf-8'))
loop = get_event_loop()
loop.run_until_complete(run())
Всем спасибо за предложения, в конце концов я решил проблему немного другим способом и использовал хорошо поддерживаемые соединения сокетов в asyncio, но затем использовал ser2net (http://sourceforge.net/projects/ser2net/) для доступа к последовательным портам.
На настройку ушло около 10 секунд, и это означает, что код Python теперь также может обрабатывать доступ к удаленным последовательным портам.
Некоторое время назад я написал класс AsyncFile
, интерфейс проще, чем низкоуровневые протоколы.
Исходный код находится здесь: https://github.com/l04m33/pyx/blob/dbaf121ab7bb9bbf04616a7285bcaba757682d03/pyx/io.py#L20
class AsyncFile:
"""A local file class for use with the ``asyncio`` module.
``loop`` should be the event loop in use.
``filename`` is the name of the file to be opened.
``fileobj`` should be a regular file-like object.
``mode`` is the open mode accepted by built-in function ``open``.
If ``filename`` is specified, the named file will be opened. And if
``fileobj`` is specified, that file object will be used directly. You
cannot specify both ``filename`` and ``fileobj``.
This class can be used in a ``with`` statement.
"""
DEFAULT_BLOCK_SIZE = 8192
def __init__(self, loop=None, filename=None,
fileobj=None, mode='rb'):
if (filename is None and fileobj is None) or \
(filename is not None and fileobj is not None):
raise RuntimeError('Confilicting arguments')
if filename is not None:
if 'b' not in mode:
raise RuntimeError('Only binary mode is supported')
fileobj = open(filename, mode=mode)
elif 'b' not in fileobj.mode:
raise RuntimeError('Only binary mode is supported')
fl = fcntl.fcntl(fileobj, fcntl.F_GETFL)
if fcntl.fcntl(fileobj, fcntl.F_SETFL, fl | os.O_NONBLOCK) != 0:
if filename is not None:
fileobj.close()
errcode = ctypes.get_errno()
raise OSError((errcode, errno.errorcode[errcode]))
self._fileobj = fileobj
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._rbuffer = bytearray()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def fileno(self):
return self._fileobj.fileno()
def seek(self, offset, whence=None):
if whence is None:
return self._fileobj.seek(offset)
else:
return self._fileobj.seek(offset, whence)
def tell(self):
return self._fileobj.tell()
def _read_ready(self, future, n, total):
if future.cancelled():
self._loop.remove_reader(self._fileobj.fileno())
return
try:
res = self._fileobj.read(n)
except (BlockingIOError, InterruptedError):
return
except Exception as exc:
self._loop.remove_reader(self._fileobj.fileno())
future.set_exception(exc)
return
if not res: # EOF
self._loop.remove_reader(self._fileobj.fileno())
future.set_result(bytes(self._rbuffer))
return
self._rbuffer.extend(res)
if total > 0:
more_to_go = total - len(self._rbuffer)
if more_to_go <= 0: # enough
res, self._rbuffer = self._rbuffer[:n], self._rbuffer[n:]
self._loop.remove_reader(self._fileobj.fileno())
future.set_result(bytes(res))
else:
more_to_go = min(self.DEFAULT_BLOCK_SIZE, more_to_go)
self._loop.add_reader(self._fileobj.fileno(),
self._read_ready,
future, more_to_go, total)
else: # total < 0
# This callback is still registered with total < 0,
# nothing to do here
pass
@asyncio.coroutine
def read(self, n=-1):
future = asyncio.Future(loop=self._loop)
if n == 0:
future.set_result(b'')
else:
try:
res = self._fileobj.read(n)
except (BlockingIOError, InterruptedError):
if n < 0:
self._rbuffer.clear()
self._loop.add_reader(self._fileobj.fileno(),
self._read_ready,
future, self.DEFAULT_BLOCK_SIZE, n)
else:
self._rbuffer.clear()
read_block_size = min(self.DEFAULT_BLOCK_SIZE, n)
self._loop.add_reader(self._fileobj.fileno(),
self._read_ready,
future, read_block_size, n)
except Exception as exc:
future.set_exception(exc)
else:
future.set_result(res)
return future
def _write_ready(self, future, data, written):
if future.cancelled():
self._loop.remove_writer(self._fileobj.fileno())
return
try:
res = self._fileobj.write(data)
except (BlockingIOError, InterruptedError):
return
except Exception as exc:
self._loop.remove_writer(self._fileobj.fileno())
future.set_exception(exc)
return
if res < len(data):
data = data[res:]
self._loop.add_writer(self._fileobj.fileno(),
self._write_ready,
future, data, written + res)
else:
self._loop.remove_writer(self._fileobj.fileno())
future.set_result(written + res)
@asyncio.coroutine
def write(self, data):
future = asyncio.Future(loop=self._loop)
if len(data) == 0:
future.set_result(0)
else:
try:
res = self._fileobj.write(data)
except (BlockingIOError, InterruptedError):
self._loop.add_writer(self._fileobj.fileno(),
self._write_ready,
future, data, 0)
except Exception as exc:
future.set_exception(exc)
else:
future.set_result(res)
return future
def stat(self):
return os.stat(self._fileobj.fileno(), follow_symlinks=True)
def close(self):
self._loop.remove_reader(self._fileobj.fileno())
self._loop.remove_writer(self._fileobj.fileno())
self._fileobj.close()
asyncio
?
- person Craig McQueen; 26.05.2017
Вот моя попытка последовательного порта asyncio. Этот интерфейс позволяет обернуть экземпляр serial.Serial в класс AIOSerial, который затем позволяет вам выполнять await AIOSerial.readline()
и await AIOSerial.write(data)
и не использовать обратные вызовы в стиле asyncio.Protocol().
import asyncio
import sys
import serial
class AIOSerial:
def __init__(self, serial, ioloop=None):
self._serial = serial
# Asynchronous I/O requires non-blocking devices
self._serial.timeout = 0
self._serial.write_timeout = 0
if ioloop is not None:
self.loop = ioloop
else:
self.loop = asyncio.get_event_loop()
self.loop.add_reader(self._serial.fd, self._on_read)
self._rbuf = b''
self._rbytes = 0
self._wbuf = b''
self._rfuture = None
self._delimiter = None
def _on_read(self):
data = self._serial.read(4096)
self._rbuf += data
self._rbytes = len(self._rbuf)
self._check_pending_read()
def _on_write(self):
written = self._serial.write(self._wbuf)
self._wbuf = self._wbuf[written:]
if not self._wbuf:
self.loop.remove_writer(self._serial.fd)
def _check_pending_read(self):
future = self._rfuture
if future is not None:
# get data from buffer
pos = self._rbuf.find(self._delimiter)
if pos > -1:
ret = self._rbuf[:(pos+len(self._delimiter))]
self._rbuf = self._rbuf[(pos+len(self._delimiter)):]
self._delimiter = self._rfuture = None
future.set_result(ret)
return future
async def read_until(self, delimiter=b'\n'):
while self._delimiter:
await self._rfuture
self._delimiter = delimiter
self._rfuture = asyncio.Future()
#future = self._check_pending_read()
return await self._rfuture
async def readline(self):
return await self.read_until()
async def write(self, data):
need_add_writer = not self._wbuf
self._wbuf = self._wbuf + data
if need_add_writer:
self.loop.add_writer(self._serial.fd, self._on_write)
return len(data)
Пример использования:
async def go_serial():
ser = serial.Serial(sys.argv[1], 9600) #, rtscts=True, dsrdtr=True)
print(ser)
aser = AIOSerial(ser)
written = await aser.write(b'test 1\n')
print('written', written)
data = await aser.readline()
print('got from readline', data)
while True:
await aser.write(b'.\n')
data = await aser.readline()
print('GOT!', data)
await asyncio.sleep(2.78)
async def main():
for n in range(120):
await asyncio.sleep(1)
print('n=%d' % n)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
asyncio.ensure_future(go_serial())
loop.run_until_complete(main())
Это устанавливает последовательный порт и две асинхронные задачи: go_serial и main. Main просто работает в течение 120 секунд, а затем цикл завершается. go_serial записывает и читает последовательный порт, ожидая ответа на каждую отправленную строку.
Затем чтение и запись в последовательный порт выполняются с помощью await aser.write(b'blah')
и await aser.readline()
(или await aser.read_until(b'\r\n')
, если вам нужен другой разделитель).
Обратите внимание, что на самом деле он не готов к работе, так как хотелось бы иметь некоторые ограничения на объем буфера.
Чтобы проверить это, я моделирую последовательный порт с помощью следующего сценария, который выводит имя созданного pty, которое затем является параметром для верхнего примера.
#!/usr/bin/python3
import fcntl
import time
import os
import errno
import pty
chars = []
ser, s = pty.openpty()
oldflags = fcntl.fcntl(ser, fcntl.F_GETFL)
# make the PTY non-blocking
fcntl.fcntl(ser, fcntl.F_SETFL, oldflags | os.O_NONBLOCK)
print('Created: %s' % os.ttyname(s))
while True:
time.sleep(0.1)
c = None
try:
c = os.read(ser, 10)
except OSError as err:
if err.errno == errno.EAGAIN or err.errno == errno.EWOULDBLOCK:
c = None
else:
raise
if c:
chars.append(c)
data = b''.join(chars)
if b'\n' in data:
one, data = data.split(b'\n', 1)
b = b'%.6f\n' % time.time()
os.write(ser, b)
print(one)
chars = [data]
Попробуйте использовать aioserial.
Вот пример:
import aioserial
import asyncio
async def read_and_print(aioserial_instance: aioserial.AioSerial):
while True:
print((await aioserial_instance.read_async()).decode(errors='ignore'), end='', flush=True)
aioserial_com1: aioserial.AioSerial = aioserial.AioSerial(port='COM1')
asyncio.run(read_and_print(aioserial_com1))
Модератору,
Ответ похож на этот один, но не дублируется.
loop.connect_write_pipe()
/loop.connect_read_pipe()
подключитьfd
, как показано в примере async stdio - person jfs   schedule 05.05.2014