admin管理员组

文章数量:1437312

Linux高效IO

IO的五种模型

IO是什么? 用户在应用层使用read和write的时候,本质就是将数据从用户层写给OS——也就是拷贝函数,所以IO的本质就是进行拷贝+等待(比如没有数据就需要等待,也就是说,要进行拷贝必须先判断条件是否成立,然后在进行读写) 那么什么是高效的IO呢?单位时间内,IO过程中,等的比重越小,IO效率越高。(几乎所有提高IO效率的策略都是让等的比重变小)

五种IO模型 1.阻塞式IO(遇到一个数据读写一个数据,没有就一直等待) 2.非阻塞式IO,非阻塞轮询(这里时不时回来看一眼没有没数据,有的话进行IO,没有就去做其他事) 3.信号驱动式IO(如果有数据了就回来进行IO,没有就去做别的事) 4.多路复用,多路转接(弄出一大堆的接受数据的入口,然后轮询式查看有没有数据进来,如果有就进行IO处理)——这种方式获得数据效率最高,而且在只有一个数据的情况下,这种方式获得数据的概率也是最大的。 上面1-4式同步IO。 5.异步IO(自己派一个“工具人”执行IO,怎么执行不重要,重要是能IO就可以了,自己并不参与IO当中,就像父进程创建子进程,让子进程打工一样。) 由此得出:阻塞IO VS 非阻塞IO:等的方式不同。(非阻塞是不一直等) 同步IO VS 异步IO:异步是不会去等的,同步都在等。(包括非阻塞,因为非阻塞虽然会做别的事,但是也会时不时来看一眼,而异步是完全不理会,且不在乎)

这五种哪个效率最高?那就是多路转接和非阻塞IO。

非阻塞 IO 往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询. 这对 CPU 来说是较大的浪费, 一般只有特定场景下才使用.

IO 多路转接: 虽然从流程图上看起来和阻塞 IO 类似. 实际上最核心在于 IO 多路转接能够同时等待多个文件描述符的就绪状态.

非阻塞

fcntl 一个文件描述符, 默认都是阻塞 IO,让这个文件描述符以后无论被read还是write使用的时候,都是非阻塞式。 函数原型如下:

代码语言:javascript代码运行次数:0运行复制
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );//将fd文件描述符设置成非阻塞

复制一个现有的描述符(cmd=F_DUPFD). 获得/设置文件描述符标记(cmd=F_GETFD 或 F_SETFD). 获得/设置文件状态标记(cmd=F_GETFL 或 F_SETFL). 获得/设置异步 I/O 所有权(cmd=F_GETOWN 或 F_SETOWN). 获得/设置记录锁(cmd=F_GETLK,F_SETLK 或 F_SETLKW).

例如:

代码语言:javascript代码运行次数:0运行复制
void SetNoBlock(int fd) {
int fl = fcntl(fd, F_GETFL);
if (fl < 0) {
	perror("fcntl");
	return;
}
	fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}

使用 F_GETFL 将当前的文件描述符的属性取出来(这是一个位图). 然后再使用 F_SETFL 将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK 参数.

这里要注意: 1.设置成为非阻塞,如果底层的fd数据没有就绪,read,write等函数就会以出错的形式返回。 2.分为两种情况,真的出错和底层数据没有就绪,需要区分 3.通过返回的error值区分。(错误码11式底层没有数据——EWOULDBLOCK)

多路转接

select

这个函数只负责等,一次等多个fd。

返回值有三种情况: n>0:有n个fd就绪了。 n==0:超时,没有出错,但是也没有fd就绪 n<0:出错了 第一个参数 要等待的文件描述符最大值+1. 最后一个参数是: 首先认识一个时间结构体,第一个参数是时间戳(秒为单位),第二个参数是时间戳(微秒为单位)

这个参数是这种类型,意思是设置一个等待时间。 例如:设置为[5,0],意思就是,如果一直没有fd就绪,那就5s阻塞等待,5s之后返回,如果中途有fd就绪就直接返回。设置为[0,0],也就是非阻塞,立马返回。设置为NULL就是阻塞等待。 并且这个参数是输入输出型参数,假如说设置5s的时间,2s就来了fd,那就立即返回,并且这个时间就会变成3s。 再来了解一下中间的三个参数: 首先清楚,fd_set内核提供的一种数据类型,它是位图。(比特位的个数最多是1024个) 其次,我们目前关心fd上面的事件也就3种,读,写,错误。 并且,这三个参数也都是输入输出型参数(第一个是读事件,第二个是写事件,第三个是错误事件)

