EasyCVR编译中通过NSQ实现消息传输及发送的使用总结

原创
2021/11/12 09:35
阅读数 27

EasyCVR 是TSINGSEE青犀视频开发的高稳定、高接入性的视频平台,可接入的协议丰富,且可通过国标协议级联。EasyCVR 各模块之间进行消息通信时,需要一款消息中间件进行消息的传输和发送。在调研各种 MQ 中间件后,确定采用一款 Go 语言实现的消息中间件实现业务,因此本文对其进行使用总结。

NSQ 是一款 Go 语言实现的分布式消息队列,以下以版本 1.2.1 为示例编写。

启动

1. 主要的三个可执行文件

  • nsqd:负责收发消息,可以认为就是队列。并且向 nsqlookupd 注册自己的信息。必须使用,一般建议和发送消息端在一起。保证消息至少成功发送一次。
  • nsqlookupd:服务发现中心,因为 nsq 支持分布式部署,所以这个就是微服务中的注册中心的概念。如果需要多台 nsqd 必须使用。
  • nsqadmin:一个 web 管理界面程序,可以通过 web 浏览器查看很多信息。可以不使用。

2. 执行程序

  1. nsqlookupd -http-address=0.0.0.0:4161 -tcp-address=0.0.0.0:4160
  2. nsqd –lookupd-tcp-address=127.0.0.1:4160 -tcp-address=0.0.0.0:4154 -http-address=0.0.0.0:4155
  3. nsqadmin –lookupd-http-address=127.0.0.1:4161 -http-address=0.0.0.0:4171
  4. -http-address 代表的是监听的 http 端口
  5. -tcp-address 代表的是监听的 tcp 端口
  6. –lookupd-tcp-address 代表 nsqd 向 nsqlookupd 进行服务注册的端口
  7. –lookupd-http-address 代表 nsqadmin 向 nsqlookupd 获取 http 的信息
  8. nsqadmin 访问 http://127.0.0.1:4171/, 即可访问界面查看所有的信息

代码示例编写,生产者和消费者

首先在 go mod 项目中拉取对应的 go 工具:

```go
go get github.com/nsqio/go-nsq

生产者的代码如下:

package main

import (
   "fmt"
   "github.com/nsqio/go-nsq"
   "log"
   "os"
   "os/signal"
   "syscall"
   "time"
   "zhangqiadams.com/gotools/model/consts"
)

func main() {
   config := nsq.NewConfig()
   // 1. 向 nsqd 的 tcp 端口发送消息,因此进行对应的配置
   producer, err := nsq.NewProducer("127.0.0.1:4154", config)
   if err != nil {
      log.Fatal(err)
   }

   messageBody := []byte("hello world")
   topicName := "topic2"

   // 2. 同步推流到 nspd, 同步推流代表等待 nspd 的响应,如果发送失败返回错误。
   // PublishAsync 代表是异步推送消息给 nspd,发送完消息后立刻返回
   err = producer.Publish(topicName, messageBody)
   fmt.Println("发送消息时间为", time.Now().Format(consts.TimeFormat))
   if err != nil {
      log.Fatal(err)
   }


   sigChan := make(chan os.Signal, 1)
   signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
   <-sigChan

   // 3. 停止生产者,一般在停止服务,停止进程的时候需要调用
   producer.Stop()
}

消费者代码如下

package main

import (
   "fmt"
   "github.com/nsqio/go-nsq"
   "log"
   "os"
   "os/signal"
   "syscall"
   "time"
   "zhangqiadams.com/gotools/model/consts"
)

type myMessageHandler struct{}

// HandleMessage 为接口,如果返回 nil, nsq 收到 nil 就会标记消息已经被成功处理
func (h *myMessageHandler) HandleMessage(m *nsq.Message) error {
   if len(m.Body) == 0 {
      // Returning nil will automatically send a FIN command to NSQ to mark the message as processed.
      // In this case, a message with an empty body is simply ignored/discarded.
      return nil
   }

   // do whatever actual message processing is desired
   err := h.processMessage(m.Body)

   // Returning a non-nil error will automatically send a REQ command to NSQ to re-queue the message.
   return err
}

// 自定义的处理消息函数
func (h *myMessageHandler) processMessage(m []byte) error {
   fmt.Println("接收消息时间为", time.Now().Format(consts.TimeFormat))
   fmt.Println("收到消息为", string(m))
   return nil
}

func main() {
   // 1. 初始化配置
   config := nsq.NewConfig()
   consumer, err := nsq.NewConsumer("topic2", "channel2", config)
   if err != nil {
      log.Fatal(err)
   }

   // 2. 消息处理, AddHandler 内部默认采用 1 个协程处理返回的消息
   // AddConcurrentHandlers 可以自定义多少个协程处理返回的消息
   consumer.AddHandler(&myMessageHandler{})

   // 3. 连接 nsqlookupd 用于搜索对应的 nsqd, 连接 nsqlookupd 对应的 http 地址
   // 可以直接发送信息 ConnectToNSQD, ConnectToNSQDs, ConnectToNSQLookupds.
   // 如果不需要分布式,只需要发送消息,暂时不需要分布式,可以直接连接 nsqd 的 tcp 地址
   // 实测使用 ConnectToNSQLookupd 的过程中,如果是新的 topic 和 channel,需要等待大约40s的时间才能收到第一次消息,后面立刻能收到消息
   // 不使用分布式,直接使用 ConnectToNSQD,基本立刻能收到消息
   err = consumer.ConnectToNSQLookupd("127.0.0.1:4154")
   if err != nil {
      log.Fatal(err)
   }

   sigChan := make(chan os.Signal, 1)
   signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
   <-sigChan

   // 程序退出,关闭所有的消费者
   consumer.Stop()
}

通过以上代码,消费者就可以收到生产者生产的 hello world 消息。

展开阅读全文
go
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部