0


Python 使用Flask传输视频流

文章目录

前言

最近有一项任务,将视频中目标检测的结果传输给前端。这个任务很好实现,按照实际,对每一帧的视频进行检测然后返回图像流在前端进行展示。然而上头要求不返回视频流,要的是返回检测结果。听到这项任务就纳闷,理论上只需要返回数据流,但是又感觉哪里有点说不出奇怪,于是写下这篇文章梳理整个视频流返回。本篇博客主要参考:Video Streaming with Flask和Flask Video Streaming Revisited。代码请参考:flask-video-streaming。

Streaming

在Flask中用到流的主要有以下两个应用场景:

  • large response 在返回的数据块较大的时候,使用流的方式生成并返回,是一种较好的解决方案。当然你也可以将返回响应写在磁盘中,然后返回一个文件flask.send_file()。但是这种情况会增加额外的I/O开销。
  • 实时数据传输 实时数据传输,如视频或者语音传输,可以使用流传输。

Flask实现流

Flask通过使用

generator functions

为流响应提供支持,一个

generator function

如下所示:

defgen():yield1yield2yield3

通过上面简单理解了生成器,接下来下面的实例显示了如何使用流来处理生成大型数据报表并返回:

from flask import Response, render_template
from app.models import Stock

defgenerate_stock_table():yield render_template('stock_header.html')for stock in Stock.query.all():yield render_template('stock_row.html', stock=stock)yield render_template('stock_footer.html')@app.route('/stock-table')defstock_table():return Response(generate_stock_table())

在这个例子当中,返回流的响应路由需要返回一个由使用生成器函数初始化对象的

Response

,然后

Flask

负责调用生成器将结果分块发送给客户端。这样做的好处是程序当中需要生成一个很大的数据块,而通过流传输,响应返回请求不会随着你的块变大而变大。

流除了能够将将数据块大的进行分块之外,还能提供

Multipart Responses

。在这一方面最主要的应用场景是视频流或者音频流的返回播放。在这当中,流的一个有趣用途是让每个块替换页面的前一个块,这使得流能够在浏览器窗口中“播放”。

Multipart/Response

由一个包含多部分内容类型之一的标头组成,紧接着是边界标记的分割部分,每个部分都有自己的特定内容类型。以下是

Multipart

视频流的结构:

HTTP/1.1 200 OK
Content-Type: multipart/x-mixed-replace; boundary=frame

--frame
Content-Type: image/jpeg

<jpeg data here>
--frame
Content-Type: image/jpeg

<jpeg data here>
...

如上所述,头的

Content-Type

设置为

multipart/x-mixed-replace

以及定义bouondary。然后包括每一帧数据,以

--

为前缀,并在各自行中添加边界字符串以及

Content-type

的标头,每个部分都可以选择包含一个

Content-Length

,说明有效payload的字节长度。

在了解完上面基础知识后,接下来就构建实时视频流服务器。原理比较简单,或者视频中的每一帧然后以流的方式通过

Multipart/Response

返回给客户端。

构建实时视频流

一个简单的FlaskWeb程序,提供Motion JPEG流,注意Motion JPEG应用广泛。这种方法延迟低,但质量不是最好的,因为 JPEG 压缩对于运动视频不是很有效。
从摄像机中获取视频帧:

from time import time

classCamera(object):def__init__(self):
        self.frames =[open(f +'.jpg','rb').read()for f in['1','2','3']]defget_frame(self):return self.frames[int(time())%3]

上面一部分代码是实例,针对没有摄像头设备的进行调试,通过在读取工程下面的图像来构建图像流。

#!/usr/bin/env pythonfrom flask import Flask, render_template, Response
from camera import Camera

app = Flask(__name__)@app.route('/')defindex():return render_template('index.html')defgen(camera):whileTrue:
        frame = camera.get_frame()yield(b'--frame\r\n'b'Content-Type: image/jpeg\r\n\r\n'+ frame +b'\r\n')@app.route('/video_feed')defvideo_feed():return Response(gen(Camera()),
                    mimetype='multipart/x-mixed-replace; boundary=frame')if __name__ =='__main__':
    app.run(host='0.0.0.0', debug=True)

此应用程序定义了一个

Camera

负责提供帧序列的类。前端HTML内容:

<html><head><title>Video Streaming Demonstration</title></head><body><h1>Video Streaming Demonstration</h1><imgsrc="{{ url_for('video_feed') }}"></body></html>
video_feed

路由中调用

gen

