0


linux篇【12】:网络套接字<后序>—tcp接入线程池并改为守护进程

一.tcp接入线程池(使用线程池)

1.tcp初步接入线程池

我们设置了对应的任务是死循环,那么线程池提供服务,就显得有不太合适。我们给线程池抛入的任务都是短任务

因为他并没有访问任何类内成员,所以可以把执行方法提到类外,执行任务时就可以不用把服务和类绑定了bind(&ServerTcp::transService, this,。②threadRoutine也没用了

代码:

提交了2022-12-23对tcp初步接上线程池的代码 · cf60e99 · beyond/linux - Gitee.com

2.popen

作用:创建管道,再fork创建子进程,子进程调用exec执行命令command,执行完后通过管道将数据交给父进程,父进程再把数据传入返回值中的文件描述符中。使用完把文件pclose关闭即可

FILE *popen(const char *command, const char *type);

command:执行一个命令command。type:r读还是w写方式打开

返回值:底层的pipe/fork调用失败返回nullptr

3.此gitee上的代码后半部分是popen使用:

例如command命令是pwd,执行完command命令,把command命令执行的结果输入到fp文件中,下面fgets(line, sizeof(line)-1, fp) 从fp中逐行(每行长度是sizeof(line)-1)读取数据放入数组line,最后write把line中数据写入sock文件中

            FILE *fp = popen(command, "r");
            if(fp == nullptr)
            {
                logMessage(WARINING, "exec %s failed, beacuse: %s", command, 
                                                            strerror(errno));
                break;
            }
            char line[1024];
            while(fgets(line, sizeof(line)-1, fp) != nullptr)
            {
                write(sock, line, strlen(line));
            }

2022-12-25日提交了24日写的:1.tcp接入线程池后并使用popen的写法版本。2.tcp接入线程池后使用popen的写法版本并将服务器改造成守护进程 · 9874ca3 · beyond/linux - Gitee.com

二.守护进程/精灵进程——部署

这两种是同一种进程的不同翻译,是特殊的孤儿进程,不但运行在后台,最主要的是脱离了与终端和登录会话的所有联系,也就是默默的运行在后台不想受到任何影响,并且退出后不会成为僵尸进程。

一般以服务器的方式工作,对外提供服务的服务器,都是以守护进程(精灵进程)的方式在服务器中工作的,一旦启动之后,除非用户主动关闭,否则,一直会在运行
守护进程特点:①父进程是系统。②守护进程命名通常以d结尾

1.介绍PGID,SID等各个名称

(1)介绍

PPID:父进程的id。PID:进程本身的id。PGID:当前进程所属的进程组。(同组进程的第一个进程一般是进程组的组长)SID:当前进程的会话id。

COMMAND:命令。TIME:进程启动的时长。UID:用户名映射后的数字,用户UID。

TPGID:当前进程组和终端相关的信息,-1 说明进程和终端无关系,非-1说明有关。

TTY:哪一个终端

(2)进程组:

2.会话,守护进程/精灵进程定义

用户登录时会建立一个会话,会话内部会构建一个前台进程组 和 0个或者多个后台进程组,linux下客户端登录时 会给我们加载bash,bash就是前台进程组。(windows下的注销就是新建立一个会话)前台进程组必须有一个,而且任何时刻只能有一个。

bash构建了一个会话,自己是会话前台进程组,后续如果我们自己再新启进程 或者 启动进程组,这些进程都属于bash的会话!

在命令行中启动一个进程./mytest 可以称作:在会话中启动一个进程组(可以只有一个进程),来完成某种任务,所有会话内的进程fork创建子进程,一般依旧属于当前会话! !
让某个后台进程(例如我们自己写的网络服务器进程)脱离当前会话,让它独立的在计算机中形成一个新会话,自成进程组,这种进程叫做守护进程/精灵进程。

三. 如何使我们自己写的服务器进程形成守护进程?

1.为什么需要使我们自己写的服务器进程成为守护进程

我们自己写的网络服务器进程会受到计算机上客户端或者shell的影响,即:如果我们的shell掉线了我们写的网络服务器会终止,我们电脑用户注销了,或者电脑关了,我们写的网络服务器进程都会终止,如果我们写的网络服务器成为守护进程:他脱离当前会话,让它独立的在计算机中形成一个新会话,自成进程组,他便不受计算机上客户端会话或者shell的限制,就算我们把电脑关了,shell也关了,这个服务器也会在云端一直提供服务。(这也是抖音随时打开都能刷的原因)想结束守护进程,ps axj|grep 进程名,然后kill -9 即可结束。

2.形成守护进程的核心步骤——setsid

守护进程要编写必须调用一个函数setsid():将调用进程设置成为独立的会话
注意:进程组的组长不可调用setsid();(组长本来就是一个会话的组长)

3.setsid

作用:使调用setsid的进程成为一个新的会话,并成为会话内某个进程组的组长

pid_t setsid(void);

返回值:成功返回调用进程的pid,失败返回-1错误码被设置

4.形成守护进程全步骤

(1)必做:fork+setsid()——让自己不成为进程组组长+设置自己是一个独立的会话

那我如何不成为组长以便调用setsid**呢?**——bash中新启动第一个进程一定成为组长,所以你可以成为进程组内的第二个进程。即:常规做法:fork()子进程,子进程就不再是组长进程了,它就可以成功调用setsid(); ————

if(fork() > 0) exit(0) ;
setsid() ;

(2)选做:

①忽略SIGPIPE信号:

系统的管道:写端一直在写,读端关闭,写端会被终止,被信号终止SIGPIPE。网络套接字同理!
网络的套接字:server向client写入,client (close) 关闭了,server也会收到SIGPIPE,server会直接被终止,为了防止服务器server异常退出,需要 忽略SIGPIPE信号

②更改守护进程的工作目录,如何更改进程的工作目录?——chdir()

(3)一般守护进程都要做的(必做):

  1. (不常用做法一)因为守护进程与标准输入,标准输出,标准错误已经没关系了,所以close(0, 1,2) 守护进程获取输入或写入都是和网络有关,不会从键盘获取,不会往显示器输出。(很少有人这样做,因为兼容性不好,会导致代码中的打印代码报错)

2.(常用做法二)类似于所有Linux下的一个”垃圾桶(文件黑洞)“,凡是从 /dev/null 里面读/写一概被丢弃

这些输入输出的数据会全部被丢弃:
[whb@ecs-144421 ~]$ echo "hello wrold" >> /dev/null
[whb@ecs-144421 ~]$ echo cat < /dev/null            

推荐做法:打开/dev/null, 并且对 0,1,2 进行重定向!

总结:1.忽略SIGPIPE
2.更改进程的工作目录
3.让自己不要成为进程组组长(必做)
4.设置自己是一个独立的会话(必做)
5.重定向0,1,2(必做)

daemonize.hpp文件

5.重定向0,1,2后,可以将打印信息保存入一个文件serverTcp.log中

log.hpp中:

四.总结 将自己的进程守护进程化 的三种方法

1.自己写(严重推荐)
2.用系统daemon
3. nohup . /command &,默认形成日志:nohup. out

1.自己写(严重推荐)

2.daemon

[zsh@ecs-78471 vscode]$ man 3 daemon

int daemon(int nochdir, int noclose); 让一个进程运行到后端

nochdir:是否更改当前路径。 noclose:是否关闭标注输入输出错误。

3.命令 nohup . /a.out &,默认形成日志:nohup. out

hello.cpp

#include<iostream>
using namespace std;
#include<unistd.h>
int main()
{
    while(true)
    {
        cout<<"hello"<<endl;
        sleep(1);
    }
    return 0;
}

nohup 不要挂起阻塞,nohup . /a.out &后,下面的nohup. out一直在变大。

[zsh@ecs-78471 12-25-nohup]$ ll
total 4
-rw-rw-r-- 1 zsh zsh 162 Dec 25 10:16 hello.cpp
[zsh@ecs-78471 12-25-nohup]$ g++ hello.cpp
[zsh@ecs-78471 12-25-nohup]$ ll
total 16
-rwxrwxr-x 1 zsh zsh 9024 Dec 25 10:17 a.out
-rw-rw-r-- 1 zsh zsh  162 Dec 25 10:16 hello.cpp
[zsh@ecs-78471 12-25-nohup]$ nohup ./a.out &
[1] 1554
[zsh@ecs-78471 12-25-nohup]$ nohup: ignoring input and appending output to ‘nohup.out’

[zsh@ecs-78471 12-25-nohup]$ll
total 20
-rwxrwxr-x 1 zsh zsh 9024 Dec 25 10:17 a.out
-rw-rw-r-- 1 zsh zsh  162 Dec 25 10:16 hello.cpp
-rw------- 1 zsh zsh   48 Dec 25 10:17 nohup.out
[zsh@ecs-78471 12-25-nohup]$ cat nohup.out
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
hello
[zsh@ecs-78471 12-25-nohup]$ ll
total 20
-rwxrwxr-x 1 zsh zsh 9024 Dec 25 10:17 a.out
-rw-rw-r-- 1 zsh zsh  162 Dec 25 10:16 hello.cpp
-rw------- 1 zsh zsh  180 Dec 25 10:18 nohup.out
[zsh@ecs-78471 12-25-nohup]$ ll
total 20
-rwxrwxr-x 1 zsh zsh 9024 Dec 25 10:17 a.out
-rw-rw-r-- 1 zsh zsh  162 Dec 25 10:16 hello.cpp
-rw------- 1 zsh zsh  192 Dec 25 10:18 nohup.out
[zsh@ecs-78471 12-25-nohup]$ ll
total 20
-rwxrwxr-x 1 zsh zsh 9024 Dec 25 10:17 a.out
-rw-rw-r-- 1 zsh zsh  162 Dec 25 10:16 hello.cpp
-rw------- 1 zsh zsh  216 Dec 25 10:18 nohup.out
[zsh@ecs-78471 12-25-nohup]$ ll
total 20
-rwxrwxr-x 1 zsh zsh 9024 Dec 25 10:17 a.out
-rw-rw-r-- 1 zsh zsh  162 Dec 25 10:16 hello.cpp
-rw------- 1 zsh zsh  324 Dec 25 10:18 nohup.out
[zsh@ecs-78471 12-25-nohup]$ ps ajx|head -1&&ps ajx|grep a.out
 PPID   PID  PGID   SID TTY      TPGID STAT   UID   TIME COMMAND
  411  1554  1554   411 pts/0     1565 S     1002   0:00 ./a.out
  411  1566  1565   411 pts/0     1565 R+    1002   0:00 grep --color=auto a.out

并且如果我们ctrl+d或者logout退出登录,再登录,会发现之前的会话411里面的东西都没了,只剩下a.out了,a.out虽然依旧是非进程组组长,但他已经成为了独立会话的守护进程。

[zsh@ecs-78471 12-25-nohup]$ ps ajx|head -1&&ps ajx|grep a.out
 PPID   PID  PGID   SID TTY      TPGID STAT   UID   TIME COMMAND
  411  1581  1581   411 pts/0     1585 S     1002   0:00 ./a.out
  411  1586  1585   411 pts/0     1585 R+    1002   0:00 grep --color=auto a.out
[zsh@ecs-78471 12-25-nohup]$ logout

Connection closed.

Disconnected from remote host(我的学习机) at 10:22:34.

Type `help' to learn how to use Xshell prompt.
[C:\~]$ 

Connecting to 124.71.81.109:22...
Connection established.
To escape to local shell, press Ctrl+Alt+].

WARNING! The remote SSH server rejected X11 forwarding request.
Last login: Sun Dec 25 08:17:16 2022 from 110.241.215.215
    
    Welcome to Huawei Cloud Service

[zsh@ecs-78471 ~]$ ps ajx|head -1&&ps ajx|grep a.out
 PPID   PID  PGID   SID TTY      TPGID STAT   UID   TIME COMMAND
    1  1581  1581   411 ?           -1 S     1002   0:00 ./a.out
 1590  1616  1615  1590 pts/0     1615 R+    1002   0:00 grep --color=auto a.out

五.三次握手,四次挥手

(1)tcp是面向链接的,client进行通信前要connect,server进行通信前要accept
(2)tcp在建立连接的时候,采用的是三次握手,在断开链接的时候,采用的是四次挥手!
(3)connect发起三次握手(只需要client进行connect,server不用做) ,但是client 和 server 都要close(),一次close()执行4次挥手中的2次,client 和 server各执行一次close(),总共是4次挥手
(4)什么是3次握手?什么是4次挥手?

为什么是3次?为什么是4次?这个问题后面再说

六.代码合集

代码合集:tcp接入线程池后使用popen的写法版本并将服务器改造成守护进程

daemonize.hpp

#pragma once

#include <cstdio>
#include <iostream>
#include <signal.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

void daemonize()
{
    int fd = 0;
    // 1. 忽略SIGPIPE
    signal(SIGPIPE, SIG_IGN);
    // 2. 更改进程的工作目录
    // chdir();
    // 3. 让自己不要成为进程组组长
    if (fork() > 0)
        exit(1);
    // 4. 设置自己是一个独立的会话
    setsid();
    // 5. 重定向0,1,2
    if ((fd = open("/dev/null", O_RDWR)) != -1) // fd == 3
    {
        dup2(fd, STDIN_FILENO);
        dup2(fd, STDOUT_FILENO);
        dup2(fd, STDERR_FILENO);
        // 6. 关闭掉不需要的fd
        if(fd > STDERR_FILENO) close(fd);
    }
}

serverTcp.cc

#include "util.hpp"
#include "Task.hpp"
#include "ThreadPool.hpp"
#include "daemonize.hpp"

#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>

class ServerTcp; // 申明一下ServerTcp

// 大小写转化服务
// TCP && UDP: 支持全双工
void transService(int sock, const std::string &clientIp, uint16_t clientPort)
{
    assert(sock >= 0);
    assert(!clientIp.empty());
    assert(clientPort >= 1024);

    char inbuffer[BUFFER_SIZE];
    while (true)
    {
        ssize_t s = read(sock, inbuffer, sizeof(inbuffer) - 1); //我们认为我们读到的都是字符串
        if (s > 0)
        {
            // read success
            inbuffer[s] = '\0';
            if (strcasecmp(inbuffer, "quit") == 0)
            {
                logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
                break;
            }
            logMessage(DEBUG, "trans before: %s[%d]>>> %s", clientIp.c_str(), clientPort, inbuffer);
            // 可以进行大小写转化了
            for (int i = 0; i < s; i++)
            {
                if (isalpha(inbuffer[i]) && islower(inbuffer[i]))
                    inbuffer[i] = toupper(inbuffer[i]);
            }
            logMessage(DEBUG, "trans after: %s[%d]>>> %s", clientIp.c_str(), clientPort, inbuffer);

            write(sock, inbuffer, strlen(inbuffer));
        }
        else if (s == 0)
        {
            // pipe: 读端一直在读,写端不写了,并且关闭了写端,读端会如何?s == 0,代表对端关闭
            // s == 0: 代表对方关闭,client 退出
            logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
            break;
        }
        else
        {
            logMessage(DEBUG, "%s[%d] - read: %s", clientIp.c_str(), clientPort, strerror(errno));
            break;
        }
    }

    // 只要走到这里,一定是client退出了,服务到此结束
    close(sock); // 如果一个进程对应的文件fd,打开了没有被归还,文件描述符泄漏!
    logMessage(DEBUG, "server close %d done", sock);
}

void execCommand(int sock, const std::string &clientIp, uint16_t clientPort)
{
    assert(sock >= 0);
    assert(!clientIp.empty());
    assert(clientPort >= 1024);

    char command[BUFFER_SIZE];
    while (true)
    {
        ssize_t s = read(sock, command, sizeof(command) - 1); //我们认为我们读到的都是字符串
        if (s > 0)
        {
            command[s] = '\0';
            logMessage(DEBUG, "[%s:%d] exec [%s]", clientIp.c_str(), clientPort, command);
            // 考虑安全
            std::string safe = command;
            if((std::string::npos != safe.find("rm")) || (std::string::npos != safe.find("unlink")))
            {
                break;
            }
            // 我们是以r方式打开的文件,没有写入
            // 所以我们无法通过dup的方式得到对应的结果
            FILE *fp = popen(command, "r");
            if(fp == nullptr)
            {
                logMessage(WARINING, "exec %s failed, beacuse: %s", command, strerror(errno));
                break;
            }
            char line[1024];
            while(fgets(line, sizeof(line)-1, fp) != nullptr)
            {
                write(sock, line, strlen(line));
            }
            // dup2(fd, 1);
            // dup2(sock, fp->_fileno);
            // fflush(fp);
            pclose(fp);
            logMessage(DEBUG, "[%s:%d] exec [%s] ... done", clientIp.c_str(), clientPort, command);
        }
        else if (s == 0)
        {
            // pipe: 读端一直在读,写端不写了,并且关闭了写端,读端会如何?s == 0,代表对端关闭
            // s == 0: 代表对方关闭,client 退出
            logMessage(DEBUG, "client quit -- %s[%d]", clientIp.c_str(), clientPort);
            break;
        }
        else
        {
            logMessage(DEBUG, "%s[%d] - read: %s", clientIp.c_str(), clientPort, strerror(errno));
            break;
        }
    }

    // 只要走到这里,一定是client退出了,服务到此结束
    close(sock); // 如果一个进程对应的文件fd,打开了没有被归还,文件描述符泄漏!
    logMessage(DEBUG, "server close %d done", sock);
}

class ThreadData
{
public:
    uint16_t clientPort_;
    std::string clinetIp_;
    int sock_;
    ServerTcp *this_;

public:
    ThreadData(uint16_t port, std::string ip, int sock, ServerTcp *ts)
        : clientPort_(port), clinetIp_(ip), sock_(sock), this_(ts)
    {
    }
};

class ServerTcp
{
public:
    ServerTcp(uint16_t port, const std::string &ip = "")
        : port_(port),
          ip_(ip),
          listenSock_(-1),
          tp_(nullptr)
    {
    }
    ~ServerTcp()
    {
    }

public:
    void init()
    {
        // 1. 创建socket
        listenSock_ = socket(PF_INET, SOCK_STREAM, 0);
        if (listenSock_ < 0)
        {
            logMessage(FATAL, "socket: %s", strerror(errno));
            exit(SOCKET_ERR);
        }
        logMessage(DEBUG, "socket: %s, %d", strerror(errno), listenSock_);

        // 2. bind绑定
        // 2.1 填充服务器信息
        struct sockaddr_in local; // 用户栈
        memset(&local, 0, sizeof local);
        local.sin_family = PF_INET;
        local.sin_port = htons(port_);
        ip_.empty() ? (local.sin_addr.s_addr = INADDR_ANY) : (inet_aton(ip_.c_str(), &local.sin_addr));
        // 2.2 本地socket信息,写入sock_对应的内核区域
        if (bind(listenSock_, (const struct sockaddr *)&local, sizeof local) < 0)
        {
            logMessage(FATAL, "bind: %s", strerror(errno));
            exit(BIND_ERR);
        }
        logMessage(DEBUG, "bind: %s, %d", strerror(errno), listenSock_);

        // 3. 监听socket,为何要监听呢?tcp是面向连接的!
        if (listen(listenSock_, 5 /*后面再说*/) < 0)
        {
            logMessage(FATAL, "listen: %s", strerror(errno));
            exit(LISTEN_ERR);
        }
        logMessage(DEBUG, "listen: %s, %d", strerror(errno), listenSock_);
        // 运行别人来连接你了

        // 4. 加载线程池
        tp_ = ThreadPool<Task>::getInstance();
    }
    // static void *threadRoutine(void *args)
    // {
    //     pthread_detach(pthread_self()); //设置线程分离
    //     ThreadData *td = static_cast<ThreadData *>(args);
    //     td->this_->transService(td->sock_, td->clinetIp_, td->clientPort_);
    //     delete td;
    //     return nullptr;
    // }
    void loop()
    {
        // signal(SIGCHLD, SIG_IGN); // only Linux
        tp_->start();
        logMessage(DEBUG, "thread pool start success, thread num: %d", tp_->threadNum());
        while (true)
        {
            struct sockaddr_in peer;
            socklen_t len = sizeof(peer);
            // 4. 获取连接, accept 的返回值是一个新的socket fd ??
            // 4.1 listenSock_: 监听 && 获取新的链接-> sock
            // 4.2 serviceSock: 给用户提供新的socket服务
            int serviceSock = accept(listenSock_, (struct sockaddr *)&peer, &len);
            if (serviceSock < 0)
            {
                // 获取链接失败
                logMessage(WARINING, "accept: %s[%d]", strerror(errno), serviceSock);
                continue;
            }
            // 4.1 获取客户端基本信息
            uint16_t peerPort = ntohs(peer.sin_port);
            std::string peerIp = inet_ntoa(peer.sin_addr);

            logMessage(DEBUG, "accept: %s | %s[%d], socket fd: %d",
                       strerror(errno), peerIp.c_str(), peerPort, serviceSock);
            // 5 提供服务, echo -> 小写 -> 大写
            // 5.0 v0 版本 -- 单进程 -- 一旦进入transService,主执行流,就无法进行向后执行,
只能提供完毕服务之后才能进行accept
            // transService(serviceSock, peerIp, peerPort);

            // 5.1 v1 版本 -- 多进程版本 -- 父进程打开的文件会被子进程继承吗?会的
            // pid_t id = fork();
            // assert(id != -1);
            // if(id == 0)
            // {
            //     close(listenSock_); //建议
            //     //子进程
            //     transService(serviceSock, peerIp, peerPort);
            //     exit(0); // 进入僵尸
            // }
            // // 父进程
            // close(serviceSock); //这一步是一定要做的!

            // 5.1 v1.1 版本 -- 多进程版本  -- 也是可以的
            // 爷爷进程
            // pid_t id = fork();
            // if(id == 0)
            // {
            //     // 爸爸进程
            //     close(listenSock_);//建议
            //     // 又进行了一次fork,让 爸爸进程
            //     if(fork() > 0) exit(0);
            //     // 孙子进程 -- 就没有爸爸 -- 孤儿进程 -- 被系统领养 -- 回收问题就交给了系统来回收
            //     transService(serviceSock, peerIp, peerPort);
            //     exit(0);
            // }
            // // 父进程
            // close(serviceSock); //这一步是一定要做的!
            // // 爸爸进程直接终止,立马得到退出码,释放僵尸进程状态
            // pid_t ret = waitpid(id, nullptr, 0); //就用阻塞式
            // assert(ret > 0);
            // (void)ret;

            // 5.2 v2 版本 -- 多线程
            // 这里不需要进行关闭文件描述符吗??不需要啦
            // 多线程是会共享文件描述符表的!
            // ThreadData *td = new ThreadData(peerPort, peerIp, serviceSock, this);
            // pthread_t tid;
            // pthread_create(&tid, nullptr, threadRoutine, (void*)td);

            // 5.3 v3 版本 --- 线程池版本
            // 5.3.1 构建任务
            // 5.3 v3.1
            // Task t(serviceSock, peerIp, peerPort, std::bind(&ServerTcp::transService, 
this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
            // tp_->push(t);

            // 5.3 v3.2
            // Task t(serviceSock, peerIp, peerPort, transService);
            // tp_->push(t);
            // 5.3 v3.3
            Task t(serviceSock, peerIp, peerPort, execCommand);
            tp_->push(t);

            // waitpid(); 默认是阻塞等待!WNOHANG
            // 方案1

            // logMessage(DEBUG, "server 提供 service start ...");
            // sleep(1);
        }
    }

private:
    // sock
    int listenSock_;
    // port
    uint16_t port_;
    // ip
    std::string ip_;
    // 引入线程池
    ThreadPool<Task> *tp_;
};

static void Usage(std::string proc)
{
    std::cerr << "Usage:\n\t" << proc << " port ip" << std::endl;
    std::cerr << "example:\n\t" << proc << " 8080 127.0.0.1\n"
              << std::endl;
}

// ./ServerTcp local_port local_ip
int main(int argc, char *argv[])
{
    if (argc != 2 && argc != 3)
    {
        Usage(argv[0]);
        exit(USAGE_ERR);
    }
    uint16_t port = atoi(argv[1]);
    std::string ip;
    if (argc == 3)
        ip = argv[2];
    
    daemonize(); // 我们的进程就会成为守护进程

    ServerTcp svr(port, ip);
    svr.init();
    svr.loop();
    return 0;
}

clientTcp.cc

#include "util.hpp"
// 2. 需要bind吗??需要,但是不需要自己显示的bind! 不要自己bind!!!!
// 3. 需要listen吗?不需要的!
// 4. 需要accept吗?不需要的!

volatile bool quit = false;

static void Usage(std::string proc)
{
    std::cerr << "Usage:\n\t" << proc << " serverIp serverPort" << std::endl;
    std::cerr << "Example:\n\t" << proc << " 127.0.0.1 8081\n"
              << std::endl;
}
// ./clientTcp serverIp serverPort
int main(int argc, char *argv[])
{
    if (argc != 3)
    {
        Usage(argv[0]);
        exit(USAGE_ERR);
    }
    std::string serverIp = argv[1];
    uint16_t serverPort = atoi(argv[2]);

    // 1. 创建socket SOCK_STREAM
    int sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0)
    {
        std::cerr << "socket: " << strerror(errno) << std::endl;
        exit(SOCKET_ERR);
    }

    // 2. connect,发起链接请求,你想谁发起请求呢??当然是向服务器发起请求喽
    // 2.1 先填充需要连接的远端主机的基本信息
    struct sockaddr_in server;
    memset(&server, 0, sizeof(server));
    server.sin_family = AF_INET;
    server.sin_port = htons(serverPort);
    inet_aton(serverIp.c_str(), &server.sin_addr);
    // 2.2 发起请求,connect 会自动帮我们进行bind!
    if (connect(sock, (const struct sockaddr *)&server, sizeof(server)) != 0)
    {
        std::cerr << "connect: " << strerror(errno) << std::endl;
        exit(CONN_ERR);
    }
    std::cout << "info : connect success: " << sock << std::endl;

    std::string message;
    while (!quit)
    {
        message.clear();
        std::cout << "请输入你的消息>>> ";
        std::getline(std::cin, message); // 结尾不会有\n
        if (strcasecmp(message.c_str(), "quit") == 0)
            quit = true;

        ssize_t s = write(sock, message.c_str(), message.size());
        if (s > 0)
        {
            message.resize(1024);
            ssize_t s = read(sock, (char *)(message.c_str()), 1024);
            if (s > 0)
                message[s] = 0;
            std::cout << "Server Echo>>> " << message << std::endl;
        }
        else if (s <= 0)
        {
            break;
        }
    }
    close(sock);
    return 0;
}

ThreadPool.hpp 线程池文件

#pragma once

#include <iostream>
#include <cassert>
#include <queue>
#include <memory>
#include <cstdlib>
#include <pthread.h>
#include <unistd.h>
#include <sys/prctl.h>
#include "Lock.hpp"

using namespace std;

int gThreadNum = 15;

template <class T>
class ThreadPool
{
private:
    ThreadPool(int threadNum = gThreadNum) : threadNum_(threadNum), isStart_(false)
    {
        assert(threadNum_ > 0);
        pthread_mutex_init(&mutex_, nullptr);
        pthread_cond_init(&cond_, nullptr);
    }
    ThreadPool(const ThreadPool<T> &) = delete;
    void operator=(const ThreadPool<T>&) = delete;

public:
    static ThreadPool<T> *getInstance()
    {
        static Mutex mutex;
        if (nullptr == instance) //仅仅是过滤重复的判断
        {
            LockGuard lockguard(&mutex); //进入代码块,加锁。退出代码块,自动解锁
            if (nullptr == instance)
            {
                instance = new ThreadPool<T>();
            }
        }

        return instance;
    }
    //类内成员, 成员函数,都有默认参数this
    static void *threadRoutine(void *args)
    {
        pthread_detach(pthread_self());
        ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args);
        // prctl(PR_SET_NAME, "follower"); // 更改线程名称
        while (1)
        {
            tp->lockQueue();
            while (!tp->haveTask())
            {
                tp->waitForTask();
            }
            //这个任务就被拿到了线程的上下文中
            T t = tp->pop();
            tp->unlockQueue();
            t(); // 让指定的先处理这个任务
        }
    }
    void start()
    {
        assert(!isStart_);
        for (int i = 0; i < threadNum_; i++)
        {
            pthread_t temp;
            pthread_create(&temp, nullptr, threadRoutine, this);
        }
        isStart_ = true;
    }
    void push(const T &in)
    {
        lockQueue();
        taskQueue_.push(in);
        choiceThreadForHandler();
        unlockQueue();
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&mutex_);
        pthread_cond_destroy(&cond_);
    }
    int threadNum()
    {
        return threadNum_;
    }

private:
    void lockQueue() { pthread_mutex_lock(&mutex_); }
    void unlockQueue() { pthread_mutex_unlock(&mutex_); }
    bool haveTask() { return !taskQueue_.empty(); }
    void waitForTask() { pthread_cond_wait(&cond_, &mutex_); }
    void choiceThreadForHandler() { pthread_cond_signal(&cond_); }
    T pop()
    {
        T temp = taskQueue_.front();
        taskQueue_.pop();
        return temp;
    }

private:
    bool isStart_;
    int threadNum_;
    queue<T> taskQueue_;
    pthread_mutex_t mutex_;
    pthread_cond_t cond_;

    static ThreadPool<T> *instance;
    // const static int a = 100;
};

template <class T>
ThreadPool<T> *ThreadPool<T>::instance = nullptr;

log.hpp

#pragma once

#include <cstdio>
#include <ctime>
#include <cstdarg>
#include <cassert>
#include <cassert>
#include <cstring>
#include <cerrno>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>

#define DEBUG 0
#define NOTICE 1
#define WARINING 2
#define FATAL 3

const char *log_level[] = {"DEBUG", "NOTICE", "WARINING", "FATAL"};

#define LOGFILE "serverTcp.log"

// logMessage(DEBUG, "%d", 10);
void logMessage(int level, const char *format, ...)
{
    assert(level >= DEBUG);
    assert(level <= FATAL);

    char *name = getenv("USER");

    char logInfo[1024];
    va_list ap; // ap -> char*
    va_start(ap, format);

    vsnprintf(logInfo, sizeof(logInfo) - 1, format, ap);

    va_end(ap); // ap = NULL

    // 每次打开太麻烦
    umask(0);
    int fd = open(LOGFILE, O_WRONLY | O_CREAT | O_APPEND, 0666);
    assert(fd >= 0);

    FILE *out = (level == FATAL) ? stderr : stdout;

    dup2(fd, 1);
    dup2(fd, 2);

    fprintf(out, "%s | %u | %s | %s\n",
            log_level[level],
            (unsigned int)time(nullptr),
            name == nullptr ? "unknow" : name,
            logInfo);

    fflush(out); // 将C缓冲区中的数据刷新到OS
    fsync(fd);   // 将OS中的数据尽快刷盘 

    close(fd);
    // char *s = format;
    // while(s){
    //     case '%':
    //         if(*(s+1) == 'd')  int x = va_arg(ap, int);
    //     break;
    // }
}

Task.hpp

#pragma once

#include <iostream>
#include <string>
#include <functional>
#include <pthread.h>
#include "log.hpp"

class Task
{
public:
    //等价于
    // typedef std::function<void (int, std::string, uint16_t)> callback_t;
    using callback_t = std::function<void (int, std::string, uint16_t)>;
private:
    int sock_; // 给用户提供IO服务的sock
    uint16_t port_;  // client port
    std::string ip_; // client ip
    callback_t func_;  // 回调方法
public:
    Task():sock_(-1), port_(-1)
    {}
    Task(int sock, std::string ip, uint16_t port, callback_t func)
    : sock_(sock), ip_(ip), port_(port), func_(func)
    {}
    void operator () ()
    {
        logMessage(DEBUG, "线程ID[%p]处理%s:%d的请求 开始啦...",\
            pthread_self(), ip_.c_str(), port_);

        func_(sock_, ip_, port_);

        logMessage(DEBUG, "线程ID[%p]处理%s:%d的请求 结束啦...",\
            pthread_self(), ip_.c_str(), port_);
    }
    ~Task()
    {}
};

Lock.hpp

#pragma once

#include <iostream>
#include <pthread.h>

class Mutex
{
public:
    Mutex()
    {
        pthread_mutex_init(&lock_, nullptr);
    }
    void lock()
    {
        pthread_mutex_lock(&lock_);
    }
    void unlock()
    {
        pthread_mutex_unlock(&lock_);
    }
    ~Mutex()
    {
        pthread_mutex_destroy(&lock_);
    }

private:
    pthread_mutex_t lock_;
};

class LockGuard
{
public:
    LockGuard(Mutex *mutex) : mutex_(mutex)
    {
        mutex_->lock();
        std::cout << "加锁成功..." << std::endl;
    }

    ~LockGuard()
    {
        mutex_->unlock();
        std::cout << "解锁成功...." << std::endl;
    }

private:
    Mutex *mutex_;
};

Makefile

.PHONY:all
all:clientTcp serverTcpd

clientTcp: clientTcp.cc
    g++ -o $@ $^ -std=c++11
serverTcpd:serverTcp.cc
    g++ -o $@ $^ -std=c++11 -lpthread

.PHONY:clean
clean:
    rm -f serverTcpd clientTcp

util.hpp

#pragma once

#include <iostream>
#include <string>
#include <cstring>
#include <cstdlib>
#include <cassert>
#include <ctype.h>
#include <unistd.h>
#include <strings.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.hpp"

#define SOCKET_ERR 1
#define BIND_ERR   2
#define LISTEN_ERR 3
#define USAGE_ERR  4
#define CONN_ERR   5

#define BUFFER_SIZE 1024
标签: linux 服务器

本文转载自: https://blog.csdn.net/zhang_si_hang/article/details/128395483
版权归原作者 beyond.myself 所有, 如有侵权,请联系我们删除。

“linux篇【12】:网络套接字<后序>—tcp接入线程池并改为守护进程”的评论:

还没有评论