--db.lua local skynet = require"skynet" require"skynet.manager"-- import skynet.register
local db = {--保存了年龄 zhangsan = 12, lisi = 33, wangwu = 4 }
local command = {}
functioncommand.GET(key) return db[key] end
skynet.start(function() skynet.dispatch("lua", function(session, address, cmd, ...)--对应lua消息注册任务函数,专门处理 lua类型 的请求 local f = command[cmd] --这里收到的 cmd 是"GET" ,参数是 "zhangsan",main服务传入的key if f then skynet.ret(skynet.pack(f(...)))--发送响应给main服务,pack打包,ret发回(ret函数在本文最后面) end end)
functionskynet.dispatch_message(...) local succ, err = pcall(raw_dispatch_message,...) whiletruedo if fork_queue.h > fork_queue.t then -- queue is empty fork_queue.h = 1--head fork_queue.t = 0--tail break end -- pop queue local h = fork_queue.h local co = fork_queue[h] fork_queue[h] = nil fork_queue.h = h + 1
local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co)) end assert(succ, tostring(err)) end
localfunctionraw_dispatch_message(prototype, msg, sz, session, source) -- prototype消息类型(name = "lua",--协议类型名字;id = skynet.PTYPE_LUA,--协议id), -- msg内存块指针, -- sz内存块长度, -- session是main服务中生成的session号, -- source是main服务地址 if prototype == 1then--处理响应 -- skynet.PTYPE_RESPONSE = 1, read skynet.h --db服务这里是请求消息,再次略 else--这里主要是处理lua text socket 等消息类型 local p = proto[prototype] --通过传入的prototype类型获取协议,这里的lua类型 local f = p.dispatch --获取任务函数,在db服务中注册的dispatch任务函数 if f then local co = co_create(f) --获取一个协程对象并设置任务函数f session_coroutine_id[co] = session --保存session以便找到回去的路;注意这里的session是其他服务独立产生的,所以不同的请求者发过来的session可以是相同的 session_coroutine_address[co] = source --保存source以便找到回去的路 即记录请求者是谁 suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))) --unpack解包成lua对象 end end end
唤醒协程是通过 coroutine_resume(co, session,source, p.unpack(msg,sz)) 。任务函数 f 是db服务在入口函数里设置的。当db的业务处理完成后,即协程挂起时,就会执行 suspend 。suspend 是一个框架函数,用于管理协程的挂起和恢复状态,会唤醒其他协程。(总结:当lua服务收到一个消息时,就会有一个协程去处理,当协程处理后,挂起时,就会给执行权给其他协程。)
1 2 3 4 5 6 7 8 9 10
functionsuspend(co, result, command) if command == "SUSPEND"then return dispatch_wakeup() --next elseif command == "QUIT"then coroutine.close(co) -- service exit return end end
上面的代码一般情况下 ,挂起时都会返回 “”SUSPEND””。
由 suspend 代码,调用 dispatch_wakeup :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
localfunctiondispatch_wakeup() whiletruedo local token = tremove(wakeup_queue,1)--从唤醒队列中不断取出协程 if token then local session = sleep_session[token] -- 从sleep表中查找 注意 这个表和 唤醒队列 具有不同的意义 if session then local co = session_id_coroutine[session] --通过session从映射表中找到之前挂起的协程 session_id_coroutine[session] = "BREAK"--将这个session在映射表中的条目标记为"BREAK" return suspend(co, coroutine_resume(co, false, "BREAK", nil, session)) end else break end end return dispatch_error_queue()--这里是为了处理这种情况:当前服务在苦苦等待服务x响应,而服务x已经有错误,且已经把错误通知当前服务了 end
dispatch_wakeup 主要的处理过程:
如果唤醒队列中取出 token 通过sleep表获得 session 。
通过session从映射表中找到之前挂起的协程,唤醒并执行。
执行挂起后回到 suspend 再次循环,直到唤醒队列为空。
跳出循环执行 dispatch_error_queue 。
dispatch_error_queue 代码:
1 2 3 4 5 6 7 8
localfunctiondispatch_error_queue() local session = tremove(error_queue,1) if session then local co = session_id_coroutine[session] session_id_coroutine[session] = nil return suspend(co, coroutine_resume(co, false, nil, nil, session)) end end
REG { name = "error", id = skynet.PTYPE_ERROR, unpack = function(...)return ... end, dispatch = _error_dispatch, }
localfunction_error_dispatch(error_session, error_source) skynet.ignoreret() -- don't return for error if error_session == 0then--收到一个即将下线的服务x的错误消息 -- error_source is down, clear unreponse set for session, srv inpairs(watching_session) do--原本发出请求给服务x,等待x响应的协程需要另外处理了 if srv == error_source then tinsert(error_queue, session) end end else -- capture an error for error_session if watching_session[error_session] then tinsert(error_queue, error_session) end end end
functionskynet.fork(func,...)--用于异步创建一个新的执行任务,func是要执行的函数,...是传递给该函数的参数 local n = select("#", ...) -- 获取可变参数的长度(参数个数) local co if n == 0then -- 当没有额外参数时,直接使用co_create函数创建执行func的协程 -- co_create通常从协程池获取空闲协程或创建新协程,并绑定执行函数 co = co_create(func) else local args = { ... } -- 当有额外参数时,将可变参数...收集到一个表args中 -- 创建协程,该协程的执行函数是一个闭包,用于调用func并传入打包好的参数args -- table.unpack(args,1,n) 将参数表args解包为参数列表 co = co_create(function() func(table.unpack(args,1,n)) end) end -- 计算队列新的尾部位置(当前尾部t + 1) local t = fork_queue.t + 1--尾部递增 fork_queue.t = t -- 更新fork_queue的尾指针t fork_queue[t] = co -- 将新创建的协程co放入fork_queue的尾部(实现FIFO队列的入队操作) return co end
local skynet = require"skynet" require"skynet.manager"-- import skynet.register
local db = {--保存了年龄 zhangsan = 12, lisi = 33, wangwu = 4 }
local command = {}
functioncommand.GET(key) return db[key] end
skynet.start(function() skynet.dispatch("lua", function(session, address, cmd, ...)--这里专门处理 lua类型 的请求 local f = command[cmd] --这里收到的 cmd 是"GET" if f then skynet.ret(skynet.pack(f(...)))--发送响应给main服务 end end)
end)
skynet.ret 把返回数据发送给 main服务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
functionskynet.ret(msg, sz) msg = msg or""
local co_session = session_coroutine_id[running_thread] --通过当前的协程找到session
session_coroutine_id[running_thread] = nil-- 获取到会话ID后,立即清除running_thread对应的记录 if co_session == 0then returnfalse-- send don't need ret end local co_address = session_coroutine_address[running_thread] --通过当前的协程找到请求者地址 local ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, msg, sz) --注意消息类型是 skynet.PTYPE_RESPONSE响应消息 if ret then returntrue end end
当db收到请求时,把请求者的信息保存下来。而且是以”当前协程”为 key ,”请求者信息”为 value 保存的。现在把信息取出来,通过 c.send 发送给给 main 服务的队列。