Post

协程梳理实践(三) -- sylar协程IO事件调度

梳理sylar协程中的IO事件调度

协程梳理实践(三) -- sylar协程IO事件调度

1. 引言

前文梳理了协程的基本实现 和 协程在多线程情况下的调度,只是搭起了一个架子,协程任务也比较简单,并未涉及到网络IO等跟业务关联紧密的操作。

本篇梳理的内容比较实用,通过协程中结合网络IO事件,能更高效利用系统资源,较大提升应用程序的性能。协程特别适合IO密集型的场景。

相关说明详见:

本篇涉及代码:fiber_lib/5iomanager

说明:本博客作为个人学习实践笔记,可供参考但非系统教程,可能存在错误或遗漏,欢迎指正。若需系统学习,建议参考原链接。

2. 实现总体说明

sylar中的IO协程调度基于epoll实现。相对于上篇多线程下的协程调度,在没有任务时,通过epoll_wait等待事件,避免没有必要的线程空转。

  • 封装的添加、删除接口 和 epoll_ctlEPOLL_CTL_ADDEPOLL_CTL_DEL操作相对应。
  • 关注的事件则归类简化为了 读(EPOLLIN写(EPOLLOUT事件。

总体流程如下:

sylar_io_coroutine_scheduler

  • IO协程调度类继承上篇中的调度类和一个定时器。支持epoll和定时器两个途径来产生任务,任务可以是协程或者函数对象,协程调度时都是包装为协程来进行resume()执行的。
  • 使用idle()tickle()来避免没有任务情况下的空转,通过epoll_wait来进行等待,有两种情况会触发epoll事件:
    • 1)当有用户注册的fd,如网络socket)有数据读写,epoll会触发事件,run调度处理中会进行事件处理,其中根据读、写不同,调用不同上下文进行任务添加;
    • 2)通过pipe管道来进行通知,当任务队列里还有任务没一次性处理完,通过tickle()向管道发送数据来触发epoll_wait

epoll的使用流程和项目中的应用,之前在好几篇历史博文里都梳理过了,可作回顾:

3. 调度逻辑梳理

3.1. IO调度类定义

在前面的class Scheduler中,像start()/stop()tickle()run()idle()等很多成员都定义为了virtual虚函数,是为了显式地提示这部分成员函数可被子类重载

本篇的主角IOManager调度类,就是继承自Scheduler,其定义截取部分内容如下:

  • 其中包含的fd上下文信息:FdContext,会设置给epoll_event的私有数据指针。
    • FdContext里面针对读和写事件,分别定义了一个事件上下文成员(EventContext类型)
    • 各自包含三部分:协程调度器指针、协程、事件回调函数。事件一般是协程 或者 函数 取其一。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// coroutine-lib/fiber_lib/5iomanager/ioscheduler.h
class IOManager : public Scheduler, public TimerManager 
{
    ...
private:
    // 私有数据,赋值给`struct epoll_event`中的私有数据指针:event.data.ptr
    struct FdContext
    {
        // 事件上下文定义,里面包含了3部分:协程调度器指针 + 协程 + 事件回调函数
        struct EventContext
        {
            // scheduler
            Scheduler *scheduler = nullptr;
            // callback fiber
            std::shared_ptr<Fiber> fiber;
            // callback function
            std::function<void()> cb;
        };
        // read event context
        EventContext read;
        // write event context
        EventContext write;
        int fd = 0;
        ...
    };
public:
    IOManager(size_t threads = 1, bool use_caller = true, const std::string &name = "IOManager");
    ~IOManager();

    // add one event at a time
    int addEvent(int fd, Event event, std::function<void()> cb = nullptr);
    // delete event
    bool delEvent(int fd, Event event);
    ...
protected:
    // 下面显式override的函数都重载掉
    void tickle() override;
    bool stopping() override;
    void idle() override;
    ...
private:
    // epoll句柄,epoll_create/epoll_create1 创建
    int m_epfd = 0;
    // fd[0] read,fd[1] write
    // pipe管道,用于idle通知。
    int m_tickleFds[2];
    std::atomic<size_t> m_pendingEventCount = {0};
    // C++17起才支持,读写锁
        // 通过 unique_lock<std::shared_mutex> w_lk(m_mutex) 定义写锁
        // 通过 shared_lock<std::shared_mutex> r_lk(m_mutex) 定义读锁
    std::shared_mutex m_mutex;
    // store fdcontexts for each fd
    std::vector<FdContext *> m_fdContexts;
};