输入时,用户告诉内核,我给你一个或者多个fd,你要帮我关心fd上面的事件,如果事件就绪了,你要告诉我。(输入的位图当中,从右向左数,标识文件描述符的编号,比特位的内容是0or1,代表是否需要关心) 输出时,内核告诉用户,你让我关心的多个fd种有哪些事件已经就绪。(返回的时候,根据刚才输入进来的位图查看哪些需要关心,然后将就绪的fd返回,其他的清0,假设是0000 1111,只有2号fd就绪了,就返回0000 0010)

代码使用select

代码语言:javascript代码运行次数:0运行复制
#include <iostream>
#include <time.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>

#define SIZE 1024

#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4

#define Screen 1
#define Onefile 2
#define Classfile 3

#define LogFile "log.txt"

class Log
{
public:
    Log()
    {
        printMethod = Screen;
        path = "./log/";
    }
    void Enable(int method)
    {
        printMethod = method;
    }
    std::string levelToString(int level)
    {
        switch (level)
        {
        case Info:
            return "Info";
        case Debug:
            return "Debug";
        case Warning:
            return "Warning";
        case Error:
            return "Error";
        case Fatal:
            return "Fatal";
        default:
            return "None";
        }
    }

    void printLog(int level, const std::string &logtxt)
    {
        switch (printMethod)
        {
        case Screen:
            std::cout << logtxt << std::endl;
            break;
        case Onefile:
            printOneFile(LogFile, logtxt);
            break;
        case Classfile:
            printClassFile(level, logtxt);
            break;
        default:
            break;
        }
    }
    void printOneFile(const std::string &logname, const std::string &logtxt)
    {
        std::string _logname = path + logname;
        int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt"
        if (fd < 0)
            return;
        write(fd, logtxt.c_str(), logtxt.size());
        close(fd);
    }
    void printClassFile(int level, const std::string &logtxt)
    {
        std::string filename = LogFile;
        filename += ".";
        filename += levelToString(level); // "log.txt.Debug/Warning/Fatal"
        printOneFile(filename, logtxt);
    }

    ~Log()
    {
    }
    void operator()(int level, const char *format, ...)
    {
        time_t t = time(nullptr);
        struct tm *ctime = localtime(&t);
        char leftbuffer[SIZE];

        snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
                 ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
                 ctime->tm_hour, ctime->tm_min, ctime->tm_sec);

        va_list s;
        va_start(s, format);
        char rightbuffer[SIZE];
        vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
        va_end(s);

        // 格式:默认部分+自定义部分
        char logtxt[SIZE * 2];
        snprintf(logtxt, sizeof(logtxt), "%s %s", leftbuffer, rightbuffer);

        // printf("%s", logtxt); // 暂时打印
        printLog(level, logtxt);
    }

private:
    int printMethod;
    std::string path;
};
Log lg;
代码语言:javascript代码运行次数:0运行复制
#include <iostream>
#include <string>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "log.hpp"
using namespace std;
enum
{
    SocketErr = 2,
    BindErr,
    ListenErr,
};
const int backlog = 10;
class Sock
{
public:
    Sock()
    {}
    ~Sock()
    {}
    void Socket()
    {
        sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
        if (sockfd_ < 0)
        {
            lg(Fatal, "socker error, %s: %d", strerror(errno), errno);
            exit(SocketErr);
        }
        int opt = 1;
        setsockopt(sockfd_,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
    }
    void Bind(uint16_t port)
    {
        struct sockaddr_in local;
        memset(&local, 0, sizeof(local));
        local.sin_family = AF_INET;
        local.sin_port = htons(port);
        local.sin_addr.s_addr = INADDR_ANY;

        if (bind(sockfd_, (struct sockaddr *)&local, sizeof(local)) < 0)
        {
            lg(Fatal, "bind error, %s: %d", strerror(errno), errno);
            exit(BindErr);
        }
    }
    void Listen()
    {
        if (listen(sockfd_, backlog) < 0)
        {
            lg(Fatal, "listen error, %s: %d", strerror(errno), errno);
            exit(ListenErr);
        }
    }
    int Accept(string *clientip, uint16_t *clientport)
    {
        struct sockaddr_in peer;
        socklen_t len = sizeof(peer);
        int newfd = accept(sockfd_, (struct sockaddr*)&peer, &len);
        if(newfd < 0)
        {
            lg(Warning, "accept error, %s: %d", strerror(errno), errno);
            return -1;
        }
        char ipstr[64];
        inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));
        *clientip = ipstr;
        *clientport = ntohs(peer.sin_port);

