凌云的博客

行胜于言

libco 协程库原理与实现

分类:C++| 发布时间:2019-05-16 19:48:00


概述

本文主要介绍 libco 协程库的原理与实现要点。

本文不会介绍协程是什么,以及如何使用 libco 编写程序。

本文主要分析 Linux 平台的 libco 实现。

libco 框架

libco 在框架分为三层,分别是接口层、系统函数 Hook 层以及事件驱动层。

事件驱动层

libco 在 co_epoll.h 和 co_epoll.cpp 里面封装了实现驱动的接口。

struct co_epoll_res
{
    int size;
    struct epoll_event *events;
    struct kevent *eventlist;
};

int     co_epoll_wait( int epfd,struct co_epoll_res *events,int maxevents,int timeout );
int     co_epoll_ctl( int epfd,int op,int fd,struct epoll_event * );
int     co_epoll_create( int size );
struct  co_epoll_res *co_epoll_res_alloc( int n );
void    co_epoll_res_free( struct co_epoll_res * );

Linux 平台下主要是简单封装了下 epoll 的相关接口

系统函数 Hook 层

libco 主要在 co_hook_sys_call.cpp 文件中使用 dlsym(RTLD_NEXT, …) 将一些相关的系统函数替换为自己的实现。

目前进行 Hook 的函数有:

  • socket
  • connect
  • close
  • read
  • write
  • sendto
  • recvfrom
  • send
  • recv
  • poll
  • setsockopt
  • fcntl
  • setenv
  • unsetenv
  • getenv
  • __res_state
  • gethostbyname
  • __poll

要启用 libco 的 Hook 函数需要在每个协程里面调用 void co_enable_hook_sys() 函数。

struct rpchook_t
{
	int user_flag;
	struct sockaddr_in dest; //maybe sockaddr_un;
	int domain; //AF_LOCAL , AF_INET

	struct timeval read_timeout;
	struct timeval write_timeout;
};
static rpchook_t *g_rpchook_socket_fd[ 102400 ] = { 0 };

libco 的 Hook 层会在 g_rpchook_socket_fd 全局变量里面记录每个 fd 的属性,在开源的版本里主要用到了: * user_flag 记录了原始的 flag * read_timeout write_timeout 读写的 timeout 时间

libco 里面在 user_flag 记录了原始的 flag,主要是用于区分用户的阻塞 I/O 和非阻塞 I/O,对于非阻塞 I/O,libco 会直接调用相应的系统函数进行处理,而不会使用 Hook 层的逻辑。

对于用户的阻塞 I/O,libco 会在 socket 和 open 的 Hook 函数里面将 fd 设置 O_NONBLOCK(通过 user_flag 可以知道用户是期望以阻塞方式进行操作的), 然后使用 poll 函数(这个函数也是被 Hook 掉的)探测 fd 是否可读/可写, libco 的 poll 函数会使用 co_poll_inner 进行探测,而 co_poll_inner 会将其注册到线程的 iEpollFd 然后让出时间片。

libco 接口层

libco 接口层主要提供了一些供用户使用的抽象接口,主要实现代码在 co_routine.h 和 co_routine.cpp:

struct stCoRoutine_t;
struct stShareStack_t;

struct stCoRoutineAttr_t
{
	int stack_size;
	stShareStack_t*  share_stack;
	stCoRoutineAttr_t() {
		stack_size = 128 * 1024;
		share_stack = NULL;
	}
}__attribute__ ((packed));

//2.co_routine
int co_create( stCoRoutine_t **co,const stCoRoutineAttr_t *attr,void *(*routine)(void*),void *arg );
void co_resume( stCoRoutine_t *co );
void co_yield( stCoRoutine_t *co );
void co_yield_ct(); //ct = current thread
void co_release( stCoRoutine_t *co );

stCoRoutine_t *co_self();

int	co_poll( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout_ms );
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg );

co_create 用于创建协程,co_resume 用于恢复协程,co_yield 用于让出时间片,co_release 用于释放协程资源。 co_eventloop 用于主协程执行事件循环。

libco 协程的创建

libco 的协程是单调用链的,就是说一个线程内可以创建 N 个协程,协程总是由当前线程调用,一个线程只有一条调用链。

struct stCoRoutineEnv_t
{
	stCoRoutine_t *pCallStack[ 128 ];
	int iCallStackSize;
	stCoEpoll_t *pEpoll;

