文档章节

Lua Web快速开发指南(10) - 利用MQ实现异步任务、订阅/发布、消息队列

水果糖的小铺子
 水果糖的小铺子
发布于 06/25 20:43
字数 3712
阅读 17
收藏 0

Lua Web快速开发指南(10) - 利用MQ实现异步任务、订阅/发布、消息队列

本章节我们将学习如何使用MQ库.

MQ库简介

MQ库实现了各类消息代理中间件(Message Broker)的连接协议, 目前支持:redismqttstomp协议.

MQ库基于上述协议实现了: 生产者 -> 消费者订阅 -> 发布模型, 可以在不依赖其它服务的情况下独立完成任务.

API介绍

cf框架提供了多种MQ的封装, 当我们需要使用的时候需要根据实际的协议进行选择:

-- local MQ = require "MQ.mqtt"
-- local MQ = require "MQ.redis"
-- local MQ = require "MQ.stomp"

MQ:new(opt)

此方法将会创建一个的MQ对象实例.

opt是一个table类型的参数, 可以传递如下值:

  • host - 字符串类型, 消息队列的域名或者IP地址.

  • port - int类型, 消息队列监听的端口.

  • auth/db - 字符串类型, 仅在redis协议下用作登录认证或者db选择(没有可以不填写).

  • username/password - 字符串类型, 仅在stomp/mqtt协议下用作登录认证(没有可以不填写).

  • vhost - 字符串类型, 仅在使用某些特定消息队列server的时候填写(例如:rabbit).

  • keepalive - int类型, 仅在使用mqtt的时候用来出发客户端主动发出心跳包的时间.

以redis broker为示例:

local MQ = require "MQ.redis"
local mq = MQ:new {
  host = "localhost",
  port = 6379,
  -- db = 0,
  -- auth = "123456789",
}

MQ:on(pattern, function)

此方法用来订阅一个指定pattern. 当broker将消息传递到cf后, function将会被调用.

MQ库会为function注入一个table类型的参数msg, 此参数将在断开连接的时候为nil.

msg根据采用的协议的不同msg的内容也将有所不同. 具体内容以logging库的打印为准.

标准使用示例:

local Log = require("logging"):new()
mq:on("/notice", function(msg)
  if not msg then
    return Log:ERROR("['/notice'] SUBSCRIBE ERROR: 连接已断开.")
  end
  Log:DEBUG(msg)
end)

开发者可以同时订阅多个parttern.

MQ:emit(pattern, msg)

此方法用来向指定pattern发送消息. msg为字符串类型的消息.

使用示例:

mq:emit('/notice', '{"code":200,"data":[1,2,3,4,5,6,7,8,9,10]}')

单个MQ可以一直复用emit, 内部会创建一个写入队列去完成消息的顺序发送. (在多个实例中无法保证消息先后)

MQ:start()

此方法在作为独立运行服务端时候调用.

使用示例:

mq:start()

MQ:clsoe()

此方法可以关闭不再使用的MQ; 在任何情况下MQ使用完毕后都需要调用此方法来释放资源.

使用示例:

mq:close()

开始实践

为了演示更加直观, 这里仅使用redis作为broker中专消息.

1. 模拟生产者与消费者

我们模拟100个生产者向redis的/queue投递消息, 同时定义了一个消费者订阅/queue持续进行消费

代码如下:

local cf = require "cf"
local json = require "json"
local Log = require("logging"):new()
local MQ = require "MQ.redis"

cf.fork(function ()
  local consumer = MQ:new {
    host = "localhost",
    port = 6379
  }

  local count = 0
  consumer:on("/queue", function (msg)
    if not msg then
      Log:ERROR("[/queue]连接失败", "已经消费了"..count.."个消息")
      return
    end
    count = count + 1
    Log:DEBUG("开始消费:", msg, "已经消费了"..count.."个消息")
  end)

  consumer:start() -- Websoket内部无需使用这个方法
end)

for i = 1, 100 do
  cf.fork(function()

    local producer = MQ:new {
      host = "localhost",
      port = 6379
    }

    producer:emit("/queue", json.encode({
      code = 200,
      data = {
        id = math.random(1, 1 << 32)
      },
    }))

    producer:close()
  end)
end

输出如下:

