一.tcp接入线程池(使用线程池)
1.tcp初步接入线程池
我们设置了对应的任务是死循环,那么线程池提供服务,就显得有不太合适。我们给线程池抛入的任务都是短任务
因为他并没有访问任何类内成员,所以可以把执行方法提到类外,执行任务时就可以不用把服务和类绑定了bind(&ServerTcp::transService, this,。②threadRoutine也没用了
代码:
提交了2022-12-23对tcp初步接上线程池的代码 · cf60e99 · beyond/linux - Gitee.com
2.popen
作用:创建管道,再fork创建子进程,子进程调用exec执行命令command,执行完后通过管道将数据交给父进程,父进程再把数据传入返回值中的文件描述符中。使用完把文件pclose关闭即可
FILE *popen(const char *command, const char *type);
command:执行一个命令command。type:r读还是w写方式打开
返回值:底层的pipe/fork调用失败返回nullptr
3.此gitee上的代码后半部分是popen使用:
例如command命令是pwd,执行完command命令,把command命令执行的结果输入到fp文件中,下面fgets(line, sizeof(line)-1, fp) 从fp中逐行(每行长度是sizeof(line)-1)读取数据放入数组line,最后write把line中数据写入sock文件中
FILE *fp = popen(command, "r");
if(fp == nullptr)
{
logMessage(WARINING, "exec %s failed, beacuse: %s", command,
strerror(errno));
break;
}
char line[1024];
while(fgets(line, sizeof(line)-1, fp) != nullptr)
{
write(sock, line, strlen(line));
}
2022-12-25日提交了24日写的:1.tcp接入线程池后并使用popen的写法版本。2.tcp接入线程池后使用popen的写法版本并将服务器改造成守护进程 · 9874ca3 · beyond/linux - Gitee.com
二.守护进程/精灵进程——部署
这两种是同一种进程的不同翻译,是特殊的孤儿进程,不但运行在后台,最主要的是脱离了与终端和登录会话的所有联系,也就是默默的运行在后台不想受到任何影响,并且退出后不会成为僵尸进程。
一般以服务器的方式工作,对外提供服务的服务器,都是以守护进程(精灵进程)的方式在服务器中工作的,一旦启动之后,除非用户主动关闭,否则,一直会在运行
守护进程特点:①父进程是系统。②守护进程命名通常以d结尾
1.介绍PGID,SID等各个名称
(1)介绍
PPID:父进程的id。PID:进程本身的id。PGID:当前进程所属的进程组。(同组进程的第一个进程一般是进程组的组长)SID:当前进程的会话id。
COMMAND:命令。TIME:进程启动的时长。UID:用户名映射后的数字,用户UID。
TPGID:当前进程组和终端相关的信息,-1 说明进程和终端无关系,非-1说明有关。
TTY:哪一个终端
(2)进程组:
2.会话,守护进程/精灵进程定义
用户登录时会建立一个会话,会话内部会构建一个前台进程组 和 0个或者多个后台进程组,linux下客户端登录时 会给我们加载bash,bash就是前台进程组。(windows下的注销就是新建立一个会话)前台进程组必须有一个,而且任何时刻只能有一个。
bash构建了一个会话,自己是会话前台进程组,后续如果我们自己再新启进程 或者 启动进程组,这些进程都属于bash的会话!
在命令行中启动一个进程./mytest 可以称作:在会话中启动一个进程组(可以只有一个进程),来完成某种任务,所有会话内的进程fork创建子进程,一般依旧属于当前会话! !
让某个后台进程(例如我们自己写的网络服务器进程)脱离当前会话,让它独立的在计算机中形成一个新会话,自成进程组,这种进程叫做守护进程/精灵进程。
三. 如何使我们自己写的服务器进程形成守护进程?
1.为什么需要使我们自己写的服务器进程成为守护进程
我们自己写的网络服务器进程会受到计算机上客户端或者shell的影响,即:如果我们的shell掉线了我们写的网络服务器会终止,我们电脑用户注销了,或者电脑关了,我们写的网络服务器进程都会终止,如果我们写的网络服务器成为守护进程:他脱离当前会话,让它独立的在计算机中形成一个新会话,自成进程组,他便不受计算机上客户端会话或者shell的限制,就算我们把电脑关了,shell也关了,这个服务器也会在云端一直提供服务。(这也是抖音随时打开都能刷的原因)想结束守护进程,ps axj|grep 进程名,然后kill -9 即可结束。
2.形成守护进程的核心步骤——setsid
守护进程要编写必须调用一个函数setsid():将调用进程设置成为独立的会话
注意:进程组的组长不可调用setsid();(组长本来就是一个会话的组长)
3.setsid
作用:使调用setsid的进程成为一个新的会话,并成为会话内某个进程组的组长
pid_t setsid(void);
返回值:成功返回调用进程的pid,失败返回-1错误码被设置
4.形成守护进程全步骤
(1)必做:fork+setsid()——让自己不成为进程组组长+设置自己是一个独立的会话
那我如何不成为组长以便调用setsid**呢?**——bash中新启动第一个进程一定成为组长,所以你可以成为进程组内的第二个进程。即:常规做法:fork()子进程,子进程就不再是组长进程了,它就可以成功调用setsid(); ————
if(fork() > 0) exit(0) ;
setsid() ;
(2)选做:
①忽略SIGPIPE信号:
系统的管道:写端一直在写,读端关闭,写端会被终止,被信号终止SIGPIPE。网络套接字同理!
网络的套接字:server向client写入,client (close) 关闭了,server也会收到SIGPIPE,server会直接被终止,为了防止服务器server异常退出,需要 忽略SIGPIPE信号
②更改守护进程的工作目录,如何更改进程的工作目录?——chdir()
(3)一般守护进程都要做的(必做):
- (不常用做法一)因为守护进程与标准输入,标准输出,标准错误已经没关系了,所以close(0, 1,2) 守护进程获取输入或写入都是和网络有关,不会从键盘获取,不会往显示器输出。(很少有人这样做,因为兼容性不好,会导致代码中的打印代码报错)
2.(常用做法二)类似于所有Linux下的一个”垃圾桶(文件黑洞)“,凡是从 /dev/null 里面读/写一概被丢弃
这些输入输出的数据会全部被丢弃:
[whb@ecs-144421 ~]$ echo "hello wrold" >> /dev/null
[whb@ecs-144421 ~]$ echo cat < /dev/null
推荐做法:打开/dev/null, 并且对 0,1,2 进行重定向!
总结:1.忽略SIGPIPE
2.更改进程的工作目录
3.让自己不要成为进程组组长(必做)
4.设置自己是一个独立的会话(必做)
5.重定向0,1,2(必做)
daemonize.hpp文件
5.重定向0,1,2后,可以将打印信息保存入一个文件serverTcp.log中
log.hpp中:
四.总结 将自己的进程守护进程化 的三种方法
1.自己写(严重推荐)
2.用系统daemon
3. nohup . /command &,默认形成日志:nohup. out
1.自己写(严重推荐)
2.daemon
[zsh@ecs-78471 vscode]$ man 3 daemon
int daemon(int nochdir, int noclose); 让一个进程运行到后端
nochdir:是否更改当前路径。 noclose:是否关闭标注输入输出错误。
3.命令 nohup . /a.out &,默认形成日志:nohup. out
hello.cpp
#include<iostream>
using namespace std;
#include<unistd.h>
int main()
{
while(true)
{
cout<<"hello"<<endl;
sleep(1);
}
return 0;
}
nohup 不要挂起阻塞,nohup . /a.out &后,下面的nohup. out一直在变大。
[zsh@ecs-78471 12-25-nohup]$ ll
total 4
-rw-rw-r-- 1 zsh zsh 162 Dec 25 10:16 hello.cpp
[zsh@ecs-78471 12-25-nohup]$ g++ hello.cpp
[zsh@ecs-78471 12-25-nohup]$ ll
total 16
-rwxrwxr-x 1 zsh zsh 9024 Dec 25 10:17 a.out
-rw-rw-r-- 1 zsh zsh 162 Dec 25 10:16 hello.cpp
[zsh@ecs-78471 12-25-nohup]$ nohup ./a.out &
[1] 1554
[zsh@ecs-78471 12-25-nohup]$ nohup: ignoring input and appending output to ‘nohup.out’
[zsh@ecs-78471 12-25-nohup]$ll
total 20
-rwxrwxr-x 1 zsh zsh 9024 Dec 25 10:17 a.out
-rw-rw-r-- 1 zsh zsh 162 Dec 25 10:16 hello.cpp
-rw------- 1 zsh zsh 48 Dec 25 10:17 nohup.out
[zsh@ecs-78471 12-25-nohup]$ cat nohup.out
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
[zsh@ecs-78471 12-25-nohup]$ ll
total 20
-rwxrwxr-x 1 zsh zsh 9024 Dec 25 10:17 a.out
-rw-rw-r-- 1 zsh zsh 162 Dec 25 10:16 hello.cpp
-rw------- 1 zsh zsh 180 Dec 25 10:18 nohup.out
[zsh@ecs-78471 12-25-nohup]$ ll
total 20
-rwxrwxr-x 1 zsh zsh 9024 Dec 25 10:17 a.out
-rw-rw-r-- 1 zsh zsh 162 Dec 25 10:16 hello.cpp
-rw------- 1 zsh zsh 192 Dec 25 10:18 nohup.out
[zsh@ecs-78471 12-25-nohup]$ ll
total 20
-rwxrwxr-x 1 zsh zsh 9024 Dec 25 10:17 a.out
-rw-rw-r-- 1 zsh zsh 162 Dec 25 10:16 hello.cpp
-rw------- 1 zsh zsh 216 Dec 25 10:18 nohup.out
[zsh@ecs-78471 12-25-nohup]$ ll
total 20
-rwxrwxr-x 1 zsh zsh 9024 Dec 25 10:17 a.out
-rw-rw-r-- 1 zsh zsh 162 Dec 25 10:16 hello.cpp
-rw------- 1 zsh zsh 324 Dec 25 10:18 nohup.out
[zsh@ecs-78471 12-25-nohup]$ ps ajx|head -1&&ps ajx|grep a.out
PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND
411 1554 1554 411 pts/0 1565 S 1002 0:00 ./a.out
411 1566 1565 411 pts/0 1565 R+ 1002 0:00 grep --color=auto a.out
并且如果我们ctrl+d或者logout退出登录,再登录,会发现之前的会话411里面的东西都没了,只剩下a.out了,a.out虽然依旧是非进程组组长,但他已经成为了独立会话的守护进程。
[zsh@ecs-78471 12-25-nohup]$ ps ajx|head -1&&ps ajx|grep a.out
PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND
411 1581 1581 411 pts/0 1585 S 1002 0:00 ./a.out
411 1586 1585 411 pts/0 1585 R+ 1002 0:00 grep --color=auto a.out
[zsh@ecs-78471 12-25-nohup]$ logout
Connection closed.
Disconnected from remote host(我的学习机) at 10:22:34.
Type `help' to learn how to use Xshell prompt.
[C:\~]$
Connecting to 124.71.81.109:22...
Connection established.
To escape to local shell, press Ctrl+Alt+].
WARNING! The remote SSH server rejected X11 forwarding request.
Last login: Sun Dec 25 08:17:16 2022 from 110.241.215.215
Welcome to Huawei Cloud Service
[zsh@ecs-78471 ~]$ ps ajx|head -1&&ps ajx|grep a.out
PPID PID PGID SID TTY TPGID STAT UID TIME COMMAND
1 1581 1581 411 ? -1 S 1002 0:00 ./a.out
1590 1616 1615 1590 pts/0 1615 R+ 1002 0:00 grep --color=auto a.out
五.三次握手,四次挥手
(1)tcp是面向链接的,client进行通信前要connect,server进行通信前要accept
(2)tcp在建立连接的时候,采用的是三次握手,在断开链接的时候,采用的是四次挥手!
(3)connect发起三次握手(只需要client进行connect,server不用做) ,但是client 和 server 都要close(),一次close()执行4次挥手中的2次,client 和 server各执行一次close(),总共是4次挥手
(4)什么是3次握手?什么是4次挥手?
为什么是3次?为什么是4次?这个问题后面再说
六.代码合集
代码合集:tcp接入线程池后使用popen的写法版本并将服务器改造成守护进程:
daemonize.hpp
#pragma once
#include <cstdio>
#include <iostream>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
void daemonize()
{
int fd = 0;
// 1. 忽略SIGPIPE
signal(SIGPIPE, SIG_IGN);
// 2. 更改进程的工作目录
// chdir();
// 3. 让自己不要成为进程组组长
if (fork() > 0)
exit(1);
// 4. 设置自己是一个独立的会话
setsid();
// 5. 重定向0,1,2
if ((fd = open("/dev/null", O_RDWR)) != -1) // fd == 3
{
dup2(fd, STDIN_FILENO);
dup2(fd, STDOUT_FILENO);
dup2(fd, STDERR_FILENO);
// 6. 关闭掉不需要的fd
if(fd > STDERR_FILENO) close(fd);
}
}
serverTcp.cc
#include "util.hpp"
#include "Task.hpp"
#include "ThreadPool.hpp"
#include "daemonize.hpp"
#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>
class ServerTcp; // 申明一下ServerTcp
// 大小写转化服务
// TCP && UDP: 支持全双工
void transService(int sock, const std::string &clientIp, uint16_t clientPort)
{
assert(sock >= 0);
assert(!clientIp.empty());
assert(clientPort >= 1024);
char inbuffer[BUFFER_SIZE];
while (true)
{
ssize_t s = read(sock, inbuffer, sizeof(inbuffer) - 1); //我们认为我们读到的都是字符串
if (s > 0)
{
// read success
inbuffer[s] = '\0';
if (strcasecmp(inbuffer, "quit") == 0)
{
logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
break;
}
logMessage(DEBUG, "trans before: %s[%d]>>> %s", clientIp.c_str(), clientPort, inbuffer);
// 可以进行大小写转化了
for (int i = 0; i < s; i++)
{
if (isalpha(inbuffer[i]) && islower(inbuffer[i]))
inbuffer[i] = toupper(inbuffer[i]);
}
logMessage(DEBUG, "trans after: %s[%d]>>> %s", clientIp.c_str(), clientPort, inbuffer);
write(sock, inbuffer, strlen(inbuffer));
}
else if (s == 0)
{
// pipe: 读端一直在读,写端不写了,并且关闭了写端,读端会如何?s == 0,代表对端关闭
// s == 0: 代表对方关闭,client 退出
logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
break;
}
else
{
logMessage(DEBUG, "%s[%d] - read: %s", clientIp.c_str(), clientPort, strerror(errno));
break;
}
}
// 只要走到这里,一定是client退出了,服务到此结束
close(sock); // 如果一个进程对应的文件fd,打开了没有被归还,文件描述符泄漏!
logMessage(DEBUG, "server close %d done", sock);
}
void execCommand(int sock, const std::string &clientIp, uint16_t clientPort)
{
assert(sock >= 0);
assert(!clientIp.empty());
assert(clientPort >= 1024);
char command[BUFFER_SIZE];
while (true)
{
ssize_t s = read(sock, command, sizeof(command) - 1); //我们认为我们读到的都是字符串
if (s > 0)
{
command[s] = '\0';
logMessage(DEBUG, "[%s:%d] exec [%s]", clientIp.c_str(), clientPort, command);
// 考虑安全
std::string safe = command;
if((std::string::npos != safe.find("rm")) || (std::string::npos != safe.find("unlink")))
{
break;
}
// 我们是以r方式打开的文件,没有写入
// 所以我们无法通过dup的方式得到对应的结果
FILE *fp = popen(command, "r");
if(fp == nullptr)
{
logMessage(WARINING, "exec %s failed, beacuse: %s", command, strerror(errno));
break;
}
char line[1024];
while(fgets(line, sizeof(line)-1, fp) != nullptr)
{
write(sock, line, strlen(line));
}
// dup2(fd, 1);
// dup2(sock, fp->_fileno);
// fflush(fp);
pclose(fp);
logMessage(DEBUG, "[%s:%d] exec [%s] ... done", clientIp.c_str(), clientPort, command);
}
else if (s == 0)
{
// pipe: 读端一直在读,写端不写了,并且关闭了写端,读端会如何?s == 0,代表对端关闭
// s == 0: 代表对方关闭,client 退出
logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
break;
}
else
{
logMessage(DEBUG, "%s[%d] - read: %s", clientIp.c_str(), clientPort, strerror(errno));
break;
}
}
// 只要走到这里,一定是client退出了,服务到此结束
close(sock); // 如果一个进程对应的文件fd,打开了没有被归还,文件描述符泄漏!
logMessage(DEBUG, "server close %d done", sock);
}
class ThreadData
{
public:
uint16_t clientPort_;
std::string clinetIp_;
int sock_;
ServerTcp *this_;
public:
ThreadData(uint16_t port, std::string ip, int sock, ServerTcp *ts)
: clientPort_(port), clinetIp_(ip), sock_(sock), this_(ts)
{
}
};
class ServerTcp
{
public:
ServerTcp(uint16_t port, const std::string &ip = "")
: port_(port),
ip_(ip),
listenSock_(-1),
tp_(nullptr)
{
}
~ServerTcp()
{
}
public:
void init()
{
// 1. 创建socket
listenSock_ = socket(PF_INET, SOCK_STREAM, 0);
if (listenSock_ < 0)
{
logMessage(FATAL, "socket: %s", strerror(errno));
exit(SOCKET_ERR);
}
logMessage(DEBUG, "socket: %s, %d", strerror(errno), listenSock_);
// 2. bind绑定
// 2.1 填充服务器信息
struct sockaddr_in local; // 用户栈
memset(&local, 0, sizeof local);
local.sin_family = PF_INET;
local.sin_port = htons(port_);
ip_.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(ip_.c_str(), &local.sin_addr));
// 2.2 本地socket信息,写入sock_对应的内核区域
if (bind(listenSock_, (const struct sockaddr *)&local, sizeof local) < 0)
{
logMessage(FATAL, "bind: %s", strerror(errno));
exit(BIND_ERR);
}
logMessage(DEBUG, "bind: %s, %d", strerror(errno), listenSock_);
// 3. 监听socket,为何要监听呢?tcp是面向连接的!
if (listen(listenSock_, 5 /*后面再说*/) < 0)
{
logMessage(FATAL, "listen: %s", strerror(errno));
exit(LISTEN_ERR);
}
logMessage(DEBUG, "listen: %s, %d", strerror(errno), listenSock_);
// 运行别人来连接你了
// 4. 加载线程池
tp_ = ThreadPool<Task>::getInstance();
}
// static void *threadRoutine(void *args)
// {
// pthread_detach(pthread_self()); //设置线程分离
// ThreadData *td = static_cast<ThreadData *>(args);
// td->this_->transService(td->sock_, td->clinetIp_, td->clientPort_);
// delete td;
// return nullptr;
// }
void loop()
{
// signal(SIGCHLD, SIG_IGN); // only Linux
tp_->start();
logMessage(DEBUG, "thread pool start success, thread num: %d", tp_->threadNum());
while (true)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 4. 获取连接, accept 的返回值是一个新的socket fd ??
// 4.1 listenSock_: 监听 && 获取新的链接-> sock
// 4.2 serviceSock: 给用户提供新的socket服务
int serviceSock = accept(listenSock_, (struct sockaddr *)&peer, &len);
if (serviceSock < 0)
{
// 获取链接失败
logMessage(WARINING, "accept: %s[%d]", strerror(errno), serviceSock);
continue;
}
// 4.1 获取客户端基本信息
uint16_t peerPort = ntohs(peer.sin_port);
std::string peerIp = inet_ntoa(peer.sin_addr);
logMessage(DEBUG, "accept: %s | %s[%d], socket fd: %d",
strerror(errno), peerIp.c_str(), peerPort, serviceSock);
// 5 提供服务, echo -> 小写 -> 大写
// 5.0 v0 版本 -- 单进程 -- 一旦进入transService,主执行流,就无法进行向后执行,
只能提供完毕服务之后才能进行accept
// transService(serviceSock, peerIp, peerPort);
// 5.1 v1 版本 -- 多进程版本 -- 父进程打开的文件会被子进程继承吗?会的
// pid_t id = fork();
// assert(id != -1);
// if(id == 0)
// {
// close(listenSock_); //建议
// //子进程
// transService(serviceSock, peerIp, peerPort);
// exit(0); // 进入僵尸
// }
// // 父进程
// close(serviceSock); //这一步是一定要做的!
// 5.1 v1.1 版本 -- 多进程版本 -- 也是可以的
// 爷爷进程
// pid_t id = fork();
// if(id == 0)
// {
// // 爸爸进程
// close(listenSock_);//建议
// // 又进行了一次fork,让 爸爸进程
// if(fork() > 0) exit(0);
// // 孙子进程 -- 就没有爸爸 -- 孤儿进程 -- 被系统领养 -- 回收问题就交给了系统来回收
// transService(serviceSock, peerIp, peerPort);
// exit(0);
// }
// // 父进程
// close(serviceSock); //这一步是一定要做的!
// // 爸爸进程直接终止,立马得到退出码,释放僵尸进程状态
// pid_t ret = waitpid(id, nullptr, 0); //就用阻塞式
// assert(ret > 0);
// (void)ret;
// 5.2 v2 版本 -- 多线程
// 这里不需要进行关闭文件描述符吗??不需要啦
// 多线程是会共享文件描述符表的!
// ThreadData *td = new ThreadData(peerPort, peerIp, serviceSock, this);
// pthread_t tid;
// pthread_create(&tid, nullptr, threadRoutine, (void*)td);
// 5.3 v3 版本 --- 线程池版本
// 5.3.1 构建任务
// 5.3 v3.1
// Task t(serviceSock, peerIp, peerPort, std::bind(&ServerTcp::transService,
this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
// tp_->push(t);
// 5.3 v3.2
// Task t(serviceSock, peerIp, peerPort, transService);
// tp_->push(t);
// 5.3 v3.3
Task t(serviceSock, peerIp, peerPort, execCommand);
tp_->push(t);
// waitpid(); 默认是阻塞等待!WNOHANG
// 方案1
// logMessage(DEBUG, "server 提供 service start ...");
// sleep(1);
}
}
private:
// sock
int listenSock_;
// port
uint16_t port_;
// ip
std::string ip_;
// 引入线程池
ThreadPool<Task> *tp_;
};
static void Usage(std::string proc)
{
std::cerr << "Usage:\n\t" << proc << " port ip" << std::endl;
std::cerr << "example:\n\t" << proc << " 8080 127.0.0.1\n"
<< std::endl;
}
// ./ServerTcp local_port local_ip
int main(int argc, char *argv[])
{
if (argc != 2 && argc != 3)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
uint16_t port = atoi(argv[1]);
std::string ip;
if (argc == 3)
ip = argv[2];
daemonize(); // 我们的进程就会成为守护进程
ServerTcp svr(port, ip);
svr.init();
svr.loop();
return 0;
}
clientTcp.cc
#include "util.hpp"
// 2. 需要bind吗??需要,但是不需要自己显示的bind! 不要自己bind!!!!
// 3. 需要listen吗?不需要的!
// 4. 需要accept吗?不需要的!
volatile bool quit = false;
static void Usage(std::string proc)
{
std::cerr << "Usage:\n\t" << proc << " serverIp serverPort" << std::endl;
std::cerr << "Example:\n\t" << proc << " 127.0.0.1 8081\n"
<< std::endl;
}
// ./clientTcp serverIp serverPort
int main(int argc, char *argv[])
{
if (argc != 3)
{
Usage(argv[0]);
exit(USAGE_ERR);
}
std::string serverIp = argv[1];
uint16_t serverPort = atoi(argv[2]);
// 1. 创建socket SOCK_STREAM
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0)
{
std::cerr << "socket: " << strerror(errno) << std::endl;
exit(SOCKET_ERR);
}
// 2. connect,发起链接请求,你想谁发起请求呢??当然是向服务器发起请求喽
// 2.1 先填充需要连接的远端主机的基本信息
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(serverPort);
inet_aton(serverIp.c_str(), &server.sin_addr);
// 2.2 发起请求,connect 会自动帮我们进行bind!
if (connect(sock, (const struct sockaddr *)&server, sizeof(server)) != 0)
{
std::cerr << "connect: " << strerror(errno) << std::endl;
exit(CONN_ERR);
}
std::cout << "info : connect success: " << sock << std::endl;
std::string message;
while (!quit)
{
message.clear();
std::cout << "请输入你的消息>>> ";
std::getline(std::cin, message); // 结尾不会有\n
if (strcasecmp(message.c_str(), "quit") == 0)
quit = true;
ssize_t s = write(sock, message.c_str(), message.size());
if (s > 0)
{
message.resize(1024);
ssize_t s = read(sock, (char *)(message.c_str()), 1024);
if (s > 0)
message[s] = 0;
std::cout << "Server Echo>>> " << message << std::endl;
}
else if (s <= 0)
{
break;
}
}
close(sock);
return 0;
}
ThreadPool.hpp 线程池文件
#pragma once
#include <iostream>
#include <cassert>
#include <queue>
#include <memory>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <sys/prctl.h>
#include "Lock.hpp"
using namespace std;
int gThreadNum = 15;
template <class T>
class ThreadPool
{
private:
ThreadPool(int threadNum = gThreadNum) : threadNum_(threadNum), isStart_(false)
{
assert(threadNum_ > 0);
pthread_mutex_init(&mutex_, nullptr);
pthread_cond_init(&cond_, nullptr);
}
ThreadPool(const ThreadPool<T> &) = delete;
void operator=(const ThreadPool<T>&) = delete;
public:
static ThreadPool<T> *getInstance()
{
static Mutex mutex;
if (nullptr == instance) //仅仅是过滤重复的判断
{
LockGuard lockguard(&mutex); //进入代码块,加锁。退出代码块,自动解锁
if (nullptr == instance)
{
instance = new ThreadPool<T>();
}
}
return instance;
}
//类内成员, 成员函数,都有默认参数this
static void *threadRoutine(void *args)
{
pthread_detach(pthread_self());
ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
// prctl(PR_SET_NAME, "follower"); // 更改线程名称
while (1)
{
tp->lockQueue();
while (!tp->haveTask())
{
tp->waitForTask();
}
//这个任务就被拿到了线程的上下文中
T t = tp->pop();
tp->unlockQueue();
t(); // 让指定的先处理这个任务
}
}
void start()
{
assert(!isStart_);
for (int i = 0; i < threadNum_; i++)
{
pthread_t temp;
pthread_create(&temp, nullptr, threadRoutine, this);
}
isStart_ = true;
}
void push(const T &in)
{
lockQueue();
taskQueue_.push(in);
choiceThreadForHandler();
unlockQueue();
}
~ThreadPool()
{
pthread_mutex_destroy(&mutex_);
pthread_cond_destroy(&cond_);
}
int threadNum()
{
return threadNum_;
}
private:
void lockQueue() { pthread_mutex_lock(&mutex_); }
void unlockQueue() { pthread_mutex_unlock(&mutex_); }
bool haveTask() { return !taskQueue_.empty(); }
void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }
void choiceThreadForHandler() { pthread_cond_signal(&cond_); }
T pop()
{
T temp = taskQueue_.front();
taskQueue_.pop();
return temp;
}
private:
bool isStart_;
int threadNum_;
queue<T> taskQueue_;
pthread_mutex_t mutex_;
pthread_cond_t cond_;
static ThreadPool<T> *instance;
// const static int a = 100;
};
template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;
log.hpp
#pragma once
#include <cstdio>
#include <ctime>
#include <cstdarg>
#include <cassert>
#include <cassert>
#include <cstring>
#include <cerrno>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#define DEBUG 0
#define NOTICE 1
#define WARINING 2
#define FATAL 3
const char *log_level[] = {"DEBUG", "NOTICE", "WARINING", "FATAL"};
#define LOGFILE "serverTcp.log"
// logMessage(DEBUG, "%d", 10);
void logMessage(int level, const char *format, ...)
{
assert(level >= DEBUG);
assert(level <= FATAL);
char *name = getenv("USER");
char logInfo[1024];
va_list ap; // ap -> char*
va_start(ap, format);
vsnprintf(logInfo, sizeof(logInfo) - 1, format, ap);
va_end(ap); // ap = NULL
// 每次打开太麻烦
umask(0);
int fd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666);
assert(fd >= 0);
FILE *out = (level == FATAL) ? stderr : stdout;
dup2(fd, 1);
dup2(fd, 2);
fprintf(out, "%s | %u | %s | %s\n",
log_level[level],
(unsigned int)time(nullptr),
name == nullptr ? "unknow" : name,
logInfo);
fflush(out); // 将C缓冲区中的数据刷新到OS
fsync(fd); // 将OS中的数据尽快刷盘
close(fd);
// char *s = format;
// while(s){
// case '%':
// if(*(s+1) == 'd') int x = va_arg(ap, int);
// break;
// }
}
Task.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
#include "log.hpp"
class Task
{
public:
//等价于
// typedef std::function<void (int, std::string, uint16_t)> callback_t;
using callback_t = std::function<void (int, std::string, uint16_t)>;
private:
int sock_; // 给用户提供IO服务的sock
uint16_t port_; // client port
std::string ip_; // client ip
callback_t func_; // 回调方法
public:
Task():sock_(-1), port_(-1)
{}
Task(int sock, std::string ip, uint16_t port, callback_t func)
: sock_(sock), ip_(ip), port_(port), func_(func)
{}
void operator () ()
{
logMessage(DEBUG, "线程ID[%p]处理%s:%d的请求 开始啦...",\
pthread_self(), ip_.c_str(), port_);
func_(sock_, ip_, port_);
logMessage(DEBUG, "线程ID[%p]处理%s:%d的请求 结束啦...",\
pthread_self(), ip_.c_str(), port_);
}
~Task()
{}
};
Lock.hpp
#pragma once
#include <iostream>
#include <pthread.h>
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&lock_, nullptr);
}
void lock()
{
pthread_mutex_lock(&lock_);
}
void unlock()
{
pthread_mutex_unlock(&lock_);
}
~Mutex()
{
pthread_mutex_destroy(&lock_);
}
private:
pthread_mutex_t lock_;
};
class LockGuard
{
public:
LockGuard(Mutex *mutex) : mutex_(mutex)
{
mutex_->lock();
std::cout << "加锁成功..." << std::endl;
}
~LockGuard()
{
mutex_->unlock();
std::cout << "解锁成功...." << std::endl;
}
private:
Mutex *mutex_;
};
Makefile
.PHONY:all
all:clientTcp serverTcpd
clientTcp: clientTcp.cc
g++ -o $@ $^ -std=c++11
serverTcpd:serverTcp.cc
g++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:
rm -f serverTcpd clientTcp
util.hpp
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <cstdlib>
#include <cassert>
#include <ctype.h>
#include <unistd.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"
#define SOCKET_ERR 1
#define BIND_ERR 2
#define LISTEN_ERR 3
#define USAGE_ERR 4
#define CONN_ERR 5
#define BUFFER_SIZE 1024
版权归原作者 beyond.myself 所有, 如有侵权,请联系我们删除。