	//for copy stack log lastco and nextco
	stCoRoutine_t* pending_co;
	stCoRoutine_t* occupy_co;
};

static __thread stCoRoutineEnv_t* gCoEnvPerThread = NULL;

libco 使用 stCoRoutineEnv_t 结构来记录当前的调用链。 当前线程的调用链通过线程专有变量 gCoEnvPerThread 来保存,libco 内部会在第一次使用协程的时候初始化这个变量,我们看到 pCallStack 的大小 128,这意味着,我们最多可以在协程嵌套 co_resume 新协程的深度为 128(协程里面运行新的协程)。

下面我们来看看协程的创建:

struct stCoRoutineAttr_t
{
    int stack_size;
    stShareStack_t*  share_stack;
    stCoRoutineAttr_t()
    {
        stack_size = 128 * 1024;
        share_stack = NULL;
    }
}__attribute__ ((packed));

struct stCoRoutine_t
{
    stCoRoutineEnv_t *env;
    pfn_co_routine_t pfn;
    void *arg;
    coctx_t ctx;

    char cStart;
    char cEnd;
    char cIsMain;
    char cEnableSysHook;
    char cIsShareStack;

    void *pvEnv;

    //char sRunStack[ 1024 * 128 ];
    stStackMem_t* stack_mem;


    //save satck buffer while confilct on same stack_buffer;
    char* stack_sp;
    unsigned int save_size;
    char* save_buffer;

    stCoSpec_t aSpec[1024];
};

typedef void *(*pfn_co_routine_t)( void * );

int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
    if( !co_get_curr_thread_env() ) {
        co_init_curr_thread_env();
    }
    stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
    *ppco = co;
    return 0;
}

协程的创建逻辑相当简单,主要就是检查当前线程的 gCoEnvPerThread 变量是否已经初始化,若未则进行初始化。然后初始化相关的结构体。

void co_init_curr_thread_env()
{
    gCoEnvPerThread = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) );
    stCoRoutineEnv_t *env = gCoEnvPerThread;

    env->iCallStackSize = 0;
    struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
    self->cIsMain = 1;

    env->pending_co = NULL;
    env->occupy_co = NULL;

    coctx_init( &self->ctx );

    env->pCallStack[ env->iCallStackSize++ ] = self;

    stCoEpoll_t *ev = AllocEpoll();
    SetEpoll( env,ev );
}
stCoRoutineEnv_t *co_get_curr_thread_env()
{
    return gCoEnvPerThread;
}

co_init_curr_thread_env 在初始化 gCoEnvPerThread 的时候会创建一个主协程。

协程的运行

在创建协程后可以通过 co_resume 接口运行协程。

struct coctx_t
{
    void *regs[ 14 ];
    size_t ss_size;
    char *ss_sp;

};
void co_resume( stCoRoutine_t *co )
{
    stCoRoutineEnv_t *env = co->env;
    stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
    if( !co->cStart ) {
        coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
        co->cStart = 1;
    }
    env->pCallStack[ env->iCallStackSize++ ] = co;
    co_swap( lpCurrRoutine, co );
}

co_resume 里面主要检查其上下文结构(主要用于保存寄存器、入口函数和栈地址)是否已经初始化,若未则将其初始化。 然后将协程加入调用链,最后使用 co_swap 进行上下文切换,co_swap 会保存当前协程的状态(寄存器,栈地址等),然后切入要运行的协程。

协程时间片的让出

协程在发生如下情况的时候会进行时间片的让出

  • 调用 co_enable_hook_sys 后进行阻塞的读写操作(这主要是通过在 Hook 后的读写函数调用 poll->co_poll_inner)完成的
  • 调用 poll(Hook 后的 poll 函数会调用 co_poll_inner)
  • 调用 co_yield_ct 让出

协程内不能调用 sleep 函数,不然会阻塞整个线程的调用链,可以使用 poll 模拟

协程的事件循环函数

通常我们会在主协程调用 co_eventloop 来处理事件。

struct stCoEpoll_t
{
    int iEpollFd;
    static const int _EPOLL_SIZE = 1024 * 10;
    struct stTimeout_t *pTimeout;
    struct stTimeoutItemLink_t *pstTimeoutList;
    struct stTimeoutItemLink_t *pstActiveList;
    co_epoll_res *result;

};

