Skynet_db服务处理请求

Skynet_db服务处理请求

db服务处理main服务发送过来的请求

db 服务指定的lua文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
--db.lua
local skynet = require "skynet"
require "skynet.manager" -- import skynet.register

local db = {--保存了年龄
zhangsan = 12,
lisi = 33,
wangwu = 4
}

local command = {}

function command.GET(key)
return db[key]
end


skynet.start(function()

skynet.dispatch("lua", function(session, address, cmd, ...)--对应lua消息注册任务函数,专门处理 lua类型 的请求

local f = command[cmd] --这里收到的 cmd 是"GET" ,参数是 "zhangsan",main服务传入的key
if f then
skynet.ret(skynet.pack(f(...)))--发送响应给main服务,pack打包,ret发回(ret函数在本文最后面)
end
end)

end)

lua服务把消息队列的消息取出后,调用c的回调函数,最终是把消息交给一个指定的lua函数处理。

snlua服务的lua函数就是 skynet.dispatch_message 。处理消息主要分为两大步:

  1. raw_dispatch_message 函数
  2. 不断的从 fork_queue 队列中把协程取出来做处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
function skynet.dispatch_message(...)
local succ, err = pcall(raw_dispatch_message,...)
while true do
if fork_queue.h > fork_queue.t then
-- queue is empty
fork_queue.h = 1 --head
fork_queue.t = 0 --tail
break
end
-- pop queue
local h = fork_queue.h
local co = fork_queue[h]
fork_queue[h] = nil
fork_queue.h = h + 1

local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))

end
assert(succ, tostring(err))
end

在lua服务中会收到请求消息和响应消息。(a发送一个请求给b,然后等待b回应。b收到的这个请求,就是请求消息。当a收到b的回应消息时,这个消息就是响应消息。此时db服务收到了一个lua类型的消息。)

raw_dispatch_message 函数

skynet.dispatch_message 代码,调用 raw_dispatch_message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
local function raw_dispatch_message(prototype, msg, sz, session, source)
-- prototype消息类型(name = "lua",--协议类型名字;id = skynet.PTYPE_LUA,--协议id),
-- msg内存块指针,
-- sz内存块长度,
-- session是main服务中生成的session号,
-- source是main服务地址
if prototype == 1 then --处理响应 -- skynet.PTYPE_RESPONSE = 1, read skynet.h
--db服务这里是请求消息,再次略
else --这里主要是处理lua text socket 等消息类型
local p = proto[prototype] --通过传入的prototype类型获取协议,这里的lua类型

local f = p.dispatch --获取任务函数,在db服务中注册的dispatch任务函数
if f then
local co = co_create(f) --获取一个协程对象并设置任务函数f
session_coroutine_id[co] = session --保存session以便找到回去的路;注意这里的session是其他服务独立产生的,所以不同的请求者发过来的session可以是相同的
session_coroutine_address[co] = source --保存source以便找到回去的路 即记录请求者是谁

suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))) --unpack解包成lua对象

end
end
end

代码逻辑:

  1. 首先根据消息类型找到对应的协议,这里是lua类型。
  2. 获取任务处理函数 f
  3. 获取一个协程,给协程设置 f 函数
  4. 唤醒上面3的协程去处理收到的消息

注意:唤醒协程开始执行前,必须先保存请求者的信息,不然后面处理完请求后,都不知道把结果发送给谁。(这里是以 cokey 保存请求者信息的。为什么不用 session ?原因是,不同的请求者发送过来的 session 可能是相同的。因为 session 是每个服务自己产生的。a服务可以产生一个 session 号9527,b服务也可以产生一个 session 号9527。)

当调用 skynet.sleep 或者 skynet.wait 时,会把指定协程加入到 sleep_session 睡眠表中,标识协程是睡眠状态,同时当前协程会挂起。在合适的时机调用 skynet.wakeup 就会把指定的睡眠协程加入到 wakeup_queue 唤醒队列中。之后唤醒队列里面的协程就会被调度,得到执行。

注意:把协程加入唤醒队列,不代表马上就唤醒协程执行。

唤醒协程是通过 coroutine_resume(co, session,source, p.unpack(msg,sz)) 。任务函数 f 是db服务在入口函数里设置的。当db的业务处理完成后,即协程挂起时,就会执行 suspendsuspend 是一个框架函数,用于管理协程的挂起和恢复状态,会唤醒其他协程。(总结:当lua服务收到一个消息时,就会有一个协程去处理,当协程处理后,挂起时,就会给执行权给其他协程。)

1
2
3
4
5
6
7
8
9
10
function suspend(co, result, command)

if command == "SUSPEND" then
return dispatch_wakeup() --next
elseif command == "QUIT" then
coroutine.close(co)
-- service exit
return
end
end

上面的代码一般情况下 ,挂起时都会返回 “”SUSPEND””。

suspend 代码,调用 dispatch_wakeup

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
local function dispatch_wakeup()
while true do
local token = tremove(wakeup_queue,1)--从唤醒队列中不断取出协程
if token then
local session = sleep_session[token] -- 从sleep表中查找 注意 这个表和 唤醒队列 具有不同的意义
if session then
local co = session_id_coroutine[session] --通过session从映射表中找到之前挂起的协程

session_id_coroutine[session] = "BREAK" --将这个session在映射表中的条目标记为"BREAK"
return suspend(co, coroutine_resume(co, false, "BREAK", nil, session))
end
else
break
end
end
return dispatch_error_queue()--这里是为了处理这种情况:当前服务在苦苦等待服务x响应,而服务x已经有错误,且已经把错误通知当前服务了
end

dispatch_wakeup 主要的处理过程:

  1. 如果唤醒队列中取出 token 通过sleep表获得 session
  2. 通过session从映射表中找到之前挂起的协程,唤醒并执行。
  3. 执行挂起后回到 suspend 再次循环,直到唤醒队列为空。
  4. 跳出循环执行 dispatch_error_queue

dispatch_error_queue 代码:

1
2
3
4
5
6
7
8
local function dispatch_error_queue()
local session = tremove(error_queue,1)
if session then
local co = session_id_coroutine[session]
session_id_coroutine[session] = nil
return suspend(co, coroutine_resume(co, false, nil, nil, session))
end
end

dispatch_error_queue 主要的处理过程:

  1. error_queue 错误队列有错误类型消息时,调用 suspend 进行处理。
  2. error_queue 错误队列也没有消息时,函数调用回到 raw_dispatch_message

以上几个函数的共同特点是最后都会调用 suspend 。这里不是递归调用,而是lua的尾调用。调用帧是不会一直递增的。lua函数尾调用

这里的调用逻辑是:

skynet.dispatch_message -> raw_dispatch_message-> suspendsuspend 进行尾调用持续唤醒)

error_queue 错误队列:当协程a发送请求给x服务后,会等待x服务给出响应。如果此时x服务退出或者是处理出现错误,x服务会给a服务发送一个错误类型的消息。针对错误消息的处理函数会把协程a加入错误队列,等待时机唤醒执行,不然协程a就会一直挂起。

错误消息的处理函数:主要是把之前等待的协程加入错误队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
	REG {
name = "error",
id = skynet.PTYPE_ERROR,
unpack = function(...) return ... end,
dispatch = _error_dispatch,
}

local function _error_dispatch(error_session, error_source)
skynet.ignoreret() -- don't return for error
if error_session == 0 then --收到一个即将下线的服务x的错误消息
-- error_source is down, clear unreponse set
for session, srv in pairs(watching_session) do --原本发出请求给服务x,等待x响应的协程需要另外处理了
if srv == error_source then
tinsert(error_queue, session)
end
end
else
-- capture an error for error_session
if watching_session[error_session] then
tinsert(error_queue, error_session)
end
end
end

fork_queue 队列取出协程并处理

处理 fork_queue 次级异步任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
function skynet.dispatch_message(...)
-- 使用pcall保护调用raw_dispatch_message,目的是捕获处理主消息过程中的任何错误
-- raw_dispatch_message 是实际处理消息的内部函数,负责查找对应的回调函数并执行
local succ, err = pcall(raw_dispatch_message,...)
while true do --专门处理fork_queue中等待执行的协程任务
if fork_queue.h > fork_queue.t then -- 检查fork_queue是否为空(头指针h大于尾指针t表示队列空)
-- 队列为空时,重置队列的头尾指针以复用空间,避免表无限增长
fork_queue.h = 1 --head 重置头指针到起始位置
fork_queue.t = 0 --tail 重置尾指针到头指针之前,表示空队列
break
end
-- pop queue: 以下代码从fork_queue的头部取出一个协程对象
local h = fork_queue.h -- 记录当前头指针的位置
local co = fork_queue[h] -- 获取头指针位置对应的协程对象
fork_queue[h] = nil -- 将表中该位置的引用置空,帮助垃圾回收
fork_queue.h = h + 1 -- 头指针向后移动一位,指向下一个待处理元素

