0


C/C++高性能服务器网络库框架libhv源码解读

libhv是国人开发的开源C/C++跨平台网络库,底层功能由C实现(60%),使用C++封装(30%),可用于实现C/C++高性能服务器。

开源地址:https://github.com/ithewei

本章是学习libhv框架的简单记录,想学习使用libhv的朋友可以多看看作者的博客,写的很详细:https://hewei.blog.csdn.net/?type=blog
github上的中文文档:https://github.com/ithewei/libhv/tree/master/docs/cn

推荐libhv的理由:
1、完善的跨平台处理(Linux, Windows, macOS, Android, iOS, BSD, Solaris)。
2、功能全面,涵盖了大部分常用的服务端开发模块,实际开发中即使不使用libhv库,也可以从中找到需要的模块移植到自己的项目中。
3、核心的事件循环等模块,分别实现了C和C++封装,可适用于纯C项目或C++项目。
4、国人开发,有中文文档,代码可读性强,方便移植和二次开发,非常值得学习和借鉴。

目录

libhv功能划分

1、基础模块:原子操作,内存分配与释放,字符串操作,文件夹/路径操作,随机数,大小端字节序,数学函数,线程锁,跨平台宏,日期时间,文件操作,文件缓存,dir/ls,url,常用宏定义。
2、数据结构:动态数组宏,堆,缓存,链表,队列,红黑树,MultiMap,JSON。
3、功能模块:日志,网络日志,错误,命令行参数,配置文件,信号处理,软件版本,自动析构,单例宏,线程私有变量,资源池,线程池,拆包组包。
4、网络模块:socket通用接口,ifconfig,心跳,重连,转发,tcp客户端/服务端,udp客户端/服务端,http客户端/服务端,WebSocket客户端/服务端。
5、事件循环:事件,事件循环,事件循环线程,事件循环线程池,定时器,网络IO,异步自定义事件,epoll,poll,select,iocp(win),kqueue(OS_BSD/OS_MAC),evport(OS_SOLARIS)。
6、协议编码:gRPC,rudp,kcp,ssl,tls,mqtt,dns,ftp,icmp,smtp,base64,md5,sha1。

libhv源码框架

按文件夹划分:base(C基础模块),cpputil(C++基础模块),event(C事件循环),evpp(C++事件循环封装),http(http封装),mqtt(MQTT协议客户端),protocol(常用协议),ssl,util(编码和加密)。

base

array.h

#define ARRAY_DECL,预定义宏实现的动态数组。

hatomic.h

原子操作,如果是C++11环境使用std标准库里的原子定义,否则自定义原子数据类型。

hbase.h

基础接口,内存的分配与释放,字符串处理,创建文件夹,文件类型路径判断,可执行文件路径,生成随机数,url 解析等。

hbuf.h

缓存(hbuf_t),带偏移的缓存(offset_buf_t),带头和尾位置的缓存(fifo_buf_t)。
class HBuf : public hbuf_t,对 hbuf_t 的封装,提供接口:data、size、isNull、cleanup、resize、copy。
class HVLBuf : public HBuf,对 HBuf 的封装,可扩容的缓存。
class HRingBuf : public HBuf,对 HBuf 的封装,环形缓存。

hdef.h

常用宏定义,取绝对值,数组大小,安全释放指针,字节合并与拆分,字母大小写,是否16进制等。

heap.h

数据结构堆,struct heap_node,struct heap,并提供操作接口:heap_init、heap_replace、heap_swap、heap_insert、heap_remove、heap_dequeue。
heap时间复杂度:查询最小的超时时间复杂度为O(1),添加删除复杂度O(lgN)。

hendian.h

大小端,字节序操作。

herr.h

错误码定义,使用宏定义的方式,可根据错误码查询错误描述。

hlog.h

日志记录,C语言风格,提供宏接口,没有使用独立线程,多线程通过锁确保安全;可以设置字体颜色,日志级别,日志文件大小,日志保存天数。

hmain.h

命令行参数解析,后台运行、创建pid文件,信号处理:signal_init,signal_handle,开始、停止、重启进程,master-workers多进程模式、崩溃自动重启等。

hmath.h

数学函数

hmutex.h

线程同步锁,互斥锁、读写锁、条件变量、信号量的宏定义,class MutexLock、class SpinLock、class RWLock、class LockGuard 对上述的封装。

hplatform.h