struct stCoRoutineEnv_t
{
    stCoRoutine_t *pCallStack[ 128 ];
    int iCallStackSize;
    stCoEpoll_t *pEpoll;
};

typedef int (*pfn_co_eventloop_t)(void *);
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg );

stCoEpoll_t 是在 libco 创建 stCoRoutineEnv_t 的时候使用 AllocEpoll 初始化的。

stCoEpoll_t *AllocEpoll()
{
    stCoEpoll_t *ctx = (stCoEpoll_t*)calloc( 1,sizeof(stCoEpoll_t) );
    ctx->iEpollFd = co_epoll_create( stCoEpoll_t::_EPOLL_SIZE );
    ctx->pTimeout = AllocTimeout( 60 * 1000 );
    ctx->pstActiveList = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) );
    ctx->pstTimeoutList = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) );
    return ctx;
}

struct stTimeout_t
{
    stTimeoutItemLink_t *pItems;
    int iItemSize;

    unsigned long long ullStart;
    long long llStartIdx;
};

stTimeout_t *AllocTimeout( int iSize )
{
    stTimeout_t *lp = (stTimeout_t*)calloc( 1,sizeof(stTimeout_t) );
    lp->iItemSize = iSize;
    lp->pItems = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) * lp->iItemSize );
    lp->ullStart = GetTickMS();
    lp->llStartIdx = 0;

    return lp;
}

可以看到 iEpollFd 是 epoll_create 创建的 fd,而 pTimeout 是一组保存超时事件的队列, pTimeout 的 pItems 的每一个元素都是一个超时队列,这个队列保存了接下来 N 毫秒超时的事件(数组的索引就是 N)。 由于这个数组初始化为了 60 * 1000 大小,因此我们创建的超时事件不能大于 60 秒。

现在来看看 eventloop 的实现

void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
    if( !ctx->result ) {
        ctx->result =  co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
    }
    co_epoll_res *result = ctx->result;

    for(;;) {
        int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
        stTimeoutItemLink_t *active = (ctx->pstActiveList);
        stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
        memset( timeout,0,sizeof(stTimeoutItemLink_t) );
        for(int i=0;i<ret;i++) {
            stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
            if( item->pfnPrepare ) {
                item->pfnPrepare( item,result->events[i],active );
            } else {
                AddTail( active,item );
            }
        }

        unsigned long long now = GetTickMS();
        TakeAllTimeout( ctx->pTimeout,now,timeout );

        stTimeoutItem_t *lp = timeout->head;
        while( lp )	{
            lp->bTimeout = true;
            lp = lp->pNext;
        }

        Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );

        lp = active->head;
        while( lp ) {
            PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
            if (lp->bTimeout && now < lp->ullExpireTime) {
                int ret = AddTimeout(ctx->pTimeout, lp, now);
                if (!ret) {
                    lp->bTimeout = false;
                    lp = active->head;
                    continue;
                }
            }
            if( lp->pfnProcess ) {
                lp->pfnProcess( lp );
            }

            lp = active->head;
        }
        if( pfn ) {
            if( -1 == pfn( arg ) ) {
                break;
            }
        }
    }
}

eventloop 里面是一个无限循环,每次循环的时候先用 co_epoll_wait 等待 IO 事件,之后将对所有准备好的 IO 事件放入 active 队列(或者调用指定的 pfnPrepare 函数)。

int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
stTimeoutItemLink_t *active = (ctx->pstActiveList);
stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
memset( timeout,0,sizeof(stTimeoutItemLink_t) );
for(int i=0;i<ret;i++) {
    stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
    if( item->pfnPrepare ) {
        item->pfnPrepare( item,result->events[i],active );
    } else {
        AddTail( active,item );
    }
}

然后将所有超时的事件放入 timeout 队列。并将超时队列的事件放入 active 队列。

unsigned long long now = GetTickMS();
TakeAllTimeout( ctx->pTimeout,now,timeout );
stTimeoutItem_t *lp = timeout->head;
    while( lp ) {
        lp->bTimeout = true;
        lp = lp->pNext;
    }

Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );

接下来就是处理 active 的每个事件,若事件的过期时间未到达则将其重新加入 pTimeout。 若事件对象指定了处理函数(pfnProcess)则执行处理函数。

