轻量级消息队列

2013/06/27 11:26
阅读数 5.3K

1. UCMQ简介

UCMQ是一款支持简单HTTP协议的轻量级消息队列服务,基本特性如下:

l  支持HTTP协议的GET/POST方法,支持长连接(keep-alive);

l  请求响应非常快速,入队列、出队列速度超过10000/秒;

l  每个UCMQ实例支持多队列,队列通过操作接口自动创建;

l  单个队列支持的最大消息数量(未取出的)可以不限制,默认限制在1百万以内;

l  可以在不停止服务的情况下便捷地修改单个队列的属性;

l  可以实时查看队列属性(入队列数量、出队列数量、未读消息数量、消息积压数量)。

项目的最初原型来自于HTTPSQShttp://blog.s135.com/httpsqs/),UCMQ的主要修改如下:

l  数据存储弃用TC,改用日志型文件方式存储。积压数据量不受限于内存,性能更高,大量的数据积压也不会导致性能波动,队列数据可以独立搬迁。

l  增加了多个运维接口和功能接口。

l  完全支持标准HTTP协议,同时兼容原有HTTPSQS协议。

2. 安装部署

2.1. 系统需求

项目

需求

说明

编译器

g++

4.1.2上测试通过

libevent版本

2.0.10以上stable版本

 

2.2. 安装依赖包

2.2.1. Libevent安装

libevent的官方主页上下载相应的源码包进行安装,下面以2.0.21版本为例:

wget https://github.com/downloads/libevent/libevent/libevent-2.0.21-stable.tar.gz

$ ./configure --prefix=$HOME/local/libevent

$ make clean && make && make install

2.3. UCMQ安装

2.3.1. 获取安装包

github下载UCMQ官方最新版本,下载指令如下:

git clone git://github.com/ucopensource/ucmq.git

基本目录结构如下:

tree -d

|-- test

|-- conf

|-- client    #客户端的代码

