несколько одновременных серверов asyncio

Я пытаюсь использовать asyncio Python для одновременного запуска нескольких серверов, передавая данные между ними. Для моего конкретного случая мне нужен веб-сервер с веб-сокетами, UDP-подключение к внешнему устройству, а также база данных и другие взаимодействия. Я могу найти примеры практически любого из них по отдельности, но я изо всех сил пытаюсь найти правильный способ их одновременного запуска с передачей данных между ними.

Самое близкое, что я нашел здесь, находится здесь: Связь между асинхронным протоколом/серверами (хотя мне не удалось запустить его на Питоне 3.6)

Для более конкретного примера: как мне взять следующий пример кода aiohttp из https://github.com/aio-libs/aiohttp:

from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == web.MsgType.text:
            await ws.send_str("Hello, {}".format(msg.data))
        elif msg.type == web.MsgType.binary:
            await ws.send_bytes(msg.data)
        elif msg.type == web.MsgType.close:
            break

    return ws


app = web.Application()
app.router.add_get('/echo', wshandler)
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle)

web.run_app(app)

и следующий пример эхо-сервера TCP (http://asyncio.readthedocs.io/en/latest/tcp_echo.html):

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))

    print("Send: %r" % message)
    writer.write(data)
    await writer.drain()

    print("Close the client socket")
    writer.close()

loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

и объединить их в один сценарий, в котором любые сообщения, полученные через веб-сокеты или эхо-сервер TCP, отправлялись всем клиентам любого из них?

И как мне добавить кусок кода, который (скажем) каждую секунду отправлял сообщение всем клиентам (ради аргумента текущую временную метку)?


person Mark Rogers    schedule 14.03.2018    source источник
comment
Привет, мне любопытно, удалось ли вам решить эту проблему, используя код в моем ответе (или иным образом)?   -  person user4815162342    schedule 17.03.2018
comment
Я все еще работаю над этим; ваш ответ указал мне правильное направление, но я еще не совсем там!   -  person Mark Rogers    schedule 17.03.2018
comment
@user4815162342 user4815162342 Я опубликовал свой рабочий код в качестве ответа, хотя в настоящее время он не выходит чисто, что основано на вашем совете и может быть кому-то полезно. Однако помощь в том, чтобы он вышел чисто, будет оценена по достоинству!   -  person Mark Rogers    schedule 19.03.2018


Ответы (2)


Сначала вам нужно поместить все ваши сопрограммы в один цикл обработки событий. Вы можете начать с отказа от удобных API, которые запускают для вас цикл обработки событий, таких как run_app. Вместо web.run_app(app) напишите что-то вроде:

runner = aiohttp.web.AppRunner(app)
loop.run_until_complete(runner.setup())
# here you can specify the listen address and port
site = aiohttp.web.TCPSite(runner)    
loop.run_until_complete(site.start())

Затем запустите настройку эхо-сервера, и оба готовы совместно использовать цикл событий asyncio. В конце скрипта запустите цикл обработки событий с помощью loop.run_forever() (или любым другим способом, имеющим смысл в вашем приложении).

Чтобы транслировать информацию клиентам, создайте широковещательную сопрограмму и добавьте ее в цикл обработки событий:

# Broadcast data is transmitted through a global Future. It can be awaited
# by multiple clients, all of which will receive the broadcast. At each new
# iteration, a new future is created, to be picked up by new awaiters.
broadcast_data = loop.create_future()

async def broadcast():
    global broadcast_data
    while True:
        broadcast_data.set_result(datetime.datetime.now())
        broadcast_data = loop.create_future()
        await asyncio.sleep(1)

loop.create_task(broadcast())

Наконец, дождитесь трансляции в каждой сопрограмме, созданной для клиента, например handle_echo:

def handle_echo(r, w):
    while True:
        data = await broadcast_data
        # data contains the broadcast datetime - send it to the client
        w.write(str(data))

Должно быть несложно изменить сопрограмму обработчика веб-сокетов, чтобы она ожидала и ретранслировала широковещательные данные таким же образом.