while( lp ) {
    PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
    if (lp->bTimeout && now < lp->ullExpireTime) {
        int ret = AddTimeout(ctx->pTimeout, lp, now);
        if (!ret) {
            lp->bTimeout = false;
            lp = active->head;
            continue;
        }
    }
    if( lp->pfnProcess ) {
        lp->pfnProcess( lp );
    }
    lp = active->head;
}

最后事件循环会调用回调函数,检查是否需要退出事件循环。

if( pfn ) {
    if( -1 == pfn( arg ) ) {
        break;
    }
}
IO 事件注册

在讲 eventloop 的实现时,我们可以看到 eventloop 主要处理两种事件,一种是 IO 事件,另一种是超时时间,本节主要将如何向 eventloop 注册 IO 事件。

注册 IO 事件的一种方式是直接使用 co_poll 函数:

int co_poll( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout_ms )
{
    return co_poll_inner(ctx, fds, nfds, timeout_ms, NULL);
}

这种情况下直接调用了 co_poll_inner 进行所有的细节处理,还有一种方式是在调用了 co_enable_hook_sys 后直接使用阻塞的 IO 读写操作,从上面的分析可知,这种情况下最终也是调用到了 co_poll_inner。

co_poll_inner 首先要做的就是将创建相应的数据结构。

struct stPoll_t : public stTimeoutItem_t
{
    struct pollfd *fds;
    nfds_t nfds; // typedef unsigned long int nfds_t;
    stPollItem_t *pPollItems;
    int iAllEventDetach;
    int iEpollFd;
    int iRaiseCnt;
};

struct stPollItem_t : public stTimeoutItem_t
{
    struct pollfd *pSelf;
    stPoll_t *pPoll;
    struct epoll_event stEvent;
};

int co_poll_inner( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout, poll_pfn_t pollfunc)
{
    if (timeout == 0) {
        return pollfunc(fds, nfds, timeout);
    }
    if (timeout < 0) {
        timeout = INT_MAX;
    }
    int epfd = ctx->iEpollFd;
    stCoRoutine_t* self = co_self();

    //1.struct change
    stPoll_t& arg = *((stPoll_t*)malloc(sizeof(stPoll_t)));
    memset( &arg,0,sizeof(arg) );

    arg.iEpollFd = epfd;
    arg.fds = (pollfd*)calloc(nfds, sizeof(pollfd));
    arg.nfds = nfds;

    stPollItem_t arr[2];
    if( nfds < sizeof(arr) / sizeof(arr[0]) && !self->cIsShareStack) {
        arg.pPollItems = arr;
    } else {
        arg.pPollItems = (stPollItem_t*)malloc( nfds * sizeof( stPollItem_t ) );
    }
    memset( arg.pPollItems,0,nfds * sizeof(stPollItem_t) );

    arg.pfnProcess = OnPollProcessEvent;
    arg.pArg = GetCurrCo( co_get_curr_thread_env() );
    // ...
}

然后,将 pollfd 的结构转换为 epoll 需要的结构,然后使用 epoll_ctl 将其添加。

for(nfds_t i=0;i<nfds;i++) {
    arg.pPollItems[i].pSelf = arg.fds + i;
    arg.pPollItems[i].pPoll = &arg;
    arg.pPollItems[i].pfnPrepare = OnPollPreparePfn;
    struct epoll_event &ev = arg.pPollItems[i].stEvent;

    if( fds[i].fd > -1 ) {
        ev.data.ptr = arg.pPollItems + i;
        ev.events = PollEvent2Epoll( fds[i].events );
        int ret = co_epoll_ctl( epfd,EPOLL_CTL_ADD, fds[i].fd, &ev );
        if (ret < 0 && errno == EPERM && nfds == 1 && pollfunc != NULL) {
            if( arg.pPollItems != arr ) {
                free( arg.pPollItems );
                arg.pPollItems = NULL;
            }
            free(arg.fds);
            free(&arg);
            return pollfunc(fds, nfds, timeout);
        }
    }
}

在这里如果 epoll_ctl 失败并且 errno 是 EPERM 的话,则直接调用 pollfunc,什么时候会失败并且 errno 为 EPERM 呢?查了下 epoll_ctl 的文档,说明如下:

EPERM  The target file fd does not support epoll.  This error can occur if fd refers to, for example, a regular file or a directory

也就是说,如果我们读写的是一个普通文件则直接使用系统的 poll 函数(非 Hook 版本),对于在硬盘的普通文件这不大可能会阻塞,但是如果这是一个网络文件系统的文件(通过 NFS 或者 fuse 的方式挂载),则有可能会出现阻塞。

