文章目录
前言
最近有一项任务,将视频中目标检测的结果传输给前端。这个任务很好实现,按照实际,对每一帧的视频进行检测然后返回图像流在前端进行展示。然而上头要求不返回视频流,要的是返回检测结果。听到这项任务就纳闷,理论上只需要返回数据流,但是又感觉哪里有点说不出奇怪,于是写下这篇文章梳理整个视频流返回。本篇博客主要参考:Video Streaming with Flask和Flask Video Streaming Revisited。代码请参考:flask-video-streaming。
Streaming
在Flask中用到流的主要有以下两个应用场景:
- large response 在返回的数据块较大的时候,使用流的方式生成并返回,是一种较好的解决方案。当然你也可以将返回响应写在磁盘中,然后返回一个文件
flask.send_file()
。但是这种情况会增加额外的I/O开销。 - 实时数据传输 实时数据传输,如视频或者语音传输,可以使用流传输。
Flask实现流
Flask通过使用
generator functions
为流响应提供支持,一个
generator function
如下所示:
defgen():yield1yield2yield3
通过上面简单理解了生成器,接下来下面的实例显示了如何使用流来处理生成大型数据报表并返回:
from flask import Response, render_template
from app.models import Stock
defgenerate_stock_table():yield render_template('stock_header.html')for stock in Stock.query.all():yield render_template('stock_row.html', stock=stock)yield render_template('stock_footer.html')@app.route('/stock-table')defstock_table():return Response(generate_stock_table())
在这个例子当中,返回流的响应路由需要返回一个由使用生成器函数初始化对象的
Response
,然后
Flask
负责调用生成器将结果分块发送给客户端。这样做的好处是程序当中需要生成一个很大的数据块,而通过流传输,响应返回请求不会随着你的块变大而变大。
流除了能够将将数据块大的进行分块之外,还能提供
Multipart Responses
。在这一方面最主要的应用场景是视频流或者音频流的返回播放。在这当中,流的一个有趣用途是让每个块替换页面的前一个块,这使得流能够在浏览器窗口中“播放”。
Multipart/Response
由一个包含多部分内容类型之一的标头组成,紧接着是边界标记的分割部分,每个部分都有自己的特定内容类型。以下是
Multipart
视频流的结构:
HTTP/1.1 200 OK
Content-Type: multipart/x-mixed-replace; boundary=frame
--frame
Content-Type: image/jpeg
<jpeg data here>
--frame
Content-Type: image/jpeg
<jpeg data here>
...
如上所述,头的
Content-Type
设置为
multipart/x-mixed-replace
以及定义bouondary。然后包括每一帧数据,以
--
为前缀,并在各自行中添加边界字符串以及
Content-type
的标头,每个部分都可以选择包含一个
Content-Length
,说明有效payload的字节长度。
在了解完上面基础知识后,接下来就构建实时视频流服务器。原理比较简单,或者视频中的每一帧然后以流的方式通过
Multipart/Response
返回给客户端。
构建实时视频流
一个简单的FlaskWeb程序,提供Motion JPEG流,注意Motion JPEG应用广泛。这种方法延迟低,但质量不是最好的,因为 JPEG 压缩对于运动视频不是很有效。
从摄像机中获取视频帧:
from time import time
classCamera(object):def__init__(self):
self.frames =[open(f +'.jpg','rb').read()for f in['1','2','3']]defget_frame(self):return self.frames[int(time())%3]
上面一部分代码是实例,针对没有摄像头设备的进行调试,通过在读取工程下面的图像来构建图像流。
#!/usr/bin/env pythonfrom flask import Flask, render_template, Response
from camera import Camera
app = Flask(__name__)@app.route('/')defindex():return render_template('index.html')defgen(camera):whileTrue:
frame = camera.get_frame()yield(b'--frame\r\n'b'Content-Type: image/jpeg\r\n\r\n'+ frame +b'\r\n')@app.route('/video_feed')defvideo_feed():return Response(gen(Camera()),
mimetype='multipart/x-mixed-replace; boundary=frame')if __name__ =='__main__':
app.run(host='0.0.0.0', debug=True)
此应用程序定义了一个
Camera
负责提供帧序列的类。前端HTML内容:
<html><head><title>Video Streaming Demonstration</title></head><body><h1>Video Streaming Demonstration</h1><imgsrc="{{ url_for('video_feed') }}"></body></html>
video_feed
路由中调用
gen
生成器函数,该函数调用```Camera``类来获取视频流。整个流程都是比较简单。然而使用流也有一些限制,当Flask应用程序处理常规请求的时候,请求周期很短。Web Worker接受请求,调用处理函数并最终返回响应给客户端。当客户端接收到的是流,那么需要在流传输期间,客户端要保持连接。另一方面当客户端断开连接的时候,服务端可能也在一直给客户端提供服务,难以关闭流的传输,同时该服务只能提供给相同Web Worker数的客户端。有一些方法能够克服以上的问题,那就是使用协程或者多线程。接下来看看如何对上面的程序进行优化。
视频流优化
上面的视频流程序主要存在两个问题一是如何结束传输数据流,二是如何单个服务给多个客户端提供服务。
首先对于第一个问题,原理是记录最后一次响应的时间戳,如果最后一次响应时间戳与当前时间戳相差大于阈值(可以设定为十秒,但是不能过小,否则会导致无法正常请求)。下面是优化的代码:
- 定义
Camera
基类:
classBaseCamera(object):
thread =None# background thread that reads frames from camera
frame =None# current frame is stored here by background thread
last_access =0# time of last client access to the camera# ...@staticmethoddefframes():"""Generator that returns frames from the camera."""raise RuntimeError('Must be implemented by subclasses.')@classmethoddef_thread(cls):"""Camera background thread."""print('Starting camera thread.')
frames_iterator = cls.frames()for frame in frames_iterator:
BaseCamera.frame = frame
# if there hasn't been any clients asking for frames in# the last 10 seconds then stop the threadif time.time()- BaseCamera.last_access >10:
frames_iterator.close()print('Stopping camera thread due to inactivity.')break
BaseCamera.thread =None
- 继承
BaseCamera
的Camera
类:
classCamera(BaseCamera):"""An emulated camera implementation that streams a repeated sequence of
files 1.jpg, 2.jpg and 3.jpg at a rate of one frame per second."""
imgs =[open(f +'.jpg','rb').read()for f in['1','2','3']]@staticmethoddefframes():whileTrue:
time.sleep(1)yield Camera.imgs[int(time.time())%3]
接着对于第二个问题,针对多客户端请求方式性能提升问题,可以使用多线程的方式进行处理,另一方面在测试中发现服务器消耗了大量的 CPU。原因是后台线程捕获帧和将这些帧提供给客户端的生成器之间没有同步。两者都尽可能快地运行,而不考虑对方的速度。
所以需要有一种机制,生成器只向客户端传递原始帧,如果生成器内部的传递循环比相机线程的帧速率快,那么生成器应该等到有新的帧可用,以便它自己调整速度以匹配相机速率。另一方面,如果传递循环的运行速度比相机线程慢,那么它在处理帧时永远不会落后,而是应该跳过帧以始终传递最新的帧。解决方案是让相机线程在新帧可用时向正在运行的生成器发出信号。然后,生成器可以在发送下一帧之前等待信号时阻塞。
为了避免在生成器中添加事件处理逻辑,实现一个自定义事件类,它使用调用者的线程 id 为每个客户端线程自动创建和管理单独的事件。
classCameraEvent(object):"""An Event-like class that signals all active clients when a new frame is available.
"""def__init__(self):
self.events ={}defwait(self):"""Invoked from each client's thread to wait for the next frame."""
ident = get_ident()if ident notin self.events:# this is a new client# add an entry for it in the self.events dict# each entry has two elements, a threading.Event() and a timestamp
self.events[ident]=[threading.Event(), time.time()]return self.events[ident][0].wait()defset(self):"""Invoked by the camera thread when a new frame is available."""
now = time.time()
remove =Nonefor ident, event in self.events.items():ifnot event[0].isSet():# if this client's event is not set, then set it# also update the last set timestamp to now
event[0].set()
event[1]= now
else:# if the client's event is already set, it means the client# did not process a previous frame# if the event stays set for more than 5 seconds, then assume# the client is gone and remove itif now - event[1]>5:
remove = ident
if remove:del self.events[remove]defclear(self):"""Invoked from each client's thread after a frame was processed."""
self.events[get_ident()][0].clear()
classBaseCamera(object):# ...
event = CameraEvent()# ...defget_frame(self):"""Return the current camera frame."""
BaseCamera.last_access = time.time()# wait for a signal from the camera thread
BaseCamera.event.wait()
BaseCamera.event.clear()return BaseCamera.frame
@classmethoddef_thread(cls):# ...for frame in frames_iterator:
BaseCamera.frame = frame
BaseCamera.event.set()# send signal to clients# ...
整体代码:
base_camera.py
import time
import threading
try:from greenlet import getcurrent as get_ident
except ImportError:try:from thread import get_ident
except ImportError:from _thread import get_ident
classCameraEvent(object):"""An Event-like class that signals all active clients when a new frame is
available.
"""def__init__(self):
self.events ={}defwait(self):"""Invoked from each client's thread to wait for the next frame."""
ident = get_ident()if ident notin self.events:# this is a new client# add an entry for it in the self.events dict# each entry has two elements, a threading.Event() and a timestamp
self.events[ident]=[threading.Event(), time.time()]return self.events[ident][0].wait()defset(self):"""Invoked by the camera thread when a new frame is available."""
now = time.time()
remove =Nonefor ident, event in self.events.items():ifnot event[0].isSet():# if this client's event is not set, then set it# also update the last set timestamp to now
event[0].set()
event[1]= now
else:# if the client's event is already set, it means the client# did not process a previous frame# if the event stays set for more than 5 seconds, then assume# the client is gone and remove itif now - event[1]>5:
remove = ident
if remove:del self.events[remove]defclear(self):"""Invoked from each client's thread after a frame was processed."""
self.events[get_ident()][0].clear()classBaseCamera(object):
thread =None# background thread that reads frames from camera
frame =None# current frame is stored here by background thread
last_access =0# time of last client access to the camera
event = CameraEvent()def__init__(self):"""Start the background camera thread if it isn't running yet."""if BaseCamera.thread isNone:
BaseCamera.last_access = time.time()# start background frame thread
BaseCamera.thread = threading.Thread(target=self._thread)
BaseCamera.thread.start()# wait until first frame is available
BaseCamera.event.wait()defget_frame(self):"""Return the current camera frame."""
BaseCamera.last_access = time.time()# wait for a signal from the camera thread
BaseCamera.event.wait()
BaseCamera.event.clear()return BaseCamera.frame
@staticmethoddefframes():""""Generator that returns frames from the camera."""raise RuntimeError('Must be implemented by subclasses.')@classmethoddef_thread(cls):"""Camera background thread."""print('Starting camera thread.')
frames_iterator = cls.frames()for frame in frames_iterator:
BaseCamera.frame = frame
BaseCamera.event.set()# send signal to clients
time.sleep(0)# if there hasn't been any clients asking for frames in# the last 10 seconds then stop the threadif time.time()- BaseCamera.last_access >10:
frames_iterator.close()print('Stopping camera thread due to inactivity.')break
BaseCamera.thread =None
camera.py
import os
import cv2
from base_camera import BaseCamera
classCamera(BaseCamera):
video_source =0def__init__(self):if os.environ.get('OPENCV_CAMERA_SOURCE'):
Camera.set_video_source(int(os.environ['OPENCV_CAMERA_SOURCE']))super(Camera, self).__init__()@staticmethoddefset_video_source(source):
Camera.video_source = source
@staticmethoddefframes():
camera = cv2.VideoCapture(Camera.video_source)ifnot camera.isOpened():raise RuntimeError('Could not start camera.')whileTrue:# read current frame
_, img = camera.read()# encode as a jpeg image and return ityield cv2.imencode('.jpg', img)[1].tobytes()
app.py
#!/usr/bin/env pythonfrom importlib import import_module
import os
from flask import Flask, render_template, Response
# import camera driverif os.environ.get('CAMERA'):
Camera = import_module('camera_'+ os.environ['CAMERA']).Camera
else:from camera import Camera
# Raspberry Pi camera module (requires picamera package)# from camera_pi import Camera
app = Flask(__name__)@app.route('/')defindex():"""Video streaming home page."""return render_template('index.html')defgen(camera):"""Video streaming generator function."""yieldb'--frame\r\n'whileTrue:
frame = camera.get_frame()yieldb'Content-Type: image/jpeg\r\n\r\n'+ frame +b'\r\n--frame\r\n'@app.route('/video_feed')defvideo_feed():"""Video streaming route. Put this in the src attribute of an img tag."""return Response(gen(Camera()),
mimetype='multipart/x-mixed-replace; boundary=frame')if __name__ =='__main__':
app.run(host='0.0.0.0', threaded=True)
具体代码可以参考我的github:Flask-video-Stream
版权归原作者 RyanC3 所有, 如有侵权,请联系我们删除。