上面的m_tickleFdspipe创建的管道,pipe管道是半双工的,1个管道需要2个文件fd,一读一写。POSIX标准写了可以 fd[1]写,fd[0],没有明确是否可以fd[0]写,fd[1]读,使用时需统一遵循fd[1]fd[0]读。

man手册中就是遵循POSIX标准给的说明:

1
2
3
4
5
6
7
# Manual page pipe(3p)
PIPE(3P)                          POSIX Programmer's Manual                         PIPE(3P)
       Data can be written to the file descriptor fildes[1] and read from the file  descrip‐
       tor  fildes[0].  A read on the file descriptor fildes[0] shall access data written to
       the file descriptor fildes[1]  on  a  first-in-first-out  basis.  It  is  unspecified
       whether  fildes[0]  is  also  open for writing and whether fildes[1] is also open for
       reading.

3.2. IOManager类构造

其中工作:

  • 1)初始化epoll句柄;
  • 2)创建好pipe管道;
  • 3)注册读管道(fd[0]用于读)的读事件;
  • 4)父类初始化调度线程池。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// coroutine-lib/fiber_lib/5iomanager/ioscheduler.cpp
// IO调度构造函数
IOManager::IOManager(size_t threads, bool use_caller, const std::string &name): 
Scheduler(threads, use_caller, name), TimerManager()
{
    // create epoll fd
    // 参数自Linux 2.6.8会被忽略,但要>0
    m_epfd = epoll_create(5000);
    assert(m_epfd > 0);

    // create pipe
    // 创建一个pipe管道时,需要2个文件fd,一读一写。一般是fd[1]写,fd[0]读
    int rt = pipe(m_tickleFds);
    assert(!rt);

    // add read event to epoll
    epoll_event event;
    // 注册读事件,且使用边缘触发(来事件后需一次性读取完对应数据)
    event.events  = EPOLLIN | EPOLLET; // Edge Triggered
    event.data.fd = m_tickleFds[0];

    // non-blocked
    rt = fcntl(m_tickleFds[0], F_SETFL, O_NONBLOCK);
    assert(!rt);
    // 此处注册的句柄为pipe的 fd[0],用于读
    rt = epoll_ctl(m_epfd, EPOLL_CTL_ADD, m_tickleFds[0], &event);
    assert(!rt);

    // 初始化 FdContext数组
    contextResize(32);

    // 这里的start()没有在IOManager类中重载,用的是父类Scheduler中的函数实现。
    // 里面会初始化线程池,创建threads个线程都用于协程调度
    start();
}

3.3. epoll事件注册:addEvent

事件注册,传入需要注册的fd和事件,向epoll里进行注册(添加或修改)。几点说明:

  • FdContext *fd_ctx指针会设置给 epoll_event 中的私有数据指针
    • 可以选择cb传入函数对象,若不传入则会创建一个协程,记录在EventContext上下文中
  • 此处注册事件时,epoll的触发模式也是EPOLLET(上述构造注册fd[0]管道的事件也是)
  • 其中涉及std::shared_mutex读写锁(共享锁/独占锁)的用法,需要C++17才支持
    • unique_lock<std::shared_mutex> w_lk(m_mutex) 定义写锁
    • shared_lock<std::shared_mutex> r_lk(m_mutex) 定义读锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// coroutine-lib/fiber_lib/5iomanager/ioscheduler.cpp