        return newfd;
    }
    bool Connect(const string &ip, const uint16_t &port)
    {
        struct sockaddr_in peer;
        memset(&peer, 0, sizeof(peer));
        peer.sin_family = AF_INET;
        peer.sin_port = htons(port);
        inet_pton(AF_INET, ip.c_str(), &(peer.sin_addr));

        int n = connect(sockfd_, (struct sockaddr*)&peer, sizeof(peer));
        if(n == -1) 
        {
            std::cerr << "connect to " << ip << ":" << port << " error" << std::endl;
            return false;
        }
        return true;
    }
    void Close()
    {
        close(sockfd_);
    }
    int Fd()
    {
        return sockfd_;
    }
private:
    int sockfd_;
};
代码语言:javascript代码运行次数:0运行复制
#pragma once

#include <iostream>
#include <sys/select.h>
#include <sys/time.h>
#include "Socket.hpp"
#include <memory>
using namespace std;

static const uint16_t defaultport = 8888;
static const int fd_num_max = (sizeof(fd_set) * 8);//最多储存fd的数量
int defaultfd = -1;
class SelectServer
{
public:
     SelectServer(uint16_t port = defaultport) : _port(port)
    {
        for (int i = 0; i < fd_num_max; i++)
        {
            fd_array[i] = defaultfd;//默认设置为-1
            // std::cout << "fd_array[" << i << "]" << " : " << fd_array[i] << std::endl;
        }
    }
    bool Init()
    {
        _listensock.Socket();
        _listensock.Bind(_port);
        _listensock.Listen();

        return true;
    }
    void Accepter()
    {
         // 我们的连接事件就绪了
        std::string clientip;
        uint16_t clientport = 0;
        int sock = _listensock.Accept(&clientip, &clientport); // 会不会阻塞在这里?不会
        if (sock < 0) return;
        lg(Info, "accept success, %s: %d, sock fd: %d", clientip.c_str(), clientport, sock);
        int pos = 1;
        for (; pos < fd_num_max; pos++) //找位置
        {
            if (fd_array[pos] != defaultfd)
                continue;
            else
                break;
        }
        if (pos == fd_num_max)//找不到,位图装不下这么多的fd了
        {
            lg(Warning, "server is full, close %d now!", sock);
            close(sock);
        }
        else
        {
            fd_array[pos] = sock;//找到了,新获取的fd放在这里
            PrintFd();
            // TODO
        }

    }
    void Recver(int fd, int pos)
    {
        char buffer[1024];
        ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // bug?
        if (n > 0)
        {
            buffer[n] = 0;
            cout << "get a messge: " << buffer << endl;
        }
        else if (n == 0)
        {
            lg(Info, "client quit, me too, close fd is : %d", fd);
            close(fd);
            fd_array[pos] = defaultfd; // 这里本质是从select中移除
        }
        else
        {
            lg(Warning, "recv error: fd is : %d", fd);
            close(fd);
            fd_array[pos] = defaultfd; // 这里本质是从select中移除
        }
    }
    void Dispatcher(fd_set &rfds)
    {
        for (int i = 0; i < fd_num_max; i++) //遍历所有的fd
        {
            int fd = fd_array[i];
            if (fd == defaultfd)
                continue;

            if (FD_ISSET(fd, &rfds))//查看当前fd是否存在
            {
                if (fd == _listensock.Fd())//如果是监听fd就让其去获取新的fd
                {
                    Accepter(); // 连接管理器
                }
                else // non listenfd
                {
                    Recver(fd, i);
                }
            }
        }
    }
    void Start()
    {
        int listensock = _listensock.Fd();
        fd_array[0] = listensock;//将监听fd放在第一个位置
        for(;;)//accept不能直接接收,要先进行listensock上面的检测事件就绪在进行接收,等价于读事件就绪
        {
            fd_set rfds;
            FD_ZERO(&rfds);//将位图清零
            int maxfd = fd_array[0];
            for (int i = 0; i < fd_num_max; i++) // 第一次循环
            {
                if (fd_array[i] == defaultfd)
                    continue;
                FD_SET(fd_array[i], &rfds);//只要找到合法的fd全都设置进入位图
                if (maxfd < fd_array[i])
                {
                    maxfd = fd_array[i];//找到最大的fd
                    lg(Info, "max fd update, max fd is: %d", maxfd);
                }
            }
            // struct timeval timeout = {1, 0}; // 输入输出,可能要进行周期的重复设置
            struct timeval timeout = {0, 0}; // 输入输出,可能要进行周期的重复设置
            //因为select的rfds参数是输入输出的参数,加入输入的有3个位就绪了,但是输出的时候,只输出就绪的位,之前的没就绪的位全都被覆盖了
            //也就是说以后每次都要重新设置
            int n = select(maxfd + 1, &rfds, nullptr, nullptr, /*&timeout*/ nullptr);//如果事件就绪不处理,select会一直通知
            switch (n)
            {
            case 0:
                cout << "time out, timeout: " << timeout.tv_sec << "." << timeout.tv_usec << endl;
                break;
            case -1:
                cerr << "select error" << endl;
                break;
            default:// select告诉你就绪了,接下来的一次读取,我们读取fd的时候,不会被阻塞
                // 有事件就绪了,TODO
                cout << "get a new link!!!!!" << endl;
                Dispatcher(rfds); // 就绪的事件和fd你怎么知道只有一个呢???
                break;
            }
        }
    }
    void PrintFd()
    {
        cout << "online fd list: ";
        for (int i = 0; i < fd_num_max; i++)
        {
            if (fd_array[i] == defaultfd)
                continue;
            cout << fd_array[i] << " ";
        }
        cout << endl;
    }
    ~SelectServer()
    {
        _listensock.Close();
    }

private:
    Sock _listensock;
    uint16_t _port;
    int fd_array[fd_num_max]; //辅助数组,为了方便管理fd,方便传递fd
};

