文档章节

nsq01介绍

AllenOR灵感
 AllenOR灵感
发布于 2017/09/10 01:18
字数 1184
阅读 0
收藏 0

初始

# 下载nsq
[root@localhost ~]# wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz

# 解压
[root@localhost ~]# tar zxvf nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz -C /usr/local/

# 重命名
[root@localhost ~]# mv /usr/local/nsq-1.0.0-compat.linux-amd64.go1.8 /usr/local/nsq-1.0.0

# 添加到环境变量
[root@localhost ~]# echo "export PATH=\$PATH:/usr/local/nsq-1.0.0/bin" >> /etc/profile

# 重新加载环境变量
[root@localhost ~]# source /etc/profile

# 启动nsq
# nsqlookupd 监听 4160、4161 端口
[root@localhost ~]# nsqlookupd 

# nsqd 监听 4150、4151 端口
[root@localhost ~]# nsqd --lookupd-tcp-address=127.0.0.1:4160

# nsqadmin 监听 4171 端口
[root@localhost ~]# nsqadmin --lookupd-http-address=127.0.0.1:4161

# 校验安装是否正确, 需要打开浏览器来访问
# nsqadmin管理窗口: http://127.0.0.1:4171

# 安装python版本的nsq组件(pynsq)
[root@localhost ~]# pip install pynsq

备注: 我的环境是CentOS 7, Python 2.7.13

术语

主题(topic)

在NSQ中,消息是需要被明确归类的,这有利于对程序和数据以及行为的管理,topic就是归类的管道。
NSQ不需要明确落实topic创建行为(只要提供正确的数据发布指令,它就会自动创建topic),同时它也提供手动创建topic的接口。但是当你的只有主题没有没有频道(channel)时,不论数据推送都少,都不会被NSQ记录(也就是说这些数据是无效的数据),所以在使用NSQ的时候,还是要先创建主题和频道(channel)。言归正传,下面提供了几个例子分别记录<仅提供数据发布指令, nsq就会自动创建topic>和<手动创建topic>。

  • 仅提供数据发布指令(nsq会自动创建topic)
    restful api via curl版本(二选一)

    [root@localhost ~]# curl -XPOST 'http://127.0.0.1:4151/pub?topic=learning_nsq&channel=curl' -d "hlo world\!"

    tcp protocol via python版本(二选一)
    在最后的附录中补充了两个代码简短,但稍微难理解的代码版本

    [root@localhost ~]# vim producer.py 
    # -.- coding:utf-8 -.-
    from __future__ import print_function
    import nsq
    import tornado
    from functools import partial
    
    def producer(topic, channel, callback=None):
        # 当writer连接nsqd成功后, 再推送数据
        if not writer.conns:
            return io_loop.add_callback(
                   partial(producer, *(topic, channel, callback))
            )
    
        # 推送数据
        writer.pub(topic, channel, callback)
    
    def on_finish(conn, status_code):
        # 打印操作返回的结果
        print(conn, status_code)
    
        # 退出程序
        io_loop.stop()
    
    if __name__ == '__main__':
        # 实例化Writer
        writer = nsq.Writer(['127.0.0.1:4150'])
        # 实例化IOLoop
        io_loop = tornado.ioloop.IOLoop.instance()
        # 将producer函数加入到io_loop的callbacks列表中
        io_loop.add_callback(
            partial(producer, *('learning_nsq', 'python', on_finish))
        )
        # 启动IOLoop, 它会在内部执行callbacks列表中的函数
        io_loop.start()
    
    # 运行结果
    [root@localhost ~]# python producer.py 
    OK
  • 手动创建一个topic
    curl版本(二选一)

    [root@localhost ~]# curl -XPOST 'http://127.0.0.1:4151/topic/create?topic=curl_topic'

    nsq的 tcp protocol 不支持创建topic
    所以pynsq组件代码层面就没有create_topic方法,如果硬要通过python程序来创建,那就用requests或AsyncHTTPClient来完成
    requests

    [root@localhost ~]# pip install requests
    [root@localhost ~]# vim create_topic_via_requests.py 
    import requests
    
    if __name__ == '__main__':
        resp = requests.post(
            'http://127.0.0.1:4151/topic/create?topic=python_requests_topic',
            data='using python to learn nsq',
        )
        print(resp, resp.content)
    
    # 运行程序
    [root@localhost ~]# python create_topic_via_requests.py 
    (<Response [200]>, '')

    AsyncHTTPClient

    from tornado.httpclient import AsyncHTTPClient
    import tornado.gen 
    import tornado.ioloop
    
    @tornado.gen.coroutine
    def create_topic():
        client = AsyncHTTPClient()
        resp = yield client.fetch(
            request=('http://127.0.0.1:4151'                   # host
                 '/topic/create'                               # uri
                 '?topic=python_AsyncHTTPClient_topic'         # parameters
             ),
            body='using python to learn nsq',                  # body
            method="POST"
        )
        print(resp)
        raise tornado.gen.Return(resp) 
    
    if __name__ == '__main__':
    
        io_loop = tornado.ioloop.IOLoop.instance()
        io_loop.run_sync(create_topic)