person user4815162342    schedule 14.03.2018
comment
Я хотел просто указать на использование AppRunner, но ваш ответ потрясающий! Спасибо. - person Andrew Svetlov; 14.03.2018
comment
Я, вероятно, показываю, что у меня нет опыта работы с Python (программист более 30 лет, но Python только один из них), но я не могу найти AppRunner? (NameError: имя «AppRunner» не определено, я не могу понять, что мне нужно импортировать, чтобы получить его?) - person Mark Rogers; 15.03.2018
comment
@MarkRogers AppRunner является частью aiohttp, вы можете импортировать его из aiohttp.web_runner. Подробнее см. в документации. - person user4815162342; 15.03.2018
comment
@ user4815162342 Нашел, у меня была старая версия aiohttp, установленная из репозиториев дистрибутива, поэтому она не имела никакого смысла. Установка через pip исправила. - person Mark Rogers; 15.03.2018
comment
@user4815162342 user4815162342 Замена web.run_app() в первом примере на loop = asyncio.get_event_loop() ; runner = web.AppRunner(app) ; loop.run_until_complete(runner.setup()) ; loop.run_forever() не дает мне работающий веб-сервер (он просто зависает, пока я не уничтожу скрипт). Там явно есть что-то, чего мне не хватает. - person Mark Rogers; 15.03.2018
comment
@MarkRogers Извините, run_app на самом деле делает еще одну вещь: запускает TCPSite с адреса по умолчанию (0.0.0.0) и порта (8080 без SSL, 8443 с SSL). Теперь я изменил ответ, включив в него и эту часть. - person user4815162342; 15.03.2018
comment
(Обратите внимание: если кажется, что localhost работает, но зависает навсегда, то, скорее всего, отсутствует run_forever.) - person user202729; 01.01.2020

По совету @user4815162342 это мой «рабочий» код. Я публикую его как ответ, потому что это полный рабочий скрипт, который соответствует всем требованиям моего исходного вопроса, но он не идеален, поскольку в настоящее время он не завершается корректно.

При запуске он будет принимать веб-соединения через порт 8080 и tcp (например, telnet) через 8081. Любые сообщения, полученные через его веб-форму или telnet, будут транслироваться на все соединения. Кроме того, каждые 5 секунд будет транслироваться время.

Буду признателен за совет о том, как правильно выйти (ctrl+C при установленном веб-соединении генерирует несколько ошибок «Задача была уничтожена, но она находится в ожидании!»), поэтому я могу обновить это отвечать.

(Код довольно длинный, поскольку содержит встроенный HTML и JS для компонента веб-сокетов.)

import asyncio
from aiohttp import web
import aiohttp
import datetime
import re

queues = []

loop = asyncio.get_event_loop()

# Broadcast data is transmitted through a global Future. It can be awaited
# by multiple clients, all of which will receive the broadcast. At each new
# iteration, a new future is created, to be picked up by new awaiters.
broadcast_data = loop.create_future()

def broadcast(msg):
    global broadcast_data
    msg = str(msg)
    print(">> ", msg)
    if not broadcast_data.done():
        broadcast_data.set_result(msg)
    broadcast_data = loop.create_future()

# Dummy loop to broadcast the time every 5 seconds
async def broadcastLoop():
    while True:
        broadcast(datetime.datetime.now())
#       print('#',end='',flush=True)
        await asyncio.sleep(5)

# Handler for www requests
async def wwwhandler(r):
    host = re.search('https?://([^/]+)/', str(r.url)).group(1)
    name = r.match_info.get('name', "Anonymous")
    text = """<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket PHP Open Group Chat App</title>
        <!-- <link type="text/css" rel="stylesheet" href="style.css" /> -->
        <script>
var output;
var websocket;

function WebSocketSupport() {
    if (browserSupportsWebSockets() === false) {
        document.getElementById("ws_support").innerHTML = "<h2>Sorry! Your web browser does not supports web sockets</h2>";
        var element = document.getElementById("wrapper");
        element.parentNode.removeChild(element);
        return;
    }

    output = document.getElementById("chatbox");

    websocket = new WebSocket('ws:{{HOST}}/ws');
    websocket.onopen    = function(e) { writeToScreen("You have have successfully connected to the server"); };
    websocket.onmessage = function(e) { onMessage(e) };
    websocket.onerror   = function(e) { onError(e) };
}

function onMessage(e)   { writeToScreen('<span style="color: blue;"> ' + e.data + '</span>'); }
function onError(e)     { writeToScreen('<span style="color: red;">ERROR:</span> ' + e.data); }

function doSend(message) {
    var validationMsg = userInputSupplied();
    if (validationMsg !== '') {
        alert(validationMsg);
        return;
    }
    var chatname = document.getElementById('chatname').value;

//  document.getElementById('msg').value = "";
//  document.getElementById('msg').focus();
    var msg = chatname + ' says: ' + message;
    websocket.send(msg);
    writeToScreen(msg);
}

function writeToScreen(message) {
    var pre = document.createElement("p");
    pre.style.wordWrap = "break-word";
    pre.innerHTML = message;
    output.appendChild(pre);
}

function userInputSupplied() {
    var chatname = document.getElementById('chatname').value;
    var msg = document.getElementById('msg').value;
    if (chatname === '') { return 'Please enter your username'; } 
    if (msg === '') { return 'Please the message to send'; } 
    return '';
}

function browserSupportsWebSockets() {
    if ("WebSocket" in window) { return true; } else { return false; }
}
    </script>
    </head>
    <body onload="javascript:WebSocketSupport()">
        <div id="ws_support"></div>

        <div id="wrapper">
            <div id="menu">
                <h3 class="welcome">Welcome to WebSocket PHP Open Group Chat App v1</h3>
            </div>

            <div id="chatbox"></div>

            <div id ="controls">
                <label for="name"><b>Name</b></label>
                <input name="chatname" type="text" id="chatname" size="67" placeholder="Type your name here" value="MyName" />
                <input name="msg" type="text" id="msg" size="63" placeholder="Type your message here" value="Test" />
                <input name="sendmsg" type="submit"  id="sendmsg" value="Send" onclick="doSend(document.getElementById('msg').value)" />
            </div>
        </div>
    </body>
</html>"""
    text = text.replace('{{HOST}}', host)
    return web.Response(text=text, headers={'content-type':'text/html'})