由此可以得知,select是有缺点的: 1.等待的fd是有上限的。 2.输入输出参数比较多,数据拷贝的频率较高。 3.输入输出参数比较多,每次都要对进行关心的fd进行重置。 4.需要使用第三方数组进行管理,用户层和内核层检测fd事件就绪,都需要遍历。

poll

因为select的缺点,所以有了poll。

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

第一个参数是上面这个结构体。 这个结构体主要的功能就是对于fd而言,将输入和输出事件进行了分离。 short是16个比特位,也是位图的样子:

第二个参数是数组的长度。 第三个参数是表示 poll 函数的超时时间, 单位是毫秒(ms).

代码语言:javascript代码运行次数:0运行复制
#pragma once

#include <iostream>
#include <poll.h>
#include <sys/time.h>
#include "Socket.hpp"

using namespace std;

static const uint16_t defaultport = 8888;
static const int fd_num_max = 64;
int defaultfd = -1;
int non_event = 0;

class PollServer
{
public:
    PollServer(uint16_t port = defaultport) : _port(port)
    {
        for (int i = 0; i < fd_num_max; i++)
        {
            _event_fds[i].fd = defaultfd;
            _event_fds[i].events = non_event;
            _event_fds[i].revents = non_event;

           
        }
    }
    bool Init()
    {
        _listensock.Socket();
        _listensock.Bind(_port);
        _listensock.Listen();

        return true;
    }
    void Accepter()
    {
        
        std::string clientip;
        uint16_t clientport = 0;
        int sock = _listensock.Accept(&clientip, &clientport);
        if (sock < 0) return;
        lg(Info, "accept success, %s: %d, sock fd: %d", clientip.c_str(), clientport, sock);

        // sock -> fd_array[]
        int pos = 1;
        for (; pos < fd_num_max; pos++) 
        {
            if (_event_fds[pos].fd != defaultfd)
                continue;
            else
                break;
        }
        if (pos == fd_num_max)
        {
            lg(Warning, "server is full, close %d now!", sock);
            close(sock);
           
        }
        else
        {
            // fd_array[pos] = sock;
            _event_fds[pos].fd = sock;
            _event_fds[pos].events = POLLIN;
            _event_fds[pos].revents = non_event;
            PrintFd();
           
        }
    }
    void Recver(int fd, int pos)
    {
        
        char buffer[1024];
        ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // bug?
        if (n > 0)
        {
            buffer[n] = 0;
            cout << "get a messge: " << buffer << endl;
        }
        else if (n == 0)
        {
            lg(Info, "client quit, me too, close fd is : %d", fd);
            close(fd);
            _event_fds[pos].fd = defaultfd; 
        }
        else
        {
            lg(Warning, "recv error: fd is : %d", fd);
            close(fd);
            _event_fds[pos].fd = defaultfd;
        }
    }
    void Dispatcher()
    {
        for (int i = 0; i < fd_num_max; i++)
        {
            int fd = _event_fds[i].fd;
            if (fd == defaultfd)
                continue;

            if (_event_fds[i].revents & POLLIN)
            {
                if (fd == _listensock.Fd())
                {
                    Accepter(); 
                }
                else // non listenfd
                {
                    Recver(fd, i);
                }
            }
        }
    }
    void Start()
    {
        _event_fds[0].fd = _listensock.Fd();
        _event_fds[0].events = POLLIN;
        int timeout = 3000; // 3s
        for (;;)
        {
            int n = poll(_event_fds, fd_num_max, timeout);//不需要进行重复的设置位图
            switch (n)
            {
            case 0:
                cout << "time out... " << endl;
                break;
            case -1:
                cerr << "poll error" << endl;
                break;
            default:
                
                cout << "get a new link!!!!!" << endl;
                Dispatcher(); 
                break;
            }
        }
    }
    void PrintFd()
    {
        cout << "online fd list: ";
        for (int i = 0; i < fd_num_max; i++)
        {
            if (_event_fds[i].fd == defaultfd)
                continue;
            cout << _event_fds[i].fd << " ";
        }
        cout << endl;
    }
    ~PollServer()
    {
        _listensock.Close();
    }

private:
    Sock _listensock;
    uint16_t _port;
    struct pollfd _event_fds[fd_num_max]; // 数组, 用户维护的!
};