int IOManager::addEvent(int fd, Event event, std::function<void()> cb) 
{
    // attemp to find FdContext
    FdContext *fd_ctx = nullptr;
    
    // 读锁
    std::shared_lock<std::shared_mutex> read_lock(m_mutex);
    if ((int)m_fdContexts.size() > fd) 
    {
        // fd作为数组下标,好处是便于索引查找,不过没使用的fd下标存在一些浪费
        fd_ctx = m_fdContexts[fd];
        // 解除读锁。加锁只为了访问 m_fdContexts
        read_lock.unlock();
    }
    else
    {
        // 先解除上面的读锁
        read_lock.unlock();
        // 写锁
        std::unique_lock<std::shared_mutex> write_lock(m_mutex);
        // fd作下标超出vector容量,则根据 fd*1.5 来扩容,而不是之前的capacity
        contextResize(fd * 1.5);
        fd_ctx = m_fdContexts[fd];
    }

    // fd上下文整体加互斥锁
    std::lock_guard<std::mutex> lock(fd_ctx->mutex);
    
    // the event has already been added
    if(fd_ctx->events & event)
    {
        return -1;
    }

    // add new event
    // 原来的事件不是NONE(0),则op是修改,按位或增加本次要注册的事件
    int op = fd_ctx->events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
    epoll_event epevent;
    // 边缘触发模式
    epevent.events   = EPOLLET | fd_ctx->events | event;
    epevent.data.ptr = fd_ctx;

    // 事件注册
    int rt = epoll_ctl(m_epfd, op, fd, &epevent);
    if (rt) 
    {
        std::cerr << "addEvent::epoll_ctl failed: " << strerror(errno) << std::endl; 
        return -1;
    }

    // 注册的事件计数+1
    ++m_pendingEventCount;

    // update fdcontext
    // 更新成 Event 里限定的3个枚举(无事件、读、写),去掉了前面 按位| 的边缘触发标志
    fd_ctx->events = (Event)(fd_ctx->events | event);

    // update event context
    // 根据读写类型获取对应的 FdContext,设置其信息:调度类指针 和 协程/回调函数
    // fd_ctx指针设置给了上述 epoll_event 中的私有数据指针,只是个指针。此处更新fd_ctx指向结构的内容,前后顺序没影响
    FdContext::EventContext& event_ctx = fd_ctx->getEventContext(event);
    assert(!event_ctx.scheduler && !event_ctx.fiber && !event_ctx.cb);
    event_ctx.scheduler = Scheduler::GetThis();
    if (cb) 
    {
        // 如果传入了函数对象,则记录在EventContext中
        event_ctx.cb.swap(cb);
    } 
    else 
    {
        // 没传函数则创建一个新协程(新协程默认是RUNNING),并记录在EventContext中
        event_ctx.fiber = Fiber::GetThis();
        assert(event_ctx.fiber->getState() == Fiber::RUNNING);
    }
    return 0;
}

3.4. 调度流程

IOManager里没有重载父类Scheduler中的run(),因此调度类线程池中各调度线程的线程函数还是Scheduler::run()

具体逻辑可见上篇中的 调度处理:run()。这里贴一下上篇的流程图:

sylar-coroutine-schedule

差异比较大的是tickle()idle()成员函数,在IOManager类中进行了重载实现。

1、tickle()中判断有idle线程(空闲线程)时,向pipe管道的fd[1]发送(write)一个消息,而idle()里会进行消息接收方的处理。

1
2
3
4
5
6
7
8
9
10
11
// coroutine-lib/fiber_lib/5iomanager/ioscheduler.cpp
void IOManager::tickle() 
{
    // no idle threads
    if(!hasIdleThreads()) 
    {
        return;
    }
    int rt = write(m_tickleFds[1], "T", 1);
    assert(rt == 1);
}

2、如上面调度流程图所示,没有任务时,通过idle协程进行resume()操作,执行的是idle协程绑定的idle()函数,此处即重载后的IOManager::idle()

  • idle()里面结合了定时器做epoll_wait,定时器说明见下小节。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