# Handler for websocket connections
async def wshandler(r):
    # Get the websocket connection
    ws = web.WebSocketResponse()
    await ws.prepare(r)    
    # Append it to list so we can manage it later if needed
    r.app['websockets'].append(ws)

    try:
        # Create the broadcast task, and add it to list for later management
        echo_task = asyncio.Task(echo_loop(ws))
        r.app['tasks'].append(echo_task)

        # Tell the world we've connected
        # Note: Connecting client won't get this message, not really sure why
        broadcast('Hello {}'.format(r.remote))
#       await ws.send_str('Hello {}'.format(r.remote))

        # Loop through any messages we get from the client
        async for msg in ws:
            # .. and broadcast them
            if msg.type == web.WSMsgType.TEXT:
                print('<< ', msg.data)
                broadcast(msg.data)
                #            await ws.send_str("Hello, {}".format(msg.data))
                #        elif msg.type == web.WSMsgType.BINARY:
                #            await ws.send_bytes(msg.data)
            elif msg.type == web.WSMsgType.CLOSE:
                print('WS Connection closed')
                break
            elif msg.type == web.WSMsgType.ERROR:
                print('WS Connection closed with exception %s' % ws.exception())
                break
            else:
                print('WS Connection received unknown message type %2' % msg.type)

        # ws has stopped sending us data so broadcast goodbye 
        broadcast('Goodbye {}'.format(r.remote))
    except GeneratorExit:
        pass
    finally:
        # Close the ws and remove it from the list
        await ws.close()
        r.app['websockets'].remove(ws)

        # Cancel the task and remove it from the list
        # Note: cancel() only requests cancellation, it doesn't wait for it
        echo_task.cancel()
        r.app['tasks'].remove(echo_task)

    return ws

# ws broadcast loop: Each WS connection gets one of these which waits for broadcast data then sends it
async def echo_loop(ws):
    while True:
        msg = await broadcast_data
        await ws.send_str(str(msg))

# web app shutdown code: cancels any open tasks and closes any open websockets
# Only partially working   
async def on_shutdown(app):
    print('Shutting down:', end='')
    for t in app['tasks']:
        print('#', end='')
        if not t.cancelled():
            t.cancel()
    for ws in app['websockets']:
        print('.', end='')
        await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server Shutdown')
    print(' Done!')

# Code to handle TCP connections
async def echo_loop_tcp(writer):
    while True:
        msg = await broadcast_data
        writer.write( (msg + "\r\n").encode() )
        await writer.drain()

async def handle_echo(reader, writer):
    echo_task = asyncio.Task(echo_loop_tcp(writer))
    while True:
        data = await reader.readline()
        if not data:
            break
        message = data.decode().strip()
#       addr = writer.get_extra_info('peername')
        broadcast(message)

    print("Connection dropped")
    echo_task.cancel()

tcpServer = loop.run_until_complete(asyncio.start_server(handle_echo, '0.0.0.0', 8081, loop=loop))
print('Serving on {}'.format(tcpServer.sockets[0].getsockname()))

# The application code:
app = web.Application()
app['websockets'] = []
app['tasks'] = []
app.router.add_get('/ws', wshandler)
app.router.add_get('/', wwwhandler)
app.router.add_get('/{name}', wwwhandler)
app.on_shutdown.append(on_shutdown)

def main():
    # Kick off the 5s loop
    tLoop=loop.create_task(broadcastLoop())

    # Kick off the web/ws server
    async def start():
        global runner, site
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, '0.0.0.0', 8080)
        await site.start()

    async def end():
        await app.shutdown()

    loop.run_until_complete(start())

    # Main program "loop"
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # On exit, kill the 5s loop
        tLoop.cancel()
        # .. and kill the web/ws server
        loop.run_until_complete( end() )

    # Stop the main event loop
    loop.close()

if __name__ == '__main__':
    main()
person Mark Rogers    schedule 19.03.2018