通过上下两篇详细讲述EPOLL的是原理及使用方法,上篇主要针对基本概念和基础用法

1. 概念

1.1 基本概念解释

流(stream)

指计算机中顺序读写字节序列。对应于现实中的输入模型(磁带、纸带,他们都是按顺序从头开始)

流是能统一描述所有常见输入输出类型的模型,包括文件、键盘、显示器等等,能够通过输入输出重定向(不使用标准IO函数,编写相应的读写函数,处理文件、键盘等模型)。相较于随机读写,流模型能兼容不同的读写介质,提高读写效率

缓冲区(Buffer)

假设有一个管道,A端为输入,B端为输出。

  1. 一开始管道是空的,对应内核缓冲区为空,B被阻塞。
  2. 当A开始写入数据时,缓冲区非空,B可能还是读取也可能继续休息。
  3. 若A写满了缓冲区,则A被阻塞,等待B读取数据。
  4. 若B读取所有数据,缓冲区为空,则B又被阻塞。

总结起来就是:

  • 缓冲区空:两种可能,A没开始输入,B读取完毕所有数据,两种情况下B都被阻塞
  • 缓冲区非满:A开始写入数据,B可能读取也可能休息
  • 缓冲区满:A不能再写入,被阻塞,等待B醒来。B读取时,A可能接着写入可能继续休息。

1.2 四种事件处理的方式

阻塞IO:一个线程只能处理一个流的I/O事件。除非采用多线程,否则效率很低。

非阻塞忙轮询IO:即Non-Blocking Busy Polling,等待某个事件的时候,放弃其他事情,休息,专门等待,称之为阻塞等待过程中不休息,不断询问事件是否完成,称之为非阻塞忙轮询。可以同时处理多个流,但需要从头到尾轮询,浪费资源。

Select:相当于一位代理,帮我们观察流。但这位代理只会告诉我们此刻是否有IO事件发生,我们却不知道是哪些流,只能无差别轮询。

Epoll:即event poll,不同于无差别轮询,epoll会把哪个流发生什么样的事情通知我们。

1.3 Select/Poll的缺点

  1. 单个进程能够监视的文件描述为最大为1024,轮询越多,性能越差
  2. Select需要复制大量句柄数据结构,开销巨大。
  3. Select返回整个句柄数组,程序需要遍历数组才能知道哪些句柄发生了什么事件。
  4. 触发方式为水平触发,如果程序没有对一个就绪的文件进行IO操作,之后每次Select调用还是会将文件描述符通知给进程。

Poll使用链表保存文件,没有1的限制,但其他三个缺点依然比较明显。

2. Epoll模型

2.1 Epoll机制解析

Epoll也就是events poll,它分为三个部分实现:

  • 调用epoll_creat建立一个epoll对象
  • 调用epoll_ctl向epoll对象中添加众多套接字
  • 调用epoll_wait收集发生的事件的连接

某一进程调用epoll_creat时,Linux内核会创建一个eventpoll结构体。结构体中存在两个成员:

  • 红黑树的根节点,存储epoll需要监控的事件
  • 双链表的头部,存储epoll_wait返回给用户的满足条件的事件

用户通过epoll_ctl向epoll对象中注册事件,这些事件会挂载到红黑树中,能高效识别重复事件。所有注册的事件都会和驱动程序建立回调关系,事件发生时调用这个回调方法,将发生的事件添加到双链表rdlist中。

当调用epoll_wait检查事件发生时,只需要检查双链表rdlist是否有元素即可。通过epoll_ctl把所有事件传入内核,一起wait,避免了不必要的重复拷贝

当某一进程调用epoll_create方法时,Linux内核会创建一个eventpoll结构体:

struct eventpoll{
    ....
    /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
    struct rb_root  rbr;
    /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
    struct list_head rdlist;
    ....
};

每次调用epoll_create方法都会创建一个epoll对象,每一个对象都有一个独立的eventpoll结构体,用于存放通过epoll_ctl方法向epoll对象中添加进来的事件。在epoll中,对于每一个事件,都会建立一个epitem结构体,如下所示:

struct epitem{
    struct rb_node  rbn;//红黑树节点
    struct list_head    rdllink;//双向链表节点
    struct epoll_filefd  ffd;  //事件句柄信息
    struct eventpoll *ep;    //指向其所属的eventpoll对象
    struct epoll_event event; //期待发生的事件类型
}

总结一下,epoll内部有两个重要的结构体:

  • eventpoll:每个epoll都有一个,维护一个红黑树和双向链表,ctl注册的事件挂载到红黑树,发生的事件挂载到链表。
  • epitem:每个事件都有一个,记载了事件在红黑树和链表中的位置,以及事件的属性。

2.2 Epoll使用方法

1.创建Epoll句柄

int epfd = epoll_create(intsize);

创建一个epoll句柄,size用来告诉内核这个监听的数目有多大。注意:当创建好epoll句柄后,它就是会占用一个fd值,所以使用完epoll以后,需要及时调用close()关闭,否则会导致fd耗尽

2.注册或修改监听事件

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

参数:

  • epfd,之前creat的句柄
  • op,动作类型
    • EPOLL_CTL_ADD:注册新的fd到epfd中
    • EPOLL_CTL_MOD:修改已经注册的fd的监听事件
    • EPOLL_CTL_DEL:从epfd中删除一个fd
  • fd,需要监听的文件句柄
  • event,需要监听的事件

调用成功返回0,不成功返回-1。以下是epoll_event结构体

