nsq01介绍
nsq01介绍
AllenOR灵感 发表于5个月前
nsq01介绍
  • 发表于 5个月前
  • 阅读 0
  • 收藏 0
  • 点赞 0
  • 评论 0

新睿云服务器60天免费使用,快来体验!>>>   

初始

# 下载nsq
[root@localhost ~]# wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz

# 解压
[root@localhost ~]# tar zxvf nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz -C /usr/local/

# 重命名
[root@localhost ~]# mv /usr/local/nsq-1.0.0-compat.linux-amd64.go1.8 /usr/local/nsq-1.0.0

# 添加到环境变量
[root@localhost ~]# echo "export PATH=\$PATH:/usr/local/nsq-1.0.0/bin" >> /etc/profile

# 重新加载环境变量
[root@localhost ~]# source /etc/profile

# 启动nsq
# nsqlookupd 监听 4160、4161 端口
[root@localhost ~]# nsqlookupd 

# nsqd 监听 4150、4151 端口
[root@localhost ~]# nsqd --lookupd-tcp-address=127.0.0.1:4160

# nsqadmin 监听 4171 端口
[root@localhost ~]# nsqadmin --lookupd-http-address=127.0.0.1:4161

# 校验安装是否正确, 需要打开浏览器来访问
# nsqadmin管理窗口: http://127.0.0.1:4171

# 安装python版本的nsq组件(pynsq)
[root@localhost ~]# pip install pynsq

备注: 我的环境是CentOS 7, Python 2.7.13

术语

主题(topic)

在NSQ中,消息是需要被明确归类的,这有利于对程序和数据以及行为的管理,topic就是归类的管道。
NSQ不需要明确落实topic创建行为(只要提供正确的数据发布指令,它就会自动创建topic),同时它也提供手动创建topic的接口。但是当你的只有主题没有没有频道(channel)时,不论数据推送都少,都不会被NSQ记录(也就是说这些数据是无效的数据),所以在使用NSQ的时候,还是要先创建主题和频道(channel)。言归正传,下面提供了几个例子分别记录<仅提供数据发布指令, nsq就会自动创建topic>和<手动创建topic>。

  • 仅提供数据发布指令(nsq会自动创建topic)
    restful api via curl版本(二选一)

    [root@localhost ~]# curl -XPOST 'http://127.0.0.1:4151/pub?topic=learning_nsq&channel=curl' -d "hlo world\!"

    tcp protocol via python版本(二选一)
    在最后的附录中补充了两个代码简短,但稍微难理解的代码版本

    [root@localhost ~]# vim producer.py 
    # -.- coding:utf-8 -.-
    from __future__ import print_function
    import nsq
    import tornado
    from functools import partial
    
    def producer(topic, channel, callback=None):
        # 当writer连接nsqd成功后, 再推送数据
        if not writer.conns:
            return io_loop.add_callback(
                   partial(producer, *(topic, channel, callback))
            )
    
        # 推送数据
        writer.pub(topic, channel, callback)
    
    def on_finish(conn, status_code):
        # 打印操作返回的结果
        print(conn, status_code)
    
        # 退出程序
        io_loop.stop()
    
    if __name__ == '__main__':
        # 实例化Writer
        writer = nsq.Writer(['127.0.0.1:4150'])
        # 实例化IOLoop
        io_loop = tornado.ioloop.IOLoop.instance()
        # 将producer函数加入到io_loop的callbacks列表中
        io_loop.add_callback(
            partial(producer, *('learning_nsq', 'python', on_finish))
        )
        # 启动IOLoop, 它会在内部执行callbacks列表中的函数
        io_loop.start()
    
    # 运行结果
    [root@localhost ~]# python producer.py 
    OK
  • 手动创建一个topic
    curl版本(二选一)

    [root@localhost ~]# curl -XPOST 'http://127.0.0.1:4151/topic/create?topic=curl_topic'

    nsq的 tcp protocol 不支持创建topic
    所以pynsq组件代码层面就没有create_topic方法,如果硬要通过python程序来创建,那就用requests或AsyncHTTPClient来完成
    requests

    [root@localhost ~]# pip install requests
    [root@localhost ~]# vim create_topic_via_requests.py 
    import requests
    
    if __name__ == '__main__':
        resp = requests.post(
            'http://127.0.0.1:4151/topic/create?topic=python_requests_topic',
            data='using python to learn nsq',
        )
        print(resp, resp.content)
    
    # 运行程序
    [root@localhost ~]# python create_topic_via_requests.py 
    (<Response [200]>, '')

    AsyncHTTPClient

    from tornado.httpclient import AsyncHTTPClient
    import tornado.gen 
    import tornado.ioloop
    
    @tornado.gen.coroutine
    def create_topic():
        client = AsyncHTTPClient()
        resp = yield client.fetch(
            request=('http://127.0.0.1:4151'                   # host
                 '/topic/create'                               # uri
                 '?topic=python_AsyncHTTPClient_topic'         # parameters
             ),
            body='using python to learn nsq',                  # body
            method="POST"
        )
        print(resp)
        raise tornado.gen.Return(resp) 
    
    if __name__ == '__main__':
    
        io_loop = tornado.ioloop.IOLoop.instance()
        io_loop.run_sync(create_topic)

