0


基于开源的libwebsocket实现websocket服务端和客户端

最近在做一个项目,项目中需要服务端后端和web端进行websocket通信,需要后端给web端不断地发送数据,之前程序已经写好,功能测试没有问题,代码如下:

#include"web_socket_server.h"extern std::mutex radar_obj_mutex_;extern std::map<std::string,std::string> g_map_obj_list_;#defineSERVICE_RADARWEBSOCKET"radarwebsocket"structper_session_data__http{int fd;};#defineMAX_PAYLOAD_SIZE10*1024structsession_data{int msg_count;unsignedchar buf[LWS_PRE + MAX_PAYLOAD_SIZE];int len;};staticintcallback_http(structlws*wsi,enumlws_callback_reasons reason,void*user,void*in,size_t len){return0;}intCallBackFunc(structlws*wsi,enumlws_callback_reasons reason,void*user,void*in,size_t len){// char message[] = "Hello, client!";int mRet =0;//for write msg to client
    InfoItem_t    info;uint64_t delSessionID =0;switch(reason){case LWS_CALLBACK_CLIENT_ESTABLISHED:
            std::cout <<"client connected to server"<< std::endl;break;case LWS_CALLBACK_CLOSED:break;case LWS_CALLBACK_SERVER_WRITEABLE:if(!g_map_obj_list_.empty()){size_t sendLen =sizeof(RADAR_OBJ_INFO);
                sendLen =1024*16;if(recvBuf ==NULL){HKLOG_write(HKLOG_LEVEL_ERROR,"malloc count*****************!");
                    recvBuf =(unsignedchar*)malloc(sendLen *sizeof(char));}if(!recvBuf){free(recvBuf);break;}memset(recvBuf,0,sendLen);//memcpy(recvBuf,&package_head,sizeof(package_head));//memcpy(recvBuf + sizeof(package_head),pDataBuffer,16);
                std::lock_guard<std::mutex>lock(radar_obj_mutex_);auto it = g_map_obj_list_.begin();
                std::string key = it->first;
                std::string strBody = it->second;
                g_map_obj_list_.erase(it);//char* buffer = reinterpret_cast<char*>(&strBody);//memcpy(recvBuf + sizeof(package_head),strBody.c_str(),strBody.length());//memcpy(recvBuf, strBody.c_str(), sendLen); // 使用memcpy进行复制//recvBuf[sendLen] = '\0'; // 添加结尾的空字符memcpy(recvBuf, strBody.c_str(), std::min((int)sendLen,(int)strBody.length()));// 使用memcpy进行复制
                recvBuf[std::min((int)sendLen,(int)strBody.length())]='\0';// 添加结尾的空字符//strcpy(recvBuf, (unsigned char*)strBody.c_str());// HKLOG_write(HKLOG_LEVEL_INFO, "sendLen %d,bodylen %d!",sendLen,strBody.length());
                mRet =lws_write(wsi, recvBuf,strlen(strBody.c_str()), LWS_WRITE_TEXT);if(mRet <0){HKLOG_write(HKLOG_LEVEL_ERROR," LWS_CALLBACK_SERVER_WRITEABLE error %d!",mRet);}}break;case LWS_CALLBACK_CLIENT_RECEIVE:// 处理接收到的数据lwsl_notice("LWS_CALLBACK_CLIENT_RECEIVE\n");break;default:break;}return0;}staticstructlws_protocols protocols[]={/* first protocol must always be HTTP handler */{"http-only",/* name */
        callback_http,/* callback */sizeof(structper_session_data__http),/* per_session_data_size */0,/* max frame size / rx buffer */},{
        SERVICE_RADARWEBSOCKET,
        CallBackFunc,sizeof(structsession_data),/* per_session_data_size */0,},{NULL,NULL,0,0}/* terminator */};voidwebsocketProc(){prctl(PR_SET_NAME,"websocketProc");structlws_context_creation_info info;memset(&info,0,sizeof(lws_context_creation_info));
    info.protocols = protocols;
    info.gid =-1;
    info.uid =-1;
    info.port =8081;//设置websocket的keepalive参数//info.ka_time = 10;                // 10s//info.ka_probes = 10;        //info.ka_interval = 3;            // 3次structlws_context*pContext =lws_create_context(&info);if(pContext ==NULL){HKLOG_write(HKLOG_LEVEL_ERROR,"libwebsocket init failed!");return;}HKLOG_write(HKLOG_LEVEL_INFO,"libwebsocket create finish %p\n!",pContext);while(true){lws_service(pContext,10);// 10毫秒的服务循环// 向客户端发送消息lws_callback_on_writable_all_protocol(pContext,&protocols[1]);}HKLOG_write(HKLOG_LEVEL_INFO,"libwebsocket destroy %p\n!",pContext);lws_context_destroy(pContext);return;}

但是当web界面刷新的时候,程序会在 lws_service 处崩溃。怀疑是调用 lws_callback_on_writable_all_protocol(pContext, &protocols[1]); 时没有对已连接客户端的有效性做校验,因此后来在代码中对连接的客户端增加了 sessionid 管理。
代码如下:

intCWebSocketProtocol::callback(structlws*wsi,enumlws_callback_reasons reason,void*user,void*in, size_t len){int m =0;bool bRet =false;
    Json::Reader reader;
    Json::Value value = Json::Value::null;//for write msg to client
    InfoItem_t    info;
    std::shared_ptr<MsgItem_t>item(nullptr);
    std::string str;
    std::string send;
    Json::FastWriter writer;char*pBuffer =NULL;//uint clienttype = 0;//char szUserName[32] = {0};//char szPassword[32] = {0};//buffer for closechar closebuffer[LWS_SEND_BUFFER_PRE_PADDING + LWS_SEND_BUFFER_POST_PADDING]={0};//for get the sessionID from clientuint64_t sessionID =0;uint64_t delSessionID =0;//for fragmentstructper_session_data*psd =(structper_session_data*)user;
    size_t ran =0;int write_mode = LWS_WRITE_TEXT;//struct lws *last_wsi = nullptr;switch(reason){case LWS_CALLBACK_ESTABLISHED:
        info.session =0;
        info.timestamp =getCurElapseMSecLinux();
        psd->left_message =0;
        psd->total_message =0;
        psd->pBuffer =nullptr;
        psd->state = FRAGSTATE_START_MESSAGE;
        psd->last_flag =0;if(!m_pWebSockSessionManager->checkIfArriveMaxSession()){printf("%s : %p LWS_CALLBACK_ESTABLISHED\n", m_ProtocolName.c_str(), wsi);AddClient2Queue(wsi, info);}else{printf("%s: %p Reach Max Session, LWS_WRITE_CLOSE\n", m_ProtocolName.c_str(), wsi);lws_close_reason(wsi,LWS_CLOSE_STATUS_INVALID_PAYLOAD,(unsignedchar*)closebuffer,0);}break;case LWS_CALLBACK_CLOSED:printf("%s : %p LWS_WRITE_CLOSE\n", m_ProtocolName.c_str(), wsi);if(psd->pBuffer){delete[] psd->pBuffer;
            psd->pBuffer =nullptr;}
        delSessionID =RemoveClientFromQueue(wsi);RemoveWebClientFromProtocol(wsi);
        m_pWebSockSessionManager->deleteSessionById(delSessionID);break;case LWS_CALLBACK_SERVER_WRITEABLE:break;case LWS_CALLBACK_RECEIVE:
        sessionID =0;
        bRet =false;
        str.assign((constchar*)in, len);printf("%s : %p %s, LWS_CALLBACK_RECEIVE\n", m_ProtocolName.c_str(), wsi, str.c_str());/*{
            "method" : { "service" : "Subscribe", "protocol" : "radarwebsocket" }
         }*/if(reader.parse(str, value)){// if(value.isMember("method")&& value["method"].isMember("service")&& value["method"]["service"].isString()){

                std::string method = value["method"]["service"].asString();if("Subscribe"== method && value["method"]["protocol"].isString()&& m_ProtocolName == value["method"]["protocol"].asString()){if(1)//verifyUser(userName, passWordDigestBase, nonce,created)){
                        sessionID = m_pWebSockSessionManager->getSessionIdBysessionFd(lws_get_socket_fd(wsi));if(0== sessionID){
                            sessionID = m_pWebSockSessionManager->createNewSession(lws_get_socket_fd(wsi));printf("new sessionId:%llu\n",(unsignedlonglong)sessionID);}else{printf("sdk websocket had sessionId:%llu\n",(unsignedlonglong)sessionID);}
                        bRet =true;}if(sessionID !=0){
                        bRet =parseProtocol(value, wsi);
                        value["session"]=(Json::UInt64)sessionID;}}}}if(!bRet){lws_close_reason(wsi,LWS_CLOSE_STATUS_INVALID_PAYLOAD,(unsignedchar*)closebuffer,0);RemoveClientFromQueue(wsi);RemoveWebClientFromProtocol(wsi);
            m_pWebSockSessionManager->deleteSessionById(sessionID);printf("%s : %p LWS_WRITE_CLOSE\n", m_ProtocolName.c_str(), wsi);}else{SetClientSessionID(wsi, sessionID, WS_CLIENT_TYPE_SDK);printf("%s : %p client new in\n",m_ProtocolName.c_str(), wsi);}//here you can receive data from clientbreak;/*
     * this just demonstrates how to use the protocol filter. If you won't
     * study and reject connections based on header content, you don't need
     * to handle this callback
     */case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:dump_handshake_info(wsi);/* you could return non-zero here and kill the connection */break;default:break;}return0;}