// 保存触发事件的某个文件描述符相关的数据
typedef union epoll_data {
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
} epoll_data_t;

// 注册所感兴趣的事件和回传所发生待处理的事件
struct epoll_event {
    __uint32_t events; // Epoll events
    epoll_data_t data; // User data variable
};

其中events包含了如下的宏集合:

  • EPOLLIN:表示对应的文件描述符可读(包括对端Socket)
  • EPOLLOUT:表示对应的文件描述符可写
  • EPOLLPRI:表示对应的文件描述符有紧急数据可读(带外数据)
  • EPOLLERR:表示对应的文件描述符发生错误;
  • EPOLLHUP:表示对应的文件描述符被挂断;
  • EPOLLET:将EPOLL设为边缘触发(Edge Triggered),这是相对于水平触发(Level Triggered)而言的。

边缘触发和水平触发的区别是:

  • 水平触发LT:默认方式,支持阻塞和非阻塞。内核告诉你某个fd就位了,如果你不对这个fd进行IO操作,内核会一直通知你。所以这种模式安全性较高
  • 边缘触发ET只支持非阻塞内核只会通知你一次,如果你不操作他也不管你了,速度快

3.等待事件触发

int epoll_wait(int epfd, struct epoll_event * events, intmaxevents, int timeout);

参数:

  • events是分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中。
  • maxevents告诉内核这个events数组有多大,这个maxevents的值不能大于创建epoll_create时的size。
  • 参数timeout是超时时间(毫秒)。

该函数返回需要处理的事件数目,如返回0表示已超时。

等侍注册在epfd上的socket fd的事件的发生.

  • 如果发生则将发生的socket fd和事件类型放入到events数组中,并将注册在epfd上的socket fd的事件类型给清空
  • 如果下一个循环还要关注这个socket fd的话,则需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)重新设置socket fd的事件类型。这时不用EPOLL_CTL_ADD,因为socket fd并未清空,只是事件类型清空。

3. 实战

使用epoll的基本逻辑如下。我们需要用到多个socket句柄,首先本地服务器肯定要创建一个socket,同时也需要将这个fd注册到epoll中。

实际代码如下,主要工作是初始化服务器socket,并将其注册到epoll中。

/*步骤1:设置socket */
struct sockaddr_in clientaddr;
struct sockaddr_in serveraddr;
listenfd = socket(AF_INET, SOCK_STREAM, 0);
setnonblocking(listenfd); //自己写的函数,把socket设置为非阻塞方式

/*步骤2:创建并设置epoll */
struct epoll_event ev, events[20];             //声明epoll_event结构体的变量,ev用于注册事件,数组用于回传要处理的事件
epfd = epoll_create(256);                      //生成用于处理accept的epoll专用的文件描述符
ev.data.fd = listenfd;                         //设置与要处理的事件相关的文件描述符
ev.events = EPOLLIN | EPOLLET;                 //设置要处理的事件类型
epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev); //注册epoll事件

/*步骤3:连接并监听网络 */
bzero(&serveraddr, sizeof(serveraddr)); //置零
serveraddr.sin_family = AF_INET;
char *local_addr = "127.0.0.1";
inet_aton(local_addr, &(serveraddr.sin_addr)); //将一个字符串IP地址转换为一个32位的网络序列IP地址
serveraddr.sin_port = htons(portnumber);       //portnumber自己设的端口号
bind(listenfd, (sockaddr *)&serveraddr, sizeof(serveraddr));
listen(listenfd, 20); //最大等待20个

客户端与服务器沟通分为两步:

  1. 客户端连接,事件的句柄fd是服务器的fd。
  2. 客户端收发数据,事件的句柄fd是客户端本身的socket句柄

代码如下:

/*步骤4:处理事件 */
while (1)
{
    nfds = epoll_wait(epfd, events, 20, 500); //等待epoll事件的发生,最大20个,超时500ms
    for (i = 0; i < nfds; ++i)                //处理所发生的所有事件
    {
        if (events[i].data.fd == listenfd) //如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接。
        {
            connfd = accept(listenfd, (sockaddr *)&clientaddr, &clilen);
            char *str = inet_ntoa(clientaddr.sin_addr);
            cout << "accapt a connection from " << str << endl;
            ev.data.fd = connfd;                         //设置用于读操作的文件描述符
            ev.events = EPOLLIN | EPOLLET;               //设置用于注测的读操作事件
            epoll_ctl(epfd, EPOLL_CTL_ADD, connfd, &ev); //注册ev
        }
        else if (events[i].events & EPOLLIN) //如果是已经连接的用户,并且收到数据,那么进行读入。
        {
            cout << "EPOLLIN" << endl;
            int n = read(sockfd, line, MAXLINE))//传送到line中,返回n传送的字节数
            line[n] = '/0';
            cout << "read " << line << endl;
            ev.data.fd = sockfd;                         //设置用于写操作的文件描述符
            ev.events = EPOLLOUT | EPOLLET;              //设置用于注测的写操作事件
            epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); //修改sockfd上要处理的事件为EPOLLOUT
        }
        else if (events[i].events & EPOLLOUT) // 如果有数据发送
        {
            sockfd = events[i].data.fd;
            write(sockfd, line, n);
            ev.data.fd = sockfd;                         //设置用于读操作的文件描述符
            ev.events = EPOLLIN | EPOLLET;               //设置用于注测的读操作事件
            epoll_ctl(epfd, EPOLL_CTL_MOD, sockfd, &ev); //修改sockfd上要处理的事件为EPOLIN
        }
    }
}