实现跨平台相关的宏

hproc.h

多进程操作

hsocket.h

socket通用接口封装

hsysinfo.h

获取系统信息

hthread.h

class HThread,线程基类封装,线程状态、睡眠策略、线程暂停与重新开始。
全局接口:hv_getpid 获取进程ID,hv_gettid 获取线程ID。

htime.h

日期时间,时间获取,时间格式化,时间转换等。

hversion.h

软件版本获取,版本转换。

list.h

struct list_head,struct hlist_head,struct hlist_node,自定义链表。
list时间复杂度:添加删除复杂度为O(1),遍历访问。

netinet.h

IP、UDP、TCP、ICMP头定义,checksum。

queue.h

队列,宏定义实现,先进先出,#define QUEUE_DECL(type, qtype)。

rbtree.h

红黑树

cpputil

hasync.h

class HV_EXPORT GlobalThreadPool : public HThreadPool,封装 HThreadPool ,实现单例。
class async,接口:startup,启动 GlobalThreadPool 线程池。
auto async(Fn&& fn, Args&&… args),全局接口,在 GlobalThreadPool 线程池中执行异步任务。

hdir.h

typedef struct hdir_s,接口:listdir,功能类似linux中的ls命令和windows中的dir命令。

hfile.h

  1. class HFile,文件操作,成员:filepathFILE* fp,接口:opencloseremoverenamereadwriteseektellreadallreadlinereadrange

hmap.h

class MultiMap : public multimap<Key, Value>,把[]操作符封装成insert。

hobjectpool.h

class HObjectFactory,接口:static T* create(),return new T;提供接口创建对象。
class HObjectPool,对象池,类似资源池的设计,设置固定大小的对象池,池内有空闲对象则直接用;没有空闲对象但还没超过池的最大值,直接new一个;如果超过了池的最大值,阻塞等待_timeout时间,此期间有对象被归还给池则使用,超时则分配失败。(Bad design)
class HPoolObject,对 HObjectPool 的封装,可使用 HPoolObject 获取 HObjectPool 池中的一个对象,成员:HObjectPool<T, TFactory> pool_,std::shared_ptr sptr_。

hpath.h

class HV_EXPORT HPath,提供静态接口,常用路径操作,如exists、isdir、isfile、islink,一个完整路径的每段获取,文件名、后缀名、文件夹名等。

hscope.h

class Defer,构建时传入 Function,析构时执行Function。
class ScopeCleanup,类似 Defer,但可通过bind绑定 Function 和参数列表。
class ScopeFree,构造时传入指针,析构时使用 free 释放指针。
class ScopeDelete,构造时传入指针,析构时使用 delete 释放指针。
class ScopeDeleteArray,构造时传入指针,析构时使用 delete 释放指针数组。
class ScopeRelease,构造时传入指针,析构时使用 release 接口释放指针。
class ScopeLock,构造时对线程锁lock,析构时unlock。

hstring.h

  1. std::string操作封装,转换,大小写转换,反转,比较,是否包含,拼接,分隔,修剪,替换等。
  2. class StringCaseLess : public std::less<std::string>,重载operator(),比较字符串大小。

hthreadpool.h

class HThreadPool,线程池,成员:std::list threads,std::queue tasks;接口:start、stop、pause、resume、commit;线程池共享一个任务队列,commit 提交任务到队列,通过信号量唤醒线程执行;设置最大和最小线程数量,不忙时减少线程数量至最小,忙时增加线程数量至最大。

hurl.h

class HV_EXPORT HUrl,url格式转换。

ifconfig.h

接口:ifconfig,提供类似linux命令ifconfig的功能,返回 ifconfig_t 数组,查询内容:name、ip、mask、broadcast、mac。

iniparser.h

class HV_EXPORT IniParser,配置文件读写。

json.hpp

json操作封装(25000多行代码…)。

singleton.h

单例宏定义。

ThreadLocalStorage.h

class HV_EXPORT ThreadLocalStorage,线程私有变量 pthread_key_t 的封装,接口:set,get,setThreadName等。

event

hkcp.h

kcp封装

ikcp.h

kcp宏定义和相关接口。

hevent.h

事件结构体定义,宏定义,struct hloop_s,struct hidle_s,struct htimer_s,struct htimeout_s,struct hperiod_s,struct hio_s 等,hio_t 相关接口。
hloop_s:事件循环结构体,包含运行状态、线程ID、用户数据、激活事件数、定时器事件堆、IO事件数组、空闲事件链表、自定义事件队列、事件优先级队列、互斥锁、事件FD(用于唤醒事件循环)等。