void IOManager::idle()
{
    static const uint64_t MAX_EVNETS = 256;
    // 创建临时的epoll_event数组(没用vector<epoll_event>方式),用于接收epoll_wait返回的就绪事件,每次最大256个
    std::unique_ptr<epoll_event[]> events(new epoll_event[MAX_EVNETS]);

    while(true)
    {
        // blocked at epoll_wait
        int rt = 0;
        // 此处while循环为了结合定时器做超时检查,等待epoll事件超时触发,有触发则break此处的while(true)
        while(true)
        {
            static const uint64_t MAX_TIMEOUT = 5000;
            // 返回堆中最近的超时时间,还有多少ms到期(set里第一个成员时间最小,最先到期)
            uint64_t next_timeout = getNextTimer();
            next_timeout = std::min(next_timeout, MAX_TIMEOUT);

            // 获取events原始指针,接收epoll触发的事件。此处阻塞等待事件发生,避免idle协程空转
            rt = epoll_wait(m_epfd, events.get(), MAX_EVNETS, (int)next_timeout);
            // EINTR -> retry
            if(rt < 0 && errno == EINTR)
            {
                continue;
            }
            else
            {
                // 只要有任何事件通知就break出小循环
                break;
            }
        }

        // 既然有超时触发的事件,此处捞取超时定时器的回调函数
        std::vector<std::function<void()>> cbs;
        listExpiredCb(cbs);
        if(!cbs.empty()) 
        {
            // 把这些回调函数都加入到协程调度器的任务队列里
            for(const auto& cb : cbs) 
            {
                // 如果是第一次添加任务,则会tickle()一次:其中会向管道的fd[1]进行一次write(fd[0]就可以收到epoll读事件)
                scheduleLock(cb);
            }
            cbs.clear();
        }

        // 处理epoll_wait获取到的事件
        for (int i = 0; i < rt; ++i) 
        {
            epoll_event& event = events[i];
            // pipe管道,则做read,由于是边缘触发,此处while处理。虽然fd[1] write时也只是写了1个字符。
            if (event.data.fd == m_tickleFds[0]) 
            {
                uint8_t dummy[256];
                // edge triggered -> exhaust
                while (read(m_tickleFds[0], dummy, sizeof(dummy)) > 0);
                continue;
            }
            // other events
            FdContext *fd_ctx = (FdContext *)event.data.ptr;
            std::lock_guard<std::mutex> lock(fd_ctx->mutex);
            ...
            // 事件只保留读和写类型
            int real_events = NONE;
            if (event.events & EPOLLIN) 
            {
                real_events |= READ;
            }
            if (event.events & EPOLLOUT) 
            {
                real_events |= WRITE;
            }
            ...
            // 根据类型触发相应的事件回调处理
            if (real_events & READ) 
            {
                fd_ctx->triggerEvent(READ);
                --m_pendingEventCount;
            }
            if (real_events & WRITE) 
            {
                fd_ctx->triggerEvent(WRITE);
                --m_pendingEventCount;
            }
        } // end for

        Fiber::GetThis()->yield();
    } // end while(true)
}

3.5. 定时器说明

定时器通过std::set来模拟最小堆,set管理的类重载了operator(),从小到大,即第一个元素最小,begin()当做堆顶元素(最小)。

  • getNextTimer() 返回堆中最近的超时时间,还有多少ms到期(set里第一个成员时间最小,最先到期)。
  • listExpiredCb 取出所有超时定时器的回调函数
    • 如果定时器支持循环利用,则重置超时时间后重新加入到管理器中;
    • 否则超时的定时器会从m_timersstd::set模拟的堆)中移除。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// coroutine-lib/fiber_lib/5iomanager/timer.h
class TimerManager 
{
    // 声明 Timer 是 TimerManager 的友元(Timer 可以访问 TimerManager 的私有成员)
    friend class Timer;
public:
    TimerManager();
    virtual ~TimerManager();
    ...
    // 添加timer
    std::shared_ptr<Timer> addTimer(uint64_t ms, std::function<void()> cb, bool recurring = false);
    // 拿到堆中最近的超时时间
    uint64_t getNextTimer();
    // 取出所有超时定时器的回调函数
    void listExpiredCb(std::vector<std::function<void()>>& cbs);
    ...
protected:
    ...
    // 添加timer
    void addTimer(std::shared_ptr<Timer> timer);
private:
    ...
    // 时间堆。根据超时时间由小到大排序
    // 此处用set模拟堆,begin作为堆顶(最小堆,堆顶是最小元素)。而没有用std::priority_queue优先级队列
    std::set<std::shared_ptr<Timer>, Timer::Comparator> m_timers;
    ...
};

class Timer : public std::enable_shared_from_this<Timer> 
{
    // 声明 TimerManager 是 Timer 的友元(TimerManager 可以访问 Timer 的私有成员)
    friend class TimerManager;
    ...
private:
    // 此处构造定义为了私有,可通过友元类(friend)TimerManager来访问
    Timer(uint64_t ms, std::function<void()> cb, bool recurring, TimerManager* manager);
private:
    ...
    // 超时时触发的回调函数
    std::function<void()> m_cb;
    ...
private:
    // 实现最小堆的比较函数
    struct Comparator 
    {
        // 实现中是由小到大:lhs->m_next < rhs->m_next。默认情况就是`std::less`,也是从小到大升序排序。
        bool operator()(const std::shared_ptr<Timer>& lhs, const std::shared_ptr<Timer>& rhs) const;
    };
};

