前言
本文将分享下本人做大文件上传的一些思路,以及相关代码的实现。至于minio的搭建,还是比较简单的。本文就不再赘述。本文搭建的🌰例子也仅仅是把主要流程走通,
相关的demo代码可能会有bug
。
有不同思路的大佬也可以在评区分享下,开拓下思路。
其实主要需要实现的就是分片上传。断点续传,秒传仅仅是在分片上传的基础上增加的逻辑扩张。
demo源码地址
https://gitee.com/Gary2016/minio-upload
演示
大致步骤
流程图
- 前端获取到文件流,计算出文件的唯一标识identifier(md5摘要)。
- 将获取到的identifier传递给后端,查询该文件的上传任务记录。如果没有则初始化一个上传任务
- 校验上传任务记录是否完成上传(成功执行合并分片的操作后视为完成上传) 3.1 任务完成,直接返回文件地址 3.2 任务未完成,获取已上传的分片。前端按照分片任务中记录的分片大小将文件分片。然后遍历所有分片进行单片上传,如果分块存在于已上传的分片列表中,则跳过该分块的上传。所有分片完成上传后,请求后端合并分片的接口进行合并。合并完成后,返回文件地址
单片上传
单片上传是通过预签名上传的方式:获取到minio经过签名的上传地址后由前端直接向minio服务器发起真正的上传请求。避免上传时占用应用服务器的带宽,影响系统稳定。
代码实现
主要技术栈
vue 3.0
element plus
promise-queue-plus
springboot 2.7.3
mybatis-plus 3.5.1
aws-java-sdk-s3 1.12.263
mysql8
minio 最新版
后端实现
数据库设计
实现断点续传,秒传的前提就是服务端需要记录文件的上传进度。因此,需要一张表来记录文件的上传记录。至于已上传的分块记录由minio提供的接口来获取。
以下是表设计
CREATETABLE`sys_upload_task`(`id`bigintNOTNULL,`upload_id`varchar(255)CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ci NOTNULLCOMMENT'分片上传的uploadId',`file_identifier`varchar(500)CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ci NOTNULLCOMMENT'文件唯一标识(md5)',`file_name`varchar(500)COLLATE utf8mb4_general_ci NOTNULLCOMMENT'文件名',`bucket_name`varchar(255)CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ci NOTNULLCOMMENT'所属桶名',`object_key`varchar(500)CHARACTERSET utf8mb4 COLLATE utf8mb4_general_ci NOTNULLCOMMENT'文件的key',`total_size`bigintNOTNULLCOMMENT'文件大小(byte)',`chunk_size`bigintNOTNULLCOMMENT'每个分片大小(byte)',`chunk_num`intNOTNULLCOMMENT'分片数量',PRIMARYKEY(`id`),UNIQUEKEY`uq_file_identifier`(`file_identifier`)USINGBTREE,UNIQUEKEY`uq_upload_id`(`upload_id`)USINGBTREE)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='分片上传-分片任务记录';
接口设计
以下接口的响应参数均被包装在Result对象的data字段中
Result
名称类型说明codeint自定义状态码(成功:200000,失败:500000)dataobject接口真实数据msgstring信息
1.根据文件唯一标识获取上传任务
主要流程就是查询数据库记录,存在上传任务再通过amazon s3的sdk方法:
amazonS3.doesObjectExist
,判断是否存在文件对象,存在则说明已经合并完成。
接口地址:/v1/minio/tasks/{identifier}
请求方式:GET
响应参数:
名称类型说明finishedboolean是否完成上传pathstring文件地址taskRecordTaskRecordDTO任务记录信息
TaskRecordDTO
名称类型说明idlong任务iduploadIdstringminio的uploadIdfileIdentifierstring文件唯一标识(MD5)fileNamestring文件名称bucketNamestring所属桶名objectKeystring文件的keytotalSizelong文件大小(byte)chunkSizelong每个分片大小(byte)chunkNumint分片数量exitPartListPartSummary[]已上传完的分片 (finished为true时,该字段为null)
PartSummary(该类由s3的sdk提供)
名称类型说明partNumberint分片编号lastModifiedDate最后修改时间eTagstring分片的eTag(MD5)sizelong分片大小
主要代码
/**
* 获取上传进度
* @param identifier 文件md5
* @return
*/@GetMapping("/{identifier}")publicResult<TaskInfoDTO> taskInfo (@PathVariable("identifier")String identifier){returnResult.ok(sysUploadTaskService.getTaskInfo(identifier));}
@OverridepublicTaskInfoDTOgetTaskInfo(String identifier){SysUploadTask task =getByIdentifier(identifier);if(task ==null){returnnull;}TaskInfoDTO result =newTaskInfoDTO().setFinished(true).setTaskRecord(TaskRecordDTO.convertFromEntity(task)).setPath(getPath(task.getBucketName(), task.getObjectKey()));boolean doesObjectExist = amazonS3.doesObjectExist(task.getBucketName(), task.getObjectKey());if(!doesObjectExist){// 未上传完,返回已上传的分片ListPartsRequest listPartsRequest =newListPartsRequest(task.getBucketName(), task.getObjectKey(), task.getUploadId());PartListing partListing = amazonS3.listParts(listPartsRequest);
result.setFinished(false).getTaskRecord().setExitPartList(partListing.getParts());}return result;}
2.初始化一个上传任务
当接口1返回的数据为null时,调用此接口初始化一个上传任务。
接口地址:/v1/minio/tasks
请求方式:POST
请求参数(body):
名称类型说明identifierstring文件唯一标识(MD5)totalSizelong文件大小(byte)chunkSizelong分片大小(byte)fileNamestring文件名称
响应参数:与接口1的响应参数一致,此处就不再重复
主要代码
/**
* 创建一个上传任务
* @return
*/@PostMappingpublicResult<TaskInfoDTO> initTask (@Valid@RequestBodyInitTaskParam param,BindingResult bindingResult){if(bindingResult.hasErrors()){returnResult.error(bindingResult.getFieldError().getDefaultMessage());}returnResult.ok(sysUploadTaskService.initTask(param));}
@OverridepublicTaskInfoDTOinitTask(InitTaskParam param){Date currentDate =newDate();String bucketName = minioProperties.getBucket();String fileName = param.getFileName();String suffix = fileName.substring(fileName.lastIndexOf("."), fileName.length());// 使用uuid生成一个objectKeyString key =StrUtil.format("{}/{}.{}",DateUtil.format(currentDate,"YYYY-MM-dd"),IdUtil.randomUUID(), suffix);// 设置文件的媒体类型(图片,视频等能被浏览器解析的资源可以直接浏览器打开),获取不到的默认为流格式String contentType =MediaTypeFactory.getMediaType(key).orElse(MediaType.APPLICATION_OCTET_STREAM).toString();ObjectMetadata objectMetadata =newObjectMetadata();
objectMetadata.setContentType(contentType);InitiateMultipartUploadResult initiateMultipartUploadResult = amazonS3
.initiateMultipartUpload(newInitiateMultipartUploadRequest(bucketName, key).withObjectMetadata(objectMetadata));String uploadId = initiateMultipartUploadResult.getUploadId();SysUploadTask task =newSysUploadTask();// 计算分片数int chunkNum =(int)Math.ceil(param.getTotalSize()*1.0/ param.getChunkSize());
task.setBucketName(minioProperties.getBucket()).setChunkNum(chunkNum).setChunkSize(param.getChunkSize()).setTotalSize(param.getTotalSize()).setFileIdentifier(param.getIdentifier()).setFileName(fileName).setObjectKey(key).setUploadId(uploadId);
sysUploadTaskMapper.insert(task);returnnewTaskInfoDTO().setFinished(false).setTaskRecord(TaskRecordDTO.convertFromEntity(task)).setPath(getPath(bucketName, key));}
3.根据文件唯一标识和分片编号获取一个预签名上传地址
前端在校验当前分片未上传时,调用该接口,获取到一个分片的上传地址,将分片的文件流直接通过PUT请求上传到该地址。该接口也是对amazon sdk的方法进行包装:
amazonS3.generatePresignedUrl
。
接口地址:/v1/minio/tasks/{identifier}/{partNumber}
请求方式:GET
响应参数:string(预签名上传地址)
主要代码
/**
* 获取每个分片的预签名上传地址
* @param identifier
* @param partNumber
* @return
*/@GetMapping("/{identifier}/{partNumber}")publicResult preSignUploadUrl (@PathVariable("identifier")String identifier,@PathVariable("partNumber")Integer partNumber){SysUploadTask task = sysUploadTaskService.getByIdentifier(identifier);if(task ==null){returnResult.error("分片任务不存在");}Map<String,String> params =newHashMap<>();// 必须传入partNumber和uploadId,否则在获取已上传分片列表时会获取不到
params.put("partNumber", partNumber.toString());
params.put("uploadId", task.getUploadId());returnResult.ok(sysUploadTaskService.genPreSignUploadUrl(task.getBucketName(), task.getObjectKey(), params));}
@OverridepublicStringgenPreSignUploadUrl(String bucket,String objectKey,Map<String,String> params){Date currentDate =newDate();Date expireDate =DateUtil.offsetMillisecond(currentDate,MinioConstant.PRE_SIGN_URL_EXPIRE.intValue());GeneratePresignedUrlRequest request =newGeneratePresignedUrlRequest(bucket, objectKey).withExpiration(expireDate).withMethod(HttpMethod.PUT);if(params !=null){
params.forEach((key, val)-> request.addRequestParameter(key, val));}URL preSignedUrl = amazonS3.generatePresignedUrl(request);return preSignedUrl.toString();}
4.根据文件唯一标识进行合并分片
当所有分片完成上传时,调用该接口。该接口是对amazon sdk的方法:
amazonS3.completeMultipartUpload
进行封装。在合并的校验逻辑中,仅仅是对分片数量是否一致做了校验,理论上应该通过已上传分片的eTag计算总文件的MD5是否与数据库中存储的一致。但我通过etag计算出来的md5与直接前端通过文件流计算出来的md5不一致,所以只能采用这种方式了。
接口地址:/v1/minio/tasks/{identifier}
请求方式:GET
主要代码
/**
* 合并分片
* @param identifier
* @return
*/@PostMapping("/merge/{identifier}")publicResult merge (@PathVariable("identifier")String identifier){
sysUploadTaskService.merge(identifier);returnResult.ok();}
@Overridepublicvoidmerge(String identifier){SysUploadTask task =getByIdentifier(identifier);if(task ==null){thrownewRuntimeException("分片任务不存");}ListPartsRequest listPartsRequest =newListPartsRequest(task.getBucketName(), task.getObjectKey(), task.getUploadId());PartListing partListing = amazonS3.listParts(listPartsRequest);List<PartSummary> parts = partListing.getParts();if(!task.getChunkNum().equals(parts.size())){// 已上传分块数量与记录中的数量不对应,不能合并分块thrownewRuntimeException("分片缺失,请重新上传");}CompleteMultipartUploadRequest completeMultipartUploadRequest =newCompleteMultipartUploadRequest().withUploadId(task.getUploadId()).withKey(task.getObjectKey()).withBucketName(task.getBucketName()).withPartETags(parts.stream().map(partSummary ->newPartETag(partSummary.getPartNumber(), partSummary.getETag())).collect(Collectors.toList()));CompleteMultipartUploadResult result = amazonS3.completeMultipartUpload(completeMultipartUploadRequest);}
前端实现
使用spark-md5计算文件的md5
在计算文件md5时,如果文件过大,可能会导致浏览器崩溃,所以也是通过分片加载到内存中,再进行md5计算。该分片大小,可以与上传时的大小不一致。因为同一个文件不管分多少块,最终计算出来的md5值都是一致的。
import SparkMD5 from'spark-md5'constDEFAULT_SIZE=5*1024*1024constmd5=(file, chunkSize =DEFAULT_SIZE)=>{returnnewPromise((resolve, reject)=>{const startMs =newDate().getTime();let blobSlice =File.prototype.slice ||File.prototype.mozSlice ||File.prototype.webkitSlice;let chunks = Math.ceil(file.size / chunkSize);let currentChunk =0;let spark =newSparkMD5.ArrayBuffer();//追加数组缓冲区。let fileReader =newFileReader();//读取文件
fileReader.onload=function(e){
spark.append(e.target.result);
currentChunk++;if(currentChunk < chunks){loadNext();}else{const md5 = spark.end();//完成md5的计算,返回十六进制结果。
console.log('文件md5计算结束,总耗时:',(newDate().getTime()- startMs)/1000,'s')resolve(md5);}};
fileReader.onerror=function(e){reject(e);};functionloadNext(){
console.log('当前part number:', currentChunk,'总块数:', chunks);let start = currentChunk * chunkSize;let end = start + chunkSize;(end > file.size)&&(end = file.size);
fileReader.readAsArrayBuffer(blobSlice.call(file, start, end));}loadNext();});}exportdefault md5
vue3 + element-plus的el-upload 实现自定义上传
<script setup>import{ UploadFilled }from'@element-plus/icons-vue'import md5 from"../lib/md5";import{ taskInfo, initTask, preSignUrl, merge }from'../lib/api';import{ElNotification}from"element-plus";import Queue from'promise-queue-plus';import axios from'axios'import{ ref }from'vue'// 文件上传分块任务的队列(用于移除文件时,停止该文件的上传队列) key:fileUid value: queue objectconst fileUploadChunkQueue =ref({}).value
/**
* 获取一个上传任务,没有则初始化一个
*/constgetTaskInfo=async(file)=>{let task;const identifier =awaitmd5(file)const{ code, data, msg }=awaittaskInfo(identifier)if(code ===200000){
task = data
if(!task){const initTaskData ={
identifier,fileName: file.name,totalSize: file.size,chunkSize:5*1024*1024}const{ code, data, msg }=awaitinitTask(initTaskData)if(code ===200000){
task = data
}else{
ElNotification.error({title:'文件上传错误',message: msg
})}}}else{
ElNotification.error({title:'文件上传错误',message: msg
})}return task
}/**
* 上传逻辑处理,如果文件已经上传完成(完成分块合并操作),则不会进入到此方法中
*/consthandleUpload=(file, taskRecord, options)=>{let uploadedSize =0// 已上传的大小const totalSize = file.size ||0// 文件总大小const{ exitPartList, chunkSize, chunkNum, fileIdentifier }= taskRecord
constuploadNext=async(partNumber)=>{const start =newNumber(chunkSize)*(partNumber -1)const end = start +newNumber(chunkSize)const blob = file.slice(start, end)const{ code, data, msg }=awaitpreSignUrl({identifier: fileIdentifier,partNumber: partNumber})if(code ===200000&& data){await axios.request({url: data,method:'PUT',data: blob,headers:{'Content-Type':'application/octet-stream'}})return Promise.resolve({partNumber: partNumber,uploadedSize: blob.size })}return Promise.reject(`分片${partNumber}, 获取上传地址失败`)}/**
* 更新上传进度
* @param increment 为已上传的进度增加的字节量
*/constupdateProcess=(increment)=>{
increment =newNumber(increment)const{ onProgress }= options
let factor =1000;// 每次增加1000 bytelet from =0;// 通过循环一点一点的增加进度while(from <= increment){
from += factor
uploadedSize += factor
const percent = Math.round(uploadedSize / totalSize *100).toFixed(2);onProgress({percent: percent})}}returnnewPromise(resolve=>{const failArr =[];const queue =Queue(5,{"retry":3,//Number of retries"retryIsJump":false,//retry now?"workReject":function(reason,queue){
failArr.push(reason)},"queueEnd":function(queue){resolve(failArr);}})
fileUploadChunkQueue[file.uid]= queue
for(let partNumber =1; partNumber <= chunkNum; partNumber++){const exitPart =(exitPartList ||[]).find(exitPart=> exitPart.partNumber == partNumber)if(exitPart){// 分片已上传完成,累计到上传完成的总额中updateProcess(exitPart.size)}else{
queue.push(()=>uploadNext(partNumber).then(res=>{// 单片文件上传完成再更新上传进度updateProcess(res.uploadedSize)}))}}if(queue.getLength()==0){// 所有分片都上传完,但未合并,直接return出去,进行合并操作resolve(failArr);return;}
queue.start()})}/**
* el-upload 自定义上传方法入口
*/consthandleHttpRequest=async(options)=>{const file = options.file
const task =awaitgetTaskInfo(file)if(task){const{ finished, path, taskRecord }= task
const{fileIdentifier: identifier }= taskRecord
if(finished){return path
}else{const errorList =awaithandleUpload(file, taskRecord, options)if(errorList.length >0){
ElNotification.error({title:'文件上传错误',message:'部分分片上次失败,请尝试重新上传文件'})return;}const{ code, data, msg }=awaitmerge(identifier)if(code ===200000){return path;}else{
ElNotification.error({title:'文件上传错误',message: msg
})}}}else{
ElNotification.error({title:'文件上传错误',message:'获取上传任务失败'})}}/**
* 移除文件列表中的文件
* 如果文件存在上传队列任务对象,则停止该队列的任务
*/consthandleRemoveFile=(uploadFile, uploadFiles)=>{const queueObject = fileUploadChunkQueue[uploadFile.uid]if(queueObject){
queueObject.stop()
fileUploadChunkQueue[undefined]}}</script><template><el-card style="width: 80%; margin: 80px auto" header="文件分片上传"><el-upload
class="upload-demo"
drag
action="/"multiple:http-request="handleHttpRequest":on-remove="handleRemoveFile"><el-icon class="el-icon--upload"><upload-filled /></el-icon><div class="el-upload__text">
请拖拽文件到此处或 <em>点击此处上传</em></div></el-upload></el-card></template>
遇到的问题
1. 如何解决上传请求的并发数限制,以及错误重试?
前端实现这部分比较麻烦的是在分片上传的时候要控制请求的并发数,让多个分片并发上传可以提升上传效率,但是请求过多时,会占用操作系统大部分资源。我是使用了一个第三方插件:promise-queue-plus 用于控制分片上传的并发数,以及对上传错误的分片进行重试。
2. 上传进度计算问题
以前计算上传进度的方法是,使用ajax提供的
onprogress
监听,获取到已加载的文件流大小:loaded,再除以文件总大小:total。但是分片上传后,每个分片上传线程获取到的loaded,都是该分片的已加载的文件流大小。
如果把同一文件的分片上传线程获取到的loaded累计在一起,最后得到的值是会远超文件总大小的。
所以我采用的方式是,当文件某一分片上传完成时,再将该分片的大小累计到文件的总上传大小中,在触发el-upload的
onProgress
方法,回显文件的上传进度。但此方案实现出来的效果,上传进度条的递增效果会变得比较急促,没有以前那么丝滑。
3. 为什么不通过后端批量生产预签名上传地址?
其实在调用接口:1.根据文件唯一标识获取上传任务 和2.初始化一个上传任务时,可以将响应参数exitPartList变更为waitUploadList。此时生成待上传的分片列表,列表包含分片编号,预签名上传地址。前端遍历该列表,通过分片编号将文件分片上传到对应的地址中既可。
但是我并没有采用此方案,因为上传地址具有时效性。可能单个上传地址有效时间为10分钟(当然如果设置的比较长,就能解决问题),但是文件上传完可能需要20分钟,那么在并发上传请求时,可能某些分片在上传地址失效后才轮到它去执行。此时就需要有个机制去重新获取预签名上传地址。
所以我的做法是,在轮到该分片上传时,才会去获取预签名上传地址。但是由于我的代码没写好,把预签名请求获取的方法和真实上传的逻辑都封装到
uploadNext
方法中,导致上传失败时,会重复执行
uploadNext
导致重复去获取预签名上传地址了。可以通过前端去做优化,我比较懒就不优化了。
4. 文件比较大的时候md5计算比较慢
本文的demo是对整个文件进行md5计算,在文件比较大时,计算md5可能比较耗时。我在本机电脑测试,几百兆的文件计算还是比较快的,几秒到十几秒就能计算出来。5G左右的文件,可能需要一分钟多钟。我觉得这个速度还是能接受的,毕竟磨刀不误砍柴功。上传5G的文件来说,花几分钟算个md5我觉得不过分。也可以继续做优化,例如提取文件的某几个片段进行md5。当然,这种做法,可能会提高md5重复的概率。
版权归原作者 暴走的咖喱 所有, 如有侵权,请联系我们删除。