hloop.h

事件循环线程接口,和 hevent 一样是C语言风格的,提供结构体(hevent_s)和全局接口,包括 socket IO接口、事件循环线程接口、定时器接口、SSL/TLS接口的封装等,拆包设置、可靠udp、重连设置、负载均衡等。
hevent_s:libhv中的事件包括IO事件、timer定时器事件、idle空闲事件、自定义事件,所有事件均继承自该基类。

iowatcher.h

epoll/poll/select/iocp(win)/kqueue(OS_BSD/OS_MAC)/evport(OS_SOLARIS)相关接口,iowatcher_init,iowatcher_add_event,iowatcher_del_event等。

nlog.h

网络日志,接口:network_logger、nlog_listen,成员:network_logger_t s_logger,创建tcp服务端,接受客户端接入,收到客户端消息写入日志。

overlapio.h

重叠IO

rudp.h

可靠用户数据报协议(RUDP)?

unpack.h

拆包接口,hio_unpack,可按照固定长度、分隔符、头部长度字段拆包。

evpp

Buffer.h

typedef HBuf Buffer; typedef std::shared_ptr BufferPtr;

Channel.h

class Channel,把 hio_t 以及C风格接口封装成C++的类。
class SocketChannel : public Channel,加入SSL/TLS,对Channel的进一步封装。

Event.h

struct Event,struct Timer,分别封装 hevent_t 和 htimer_t。

EventLoop.h

class EventLoop : public Status,基于已有或新建 hloop_t 创建事件循环类,成员:hloop_t*,customEvents自定义任务队列,timers定时器队列,接口:run(调用 hloop_run,循环处理epoll/IO事件、定时器事件、自定义事件),stop,pause,resume,setTimer,killTimer,queueInLoop等,可以启动、停止、暂停、重新开始事件循环,增加/删除定时器,把自定义任务加入队列等。

EventLoopThread.h

class EventLoopThread : public Status,成员:EventLoopPtr loop_,std::shared_ptrstd::thread thread_;接口:start、stop,开始/停止事件循环线程,把 EventLoop 的事件循环放在 thread_ 线程里执行。

EventLoopThreadPool.h

class EventLoopThreadPool : public Status,成员:thread_num_、std::vector loop_threads_;接口:start,stop,loop(获取一个 EventLoop),nextLoop(可选轮询调度、随机调度、最小连接数调度);创建指定数量的 EventLoopThread 事件循环线程池。

Status.h

class Status,封装线程状态。

TcpClient.h

class TcpClientEventLoopTmpl,成员:remote_addr,reconn_setting,EventLoopPtr,接口:createsocket,bind,startConnect,startReconnect,send等,tcp客户端接口。
class TcpClientTmpl : private EventLoopThread, public TcpClientEventLoopTmpl,接口:start,stop。
typedef TcpClientTmpl TcpClient;

TcpServer.h

class TcpServerEventLoopTmpl,成员:host,port,listenfd,EventLoopThreadPool,EventLoopPtr,channels,接口:createsocket,closesocket,setMaxConnectionNum,setThreadNum,startAccept,stopAccept,addChannel,connectionNum,broadcast,onAccept等,tcp服务端。
class TcpServerTmpl : private EventLoopThread, public TcpServerEventLoopTmpl,接口:start,stop。
typedef TcpServerTmpl TcpServer;

TimerThread.h

class TimerThread : public EventLoopThread,接口:setTimer,resetTimer,setInterval,killTimer,创建一个事件循环线程,用来执行定时器。

UdpClient.h

class UdpClientEventLoopTmpl,成员:EventLoopPtr,remote_host,remote_port,接口:createsocket,bind,startRecv,sendto等,udp客户端。
class UdpClientTmpl : private EventLoopThread, public UdpClientEventLoopTmpl,接口:start,stop。
typedef UdpClientTmpl UdpClient;

UdpServer.h

class UdpServerEventLoopTmpl,成员:host,port,EventLoopPtr;接口:createsocket,startRecv,stopRecv,start,sendto,udp服务端。
class UdpServerTmpl : private EventLoopThread, public UdpServerEventLoopTmpl,接口:start,stop。
typedef UdpServerTmpl UdpServer;