总结
完成主体的创建之后,可以通过访问nsqadmin页面(http://127.0.0.1:4171)来查看主题是否已经创建成功,如果创建没问题的话,在主页(Stream模块)就可以看到topic的名称列表。
通过点击列表中的某个topic,将会进入到这个topic的详情页面。

频道(channel)

正如上面所说,通过推送一条有效消息,NSQ就会自动创建topic,但是打开了nsqadmin界面后会发现两个问题,第一个问题是message为0;第二个问题是页面有警告提示(当前topic中还没有创建channel,只有当channel存在时推送过来的消息才会被加入到队列中等待消费)。

curl创建channel

curl -X POST 'http://127.0.0.1:4151/channel/create?topic=curl_topic&channel=curl_channel'

python创建channel
requests

import requests


if __name__ == '__main__':
    resp = requests.post(
        ('http://127.0.0.1:4151'
         '/channel/create'
         '?topic=python_topic&channel=python_channel'),
    )
    print(resp, resp.content)

AsyncHTTPClient

from tornado.httpclient import AsyncHTTPClient
import tornado.gen
import tornado.ioloop


@tornado.gen.coroutine
def create_topic():
    client = AsyncHTTPClient()
    resp = yield client.fetch(
        request=('http://127.0.0.1:4151'
                 '/channel/create'
                 '?topic=python_AsyncHTTPClient_topic'
                 '&channel=python_channel'
         ),
        body='',
        method="POST"
    )
    print(resp)
    raise tornado.gen.Return(resp)


if __name__ == '__main__':

    io_loop = tornado.ioloop.IOLoop.instance()
    io_loop.run_sync(create_topic)
单点故障(SPOF全称: Single Point Of Failure)

NSQ并没有采用复制的特性,而是利用nsqlookup来充当指令传输的枢纽,基于多个nsqd节点 + topic + channel + 持久化来解决单点故障问题。

附录

更简短的消息推送代码
  • 官方提供的代码

    import nsq
    import tornado.ioloop
    import time
    
    def pub_message():
        writer.pub('test', time.strftime('%H:%M:%S'), finish_pub)
    
    def finish_pub(conn, data):
        print(data)
    
    writer = nsq.Writer(['127.0.0.1:4150'])
    tornado.ioloop.PeriodicCallback(pub_message, 1000).start()
    nsq.run()
  • github上讨论问题过程中提供的代码

    import nsq
    import tornado
    
    writer = nsq.Writer(['127.0.0.1:4150'])
    
    @tornado.gen.coroutine
    def do_pub():
        yield tornado.gen.sleep(1)
        writer.pub("test_topic", "hello world")
        yield tornado.gen.sleep(1)
    
    tornado.ioloop.IOLoop.instance().run_sync(do_pub)
    tornado.ioloop.IOLoop.instance().run_sync(do_pub)

参考

  • 打赏
  • 点赞
  • 收藏
  • 分享
共有 人打赏支持
粉丝 6
博文 2139
码字总数 82983
×
AllenOR灵感
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: