文档章节

NSQLookupd Main方法分析

智深
 智深
发布于 2014/08/01 11:23
字数 926
阅读 386
收藏 6
上一篇  http://my.oschina.net/astute/blog/296955  已经分析了 nsqlookupd 启动时的命令行解析,最终构造了 NSQLookupd 结构体,这是一个很重要的结构体。

type NSQLookupd struct {
     options      *nsqlookupdOptions     // 命令行参数
     tcpAddr      *net.TCPAddr     // tcp 端口
     httpAddr     *net.TCPAddr     // http 端口
     tcpListener  net.Listener
     httpListener net.Listener
     waitGroup    util.WaitGroupWrapper
     DB           *RegistrationDB
}


这篇文章分析下 daemon.Main() 执行过程


1、创建 context,内部只有一个指向NSQLookupd的指针

context := &Context{l}


2、创建 listener;Listen 方法支持面向流的监听,包括 unix domain stream socket。

tcpListener, err := net.Listen("tcp", l.tcpAddr.String())


3、创建 tcpServer struct,内部只含有一个 context 指针,为什么需要对 NSQLookupd

tcpServer 实现了 Handle(net.Conn) 方法,表明实现了 TCPHandler 接口

tcpServer := &tcpServer{context: context}


4、分析 l.waitGroup.Wrap(func() { util.TCPServer(tcpListener, tcpServer) })

waitGroup 是结构体 util.WaitGroupWrapper 实例,此结构体继承自 sync.WaitGroup;WaitGroup 等待一组协程结束。主协程调用 Add 方法来设置等待协程的数目;func() { util.TCPServer(tcpListener, tcpServer) } 这是一个匿名函数。

总结:主协程调用 Wrap 方法,调用 Add,设置等待的协程数目为1,启动新的协程去调用 匿名函数,主协程最后会在 exitChan 通道上等待消息,如果收到中断信号,关闭 tcpListener, httpListner,最后在 waitGroup 上等待协程结束。需要 waitGroup 的目的,就是为了 主协程 和 监听协程之间的状态同步。


5、监听协程执行的逻辑如下,简化版本,忽略日志和错误处理。监听协程循环 Accept,然后分配新的协程调用 handler 处理接收的 clientConn。处理客户端连接的协程,我把它叫做 工作协程。

func TCPServer(listener net.Listener, handler TCPHandler) {

     for {
          clientConn, err := listener.Accept()
          go handler.Handle(clientConn)
     }

}


6、工作协程的处理逻辑,最终调用的是 tcpServer 的 Handle 方法,这是最重要的一个方法,重点分析

func (p *tcpServer) Handle(clientConn net.Conn)

buf := make([]byte, 4)
_, err := io.ReadFull(clientConn, buf)

读取客户端发送过来的前4个字节,里面含有协议的版本号。看到了没有,完全阻塞式编程,没有屎一样的事件驱动!

判断版本号后,最终调用 LookupProtocolV1 的 IOLoop 方法


7、方法分析 func (p *LookupProtocolV1) IOLoop(conn net.Conn) error

client := NewClientV1(conn)     // ClientV1 struct 继承 net.Conn,封装一个 PeerInfo

reader := bufio.NewReader(client) // 创建一个带缓冲的 reader

for {     // 此部分代码有省略
    line, err = reader.ReadString('\n’)     // 按行来处理

    line = strings.TrimSpace(line)
    params := strings.Split(line, " ")

    response, err := p.Exec(client, reader, params)  // 执行命令

    if response != nil {
        _, err = util.SendResponse(client, response)     // 发送 response
    }
}

IO 出错,或者执行命令出错,会退出循环。


8、命令分派

func (p *LookupProtocolV1) Exec(client *ClientV1, reader *bufio.Reader, params []string) ([]byte, error)

switch params[0] {
case "PING":
    return p.PING(client, params)
// .........
}

根据第一个参数做命令区分,调用响应的指令


9、命令执行,以 ping 命令为例,看看命令如何执行

func (p *LookupProtocolV1) PING(client *ClientV1, params []string) ([]byte, error)

atomic.StoreInt64(&client.peerInfo.lastUpdate, now.UnixNano()) // 更新客户端的状态

return []byte("OK"), nil      // 返回 “OK”,经由第 7 步的 util.SendResponse 发送给 client

至此 tcpServer 的处理流程就介绍完了。


10、和 2,3 步对应的,还会创建 httpListener, httpServer, 主要负责 topic,channel 相关的操作

httpListener, err := net.Listen("tcp", l.httpAddr.String())
httpServer := &httpServer{context: context}

处理 httpServer 的协程, 会调用 func() { util.HTTPServer(httpListener, httpServer, "HTTP") }


11、分析 util.HTTPServer 函数

关键是如下两句
server := &http.Server{
    Handler: handler,
}
err := server.Serve(listener)

默认的 http handler 就是 httpServer 本身


12、分析 httpServer 的 ServeHTTP 方法

err := s.v1Router(w, req)     // 路由请求


13、分析 v1Router

switch req.URL.Path {
case "/ping”:
//......
}

按照请求路径分派


14、以 "/topics” 为例,看如何处理请求

执行函数 util.NegotiateAPIResponseWrapper(w, req,
               func() (interface{}, error) { return s.doTopics(req) })


内部调用传入的匿名函数,也就是执行 s.doTopics(req),会把所有的 topic 对应的 producer 信息组成 map[string]interface{} 返回给客户端 

© 著作权归作者所有

智深
粉丝 91
博文 65
码字总数 42744
作品 0
朝阳
程序员
私信 提问
NSQ系列之nsqlookupd代码分析一(初探nsqlookup)

NSQ系列之nsqlookupd代码分析一(初探nsqlookup) 是守护进程负责管理拓扑信息。客户端通过查询 来发现指定话题()的生产者,并且提供 节点广播话题()和通道()信息。 有两个接口: 接口...

大蓝妹
2015/08/27
1.8K
0
NSQ系列之nsqlookupd代码分析二(初识nsqlookupd tcpServer)

NSQ系列之nsqlookupd代码分析二(初识nsqlookupd tcpServer) 在上一章初探中了解到,中开启了一个 和一个 ,那么今天我们来初步了解下。 废话不多说,直接上代码吧,简单粗暴点比较好。 通过...

大蓝妹
2015/08/29
530
0
nsqlookupd 入口文件分析

nsq 中 nsqlookupd 角色相对简单,适合作为分析 nsq 的入口 apps/nsqlookupd/nsqlookupd.go nsqlookupd 是一个独立的程序,所以放到 apps 目录下,依赖内部包 nsqlookupd, util,外部包: gi...

智深
2014/07/31
458
0
NSQ系列之nsqlookupd代码分析四(详解nsqlookupd中的RegitrationDB)

NSQ系列之nsqlookupd代码分析四(详解nsqlookupd中的RegitrationDB操作方法) 上一章我们大致了解了的中的协议的处理逻辑,里面有提到一个存储的以及 数据信息的的一些操作方法。今天我们就来...

大蓝妹
2015/09/02
306
2
NSQ系列之nsqlookupd代码分析三(详解tcpServer 中的IOLoop方法)

NSQ系列之nsqlookupd代码分析三(详解nsqlookupd tcpServer 中的IOLoop) 上一章我们大致了解了中的的大致的代码,与client也就之间协议处理在这个方法中,今天我们就分析一下这个方法 废话不...

大蓝妹
2015/09/01
692
0

没有更多内容

加载失败,请刷新页面

加载更多

toast组件单元测试

先看是否存在 describe('Toast', () => { it('存在.', () => { expect(Toast).to.be.exist }) }); 看属性,我们要测 ToastVue 和 plugin.js describe('Toast', () =>......

ories
20分钟前
57
0
如何将整个MySQL数据库字符集和排序规则转换为UTF-8?

如何将整个MySQL数据库字符集转换为UTF-8并将排序规则转换为UTF-8? #1楼 在命令行外壳上 如果您是命令行外壳程序之一,则可以非常快速地执行此操作。 只需填写“ dbname”:D DB="dbname"(...

javail
今天
80
0
开源矿工系统内部的层

开源矿工系统内部的层 所谓“层”、“界”、“域”、“集合”,这些词其实是在试图表达物质系统的组成结构和运动景象中的规矩,这些不同人发明的词都是来源于对同一个规律的观察、发现、表达...

NTMiner
今天
88
0
如何将文件从一个git repo移到另一个(不是克隆),保留历史记录

我们的Git储存库是作为单个Monster SVN储存库的一部分开始的,其中每个项目都有自己的树,如下所示: project1/branches /tags /trunkproject2/branches /tags ...

技术盛宴
今天
65
0
数据结构之数组-c代码实现

在上一篇文章里讲了数组的具体内容,然后自己使用c语言对数组进行了实现。 其中定义了一个结构体,定义了长度、已使用长度和地址指针。 定义alloc函数来分配内存空间 之后便是插入元素的ins...

无心的梦呓
今天
65
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部