admin管理员组文章数量:1437312
Linux高效IO
IO的五种模型
IO是什么? 用户在应用层使用read和write的时候,本质就是将数据从用户层写给OS——也就是拷贝函数,所以IO的本质就是进行拷贝+等待(比如没有数据就需要等待,也就是说,要进行拷贝必须先判断条件是否成立,然后在进行读写) 那么什么是高效的IO呢?单位时间内,IO过程中,等的比重越小,IO效率越高。(几乎所有提高IO效率的策略都是让等的比重变小)
五种IO模型 1.阻塞式IO(遇到一个数据读写一个数据,没有就一直等待) 2.非阻塞式IO,非阻塞轮询(这里时不时回来看一眼没有没数据,有的话进行IO,没有就去做其他事) 3.信号驱动式IO(如果有数据了就回来进行IO,没有就去做别的事) 4.多路复用,多路转接(弄出一大堆的接受数据的入口,然后轮询式查看有没有数据进来,如果有就进行IO处理)——这种方式获得数据效率最高,而且在只有一个数据的情况下,这种方式获得数据的概率也是最大的。 上面1-4式同步IO。 5.异步IO(自己派一个“工具人”执行IO,怎么执行不重要,重要是能IO就可以了,自己并不参与IO当中,就像父进程创建子进程,让子进程打工一样。) 由此得出:阻塞IO VS 非阻塞IO:等的方式不同。(非阻塞是不一直等) 同步IO VS 异步IO:异步是不会去等的,同步都在等。(包括非阻塞,因为非阻塞虽然会做别的事,但是也会时不时来看一眼,而异步是完全不理会,且不在乎)
这五种哪个效率最高?那就是多路转接和非阻塞IO。
非阻塞 IO 往往需要程序员循环的方式反复尝试读写文件描述符, 这个过程称为轮询. 这对 CPU 来说是较大的浪费, 一般只有特定场景下才使用.
IO 多路转接: 虽然从流程图上看起来和阻塞 IO 类似. 实际上最核心在于 IO 多路转接能够同时等待多个文件描述符的就绪状态.
非阻塞
fcntl 一个文件描述符, 默认都是阻塞 IO,让这个文件描述符以后无论被read还是write使用的时候,都是非阻塞式。 函数原型如下:
代码语言:javascript代码运行次数:0运行复制#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );//将fd文件描述符设置成非阻塞
复制一个现有的描述符(cmd=F_DUPFD). 获得/设置文件描述符标记(cmd=F_GETFD 或 F_SETFD). 获得/设置文件状态标记(cmd=F_GETFL 或 F_SETFL). 获得/设置异步 I/O 所有权(cmd=F_GETOWN 或 F_SETOWN). 获得/设置记录锁(cmd=F_GETLK,F_SETLK 或 F_SETLKW).
例如:
代码语言:javascript代码运行次数:0运行复制void SetNoBlock(int fd) {
int fl = fcntl(fd, F_GETFL);
if (fl < 0) {
perror("fcntl");
return;
}
fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
使用 F_GETFL 将当前的文件描述符的属性取出来(这是一个位图). 然后再使用 F_SETFL 将文件描述符设置回去. 设置回去的同时, 加上一个O_NONBLOCK 参数.
这里要注意: 1.设置成为非阻塞,如果底层的fd数据没有就绪,read,write等函数就会以出错的形式返回。 2.分为两种情况,真的出错和底层数据没有就绪,需要区分 3.通过返回的error值区分。(错误码11式底层没有数据——EWOULDBLOCK)
多路转接
select
这个函数只负责等,一次等多个fd。
返回值有三种情况: n>0:有n个fd就绪了。 n==0:超时,没有出错,但是也没有fd就绪 n<0:出错了 第一个参数 要等待的文件描述符最大值+1. 最后一个参数是: 首先认识一个时间结构体,第一个参数是时间戳(秒为单位),第二个参数是时间戳(微秒为单位)
这个参数是这种类型,意思是设置一个等待时间。 例如:设置为[5,0],意思就是,如果一直没有fd就绪,那就5s阻塞等待,5s之后返回,如果中途有fd就绪就直接返回。设置为[0,0],也就是非阻塞,立马返回。设置为NULL就是阻塞等待。 并且这个参数是输入输出型参数,假如说设置5s的时间,2s就来了fd,那就立即返回,并且这个时间就会变成3s。 再来了解一下中间的三个参数: 首先清楚,fd_set内核提供的一种数据类型,它是位图。(比特位的个数最多是1024个) 其次,我们目前关心fd上面的事件也就3种,读,写,错误。 并且,这三个参数也都是输入输出型参数(第一个是读事件,第二个是写事件,第三个是错误事件)
输入时,用户告诉内核,我给你一个或者多个fd,你要帮我关心fd上面的事件,如果事件就绪了,你要告诉我。(输入的位图当中,从右向左数,标识文件描述符的编号,比特位的内容是0or1,代表是否需要关心) 输出时,内核告诉用户,你让我关心的多个fd种有哪些事件已经就绪。(返回的时候,根据刚才输入进来的位图查看哪些需要关心,然后将就绪的fd返回,其他的清0,假设是0000 1111,只有2号fd就绪了,就返回0000 0010)
代码使用select
代码语言:javascript代码运行次数:0运行复制#include <iostream>
#include <time.h>
#include <stdarg.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#define SIZE 1024
#define Info 0
#define Debug 1
#define Warning 2
#define Error 3
#define Fatal 4
#define Screen 1
#define Onefile 2
#define Classfile 3
#define LogFile "log.txt"
class Log
{
public:
Log()
{
printMethod = Screen;
path = "./log/";
}
void Enable(int method)
{
printMethod = method;
}
std::string levelToString(int level)
{
switch (level)
{
case Info:
return "Info";
case Debug:
return "Debug";
case Warning:
return "Warning";
case Error:
return "Error";
case Fatal:
return "Fatal";
default:
return "None";
}
}
void printLog(int level, const std::string &logtxt)
{
switch (printMethod)
{
case Screen:
std::cout << logtxt << std::endl;
break;
case Onefile:
printOneFile(LogFile, logtxt);
break;
case Classfile:
printClassFile(level, logtxt);
break;
default:
break;
}
}
void printOneFile(const std::string &logname, const std::string &logtxt)
{
std::string _logname = path + logname;
int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt"
if (fd < 0)
return;
write(fd, logtxt.c_str(), logtxt.size());
close(fd);
}
void printClassFile(int level, const std::string &logtxt)
{
std::string filename = LogFile;
filename += ".";
filename += levelToString(level); // "log.txt.Debug/Warning/Fatal"
printOneFile(filename, logtxt);
}
~Log()
{
}
void operator()(int level, const char *format, ...)
{
time_t t = time(nullptr);
struct tm *ctime = localtime(&t);
char leftbuffer[SIZE];
snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(),
ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday,
ctime->tm_hour, ctime->tm_min, ctime->tm_sec);
va_list s;
va_start(s, format);
char rightbuffer[SIZE];
vsnprintf(rightbuffer, sizeof(rightbuffer), format, s);
va_end(s);
// 格式:默认部分+自定义部分
char logtxt[SIZE * 2];
snprintf(logtxt, sizeof(logtxt), "%s %s", leftbuffer, rightbuffer);
// printf("%s", logtxt); // 暂时打印
printLog(level, logtxt);
}
private:
int printMethod;
std::string path;
};
Log lg;
代码语言:javascript代码运行次数:0运行复制#include <iostream>
#include <string>
#include <unistd.h>
#include <cstring>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include "log.hpp"
using namespace std;
enum
{
SocketErr = 2,
BindErr,
ListenErr,
};
const int backlog = 10;
class Sock
{
public:
Sock()
{}
~Sock()
{}
void Socket()
{
sockfd_ = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd_ < 0)
{
lg(Fatal, "socker error, %s: %d", strerror(errno), errno);
exit(SocketErr);
}
int opt = 1;
setsockopt(sockfd_,SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt));
}
void Bind(uint16_t port)
{
struct sockaddr_in local;
memset(&local, 0, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = INADDR_ANY;
if (bind(sockfd_, (struct sockaddr *)&local, sizeof(local)) < 0)
{
lg(Fatal, "bind error, %s: %d", strerror(errno), errno);
exit(BindErr);
}
}
void Listen()
{
if (listen(sockfd_, backlog) < 0)
{
lg(Fatal, "listen error, %s: %d", strerror(errno), errno);
exit(ListenErr);
}
}
int Accept(string *clientip, uint16_t *clientport)
{
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
int newfd = accept(sockfd_, (struct sockaddr*)&peer, &len);
if(newfd < 0)
{
lg(Warning, "accept error, %s: %d", strerror(errno), errno);
return -1;
}
char ipstr[64];
inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));
*clientip = ipstr;
*clientport = ntohs(peer.sin_port);
return newfd;
}
bool Connect(const string &ip, const uint16_t &port)
{
struct sockaddr_in peer;
memset(&peer, 0, sizeof(peer));
peer.sin_family = AF_INET;
peer.sin_port = htons(port);
inet_pton(AF_INET, ip.c_str(), &(peer.sin_addr));
int n = connect(sockfd_, (struct sockaddr*)&peer, sizeof(peer));
if(n == -1)
{
std::cerr << "connect to " << ip << ":" << port << " error" << std::endl;
return false;
}
return true;
}
void Close()
{
close(sockfd_);
}
int Fd()
{
return sockfd_;
}
private:
int sockfd_;
};
代码语言:javascript代码运行次数:0运行复制#pragma once
#include <iostream>
#include <sys/select.h>
#include <sys/time.h>
#include "Socket.hpp"
#include <memory>
using namespace std;
static const uint16_t defaultport = 8888;
static const int fd_num_max = (sizeof(fd_set) * 8);//最多储存fd的数量
int defaultfd = -1;
class SelectServer
{
public:
SelectServer(uint16_t port = defaultport) : _port(port)
{
for (int i = 0; i < fd_num_max; i++)
{
fd_array[i] = defaultfd;//默认设置为-1
// std::cout << "fd_array[" << i << "]" << " : " << fd_array[i] << std::endl;
}
}
bool Init()
{
_listensock.Socket();
_listensock.Bind(_port);
_listensock.Listen();
return true;
}
void Accepter()
{
// 我们的连接事件就绪了
std::string clientip;
uint16_t clientport = 0;
int sock = _listensock.Accept(&clientip, &clientport); // 会不会阻塞在这里?不会
if (sock < 0) return;
lg(Info, "accept success, %s: %d, sock fd: %d", clientip.c_str(), clientport, sock);
int pos = 1;
for (; pos < fd_num_max; pos++) //找位置
{
if (fd_array[pos] != defaultfd)
continue;
else
break;
}
if (pos == fd_num_max)//找不到,位图装不下这么多的fd了
{
lg(Warning, "server is full, close %d now!", sock);
close(sock);
}
else
{
fd_array[pos] = sock;//找到了,新获取的fd放在这里
PrintFd();
// TODO
}
}
void Recver(int fd, int pos)
{
char buffer[1024];
ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // bug?
if (n > 0)
{
buffer[n] = 0;
cout << "get a messge: " << buffer << endl;
}
else if (n == 0)
{
lg(Info, "client quit, me too, close fd is : %d", fd);
close(fd);
fd_array[pos] = defaultfd; // 这里本质是从select中移除
}
else
{
lg(Warning, "recv error: fd is : %d", fd);
close(fd);
fd_array[pos] = defaultfd; // 这里本质是从select中移除
}
}
void Dispatcher(fd_set &rfds)
{
for (int i = 0; i < fd_num_max; i++) //遍历所有的fd
{
int fd = fd_array[i];
if (fd == defaultfd)
continue;
if (FD_ISSET(fd, &rfds))//查看当前fd是否存在
{
if (fd == _listensock.Fd())//如果是监听fd就让其去获取新的fd
{
Accepter(); // 连接管理器
}
else // non listenfd
{
Recver(fd, i);
}
}
}
}
void Start()
{
int listensock = _listensock.Fd();
fd_array[0] = listensock;//将监听fd放在第一个位置
for(;;)//accept不能直接接收,要先进行listensock上面的检测事件就绪在进行接收,等价于读事件就绪
{
fd_set rfds;
FD_ZERO(&rfds);//将位图清零
int maxfd = fd_array[0];
for (int i = 0; i < fd_num_max; i++) // 第一次循环
{
if (fd_array[i] == defaultfd)
continue;
FD_SET(fd_array[i], &rfds);//只要找到合法的fd全都设置进入位图
if (maxfd < fd_array[i])
{
maxfd = fd_array[i];//找到最大的fd
lg(Info, "max fd update, max fd is: %d", maxfd);
}
}
// struct timeval timeout = {1, 0}; // 输入输出,可能要进行周期的重复设置
struct timeval timeout = {0, 0}; // 输入输出,可能要进行周期的重复设置
//因为select的rfds参数是输入输出的参数,加入输入的有3个位就绪了,但是输出的时候,只输出就绪的位,之前的没就绪的位全都被覆盖了
//也就是说以后每次都要重新设置
int n = select(maxfd + 1, &rfds, nullptr, nullptr, /*&timeout*/ nullptr);//如果事件就绪不处理,select会一直通知
switch (n)
{
case 0:
cout << "time out, timeout: " << timeout.tv_sec << "." << timeout.tv_usec << endl;
break;
case -1:
cerr << "select error" << endl;
break;
default:// select告诉你就绪了,接下来的一次读取,我们读取fd的时候,不会被阻塞
// 有事件就绪了,TODO
cout << "get a new link!!!!!" << endl;
Dispatcher(rfds); // 就绪的事件和fd你怎么知道只有一个呢???
break;
}
}
}
void PrintFd()
{
cout << "online fd list: ";
for (int i = 0; i < fd_num_max; i++)
{
if (fd_array[i] == defaultfd)
continue;
cout << fd_array[i] << " ";
}
cout << endl;
}
~SelectServer()
{
_listensock.Close();
}
private:
Sock _listensock;
uint16_t _port;
int fd_array[fd_num_max]; //辅助数组,为了方便管理fd,方便传递fd
};
由此可以得知,select是有缺点的: 1.等待的fd是有上限的。 2.输入输出参数比较多,数据拷贝的频率较高。 3.输入输出参数比较多,每次都要对进行关心的fd进行重置。 4.需要使用第三方数组进行管理,用户层和内核层检测fd事件就绪,都需要遍历。
poll
因为select的缺点,所以有了poll。
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
第一个参数是上面这个结构体。 这个结构体主要的功能就是对于fd而言,将输入和输出事件进行了分离。 short是16个比特位,也是位图的样子:
第二个参数是数组的长度。 第三个参数是表示 poll 函数的超时时间, 单位是毫秒(ms).
代码语言:javascript代码运行次数:0运行复制#pragma once
#include <iostream>
#include <poll.h>
#include <sys/time.h>
#include "Socket.hpp"
using namespace std;
static const uint16_t defaultport = 8888;
static const int fd_num_max = 64;
int defaultfd = -1;
int non_event = 0;
class PollServer
{
public:
PollServer(uint16_t port = defaultport) : _port(port)
{
for (int i = 0; i < fd_num_max; i++)
{
_event_fds[i].fd = defaultfd;
_event_fds[i].events = non_event;
_event_fds[i].revents = non_event;
}
}
bool Init()
{
_listensock.Socket();
_listensock.Bind(_port);
_listensock.Listen();
return true;
}
void Accepter()
{
std::string clientip;
uint16_t clientport = 0;
int sock = _listensock.Accept(&clientip, &clientport);
if (sock < 0) return;
lg(Info, "accept success, %s: %d, sock fd: %d", clientip.c_str(), clientport, sock);
// sock -> fd_array[]
int pos = 1;
for (; pos < fd_num_max; pos++)
{
if (_event_fds[pos].fd != defaultfd)
continue;
else
break;
}
if (pos == fd_num_max)
{
lg(Warning, "server is full, close %d now!", sock);
close(sock);
}
else
{
// fd_array[pos] = sock;
_event_fds[pos].fd = sock;
_event_fds[pos].events = POLLIN;
_event_fds[pos].revents = non_event;
PrintFd();
}
}
void Recver(int fd, int pos)
{
char buffer[1024];
ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // bug?
if (n > 0)
{
buffer[n] = 0;
cout << "get a messge: " << buffer << endl;
}
else if (n == 0)
{
lg(Info, "client quit, me too, close fd is : %d", fd);
close(fd);
_event_fds[pos].fd = defaultfd;
}
else
{
lg(Warning, "recv error: fd is : %d", fd);
close(fd);
_event_fds[pos].fd = defaultfd;
}
}
void Dispatcher()
{
for (int i = 0; i < fd_num_max; i++)
{
int fd = _event_fds[i].fd;
if (fd == defaultfd)
continue;
if (_event_fds[i].revents & POLLIN)
{
if (fd == _listensock.Fd())
{
Accepter();
}
else // non listenfd
{
Recver(fd, i);
}
}
}
}
void Start()
{
_event_fds[0].fd = _listensock.Fd();
_event_fds[0].events = POLLIN;
int timeout = 3000; // 3s
for (;;)
{
int n = poll(_event_fds, fd_num_max, timeout);//不需要进行重复的设置位图
switch (n)
{
case 0:
cout << "time out... " << endl;
break;
case -1:
cerr << "poll error" << endl;
break;
default:
cout << "get a new link!!!!!" << endl;
Dispatcher();
break;
}
}
}
void PrintFd()
{
cout << "online fd list: ";
for (int i = 0; i < fd_num_max; i++)
{
if (_event_fds[i].fd == defaultfd)
continue;
cout << _event_fds[i].fd << " ";
}
cout << endl;
}
~PollServer()
{
_listensock.Close();
}
private:
Sock _listensock;
uint16_t _port;
struct pollfd _event_fds[fd_num_max]; // 数组, 用户维护的!
};
虽然使用这个函数还是要用辅助数组,但是空间开辟多大是由机器的硬件设备决定的,而不是像select一样,参数规定死的。 并且,不需要进行重复的设置位图,因为是当前的fd种的输入输出已经被分离开了,不会相互覆盖。 但是,在用户层和内核层依旧需要去遍历这个底层的数组。
epoll
这是优化版本的poll,但其实和poll根本不是一类函数了。
常用接口
#include <sys/epoll.h> int epoll_create(int size);
这是创建一个epoll的函数。 自从 linux2.6.8 之后,size 参数是被忽略的。用完之后, 必须调用 close()关闭。 返回值是成功创建返回新的fd,失败返回-1,同时也是下面这个函数的一个参数:
int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout);
第一个参数就是上面创建epoll的返回值。 第二个参数是返回已就绪的fd。
代码语言:javascript代码运行次数:0运行复制typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events;/*位图形式传递标记为*/ /* Epoll events */
epoll_data_t data; /* User data variable */
};
第三个参数是返回已就绪的事件。 第四个参数是超时时间。 返回值是已经就绪的fd的个数。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event*event);
它不同于 select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型. 第一个参数是 epoll_create()的返回值. 第二个参数表示动作,用三个宏来表示. EPOLL_CTL_ADD:注册新的 fd 到 epfd 中; EPOLL_CTL_MOD:修改已经注册的 fd 的监听事件; EPOLL_CTL_DEL:从 epfd 中删除一个 fd; 第三个参数是需要监听的 fd. 第四个参数是告诉内核需要监听什么事.
原理
首先提出个问题,OS是如何直到网卡有数据的呢?答案是硬件中断,一旦数据就绪了,中断就会通过OS找到网卡驱动,将数据从网卡读取到网卡驱动当中。 epoll在读取之前,OS会在内存当中维护一个红黑树,节点存放的是fd和要关心的事件等等信息。 并且还有一个就绪队列,红黑树当中哪些节点就绪,就会构建一个新节点连接到就绪队列当中,主要信息就是就绪节点的fd和就绪事件。(这个工作完全由操作系统来工作) 上面说过的通过网卡驱动拿到数据之后,会立刻调用一个回调函数,作用是向上交付,交给TCP的缓冲区。
这里再来说一说上面三个接口是如何运作的。 epoll_create: 首先创建一个struct file的文件结构体,这个结构体指向epoll模型。 epoll_ctl: 通过epfd找到对应的文件结构体,再找到对应的epoll模型里面的红黑树进行op操作。(增加,删除,修改三个操作) epoll_wait: 将就绪队列当中的节点依次放入自己第二个参数。
epoll优势: 1.检测就绪O(1),获取就绪O(N) 2.文件描述符和事件没有上限 3.这颗红黑树就是上面select和poll的辅助数组,OS帮我们维护和使用,不用自己用了。 4.返回值n表示有几个fd事件就绪了,就绪事件是连续的。这样就可以根据返回值个数遍历,遍历几个就处理几个,这样就不会有浪费的动作。(select和poll是需要过滤哪些fd事件是就绪的)
代码实战
代码语言:javascript代码运行次数:0运行复制//nocopy.hpp
#pragma once
class nocopy//不需要拷贝
{
public:
nocopy(){}
nocopy(const nocopy &) = delete;
const nocopy&operator=(const nocopy &) = delete;
};
代码语言:javascript代码运行次数:0运行复制//Epoller.hpp
#pragma once
#include "nocopy.hpp"
#include "log.hpp"
#include <cerrno>
#include <cstring>
#include <sys/epoll.h>
class Epoller:public nocopy
{
static const int size = 128;
public:
Epoller()
{
_epfd = epoll_create(size);
if (_epfd == -1)
{
lg(Error, "epoll_create error: %s", strerror(errno));
}
else
{
lg(Info, "epoll_create success: %d", _epfd);
}
}
int EpollerWait(struct epoll_event revents[], int num)
{
int n = epoll_wait(_epfd, revents, num, /*_timeout 0*/ -1);
return n;
}
int EpllerUpdate(int oper, int sock, uint32_t event)//添加需要关心的事件
{
int n = 0;
if (oper == EPOLL_CTL_DEL)//删除
{
n = epoll_ctl(_epfd, oper, sock, nullptr);
if (n != 0)
{
lg(Error, "epoll_ctl delete error!");
}
}
else
{
// EPOLL_CTL_MOD || EPOLL_CTL_ADD,修改或添加
struct epoll_event ev;
ev.events = event;
ev.data.fd = sock; // 目前,方便我们后期得知,是哪一个fd就绪了!
n = epoll_ctl(_epfd, oper, sock, &ev);
if (n != 0)
{
lg(Error, "epoll_ctl error!");
}
}
return n;
}
~Epoller()
{
if (_epfd >= 0)
close(_epfd);
}
private:
int _epfd;
int _timeout{3000};
};
代码语言:javascript代码运行次数:0运行复制//epollsever.hpp
#pragma once
#include <iostream>
#include <memory>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "log.hpp"
#include "Epoller.hpp"
#include "nocopy.hpp"
uint32_t EVENT_IN = (EPOLLIN);
uint32_t EVENT_OUT = (EPOLLOUT);
class EpollServer:public nocopy
{
static const int num = 64;
public:
EpollServer(uint16_t port)
:_port(port),
_listsocket_ptr(new Sock()),
_epoller_ptr(new Epoller())
{
}
void Init()
{
_listsocket_ptr->Socket();
_listsocket_ptr->Bind(_port);
_listsocket_ptr->Listen();
lg(Info, "create listen socket success: %d\n", _listsocket_ptr->Fd());
}
void Accepter()
{
// 获取了一个新连接
std::string clientip;
uint16_t clientport;
int sock = _listsocket_ptr->Accept(&clientip, &clientport);
if (sock > 0)
{
// 我们能直接读取吗?不能
_epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, sock, EVENT_IN);
lg(Info, "get a new link, client info@ %s:%d", clientip.c_str(), clientport);
}
}
void Recver(int fd)
{
// demo
char buffer[1024];
ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // 这里存在一个问题
if (n > 0)
{
buffer[n] = 0;
std::cout << "get a messge: " << buffer << std::endl;
// wrirte
std::string echo_str = "server echo $ ";
echo_str += buffer;
write(fd, echo_str.c_str(), echo_str.size());
}
else if (n == 0)
{
lg(Info, "client quit, me too, close fd is : %d", fd);
//细节3
_epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
close(fd);
}
else
{
lg(Warning, "recv error: fd is : %d", fd);
_epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);
close(fd);
}
}
void Dispatcher(struct epoll_event revs[], int num)
{
for (int i = 0; i < num; i++)
{
uint32_t events = revs[i].events;//拿到就绪队列当前fd的事件
int fd = revs[i].data.fd;//拿到就绪队列当前位置的文件描述符
if (events & EVENT_IN)
{
if (fd == _listsocket_ptr->Fd())
{
Accepter();
}
else
{
// 其他fd上面的普通读取事件就绪
Recver(fd);
}
}
else if (events & EVENT_OUT)
{
}
else
{
}
}
}
void Start()
{
//将监听fd和他关心的事件添加到epoll模型的红黑树当中
_epoller_ptr->EpllerUpdate(EPOLL_CTL_ADD, _listsocket_ptr->Fd(), EVENT_IN);
struct epoll_event revs[num];
for (;;)
{
int n = _epoller_ptr->EpollerWait(revs, num);
if (n > 0)
{
// 有事件就绪
lg(Debug, "event happened, fd is : %d", revs[0].data.fd);
Dispatcher(revs, n);
}
else if (n == 0)
{
lg(Info, "time out ...");
}
else
{
lg(Error, "epll wait error");
}
}
}
~EpollServer()
{
_listsocket_ptr->Close();
}
private:
std::shared_ptr<Sock> _listsocket_ptr;
std::shared_ptr<Epoller> _epoller_ptr;
uint16_t _port;
};
LT与ET
epoll默认模式:LT模式。事件到来,如果一直不处理,就会不断的提示处理这里的事件。 ET:数据或连接,从无到有,从有到多,变化的时候,才会通知一次。(只有事件到来数据发生变化的时候才会提醒) ET的通知效率更高),LT一直提醒,CPU会占用资源。 ET会逼着程序员每次通知都要把数据都拿走,全部取走代表要循环的读取fd,但是一旦读完毕之后就无法读了,会阻塞等待,服务器一旦阻塞住就代表挂起,无法正常工作了,所以要设置成非阻塞读取。 ET的IO效率更高,因为数据被全部拿走之后,TCP会告诉对方一个更大的窗口,可以发送更多的数据。 但是如果LT如果将FD都设置成非阻塞,然后循环读取,通知的第一次就全部取走,也可以达到只通知一次的效果。
本质就是: 调用epoll_wait 时,只要数据没有读取完,LT 每次都会将文件描述符重新添加到就绪队列中; 而 ET 只会添加一次文件描述符到就绪队列,数据没有读取完也不会再添加到就绪队列了,直到事件状态发生变化时才会继续添加。
上面的代码Recver函数有的一个问题,不一定能完整的读取一个报文,也就是buff里面的内容,并且buffer还是一个临时空间,第二次再读取剩下的报文,原本的报文也会因临时空间销毁而消失。
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2025-04-27,如有侵权请联系 cloudcommunity@tencent 删除函数事件数据linuxio本文标签: Linux高效IO
版权声明:本文标题:Linux高效IO 内容由网友自发贡献,该文观点仅代表作者本人, 转载请联系作者并注明出处:http://www.betaflare.com/biancheng/1747486831a2699621.html, 本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,一经查实,本站将立刻删除。
发表评论