http

client/
AsyncHttpClient.h

class ConnPool,struct HttpClientTask,struct HttpClientContext。
class AsyncHttpClient : private EventLoopThread,异步http客户端,在事件循环线程发起http请求,在线程内异步接收http响应并解析,成员:std::map<int, SocketChannelPtr> channels;std::map<std::string, ConnPool> conn_pools;成员:send。

axios.h

模拟nodejs axios api

requests.h

模拟python requests api

HttpClient.h

class HttpClient,成员:struct http_client_s http_client_t;同步http客户端,也可进行 AsyncHttpClient 异步操作。

WebSocketClient.h

class WebSocketClient : public TcpClientTmpl,WebSocket客户端,成员:HttpParserPtr、HttpRequestPtr、HttpResponsePtr、WebSocketParserPtr。

server/
FileCache.h

struct file_cache_s,文件缓存结构体。
class FileCache,文件缓存类,成员:FileCacheMap cached_files;成员:Open,Close。

http_page.h

http页面构造,提供接口:make_http_status_page,make_index_of_page,创建 html 格式字符串。

HttpContext.h

struct HttpContext,管理http内容,提供设置和查询接口,成员:HttpService*,HttpRequestPtr,HttpResponsePtr,HttpResponseWriterPtr。

HttpHandler.h

class HttpHandler,http处理主类,成员:hio_t,HttpService,HttpRequestPtr,HttpResponsePtr,HttpResponseWriterPtr,HttpParserPtr,HttpContextPtr,http_handler,WebSocketService,WebSocketChannelPtr,WebSocketParserPtr 等,以及相关接口。

HttpMiddleware.h

class HttpMiddleware,接口:CORS(HttpRequest* req, HttpResponse* resp)。

HttpResponseWriter.h

class HttpResponseWriter : public SocketChannel,http 响应发送类,成员:HttpResponsePtr,接口:WriteHeader、WriteCookie、WriteBody 等。

HttpServer.h

struct http_server_t,http服务端结构体。
class HttpServer : public http_server_t,http服务端类,对 http_server_t 封装接口。

HttpService.h

http业务类 (包括api service、web service、indexof service),struct http_handler,struct http_method_handler,struct HV_EXPORT HttpService。

WebSocketServer.h

struct WebSocketService,class WebSocketServer : public HttpServer,WebSocket服务类。

grpcdef.h

gRPC封装定义,gRPC是Google公司开发的一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计;RPC是指远程过程调用。

httpdef.h

http定义,http_status(200OK,404notfound等)、http_method(POST/GET等)、http_content_type 标识码/枚举/描述的映射,并提供方法用于标识码与描述的相互转换,如404对应Not Found。

http_parser.h

http1解析实现,错误码 http_errno,结构体 http_parser、http_parser_settings,http解析方法 http_parser_init、http_parser_execute 等。

HttpParser.h

class HV_EXPORT HttpParser,http解析基类,抽象类。

Http1Parser.h

class Http1Parser : public HttpParser,http1解析类。

Http2Parser.h

http2解析类

http_content.h

struct FormData,格式化数据;struct FormFile : public FormData;typedef HV_MAP<std::string, FormData> MultiPart;
提供接口:dump_multipart、parse_multipart、dump_json、parse_json。
multipart_parser.h
multipart 解析,struct multipart_parser,接口:multipart_parser_init、multipart_parser_execute 等。

HttpMessage.h

http 请求响应类,struct NetAddr,struct HttpCookie,class HttpMessage。
class HV_EXPORT HttpRequest : public HttpMessage,http 请求。
class HV_EXPORT HttpResponse : public HttpMessage,http 响应。

wsdef.h

websocket 接口定义,enum ws_opcode,ws_build_frame,ws_encode_key 等。

websocket_parser.h

websocket 解析基础功能,struct websocket_parser,struct websocket_parser_settings,接口:websocket_parser_init,websocket_parser_execute 等。

WebSocketChannel.h

class WebSocketChannel : public SocketChannel,WebSocket封装,成员:Buffer sendbuf_,std::mutex mutex_;接口:send、sendPing、sendPong。

WebSocketParser.h

class WebSocketParser,websocket 解析类。

mqtt

mqtt_client.h

typedef struct mqtt_client_s mqtt_client_t;class MqttClient,mqtt 协议客户端。

