Post

协程梳理实践(五) -- libco、boost.coroutine等协程库简析

简要梳理libco、boost.coroutine2等开源协程库实现逻辑

协程梳理实践(五) -- libco、boost.coroutine等协程库简析

1. 引言

前几篇了解了协程的基本概念和模块,并梳理学习了sylar中的协程实现,本篇中对几个知名协程库的流程进行梳理说明。

协程库和相关链接:

说明:本博客作为个人学习实践笔记,可供参考但非系统教程,可能存在错误或遗漏,欢迎指正。若需系统学习,建议参考原链接。

2. libco协程库简析

libco中实现了有栈协程。相关核心机制,和之前 协程梳理实践 中梳理sylar协程时基本类似,不过libco更为完善,支持共享栈协程间可嵌套调用。核心机制,如:协程yield/resume操作、协程调度、系统调用hook等。

2.1. 项目文件结构

项目中的文件并不多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[MacOS-xd@qxd ➜ libco git:(master) ]$ ll
total 344
-rw-r--r--  1 xd  staff   1.7K  6 17 22:24 CMakeLists.txt
-rw-r--r--  1 xd  staff    19K  6 17 22:24 LICENSE.txt
-rw-r--r--  1 xd  staff   2.3K  6 17 22:24 Makefile
-rw-r--r--  1 xd  staff   2.2K  6 17 22:24 README.md
-rw-r--r--  1 xd  staff   2.2K  6 17 22:24 co.mk
-rw-r--r--  1 xd  staff   3.0K  6 17 22:24 co_closure.h
-rw-r--r--  1 xd  staff   6.5K  6 17 22:24 co_epoll.cpp
-rw-r--r--  1 xd  staff   2.1K  6 17 22:24 co_epoll.h
-rw-r--r--  1 xd  staff    23K  6 17 22:24 co_hook_sys_call.cpp

# 协程实现
-rw-r--r--  1 xd  staff    24K  6 17 22:24 co_routine.cpp
-rw-r--r--  1 xd  staff   2.4K  6 17 22:24 co_routine.h
# 定义协程结构体:stCoRoutine_t
-rw-r--r--  1 xd  staff   2.2K  6 17 22:24 co_routine_inner.h

-rw-r--r--  1 xd  staff   2.2K  6 17 22:24 co_routine_specific.h
# 协程上下文结构定义
-rw-r--r--  1 xd  staff   2.9K  6 17 22:24 coctx.cpp
-rw-r--r--  1 xd  staff   1.1K  6 17 22:24 coctx.h
-rw-r--r--  1 xd  staff   2.0K  6 17 22:24 coctx_swap.S
# 几个
-rw-r--r--  1 xd  staff   1.9K  6 17 22:24 example_xxx.cpp
...

其中:

  • co_routine.h/cpp 中为协程API实现
  • co_routine_inner.h中定义 stCoRoutine_t 协程结构体
  • libco/coctx.h中定义 coctx_t 协程上下文结构
    • coctx_swap.S里基于汇编实现了协程的上下文切换函数
  • co_epoll.h/cpp 封装了epollkqueue
  • co_hook_sys_call.cpp 对系统api的hook封装,使用dlsym函数获取原始系统调用的地址

stCoRoutine_t协程结构定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// libco/co_routine_inner.h
// 协程结构定义
struct stCoRoutine_t
{
    stCoRoutineEnv_t *env;  // 协程所在的运行环境,可以理解为,该协程所属的协程管理器
    
    pfn_co_routine_t pfn; // 协程所对应的函数
    void *arg; // 函数参数
    coctx_t ctx; // 协程上下文,包括寄存器和栈
 
    // 以下用char表示了bool语义,节省空间
    char cStart;          // 是否已经开始运行了
    char cEnd;            // 是否已经结束
    char cIsMain;         // 是否是主协程
    char cEnableSysHook;  // 是否要打开钩子标识,默认是关闭的
    char cIsShareStack;   // 是否要采用共享栈

    void *pvEnv;

    //char sRunStack[ 1024 * 128 ];
    stStackMem_t* stack_mem; // 栈内存

    //save satck buffer while confilct on same stack_buffer;
    char* stack_sp; 
    unsigned int save_size; // save_buffer的长度
    char* save_buffer; // 当协程挂起时,栈的内容会栈暂存到save_buffer中

    stCoSpec_t aSpec[1024];
};

2.2. 使用方式

先说明下部分核心API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// libco/co_routine.h
/**
* 创建一个协程对象
* 
* @param ppco - (output) 创建的协程对象。传入指针(stCoRoutine_t *)的地址,该指针会指向本函数中创建的协程对象
* @param attr - (input) 协程属性,目前主要是共享栈 
* @param pfn - (input) 协程所运行的函数
* @param arg - (input) 协程运行函数的参数
*/
int     co_create( stCoRoutine_t **co,const stCoRoutineAttr_t *attr,void *(*routine)(void*),void *arg );
// 恢复协程co,即切换到co协程
void    co_resume( stCoRoutine_t *co );
// 挂起
void    co_yield( stCoRoutine_t *co );
...