TimerManager声明为了Timerfriend类,才能使用到Timer类的私有构造类:

1
2
3
4
5
6
7
// coroutine-lib/fiber_lib/5iomanager/timer.cpp
std::shared_ptr<Timer> TimerManager::addTimer(uint64_t ms, std::function<void()> cb, bool recurring) 
{
    std::shared_ptr<Timer> timer(new Timer(ms, cb, recurring, this));
    addTimer(timer);
    return timer;
}

4. 基本运行

编译运行coroutine-lib中的基本示例。为了避免日志打印错误,暂时新增一个mutex用于日志互斥打印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// coroutine-lib/fiber_lib/5iomanager/main.cpp
// 临时新增仅用于打印
std::mutex mutex_cout;

void func()
{
    recv(sock, recv_data, 4096, 0);
    std::cout << recv_data << std::endl << std::endl;
}

const char data[] = "GET / HTTP/1.0\r\n\r\n"; 
void func2()
{
    send(sock, data, sizeof(data), 0);
}

int main(int argc, char const *argv[])
{
    IOManager manager(2);

    sock = socket(AF_INET, SOCK_STREAM, 0);

    sockaddr_in server;
    server.sin_family = AF_INET;
    server.sin_port = htons(80);  // HTTP 标准端口
    // 百度网站的ip
    server.sin_addr.s_addr = inet_addr("103.235.46.96");

    fcntl(sock, F_SETFL, O_NONBLOCK);

    connect(sock, (struct sockaddr *)&server, sizeof(server));
    
    // 发送 GET请求
    manager.addEvent(sock, IOManager::WRITE, &func2);
    manager.addEvent(sock, IOManager::READ, &func);
    
    {
        std::lock_guard<std::mutex> lk(mutex_cout);
        std::cout << "event has been posted\n\n";
    }

    // 等待一会,防止主线程退出提前触发调度线程等的析构,影响流程
    sleep(1);
    
    return 0;
}

运行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
[root@xdlinux ➜ 5iomanager git:(main)]$ g++ *.cpp -o test -pthread -std=c++17
[root@xdlinux ➜ 5iomanager git:(main)]$ ./test
Scheduler::Scheduler() success
# 创建调度线程1
Schedule::run() starts in thread: 499525
Scheduler::start() success
IOManager::idle(),run in thread: 499525
# 线程1很快就已经调度处理了请求
event has been posted

IOManager::idle(),run in thread: 499525
IOManager::idle(),run in thread: 499525
# 应答信息
HTTP/1.0 200 OK
Accept-Ranges: bytes
Cache-Control: no-cache
Content-Length: 29506
Content-Type: text/html
    ...
    <link
        rel="search"
        type="application/opensearchdescription+xml"
        href="//www.baidu.com/content-search.xml"
        title="百度搜索"
    />
    <title>百度一下,你就知道</title>
    <style type="text/css">
    ...

IOManager::idle(),run in thread: 499525
IOManager::idle(),run in thread: 499525
# 触发析构,先stop线程2?(不是还没创建?)
  # 这是由于main中,IOManager定义时设置了第一个参数为2,2个线程,
  # 而主线程自身也作为调度线程(use_caller默认true),因此只创建一个新线程。执行结束即触发析构
Schedule::stop() starts in thread: 499524
# 调度线程2(其实是主线程,也作为协程调度线程)
Schedule::run() starts in thread: 499524
IOManager::idle(),run in thread: 499525
name = IOManager idle exits in thread: 499525
Schedule::run() ends in thread: 499525
IOManager::idle(),run in thread: 499524
name = IOManager idle exits in thread: 499524
Schedule::run() ends in thread: 499524
m_schedulerFiber ends in thread:499524
Schedule::stop() ends in thread:499524
Scheduler::~Scheduler() success

5. 小结

梳理sylar中结合epoll和定时器之后的协程调度逻辑。限于篇幅,标准库和系统API hook的梳理还是作为下篇单独梳理。

6. 参考

This post is licensed under CC BY 4.0 by the author.