文档章节

nsq01介绍

AllenOR灵感
 AllenOR灵感
发布于 2017/09/10 01:18
字数 1184
阅读 0
收藏 0
点赞 0
评论 0

初始

# 下载nsq
[root@localhost ~]# wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz

# 解压
[root@localhost ~]# tar zxvf nsq-1.0.0-compat.linux-amd64.go1.8.tar.gz -C /usr/local/

# 重命名
[root@localhost ~]# mv /usr/local/nsq-1.0.0-compat.linux-amd64.go1.8 /usr/local/nsq-1.0.0

# 添加到环境变量
[root@localhost ~]# echo "export PATH=\$PATH:/usr/local/nsq-1.0.0/bin" >> /etc/profile

# 重新加载环境变量
[root@localhost ~]# source /etc/profile

# 启动nsq
# nsqlookupd 监听 4160、4161 端口
[root@localhost ~]# nsqlookupd 

# nsqd 监听 4150、4151 端口
[root@localhost ~]# nsqd --lookupd-tcp-address=127.0.0.1:4160

# nsqadmin 监听 4171 端口
[root@localhost ~]# nsqadmin --lookupd-http-address=127.0.0.1:4161

# 校验安装是否正确, 需要打开浏览器来访问
# nsqadmin管理窗口: http://127.0.0.1:4171

# 安装python版本的nsq组件(pynsq)
[root@localhost ~]# pip install pynsq

备注: 我的环境是CentOS 7, Python 2.7.13

术语

主题(topic)

在NSQ中,消息是需要被明确归类的,这有利于对程序和数据以及行为的管理,topic就是归类的管道。
NSQ不需要明确落实topic创建行为(只要提供正确的数据发布指令,它就会自动创建topic),同时它也提供手动创建topic的接口。但是当你的只有主题没有没有频道(channel)时,不论数据推送都少,都不会被NSQ记录(也就是说这些数据是无效的数据),所以在使用NSQ的时候,还是要先创建主题和频道(channel)。言归正传,下面提供了几个例子分别记录<仅提供数据发布指令, nsq就会自动创建topic>和<手动创建topic>。

  • 仅提供数据发布指令(nsq会自动创建topic)
    restful api via curl版本(二选一)

    [root@localhost ~]# curl -XPOST 'http://127.0.0.1:4151/pub?topic=learning_nsq&channel=curl' -d "hlo world\!"

    tcp protocol via python版本(二选一)
    在最后的附录中补充了两个代码简短,但稍微难理解的代码版本

    [root@localhost ~]# vim producer.py 
    # -.- coding:utf-8 -.-
    from __future__ import print_function
    import nsq
    import tornado
    from functools import partial
    
    def producer(topic, channel, callback=None):
        # 当writer连接nsqd成功后, 再推送数据
        if not writer.conns:
            return io_loop.add_callback(
                   partial(producer, *(topic, channel, callback))
            )
    
        # 推送数据
        writer.pub(topic, channel, callback)
    
    def on_finish(conn, status_code):
        # 打印操作返回的结果
        print(conn, status_code)
    
        # 退出程序
        io_loop.stop()
    
    if __name__ == '__main__':
        # 实例化Writer
        writer = nsq.Writer(['127.0.0.1:4150'])
        # 实例化IOLoop
        io_loop = tornado.ioloop.IOLoop.instance()
        # 将producer函数加入到io_loop的callbacks列表中
        io_loop.add_callback(
            partial(producer, *('learning_nsq', 'python', on_finish))
        )
        # 启动IOLoop, 它会在内部执行callbacks列表中的函数
        io_loop.start()
    
    # 运行结果
    [root@localhost ~]# python producer.py 
    OK
  • 手动创建一个topic
    curl版本(二选一)

    [root@localhost ~]# curl -XPOST 'http://127.0.0.1:4151/topic/create?topic=curl_topic'

    nsq的 tcp protocol 不支持创建topic
    所以pynsq组件代码层面就没有create_topic方法,如果硬要通过python程序来创建,那就用requests或AsyncHTTPClient来完成
    requests

    [root@localhost ~]# pip install requests
    [root@localhost ~]# vim create_topic_via_requests.py 
    import requests
    
    if __name__ == '__main__':
        resp = requests.post(
            'http://127.0.0.1:4151/topic/create?topic=python_requests_topic',
            data='using python to learn nsq',
        )
        print(resp, resp.content)
    
    # 运行程序
    [root@localhost ~]# python create_topic_via_requests.py 
    (<Response [200]>, '')

    AsyncHTTPClient

    from tornado.httpclient import AsyncHTTPClient
    import tornado.gen 
    import tornado.ioloop
    
    @tornado.gen.coroutine
    def create_topic():
        client = AsyncHTTPClient()
        resp = yield client.fetch(
            request=('http://127.0.0.1:4151'                   # host
                 '/topic/create'                               # uri
                 '?topic=python_AsyncHTTPClient_topic'         # parameters
             ),
            body='using python to learn nsq',                  # body
            method="POST"
        )
        print(resp)
        raise tornado.gen.Return(resp) 
    
    if __name__ == '__main__':
    
        io_loop = tornado.ioloop.IOLoop.instance()
        io_loop.run_sync(create_topic)