mqtt_protocol.h

mqtt 协议,接口:mqtt_head_pack,mqtt_head_unpack。

protocol

dns.h

dns 解析接口,nslookup。

ftp.h

ftp 协议封装,struct ftp_handle_t,接口:ftp_command_str,ftp_connect,ftp_login,ftp_quit,ftp_upload,ftp_download,ftp_download_with_cb等。

icmp.h

icmp 协议,接口:ping,模拟ping命令。

smtp.h

smtp 协议,struct mail_t,接口:smtp_command_str,smtp_status_str,smtp_build_command,sendmail。

ssl

hssl.h

SSL协议封装,结构体和接口定义。

util

base64.h

BASE64编解码

md5.h

MD5数字摘要,生成字符串或文件的 md5 值,接口:hv_md5,hv_md5_hex。

sha1.h

SHA1安全散列算法,生成字符串或文件的 sha1 值,接口:hv_sha1,hv_sha1_hex。

libhv源码测试

开源项目中有很多测试代码,例如examples/,unittest/,evpp/,本章只测试几个核心模块。

EventLoopThread 事件循环线程测试

1、事件循环线程是一个网络框架的核心, libhv 把事件循环(EventLoopPtr)和线程(std::thread)分开封装,可以先创建 EventLoopPtr ,再创建 EventLoopThread 启动线程。

2、EventLoopThread 构建时可以传入已有的 EventLoopPtr ,如果不传则自动创建一个 EventLoopPtr ;也可传入Functor pre post ,分别在循环前和循环后执行。

3、EventLoopThread 可以暂停(HLOOP_STATUS_PAUSE 线程还在,只是不处理任务),或无激活事件时退出线程,或只执行一次(HLOOP_FLAG_RUN_ONCE),或一直执行。

libhv 事件循环线程主要处理3种任务: 定时器,网络IO,自定义事件。
1、定时器:支持创建单次或循环定时器,存放在小顶堆(timers)中,堆顶即是最先超时的定时器,每次事件循环检测堆顶,超时的定时器会先出堆再重新入堆, libhv 会优先处理定时器任务。事件循环检测堆顶定时器后,会把下一次定时器超时时长 blocktime_ms 传入 epoll_wait ,避免过渡执行循环,如果没有定时器则 epoll_wait 默认100ms超时。
2、自定义任务:使用 epoll 实现事件监听,事件循环刚启动时创建事件(eventfds)和 epoll,调用 runInLoop 把自定义事件加入队列 customEvents ,并通过 eventfds 唤醒 epoll 执行循环。
3、网络IO:使用和自定义事件同一个 epoll 监听网络IO事件,比如创建tcp客户端时把fd传入epoll监听。

创建epoll调用栈

EventLoopThread::start->EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_create_eventfds->hread->hio_read->hio_add->iowatcher_add_event->iowatcher_init->epoll_create.

epoll_wait调用栈

EventLoopThread::start->EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_process_events->hloop_process_ios->iowatcher_poll_events.

详见源码目录:evpp/EventLoopThread_test.cpp

  1. staticvoidonTimer(TimerID timerID,int n){printf("tid=%ld timerID=%lu time=%lus n=%d\n",hv_gettid(),(unsignedlong)timerID,(unsignedlong)time(NULL), n);}intEventLoopThread_Test(){
  2. HV_MEMCHECK;printf("main tid=%ld\n",hv_gettid());
  3. EventLoopThread loop_thread;const EventLoopPtr& loop = loop_thread.loop();//创建定时器// runEvery 1s
  4. loop->setInterval(1000, std::bind(onTimer, std::placeholders::_1,100));// runAfter 10s
  5. loop->setTimeout(10000,[&loop](TimerID timerID){
  6. loop->stop();});
  7. loop_thread.start();//创建异步自定义事件
  8. loop->queueInLoop([](){printf("queueInLoop tid=%ld\n",hv_gettid());});
  9. loop->runInLoop([](){printf("runInLoop tid=%ld\n",hv_gettid());});// wait loop_thread exit
  10. loop_thread.join();return0;}

定时器测试

详见源码目录:evpp/EventLoopThread_test.cpp
1、可以在 EventLoopThread 内添加定时器,也可以创建一个事件循环线程(TimerThread)专门用来执行定时器,定时器被存放在小顶堆(heap)中,堆顶就是最先超时的定时器.
2、创建定时器时会生成 TimerID(uint64_t 原子) 作为定时器的唯一标识,停止定时器和重启定时器需要传参 TimerID.

