Потоковое видео через Autobahn WebSocket

Я захватываю кадр видео из OpenCV VideoCapture.read () и отправляю кадр на сервер WebSocket (Twisted с Autobahn WebSocket API), я также использую интерфейс Twisted IPushProducer для потоковой передачи данных в WebSocket, и в конечном итоге он очищает камеру при отправке обратно клиенту.

Вот мой код.

server.py

import cv2
import cv2.cv as cv
import numpy as np
from autobahn.twisted.websocket import  WebSocketServerProtocol, \
                                    WebSocketServerFactory, \
                                    listenWS
from VideoStreamClient import BATH_SIZE

class VideoStreamServerProtocol(WebSocketServerProtocol):
    def onConnect(self,request):
        print("Client connecting: {0}".format(request.peer))

    def onOpen(self):
        print("WebSocket connection open.")

    def onClose(self, wasClean, code, reason):
        print("WebSocket connection closed: {0}".format(reason))

    def onMessageBegin(self, isBinary):
        WebSocketServerProtocol.onMessageBegin(self, isBinary)

    def onMessageFrameBegin(self, length):
        WebSocketServerProtocol.onMessageFrameBegin(self, length)
        self.received = 0
        self.next = BATCH_SIZE

    def onMessageFrameData(self, payload):
        self.received += len(payload)
        if self.received >= self.next:
            self.sendMessageFrameData(payload,isBinary=True)
            self.received = 0;

    def onMessageFrameEnd(self):
        pass

    def onMessageEnd(self):
        pass

class VideoStreamServerFactory(WebSocketServerFactory):
    protocol = VideoStreamServerProtocol

    def __init__(self):
        WebSocketServerFactory.__init__(self,"ws://localhost:9000", debug = False)

if __name__ == '__main__':
   import sys
   from twisted.python import log
   from twisted.internet import reactor

   log.startLogging(sys.stdout)

   factory = VideoStreamServerFactory()
   listenWS(factory)
   reactor.run()

client.py

from autobahn.twisted.websocket import WebSocketClientFactory, \
                                   WebSocketClientProtocol, \
                                   connectWS

from zope.interface import implementer
from twisted.internet import reactor, interfaces

import cv2
import numpy as np

FRAME_SIZE = 0x7FFFFFFFFFFFFFFF
BATCH_SIZE = 1 * 2**20

@implementer(interfaces.IPushProducer)
class VideoStreamProducer:
    def __init__(self,proto):
        self.proto = proto
        self.started = False
        self.paused = False

    def pauseProducing(self):
        self.paused = True

    def resumeProducing(self):
        self.paused = False
        if not self.started:
            self.cap = cv2.VideoCapture(0)
            self.cap.set(cv2.cv.CV_CAP_PROP_FRAME_WIDTH, 640)
            self.cap.set(cv2.cv.CV_CAP_PROP_FRAME_HEIGHT, 480)

            self.proto.beginMessage(isBinary=True)
            self.proto.beginMessageFrame(FRAME_SIZE)
            self.started = True
        while not self.paused:
            isSuccess, frame = self.cap.read()
            _, data = cv2.imencode(".jpg",frame,encode_param)
            if isSuccess:
                if self.proto.sendMessageFrameData(data)<=0:
                    self.proto.beginMessageFrame(FRAME_SIZE)

    def stopProducing(self):
        self.cap.release()

class VideoStreamClientProtocol(WebSocketClientProtocol):
    def onConnect(self,response):
        pass

    def onOpen(self):
        producer = VideoStreamProducer(self)
        self.registerProducer(producer, True)
        producer.resumeProducing()

    def onMessage(self, payload, isBinary):
        print(len(payload))

if __name__ == '__main__':
    factory = WebSocketClientFactory("ws://localhost:9000")
    factory.protocol = VideoStreamClientProtocol
    connectWS(factory)
    reactor.run()

person Greatxam Darthart    schedule 06.05.2014    source источник


Ответы (1)


Это бесконечный цикл, который предотвращает любую обработку события после его начала:

while not self.paused:
    isSuccess, frame = self.cap.read()
    _, data = cv2.imencode(".jpg",frame,encode_param)
    if isSuccess:
        if self.proto.sendMessageFrameData(data)<=0:
            self.proto.beginMessageFrame(FRAME_SIZE)

Помните, что Twisted использует однопоточную кооперативную многозадачность для параллелизма. Этот цикл связывает (единственный) поток реактора и не взаимодействует с другими задачами.

Если вы хотите запускать какой-либо код несколько раз, взгляните на twisted.internet.task.LoopingCall или twisted.internet.task.cooperate.

person Jean-Paul Calderone    schedule 06.05.2014
comment
Спасибо, я попробую. - person Greatxam Darthart; 07.05.2014
comment
Привет, я бы удалил условие WHILE, так как я использую IPushProducer. Но все равно теряет камеру. - person Greatxam Darthart; 07.05.2014
comment
Не знаю, что означает потеря камеры. - person Jean-Paul Calderone; 07.05.2014
comment
Камера перестает подавать кадр и убирается. Может быть, это из-за блокировки потока пользовательского интерфейса. Я попробую LoopingCall и вернусь, если он работает. - person Greatxam Darthart; 08.05.2014
comment
Спасибо за совет. Я использовал threads.deferToThread, но все же потоковая передача не в реальном времени. Я бы поместил свой источник в github.com/dakilasoft/twistedcv, а также включил бы живую демонстрацию. - person Greatxam Darthart; 28.05.2014