总结
完成主体的创建之后,可以通过访问nsqadmin页面(http://127.0.0.1:4171)来查看主题是否已经创建成功,如果创建没问题的话,在主页(Stream模块)就可以看到topic的名称列表。
通过点击列表中的某个topic,将会进入到这个topic的详情页面。

频道(channel)

正如上面所说,通过推送一条有效消息,NSQ就会自动创建topic,但是打开了nsqadmin界面后会发现两个问题,第一个问题是message为0;第二个问题是页面有警告提示(当前topic中还没有创建channel,只有当channel存在时推送过来的消息才会被加入到队列中等待消费)。

curl创建channel

curl -X POST 'http://127.0.0.1:4151/channel/create?topic=curl_topic&channel=curl_channel'

python创建channel
requests

import requests


if __name__ == '__main__':
    resp = requests.post(
        ('http://127.0.0.1:4151'
         '/channel/create'
         '?topic=python_topic&channel=python_channel'),
    )
    print(resp, resp.content)

AsyncHTTPClient

from tornado.httpclient import AsyncHTTPClient
import tornado.gen
import tornado.ioloop


@tornado.gen.coroutine
def create_topic():
    client = AsyncHTTPClient()
    resp = yield client.fetch(
        request=('http://127.0.0.1:4151'
                 '/channel/create'
                 '?topic=python_AsyncHTTPClient_topic'
                 '&channel=python_channel'
         ),
        body='',
        method="POST"
    )
    print(resp)
    raise tornado.gen.Return(resp)


if __name__ == '__main__':

    io_loop = tornado.ioloop.IOLoop.instance()
    io_loop.run_sync(create_topic)
单点故障(SPOF全称: Single Point Of Failure)

NSQ并没有采用复制的特性,而是利用nsqlookup来充当指令传输的枢纽,基于多个nsqd节点 + topic + channel + 持久化来解决单点故障问题。

附录

更简短的消息推送代码
  • 官方提供的代码

    import nsq
    import tornado.ioloop
    import time
    
    def pub_message():
        writer.pub('test', time.strftime('%H:%M:%S'), finish_pub)
    
    def finish_pub(conn, data):
        print(data)
    
    writer = nsq.Writer(['127.0.0.1:4150'])
    tornado.ioloop.PeriodicCallback(pub_message, 1000).start()
    nsq.run()
  • github上讨论问题过程中提供的代码

    import nsq
    import tornado
    
    writer = nsq.Writer(['127.0.0.1:4150'])
    
    @tornado.gen.coroutine
    def do_pub():
        yield tornado.gen.sleep(1)
        writer.pub("test_topic", "hello world")
        yield tornado.gen.sleep(1)
    
    tornado.ioloop.IOLoop.instance().run_sync(do_pub)
    tornado.ioloop.IOLoop.instance().run_sync(do_pub)

参考

本文转载自:http://www.jianshu.com/p/c8197304cd6b

AllenOR灵感
粉丝 11
博文 2635
码字总数 83001
作品 0
程序员
私信 提问

暂无文章

《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
今天
6
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
今天
7
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
今天
5
0
OSChina 周日乱弹 —— 我,小小编辑,食人族酋长

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @宇辰OSC :分享娃娃的单曲《飘洋过海来看你》: #今日歌曲推荐# 《飘洋过海来看你》- 娃娃 手机党少年们想听歌,请使劲儿戳(这里) @宇辰OSC...

小小编辑
今天
1K
11
MongoDB系列-- SpringBoot 中对 MongoDB 的 基本操作

SpringBoot 中对 MongoDB 的 基本操作 Database 库的创建 首先 在MongoDB 操作客户端 Robo 3T 中 创建数据库: 增加用户User: 创建 Collections 集合(类似mysql 中的 表): 后面我们大部分都...

TcWong
今天
40
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部