添加定时器调用栈

任意线程->EventLoop::setInterval->setTimerInLoop->runInLoop->添加自定义事件调用栈->事件循环线程->EventLoop::setTimer->htimer_add(hloop_t::timers)->添加到 EventLoop::timers

定时器超时调用栈

EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_process_events->hloop_process_timers->__hloop_process_timers->EVENT_PENDING(timer) 加入优先级队列->hloop_process_pendings 执行回调

PS: 刚添加定时器时不会立即更新 epoll_wait 超时时长,默认还是100ms超时(建议无事件时 epoll_wait 一直睡眠,添加定时器时立即唤醒一次线程更新 epoll_wait 超时时长).

异步自定义事件

详见源码目录:evpp/EventLoopThread_test.cpp
EventLoop::runInLoop 把自定义事件函数加入 EventLoop 的 customEvents 队列,使用eventfds唤醒线程执行,有锁线程安全,可以在任意线程调用。
添加自定义事件调用栈:

EventLoop::runInLoop->queueInLoop->postEvent->hloop_post_event->event_queue_push_back(&loop->custom_events, ev)
postEvent 传入回调(cb),使用cb创建 EventPtr ev, ev 加入 EventLoop 的 customEvents , ev->event 加入 loop->custom_events。

唤醒循环调用栈

EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_process_events->hloop_process_ios->iowatcher_poll_events->EVENT_PENDING(io) 加入待处理队列 pendings

待处理队列调用栈

EventLoopThread::loop_thread->EventLoop::run->hloop_run->hloop_process_events->hloop_process_pendings->cur->cb(cur) 执行回调

hloop_s::pendings(待处理优先级队列),是个二维数组,每个优先级一个数组,优先处理高优先级的数组.

tcp客户端测试

创建tcp socket,非阻塞连接到tcp服务端,实现接收回调 onMessage,连接结果回调 onConnection,可以设置连接失败或断开时重连
创建socket

任意线程->TcpClient::createsocket->创建fd和 hio_t.

非阻塞连接

任意线程->TcpClient::start->事件循环线程->TcpClientTmpl::startConnect->SocketChannel::startConnect->hio_connect->系统调用 connect,根据返回值判断,epoll监听可写事件,创建超时定时器,超时或连接成功后->onConnection,判断是否重连

接收数据

epoll_wait调用栈->iowatcher_poll_events->EVENT_PENDING(io)添加到优先级队列->(处理优先级队列)hloop_process_pendings->hio_handle_events->nio_read->__read_cb->hio_handle_read->hio_read_cb->Channel::on_read->TcpClient::onMessage 执行自定义接收回调

数据发送

任意线程 runInLoop->EventLoop::run->hloop_run->hloop_process_events->hloop_process_pendings->EventLoop::onTimer->Channel::write ->hio_write->__nio_write,系统调用 send

使用 SocketChannel::write 发送,数据发送线程安全锁 hio_t::write_mutex ,支持多线程发送。

