Skynet_处理队列消息 工作线程 thread_worker 在设置完参数后,服务队列的消息在 while 循环中不断被处理和分发:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 static void *thread_worker (void *p) { struct worker_parm *wp = p; int id = wp->id; int weight = wp->weight; struct monitor *m = wp->m; struct skynet_monitor *sm = m->m[id]; skynet_initthread(THREAD_WORKER); struct message_queue * q = NULL ; while (!m->quit) { q = skynet_context_message_dispatch(sm, q, weight); if (q == NULL ) { if (pthread_mutex_lock(&m->mutex) == 0 ) { ++ m->sleep; if (!m->quit) pthread_cond_wait(&m->cond, &m->mutex); -- m->sleep; if (pthread_mutex_unlock(&m->mutex)) { fprintf (stderr , "unlock mutex error" ); exit (1 ); } } } } return NULL ; }
服务队列的消息被消息分发函数 skynet_context_message_dispatch 进行处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 struct message_queue * //next skynet_context_message_dispatch (struct skynet_monitor *sm , struct message_queue *q , int weight ) { if (q == NULL ) { q = skynet_globalmq_pop(); if (q==NULL ) return NULL ; } uint32_t handle = skynet_mq_handle(q); struct skynet_context * ctx = skynet_handle_grab(handle); int i,n=1 ; struct skynet_message msg ; for (i=0 ;i<n;i++) { if (skynet_mq_pop(q,&msg)) { skynet_context_release(ctx); return skynet_globalmq_pop(); } else if (i==0 && weight >= 0 ) { n = skynet_mq_length(q); n >> = weight; } int overload = skynet_mq_overload(q); if (overload) { skynet_error(ctx, "May overload, message queue length = %d" , overload); } skynet_monitor_trigger(sm, msg.source , handle); if (ctx->cb == NULL ) { skynet_free(msg.data); } else { dispatch_message(ctx, &msg); } skynet_monitor_trigger(sm, 0 ,0 ); } assert(q == ctx->queue ); struct message_queue *nq = skynet_globalmq_pop(); if (nq) { skynet_globalmq_push(q); q = nq; } skynet_context_release(ctx); return q; }
工作线程的每一轮处理过程是:
从全局队列中取出一个队列,然后取出队列里面的消息进行处理。
队列处理完成后,把队列重新加入到全局队列的尾部。接着处理下一轮的消息。
每次拿到一个队列时,可能会处理其中的一个消息,也可能处理其中的几个消息。(这个与当前线程分配的权重有关。这个权重在工作线程启动时就设置好了)。如下所示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 static int weight[] = { -1 , -1 , -1 , -1 , 0 , 0 , 0 , 0 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 1 , 2 , 2 , 2 , 2 , 2 , 2 , 2 , 2 , 3 , 3 , 3 , 3 , 3 , 3 , 3 , 3 , }; struct worker_parm wp [thread ]; for (i=0 ;i<thread;i++) { wp[i].m = m; wp[i].id = i; if (i < sizeof (weight)/sizeof (weight[0 ])) { wp[i].weight= weight[i]; } else { wp[i].weight = 0 ; } create_thread(&pid[i+3 ], thread_worker, &wp[i]); }
工作线程中,队列消息的权重:
当工作线程确定处理某条消息的时候调用,通过 dispatch_message 函数把消息交给服务内部的回调函数处理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 static void dispatch_message (struct skynet_context *ctx, struct skynet_message *msg) { assert(ctx->init); CHECKCALLING_BEGIN(ctx) pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t )(ctx->handle)); int type = msg->sz >> MESSAGE_TYPE_SHIFT; size_t sz = msg->sz & MESSAGE_TYPE_MASK; FILE *f = (FILE *)ATOM_LOAD(&ctx->logfile); if (f) { skynet_log_output(f, msg->source, type, msg->session, msg->data, sz); } ++ctx->message_count; int reserve_msg; if (ctx->profile) { ctx->cpu_start = skynet_thread_time(); reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz); uint64_t cost_time = skynet_thread_time() - ctx->cpu_start; ctx->cpu_cost += cost_time; } else { reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz); } if (!reserve_msg) { skynet_free(msg->data); } CHECKCALLING_END(ctx) }
snlua服务的回调函数 对于snula服务来说 ,最终的回调函数是在lua层调用 skynet.start 时注册的
1 2 3 4 5 6 7 function skynet.start (start_func) c.callback(skynet.dispatch_message) init_thread = skynet.timeout(0 , function () skynet.init_service(start_func) init_thread = nil end ) end
查看 c.callback(skynet.dispatch_message) 回调函数的注册过程 lcallback
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 static int lcallback (lua_State *L) { struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1 )); int forward = lua_toboolean(L, 2 ); luaL_checktype(L,1 ,LUA_TFUNCTION); lua_settop(L,1 ); lua_rawsetp(L, LUA_REGISTRYINDEX, _cb); lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD); lua_State *gL = lua_tothread(L,-1 ); if (forward) { skynet_callback(context, gL, forward_cb); } else { skynet_callback(context, gL, _cb); } return 0 ; }
最终的底层回调函数是 _cb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 static int _cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) { lua_State *L = ud; int trace = 1 ; int r; int top = lua_gettop(L); if (top == 0 ) { lua_pushcfunction(L, traceback); lua_rawgetp(L, LUA_REGISTRYINDEX, _cb); } else { assert(top == 2 ); } lua_pushvalue(L,2 ); lua_pushinteger(L, type); lua_pushlightuserdata(L, (void *)msg); lua_pushinteger(L,sz); lua_pushinteger(L, session); lua_pushinteger(L, source); r = lua_pcall(L, 5 , 0 , trace); if (r == LUA_OK) { return 0 ; } const char * self = skynet_command(context, "REG" , NULL ); switch (r) { case LUA_ERRRUN: skynet_error(context, "lua call [%x to %s : %d msgsz = %d] error : " KRED "%s" KNRM, source , self, session, sz, lua_tostring(L,-1 )); break ; case LUA_ERRMEM: skynet_error(context, "lua memory error : [%x to %s : %d]" , source , self, session); break ; case LUA_ERRERR: skynet_error(context, "lua error in error : [%x to %s : %d]" , source , self, session); break ; }; lua_pop(L,1 ); return 0 ; }
最终的返回值是 0,表示lua层处理完后,底层会释放 msg指向的内存
Barbecue
今天学习了吗
此文章版权归Barbeuce所有,如有转载,请注明明来自原作者