总结
完成主体的创建之后,可以通过访问nsqadmin页面(http://127.0.0.1:4171)来查看主题是否已经创建成功,如果创建没问题的话,在主页(Stream模块)就可以看到topic的名称列表。
通过点击列表中的某个topic,将会进入到这个topic的详情页面。

频道(channel)

正如上面所说,通过推送一条有效消息,NSQ就会自动创建topic,但是打开了nsqadmin界面后会发现两个问题,第一个问题是message为0;第二个问题是页面有警告提示(当前topic中还没有创建channel,只有当channel存在时推送过来的消息才会被加入到队列中等待消费)。

curl创建channel

curl -X POST 'http://127.0.0.1:4151/channel/create?topic=curl_topic&channel=curl_channel'

python创建channel
requests

import requests


if __name__ == '__main__':
    resp = requests.post(
        ('http://127.0.0.1:4151'
         '/channel/create'
         '?topic=python_topic&channel=python_channel'),
    )
    print(resp, resp.content)

AsyncHTTPClient

from tornado.httpclient import AsyncHTTPClient
import tornado.gen
import tornado.ioloop


@tornado.gen.coroutine
def create_topic():
    client = AsyncHTTPClient()
    resp = yield client.fetch(
        request=('http://127.0.0.1:4151'
                 '/channel/create'
                 '?topic=python_AsyncHTTPClient_topic'
                 '&channel=python_channel'
         ),
        body='',
        method="POST"
    )
    print(resp)
    raise tornado.gen.Return(resp)


if __name__ == '__main__':

    io_loop = tornado.ioloop.IOLoop.instance()
    io_loop.run_sync(create_topic)
单点故障(SPOF全称: Single Point Of Failure)

NSQ并没有采用复制的特性,而是利用nsqlookup来充当指令传输的枢纽,基于多个nsqd节点 + topic + channel + 持久化来解决单点故障问题。

附录

更简短的消息推送代码
  • 官方提供的代码

    import nsq
    import tornado.ioloop
    import time
    
    def pub_message():
        writer.pub('test', time.strftime('%H:%M:%S'), finish_pub)
    
    def finish_pub(conn, data):
        print(data)
    
    writer = nsq.Writer(['127.0.0.1:4150'])
    tornado.ioloop.PeriodicCallback(pub_message, 1000).start()
    nsq.run()
  • github上讨论问题过程中提供的代码

    import nsq
    import tornado
    
    writer = nsq.Writer(['127.0.0.1:4150'])
    
    @tornado.gen.coroutine
    def do_pub():
        yield tornado.gen.sleep(1)
        writer.pub("test_topic", "hello world")
        yield tornado.gen.sleep(1)
    
    tornado.ioloop.IOLoop.instance().run_sync(do_pub)
    tornado.ioloop.IOLoop.instance().run_sync(do_pub)

参考

本文转载自:http://www.jianshu.com/p/c8197304cd6b

共有 人打赏支持
AllenOR灵感
粉丝 10
博文 2634
码字总数 82983
作品 0
程序员

暂无文章

一款成功的全球服游戏该如何进行架构选型与设计?

全球服游戏如今正在成为出海游戏的主要考虑模式,跨国对战、全球通服打破国界的限制,将不同地区不同语言的玩家放在一起合作/竞技,成功吸引了大量玩家的关注,并逐渐成为主流的游戏玩法。 ...

UCloudTech
5分钟前
0
0
StringUtils类中isEmpty与isBlank的区别

org.apache.commons.lang.StringUtils类提供了String的常用操作,最为常用的判空有如下两种isEmpty(String str)和isBlank(String str)。 StringUtils.isEmpty(String str) 判断某字符串是否为...

说回答
18分钟前
0
0
react native使用redux快速上手

看例图 要求点击组件一中的按钮,改版组件二的背景色。 利用state和props和容易实现。 //app.jsimport React, {Component} from 'react';import {StyleSheet, Button, View, Text} from ...

燕归南
19分钟前
0
0
页面输出JSON格式数据

package com.sysware.utils;import java.io.IOException;import javax.servlet.ServletResponse;import org.apache.log4j.Logger;import com.sysware.SyswareConstant;pub......

AK灬
41分钟前
0
0
springCloud-2.搭建Eureka Client的使用

1.使用IDEA,Spring Initializr创建 2.填写项目资料 3.选择spring boot版本,插件选择Cloud Discovery→Eureka Discovery 4.选择保存地址 5.修改application.yml eureka: client: s...

贺小康
44分钟前
0
0
CenOS 6.5 RPM 安装 elasticsearch 6.3.1

下载 wget --no-check-certificate https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.rpm...

阿白
46分钟前
0
0
1.4 创建虚拟机&1.5 安装CentOS7&1.6 配置ip(上)&1.7 配置ip(下)

1.4 创建虚拟机 知识点 虚拟机网络链接模式 桥连 直接将虚拟网卡桥接到一个物理网卡上面。需要手工为虚拟系统配置IP地址、子网掩码,而且还要和宿主机器处于同一网段,这样虚拟系统才能和宿主...

小丑鱼00
53分钟前
0
0
TrustAsia(亚洲诚信)助力看雪2018安全开发者峰会

2018年7月21日,看雪2018安全开发者峰会在北京国家会议中心圆满落下帷幕。拥有18年悠久历史的老牌安全技术社区——看雪学院联手国内最大开发者社区CSDN,汇聚业内顶尖的安全开发者和技术专家...

亚洲诚信
54分钟前
0
0
Spring注解介绍

@Resource、@AutoWired、@Qualifier 都用来注入对象。其中@Resource可以以 name 或 type 方式注入,@AutoWired只能以 type 方式注入,@Qualifier 只能以 name 方式注入。 但它们有一些细微区...

lqlm
今天
0
0
32位汇编在64位Ubuntu上的汇编和连接

本教程使用的操作系统是Ubuntu Linux 18.04 LTS版本,汇编器是GNU AS(简称as),连接器是GNU LD(简称ld)。 以下是一段用于检测CPU品牌的汇编小程序(cpuid2.s): .section .dataoutput...

ryanliue
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部