详见源码目录:evpp/TcpClient_test.cpp

  1. intTcpClient_Test(){constchar* remote_host ="192.168.3.91";int remote_port =9090;
  2. TcpClient cli;//设置tcp组包策略为固定分隔符,只有收到OK时才会调用 onMessage 接收回调函数;//如果不设置 setUnpack ,则收到消息会直接调用 onMessage 回调。
  3. unpack_setting_t OK_unpack_setting;memset(&OK_unpack_setting,0,sizeof(unpack_setting_t));
  4. OK_unpack_setting.package_max_length = DEFAULT_PACKAGE_MAX_LENGTH;
  5. OK_unpack_setting.mode = UNPACK_BY_DELIMITER;
  6. OK_unpack_setting.delimiter[0]='O';
  7. OK_unpack_setting.delimiter[1]='K';
  8. OK_unpack_setting.delimiter_bytes =2;
  9. cli.setUnpack(&OK_unpack_setting);int connfd = cli.createsocket(remote_port, remote_host);if(connfd <0){return-20;}printf("client connect to port %d, connfd=%d ...\n", remote_port, connfd);//连接成功或断开连接回调
  10. cli.onConnection =[&cli](const SocketChannelPtr& channel){
  11. std::string peeraddr = channel->peeraddr();if(channel->isConnected()){printf("connected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());// send(time) every 3s//当连接成功时创建定时器,周期性向服务器发送消息setInterval(3000,[channel](TimerID timerID){if(channel->isConnected()){if(channel->isWriteComplete()){char str[DATETIME_FMT_BUFLEN]={0};
  12. datetime_t dt =datetime_now();datetime_fmt(&dt, str);
  13. channel->write(str);}}else{killTimer(timerID);}});}else{printf("disconnected to %s! connfd=%d\n", peeraddr.c_str(), channel->fd());}if(cli.isReconnect()){printf("reconnect cnt=%d, delay=%d\n", cli.reconn_setting->cur_retry_cnt, cli.reconn_setting->cur_delay);}};//接收消息回调
  14. cli.onMessage =[](const SocketChannelPtr& channel, Buffer* buf){printf("< %.*s\n",(int)buf->size(),(char*)buf->data());};//连接失败或断开时会重连,重连周期可设置固定、线性、指数等策略#ifTEST_RECONNECT// reconnect: 1,2,4,8,10,10,10...
  15. reconn_setting_t reconn;reconn_setting_init(&reconn);
  16. reconn.min_delay =1000;
  17. reconn.max_delay =10000;
  18. reconn.delay_policy =2;
  19. cli.setReconnect(&reconn);#endif#ifTEST_TLS
  20. cli.withTLS();#endif
  21. cli.start();
  22. std::string str;while(std::getline(std::cin, str)){if(str =="close"){
  23. cli.closesocket();}elseif(str =="start"){
  24. cli.start();}elseif(str =="stop"){
  25. cli.stop();break;}else{if(!cli.isConnected())break;
  26. cli.send(str);}}return0;}

tcp服务端测试

1、TcpServer 继承于 EventLoopThread ,tcp监听 startAccept 使用这个继承的 EventLoopThread 线程。
2、TcpServer 的基类 TcpServerEventLoopTmpl 拥有成员变量 EventLoopThreadPool,接入的tcp客户端分配到 EventLoopThreadPool。
3、新接入tcp客户端会创建 TSocketChannelPtr,存入 std::map<uint32_t, TSocketChannelPtr> channels 成员容器。

启动监听

任意线程->TcpServer::createsocket->Listen->ListenFD->系统调用 listen

监听tcp接入事件

EventLoop::runInLoop->…->TcpServerEventLoopTmpl::startAccept->haccept->hio_accept->hio_add->iowatcher_add_event,把 listenfd 添加到 epoll 监听.

tcp客户端接入

1、 tcp监听所在的 EventLoopThread->EventLoop::run->hloop_run->hloop_process_events->hloop_process_pendings->hio_handle_events->nio_accept(accept)->__accept_cb->hio_accept_cb->TcpServerEventLoopTmpl::onAccept,
2、在分配的 EventLoopThreadPool 线程执行:
runInLoop->EventLoop::run->…->TcpServerEventLoopTmpl::newConnEvent->TcpServerEventLoopTmpl::onConnection,调用自定义的回调函数,tcp新接入和断开时均调用该回调函数。

接收数据

EventLoop::run->hloop_run->hloop_process_events->hloop_process_pendings->hio_handle_events->nio_read->__read_cb->hio_handle_read->hio_read_cb->Channel::on_read->TcpServerEventLoopTmpl::newConnEvent->TcpServerEventLoopTmpl::onMessage,调用自定义接收回调,运行在分配的 EventLoopThreadPool 线程

数据发送

和tcp客户端类似,使用 SocketChannel::write 发送,建议使用分配的 EventLoopThreadPool 线程发送,也可在任意线程发送,线程安全锁 hio_t::write_mutex