`-- source    #服务端的代码

2.3.2. 编译和安装

配置脚本(configure)可以附加额外的参数,常用的参数有如下:

参数

说明

示例

--with-libevent

DIR目录下查找libevent

--with-libevent=$HOME/local/

--prefix

安装的根目录

--prefix=$HOME/local/ucmq

--with-arch=ARCH

编译的目标系统

--with-arch=i686

编译和安装:

$ ./configure --with-libevent=$HOME/local/libevent --prefix=$HOME/local/ucmq

$ make clean && make && make install

2.3.3. 配置说明

[server]

http_listen_addr=127.0.0.1       //监听地址

http_listen_port=1818            //监听端口

allow_exec_ip=0                  //允许执行运维指令的ip0为不限制

binlog_file_path=../binlog       //binlog目录(暂未实现主从)

output_log_path=../log           //日志目录

output_log_level=INFO            //日志记录级别

keep_alive=300                   //长连接有效时间(单位:秒)

conf_file=../conf/ucmq.ini       //配置文件路径

pid_file=/tmp/ucmq_eth0_1818.pid //进程保护文件

res_store_space=4                //预留磁盘空间(单位:GB

 

[rtag]

sync_interval=10                 //数据文件内存映射持久化操作间隔(写操作次数)

sync_time_interval=100           //间隔n秒后持久化rtagDB的信息,会调用fsync

 

[queue]

def_max_queue=1000000            //队列允许积压消息量,设置为0则不限制

#延时只能设置更大

def_delay=0                      //队列延时时间

 

#以下内容自从启动后不允许修改

[db]

data_file_path=../data           //data文件存放路径

#文件大小不允许超过64,必须是系统页大小的倍数

db_file_max_size=64              //允许的data文件的最大限制(单位:MB

2.3.4. 启动参数说明

$ ./ucmq -h

---------------------------------------------------------------------------

HTTP Simple Message Queue Service - ucmq v2.0.1 (May 21 2013 11:19:54)

 

    -c            config file path

    -d            run as a daemon

    -v, --version print version and exit

    -h, --help    print this help and exit

 

Note1: Use command "killall ucmq" and "kill `cat /tmp/ucmq.pid`" to stop ucmq.

Note2: Please don't use the command "pkill -9 ucmq" and "kill -9 PID of ucmq" !

----------------------------------------------------------------------------

2.4. 启动服务

$ ./ucmq –c ../conf/ucmq.ini –d

2.5. 关闭服务

要使ucmq正常退出,除了kill进程号之外,还可以在使用以下命令:

$ curl "http://<mq_ip>:<mq_port>/exec?cmd=kill"

3. 协议接口

3.1. 协议说明

协议中所有参数均大小写敏感。服务端返回的内容中以“UCMQ_HTTP”开头的返回内容被称为“返回标识”,该标识描述了HTTP请求的服务返回状态(返回标识尾部有隐藏的“\r\n)。如果队列还未创建,可通过执行某些操作(put/maxqueue/delay)创建新的队列。

PS:如果在linux终端(如bash)中使用curl操作下述命令,请务必注意在url前后加上括号,以避免“&”字符被转义。

3.2. 业务接口

3.2.1. 入队列(put

把消息写入队列有两种方式:HTTP GETHTTP POST,如果希望传递大于2k字节或者非文本数据,建议使用HTTP POST方式。

入队列HTTP GET方式,格式如下:

http://<mq_ip>:<mq_port>/?name=<queue_name>&opt=put&data=<data_string>&ver=2

使用curl方式执行POST方法,如:

curl -d "<data>" "http://<mq_ip>:<mq_port>/?name=<queue_name>&opt=put&ver=2"

UCMQ_HTTP_OK

参数说明:

参数名

描述

是否允许为空

约束

备注

queue_name

队列名称

不能为空

队列名由“字母”,“数字”及“下划线”构成;“下划线”不能出现在开头或结尾。队列名称不超过32字节

如果该队列不存在则自动创建

opt

操作标识

不能为空

需要绝对匹配,大小写敏感

 

data

被写入消息

不能为空

消息内容不能为空

如消息记录长度超过服务端数据文件大小,将返回失败。(见服务端参数:db_file_max_size

ver

接口版本标识

可为空

数字

本文档均以ver=2为样例,加入此标识则使用标准的http协议

HTTP返回值如下:

结果

HTTP Code

HTTP Reason

返回内容

描述

成功

200

OK

UCMQ_HTTP_OK

写队列成功

失败

400

Bad Request

UCMQ_HTTP_ERR_BAD_REQ

请求非法

200

OK

UCMQ_HTTP_ERR_INV_NAME

队列名非法

UCMQ_HTTP_ERR_INV_DATA

消息非法

(例如:消息长度为空或大于数据文件大小)

UCMQ_HTTP_ERR_WLOCK

队列写锁

UCMQ_HTTP_ERR_QUE_FULL

队列已满

(超过本队列消息数上线)

UCMQ_HTTP_ERR_NO_STORAGE

服务端存储空间不足

UCMQ_HTTP_ERR_PUT_ERR

写队列失败

UCMQ_HTTP_ERR_UNKNOWN_OPT

未知操作

UCMQ_HTTP_ERR_QUE_ADD_ERR

队列数量以超出服务端限制,无法创建。

3.2.2. 出队列(get

读取消息队列,格式如下:

http://<mq_ip>:<mq_port>/?name=<queue_name>&opt=get&ver=2

HTTP返回值如下:

结果

HTTP Code

HTTP Reason

返回内容

描述

成功

200

OK

UCMQ_HTTP_OK具体内容

读队列成功

失败

400

Bad Request

UCMQ_HTTP_ERR_BAD_REQ

请求非法

200

OK

UCMQ_HTTP_ERR_INV_NAME

队列名非法

UCMQ_HTTP_ERR_QUE_NO_EXIST

队列不存在

UCMQ_HTTP_ERR_QUE_EMPTY

消息已取空

UCMQ_HTTP_ERR_GET_ERR

读队列失败

UCMQ_HTTP_ERR_UNKNOWN_OPT

未知操作

3.2.3. 获取属性

查看某队列状态,格式如下:

http://<mq_ip>:<mq_port>/?name=<queue_name>&opt=status&ver=2

HTTP返回值如下:

结果

HTTP Code

HTTP Reason

返回内容

描述

成功

200

OK

具体内容

获取队列消息成功

失败

400

Bad Request

UCMQ_HTTP_ERR_BAD_REQ

请求非法

200

OK

UCMQ_HTTP_ERR_INV_NAME

队列名非法

UCMQ_HTTP_ERR_QUE_NO_EXIST

队列不存在

UCMQ_HTTP_ERR_UNKNOWN_OPT

未知操作

       输出结果如:

HTTP Simple Message Queue Service v2.0.1 (May 24 2013 11:15:50)

 

Queue Name: 001

Put the number of queue : 1

Get the number of queue : 0

Unread the number of queue : 1

Maximum number of queue: 1000000

Delay time of queue: 0

Queue write lock cut-OFF time: 0

其中的含义如下:

字段

含义

Queue Name

当前队列名

Put the number of queue

已写消息记录数

Get the number of queue

已读消息记录数

Unread the number of queue

未读消息数

Maximum items of queue

存储消息数限制

Delay times of queue

该队列消息延时时间

Queue write lock cut-OFF time

队列写入锁截止时间,0表示无写锁

为了便于程序对该数据的处理,因此提供了json方式的返回:

http://<mq_ip>:<mq_port>/?name=<queue_name>&opt=status_json&ver=2

       输出结果如:

{“name”:”<queue_name>”,”put_num”:2,”get_num”:1,”unread”:1,”maxqueue”:1000000,”delay”:0,”wlock”:0}

另外,如果对一个完全没有使用过的队列名获取状态,Put the number of queueGet the number of queueUnread the number of queueDelay times of queueQueue write lock cut-OFF time0

3.2.4. 设置属性

3.2.4.1. 记录数限制

设置某个队列存储消息记录数最大限制,可以使用以下命令:

http://<mq_ip>:<mq_port>/?name=<queue_name>&opt=maxqueue&num=<size>&ver=2

参数说明:

参数名

描述

是否为空

约束

备注

num

消息记录数

不能为空

整数

如果设置为“0”(不建议),则不限制队列大小。有可能导致磁盘写满。

HTTP返回值如下:

结果

HTTP Code

HTTP Reason

返回内容

描述

成功

200

OK

UCMQ_HTTP_OK

设置队列最大消息上限成功

失败

400

Bad Request

UCMQ_HTTP_ERR_BAD_REQ

请求非法

200

OK

UCMQ_HTTP_ERR_INV_NAME

队列名非法

UCMQ_HTTP_ERR_QUE_ADD_ERR

队列数量以超出服务端限制,无法创建。

UCMQ_HTTP_ERR_INV_NUM

错误的取值

(超出合法取值范围,合法取值不大于:1000000000条消息)

UCMQ_HTTP_ERR_MAXQUE_ERR

设置队列最大消息数失败

UCMQ_HTTP_ERR_UNKNOWN_OPT

未知操作

3.2.4.2. 延时队列

队列被设置延时队列后,该消息需在写入后等候“time”秒钟后才能被读取。设置队列延时可以使用以下命令:

http://<mq_ip>:<mq_port>/?name=<queue_name>&opt=delay&num=<time>&ver=2

参数说明:

参数名

描述

是否为空

约束

备注

num

队列延时时间

不能为空

整数

队列延时由当前设置决定,设置为“0”时即时解除所有消息的延时

HTTP返回值如下:

结果

HTTP Code

HTTP Reason

返回内容

描述

成功

200

OK

UCMQ_HTTP_OK

设置延时队列成功

失败

400

Bad Request

UCMQ_HTTP_ERR_BAD_REQ

请求非法

200

OK

UCMQ_HTTP_ERR_INV_NAME

队列名非法

UCMQ_HTTP_ERR_QUE_ADD_ERR

队列数量以超出服务端限制,无法创建。

UCMQ_HTTP_ERR_INV_NUM

错误的取值

(超出合法取值范围)

UCMQ_HTTP_ERR_DELAY_ERR

设置延时队列失败

UCMQ_HTTP_ERR_UNKNOWN_OPT

未知操作

3.2.4.3. 队列写锁

队列写锁(又称:设置队列只读)此接口常被用于运维,目的是让队列锁定写入,只提供对外的消息读取。设置某个队列写锁可以使用以下命令:

http://<mq_ip>:<mq_port>/?name=<queue_name>&opt=wlock&num=<time>&ver=2

参数说明:

参数名

描述

是否为空

约束

备注

num

队列写入锁时间

可为空

整数

为空:永久写锁(只读)

0:释放写锁

n:写锁时间(单位:秒)

HTTP返回值如下:

结果

HTTP Code

HTTP Reason

返回内容

描述

成功

200

OK

UCMQ_HTTP_OK

设置队列写锁成功

失败

400

Bad Request

UCMQ_HTTP_ERR_BAD_REQ

请求非法

200

OK

UCMQ_HTTP_ERR_INV_NAME

队列名非法

UCMQ_HTTP_ERR_QUE_NO_EXIST

队列不存在

UCMQ_HTTP_ERR_INV_NUM

错误的取值

(超出合法取值范围)

UCMQ_HTTP_ERR_WLOCK_ERR

设置队列写锁失败

UCMQ_HTTP_ERR_UNKNOWN_OPT

未知操作

3.2.4.4. 持久化间隔

本接口将为所有队列设置持久化间隔时间。此接口是为了保证数据持久化,设置后数据将会定时fsync到磁盘中。过于频繁的此类操作将较的影响服务端性能。同步间隔依数据安全性需求而定。通过以下命令可以修改同步时间间隔:

http://<mq_ip>:<mq_port>/?opt=synctime&num=<value>&ver=2

参数说明:

参数名

描述

是否为空

约束

备注

num

间隔时间

不能为空

整数

num为时间间隔的值,当设置目标值为“0”时不同步。默认值在配置文件中设置。单位为:“秒”。

HTTP返回值如下:

结果

HTTP Code

HTTP Reason

返回内容

描述

成功

200

OK

UCMQ_HTTP_OK

设置同步间隔时间成功

失败

400

Bad Request

UCMQ_HTTP_ERR_BAD_REQ

请求非法

200

OK

UCMQ_HTTP_ERR_INV_NUM

错误的取值

(超出合法取值范围)

UCMQ_HTTP_ERR_SYNC_TIEM_ERR

设置同步间隔时间失败

UCMQ_HTTP_ERR_UNKNOWN_OPT

未知操作

3.2.5. 重置队列

重置队列指的是把一条队列全部恢复到初始状态:

http://<mq_ip>:<mq_port>/?name=<queue_name>&opt=reset&ver=2

HTTP返回值如下:

结果

HTTP Code

HTTP Reason

返回内容

描述

成功

200

OK

UCMQ_HTTP_OK

重置队列成功

失败

400

Bad Request

UCMQ_HTTP_ERR_BAD_REQ

请求非法

200

OK

UCMQ_HTTP_ERR_INV_NAME

队列名非法

UCMQ_HTTP_ERR_QUE_NO_EXIST

队列不存在

UCMQ_HTTP_ERR_RESET_ERR

重置队列失败

UCMQ_HTTP_ERR_UNKNOWN_OPT

未知操作

3.2.6. 删除队列

删除队列把一条队列的数据全部清除,包括其历史纪录和磁盘上的数据。命令如下:

http://<mq_ip>:<mq_port>/?name=<queue_name>&opt=remove&ver=2

HTTP返回值如下:

结果

HTTP Code

HTTP Reason

返回内容

描述

成功

200

OK

UCMQ_HTTP_OK

删除队列成功

失败

400

Bad Request

UCMQ_HTTP_ERR_BAD_REQ

请求非法

200

OK

UCMQ_HTTP_ERR_INV_NAME

队列名非法

UCMQ_HTTP_ERR_QUE_NO_EXIST

队列不存在

UCMQ_HTTP_ERR_REMOVE_ERR

删除队列失败

UCMQ_HTTP_ERR_UNKNOWN_OPT

未知操作

3.3. 运维接口

运维接口可以通过配置文件的allow_exec_ip(如果配置文件设置为“0”则所有ip可执行,如果设置指导ip则只有此ip可操作)允许的ipUCMQ做操作。基本功能如下表:

操作

命令

备注

关闭服务

http:// <mq_ip>:<mq_port>/exec?cmd=kill

关闭实例

重载配置

http:// <mq_ip>:<mq_port>/exec?cmd=reload

配置重载

获取proc/pid/file

http:// <mq_ip>:<mq_port>/exec?cmd=get&file=stat

获取进程信息

获取所有队列信息

http:// <mq_ip>:<mq_port>/stat?type

所有队列监控

内存使用情况

http:// <mq_ip>:<mq_port>/stat?type=mem

内存使用情况

队列最后一分钟运行情况

http:// <mq_ip>:<mq_port>/stat?type=info

一分钟内实例情况

cpu资源消耗情况

http:// <mq_ip>:<mq_port>/stat?type=cpu

cpu使用情况

3.4. 协议错误返回

如果以上协议都不能匹配,服务端会根据不同情况给出相应的错误返回,如:

结果

HTTP Code

HTTP Reason

返回内容

描述

语法解析失败

400

Bad Request

UCMQ_HTTP_ERR_BAD_REQ

请求非法,使用HTTP GET方法时,需将上传的消息做uri encode

未知操作

200

OK

UCMQ_HTTP_ERR_UNKNOWN_OPT

队列操作不可知

非法队列名

200

OK

UCMQ_HTTP_ERR_INV_NAME

非法队列名,队列名为空或超长

队列不存在

200

OK

UCMQ_HTTP_ERR_QUE_NO_EXIST

队列尚未创建

无法创建队列

200

OK

UCMQ_HTTP_ERR_QUE_ADD_ERR

队列数量以超出服务端限制,无法创建。

 

展开阅读全文
打赏
0
17 收藏
分享
加载中
支持,只是,如何主动通知? 我的数据进了队列后,还要我主动拉取吗?
2015/07/03 10:48
回复
举报
更多评论
打赏
1 评论
17 收藏
0
分享
返回顶部
顶部