// 获取当前协程
stCoRoutine_t *co_self();

/**
* 大部分的sys_hook都需要用到这个函数来把事件注册到epoll中。具体实现在`co_poll_inner`函数中。
* 
* @param ctx epoll上下文
* @param fds[] fds 要监听的文件描述符 原始poll函数的参数,
* @param nfds  nfds fds的数组长度 原始poll函数的参数
* @param timeout_ms timeout 等待的毫秒数 原始poll函数的参数
*/
int     co_poll( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout_ms );
/*
* libco的核心调度
* 在此处调度三种事件:
* 1. 被hook的io事件,该io事件是通过co_poll_inner注册进来的
* 2. 超时事件
* 3. 用户主动使用poll的事件
* 所以,如果用户用到了三种事件,必须得配合使用co_eventloop
*
* @param ctx epoll管理器
* @param pfn 每轮事件循环的最后会调用该函数
* @param arg pfn的参数
*/
void    co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg );

使用方式,步骤如下:

  • 1、co_create创建协程。其声明如上所述,创建时指定协程处理函数。
    • 比如:co_create( &co,NULL,readwrite_routine, &endpoint);
  • 2、协程切换,调用co_resume
  • 3、调用co_eventloop,开始协程调度

示例:以libco里的example_echocli.cpp做说明。

  • 使用方式 ./example_echosvr 127.0.0.1 10000 100 50,各参数为:服务端ip、端口、协程数(每个子进程都创建该数量的协程)、子进程数
  • 创建50个子进程,每个子进程里创建100个协程,而后每个子进程开始协程调度(各自包含一个协程调度器)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// libco/example_echocli.cpp
int main(int argc, char *argv[])
{
    stEndPoint endpoint;
    endpoint.ip = argv[1];
    endpoint.port = atoi(argv[2]);
    int cnt = atoi( argv[3] );
    int proccnt = atoi( argv[4] );
    ...
    for(int k=0; k<proccnt; k++) {
        pid_t pid = fork();
        if(pid > 0) {
            // 父进程
            continue;
        }
        else if(pid < 0) {
            break;
        }
        // 下面是每个子进程都会进行的操作
        for(int i=0; i<cnt; i++) {
            stCoRoutine_t *co = 0;
            // 创建协程
            co_create( &co, NULL, readwrite_routine, &endpoint);
            co_resume( co );
        }
        // 开始协程调度,每个子进程都有一个协程调度器
        co_eventloop( co_get_epoll_ct(),0,0 );

        exit(0);
    }
    return 0;
}

协程处理函数如下:

  • 其中socketconnectclosewriteread等网络api接口都进行了hook封装,相关同步接口hook成了基于epoll的异步事件处理
    • 并使用dlsym函数获取原始函数地址。具体可见co_hook_sys_call.cpp的实现。
  • connect连接服务端,连接成功后write发送8字节数据,而后进行read接收,这3个接口都在epoll事件循环中进行处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// libco/co_hook_sys_call.cpp
static void *readwrite_routine( void *arg ) {
    co_enable_hook_sys();
    stEndPoint *endpoint = (stEndPoint *)arg;
    ...
    int fd = -1;
    for(;;) {
        // 若连接返回`EALREADY`或`EINPROGRESS`,则继续重连
        if ( fd < 0 ) {
            // 里面对原生socket做了hook,使用dlsym获取原有函数地址
            fd = socket(PF_INET, SOCK_STREAM, 0);
            struct sockaddr_in addr;
            SetAddr(endpoint->ip, endpoint->port, addr);
            // 句柄注册到epoll中(其中调用`co_poll_inner`),注册POLLOUT事件
            ret = connect(fd,(struct sockaddr*)&addr,sizeof(addr));
            if ( errno == EALREADY || errno == EINPROGRESS ) {
                struct pollfd pf = { 0 };
                pf.fd = fd;
                pf.events = (POLLOUT|POLLERR|POLLHUP);
                // 其中调用`co_poll_inner`,注册POLLOUT事件
                co_poll( co_get_epoll_ct(),&pf,1,200);
                //check connect
                int error = 0;
                uint32_t socklen = sizeof(error);
                errno = 0;
                ret = getsockopt(fd, SOL_SOCKET, SO_ERROR,(void *)&error,  &socklen);
                if ( ret == -1 || error) {
                    ...
                    continue;
                }
            } 
        }
        
        // hook写,其中会注册POLLOUT事件
        ret = write( fd,str, 8);
        if ( ret > 0 ) {
            // hook读,其中注册POLLIN事件
            ret = read( fd, buf, sizeof(buf) );
            ...
        }
        ...
    }
    return 0;
}

3. boost.coroutine

TODO

4. 小结

5. 参考

This post is licensed under CC BY 4.0 by the author.