虽然使用这个函数还是要用辅助数组,但是空间开辟多大是由机器的硬件设备决定的,而不是像select一样,参数规定死的。 并且,不需要进行重复的设置位图,因为是当前的fd种的输入输出已经被分离开了,不会相互覆盖。 但是,在用户层和内核层依旧需要去遍历这个底层的数组。

epoll

这是优化版本的poll,但其实和poll根本不是一类函数了。

常用接口

#include <sys/epoll.h> int epoll_create(int size);

这是创建一个epoll的函数。 自从 linux2.6.8 之后,size 参数是被忽略的。用完之后, 必须调用 close()关闭。 返回值是成功创建返回新的fd,失败返回-1,同时也是下面这个函数的一个参数:

int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout);

第一个参数就是上面创建epoll的返回值。 第二个参数是返回已就绪的fd。

代码语言:javascript代码运行次数:0运行复制
typedef union epoll_data {
               void    *ptr;
               int      fd;
               uint32_t u32;
               uint64_t u64;
           } epoll_data_t;
           struct epoll_event {
               uint32_t    events;/*位图形式传递标记为*/    /* Epoll events */
               epoll_data_t data;      /* User data variable */
};

第三个参数是返回已就绪的事件。 第四个参数是超时时间。 返回值是已经就绪的fd的个数。

int epoll_ctl(int epfd, int op, int fd, struct epoll_event*event);

它不同于 select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型. 第一个参数是 epoll_create()的返回值. 第二个参数表示动作,用三个宏来表示. EPOLL_CTL_ADD:注册新的 fd 到 epfd 中; EPOLL_CTL_MOD:修改已经注册的 fd 的监听事件; EPOLL_CTL_DEL:从 epfd 中删除一个 fd; 第三个参数是需要监听的 fd. 第四个参数是告诉内核需要监听什么事.