[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3912595079}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了1个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2938696189}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了2个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3499397173}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了3个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":1711272453}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了4个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3968420025}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了5个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":1887895479}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了6个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3687986737}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了7个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2823099353}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了8个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":2528190121}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了9个消息
[2019-06-25 16:05:36,240] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":4107999865}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了10个消息
.
..
...
....
.....
[2019-06-25 16:05:36,247] [@script/main.lua:19] [DEBUG] : 开始消费:, {["pattern"]="/queue", ["payload"]="{"code":200,"data":{"id":3608578767}}", ["source"]="/queue", ["type"]="pmessage"}, 已经消费了100个消息

为了方便阅读. 我们这里取出前10条与最后第100条并且将msg的数据结构打印出来方便阅读.

消费者的处理方式采用同步非阻塞处理的(当前业务未处理完成是不会继续处理下个消息的), 如果不想阻塞当前消息队列事件循环可以考虑自行fork一个协程来处理.

2. 推送消息给某个用户

用户通过认证后接入到Server后订阅自己专属的频道, 当有用户专属消息的时候任何服务都可以利用此方法进行业务消息推送.

我们

代码实现如下:

local cf = require "cf"
local json = require "json"
local Log = require("logging"):new()
local MQ = require "MQ.redis"

for uid = 1, 10 do
  cf.fork(function ()
    local client = MQ:new {
      host = "localhost",
      port = 6379
    }

    client:on("/user/"..uid.."/*", function (msg)
      if not msg then
        Log:ERROR("[/user/9257]连接失败")
        return
      end
      Log:DEBUG("UID:["..uid.."]接收到推送消息", msg)
    end)

    client:start() -- Websoket内部无需使用这个方法
  end)
end

local server = MQ:new {
  host = "localhost",
  port = 6379
}

cf.at(1, function (...)
  server:emit("/user/"..math.random(1, 10).."/ad", json.encode({
    code = 200,
    data = {}
  }))
end)

server:start()

运行后终端输出如下所示:

^C[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019-06-25 16:20:23,506] [@script/main.lua:18] [DEBUG] : UID:[9]接收到推送消息, {["source"]="/user/9/ad", ["pattern"]="/user/9/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:24,504] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送消息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:25,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送消息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:26,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送消息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:27,505] [@script/main.lua:18] [DEBUG] : UID:[10]接收到推送消息, {["source"]="/user/10/ad", ["pattern"]="/user/10/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:28,506] [@script/main.lua:18] [DEBUG] : UID:[2]接收到推送消息, {["source"]="/user/2/ad", ["pattern"]="/user/2/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:29,506] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送消息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:30,506] [@script/main.lua:18] [DEBUG] : UID:[8]接收到推送消息, {["source"]="/user/8/ad", ["pattern"]="/user/8/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:31,505] [@script/main.lua:18] [DEBUG] : UID:[3]接收到推送消息, {["source"]="/user/3/ad", ["pattern"]="/user/3/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:32,506] [@script/main.lua:18] [DEBUG] : UID:[6]接收到推送消息, {["source"]="/user/6/ad", ["pattern"]="/user/6/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:33,506] [@script/main.lua:18] [DEBUG] : UID:[5]接收到推送消息, {["source"]="/user/5/ad", ["pattern"]="/user/5/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:34,503] [@script/main.lua:18] [DEBUG] : UID:[7]接收到推送消息, {["source"]="/user/7/ad", ["pattern"]="/user/7/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:35,506] [@script/main.lua:18] [DEBUG] : UID:[4]接收到推送消息, {["source"]="/user/4/ad", ["pattern"]="/user/4/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:36,506] [@script/main.lua:18] [DEBUG] : UID:[6]接收到推送消息, {["source"]="/user/6/ad", ["pattern"]="/user/6/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
[2019-06-25 16:20:37,505] [@script/main.lua:18] [DEBUG] : UID:[10]接收到推送消息, {["source"]="/user/10/ad", ["pattern"]="/user/10/*", ["type"]="pmessage", ["payload"]="{"data":{},"code":200}"}
^C[candy@MacBookPro:~/Documents/core_framework] $

这里我们可以看到, 由消息发布到/user/9527/*下的topic的时候, 我们可以通过一次通配符订阅就可以接收到所有下属路由消息.

3. 消息广播

在各种领域内, 消息推送已经成为了一种最常见的业务. 我们现在来尝试利用MQ实现消息推送业务.

首先, 我们将script/main.lua的文件写入如下代码:

-- main.lua
local cf = require "cf"
local json = require "json"
local Log = require("logging"):new()
local MQ = require "MQ.redis"

for i = 1, 3 do
  cf.fork(function ()
    local uid = math.random(1, 1 << 32)
    local client_mq = MQ:new {
      host = "localhost", -- 主机名
      port = 6379,        -- 端口号
      -- db = nil,        -- 默认数据库
      -- auth = nil,      -- 密码
    }
    client_mq:on("/system/notice", function (msg)
      if not msg then
        Log:ERROR("['/system/notice'] SUBSCRIBE ERROR: 连接已断开.")
        return
      end
      Log:DEBUG("UID:["..uid.."]接收到消息: ", msg)
    end)

    client_mq:start()
  end)
end

local server_mq = MQ:new {
  host = "localhost", -- 主机名
  port = 6379,        -- 端口号
  -- db = nil,        -- 默认数据库
  -- auth = nil,      -- 密码
}

cf.at(3, function (args)
  server_mq:emit("/system/notice", json.encode({
    code = 200,
    msg = "系统即将关闭"
  }))
end)

server_mq:start()

这里我们用启动了3个协程来模拟用户订阅消息, 并且每个协程都使用不同的UID来打印. 然后再启动一个定时器模拟每三秒的消息推送业务.

打开终端运行./cfadmin后, 输出如下:

[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:24,842] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}

[2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:27,841] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}

[2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[3363385555]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[1693861773]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[2019-06-25 15:43:30,841] [@script/main.lua:19] [DEBUG] : UID:[3608578767]接收到消息: , {["pattern"]="/system/notice", ["payload"]="{"msg":"系统即将关闭","code":200}", ["type"]="pmessage", ["source"]="/system/notice"}
[candy@MacBookPro:~/Documents/core_framework] $

从终端的输出内容中可以看到, 我们确实每隔3秒就收到了一次消息推送.

4. 对基于Websocket协议的客户端实现业务推送

首先, 我们需要建立一套基于httpd库的Websocket路由. 让我们打开script/main.lua文件并将下面的代码写入进去.

local httpd require "httpd"

local app = httpd:new("Web")

app:ws('/ws', require "ws")

app:listen("0.0.0.0", 8080)

app:run()

Websocket必须在建立与客户端的连接完成的同时利用MQ库订阅/chat. 每当客户端发送消息过来触发on_message的时候, 都将会消息直接发布到/chat内部通过中转后实现推送聊天.

然后我们利用前面章节所学的Websocket指南, 编写一段简单的Websocket路由处理代码. 由于示例代码没有UID生成机制. 为了方便调试, 我们随机生成32位整数作为唯一ID标识符.

script/ws.lua具体代码如下所示:

local MQ = require "MQ.redis"
local class = require "class"

local websocket = class("websocket")

function websocket:ctor (opt)
  self.ws = opt.ws
  self.id = math.random(1, 1 << 32)
end

function websocket:on_open ()
  self.mq = MQ:new { host = 'localhost', port = 6379 }
  self.mq:on("/chat", function (msg)
    if not msg then
      return
    end
    self.ws:send(msg.payload)
  end)
end

function websocket:on_message (data, typ)
  if self.mq then
    self.mq:emit("/chat", data)
  end
  print("客户端["..self.id.."]发送了消息:["..data.."]")
end

function websocket:on_error (error)

end

function websocket:on_close ()
  if self.mq then
    self.mq:close()
    self.mq = nil
  end
end

return websocket

注意: 我们需要记住当客户端连接断开的时候记得关闭订阅回收资源. 启动./cfadmin, 查看是否正常运行.

让我们下载客户端工具, 并且安装到我们的Chrome浏览器上. 提取码:cgwr

现在, 我们运行客户端工具在地址栏输入localhost:8080/ws连接我们刚刚启动的Websocket Server, 然后开始向服务器发送消息.

如果从终端中和客户端看到类似的输出内容, 说明我们的示例编写完成.

[candy@MacBookPro:~/Documents/core_framework] $ ./cfadmin
[2019/06/25 20:11:59] [INFO] httpd正在监听: 0.0.0.0:8080
[2019/06/25 20:11:59] [INFO] httpd正在运行Web Server服务...
[2019/06/25 20:12:01] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000095/Sec
[2019/06/25 20:12:17] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000080/Sec
客户端[1693861773]发送了消息:[hello! 我是2]
客户端[1693861773]发送了消息:[hello! 我是2]
客户端[1693861773]发送了消息:[hello! 我是2]
客户端[1693861773]发送了消息:[hello! 我是2]
客户端[1693861773]发送了消息:[hello! 我是2]
客户端[1693861773]发送了消息:[hello! 我是2]
客户端[1693861773]发送了消息:[hello! 我是2]
[2019/06/25 20:12:23] - ::1 - ::1 - /ws - GET - 101 - req_time: 0.000052/Sec
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[3363385555]发送了消息:[hello! 我是1]
客户端[1693861773]发送了消息:[hello! 我是2]

最后

上述代码仅用redis协议进行模拟, 其它协议请参考Wiki.

学习完成

至此Lua Web开发指南已经编写完毕. cf框架都内置库非常的多, 维护框架都同时还要编写使用教程. 作者不可能一个一个介绍完全.

软件开发领域内不仅仅需要师傅领进门, 个人修行也是一种能力的体现. cf框架已经有了专属的QQ讨论社区: 727531854, 点击加群.

目前内部就作者一个人在里面. 如果您也对它比较感兴趣, 欢迎您到群里来一起交流技术.

© 著作权归作者所有

水果糖的小铺子
粉丝 23
博文 153
码字总数 73315
作品 1
广州
程序员
私信 提问
Lua Web快速开发指南(1) - 初识cf框架

cf是什么? cf全称为: CoreFramework. 一个基于Reactor事件驱动与协程的lua高性能网络框架, 目前主要面向HTTP Application开发. cf内部主要实现了包括HTTP与HTTP Over Websoket协议的Server,...

水果糖的小铺子
06/14
0
0
MQ消息中间件(工作+面试)

AMQP协议介绍 AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。 AMQP的主要特征是面向消息、队列、路由(包括点对点和...

Lunaqi
2017/11/13
0
0
再谈队列技术

再谈消息队列技术 上周,我们举办了第二届技术沙龙,我这边主要演讲了消息队列技术的议题,现分享给大家: 在我们团队内部,随着消息应用中心(任务中心)的广泛应用,有时候我们感觉不到消息...

yuanzhitang
2017/07/03
0
0
用redis实现消息队列(实时消费+ack机制)

消息队列 首先做简单的引入。 MQ主要是用来: 解耦应用、 异步化消息 流量削峰填谷 目前使用的较多的有ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ等。 网上的资源对各种情况都有详...

美的让人心动
2018/05/03
0
0
分布式异步编程框架--Koper

Koper Koper是一个基于消息和事件驱动的分布式异步编程框架, Koper的设计目标是为大型场景提供高性能高吞吐的简单编程方案。 Koper为分布式环境提供了简化的消息监听和数据事件监听模型,它...

raymondkk
2016/11/18
568
1

没有更多内容

加载失败,请刷新页面

加载更多

拥有有趣灵魂的程序员们,程序员访谈(一)

点击上方关注我们,让小care关爱你! 程序员群体一直都是低调多金的代表,而近段时间以来,程序员在网络上除了高薪之外,总是会和屌丝、苦逼、格子衫、没情趣...联系在一起。黑程序员的段子也...

ITCare
今天
14
0
Linux输入法fcitx的安装问题

Fcitx 总共要安装的包如下 fcitxfcitx-binfcitx-config-commonfcitx-config-gtk | fcitx-config-gtk2fcitx-datafcitx-frontend-allfcitx-frontend-gtk2fcitx-frontend-gtk3......

CHONGCHEN
今天
8
0
网络基础

前言: 最近整理一些以前的学习笔记(有部分缺失,会有些乱,日后再补)。 过去都是存储在本地,此次传到网络留待备用。 计算机网络的功能: 1.数据通信; 2.资源共享; 3.增加数据可靠性; 4....

迷失De挣扎
今天
7
0
spring boot升级到spring cloud

1、先升级spring boot 版本到2.1.3 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.1.3.RELEAS......

moon888
今天
12
0
从蓝鲸视角谈DevOps

DevOps源于Development和Operations的组合 常见的定义 DevOps是一种重视“软件开发人员(Dev)”和“IT运维技术人员(Ops)”之间沟通合作的文化、运动或惯例。透过自动化“软件交付”和“架构变...

嘉为科技
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部