-- 使用pcall保护执行,目的是恢复执行刚取出的协程co,并捕获执行中的错误
local fork_succ, fork_err = pcall(suspend,co,coroutine_resume(co))
-- 注意:这里没有对fork_succ和fork_err进行处理。即使某个fork任务出错,也不会影响主消息循环和其他fork任务。
-- 在实际生产环境中,这里可能需要记录错误日志。
end
-- 检查主消息处理(raw_dispatch_message)是否成功。若失败,用tostring转换错误信息并断言抛出错误。
-- 这意味着主消息处理过程中的错误是致命的,会导致服务中断。
assert(succ, tostring(err))
end

以上主要是从 fork_queue 队列中不断的取出协程,然后依旧是调用suspend来处理。

fork_queue 里面的协程是怎么来的?一般当在业务层需要协程的时候,会调用 skynet.fork(func,...) 来用于异步创建一个新的执行任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
function skynet.fork(func,...)	--用于异步创建一个新的执行任务,func是要执行的函数,...是传递给该函数的参数
local n = select("#", ...) -- 获取可变参数的长度(参数个数)
local co
if n == 0 then
-- 当没有额外参数时,直接使用co_create函数创建执行func的协程
-- co_create通常从协程池获取空闲协程或创建新协程,并绑定执行函数
co = co_create(func)
else
local args = { ... } -- 当有额外参数时,将可变参数...收集到一个表args中
-- 创建协程,该协程的执行函数是一个闭包,用于调用func并传入打包好的参数args
-- table.unpack(args,1,n) 将参数表args解包为参数列表
co = co_create(function() func(table.unpack(args,1,n)) end)
end
-- 计算队列新的尾部位置(当前尾部t + 1)
local t = fork_queue.t + 1 --尾部递增
fork_queue.t = t -- 更新fork_queue的尾指针t
fork_queue[t] = co -- 将新创建的协程co放入fork_queue的尾部(实现FIFO队列的入队操作)
return co
end

关键机制说明

  1. 两级队列与调度:
    • skynet.dispatch_message首先处理主消息 (raw_dispatch_message)。
    • 主消息处理完毕后,才集中处理本次消息驱动过程中通过 skynet.fork产生的次级异步任务fork_queue中的协程)。这保证了主消息处理的及时性,并将异步任务积压到同一批次处理,避免在单个消息处理过程中无限创建和切换协程,提高了整体性能和处理公平性。
  2. 错误处理差异:
    • 主消息处理 (raw_dispatch_message) 的错误通过 assert抛出,通常是致命的,可能导致服务处理循环中断。
    • Fork 任务(协程)中的错误被 pcall捕获但仅存储在 fork_err中,未被处理。这意味着单个 fork 任务的失败不会影响其他 fork 任务或主消息循环,但可能需要额外的日志记录和监控。
  3. Fork 队列管理:
    • fork_queue是一个数组模拟的 FIFO 队列,通过头尾指针 ht进行管理。
    • 队列被处理完毕后(h > t),头尾指针会被重置h=1, t=0),这是为了复用数组空间,防止数组索引无限增长(Lua 表的数组部分大小是动态的,但重用空间是一种优化和良好实践)。

db服务返回响应给main服务

db服务是在处理完业务后(拿到年龄信息),把信息发送给main服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
--db.lua

local skynet = require "skynet"
require "skynet.manager" -- import skynet.register

local db = {--保存了年龄
zhangsan = 12,
lisi = 33,
wangwu = 4
}

local command = {}

function command.GET(key)
return db[key]
end


skynet.start(function()

skynet.dispatch("lua", function(session, address, cmd, ...)--这里专门处理 lua类型 的请求

local f = command[cmd] --这里收到的 cmd 是"GET"
if f then
skynet.ret(skynet.pack(f(...)))--发送响应给main服务
end
end)

end)

skynet.ret 把返回数据发送给 main服务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
function skynet.ret(msg, sz)
msg = msg or ""

local co_session = session_coroutine_id[running_thread] --通过当前的协程找到session

session_coroutine_id[running_thread] = nil -- 获取到会话ID后,立即清除running_thread对应的记录
if co_session == 0 then
return false -- send don't need ret
end
local co_address = session_coroutine_address[running_thread] --通过当前的协程找到请求者地址
local ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, msg, sz) --注意消息类型是 skynet.PTYPE_RESPONSE响应消息
if ret then
return true
end

end

当db收到请求时,把请求者的信息保存下来。而且是以”当前协程”为 key ,”请求者信息”为 value 保存的。现在把信息取出来,通过 c.send 发送给给 main 服务的队列。