云风开发笔记 (4) : Agent 的消息循环及 RPC

2013/03/16 10:35
阅读数 1.4K

话接 开发笔记1 。我们将为每个玩家的接入提供一个 agent 服务。agent 相应玩家发送过来的数据包,然后给于反馈。

对于 agent 服务,是一个典型的包驱动模式。无论我们用 Erlang 框架,还是用 ZeroMQ 自己搭建的框架,agent 都会不会有太多的不同。它将利用一个单一的输入点获取输入,根据这些输入产生输出。这个输入点是由框架提供的。框架把 Agent 感兴趣的包发给它。

我们会有许多 agent ,在没有它感兴趣的包到来时,agent 处于挂起状态(而不是轮询输入点)。这样做可以大量节省 cpu 资源。怎样做的高效,就是框架的责任了,暂时我们不展开讨论框架的设计。

下面来看,一旦数据包进入 agent 应该怎样处理。

游戏服务逻辑复杂度很高,比起很多 web 应用来说,要复杂的多。我们不能简单的把外界来的请求都看成的独立的。把输入包设计成 REST 风格,并依赖数据服务构建这套系统基本上行不通。几乎每个包都有相关的上下文环境,就是说,输入和输入之间是有联系的。简单的把输入看成一组组 session 往往也做不到 session 间独立。最终,我把游戏服务器逻辑归纳以下的需求:

游戏逻辑由若干流程构成。比如,agent 可以看作是登陆流程和场景漫游服务流程。

每个流程中,服务器可以接收若干类型的包,每种类型的包大多可以立刻处理完毕。可以简单的认为提供了一个请求回应模式的处理机。

在处理部分数据包时,可以开启一个子流程,在子流程处理完毕前,不会收到父流程认可的数据包类型。(如果收到,即为非法逻辑)

在处理部分数据包时,也可以开启一个并行流程,这个流程和已有的流程处理逻辑共存。即,框架应根据包类型把数据包分发到不同的并行流程上。例如,在场景中漫游时,可能触发一个玩家交易的并发流程。(玩家交易行为需要多次交互确认的手续,不能一次性的完成。在交易过程中,其它如战斗、聊天的处理不可中断)

每个流程都可能因为某次处理后中断退出。继而进入后续的代码逻辑。这个中断退出行为,对于并发和非并发流程都有效。

RPC 可以看作一个简单的并发流程,即等待唯一返回包,并继续流程逻辑。

我希望能设计一个简单的 DSL 来描述我要的东西,大约像这个样子:

...1

listen :

    case message A :

        ...2

        listen :

            case message B:

                ...3

                break

            case message C:

                ...4

            case message D:

                ...5

        ...6

    case message E:

        ...7

        break

...8

listen :

    case message F:

        fork listen :

            case message G:

                ...9

            case message H:

                ...10

                break

        ...11

    case message I:

        ...12

解释一下上面这张表:

它表示,服务器启动后,会运行 ...1 这些代码,然后开始等待输入 或 两种消息 。如果收到 类消息,就执行 ...7 段代码,再因 break 跳出到 ...8 的位置。

如果收到 类消息,执行 ...2 代码,然后进程改为等待 B,C,D 类消息。此时,A,E 类消息都无效。直到收到 类消息后,执行流程到 ...6 并结束 消息的处理。再次等待输入 或 

...8 后的阶段大致相同,但在 类消息的处理中,使用了并行的输入(fork listen) 。此时,系统会同时等待 G/H 和 F/I 的输入。只到有 类消息输入,中断 的处理流程,执行完 ...11 后,系统去掉 G/H 的输入等待。

要实现这么一套 DSL ,首先需要用已有的动态语言先实现一下所有功能,等需求稳定后,再设计 DSL 的语法。支持 coroutine 的 lua 非常适合做这件事情。

这套东西的框架其实是一个 coroutine 的调度器。每个执行流(就是 case message),不论是不是并行的,都是一个 coroutine 。当遇到 listen fork break 的时候 coroutine yield 出来,由调度器来决定下一步该 resume 哪个分支就好了。