原理

首先提出个问题,OS是如何直到网卡有数据的呢?答案是硬件中断,一旦数据就绪了,中断就会通过OS找到网卡驱动,将数据从网卡读取到网卡驱动当中。 epoll在读取之前,OS会在内存当中维护一个红黑树,节点存放的是fd和要关心的事件等等信息。 并且还有一个就绪队列,红黑树当中哪些节点就绪,就会构建一个新节点连接到就绪队列当中,主要信息就是就绪节点的fd和就绪事件。(这个工作完全由操作系统来工作) 上面说过的通过网卡驱动拿到数据之后,会立刻调用一个回调函数,作用是向上交付,交给TCP的缓冲区。

这里再来说一说上面三个接口是如何运作的。 epoll_create: 首先创建一个struct file的文件结构体,这个结构体指向epoll模型。 epoll_ctl: 通过epfd找到对应的文件结构体,再找到对应的epoll模型里面的红黑树进行op操作。(增加,删除,修改三个操作) epoll_wait: 将就绪队列当中的节点依次放入自己第二个参数。

epoll优势: 1.检测就绪O(1),获取就绪O(N) 2.文件描述符和事件没有上限 3.这颗红黑树就是上面select和poll的辅助数组,OS帮我们维护和使用,不用自己用了。 4.返回值n表示有几个fd事件就绪了,就绪事件是连续的。这样就可以根据返回值个数遍历,遍历几个就处理几个,这样就不会有浪费的动作。(select和poll是需要过滤哪些fd事件是就绪的)

代码实战

代码语言:javascript代码运行次数:0运行复制
//nocopy.hpp
#pragma once

class nocopy//不需要拷贝
{
public:
    nocopy(){}
    nocopy(const nocopy &) = delete;
    const nocopy&operator=(const nocopy &) = delete;
};
代码语言:javascript代码运行次数:0运行复制
//Epoller.hpp
#pragma once

#include "nocopy.hpp"
#include "log.hpp"
#include <cerrno>
#include <cstring>
#include <sys/epoll.h>


class Epoller:public nocopy
{
    static const int size = 128;
public:
    Epoller()
    {
        _epfd = epoll_create(size);
        if (_epfd == -1)
        {
            lg(Error, "epoll_create error: %s", strerror(errno));
        }
        else
        {
            lg(Info, "epoll_create success: %d", _epfd);
        }
    }
    int EpollerWait(struct epoll_event revents[], int num)
    {
        int n = epoll_wait(_epfd, revents, num, /*_timeout 0*/ -1);
        return n;
    }
    int EpllerUpdate(int oper, int sock, uint32_t event)//添加需要关心的事件
    {
        int n = 0;
        if (oper == EPOLL_CTL_DEL)//删除
        {
            n = epoll_ctl(_epfd, oper, sock, nullptr);
            if (n != 0)
            {
                lg(Error, "epoll_ctl delete error!");
            }
        }
        else
        {
            // EPOLL_CTL_MOD || EPOLL_CTL_ADD,修改或添加
            struct epoll_event ev;
            ev.events = event;
            ev.data.fd = sock; // 目前,方便我们后期得知,是哪一个fd就绪了!

            n = epoll_ctl(_epfd, oper, sock, &ev);
            if (n != 0)
            {
                lg(Error, "epoll_ctl error!");
            }
        }
        return n;
    }
    ~Epoller()
    {
        if (_epfd >= 0)
            close(_epfd);
    }
private:
    int _epfd;
    int _timeout{3000};
};
代码语言:javascript代码运行次数:0运行复制
//epollsever.hpp
#pragma once

#include <iostream>
#include <memory>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "log.hpp"
#include "Epoller.hpp"
#include "nocopy.hpp"

uint32_t EVENT_IN = (EPOLLIN);
uint32_t EVENT_OUT = (EPOLLOUT);