实现的客户端的代码如下:

#include <signal.h>

#include <cstdint>
#include <cstdio>
#include <cstring>
#include <iostream>
#include <memory>
#include <string>

#include "json/json.h"
#include "libwebsockets.h"

using namespace std;staticvolatileint exit_sig =0;#defineMAX_PAYLOAD_SIZE10*1024uint8_t g_curLogLevel = HKLOG_LEVEL_INFO;voidsighdl(int sig ){lwsl_notice("%d traped", sig );
    exit_sig =1;}/**
 * 会话上下文对象,结构根据需要自定义
 */structsession_data{int msg_count;unsignedchar buf[LWS_PRE + MAX_PAYLOAD_SIZE];int len;};/**
 * 某个协议下的连接发生事件时,执行的回调函数
 * wsi:指向WebSocket实例的指针
 * reason:导致回调的事件
 * user 库为每个WebSocket会话分配的内存空间
 * in 某些事件使用此参数,作为传入数据的指针
 * len 某些事件使用此参数,说明传入数据的长度
 */intcallback(structlws*wsi,enumlws_callback_reasons reason,void*user,void*in,size_t len ){structsession_data*data =(structsession_data*) user;
    
    std::string strBody;int isize =0;
    bool parsingSuccessful = false;charconst* begin = nullptr;charconst* end = nullptr;
    Json::Value root;
    Json::CharReaderBuilder readerBuilder;
    Json::CharReader* reader = nullptr;
    std::string errors;
    std::string strrecv;char* received_data = nullptr;switch( reason ){case LWS_CALLBACK_CLIENT_ESTABLISHED:// 连接到服务器后的回调lwsl_notice("Connected to server\n");break;case LWS_CALLBACK_CLIENT_RECEIVE:// 接收到服务器数据后的回调,数据为in,其长度为len
            isize =sizeof(RADAR_OBJ_INFO);//lwsl_notice( "RxBody: %s,len: %d,isize: %d\n", (char *) in,len,isize);
            received_data = static_cast<char*>(in);
            strrecv = received_data;
            strBody.assign((constchar*)in, len);
            std::cout <<"client Received: "<< static_cast<char*>(in)<< std::endl;
            std::cout <<"strrecv: "<< strrecv.c_str()<< std::endl;printf("Read bodyinfo: [ %s ]\n", strBody.c_str());
            strBody = HKCodingConv::AnsiToUtf8(strBody);HKLOG_write(HKLOG_LEVEL_INFO," LWS_CALLBACK_SERVER_WRITEABLE body %s!",strBody.c_str());
            begin = strBody.data();
            end = begin + strBody.size();
            reader = readerBuilder.newCharReader();
            parsingSuccessful = reader->parse(begin, end,&root,&errors);if(!parsingSuccessful){//  解析失败,输出错误信息
                std::cout <<"Error parsing JSON: "<< errors << std::endl;}break;case LWS_CALLBACK_CLIENT_WRITEABLE:// 当此客户端可以发送数据时的回调if( data->msg_count <1){// 前面LWS_PRE个字节必须留给LWS//"method" : { "service" : "Subscribe", "protocol" : "radarwebsocket" }memset( data->buf,0,sizeof( data->buf ));char*msg =(char*)&data->buf[ LWS_PRE ];

                Json::Value eventInfo;
                eventInfo["service"]="Subscribe";
                eventInfo["protocol"]="radarwebsocket";
                Json::Value parent;
                parent["method"]= eventInfo;
                Json::FastWriter writer;
                std::string strresponse = writer.write(parent);

                data->len =sprintf( msg,"%s %d",strresponse.c_str(),++data->msg_count );lwsl_notice("Tx: %s\n", msg );// 通过WebSocket发送文本消息lws_write( wsi,&data->buf[ LWS_PRE ], data->len, LWS_WRITE_TEXT );}break;}return0;}/**
 * 支持的WebSocket子协议数组
 * 子协议即JavaScript客户端WebSocket(url, protocols)第2参数数组的元素
 * 你需要为每种协议提供回调函数
 */structlws_protocols protocols[]={{//协议名称,协议回调,接收缓冲区大小"radarwebsocket", callback,sizeof(structsession_data),4*1024*1024,},{NULL,NULL,0,0}// 结束标识};intmain(){HKLOG_init("TEST");HKLOG_set_level(g_curLogLevel);// 信号处理函数signal( SIGTERM, sighdl );// 用于创建vhost或者context的参数structlws_context_creation_info ctx_info ={0};
    ctx_info.port = CONTEXT_PORT_NO_LISTEN;
    ctx_info.protocols = protocols;
    ctx_info.gid =-1;
    ctx_info.uid =-1;// 创建一个WebSocket处理器structlws_context*context =lws_create_context(&ctx_info );char*address ="172.0.0.1";int port =8085;char addr_port[256]={0};sprintf( addr_port,"%s:%u", address, port &65535);// 客户端连接参数structlws_client_connect_info conn_info ={0};
    conn_info.context = context;
    conn_info.address = address;
    conn_info.port = port;
    conn_info.ssl_connection =0;
    conn_info.path ="/";
    conn_info.host = addr_port;
    conn_info.origin = addr_port;
    conn_info.protocol = protocols[0].name;// 下面的调用触发LWS_CALLBACK_PROTOCOL_INIT事件// 创建一个客户端连接structlws*wsi =lws_client_connect_via_info(&conn_info );while(!exit_sig ){// 执行一次事件循环(Poll),最长等待1000毫秒lws_service(context,20);/**
         * 下面的调用的意义是:当连接可以接受新数据时,触发一次WRITEABLE事件回调
         * 当连接正在后台发送数据时,它不能接受新的数据写入请求,所有WRITEABLE事件回调不会执行
         */lws_callback_on_writable( wsi );}// 销毁上下文对象lws_context_destroy( context );return0;}

亲测服务端和客户端是可以通信的,完整的代码链接如下:
https://blog.csdn.net/weixin_46543392/article/details/140500490?spm=1001.2014.3001.5502


本文转载自: https://blog.csdn.net/weixin_46543392/article/details/140497707
版权归原作者 想年薪百万的it小小鸟 所有, 如有侵权,请联系我们删除。

“基于开源的libwebsocket实现websocket服务端和客户端”的评论:

还没有评论