SpringBoot整合ffmpeg实现动态拉流转推
在最近的开发中,遇到一个 rtsp 协议的视频流,前端vue并不能直接播放,因此需要对流进行处理。在网上查阅后,ffmpeg和webrtc是最多的解决方案,但是使用webrtc的时候没成功,所以选择ffmpeg。下面介绍一下整体的实现步骤。
一、搭建 ffmepg
- 安装升级必要的编译工具和库
sudo yum install-y epel-release
sudo yum install-y\
autoconf automake bzip2 cmake freetype-devel gcc gcc-c++ git libtool make\
mercurial nasm pkgconfig zlib-devel
- 安装 yasm 和 nasm
sudo yum install-y yasm nasm
- 安装第三方更新源
sudo yum localinstall --nogpgcheck https://download1.rpmfusion.org/free/el/rpmfusion-free-release-7.noarch.rpm
- 安装 ffmpeg
yum install ffmpeg ffmpeg-devel -y
- 查看版本
ffmpeg -version
版本比较低,但是在网上的yum安装方式,版本都差不多。也可以通过官网的源码包,安装最新的版本。
- 测试 ffmepg 功能
如果没有可以测试的流地址,可以参考这个网站RTSP 测试地址,不确保每个都可以用,可以用vlc播放器测试一下流能不能用。
找到可以使用的流后,通过ffmpeg指令,测试转码功能。参考下面的指令。
ffmpeg -rtsp_transport tcp -analyzeduration50000000-probesize50000000-i"rtsp://stream.strba.sk:1935/strba/VYHLAD_JAZERO.stream"-c:v h264 -c:a aac -strict-2 /root/1.mp4
如果ffmpeg正常运行,那么这个指令会将流,转换成MP4类型的文件,保存在 /root 目录下。关于其他参数的作用,可以上网搜索。包括可以查看支持哪些视频编码格式以及音频编码格式。
二、创建 Spring Boot 测试项目
考虑到要动态控制拉流的流地址,所以需要用SpringBoot来控制Linux指令,也就是上文最后的测试指令。也是在网上搜索后,找到一个最简单的方案,代码如下:
@RestController@RequestMapping("/demo")publicclassDemoController{@PostMapping("/rtsp")publicStringrtsp(@RequestBodyMap<String,String> requestParams){String url = requestParams.get("url");String fileName = requestParams.get("fileName");String ffmpegCmd =String.format("ffmpeg -rtsp_transport tcp -analyzeduration 50000000 -probesize 50000000 -i \"%s\" -c:v h264 -c:a aac -strict -2 /root/%s", url, fileName);System.out.println(ffmpegCmd);try{Process process =Runtime.getRuntime().exec(newString[]{"bash","-c", ffmpegCmd });}catch(IOException e){thrownewRuntimeException(e);}return fileName;}}
编写了一个接口,入参中填入url和fileName,没做校验,一开始测试也可以直接在代码中全部写死,主要是测试 Process 类能不能正常操作 Linux。打包部署测试后,是可以成功控制的。接口方式就可以满足你的业务的话,在这基础上修改一下就可以使用了。
三、定时任务控制拉流
现在最简单的demo就已经完成了,但是这样的实现方式需要手动控制,而视频流其实是固定的几个,用接口方式会很麻烦,所以我们可以创建定时任务,从数据库中读取流和其他数据,实现自动拉流。
- 封装拉流方法
@Slf4j@ServicepublicclassFfmpegService{privatestaticfinalMap<FfmpegBO,Process>PROCESS_MAP=newConcurrentHashMap<>();publicvoidconvertStream(FfmpegBO bo){String url = bo.getStreamUrl();String fileDirName = bo.getFileDir();/**
* /opt/ffmpeg/hls/ + 文件名
*/String baseDirPath ="/opt/ffmpeg/hls/ "+ fileDirName;String fileCreateCmd =String.format("mkdir -p %s", baseDirPath);try{Runtime.getRuntime().exec(newString[]{"sh","-c", fileCreateCmd });}catch(IOException e){thrownewRuntimeException(e.getMessage());}String fileName = bo.getFilename();String ffmpegCmd =String.format("ffmpeg -rtsp_transport tcp -analyzeduration 50000000 -probesize 50000000 -i \"%s\" -c:v h264 -c:a aac -strict -2 /root/%s", url, fileName);try{Process process =Runtime.getRuntime().exec(newString[]{"sh","-c", ffmpegCmd });// 按规则生成转换后的流地址
bo.setConvertStreamUrl("xxxxxxxxxxxxxxxxxxxxxxxxx");PROCESS_MAP.put(bo, process);}catch(IOException e){thrownewRuntimeException(e.getMessage());}
log.info("[FfmpegServiceImpl.pushStream] pushStreamBO: {}", bo);}}
代码中创建了一个
PROCESS_MAP
用来保存执行的代码,用于在后面停止进程。在流转换方法中,传入自己需要的参数,按需求执行转换指令,然后保存到
PROCESS_MAP
。
- 创建定时任务
@Slf4j@ConfigurationpublicclassPushAndPullStreamTaskimplementsInitializingBean{@ResourceprivateFfmpegService ffmpegService;privatefinalScheduledThreadPoolExecutor scheduledPool =newScheduledThreadPoolExecutor(2);publicstaticThreadPoolExecutor commonAsyncPool =newThreadPoolExecutor(4,8,3,TimeUnit.SECONDS,newArrayBlockingQueue<>(100),
r ->{Thread newThread =newThread(r);
newThread.setName(" commonAsyncPool - "+ThreadLocalRandom.current().nextInt(10000));return newThread;});@OverridepublicvoidafterPropertiesSet()throwsException{// 开始流转,30秒后执行第一次,然后每隔五分钟执行一次
scheduledPool.scheduleAtFixedRate(newconvertStreamTask(),30,5*60,TimeUnit.SECONDS);}/**
* 转换流任务
*/class convertStreamTask implementsRunnable{@Overridepublicvoidrun(){List<Equipment> equipmentList =newArrayList<>();/* 填充list */
equipmentList.stream().forEach(equipment ->{
commonAsyncPool.execute(()->{try{FfmpegBO ffmpegBO =newFfmpegBO();
ffmpegBO.setStreamUrl(equipment.getRemark());
ffmpegBO.setFileDir(equipment.getDeviceSerial());
ffmpegBO.setFilename(equipment.getDeviceSerial()+"_"+ equipment.getChannelId());
ffmpegService.convertStream(ffmpegBO);}catch(Exception e){// 处理异常
log.error("Error processing equipment: {}, ", equipment.getPkId(), e);}});});}}}
简单构造一个定时任务,大概每五分钟执行一次(间隔短方便测试)。方法中构造了入参需要的ffmpegBO,开启一个线程池,并发执行转换方法。
PS:定时任务需要在启动类添加注解
- 停止进程的定时任务
publicvoidstopProcess(){
log.info("[FfmpegServiceImpl.stopProcess] 停止进程, {}",PROCESS_MAP);PROCESS_MAP.forEach((bo, process)->{if(!process.isAlive()){return;}
process.destroy();PROCESS_MAP.remove(bo);// 删除文件String baseDirPath ="/opt/ffmpeg/hls/"+ bo.getFileDir();String fileCreateCmd =String.format("rm -rf %s", baseDirPath);try{Runtime.getRuntime().exec(newString[]{"sh","-c", fileCreateCmd });}catch(IOException e){thrownewRuntimeException(e.getMessage());}
log.info("stopProcess: {}", bo);});}
在
FfmpegService
中新增停止任务方式,删除保存的文件,并且停止之前的转换流进程。在
PushAndPullStreamTask
中添加停止任务的定时任务。比转换任务提前20秒执行。
@Slf4j@ConfigurationpublicclassPushAndPullStreamTaskimplementsInitializingBean{@ResourceprivateFfmpegService ffmpegService;privatefinalScheduledThreadPoolExecutor scheduledPool =newScheduledThreadPoolExecutor(2);publicstaticThreadPoolExecutor commonAsyncPool =newThreadPoolExecutor(4,8,3,TimeUnit.SECONDS,newArrayBlockingQueue<>(100),
r ->{Thread newThread =newThread(r);
newThread.setName(" commonAsyncPool - "+ThreadLocalRandom.current().nextInt(10000));return newThread;});@OverridepublicvoidafterPropertiesSet()throwsException{// 开始流转,30秒后执行第一次,然后每隔五分钟执行一次
scheduledPool.scheduleAtFixedRate(newconvertStreamTask(),30,5*60,TimeUnit.SECONDS);// 停止流转,10秒后执行第一次,然后每隔五分钟执行一次
scheduledPool.scheduleAtFixedRate(newdestroyStreamTask(),10,5*60,TimeUnit.SECONDS);}/**
* 转换流任务
*/class convertStreamTask implementsRunnable{@Overridepublicvoidrun(){List<Equipment> equipmentList =newArrayList<>();
equipmentList.add(Equipment.builder().pkId(1).deviceSerial("1002654").channelId(1).remark("rtsp://180.101.128.47:9090/dss/monitor/param?cameraid=1002654%40021%241&substream=2").build());
equipmentList.stream().forEach(equipment ->{
commonAsyncPool.execute(()->{try{FfmpegBO ffmpegBO =newFfmpegBO();
ffmpegBO.setStreamUrl(equipment.getRemark());
ffmpegBO.setFileDir(equipment.getDeviceSerial());
ffmpegBO.setFilename(equipment.getDeviceSerial()+"_"+ equipment.getChannelId());
ffmpegService.pushStream(ffmpegBO);}catch(Exception e){// 处理异常
log.error("Error processing equipment: {}, ", equipment.getPkId(), e);}});});}}class destroyStreamTask implementsRunnable{@Overridepublicvoidrun(){
ffmpegService.stopProcess();}}}
打包部署运行后,观察服务器上是否有文件自动生成,以及自动删除。
四、容器化部署解决方案
现在的部署方式,一般都是容器化部署。但是ffmpeg安装在宿主机中,这意味着需要在容器中操作宿主机执行指令。最简单的方案就是使用
ssh
指令,执行 ssh root@xxx.xxx.xxx.xxx “指令”。
测试方案是否可行
- 运行容器
docker run -it alpine
- 安装 ssh 指令
# 镜像sed-i's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
# 下载安装
apk update && apk add --no-cache openssh-client
- ssh 远程控制宿主机
ssh root@ip "mkdir -p /opt/ffmpeg"
执行命令后,可以通过输入密码或者密钥的方案实现执行命令,最后宿主机成功创建了文件夹,测试结果证明这样的方案是可行的。
但是java代码没有办法输入密码,所以只能通过密钥的免密登录方式来执行命令。
免密登录测试
- 宿主机创建 rsa 密钥
ssh-keygen -t rsa
执行指令后,会在 /root/.ssh/文件夹下生成两个密钥,后缀 pub 的是公钥,另一个就是私钥。免密登录需要将公钥复制到被登录的目标服务器,在现在需求中,需要在容器中远程登录宿主机,所以宿主机就是目标服务器,那么换个思路,将这里生成的私钥,放在容器中,就可以从容器中远程登录宿主机。
- 宿主机添加公钥
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
- 编写一个 Dockerfile,构建自定义镜像
vim Dockerfile
# Dockerfile 内容
FROM alpine
COPY ./.ssh/id_rsa /root/.ssh/id_rsa
RUN sed-i's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
RUN apk update && apk add --no-cache openssh-client \&&chmod600 /root/.ssh/id_rsa \&& ssh-keyscan -H 【宿主机ip】 >> /root/.ssh/known_hosts
# 构建镜像docker build -t[镜像名].
- 运行容器,测试免密登录
准备工作都完成后,就修改最初的ffmpeg任务代码,通过ssh的方式调用宿主机执行命令
修改ffmpeg指令
将一些固定配置,抽离到配置文件中,封装config类,灵活控制。参考如下代码。
@Data@ComponentpublicclassFfmpegConfig{@Value(value ="${ffmpeg.baseDirPath}")privateString baseDirPath;@Value(value ="${ffmpeg.ipAddr}")privateString ipAddr;@Value(value ="${ffmpeg.baseUrl}")privateString baseUrl;@Value(value ="${ffmpeg.fileSuffix}")privateString fileSuffix;}
结合配置,修改service代码,参考代码。
@Slf4j@ServicepublicclassFfmpegService{privatestaticfinalMap<FfmpegBO,Process>PROCESS_MAP=newConcurrentHashMap<>();@ResourceprivateFfmpegConfig ffmpegConfig;publicvoidpushStream(FfmpegBO bo){String url = bo.getStreamUrl();String fileDirName = bo.getFileDir();/**
* /opt/ffmpeg/hls/ + 文件名
*/String baseDirPath = ffmpegConfig.getBaseDirPath()+ fileDirName;String fileCreateCmd =String.format("mkdir -p %s", baseDirPath);String sshCmd =String.format("ssh root@%s \"%s\"", ffmpegConfig.getIpAddr(), fileCreateCmd);
log.info("[FfmpegServiceImpl.pushStream] ssh :{}, 执行创建目录指令:{} ", sshCmd ,fileCreateCmd);try{Runtime.getRuntime().exec(newString[]{"sh","-c", sshCmd });}catch(IOException e){thrownewRuntimeException(e.getMessage());}String fileName = bo.getFilename();String outputM3u8 = baseDirPath +"/"+ fileName + ffmpegConfig.getFileSuffix();String ffmpegCmd =String.format(" ffmpeg -rtsp_transport tcp -analyzeduration 50000000 -probesize 50000000 -i \"%s\" -c:v h264 -c:a aac -strict -2 -f hls -hls_time 10 -hls_list_size 0 -hls_segment_filename \"%s/%s_segment_%%03d.ts\" %s",
url, baseDirPath, fileName, outputM3u8);
sshCmd =String.format("ssh root@%s \"%s\"", ffmpegConfig.getIpAddr(), ffmpegCmd);
log.info("[FfmpegServiceImpl.pushStream] ssh :{}, 执行ffmpeg指令:{} ", sshCmd ,ffmpegCmd);try{Process process =Runtime.getRuntime().exec(newString[]{"sh","-c", sshCmd });
bo.setConvertStreamUrl(ffmpegConfig.getBaseUrl()+ fileDirName +"/"+ fileName + ffmpegConfig.getFileSuffix());PROCESS_MAP.put(bo, process);}catch(IOException e){thrownewRuntimeException(e.getMessage());}
log.info("[FfmpegServiceImpl.pushStream] pushStreamBO: {}", bo);}publicvoidstopProcess(){
log.info("[FfmpegServiceImpl.stopProcess] 停止进程, {}",PROCESS_MAP);PROCESS_MAP.forEach((bo, process)->{if(!process.isAlive()){return;}
process.destroy();PROCESS_MAP.remove(bo);// 删除文件String baseDirPath = ffmpegConfig.getBaseDirPath()+ bo.getFileDir();String fileCreateCmd =String.format("rm -rf %s", baseDirPath);String sshCmd =String.format("ssh root@%s \"%s\"", ffmpegConfig.getIpAddr(), fileCreateCmd);
log.info("[FfmpegServiceImpl.pushStream] ssh :{}, 执行删除目录指令:{} ", sshCmd ,fileCreateCmd);try{Runtime.getRuntime().exec(newString[]{"sh","-c", sshCmd });}catch(IOException e){thrownewRuntimeException(e.getMessage());}
log.info("stopProcess: {}", bo);});}}
本次业务最终代码
调整后的代码,抽离封装了一些方法,并且将指令执行后的内容打印出来,方便观察执行效果
@Slf4j@ServicepublicclassFfmpegServiceImplimplementsFfmpegService{privatestaticfinalMap<FfmpegBO,Process>PROCESS_MAP=newConcurrentHashMap<>();@ResourceprivateFfmpegConfig ffmpegConfig;@OverridepublicvoidpushStream(FfmpegBO bo){String url = bo.getStreamUrl();String baseUrl = ffmpegConfig.getBaseUrl();String ipAddr = ffmpegConfig.getIpAddr();String fileDirName = bo.getFileDir();String baseDirPath = ffmpegConfig.getBaseDirPath()+ fileDirName;String fileName = bo.getFilename();String outputM3u8 = baseDirPath +"/"+ fileName + ffmpegConfig.getFileSuffix();// 创建远程目录createRemoteDirectory(ipAddr, baseDirPath);// 执行 FFmpeg 推流命令Process process =executeFfmpegCommand(ipAddr, url, baseDirPath, fileName, outputM3u8);// 设置转换后的流地址
bo.setConvertStreamUrl(baseUrl + fileDirName +"/"+ fileName + ffmpegConfig.getFileSuffix());
log.info("[FfmpegServiceImpl.pushStream] pushStreamBO: {}", bo);// 将进程对象存入 PROCESS_MAPPROCESS_MAP.put(bo, process);}/**
* 停止所有推流进程,并删除远程目录
*/@OverridepublicvoidstopProcess(){
log.info("[FfmpegServiceImpl.stopProcess] 停止进程, {}",PROCESS_MAP);PROCESS_MAP.forEach((bo, process)->{if(!process.isAlive()){
log.warn("[FfmpegServiceImpl.stopProcess] 进程已停止, {}", bo);PROCESS_MAP.remove(bo);return;}// 终止进程
process.destroy();PROCESS_MAP.remove(bo);// 删除远程目录deleteRemoteDirectory(ffmpegConfig.getIpAddr(), ffmpegConfig.getBaseDirPath()+ bo.getFileDir());
log.info("stopProcess: {}", bo);});}/**
* 创建远程目录
* @param ipAddr 远程服务器 IP 地址
* @param baseDirPath 远程目录路径
*/privatevoidcreateRemoteDirectory(String ipAddr,String baseDirPath){String fileCreateCmd =String.format("mkdir -p %s", baseDirPath);String sshCmd =String.format("ssh root@%s \"%s\"", ipAddr, fileCreateCmd);
log.info("[FfmpegServiceImpl.createRemoteDirectory] ssh :{}, 执行创建目录指令:{} ", sshCmd, fileCreateCmd);executeCommand(sshCmd);}/**
* 删除远程目录
* @param ipAddr 远程服务器 IP 地址
* @param baseDirPath 远程目录路径
*/privatevoiddeleteRemoteDirectory(String ipAddr,String baseDirPath){String fileDeleteCmd =String.format("rm -rf %s", baseDirPath);String sshCmd =String.format("ssh root@%s \"%s\"", ipAddr, fileDeleteCmd);
log.info("[FfmpegServiceImpl.deleteRemoteDirectory] ssh :{}, 执行删除目录指令:{} ", sshCmd, fileDeleteCmd);executeCommand(sshCmd);}/**
* 执行 FFmpeg 推流命令
* @param ipAddr 远程服务器 IP 地址
* @param url 推流 URL
* @param baseDirPath 远程目录路径
* @param fileName 文件名
* @param outputM3u8 输出的 M3U8 文件路径
* @return 返回启动的进程对象
*/privateProcessexecuteFfmpegCommand(String ipAddr,String url,String baseDirPath,String fileName,String outputM3u8){String ffmpegCmd =String.format("ffmpeg -rtsp_transport tcp -analyzeduration 50000000 -probesize 50000000 -i \"%s\" -c:v h264 -c:a aac -strict -2 -f hls -hls_time 10 -hls_list_size 0 -hls_segment_filename \"%s/%s_segment_%%03d.ts\" %s",
url, baseDirPath, fileName, outputM3u8);String sshCmd =String.format("ssh root@%s \"%s\"", ipAddr, ffmpegCmd);
log.info("[FfmpegServiceImpl.executeFfmpegCommand] ssh :{}, 执行ffmpeg指令:{} ", sshCmd, ffmpegCmd);Process process =executeCommand(sshCmd);// 启动线程处理标准输出和错误输出,防止进程阻塞handleProcessOutput(process);return process;}/**
* 执行 Shell 命令
* @param command 要执行的命令
* @return 进程对象
*/privateProcessexecuteCommand(String command){try{returnRuntime.getRuntime().exec(newString[]{"sh","-c", command});}catch(IOException e){
log.error("执行命令失败:{}", command, e);thrownewRuntimeException("执行命令失败:"+ e.getMessage());}}/**
* 处理进程的标准输出和错误输出
* @param process 需要处理的进程
*/privatevoidhandleProcessOutput(Process process){newThread(()->{try(BufferedReader reader =newBufferedReader(newInputStreamReader(process.getInputStream()))){String line;while((line = reader.readLine())!=null){
log.info("[FfmpegServiceImpl.handleProcessOutput] Process output: {}", line);}}catch(IOException e){
log.error("[FfmpegServiceImpl.handleProcessOutput] 读取进程输出失败", e);}}).start();newThread(()->{try(BufferedReader reader =newBufferedReader(newInputStreamReader(process.getErrorStream()))){String line;while((line = reader.readLine())!=null){
log.error("[FfmpegServiceImpl.handleProcessOutput] Process error: {}", line);}}catch(IOException e){
log.error("[FfmpegServiceImpl.handleProcessOutput] 读取进程错误输出失败", e);}}).start();}}
五、Nginx 推流
拉流的流程都成功以后,就需要将流推出去,这边用nginx进行推流。修改nginx配置文件。在server节点中添加下面的配置,root的值根据自己的文件保存位置填写,现在的配置代表文件位于
/opt/ffmpeg/hls/
目录下。
location /hls {
types {
application/vnd.apple.mpegurl m3u8;
video/mp2t ts;
}
root /opt/ffmpeg;
add_header Cache-Control no-cache;
}
修改完配置后,
nginx -s reload
使配置生效。
六、前端参考代码
转换后的流是hls格式,使用 vue3-video-play 组件,demo代码如下
<template>
<div class="login-container">
<videoPlay :src="streamUrl" type="application/vnd.apple.mpegurl">
</videoPlay>
</div>
</template>
<script setup>
import { ref } from 'vue'
import 'vue3-video-play/dist/style.css'
import videoPlay from 'vue3-video-play'
const streamUrl = ref("https://xxxxxxxxxxxxxx.m3u8")
</script>
<style lang="scss" scoped>
</style>
版权归原作者 Flobby529 所有, 如有侵权,请联系我们删除。