详见源码目录:evpp/TcpServer_test.cpp

  1. intTcpServer_Test(){int port =9090;hlog_set_level(LOG_LEVEL_DEBUG);
  2. TcpServer srv;int listenfd = srv.createsocket(port);if(listenfd <0){return-20;}printf("server listen on port %d, listenfd=%d ...\n", port, listenfd);//当有客户端接入和断开时执行回调
  3. srv.onConnection =[](const SocketChannelPtr& channel){
  4. std::string peeraddr = channel->peeraddr();if(channel->isConnected()){printf("%s connected! connfd=%d id=%d tid=%ld\n", peeraddr.c_str(), channel->fd(), channel->id(), currentThreadEventLoop->tid());}else{printf("%s disconnected! connfd=%d id=%d tid=%ld\n", peeraddr.c_str(), channel->fd(), channel->id(), currentThreadEventLoop->tid());}};//收到客户端消息回调
  5. srv.onMessage =[](const SocketChannelPtr& channel, Buffer* buf){// echoprintf("< %.*s\n",(int)buf->size(),(char*)buf->data());
  6. channel->write(buf);};//设置EventLoopThreadPool worker_threads 线程数量
  7. srv.setThreadNum(4);//设置线程分配策略,最小连接数优先
  8. srv.setLoadBalance(LB_LeastConnections);#ifTEST_TLS
  9. hssl_ctx_opt_t ssl_opt;memset(&ssl_opt,0,sizeof(hssl_ctx_opt_t));
  10. ssl_opt.crt_file ="cert/server.crt";
  11. ssl_opt.key_file ="cert/server.key";
  12. ssl_opt.verify_peer =0;
  13. srv.withTLS(&ssl_opt);#endif//启动监听线程和任务线程池
  14. srv.start();
  15. std::string str;while(std::getline(std::cin, str)){if(str =="close"){
  16. srv.closesocket();}elseif(str =="start"){
  17. srv.start();}elseif(str =="stop"){
  18. srv.stop();break;}else{
  19. srv.broadcast(str.data(), str.size());}}return0;}

udp客户端测试

详见源码目录:evpp/UdpClient_test.cpp
UdpClient 构建可传入已有EventLoopThread,不传入时自动创建一个 EventLoopThread。

  1. intUdpClient_Test(){constchar* remote_host ="192.168.3.91";int remote_port =9090;
  2. UdpClient cli;//创建udp客户端,指定远端IP和端口号,本地端口号随机分配int sockfd = cli.createsocket(remote_port, remote_host);if(sockfd <0){return-20;}//接收回调printf("client sendto port %d, sockfd=%d ...\n", remote_port, sockfd);
  3. cli.onMessage =[](const SocketChannelPtr& channel, Buffer* buf){printf("< %.*s\n",(int)buf->size(),(char*)buf->data());};
  4. cli.start();// sendto(time) every 3s//创建定时器,间隔向对端发送消息
  5. cli.loop()->setInterval(3000,[&cli](TimerID timerID){char str[DATETIME_FMT_BUFLEN]={0};
  6. datetime_t dt =datetime_now();datetime_fmt(&dt, str);
  7. cli.sendto(str);});
  8. std::string str;while(std::getline(std::cin, str)){if(str =="close"){
  9. cli.closesocket();}elseif(str =="start"){
  10. cli.start();}elseif(str =="stop"){
  11. cli.stop();break;}else{
  12. cli.sendto(str);}}return0;}

udp服务端测试

详见源码目录:evpp/UdpServer_test.cpp
UdpServer 构建可传入已有EventLoopThread,不传入时自动创建一个 EventLoopThread。

  1. intUdpServer_Test(){int port =9090;
  2. UdpServer srv;//创建udp服务端,绑定IP和端口号int bindfd = srv.createsocket(port);if(bindfd <0){return-20;}printf("server bind on port %d, bindfd=%d ...\n", port, bindfd);//接收回调//UdpServer 不记录接收到消息的历史udp客户端, io->peeraddr 只记录上一次接收到消息的udp客户端//channel->write 是发消息给 io->peeraddr//UdpServer::sendto 默认发给 io->peeraddr ,指定 peeraddr 时是发给指定的udp客户端。
  3. srv.onMessage =[](const SocketChannelPtr& channel, Buffer* buf){// echoprintf("< %.*s\n",(int)buf->size(),(char*)buf->data());
  4. channel->write(buf);};//启动 EventLoopThread 线程
  5. srv.start();
  6. std::string str;while(std::getline(std::cin, str)){if(str =="close"){
  7. srv.closesocket();}elseif(str =="start"){
  8. srv.start();}elseif(str =="stop"){
  9. srv.stop();break;}else{
  10. srv.sendto(str);}}return0;}
标签: c语言 c++ 服务器

本文转载自: https://blog.csdn.net/weixin_40355471/article/details/133853127
版权归原作者 夏天匆匆2过 所有, 如有侵权,请联系我们删除。

“C/C++高性能服务器网络库框架libhv源码解读”的评论:

还没有评论