文章目录
简介
本文介绍如何使用海康(Hikrobot)工业相机在触发模式下抓图:开发调试阶段使用软件触发,实际部署时使用硬件触发;抓到的图片再调用目标检测接口进行检测。
hikrobot工业相机抓图
安装驱动
去官网下载驱动。请下载客户端(MVS),而不要只选运行环境:客户端已包含运行环境,并且附带示例和文档。下载包里有一个安装文件,文件名类似 MVS-2.1.2_x86_64_20231011.deb
# 安装
sudo dpkg -i MVS-2.1.2_x86_64_20231011.deb
# 删除用以下命令
sudo dpkg -r mvs
# 打开客户端
cd /opt/MVS/bin/
./MVS
文档在 /opt/MVS/doc/
示例在 /opt/MVS/Samples
python示例在 /opt/MVS/Samples/64/Python/
相机抓图
代码如下,使用2个相机,开启线程模拟循环调用软件触发。
SDK 有一个回调传参失效的坑,感觉是海康 SDK 的 bug,现象如下:
使用 MV_CC_RegisterImageCallBackEx(call_back_fun, user_data) 注册回调,
传给回调函数的参数 user_data 是一个指针。程序刚开始运行时传入的内容是正确的;运行一段时间后,传入的地址仍然正确,但指针指向的数据内容已经错了,应该是被 SDK 修改了。因此这里注册了 2 个回调函数,根据用户自定义的名称 DeviceUserID 区分是哪个相机,2 个相机分别对应不同的目标检测业务。
# -*- coding: utf-8-*-
import sys
import time
import os
import numpy as np
import cv2
from ctypes import *
import termios
from datetime import datetime
import threading
sys.path.append("/opt/MVS/Samples/64/Python/MvImport") # 导入相应SDK的库,实际安装位置绝对路径
from MvCameraControl_class import *
# ---- Settings below must be configured per deployment ----
# Directory where grabbed images are saved; it is handed over to the app.
grab_dir = "box-end/app/new_coming_images/"
DEVICE_USER_ID_FRONT = "user_id_002"
DEVICE_USER_ID_BACK = "user_id_001"

# ---- Other module-level defaults; no configuration required ----
CAM_LIST = []
DEVICE_NUM = 0
GRAB_RUN = True
SERVICE_ID = "200002"
USER_DATA_PY = []
# Print the details of every enumerated device
def printDeviceInfo(deviceList):
    """Print model name plus IP address (GigE) or serial number (USB3) per device.

    Args:
        deviceList: device list structure; its ``nDeviceNum`` and
            ``pDeviceInfo`` fields are read.
    """

    def chars_to_text(chars):
        # The USB branch of the original code stopped at the first 0 byte while
        # the GigE branch did not, so GigE model names were printed with the
        # trailing zero characters. Both branches now share this helper.
        text = ""
        for per in chars:
            if per == 0:
                break
            text += chr(per)
        return text

    for i in range(0, deviceList.nDeviceNum):
        mvcc_dev_info = cast(deviceList.pDeviceInfo[i], POINTER(MV_CC_DEVICE_INFO)).contents
        print("mvcc_dev_info", mvcc_dev_info)
        if mvcc_dev_info.nTLayerType == MV_GIGE_DEVICE:
            print("\ngige device: [%d]" % i)
            gige_info = mvcc_dev_info.SpecialInfo.stGigEInfo
            print("device model name: %s" % chars_to_text(gige_info.chModelName))

            # nCurrentIp packs the four address octets into one integer,
            # most significant octet first.
            current_ip = gige_info.nCurrentIp
            nip1 = (current_ip & 0xff000000) >> 24
            nip2 = (current_ip & 0x00ff0000) >> 16
            nip3 = (current_ip & 0x0000ff00) >> 8
            nip4 = current_ip & 0x000000ff
            print("current ip: %d.%d.%d.%d\n" % (nip1, nip2, nip3, nip4))
        elif mvcc_dev_info.nTLayerType == MV_USB_DEVICE:
            print("\nu3v device: [%d]" % i)
            usb_info = mvcc_dev_info.SpecialInfo.stUsb3VInfo
            print("device model name: %s" % chars_to_text(usb_info.chModelName))
            print("user serial number: %s" % chars_to_text(usb_info.chSerialNumber))
# Decide whether a pixel format is colour or monochrome
def IsImageColor(enType):
    """Classify a pixel type constant.

    Args:
        enType: one of the ``PixelType_Gvsp_*`` constants.

    Returns:
        ``'color'`` for the listed RGB/BGR/YUV/Bayer formats, ``'mono'`` for the
        listed Mono formats, and ``'未知'`` for anything else.
    """
    color_types = (
        PixelType_Gvsp_RGB8_Packed,
        PixelType_Gvsp_BGR8_Packed,
        PixelType_Gvsp_YUV422_Packed,
        PixelType_Gvsp_YUV422_YUYV_Packed,
        PixelType_Gvsp_BayerGR8,
        PixelType_Gvsp_BayerRG8,
        PixelType_Gvsp_BayerGB8,
        PixelType_Gvsp_BayerBG8,
        PixelType_Gvsp_BayerGB10,
        PixelType_Gvsp_BayerGB10_Packed,
        PixelType_Gvsp_BayerBG10,
        PixelType_Gvsp_BayerBG10_Packed,
        PixelType_Gvsp_BayerRG10,
        PixelType_Gvsp_BayerRG10_Packed,
        PixelType_Gvsp_BayerGR10,
        PixelType_Gvsp_BayerGR10_Packed,
        PixelType_Gvsp_BayerGB12,
        PixelType_Gvsp_BayerGB12_Packed,
        PixelType_Gvsp_BayerBG12,
        PixelType_Gvsp_BayerBG12_Packed,
        PixelType_Gvsp_BayerRG12,
        PixelType_Gvsp_BayerRG12_Packed,
        PixelType_Gvsp_BayerGR12,
        PixelType_Gvsp_BayerGR12_Packed,
    )
    mono_types = (
        PixelType_Gvsp_Mono8,
        PixelType_Gvsp_Mono10,
        PixelType_Gvsp_Mono10_Packed,
        PixelType_Gvsp_Mono12,
        PixelType_Gvsp_Mono12_Packed,
    )
    if enType in color_types:
        return 'color'
    if enType in mono_types:
        return 'mono'
    return '未知'
# Frame callback body shared by all cameras
def image_callback_main(pData, pFrameInfo, device_idx):
    """Convert one grabbed frame to an OpenCV image and save it as a JPEG.

    The frame is converted to Mono8 or BGR8, halved in width and height, and
    written into ``grab_dir`` under a name of the form
    ``image-<SERVICE_ID>-<device_idx + 1>-<timestamp>.jpg``.

    Args:
        pData: pointer to the raw frame data supplied by the SDK.
        pFrameInfo: pointer to the ``MV_FRAME_OUT_INFO_EX`` describing the frame.
        device_idx: index into ``USER_DATA_PY``; ``device_idx + 1`` is embedded
            in the file name as the camera number.

    Fixes over the original:
        * An unsupported pixel format only printed a message and then failed
          on an undefined ``nConvertSize``; the frame is now skipped.
        * ``sys.exit()`` was called on conversion failure. This function runs
          inside a ctypes callback, where a raised exception cannot propagate
          to the caller, so the frame is now simply dropped.
        * The colour path sized the numpy view with ``nDstBufferSize`` while
          the mono path used ``nDstLen``; both now use ``nDstLen``.
        * The result of ``cv2.imwrite`` is checked before renaming.
    """
    global USER_DATA_PY
    stFrameInfo = cast(pFrameInfo, POINTER(MV_FRAME_OUT_INFO_EX)).contents
    cam = USER_DATA_PY[device_idx]["cam"]
    if not stFrameInfo:
        return

    print("get one frame: Width[%d], Height[%d], nFrameNum[%d], enPixelType[%s]" % (
        stFrameInfo.nWidth, stFrameInfo.nHeight, stFrameInfo.nFrameNum, stFrameInfo.enPixelType))
    time_start = time.time()

    # File name format: image-200001-4-2022-01-18-10-57-03-1642474623702.jpg
    img_name = "image-" + SERVICE_ID + "-" + str(device_idx + 1) + "-" + datetime.now().strftime('%Y-%m-%d-%H-%M-%S-%f')
    # The image is written with a ".jpeg" suffix and then renamed to ".jpg",
    # presumably so a consumer looking for "*.jpg" never sees a half-written file.
    img_path_tmp = os.path.join(grab_dir, img_name + ".jpeg")
    img_path_jpg = os.path.join(grab_dir, img_name + ".jpg")

    image_kind = IsImageColor(stFrameInfo.enPixelType)
    if image_kind == 'mono':
        print("mono!")
        dst_pixel_type = PixelType_Gvsp_Mono8
        channels = 1
    elif image_kind == 'color':
        print("color!")
        dst_pixel_type = PixelType_Gvsp_BGR8_Packed  # OpenCV expects BGR, not RGB
        channels = 3
    else:
        print("not support!!!")
        return

    nConvertSize = stFrameInfo.nWidth * stFrameInfo.nHeight * channels
    # Keep a Python reference to the destination array so it can be read back
    # directly after the conversion.
    dst_buffer = (c_ubyte * nConvertSize)()

    stConvertParam = MV_CC_PIXEL_CONVERT_PARAM()
    memset(byref(stConvertParam), 0, sizeof(stConvertParam))
    stConvertParam.enDstPixelType = dst_pixel_type
    stConvertParam.nWidth = stFrameInfo.nWidth
    stConvertParam.nHeight = stFrameInfo.nHeight
    stConvertParam.pSrcData = cast(pData, POINTER(c_ubyte))
    stConvertParam.nSrcDataLen = stFrameInfo.nFrameLen
    stConvertParam.enSrcPixelType = stFrameInfo.enPixelType
    stConvertParam.pDstBuffer = dst_buffer
    stConvertParam.nDstBufferSize = nConvertSize
    print('time cos 1:', time.time() - time_start, 's')

    ret = cam.MV_CC_ConvertPixelType(stConvertParam)
    print('time cos 2:', time.time() - time_start, 's')
    if ret != 0:
        print("convert pixel fail! ret[0x%x]" % ret)
        return

    # Wrap the converted bytes as a numpy image for OpenCV.
    frame = np.frombuffer(dst_buffer, dtype=np.uint8, count=int(stConvertParam.nDstLen))
    if channels == 1:
        frame = frame.reshape((stFrameInfo.nHeight, stFrameInfo.nWidth))
    else:
        frame = frame.reshape(stFrameInfo.nHeight, stFrameInfo.nWidth, 3)
    print('time cos 3:', time.time() - time_start, 's')

    # Save at half resolution.
    height, width = frame.shape[0:2]
    frame = cv2.resize(frame, (int(width / 2), int(height / 2)), interpolation=cv2.INTER_AREA)
    print("img_path", img_path_tmp)
    if not cv2.imwrite(img_path_tmp, frame):
        print("save image fail! path[%s]" % img_path_tmp)
        return
    os.rename(img_path_tmp, img_path_jpg)
    print("")
# ctypes prototype for the image callback: no return value, and the arguments
# are (frame data pointer, frame info pointer, user pointer).
g_winfun_ctype = CFUNCTYPE
g_st_frame_info = POINTER(MV_FRAME_OUT_INFO_EX)
g_p_data = POINTER(c_ubyte)
FrameInfoCallBack = g_winfun_ctype(None, g_p_data, g_st_frame_info, c_void_p)


def image_callback_0(pData, pFrameInfo, pUser):
    """Callback that forwards the frame with device index 0; pUser is ignored."""
    image_callback_main(pData, pFrameInfo, 0)


def image_callback_1(pData, pFrameInfo, pUser):
    """Callback that forwards the frame with device index 1; pUser is ignored."""
    image_callback_main(pData, pFrameInfo, 1)


# The value passed through the callback's user pointer changes over time, so
# for now two separate callbacks are used to tell the cameras apart.
CALL_BACK_FUN_0 = FrameInfoCallBack(image_callback_0)
CALL_BACK_FUN_1 = FrameInfoCallBack(image_callback_1)
def press_any_key_exit():
    """Block until a key is pressed on stdin, then restore the terminal.

    Canonical mode and echo are switched off for the duration of the wait so
    a single key press (without Enter) ends it and is not echoed.

    The original used a bare ``except: pass``, which also swallowed
    ``SystemExit`` and programming errors. Only a failed read and Ctrl-C are
    treated as "stop waiting" now.
    """
    fd = sys.stdin.fileno()
    old_ttyinfo = termios.tcgetattr(fd)
    new_ttyinfo = old_ttyinfo[:]
    # Index 3 holds the local-mode flags.
    new_ttyinfo[3] &= ~termios.ICANON
    new_ttyinfo[3] &= ~termios.ECHO
    try:
        termios.tcsetattr(fd, termios.TCSANOW, new_ttyinfo)
        # Read up to 7 bytes so a multi-byte key sequence is consumed in one go.
        os.read(fd, 7)
    except (OSError, KeyboardInterrupt):
        pass
    finally:
        # Always put the terminal back into its previous mode.
        termios.tcsetattr(fd, termios.TCSANOW, old_ttyinfo)
def add_trigger_event_thread(cam=0, idx=None):
    """Thread body that issues a software trigger on ``cam`` every 2 seconds.

    The loop ends when the module-level ``GRAB_RUN`` flag is False or when the
    trigger command reports a non-zero return code. ``idx`` is accepted but
    not used.
    """
    print("add_trigger_event_thread")
    while True:
        if GRAB_RUN == False:
            return
        ret = cam.MV_CC_SetCommandValue("TriggerSoftware")
        if ret != 0:
            print("TriggerSoftware fail! ret[0x%x]" % ret)
            return
        time.sleep(2)
def start():
    """Enumerate, open and configure every camera, then start triggered grabbing.

    For each device found this creates a handle, opens it exclusively, enables
    continuous auto exposure and auto gain, switches to software trigger
    mode, registers the image callback chosen by the camera's ``DeviceUserID``,
    starts grabbing and finally starts the software trigger thread.

    Returns:
        True when every camera was started, False on the first failure. Every
        failure path clears ``GRAB_RUN`` so running trigger threads stop.

    Fixes over the original:
        * The result of ``MV_CC_CreateHandle`` was not assigned, so the check
          after it tested a stale ``ret``.
        * An unknown ``DeviceUserID`` left ``call_back_fun`` unbound (or reused
          the previous camera's callback); it is now reported as an error.
        * The callback's user pointer was built from temporaries that nothing
          kept alive, which is a likely cause of the "pointed-to data changes
          after a while" symptom. The objects are now stored in
          ``USER_DATA_PY`` for the lifetime of the module.
        * ``USER_DATA_PY`` was indexed by enumeration order, but the callbacks
          look it up by role index (0 = front, 1 = back). Entries are now
          stored at the role index.
        * The trigger thread was started before ``MV_CC_StartGrabbing``; it is
          now started only after grabbing has started successfully.
    """
    global GRAB_RUN
    global CAM_LIST
    global DEVICE_NUM
    global USER_DATA_PY

    def fail(message):
        # Report the problem and clear the run flag.
        global GRAB_RUN
        print(message)
        GRAB_RUN = False
        return False

    deviceList = MV_CC_DEVICE_INFO_LIST()
    tlayerType = MV_GIGE_DEVICE | MV_USB_DEVICE

    # 1. Enumerate devices
    ret = MvCamera.MV_CC_EnumDevices(tlayerType, deviceList)
    if ret != 0:
        return fail("enum devices fail! ret[0x%x]" % ret)
    if deviceList.nDeviceNum == 0:
        return fail("find no device!")
    print("Find %d devices!" % deviceList.nDeviceNum)

    # Print device details
    printDeviceInfo(deviceList)

    # 2. Open every device
    DEVICE_NUM = deviceList.nDeviceNum
    for i in range(0, DEVICE_NUM):
        # 2.1 Create the camera object and its handle
        cam = MvCamera()
        CAM_LIST.append(cam)
        stDeviceList = cast(deviceList.pDeviceInfo[int(i)], POINTER(MV_CC_DEVICE_INFO)).contents
        ret = cam.MV_CC_CreateHandle(stDeviceList)
        if ret != 0:
            return fail("create handle fail! ret[0x%x]" % ret)

        # 2.2 Open the device
        ret = cam.MV_CC_OpenDevice(MV_ACCESS_Exclusive, 0)
        if ret != 0:
            return fail("open device fail! ret[0x%x]" % ret)

        ret = cam.MV_CC_SetEnumValue("ExposureAuto", MV_EXPOSURE_AUTO_MODE_CONTINUOUS)
        if ret != 0:
            return fail("set ExposureAuto fail! ret[0x%x]" % ret)

        ret = cam.MV_CC_SetEnumValue("GainAuto", MV_GAIN_MODE_CONTINUOUS)
        if ret != 0:
            return fail("set GainAuto fail! ret[0x%x]" % ret)

        ret = cam.MV_CC_SetEnumValue("TriggerMode", MV_TRIGGER_MODE_ON)
        if ret != 0:
            return fail('Enable trigger failed! [{0:#X}]'.format(ret))

        # For hardware triggering use MV_TRIGGER_SOURCE_LINE0 here instead.
        ret = cam.MV_CC_SetEnumValue('TriggerSource', MV_TRIGGER_SOURCE_SOFTWARE)
        if ret != 0:
            return fail('Set trigger source failed![{0:#X}]'.format(ret))

        # Read the user-defined camera name; it decides which callback is used.
        stStringValue = MVCC_STRINGVALUE()
        memset(byref(stStringValue), 0, sizeof(MVCC_STRINGVALUE))
        ret = cam.MV_CC_GetStringValue("DeviceUserID", stStringValue)
        if ret != 0:
            return fail("获取 string 型数据 %s 失败 ! 报错码 ret[0x%x]" % ("DeviceUserID", ret))
        device_user_id = bytes.decode(stStringValue.chCurValue)
        print("device_user_id", device_user_id)

        if device_user_id == DEVICE_USER_ID_FRONT:
            role_idx = 0
            call_back_fun = CALL_BACK_FUN_0
        elif device_user_id == DEVICE_USER_ID_BACK:
            role_idx = 1
            call_back_fun = CALL_BACK_FUN_1
        else:
            return fail("unknown DeviceUserID %r, expected %r or %r" % (
                device_user_id, DEVICE_USER_ID_FRONT, DEVICE_USER_ID_BACK))

        # Build the user pointer from objects that stay referenced afterwards.
        user_object = py_object(i)
        user_pointer = pointer(user_object)
        user_data = cast(user_pointer, c_void_p)

        # Store the entry at the role index, which is what the callbacks use.
        while len(USER_DATA_PY) <= role_idx:
            USER_DATA_PY.append(None)
        USER_DATA_PY[role_idx] = {
            "cam": cam,
            "device_idx": i,
            "user_data_refs": (user_object, user_pointer, user_data),
        }

        ret = cam.MV_CC_RegisterImageCallBackEx(call_back_fun, user_data)
        if ret != 0:
            return fail('Register callback failed![{0:#X}]'.format(ret))

        ret = cam.MV_CC_StartGrabbing()
        if ret != 0:
            return fail('Start grabbing failed! [{0:#X}]'.format(ret))

        # Start software triggering only once the camera is grabbing.
        hThreadHandle = threading.Thread(target=add_trigger_event_thread, args=(cam, i))
        hThreadHandle.start()

    return True
def stop():
    """Stop grabbing, close and destroy every camera in ``CAM_LIST``.

    Fixes over the original:
        * ``GRAB_RUN`` is cleared first, so the trigger threads stop issuing
          commands before the devices are torn down.
        * A failing step used to ``break`` out of the loop, leaving the
          remaining cameras open and their handles undestroyed. Each step is
          now attempted for every camera and failures are only reported.
        * The loop walks ``CAM_LIST`` itself instead of ``range(DEVICE_NUM)``,
          which raised ``IndexError`` when ``start()`` had failed part-way.
    """
    global GRAB_RUN
    global CAM_LIST
    global DEVICE_NUM

    GRAB_RUN = False
    for i, cam in enumerate(CAM_LIST):
        # 5.1 Stop grabbing
        print("stop grabbing device index[%d]" % i)
        ret = cam.MV_CC_StopGrabbing()
        if ret != 0:
            print("stop grabbing fail! ret[0x%x]" % ret)

        # 5.2 Close the device
        ret = cam.MV_CC_CloseDevice()
        if ret != 0:
            print("close deivce fail! ret[0x%x]" % ret)

        # 6. Destroy the handle
        ret = cam.MV_CC_DestroyHandle()
        if ret != 0:
            print("destroy handle fail! ret[0x%x]" % ret)
# Script entry: start the cameras, wait for a key press, then shut down.
start()
print("press a key to stop grabbing.")
press_any_key_exit()
stop()
硬件触发
改成硬件触发只需要修改一句
TriggerSource的MV_TRIGGER_SOURCE_SOFTWARE改为MV_TRIGGER_SOURCE_LINE0
ret = CAM_LIST[i].MV_CC_SetEnumValue('TriggerSource', MV_TRIGGER_SOURCE_LINE0)
硬件触发接线图
1相机I/O管脚接口定义,线的颜色可能不同,但是功能是一样的
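| 管脚 | 信号 | I/O信号源 | 说明 | 线缆颜色 |
| --- | --- | --- | --- | --- |
| 1 | DC_PWR | – | 相机电源 | 橙 |
| 2 | OPTO_IN | Line 0+ | 光耦隔离输入 | 黄 |
| 3 | GPIO | Line 2 | 可配置输入或输出 | 紫 |
| 4 | OPTO_OUT | Line 1+ | 光耦隔离输出 | 蓝 |
| 5 | OPTO_GND | Line 0/1- | 光耦隔离信号地 | 绿 |
| 6 | GND | Line 2- | 相机电源地 | 灰 |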
触发硬件用光电开关,NPN设备,网上的接线图没有画出电阻的接线位置,实际电阻位置如下
AI目标检测接口
对yolov5的封装
# -*- coding: utf-8 -*-
# Import the required libraries.
# In the original text "import os" and the letterbox import were fused into
# the preceding comments, and the utils.plots import was fused onto the
# closing parenthesis of the utils.general import, so none of them took effect.
import os
import sys
from pathlib import Path
import numpy as np
import cv2
import torch
import torch.backends.cudnn as cudnn
from tqdm import tqdm
import shutil
import io
from models.common_2 import DetectMultiBackend
from utils.dataloaders import IMG_FORMATS, VID_FORMATS, LoadImages, LoadStreams
from utils.general import (LOGGER, check_file, check_img_size, check_imshow, check_requirements, colorstr,
                           increment_path, non_max_suppression, print_args, scale_boxes, strip_optimizer, xyxy2xywh)
from utils.plots import Annotator, colors, save_one_box
from utils.torch_utils import select_device, time_sync
# letterbox comes from here
from utils.augmentations import Albumentations, augment_hsv, copy_paste, letterbox, mixup, random_perspective
class U5():
    """Wrapper around a YOLOv5 model for detecting objects in a single image.

    The model is loaded and warmed up once in ``__init__``; ``detect`` then
    runs preprocessing, inference and NMS for one image.
    """

    def __init__(self, weights="/weights/last.pt", conf_thres=0.25):
        """Load the weights and prepare the model.

        Args:
            weights: path to the ``.pt`` weights file, or a list whose first
                element is that path.
            conf_thres: object confidence threshold used by NMS.
        """
        self.weights = weights
        self.imgsz = (640, 640)       # inference input size in pixels
        self.conf_thres = conf_thres  # confidence threshold used in NMS
        self.iou_thres = 0.45         # IoU threshold used in NMS
        self.max_det = 1000           # maximum detections per image, used in NMS
        device = ''                   # cuda device, i.e. 0 or 0,1,2,3 or cpu
        self.classes = None           # optional class filter for NMS; None keeps all classes
        self.agnostic_nms = False     # class-agnostic NMS
        self.augment = False          # test-time augmentation
        self.visualize = False        # feature map visualisation
        self.half = False             # FP16 inference
        self.dnn = False              # use OpenCV DNN for ONNX inference

        # Select the device
        self.device = select_device(device)

        # Load the model from an in-memory copy of the weights file. The
        # original opened the file without a context manager, so the handle
        # leaked if loading raised.
        w = str(weights[0] if isinstance(weights, list) else weights)
        print(type(w), w)
        with open(w, "rb") as source_file:
            content = source_file.read()
        with io.BytesIO(content) as weights_bytes:
            self.model = DetectMultiBackend(weights_bytes, model_type="pt", device=self.device, dnn=self.dnn)

        self.stride, self.names, self.pt = self.model.stride, self.model.names, self.model.pt
        jit, onnx, engine = self.model.jit, self.model.onnx, self.model.engine

        # Check the image size against the model stride. The original computed
        # this value and then discarded it; it is now actually used.
        self.imgsz = check_img_size(self.imgsz, s=self.stride)
        print("names", self.names)

        # FP16 is only kept for the listed backends on a non-CPU device.
        self.half &= (self.pt or jit or onnx or engine) and self.device.type != 'cpu'
        if self.pt or jit:
            if self.half:
                self.model.model.half()
            else:
                self.model.model.float()

        # Warm up before the first real prediction
        self.model.warmup(imgsz=(1, 3, *self.imgsz))

    def detect(self, img):
        """Run detection on one image.

        Args:
            img: image array in HWC layout with BGR channel order, as the
                transpose/reverse below assumes. The Annotator is constructed
                on this same array, so boxes are presumably drawn into ``img``
                itself - confirm against the Annotator implementation.

        Returns:
            A list of dicts with keys ``'class'`` (class name), ``'conf'``
            (float confidence), ``'position'`` (xywh box divided by the image
            width/height) and ``'index'`` (integer class index).
        """
        im0 = img

        # Padded resize
        im = letterbox(im0, self.imgsz, self.stride, auto=self.pt)[0]
        # HWC to CHW, BGR to RGB
        im = im.transpose((2, 0, 1))[::-1]
        im = np.ascontiguousarray(im)

        im = torch.from_numpy(im).to(self.device)
        im = im.half() if self.half else im.float()  # uint8 to fp16/32
        im /= 255  # 0 - 255 to 0.0 - 1.0
        if len(im.shape) == 3:
            im = im[None]  # expand for batch dim
        t2 = time_sync()

        # Inference
        pred = self.model(im, augment=self.augment, visualize=self.visualize)
        t3 = time_sync()

        # NMS
        pred = non_max_suppression(pred, self.conf_thres, self.iou_thres, self.classes,
                                   self.agnostic_nms, max_det=self.max_det)

        detections = []
        line_thickness = 3
        annotator = Annotator(im0, line_width=line_thickness, example=str(self.names))
        gn = torch.tensor(im0.shape)[[1, 0, 1, 0]]  # normalization gain whwh

        # Process the predictions, one entry per image
        for det in pred:
            if not len(det):
                continue
            # Rescale boxes from the inference size back to the original image size
            det[:, :4] = scale_boxes(im.shape[2:], det[:, :4], im0.shape).round()
            for *xyxy, conf, index in reversed(det):
                xywh = (xyxy2xywh(torch.tensor(xyxy).view(1, 4)) / gn).view(-1).tolist()
                label = self.names[int(index)]
                detections.append({'class': label, 'conf': float(conf), 'position': xywh, 'index': int(index)})
                # Draw the box
                annotator.box_label(xyxy, label, color=colors(int(index), True))

        # Inference time
        print(f'({t3 - t2:.3f}s)')
        return detections
flask对http接口的实现
# -*- coding: utf-8 -*-
import sys
# Make the yolov5 package directory importable. In the original text the
# "from flask import ..." line was fused onto this call.
sys.path.append('yolov5_s/')
from flask import Flask, g, jsonify, make_response, request, render_template
import base64
import numpy as np
import cv2
from sdk import U5
import time

app = Flask(__name__)

# ---- Settings below must be configured per deployment ----
u5 = U5(weights='yolov5_s/runs/train/exp/weights/last.pt')
# File that holds the code read by the barcode scanner
SCAN_CODE_FILE = "/home/wai/hik/code/scan_code.txt"
# Expected number of each part per scanned code, for the front and back side
CFG_CORRECT_ANSWER = [
    {
        "no": "000001",
        "cfg": {
            "front": {"screw_3": 2, "lines": 5, "block_1": 1, "block_2": 2},
            "back": {"screw_1": 7, "screw_2": 29, "wang": 0},
        },
    },
    {
        "no": "000002",
        "cfg": {
            "front": {"screw_1": 7, "screw_2": 29, "wang": 0},
            "back": {"screw_3": 2, "lines": 5, "block_1": 1, "block_2": 2},
        },
    },
]


def detections_count(detections, key):
    """Return how many entries of ``detections`` have ``item["class"] == key``.

    Args:
        detections: iterable of dicts, each with a ``"class"`` key.
        key: class name to count.
    """
    return sum(1 for item in detections if item["class"] == key)
@app.route('/api/ai_detection', methods=['POST'])
def ai_detection():
    """Detect parts in the posted image and judge it OK/NG against the config.

    Request JSON:
        images: list whose first element is a base64 image, optionally
            prefixed with a data URL header such as ``data:image/jpeg;base64,``.
        images_id: list whose first element is the image name; the third
            ``-`` separated field is the camera id ("1" front, "2" back).

    Response JSON:
        When no configuration matches the scanned code: ``boxes``, the
        original ``image``, ``image_id`` and ``scene_name``. Otherwise
        ``boxes``, the annotated ``image`` as a JPEG data URL, ``image_id``
        and ``good`` (1 = OK, 0 = NG). Invalid input yields HTTP 400 with
        ``code`` and ``msg``.

    Fixes over the original:
        * ``cv2.imdecode`` was given ``cv2.COLOR_RGB2BGR`` (a colour
          conversion code) as its read flag; ``cv2.IMREAD_COLOR`` is used so
          the result always has three channels.
        * The data URL header was removed with a hard-coded ``[23:]``, which
          is only right for the JPEG prefix; the text is now split on the
          first comma.
        * The scan code is stripped, so a trailing newline in the file no
          longer prevents a match.
        * An unknown camera id left ``result_pos`` undefined; it is now
          rejected with HTTP 400.
        * Missing or undecodable input returns HTTP 400 instead of raising.
    """
    print("==> ai_detection start")
    time_start = time.time()

    payload = request.get_json(silent=True) or {}
    images = payload.get('images')
    images_id = payload.get('images_id')
    if not images or not images_id:
        return jsonify({'code': 400, 'msg': 'images and images_id are required'}), 400

    # Drop an optional "data:<mime>;base64," header before decoding.
    encoded = images[0].split(',', 1)[-1]
    try:
        img_data = base64.b64decode(encoded)
    except ValueError:
        return jsonify({'code': 400, 'msg': 'image is not valid base64'}), 400
    img_array = np.frombuffer(img_data, np.uint8)
    img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) if img_array.size else None
    if img is None:
        return jsonify({'code': 400, 'msg': 'image could not be decoded'}), 400
    print('time cos detect 1:', time.time() - time_start, 's')

    detections = u5.detect(img)
    print('time cos detect 2:', time.time() - time_start, 's')
    print("detections", detections)

    # Read the code written by the barcode scanner
    with open(SCAN_CODE_FILE, "r") as scan_file:
        scan_code = scan_file.read().strip()
    print("scan_code", scan_code)

    # Look up the expected counts for the scanned code
    cfg = None
    for item in CFG_CORRECT_ANSWER:
        if item["no"] == scan_code:
            cfg = item["cfg"]
            break

    if cfg is None:
        result = {'boxes': [], 'image': images[0], 'image_id': images_id[0], 'scene_name': 'detect'}
    else:
        # The camera id decides the side: 1 is the front, 2 is the back
        name_parts = images_id[0].split("-")
        camera_id = name_parts[2] if len(name_parts) > 2 else ""
        print("==> camera_id", camera_id)
        if camera_id == "1":
            part_num = cfg["front"]
        elif camera_id == "2":
            part_num = cfg["back"]
        else:
            return jsonify({'code': 400, 'msg': 'unknown camera id in images_id'}), 400
        print("correct_cfg", part_num)

        # The image is good only when every expected count matches
        good = 1
        for key, value in part_num.items():
            dc = detections_count(detections, key)
            print("==> ", key, "ai_count", dc, "correct_count", value)
            if value != dc:
                good = 0
                break

        # Draw the verdict onto the image
        width = img.shape[1]
        result_pos = (width - 300, 200) if camera_id == "1" else (100, 200)
        if good == 0:
            result_text = "NG"
            result_color = (0, 0, 255)
        else:
            result_text = "OK"
            result_color = (0, 128, 0)
        cv2.putText(img, result_text, result_pos, cv2.FONT_HERSHEY_COMPLEX, 5.0, result_color, 10)

        encoded_img = cv2.imencode('.jpg', img)[1]
        image_result_base64 = 'data:image/jpeg;base64,' + base64.b64encode(encoded_img).decode('ascii')
        result = {'boxes': [], 'image': image_result_base64, 'image_id': images_id[0], 'good': good}

    print('time cos detect 3:', time.time() - time_start, 's')
    print("")
    return jsonify(result)


# Run the server
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
版权归原作者 绯虹剑心 所有, 如有侵权,请联系我们删除。