class EpollServer:public nocopy
{
    static const int num = 64;
public:
    EpollServer(uint16_t port)
        :_port(port),
        _listsocket_ptr(new Sock()),
        _epoller_ptr(new Epoller())
    {
    }
    void Init()
    {
        _listsocket_ptr->Socket();
        _listsocket_ptr->Bind(_port);
        _listsocket_ptr->Listen();

        lg(Info, "create listen socket success: %d\n", _listsocket_ptr->Fd());
    }
    void Accepter()
    {
        // 获取了一个新连接
        std::string clientip;
        uint16_t clientport;
        int sock = _listsocket_ptr->Accept(&clientip, &clientport);
        if (sock > 0)
        {
            // 我们能直接读取吗?不能
            _epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, sock, EVENT_IN);
            lg(Info, "get a new link, client info@ %s:%d", clientip.c_str(), clientport);
        }
    }
    void Recver(int fd)
    {
        // demo
        char buffer[1024];
        ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // 这里存在一个问题
        if (n > 0)
        {
            buffer[n] = 0;
            std::cout << "get a messge: " << buffer << std::endl;
            // wrirte
            std::string echo_str = "server echo $ ";
            echo_str += buffer;
            write(fd, echo_str.c_str(), echo_str.size());
        }
        else if (n == 0)
        {
            lg(Info, "client quit, me too, close fd is : %d", fd);
            //细节3
            _epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
            close(fd);
        }
        else
        {
            lg(Warning, "recv error: fd is : %d", fd);
            _epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
            close(fd);
        }
    }
    void Dispatcher(struct epoll_event revs[], int num)
    {
        for (int i = 0; i < num; i++)
        {
            uint32_t events = revs[i].events;//拿到就绪队列当前fd的事件
            int fd = revs[i].data.fd;//拿到就绪队列当前位置的文件描述符
            if (events & EVENT_IN)
            {
                if (fd == _listsocket_ptr->Fd())
                {
                    Accepter();
                }
                else
                {
                    // 其他fd上面的普通读取事件就绪
                    Recver(fd);
                }
            }
            else if (events & EVENT_OUT)
            {
            }
            else
            {
            }
        }
    }
    void Start()
    {
        //将监听fd和他关心的事件添加到epoll模型的红黑树当中
        _epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, _listsocket_ptr->Fd(), EVENT_IN);
        struct epoll_event revs[num];
        for (;;)
        {
            int n = _epoller_ptr->EpollerWait(revs, num);
            if (n > 0)
            {
                // 有事件就绪
                lg(Debug, "event happened, fd is : %d", revs[0].data.fd);
                Dispatcher(revs, n);
            }
            else if (n == 0)
            {
                lg(Info, "time out ...");
            }
            else
            {
                lg(Error, "epll wait error");
            }
        }
    }
    ~EpollServer()
    {
        _listsocket_ptr->Close();
    }
private:    
    std::shared_ptr<Sock> _listsocket_ptr;
    std::shared_ptr<Epoller> _epoller_ptr;
    uint16_t _port;
};

LT与ET

epoll默认模式:LT模式。事件到来,如果一直不处理,就会不断的提示处理这里的事件。 ET:数据或连接,从无到有,从有到多,变化的时候,才会通知一次。(只有事件到来数据发生变化的时候才会提醒) ET的通知效率更高),LT一直提醒,CPU会占用资源。 ET会逼着程序员每次通知都要把数据都拿走,全部取走代表要循环的读取fd,但是一旦读完毕之后就无法读了,会阻塞等待,服务器一旦阻塞住就代表挂起,无法正常工作了,所以要设置成非阻塞读取。 ET的IO效率更高,因为数据被全部拿走之后,TCP会告诉对方一个更大的窗口,可以发送更多的数据。 但是如果LT如果将FD都设置成非阻塞,然后循环读取,通知的第一次就全部取走,也可以达到只通知一次的效果。

本质就是: 调用epoll_wait 时,只要数据没有读取完,LT 每次都会将文件描述符重新添加到就绪队列中; 而 ET 只会添加一次文件描述符到就绪队列,数据没有读取完也不会再添加到就绪队列了,直到事件状态发生变化时才会继续添加。

上面的代码Recver函数有的一个问题,不一定能完整的读取一个报文,也就是buff里面的内容,并且buffer还是一个临时空间,第二次再读取剩下的报文,原本的报文也会因临时空间销毁而消失。

本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2025-04-27,如有侵权请联系 cloudcommunity@tencent 删除函数事件数据linuxio

本文标签: Linux高效IO