分类:C++| 发布时间:2019-05-16 19:48:00
本文主要介绍 libco 协程库的原理与实现要点。
本文不会介绍协程是什么,以及如何使用 libco 编写程序。
本文主要分析 Linux 平台的 libco 实现。
libco 在框架分为三层,分别是接口层、系统函数 Hook 层以及事件驱动层。
libco 在 co_epoll.h 和 co_epoll.cpp 里面封装了实现驱动的接口。
struct co_epoll_res
{
int size;
struct epoll_event *events;
struct kevent *eventlist;
};
int co_epoll_wait( int epfd,struct co_epoll_res *events,int maxevents,int timeout );
int co_epoll_ctl( int epfd,int op,int fd,struct epoll_event * );
int co_epoll_create( int size );
struct co_epoll_res *co_epoll_res_alloc( int n );
void co_epoll_res_free( struct co_epoll_res * );
Linux 平台下主要是简单封装了下 epoll 的相关接口
libco 主要在 co_hook_sys_call.cpp 文件中使用 dlsym(RTLD_NEXT, ...) 将一些相关的系统函数替换为自己的实现。
目前进行 Hook 的函数有:
要启用 libco 的 Hook 函数需要在每个协程里面调用 void co_enable_hook_sys() 函数。
struct rpchook_t
{
int user_flag;
struct sockaddr_in dest; //maybe sockaddr_un;
int domain; //AF_LOCAL , AF_INET
struct timeval read_timeout;
struct timeval write_timeout;
};
static rpchook_t *g_rpchook_socket_fd[ 102400 ] = { 0 };
libco 的 Hook 层会在 g_rpchook_socket_fd 全局变量里面记录每个 fd 的属性,在开源的版本里主要用到了:
libco 里面在 user_flag 记录了原始的 flag,主要是用于区分用户的阻塞 I/O 和非阻塞 I/O,对于非阻塞 I/O,libco 会直接调用相应的系统函数进行处理,而不会使用 Hook 层的逻辑。
对于用户的阻塞 I/O,libco 会在 socket 和 open 的 Hook 函数里面将 fd 设置 O_NONBLOCK(通过 user_flag 可以知道用户是期望以阻塞方式进行操作的), 然后使用 poll 函数(这个函数也是被 Hook 掉的)探测 fd 是否可读/可写, libco 的 poll 函数会使用 co_poll_inner 进行探测,而 co_poll_inner 会将其注册到线程的 iEpollFd 然后让出时间片。
libco 接口层主要提供了一些供用户使用的抽象接口,主要实现代码在 co_routine.h 和 co_routine.cpp:
struct stCoRoutine_t;
struct stShareStack_t;
struct stCoRoutineAttr_t
{
int stack_size;
stShareStack_t* share_stack;
stCoRoutineAttr_t() {
stack_size = 128 * 1024;
share_stack = NULL;
}
}__attribute__ ((packed));
//2.co_routine
int co_create( stCoRoutine_t **co,const stCoRoutineAttr_t *attr,void *(*routine)(void*),void *arg );
void co_resume( stCoRoutine_t *co );
void co_yield( stCoRoutine_t *co );
void co_yield_ct(); //ct = current thread
void co_release( stCoRoutine_t *co );
stCoRoutine_t *co_self();
int co_poll( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout_ms );
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg );
co_create 用于创建协程,co_resume 用于恢复协程,co_yield 用于让出时间片,co_release 用于释放协程资源。 co_eventloop 用于主协程执行事件循环。
libco 的协程是单调用链的,就是说一个线程内可以创建 N 个协程,协程总是由当前线程调用,一个线程只有一条调用链。
struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ];
int iCallStackSize;
stCoEpoll_t *pEpoll;
//for copy stack log lastco and nextco
stCoRoutine_t* pending_co;
stCoRoutine_t* occupy_co;
};
static __thread stCoRoutineEnv_t* gCoEnvPerThread = NULL;
libco 使用 stCoRoutineEnv_t 结构来记录当前的调用链。 当前线程的调用链通过线程专有变量 gCoEnvPerThread 来保存,libco 内部会在第一次使用协程的时候初始化这个变量,我们看到 pCallStack 的大小 128,这意味着,我们最多可以在协程嵌套 co_resume 新协程的深度为 128(协程里面运行新的协程)。
下面我们来看看协程的创建:
struct stCoRoutineAttr_t
{
int stack_size;
stShareStack_t* share_stack;
stCoRoutineAttr_t()
{
stack_size = 128 * 1024;
share_stack = NULL;
}
}__attribute__ ((packed));
struct stCoRoutine_t
{
stCoRoutineEnv_t *env;
pfn_co_routine_t pfn;
void *arg;
coctx_t ctx;
char cStart;
char cEnd;
char cIsMain;
char cEnableSysHook;
char cIsShareStack;
void *pvEnv;
//char sRunStack[ 1024 * 128 ];
stStackMem_t* stack_mem;
//save satck buffer while confilct on same stack_buffer;
char* stack_sp;
unsigned int save_size;
char* save_buffer;
stCoSpec_t aSpec[1024];
};
typedef void *(*pfn_co_routine_t)( void * );
int co_create( stCoRoutine_t **ppco,const stCoRoutineAttr_t *attr,pfn_co_routine_t pfn,void *arg )
{
if( !co_get_curr_thread_env() ) {
co_init_curr_thread_env();
}
stCoRoutine_t *co = co_create_env( co_get_curr_thread_env(), attr, pfn,arg );
*ppco = co;
return 0;
}
协程的创建逻辑相当简单,主要就是检查当前线程的 gCoEnvPerThread 变量是否已经初始化,若未则进行初始化。然后初始化相关的结构体。
void co_init_curr_thread_env()
{
gCoEnvPerThread = (stCoRoutineEnv_t*)calloc( 1, sizeof(stCoRoutineEnv_t) );
stCoRoutineEnv_t *env = gCoEnvPerThread;
env->iCallStackSize = 0;
struct stCoRoutine_t *self = co_create_env( env, NULL, NULL,NULL );
self->cIsMain = 1;
env->pending_co = NULL;
env->occupy_co = NULL;
coctx_init( &self->ctx );
env->pCallStack[ env->iCallStackSize++ ] = self;
stCoEpoll_t *ev = AllocEpoll();
SetEpoll( env,ev );
}
stCoRoutineEnv_t *co_get_curr_thread_env()
{
return gCoEnvPerThread;
}
co_init_curr_thread_env 在初始化 gCoEnvPerThread 的时候会创建一个主协程。
在创建协程后可以通过 co_resume 接口运行协程。
struct coctx_t
{
void *regs[ 14 ];
size_t ss_size;
char *ss_sp;
};
void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
if( !co->cStart ) {
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
co->cStart = 1;
}
env->pCallStack[ env->iCallStackSize++ ] = co;
co_swap( lpCurrRoutine, co );
}
co_resume 里面主要检查其上下文结构(主要用于保存寄存器、入口函数和栈地址)是否已经初始化,若未则将其初始化。 然后将协程加入调用链,最后使用 co_swap 进行上下文切换,co_swap 会保存当前协程的状态(寄存器,栈地址等),然后切入要运行的协程。
协程在发生如下情况的时候会进行时间片的让出
协程内不能调用 sleep 函数,不然会阻塞整个线程的调用链,可以使用 poll 模拟
通常我们会在主协程调用 co_eventloop 来处理事件。
struct stCoEpoll_t
{
int iEpollFd;
static const int _EPOLL_SIZE = 1024 * 10;
struct stTimeout_t *pTimeout;
struct stTimeoutItemLink_t *pstTimeoutList;
struct stTimeoutItemLink_t *pstActiveList;
co_epoll_res *result;
};
struct stCoRoutineEnv_t
{
stCoRoutine_t *pCallStack[ 128 ];
int iCallStackSize;
stCoEpoll_t *pEpoll;
};
typedef int (*pfn_co_eventloop_t)(void *);
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg );
stCoEpoll_t 是在 libco 创建 stCoRoutineEnv_t 的时候使用 AllocEpoll 初始化的。
stCoEpoll_t *AllocEpoll()
{
stCoEpoll_t *ctx = (stCoEpoll_t*)calloc( 1,sizeof(stCoEpoll_t) );
ctx->iEpollFd = co_epoll_create( stCoEpoll_t::_EPOLL_SIZE );
ctx->pTimeout = AllocTimeout( 60 * 1000 );
ctx->pstActiveList = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) );
ctx->pstTimeoutList = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) );
return ctx;
}
struct stTimeout_t
{
stTimeoutItemLink_t *pItems;
int iItemSize;
unsigned long long ullStart;
long long llStartIdx;
};
stTimeout_t *AllocTimeout( int iSize )
{
stTimeout_t *lp = (stTimeout_t*)calloc( 1,sizeof(stTimeout_t) );
lp->iItemSize = iSize;
lp->pItems = (stTimeoutItemLink_t*)calloc( 1,sizeof(stTimeoutItemLink_t) * lp->iItemSize );
lp->ullStart = GetTickMS();
lp->llStartIdx = 0;
return lp;
}
可以看到 iEpollFd 是 epoll_create 创建的 fd,而 pTimeout 是一组保存超时事件的队列, pTimeout 的 pItems 的每一个元素都是一个超时队列,这个队列保存了接下来 N 毫秒超时的事件(数组的索引就是 N)。 由于这个数组初始化为了 60 * 1000 大小,因此我们创建的超时事件不能大于 60 秒。
现在来看看 eventloop 的实现
void co_eventloop( stCoEpoll_t *ctx,pfn_co_eventloop_t pfn,void *arg )
{
if( !ctx->result ) {
ctx->result = co_epoll_res_alloc( stCoEpoll_t::_EPOLL_SIZE );
}
co_epoll_res *result = ctx->result;
for(;;) {
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
stTimeoutItemLink_t *active = (ctx->pstActiveList);
stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
memset( timeout,0,sizeof(stTimeoutItemLink_t) );
for(int i=0;i<ret;i++) {
stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
if( item->pfnPrepare ) {
item->pfnPrepare( item,result->events[i],active );
} else {
AddTail( active,item );
}
}
unsigned long long now = GetTickMS();
TakeAllTimeout( ctx->pTimeout,now,timeout );
stTimeoutItem_t *lp = timeout->head;
while( lp ) {
lp->bTimeout = true;
lp = lp->pNext;
}
Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );
lp = active->head;
while( lp ) {
PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
if (lp->bTimeout && now < lp->ullExpireTime) {
int ret = AddTimeout(ctx->pTimeout, lp, now);
if (!ret) {
lp->bTimeout = false;
lp = active->head;
continue;
}
}
if( lp->pfnProcess ) {
lp->pfnProcess( lp );
}
lp = active->head;
}
if( pfn ) {
if( -1 == pfn( arg ) ) {
break;
}
}
}
}
eventloop 里面是一个无限循环,每次循环的时候先用 co_epoll_wait 等待 IO 事件,之后将对所有准备好的 IO 事件放入 active 队列(或者调用指定的 pfnPrepare 函数)。
int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );
stTimeoutItemLink_t *active = (ctx->pstActiveList);
stTimeoutItemLink_t *timeout = (ctx->pstTimeoutList);
memset( timeout,0,sizeof(stTimeoutItemLink_t) );
for(int i=0;i<ret;i++) {
stTimeoutItem_t *item = (stTimeoutItem_t*)result->events[i].data.ptr;
if( item->pfnPrepare ) {
item->pfnPrepare( item,result->events[i],active );
} else {
AddTail( active,item );
}
}
然后将所有超时的事件放入 timeout 队列。并将超时队列的事件放入 active 队列。
unsigned long long now = GetTickMS();
TakeAllTimeout( ctx->pTimeout,now,timeout );
stTimeoutItem_t *lp = timeout->head;
while( lp ) {
lp->bTimeout = true;
lp = lp->pNext;
}
Join<stTimeoutItem_t,stTimeoutItemLink_t>( active,timeout );
接下来就是处理 active 的每个事件,若事件的过期时间未到达则将其重新加入 pTimeout。 若事件对象指定了处理函数(pfnProcess)则执行处理函数。
while( lp ) {
PopHead<stTimeoutItem_t,stTimeoutItemLink_t>( active );
if (lp->bTimeout && now < lp->ullExpireTime) {
int ret = AddTimeout(ctx->pTimeout, lp, now);
if (!ret) {
lp->bTimeout = false;
lp = active->head;
continue;
}
}
if( lp->pfnProcess ) {
lp->pfnProcess( lp );
}
lp = active->head;
}
最后事件循环会调用回调函数,检查是否需要退出事件循环。
if( pfn ) {
if( -1 == pfn( arg ) ) {
break;
}
}
在讲 eventloop 的实现时,我们可以看到 eventloop 主要处理两种事件,一种是 IO 事件,另一种是超时事件,本节主要将如何向 eventloop 注册 IO 事件。
注册 IO 事件的一种方式是直接使用 co_poll 函数:
int co_poll( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout_ms )
{
return co_poll_inner(ctx, fds, nfds, timeout_ms, NULL);
}
这种情况下直接调用了 co_poll_inner 进行所有的细节处理,还有一种方式是在调用了 co_enable_hook_sys 后直接使用阻塞的 IO 读写操作,从上面的分析可知,这种情况下最终也是调用到了 co_poll_inner。
co_poll_inner 首先要做的就是将创建相应的数据结构。
struct stPoll_t : public stTimeoutItem_t
{
struct pollfd *fds;
nfds_t nfds; // typedef unsigned long int nfds_t;
stPollItem_t *pPollItems;
int iAllEventDetach;
int iEpollFd;
int iRaiseCnt;
};
struct stPollItem_t : public stTimeoutItem_t
{
struct pollfd *pSelf;
stPoll_t *pPoll;
struct epoll_event stEvent;
};
int co_poll_inner( stCoEpoll_t *ctx,struct pollfd fds[], nfds_t nfds, int timeout, poll_pfn_t pollfunc)
{
if (timeout == 0) {
return pollfunc(fds, nfds, timeout);
}
if (timeout < 0) {
timeout = INT_MAX;
}
int epfd = ctx->iEpollFd;
stCoRoutine_t* self = co_self();
//1.struct change
stPoll_t& arg = *((stPoll_t*)malloc(sizeof(stPoll_t)));
memset( &arg,0,sizeof(arg) );
arg.iEpollFd = epfd;
arg.fds = (pollfd*)calloc(nfds, sizeof(pollfd));
arg.nfds = nfds;
stPollItem_t arr[2];
if( nfds < sizeof(arr) / sizeof(arr[0]) && !self->cIsShareStack) {
arg.pPollItems = arr;
} else {
arg.pPollItems = (stPollItem_t*)malloc( nfds * sizeof( stPollItem_t ) );
}
memset( arg.pPollItems,0,nfds * sizeof(stPollItem_t) );
arg.pfnProcess = OnPollProcessEvent;
arg.pArg = GetCurrCo( co_get_curr_thread_env() );
// ...
}
然后,将 pollfd 的结构转换为 epoll 需要的结构,然后使用 epoll_ctl 将其添加。
for(nfds_t i=0;i<nfds;i++) {
arg.pPollItems[i].pSelf = arg.fds + i;
arg.pPollItems[i].pPoll = &arg;
arg.pPollItems[i].pfnPrepare = OnPollPreparePfn;
struct epoll_event &ev = arg.pPollItems[i].stEvent;
if( fds[i].fd > -1 ) {
ev.data.ptr = arg.pPollItems + i;
ev.events = PollEvent2Epoll( fds[i].events );
int ret = co_epoll_ctl( epfd,EPOLL_CTL_ADD, fds[i].fd, &ev );
if (ret < 0 && errno == EPERM && nfds == 1 && pollfunc != NULL) {
if( arg.pPollItems != arr ) {
free( arg.pPollItems );
arg.pPollItems = NULL;
}
free(arg.fds);
free(&arg);
return pollfunc(fds, nfds, timeout);
}
}
}
在这里如果 epoll_ctl 失败并且 errno 是 EPERM 的话,则直接调用 pollfunc,什么时候会失败并且 errno 为 EPERM 呢?查了下 epoll_ctl 的文档,说明如下:
EPERM The target file fd does not support epoll. This error can occur if fd refers to, for example, a regular file or a directory
也就是说,如果我们读写的是一个普通文件则直接使用系统的 poll 函数(非 Hook 版本),对于在硬盘的普通文件这不大可能会阻塞,但是如果这是一个网络文件系统的文件(通过 NFS 或者 fuse 的方式挂载),则有可能会出现阻塞。
接着,添加一个超时事件,然后调用 co_yield_env() 让出时间片 。
unsigned long long now = GetTickMS();
arg.ullExpireTime = now + timeout;
int ret = AddTimeout( ctx->pTimeout,&arg,now );
int iRaiseCnt = 0;
if( ret != 0 ){
co_log_err("CO_ERR: AddTimeout ret %d now %lld timeout %d arg.ullExpireTime %lld",
ret,now,timeout,arg.ullExpireTime);
errno = EINVAL;
iRaiseCnt = -1;
} else {
co_yield_env( co_get_curr_thread_env() );
iRaiseCnt = arg.iRaiseCnt;
}
void co_yield_env( stCoRoutineEnv_t *env )
{
stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];
stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];
env->iCallStackSize--;
co_swap( curr, last);
}
最后当协程重新切回来的时候执行清理操作。
对每个 IO 事件指定了 OnPollPreparePfn 为 pfnPrepare,没对 IO 事件指定 pfnProcess。 对超时事件指定 OnPollProcessEvent 为 pfnProcess。
下面来看看这两个函数的实现:
void OnPollPreparePfn( stTimeoutItem_t * ap,struct epoll_event &e,stTimeoutItemLink_t *active {
stPollItem_t *lp = (stPollItem_t *)ap;
lp->pSelf->revents = EpollEvent2Poll( e.events );
stPoll_t *pPoll = lp->pPoll;
pPoll->iRaiseCnt++;
if( !pPoll->iAllEventDetach ) {
pPoll->iAllEventDetach = 1;
RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( pPoll );
AddTail( active,pPoll );
}
}
这个函数主要是将 epoll 的 event 转换为 poll 类型的。
void OnPollProcessEvent( stTimeoutItem_t * ap )
{
stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg;
co_resume( co );
}
可以看出 OnPollProcessEvent 主要是将协程切回来,由此也可以知道 co_poll_inner 中注册一个超时事件的用意是保证协程能按时切回来。
由于一个线程会运行多个协程,操作系统提供的线程私有变量这时候就解决不了问题了。 libco 针对协程环境提供了协程私有变量,相关接口如下。
int co_setspecific( pthread_key_t key, const void *value );
void *co_getspecific( pthread_key_t key );
libco 还在 co_specific.h 提供了一个宏让用户可以方便地使用协程私有变量。
#define CO_ROUTINE_SPECIFIC( name,y ) \
\
static pthread_once_t _routine_once_##name = PTHREAD_ONCE_INIT; \
static pthread_key_t _routine_key_##name;\
static int _routine_init_##name = 0;\
static void _routine_make_key_##name() \
{\
(void) pthread_key_create(&_routine_key_##name, NULL); \
}\
template <class T>\
class clsRoutineData_routine_##name\
{\
public:\
inline T *operator->()\
{\
if( !_routine_init_##name ) \
{\
pthread_once( &_routine_once_##name,_routine_make_key_##name );\
_routine_init_##name = 1;\
}\
T* p = (T*)co_getspecific( _routine_key_##name );\
if( !p )\
{\
p = (T*)calloc(1,sizeof( T ));\
int ret = co_setspecific( _routine_key_##name,p) ;\
if ( ret )\
{\
if ( p )\
{\
free(p);\
p = NULL;\
}\
}\
}\
return p;\
}\
};\
\
static clsRoutineData_routine_##name<name> y;
可以看到,pthread_key_t 是需要用到线程私有变量的相关设施来创建的。
现在看看 libco 内部是如何实现协程变量的:
void *co_getspecific(pthread_key_t key)
{
stCoRoutine_t *co = GetCurrThreadCo();
if( !co || co->cIsMain ) {
return pthread_getspecific( key );
}
return co->aSpec[ key ].value;
}
int co_setspecific(pthread_key_t key, const void *value)
{
stCoRoutine_t *co = GetCurrThreadCo();
if( !co || co->cIsMain ) {
return pthread_setspecific( key,value );
}
co->aSpec[ key ].value = (void*)value;
return 0;
}
可以看到相关实现非常简单,如果是主协程则直接使用线程私有变量的相关函数,否则使用协程结构(上面有给出协程结构的定义)的 aSpec 数组来保存,aSpec 是一个大小为 1024 的数组,其数组元素类型为 stCoSpec_t。
struct stCoSpec_t
{
void *value;
};
由于协程私有变量的 key 是通过 pthread_key_create 来创建的,在 Linux 下线程私有变量的个数是有限的,最多只能有 PTHREAD_KEYS_MAX 个,在大部分系统这个限制就是 1024。
libco 提供了如下信号量函数,使得我们可以进行协程间的同步操作。
struct stCoCond_t;
stCoCond_t *co_cond_alloc();
int co_cond_free( stCoCond_t * cc );
int co_cond_signal( stCoCond_t * );
int co_cond_broadcast( stCoCond_t * );
int co_cond_timedwait( stCoCond_t *,int timeout_ms );
co_cond_alloc 的作用是创建一个信号量。
struct stCoCondItem_t
{
stCoCondItem_t *pPrev;
stCoCondItem_t *pNext;
stCoCond_t *pLink;
stTimeoutItem_t timeout;
};
struct stCoCond_t
{
stCoCondItem_t *head;
stCoCondItem_t *tail;
};
stCoCond_t *co_cond_alloc()
{
return (stCoCond_t*)calloc( 1,sizeof(stCoCond_t) );
}
int co_cond_free( stCoCond_t * cc )
{
free( cc );
return 0;
}
co_cond_timedwait 用于进行信号量的等待,而这是通过在 pEpoll 的 timeout 链表加入一个 timeout 的 item 来实现超时触发的,同时将 cond item 加入信号量所在的链表,然后使用 co_yield_ct 让出时间片. 当发生超时触发时,会调用 OnSignalProcessEvent 将协程环境恢复。
static void OnSignalProcessEvent( stTimeoutItem_t * ap )
{
stCoRoutine_t *co = (stCoRoutine_t*)ap->pArg;
co_resume( co );
}
int co_cond_timedwait( stCoCond_t *link,int ms )
{
stCoCondItem_t* psi = (stCoCondItem_t*)calloc(1, sizeof(stCoCondItem_t));
psi->timeout.pArg = GetCurrThreadCo();
psi->timeout.pfnProcess = OnSignalProcessEvent;
if( ms > 0 ) {
unsigned long long now = GetTickMS();
psi->timeout.ullExpireTime = now + ms;
int ret = AddTimeout( co_get_curr_thread_env()->pEpoll->pTimeout,&psi->timeout,now );
if( ret != 0 ) {
free(psi);
return ret;
}
}
AddTail( link, psi);
co_yield_ct();
RemoveFromLink<stCoCondItem_t,stCoCond_t>( psi );
free(psi);
return 0;
}
void co_yield_ct()
{
co_yield_env( co_get_curr_thread_env() );
}
当我们要通知等待的协程条件已经就绪时需要使用 co_cond_signal 或者 co_cond_broadcast。 这两个函数都是通过将等待的协程的超时对象从当前线程环境的 timeout 队列移除然后将其加入 active 列表来实现的,这样就可以使等待的协程不必等到超时再触发,而是下一个 eventloop 的处理周期就触发。
stCoCondItem_t *co_cond_pop( stCoCond_t *link )
{
stCoCondItem_t *p = link->head;
if( p ) {
PopHead<stCoCondItem_t,stCoCond_t>( link );
}
return p;
}
int co_cond_signal( stCoCond_t *si )
{
stCoCondItem_t * sp = co_cond_pop( si );
if( !sp ) {
return 0;
}
RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( &sp->timeout );
AddTail( co_get_curr_thread_env()->pEpoll->pstActiveList,&sp->timeout );
return 0;
}
int co_cond_broadcast( stCoCond_t *si )
{
for(;;) {
stCoCondItem_t * sp = co_cond_pop( si );
if( !sp ) return 0;
RemoveFromLink<stTimeoutItem_t,stTimeoutItemLink_t>( &sp->timeout );
AddTail( co_get_curr_thread_env()->pEpoll->pstActiveList,&sp->timeout );
}
return 0;
}
在上面的分析中,我们发现 libco 的协程切换都是通过 co_swap 函数来完成的。 在这部分内容主要讲解 libco 的协程切换原理。
void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
stCoRoutineEnv_t* env = co_get_curr_thread_env();
// ...
//swap context
coctx_swap(&(curr->ctx),&(pending_co->ctx) );
//stack buffer may be overwrite, so get again;
// ....
}
在不考虑有共享栈的情况下,co_swap 的主体功能就只剩下 coctx_swap 了。 libco 的上下文切换主要在 coctx.h 、coctx.cpp 和 coctx_swap.S 中。
协程的 ctx 成员是一个 coctx_t 结构体。
struct coctx_t {
void *regs[ 14 ];
size_t ss_size;
char *ss_sp;
};
//low | regs[0]: r15 |
// | regs[1]: r14 |
// | regs[2]: r13 |
// | regs[3]: r12 |
// | regs[4]: r9 |
// | regs[5]: r8 |
// | regs[6]: rbp |
// | regs[7]: rdi |
// | regs[8]: rsi |
// | regs[9]: ret | //ret func addr
// | regs[10]: rdx |
// | regs[11]: rcx |
// | regs[12]: rbx |
//hig | regs[13]: rsp |
enum {
kRDI = 7,
kRSI = 8,
kRETAddr = 9,
kRSP = 13,
};
ctx 的定义是跟架构相关的,我们这里只给出了 x64 的定义。 ctx 保存了协程切出时的寄存器信息和栈信息。
在协程切出时,coctx_swap 会将当前协程的寄存器信息和运行到的函数地址保存起来,当协程再次获得时间片时,coctx_swap 会将之前保存的信息恢复然后返回控制权。
那么当协程第一次切入的时候的入口函数是什么时候设置的呢?实际上协程总是通过 co_resume 函数第一次获得时间片的。
void co_resume( stCoRoutine_t *co )
{
stCoRoutineEnv_t *env = co->env;
stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];
if( !co->cStart ) {
coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );
co->cStart = 1;
}
env->pCallStack[ env->iCallStackSize++ ] = co;
co_swap( lpCurrRoutine, co );
}
coctx_make 初始化 ctx 并设置入口函数,所有主协程外的协程的入口地址都是 CoRoutineFunc,这个函数实现如下:
static int CoRoutineFunc( stCoRoutine_t *co,void * )
{
if( co->pfn ) {
co->pfn( co->arg );
}
co->cEnd = 1;
stCoRoutineEnv_t *env = co->env;
co_yield_env( env );
return 0;
}
基本就是简单地回调了创建协程是提供的入口函数。
libco 的上下文切换是用汇编实现的,实现如下。
coctx_swap:
leaq 8(%rsp),%rax
leaq 112(%rdi),%rsp
pushq %rax
pushq %rbx
pushq %rcx
pushq %rdx
pushq -8(%rax) //ret func addr
pushq %rsi
pushq %rdi
pushq %rbp
pushq %r8
pushq %r9
pushq %r12
pushq %r13
pushq %r14
pushq %r15
movq %rsi, %rsp
popq %r15
popq %r14
popq %r13
popq %r12
popq %r9
popq %r8
popq %rbp
popq %rdi
popq %rsi
popq %rax //ret func addr
popq %rdx
popq %rcx
popq %rbx
popq %rsp
pushq %rax
xorl %eax, %eax
ret
如果没有汇编基础看了上面的实现可能会有点懵,没关系,下面我们来详细讲解。 首先这是 AT&T 语法的汇编,(如果你学过 Intel 语法的汇编,需要注意一下语法的区别),左边的是源操作数,右边的是目标操作数。
leaq 8(%rsp),%rax
rsp 保存的是栈顶的地址,这一句表示将栈顶加 8 然后赋值给 rax 寄存器。
执行完后栈的情况为:
| xxx |
| xx2 |
| ret | <-rsp
| | <-rax
接下来是:
leaq 112(%rdi),%rsp
rdi 保存的是调用 coctx_swap 时第一个参数的地址(这是 System V AMD64 ABI 规范确定的,Linux 下 GCC 编译出来的二进制代码会遵守这个规定 ),这这里 rdi 保存的是当前协程的 ctx 的地址,rsi 保存的是将要切换的协程的地址。 这句执行完后栈的情况为:
------------原先的栈------------------
| xxx |
| xx2 |
| ret |
| | <-rax
------------当前协程的 ctx->reg -------
| | <- rsp
| reg[13] |
| reg[12] |
| ... |
| reg[0] |
接下来的 pushq 就是将 当前的寄存器和返回的函数地址保存起来, 然后调用:
movq %rsi, %rsp
将栈指针设置为要切换的协程的 ctx->reg,这句执行后栈空间为:
------------原先的栈------------------
| xxx |
| xx2 |
| ret |
| | <-rax
------------当前协程的 ctx->reg -------
| |
| reg[13] |
| reg[12] |
| ... |
| reg[0] |
------------要切换的协程的 ctx->reg -------
| |
| reg[13] |
| reg[12] |
| ... |
| reg[0] | <- rsp
最后进行一系列的 popq 操作恢复之前保存的寄存器和返回地址(如果是第一次切入此协程则是由 coctx_make 设置的)
需要注意的是最后的 popq 操作将 rsp 由指向 "要切换的协程的 ctx->reg" 设置为了真正的调用栈,此时栈空间为:
------------原先的栈------------------
| xxx |
| xx2 |
| ret |
| |
------------当前协程的 ctx->reg -------
| |
| reg[13] |
| reg[12] |
| ... |
| reg[0] |
------------要切换的协程的 ctx->reg -------
| |
| reg[13] |
| reg[12] |
| ... |
| reg[0] | <- rsp
------------要切换的协程的栈 -------
| xxx |
| xxx2 | <- rsp
| ret |
popq rsp 对应的是之前保存 rsp+8,因此 rsp 指向的是 ret 上面的一个值,而恢复后的 rax 保存了真正的返回地址,因此最后三条语句用于完成上下文的切换。
pushq %rax
xorl %eax, %eax
ret
可以看到这里的实现有些奇怪的地方,比如为什么不直接保存 rsp 而是保存 rsp+8 和真正的返回地址呢? 其实这跟将要将的共享栈的实现有关。
之前我们给出的 co_swap 的实现,没有考虑共享栈的情况,现在给出完整的实现:
void co_swap(stCoRoutine_t* curr, stCoRoutine_t* pending_co)
{
stCoRoutineEnv_t* env = co_get_curr_thread_env();
//get curr stack sp
char c;
curr->stack_sp= &c;
if (!pending_co->cIsShareStack) {
env->pending_co = NULL;
env->occupy_co = NULL;
} else {
env->pending_co = pending_co;
//get last occupy co on the same stack mem
stCoRoutine_t* occupy_co = pending_co->stack_mem->occupy_co;
//set pending co to occupy thest stack mem;
pending_co->stack_mem->occupy_co = pending_co;
env->occupy_co = occupy_co;
if (occupy_co && occupy_co != pending_co) {
save_stack_buffer(occupy_co);
}
}
//swap context
coctx_swap(&(curr->ctx),&(pending_co->ctx) );
//stack buffer may be overwrite, so get again;
stCoRoutineEnv_t* curr_env = co_get_curr_thread_env();
stCoRoutine_t* update_occupy_co = curr_env->occupy_co;
stCoRoutine_t* update_pending_co = curr_env->pending_co;
if (update_occupy_co && update_pending_co && update_occupy_co != update_pending_co) {
//resume stack buffer
if (update_pending_co->save_buffer && update_pending_co->save_size > 0) {
memcpy(update_pending_co->stack_sp,update_pending_co->save_buffer, update_pending_co->save_size);
}
}
}
在有共享栈的情况下,多个协程会共享一个堆栈,因此要切入协程时需要检查协程的栈的拥有者是不是当前协程, 若不是则需要在切换协程前保存拥有者协程的栈,在协程切换回来后需要恢复栈。
由于我们是在 co_swap 这里保存栈的,此时还没调用 coctx_swap。因此保存的栈为(以上面的情况为例):
------------原先的栈------------------
| xxx |
| xx2 | <- rsp
因此我们在保存环境的时候将 rsp 保存为 rsp+8,ret 需要额外保存起来。