框架只需要接收外界传入的带类型信息的 message ,在调度器里维护一张消息类型到执行流的映射表,就可以正确的调度这些东西。

剩下的事情就是处理穿插在其中的代码块内,数据相互之间可见性的问题;以及给 RPC 提供一个更易用的接口了。

我大约花了不到 100 行 lua 代码来实现以上提到的功能的雏形,贴在下面,尚需完善:

--- agent.lua

local setmetatable = setmetatable

local coroutine = coroutine

local assert = assert

local unpack = unpack

local print = print

local pairs = pairs

module "agent"

local function create_event_group(self, events, thread , parent_group)

    local group = {

        event = {},

        thread = thread,

        parent = parent_group,

    }

    if parent_group then

        for k,v in pairs(parent_group.event) do

            self.event[k] = nil

        end

    end

    for k,v in pairs(events) do

        group.event[k] = { func = v , group = group }

        assert(self.event[k] == nil , k)

        self.event[k] = group.event[k]

    end

end

local function pop_event_group(self, group)

    for k in pairs(group.event) do

        self.event[k] = nil

    end

    if group.parent then

        for k,v in pairs(group.parent.event) do

            assert(self.event[k] == nil , k)

            self.event[k] = v

        end

    end

end

function create(main)

    local mainthread = coroutine.create(main)

    local self = setmetatable( { event = {} } , { __index = _M })

    local r , command , events = coroutine.resume(mainthread , self)

    assert(r , command)

    assert(command == "listen" ,  command)

    create_event_group(self, events, mainthread)

    return self

end

function send(self, msg , ...)

    local event = self.event[msg]

    if event == nil then

        print (msg .. " unknown" , ...)

    else

        local event_thread = coroutine.create(event.func)

        local group = event.group

        while true do

            local r, command, events = coroutine.resume(event_thread, self, ...)

            assert(r,command)

            if command == "listen" then

                create_event_group(self, events, event_thread , group)

                break

            elseif command == "fork" then

                create_event_group(self, events, event_thread)

                break

            elseif command == "break" then

                pop_event_group(self, group)

                event_thread = group.thread

                group = group.parent

            else

                break

            end

        end

    end

end

function listen(agent , msg)

    coroutine.yield("listen",msg)

end

function quit(agent)

    coroutine.yield "break"

end

function fork(agent, msg)

    coroutine.yield("fork",msg)

end

--- test.lua

local agent = require "agent"

a = agent.create(function (self)

    self:listen {

        login = function (self)

            self:listen {

                username = function(self , ...)

                    print("username", ...)

                    self:listen {

                        password = function(self, ...)

                            print("password", ...)

                            self:quit()

                        end

                    }

                    self:quit()

                end ,

            }

        end,

        ok = function (self)

            self:quit()

        end,

    }

    print "login ok"

    local q = 0

    self:listen {

        chat = function (self, ...)

            print("chat", ...)

        end,

        question = function (self , ...)

            print("question", ...)

            local answer = "answer" .. q

            q = q+1

            self:fork {

                [answer] = function (self, ...)

                    print("answer", ...)

                    self:quit()

                end

            }

        end,

    }

end)

a:send("login")

a:send("username","alice")

a:send("username","bob")

a:send("password","xxx")

a:send("login")

a:send("username","bob")

a:send("password","yyy")

a:send("chat","foobar")

a:send("ok")

a:send("chat", "hello")

a:send("question","?0")

a:send("chat", "world")

a:send("question","?1")

a:send("answer0","!0")

a:send("answer1","!1")

12 月 日补充:

周一上班和蜗牛同学讨论了一下需求, 最后我们商定抽象出一个 session 出来, 供 fork 出来的流程使用。因为并行的流程会使用相同的消息类型,但流程上是独立的。

根据 session id 和 message type 做两级分发要清晰一些。

然后,我们再对 rpc 调用做简单的封装,使用更简单。

改进后的代码就不再列出了。

展开阅读全文
打赏
0
9 收藏
分享
加载中
更多评论
打赏
0 评论
9 收藏
0
分享
返回顶部
顶部