通过上下两篇详细讲述EPOLL的是原理及使用方法,上篇主要针对基本概念和基础用法
1. 概念
1.1 基本概念解释
流(stream)
指计算机中顺序读写字节序列。对应于现实中的输入模型(磁带、纸带,他们都是按顺序从头开始)
流是能统一描述所有常见输入输出类型的模型,包括文件、键盘、显示器等等,能够通过输入输出重定向(不使用标准IO函数,编写相应的读写函数,处理文件、键盘等模型)。相较于随机读写,流模型能兼容不同的读写介质,提高读写效率。
缓冲区(Buffer)
假设有一个管道,A端为输入,B端为输出。
- 一开始管道是空的,对应内核缓冲区为空,B被阻塞。
- 当A开始写入数据时,缓冲区非空,B可能还是读取也可能继续休息。
- 若A写满了缓冲区,则A被阻塞,等待B读取数据。
- 若B读取所有数据,缓冲区为空,则B又被阻塞。
总结起来就是:
- 缓冲区空:两种可能,A没开始输入,B读取完毕所有数据,两种情况下B都被阻塞
- 缓冲区非满:A开始写入数据,B可能读取也可能休息
- 缓冲区满:A不能再写入,被阻塞,等待B醒来。B读取时,A可能接着写入可能继续休息。
1.2 四种事件处理的方式
阻塞IO:一个线程只能处理一个流的I/O事件。除非采用多线程,否则效率很低。
非阻塞忙轮询IO:即Non-Blocking Busy Polling,等待某个事件的时候,放弃其他事情,休息,专门等待,称之为阻塞。等待过程中不休息,不断询问事件是否完成,称之为非阻塞忙轮询。可以同时处理多个流,但需要从头到尾轮询,浪费资源。
Select:相当于一位代理,帮我们观察流。但这位代理只会告诉我们此刻是否有IO事件发生,我们却不知道是哪些流,只能无差别轮询。
Epoll:即event poll,不同于无差别轮询,epoll会把哪个流发生什么样的事情通知我们。
1.3 Select/Poll的缺点
- 单个进程能够监视的文件描述为最大为1024,轮询越多,性能越差
- Select需要复制大量句柄数据结构,开销巨大。
- Select返回整个句柄数组,程序需要遍历数组才能知道哪些句柄发生了什么事件。
- 触发方式为水平触发,如果程序没有对一个就绪的文件进行IO操作,之后每次Select调用还是会将文件描述符通知给进程。
Poll使用链表保存文件,没有1的限制,但其他三个缺点依然比较明显。
2. Epoll模型
2.1 Epoll机制解析
Epoll也就是events poll,它分为三个部分实现:
- 调用
epoll_creat
建立一个epoll对象 - 调用
epoll_ctl
向epoll对象中添加众多套接字 - 调用
epoll_wait
收集发生的事件的连接
某一进程调用epoll_creat
时,Linux内核会创建一个eventpoll
结构体。结构体中存在两个成员:
- 红黑树的根节点,存储epoll需要监控的事件
- 双链表的头部,存储
epoll_wait
返回给用户的满足条件的事件
用户通过epoll_ctl
向epoll对象中注册事件,这些事件会挂载到红黑树中,能高效识别重复事件。所有注册的事件都会和驱动程序建立回调关系,事件发生时调用这个回调方法,将发生的事件添加到双链表rdlist
中。
当调用epoll_wait
检查事件发生时,只需要检查双链表rdlist
是否有元素即可。通过epoll_ctl
把所有事件传入内核,一起wait,避免了不必要的重复拷贝。
当某一进程调用epoll_create
方法时,Linux内核会创建一个eventpoll
结构体:
struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
每次调用epoll_create
方法都会创建一个epoll对象,每一个对象都有一个独立的eventpoll
结构体,用于存放通过epoll_ctl
方法向epoll对象中添加进来的事件。在epoll中,对于每一个事件,都会建立一个epitem
结构体,如下所示:
struct epitem{
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型
}
总结一下,epoll内部有两个重要的结构体:
- eventpoll:每个epoll都有一个,维护一个红黑树和双向链表,ctl注册的事件挂载到红黑树,发生的事件挂载到链表。
- epitem:每个事件都有一个,记载了事件在红黑树和链表中的位置,以及事件的属性。
2.2 Epoll使用方法
1.创建Epoll句柄
int epfd = epoll_create(intsize);
创建一个epoll句柄,size用来告诉内核这个监听的数目有多大。注意:当创建好epoll句柄后,它就是会占用一个fd值,所以使用完epoll以后,需要及时调用close()
关闭,否则会导致fd耗尽。
2.注册或修改监听事件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
参数:
- epfd,之前creat的句柄
- op,动作类型
EPOLL_CTL_ADD
:注册新的fd到epfd中EPOLL_CTL_MOD
:修改已经注册的fd的监听事件EPOLL_CTL_DEL
:从epfd中删除一个fd
- fd,需要监听的文件句柄
- event,需要监听的事件
调用成功返回0,不成功返回-1。以下是epoll_event
结构体
// 保存触发事件的某个文件描述符相关的数据
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
// 注册所感兴趣的事件和回传所发生待处理的事件
struct epoll_event {
__uint32_t events; // Epoll events
epoll_data_t data; // User data variable
};
其中events
包含了如下的宏集合:
EPOLLIN
:表示对应的文件描述符可读(包括对端Socket)EPOLLOUT
:表示对应的文件描述符可写EPOLLPRI
:表示对应的文件描述符有紧急数据可读(带外数据)EPOLLERR
:表示对应的文件描述符发生错误;EPOLLHUP
:表示对应的文件描述符被挂断;EPOLLET
:将EPOLL设为边缘触发(Edge Triggered),这是相对于水平触发(Level Triggered)而言的。
边缘触发和水平触发的区别是:
- 水平触发LT:默认方式,支持阻塞和非阻塞。内核告诉你某个fd就位了,如果你不对这个fd进行IO操作,内核会一直通知你。所以这种模式安全性较高。
- 边缘触发ET:只支持非阻塞,内核只会通知你一次,如果你不操作他也不管你了,速度快
3.等待事件触发
int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout);
参数:
events
是分配好的epoll_event
结构体数组,epoll
将会把发生的事件赋值到events
数组中。maxevents
告诉内核这个events
数组有多大,这个maxevents
的值不能大于创建epoll_create
时的size。- 参数
timeout
是超时时间(毫秒)。
该函数返回需要处理的事件数目,如返回0表示已超时。
等侍注册在epfd
上的socket fd
的事件的发生.
- 如果发生则将发生的
socket fd
和事件类型放入到events
数组中,并将注册在epfd
上的socket fd
的事件类型给清空。 - 如果下一个循环还要关注这个
socket fd
的话,则需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)
来重新设置socket fd
的事件类型。这时不用EPOLL_CTL_ADD
,因为socket fd
并未清空,只是事件类型清空。
3. 实战
使用epoll的基本逻辑如下。我们需要用到多个socket句柄,首先本地服务器肯定要创建一个socket,同时也需要将这个fd注册到epoll中。
实际代码如下,主要工作是初始化服务器socket,并将其注册到epoll中。
/*步骤1:设置socket */
struct sockaddr_in clientaddr;
struct sockaddr_in serveraddr;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
setnonblocking(listenfd); //自己写的函数,把socket设置为非阻塞方式
/*步骤2:创建并设置epoll */
struct epoll_event ev, events[20]; //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
epfd = epoll_create(256); //生成用于处理accept的epoll专用的文件描述符
ev.data.fd = listenfd; //设置与要处理的事件相关的文件描述符
ev.events = EPOLLIN | EPOLLET; //设置要处理的事件类型
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); //注册epoll事件
/*步骤3:连接并监听网络 */
bzero(&serveraddr, sizeof(serveraddr)); //置零
serveraddr.sin_family = AF_INET;
char *local_addr = "127.0.0.1";
inet_aton(local_addr, &(serveraddr.sin_addr)); //将一个字符串IP地址转换为一个32位的网络序列IP地址
serveraddr.sin_port = htons(portnumber); //portnumber自己设的端口号
bind(listenfd, (sockaddr *)&serveraddr, sizeof(serveraddr));
listen(listenfd, 20); //最大等待20个
客户端与服务器沟通分为两步:
- 客户端连接,事件的句柄fd是服务器的fd。
- 客户端收发数据,事件的句柄fd是客户端本身的socket句柄
代码如下:
/*步骤4:处理事件 */
while (1)
{
nfds = epoll_wait(epfd, events, 20, 500); //等待epoll事件的发生,最大20个,超时500ms
for (i = 0; i < nfds; ++i) //处理所发生的所有事件
{
if (events[i].data.fd == listenfd) //如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接。
{
connfd = accept(listenfd, (sockaddr *)&clientaddr, &clilen);
char *str = inet_ntoa(clientaddr.sin_addr);
cout << "accapt a connection from " << str << endl;
ev.data.fd = connfd; //设置用于读操作的文件描述符
ev.events = EPOLLIN | EPOLLET; //设置用于注测的读操作事件
epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev); //注册ev
}
else if (events[i].events & EPOLLIN) //如果是已经连接的用户,并且收到数据,那么进行读入。
{
cout << "EPOLLIN" << endl;
int n = read(sockfd, line, MAXLINE))//传送到line中,返回n传送的字节数
line[n] = '/0';
cout << "read " << line << endl;
ev.data.fd = sockfd; //设置用于写操作的文件描述符
ev.events = EPOLLOUT | EPOLLET; //设置用于注测的写操作事件
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); //修改sockfd上要处理的事件为EPOLLOUT
}
else if (events[i].events & EPOLLOUT) // 如果有数据发送
{
sockfd = events[i].data.fd;
write(sockfd, line, n);
ev.data.fd = sockfd; //设置用于读操作的文件描述符
ev.events = EPOLLIN | EPOLLET; //设置用于注测的读操作事件
epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); //修改sockfd上要处理的事件为EPOLIN
}
}
}