接着,添加一个超时事件,然后调用 co_yield_env() 让出时间片 。

unsigned long long now = GetTickMS();
arg.ullExpireTime = now + timeout;
int ret = AddTimeout( ctx->pTimeout,&arg,now );
int iRaiseCnt = 0;
if( ret != 0 ){
    co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld",
            ret,now,timeout,arg.ullExpireTime);
    errno = EINVAL;
    iRaiseCnt = -1;
} else {
    co_yield_env( co_get_curr_thread_env() );
    iRaiseCnt = arg.iRaiseCnt;
}
void co_yield_env( stCoRoutineEnv_t *env )
{
    stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
    stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
    env->iCallStackSize--;
    co_swap( curr, last);
}

最后当协程重新切回来的时候执行清理操作。

对每个 IO 事件指定了 OnPollPreparePfn 为 pfnPrepare,没对 IO 事件指定 pfnProcess。 对超时事件指定 OnPollProcessEvent 为 pfnProcess。

下面来看看这两个函数的实现:

void OnPollPreparePfn( stTimeoutItem_t * ap,struct epoll_event &e,stTimeoutItemLink_t *active {
    stPollItem_t *lp = (stPollItem_t *)ap;
    lp->pSelf->revents = EpollEvent2Poll( e.events );
    stPoll_t *pPoll = lp->pPoll;
    pPoll->iRaiseCnt++;
    if( !pPoll->iAllEventDetach ) {
        pPoll->iAllEventDetach = 1;
        RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( pPoll );
        AddTail( active,pPoll );
    }
}

这个函数主要是将 epoll 的 event 转换为 poll 类型的。

void OnPollProcessEvent( stTimeoutItem_t * ap )
{
    stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg;
    co_resume( co );
}

可以看出 OnPollProcessEvent 主要是将协程切回来,由此也可以知道 co_poll_inner 中注册一个超时事件的用意是保证协程能按时切回来。

协程私有变量

由于一个线程会运行多个协程,操作系统提供的线程私有变量这时候就解决不了问题了。 libco 针对协程环境提供了协程私有变量,相关接口如下。

int co_setspecific( pthread_key_t key, const void *value );
void *co_getspecific( pthread_key_t key );

libco 还在 co_specific.h 提供了一个宏让用户可以方便地使用协程私有变量。

#define CO_ROUTINE_SPECIFIC( name,y ) \
\
static pthread_once_t _routine_once_##name = PTHREAD_ONCE_INIT; \
static pthread_key_t _routine_key_##name;\
static int _routine_init_##name = 0;\
static void _routine_make_key_##name() \
{\
    (void) pthread_key_create(&_routine_key_##name, NULL); \
}\
template <class T>\
class clsRoutineData_routine_##name\
{\
public:\
    inline T *operator->()\
    {\
        if( !_routine_init_##name ) \
        {\
            pthread_once( &_routine_once_##name,_routine_make_key_##name );\
            _routine_init_##name = 1;\
        }\
        T* p = (T*)co_getspecific( _routine_key_##name );\
        if( !p )\
        {\
            p = (T*)calloc(1,sizeof( T ));\
            int ret = co_setspecific( _routine_key_##name,p) ;\
                if ( ret )\
                {\
                    if ( p )\
                    {\
                        free(p);\
                        p = NULL;\
                    }\
                }\
        }\
        return p;\
    }\
};\
\
static clsRoutineData_routine_##name<name> y;

可以看到,pthread_key_t 是需要用到线程私有变量的相关设施来创建的。

现在看看 libco 内部是如何实现协程变量的:

void *co_getspecific(pthread_key_t key)
{
    stCoRoutine_t *co = GetCurrThreadCo();
    if( !co || co->cIsMain ) {
        return pthread_getspecific( key );
    }
    return co->aSpec[ key ].value;
}

int co_setspecific(pthread_key_t key, const void *value)
{
    stCoRoutine_t *co = GetCurrThreadCo();
    if( !co || co->cIsMain ) {
        return pthread_setspecific( key,value );
    }
    co->aSpec[ key ].value = (void*)value;
    return 0;
}

可以看到相关实现非常简单,如果是主协程则直接使用线程私有变量的相关函数,否则使用协程结构(上面有给出协程结构的定义)的 aSpec 数组来保存,aSpec 是一个大小为 1024 的数组,其数组元素类型为 stCoSpec_t。

struct stCoSpec_t
{
    void *value;
};

由于协程私有变量的 key 是通过 pthread_key_create 来创建的,在 Linux 下线程私有变量的个数是有限的,最多只能有 PTHREAD_KEYS_MAX 个,在大部分系统这个限制就是 1024。

协程信号量

libco 提供了如下信号量函数,使得我们可以进行协程间的同步操作。

struct stCoCond_t;

stCoCond_t *co_cond_alloc();
int co_cond_free( stCoCond_t * cc );

int co_cond_signal( stCoCond_t * );
int co_cond_broadcast( stCoCond_t * );
int co_cond_timedwait( stCoCond_t *,int timeout_ms );

co_cond_alloc 的作用是创建一个信号量。

struct stCoCondItem_t
{
    stCoCondItem_t *pPrev;
    stCoCondItem_t *pNext;
    stCoCond_t *pLink;

    stTimeoutItem_t timeout;
};
struct stCoCond_t
{
    stCoCondItem_t *head;
    stCoCondItem_t *tail;
};

stCoCond_t *co_cond_alloc()
{
    return (stCoCond_t*)calloc( 1,sizeof(stCoCond_t) );
}
int co_cond_free( stCoCond_t * cc )
{
    free( cc );
    return 0;
}

co_cond_timedwait 用于进行信号量的等待,而这是通过在 pEpoll 的 timeout 链表加入一个 timeout 的 item 来实现超时触发的,同时将 cond item 加入信号量所在的链表,然后使用 co_yield_ct 让出时间片. 当发生超时触发时,会调用 OnSignalProcessEvent 将协程环境恢复。

static void OnSignalProcessEvent( stTimeoutItem_t * ap )
{
    stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg;
    co_resume( co );
}

int co_cond_timedwait( stCoCond_t *link,int ms )
{
    stCoCondItem_t* psi = (stCoCondItem_t*)calloc(1, sizeof(stCoCondItem_t));
    psi->timeout.pArg = GetCurrThreadCo();
    psi->timeout.pfnProcess = OnSignalProcessEvent;

    if( ms > 0 ) {
        unsigned long long now = GetTickMS();
        psi->timeout.ullExpireTime = now + ms;

        int ret = AddTimeout( co_get_curr_thread_env()->pEpoll->pTimeout,&psi->timeout,now );
        if( ret != 0 ) {
            free(psi);
            return ret;
        }
    }
    AddTail( link, psi);
    co_yield_ct();
    RemoveFromLink<stCoCondItem_t,stCoCond_t>( psi );
    free(psi);
    return 0;
}

void co_yield_ct()
{
	co_yield_env( co_get_curr_thread_env() );
}

当我们要通知等待的协程条件已经就绪时需要使用 co_cond_signal 或者 co_cond_broadcast。 这两个函数都是通过将等待的协程的超时对象从当前线程环境的 timeout 队列移除然后将其加入 active 列表来实现的,这样就可以使等待的协程不必等到超时再触发,而是下一个 eventloop 的处理周期就触发。

stCoCondItem_t *co_cond_pop( stCoCond_t *link )
{
    stCoCondItem_t *p = link->head;
    if( p ) {
        PopHead<stCoCondItem_t,stCoCond_t>( link );
    }
    return p;
}

int co_cond_signal( stCoCond_t *si )
{
    stCoCondItem_t * sp = co_cond_pop( si );
    if( !sp ) {
        return 0;
    }
    RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( &sp->timeout );
    AddTail( co_get_curr_thread_env()->pEpoll->pstActiveList,&sp->timeout );
    return 0;
}

int co_cond_broadcast( stCoCond_t *si )
{
    for(;;) {
        stCoCondItem_t * sp = co_cond_pop( si );
        if( !sp ) return 0;
        RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( &sp->timeout );
        AddTail( co_get_curr_thread_env()->pEpoll->pstActiveList,&sp->timeout );
    }

    return 0;
}

协程切换原理

在上面的分析中,我们发现 libco 的协程切换都是通过 co_swap 函数来完成的。 在这部分内容主要讲解 libco 的协程切换原理。

不考虑共享栈的情况

void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
    stCoRoutineEnv_t* env = co_get_curr_thread_env();
    // ...
    //swap context
    coctx_swap(&(curr->ctx),&(pending_co->ctx) );

    //stack buffer may be overwrite, so get again;
    // ....
}

在不考虑有共享栈的情况下,co_swap 的主体功能就只剩下 coctx_swap 了。 libco 的上下文切换主要在 coctx.h 、coctx.cpp 和 coctx_swap.S 中。

协程的 ctx 成员是一个 coctx_t 结构体。

struct coctx_t {
    void *regs[ 14 ];
    size_t ss_size;
    char *ss_sp;
};
//low | regs[0]: r15 |
//    | regs[1]: r14 |
//    | regs[2]: r13 |
//    | regs[3]: r12 |
//    | regs[4]: r9  |
//    | regs[5]: r8  |
//    | regs[6]: rbp |
//    | regs[7]: rdi |
//    | regs[8]: rsi |
//    | regs[9]: ret |  //ret func addr
//    | regs[10]: rdx |
//    | regs[11]: rcx |
//    | regs[12]: rbx |
//hig | regs[13]: rsp |
enum {
    kRDI = 7,
    kRSI = 8,
    kRETAddr = 9,
    kRSP = 13,
};

ctx 的定义是跟架构相关的,我们这里只给出了 x64 的定义。 ctx 保存了协程切出时的寄存器信息和栈信息。

在协程切出时,coctx_swap 会将当前协程的寄存器信息和运行到的函数地址保存起来,当协程再次获得时间片时,coctx_swap 会将之前保存的信息恢复然后返回控制权。

那么当协程第一次切入的时候的入口函数是什么时候设置的呢?实际上协程第一次获的时间片总是 co_resume 函数获得的。

void co_resume( stCoRoutine_t *co )
{
    stCoRoutineEnv_t *env = co->env;
    stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
    if( !co->cStart ) {
        coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
        co->cStart = 1;
    }
    env->pCallStack[ env->iCallStackSize++ ] = co;
    co_swap( lpCurrRoutine, co );
}

coctx_make 初始化 ctx 并设置入口函数,所有主协程外的协程的入口地址都是 CoRoutineFunc,这个函数实现如下:

static int CoRoutineFunc( stCoRoutine_t *co,void * )
{
    if( co->pfn ) {
        co->pfn( co->arg );
    }
    co->cEnd = 1;

    stCoRoutineEnv_t *env = co->env;
    co_yield_env( env );
    return 0;
}

基本就是简单地回调了创建协程是提供的入口函数。

libco 的上下文切换是用汇编实现的,实现如下。

coctx_swap:
    leaq 8(%rsp),%rax
    leaq 112(%rdi),%rsp
    pushq %rax
    pushq %rbx
    pushq %rcx
    pushq %rdx

    pushq -8(%rax) //ret func addr

    pushq %rsi
    pushq %rdi
    pushq %rbp
    pushq %r8
    pushq %r9
    pushq %r12
    pushq %r13
    pushq %r14
    pushq %r15

    movq %rsi, %rsp
    popq %r15
    popq %r14
    popq %r13
    popq %r12
    popq %r9
    popq %r8
    popq %rbp
    popq %rdi
    popq %rsi
    popq %rax //ret func addr
    popq %rdx
    popq %rcx
    popq %rbx
    popq %rsp
    pushq %rax

    xorl %eax, %eax
    ret

如果没有汇编基础看了上面的实现可能会有点懵,没关系,下面我们来详细讲解。 首先这是 AT&T 语法的汇编,(如果你学过 Intel 语法的汇编,需要注意一下语法的区别),左边的是源操作数,右边的是目标操作数。

leaq 8(%rsp),%rax

rsp 保存的是栈顶的地址,这一句表示将栈顶加 8 然后赋值给 rax 寄存器。

执行完后栈的情况为:

| xxx |
| xx2 |
| ret | <-rsp
|     | <-rax

接下来是:

leaq 112(%rdi),%rsp

rdi 保存的是调用 coctx_swap 时第一个参数的地址(这是 System V AMD64 ABI 规范确定的,Linux 下 GCC 编译出来的二进制代码会遵守这个规定 ),这这里 rdi 保存的是当前协程的 ctx 的地址,rsi 保存的是将要切换的协程的地址。 这句执行完后栈的情况为:

------------原先的栈------------------
| xxx |
| xx2 |
| ret |
|     | <-rax

------------当前协程的 ctx->reg -------
|         | <- rsp
| reg[13] |
| reg[12] |
| ...     |
| reg[0]  |

接下来的 pushq 就是将 当前的寄存器和返回的函数地址保存起来, 然后调用:

movq %rsi, %rsp

将栈指针要切换的协程的 ctx->reg,这句执行后栈空间为:

------------原先的栈------------------
| xxx |
| xx2 |
| ret |
|     | <-rax

------------当前协程的 ctx->reg -------
|         |
| reg[13] |
| reg[12] |
| ...     |
| reg[0]  |

------------要切换的协程的 ctx->reg -------
|         |
| reg[13] |
| reg[12] |
| ...     |
| reg[0]  | <- rsp

最后进行一系列的 popq 操作恢复之前保存的寄存器和返回地址(如果是第一次切入此协程则是由 coctx_make 设置的)

需要注意的是最后的 popq 操作将 rsp 由指向 “要切换的协程的 ctx->reg” 设置为了真正的调用栈,此时栈空间为:

------------原先的栈------------------
| xxx |
| xx2 |
| ret |
|     |

------------当前协程的 ctx->reg -------
|         |
| reg[13] |
| reg[12] |
| ...     |
| reg[0]  |

------------要切换的协程的 ctx->reg -------
|         |
| reg[13] |
| reg[12] |
| ...     |
| reg[0]  | <- rsp

------------要切换的协程的栈 -------
| xxx     |
| xxx2    | <- rsp
| ret     |

popq rsp 对应的是之前保存 rsp+8,因此 rsp 指向的是 ret 上面的一个值,而恢复后的 rax 保存了真正的返回地址,因此最后三条语句用于完成上下文的切换。

pushq %rax
xorl %eax, %eax
ret

可以看到这里的实现有些奇怪的地方,比如为什么不直接保存 rsp 而是保存 rsp+8 和真正的返回地址呢? 其实这跟将要将的共享栈的实现有关。

共享栈的情况

之前我们给出的 co_swap 的实现,没有考虑共享栈的情况,现在给出完整的实现:

void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
    stCoRoutineEnv_t* env = co_get_curr_thread_env();

    //get curr stack sp
    char c;
    curr->stack_sp= &c;

    if (!pending_co->cIsShareStack) {
        env->pending_co = NULL;
        env->occupy_co = NULL;
    } else {
        env->pending_co = pending_co;
        //get last occupy co on the same stack mem
        stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
        //set pending co to occupy thest stack mem;
        pending_co->stack_mem->occupy_co = pending_co;
        env->occupy_co = occupy_co;
        if (occupy_co && occupy_co != pending_co) {
            save_stack_buffer(occupy_co);
        }
    }
    //swap context
    coctx_swap(&(curr->ctx),&(pending_co->ctx) );
    //stack buffer may be overwrite, so get again;
    stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
        stCoRoutine_t* update_occupy_co =  curr_env->occupy_co;
        stCoRoutine_t* update_pending_co = curr_env->pending_co;
        if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co) {
            //resume stack buffer
            if (update_pending_co->save_buffer && update_pending_co->save_size > 0) {
                memcpy(update_pending_co->stack_sp,update_pending_co->save_buffer, update_pending_co->save_size);
        }
    }
}

在有共享栈的情况下,多个协程会共享一个堆栈,因此要切入协程时需要检查协程的栈的拥有者是不是当前协程,若不是则需要在切换协程前保存拥有者协程的栈,在协程恢复后需要栈。

由于我们是在 co_swap 这里保存栈的,此时还没调用 coctx_swap。因此保存的栈为(以上面的情况为例):

------------原先的栈------------------
| xxx |
| xx2 | <- rsp

因此我们在保存环境的时候将 rsp 保存为 rsp+8,ret 需要额外保存起来。

总结

  • co_eventloop 为事件循环函数,主要处理 IO 事件超时事件。
  • libco 通过 Hook 系统调用的方式将一些阻塞的 IO 操作变为了非阻塞的。
  • libco 的协程都是有栈的,可以是共享栈也可以是独立栈。
  • 上下文切换通过汇编完成,主要是通过保存和恢复寄存器和返回地址来完成切换。
  • libco 的 Hook 层会记录用户打开文件时的原始的 flag,如果用户以非阻塞方式打开文件,libco 会直接使用原始的系统函数进行操作,而不是使用 Hook 层的处理逻辑。

参考