生成器函数,该函数调用```Camera``类来获取视频流。整个流程都是比较简单。然而使用流也有一些限制,当Flask应用程序处理常规请求的时候,请求周期很短。Web Worker接受请求,调用处理函数并最终返回响应给客户端。当客户端接收到的是流,那么需要在流传输期间,客户端要保持连接。另一方面当客户端断开连接的时候,服务端可能也在一直给客户端提供服务,难以关闭流的传输,同时该服务只能提供给相同Web Worker数的客户端。有一些方法能够克服以上的问题,那就是使用协程或者多线程。接下来看看如何对上面的程序进行优化。

视频流优化

上面的视频流程序主要存在两个问题一是如何结束传输数据流,二是如何单个服务给多个客户端提供服务。
首先对于第一个问题,原理是记录最后一次响应的时间戳,如果最后一次响应时间戳与当前时间戳相差大于阈值(可以设定为十秒,但是不能过小,否则会导致无法正常请求)。下面是优化的代码:

  1. 定义Camera基类:
classBaseCamera(object):
    thread =None# background thread that reads frames from camera
    frame =None# current frame is stored here by background thread
    last_access =0# time of last client access to the camera# ...@staticmethoddefframes():"""Generator that returns frames from the camera."""raise RuntimeError('Must be implemented by subclasses.')@classmethoddef_thread(cls):"""Camera background thread."""print('Starting camera thread.')
        frames_iterator = cls.frames()for frame in frames_iterator:
            BaseCamera.frame = frame

            # if there hasn't been any clients asking for frames in# the last 10 seconds then stop the threadif time.time()- BaseCamera.last_access >10:
                frames_iterator.close()print('Stopping camera thread due to inactivity.')break
        BaseCamera.thread =None
  1. 继承BaseCameraCamera类:
classCamera(BaseCamera):"""An emulated camera implementation that streams a repeated sequence of
    files 1.jpg, 2.jpg and 3.jpg at a rate of one frame per second."""
    imgs =[open(f +'.jpg','rb').read()for f in['1','2','3']]@staticmethoddefframes():whileTrue:
            time.sleep(1)yield Camera.imgs[int(time.time())%3]

接着对于第二个问题,针对多客户端请求方式性能提升问题,可以使用多线程的方式进行处理,另一方面在测试中发现服务器消耗了大量的 CPU。原因是后台线程捕获帧和将这些帧提供给客户端的生成器之间没有同步。两者都尽可能快地运行,而不考虑对方的速度。
所以需要有一种机制,生成器只向客户端传递原始帧,如果生成器内部的传递循环比相机线程的帧速率快,那么生成器应该等到有新的帧可用,以便它自己调整速度以匹配相机速率。另一方面,如果传递循环的运行速度比相机线程慢,那么它在处理帧时永远不会落后,而是应该跳过帧以始终传递最新的帧。解决方案是让相机线程在新帧可用时向正在运行的生成器发出信号。然后,生成器可以在发送下一帧之前等待信号时阻塞。
为了避免在生成器中添加事件处理逻辑,实现一个自定义事件类,它使用调用者的线程 id 为每个客户端线程自动创建和管理单独的事件。

classCameraEvent(object):"""An Event-like class that signals all active clients when a new frame is available.
    """def__init__(self):
        self.events ={}defwait(self):"""Invoked from each client's thread to wait for the next frame."""
        ident = get_ident()if ident notin self.events:# this is a new client# add an entry for it in the self.events dict# each entry has two elements, a threading.Event() and a timestamp
            self.events[ident]=[threading.Event(), time.time()]return self.events[ident][0].wait()defset(self):"""Invoked by the camera thread when a new frame is available."""
        now = time.time()
        remove =Nonefor ident, event in self.events.items():ifnot event[0].isSet():# if this client's event is not set, then set it# also update the last set timestamp to now
                event[0].set()
                event[1]= now
            else:# if the client's event is already set, it means the client# did not process a previous frame# if the event stays set for more than 5 seconds, then assume# the client is gone and remove itif now - event[1]>5:
                    remove = ident
        if remove:del self.events[remove]defclear(self):"""Invoked from each client's thread after a frame was processed."""
        self.events[get_ident()][0].clear()
classBaseCamera(object):# ...
    event = CameraEvent()# ...defget_frame(self):"""Return the current camera frame."""
        BaseCamera.last_access = time.time()# wait for a signal from the camera thread
        BaseCamera.event.wait()
        BaseCamera.event.clear()return BaseCamera.frame

    @classmethoddef_thread(cls):# ...for frame in frames_iterator:
            BaseCamera.frame = frame
            BaseCamera.event.set()# send signal to clients# ...

整体代码:

base_camera.py
import time
import threading
try:from greenlet import getcurrent as get_ident
except ImportError:try:from thread import get_ident
    except ImportError:from _thread import get_ident

classCameraEvent(object):"""An Event-like class that signals all active clients when a new frame is
    available.
    """def__init__(self):
        self.events ={}defwait(self):"""Invoked from each client's thread to wait for the next frame."""
        ident = get_ident()if ident notin self.events:# this is a new client# add an entry for it in the self.events dict# each entry has two elements, a threading.Event() and a timestamp
            self.events[ident]=[threading.Event(), time.time()]return self.events[ident][0].wait()defset(self):"""Invoked by the camera thread when a new frame is available."""
        now = time.time()
        remove =Nonefor ident, event in self.events.items():ifnot event[0].isSet():# if this client's event is not set, then set it# also update the last set timestamp to now
                event[0].set()
                event[1]= now
            else:# if the client's event is already set, it means the client# did not process a previous frame# if the event stays set for more than 5 seconds, then assume# the client is gone and remove itif now - event[1]>5:
                    remove = ident
        if remove:del self.events[remove]defclear(self):"""Invoked from each client's thread after a frame was processed."""
        self.events[get_ident()][0].clear()classBaseCamera(object):
    thread =None# background thread that reads frames from camera
    frame =None# current frame is stored here by background thread
    last_access =0# time of last client access to the camera
    event = CameraEvent()def__init__(self):"""Start the background camera thread if it isn't running yet."""if BaseCamera.thread isNone:
            BaseCamera.last_access = time.time()# start background frame thread
            BaseCamera.thread = threading.Thread(target=self._thread)
            BaseCamera.thread.start()# wait until first frame is available
            BaseCamera.event.wait()defget_frame(self):"""Return the current camera frame."""
        BaseCamera.last_access = time.time()# wait for a signal from the camera thread
        BaseCamera.event.wait()
        BaseCamera.event.clear()return BaseCamera.frame

    @staticmethoddefframes():""""Generator that returns frames from the camera."""raise RuntimeError('Must be implemented by subclasses.')@classmethoddef_thread(cls):"""Camera background thread."""print('Starting camera thread.')
        frames_iterator = cls.frames()for frame in frames_iterator:
            BaseCamera.frame = frame
            BaseCamera.event.set()# send signal to clients
            time.sleep(0)# if there hasn't been any clients asking for frames in# the last 10 seconds then stop the threadif time.time()- BaseCamera.last_access >10:
                frames_iterator.close()print('Stopping camera thread due to inactivity.')break
        BaseCamera.thread =None
camera.py
import os
import cv2
from base_camera import BaseCamera

classCamera(BaseCamera):
    video_source =0def__init__(self):if os.environ.get('OPENCV_CAMERA_SOURCE'):
            Camera.set_video_source(int(os.environ['OPENCV_CAMERA_SOURCE']))super(Camera, self).__init__()@staticmethoddefset_video_source(source):
        Camera.video_source = source

    @staticmethoddefframes():
        camera = cv2.VideoCapture(Camera.video_source)ifnot camera.isOpened():raise RuntimeError('Could not start camera.')whileTrue:# read current frame
            _, img = camera.read()# encode as a jpeg image and return ityield cv2.imencode('.jpg', img)[1].tobytes()
app.py
#!/usr/bin/env pythonfrom importlib import import_module
import os
from flask import Flask, render_template, Response

# import camera driverif os.environ.get('CAMERA'):
    Camera = import_module('camera_'+ os.environ['CAMERA']).Camera
else:from camera import Camera

# Raspberry Pi camera module (requires picamera package)# from camera_pi import Camera

app = Flask(__name__)@app.route('/')defindex():"""Video streaming home page."""return render_template('index.html')defgen(camera):"""Video streaming generator function."""yieldb'--frame\r\n'whileTrue:
        frame = camera.get_frame()yieldb'Content-Type: image/jpeg\r\n\r\n'+ frame +b'\r\n--frame\r\n'@app.route('/video_feed')defvideo_feed():"""Video streaming route. Put this in the src attribute of an img tag."""return Response(gen(Camera()),
                    mimetype='multipart/x-mixed-replace; boundary=frame')if __name__ =='__main__':
    app.run(host='0.0.0.0', threaded=True)

具体代码可以参考我的github:Flask-video-Stream

标签: python flask 后端

本文转载自: https://blog.csdn.net/u012655441/article/details/124798348
版权归原作者 RyanC3 所有, 如有侵权,请联系我们删除。

“Python 使用Flask传输视频流”的评论:

还没有评论