文档章节

RabbitMQ监控(三):监控队列状态

大光头兰翔
 大光头兰翔
发布于 2017/05/04 15:01
字数 1298
阅读 2582
收藏 22

#RabbitMQ 监控(三)

  验证RabbitMQ健康运行只是确保消息通信架构可靠性的一部分,同时,你也需要确保消息通信结构配置没有遭受意外修改,从而避免应用消息丢失。
  
  RabbitMQ Management HTTP API提供了一个方法允许你查看任何vhost上的任何队列:/api/queues/<vhost>/<queue>。你不仅可以查看配置详情,还可以查看队列的数据统计,例如队列消耗的内存,或者队列的平均消息吞吐量。使用curl测试一下该API,这里的/%2F还是代表默认的vhost(/)。

curl -u guest:guest http://127.0.0.1:15672/api/queues/%2F/springrabbitexercise

response
{
    "consumer_details": [
        {
            "channel_details": {
                "peer_host": "127.0.0.1",
                "peer_port": 62679,
                "connection_name": "127.0.0.1:62679 -> 127.0.0.1:5672",
                "user": "guest",
                "number": 2,
                "node": "rabbit@localhost",
                "name": "127.0.0.1:62679 -> 127.0.0.1:5672 (2)"
            },
            "arguments": [],
            "prefetch_count": 1,
            "ack_required": true,
            "exclusive": false,
            "consumer_tag": "amq.ctag-YImeU8Fm_VahDpxv8EAw2Q",
            "queue": {
                "vhost": "/",
                "name": "springrabbitexercise"
            }
        }
    ],
    "messages_details": {
        "rate": 7357
    },
    "messages": 232517,
    "messages_unacknowledged_details": {
        "rate": 0.2
    },
    "messages_unacknowledged": 5,
    "messages_ready_details": {
        "rate": 7356.8
    },
    "messages_ready": 232512,
    "reductions_details": {
        "rate": 1861021.8
    },
    "reductions": 58754154,
    ...
    "auto_delete": false,
    "durable": true,
    "vhost": "/",
    "name": "springrabbitexercise",
    "message_bytes_persistent": 2220250,
    "message_bytes_ram": 2220250,
    "message_bytes_unacknowledged": 40,
    "message_bytes_ready": 2220210,
    "message_bytes": 2220250,
    "messages_persistent": 232517,
    "messages_unacknowledged_ram": 5,
    "messages_ready_ram": 232512,
    "messages_ram": 232517,
    "garbage_collection": {
        "minor_gcs": 0,
        "fullsweep_after": 65535,
        "min_heap_size": 233,
        "min_bin_vheap_size": 46422,
        "max_heap_size": 0
    },
    "state": "running"
}

  为了方便阅读,去掉了部分返回值,但是还是可以看到队列的很多信息。例如可以看到一个consumer的信息、消息占用的内存、队列的durable、auto_delete属性等。利用这些配置信息,新的健康监控程序可以通过API方法的输出来轻松监控队列的属性,并在发生变更时通知你。
  就像之前编写健康检测程序那样,除了服务器、端口、vhost、用户名和密码之外,还需要知道:

  * 队列的名称,以便监控其配置

  * 该队列是否将durable和auto_delete选项打开

###清单3.1 检测队列配置

  完整代码在我的github,下面代码中的@Data和@Slf4j都是插件lombok中的注解,想要了解的可自行百度。

1.定义查看队列信息的接口 RMQResource.java

@Path("api")
@Consumes({MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON})
public interface RMQResource {
    
    /**
     * Return a queue`s info
     *
     * @param vhost
     * @param name
     * @return {@link QueueInfo}
     */
    @GET
    @Path("queues/{vhost}/{name}")
    Response getQueueInfo(@PathParam("vhost") String vhost, @PathParam("name") String name);
}

2.定义查看队列接口的返回值 QueueInfo.java

@Data
public class QueueInfo {

    private ConsumerDetails[] consumer_details;

    /**
     * unknown class
     */
    @JsonIgnore
    private Object[] incoming;

    /**
     * unknown class
     */
    @JsonIgnore
    private Object[] deliveries;

    /**
     * unknown class
     */
    @JsonIgnore
    private Object arguments;

    private Boolean exclusive;
    
    //...

    private Boolean auto_delete;

    private Boolean durable;

    private String vhost;

    private String name;

    /**
     * unknown class
     */
    @JsonIgnore
    private Object head_message_timestamp;

    /**
     * unknown class
     */
    @JsonIgnore
    private Object recoverable_slaves;

    private Long memory;

    private Double consumer_utilisation;

    private Integer consumers;

    /**
     * unknown class
     */
    @JsonIgnore
    private Object exclusive_consumer_tag;

    /**
     * unknown class
     */
    @JsonIgnore
    private Object policy;

    @JsonFormat(pattern = "yyyy-MM-dd hh:mm:ss")
    private Date idle_since;
}

3.检测队列配置 QueueConfigCheck.java

/**
 * 检测队列配置
 */

@Slf4j
public class QueueConfigCheck {

    private final static RMQResource rmqResource = RMQApi.getService(RMQResource.class);

    public static void checkQueueConfig(String vhost, CheckQueue queue) {
        RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig();
        String host = config.getHost();
        Response response = null;
        try {
            response = rmqResource.getQueueInfo(vhost, queue.getQueue_name());
        } catch (Exception e) {
            log.error("UNKNOWN: Could not connect to {}, cause {}", host, e.getMessage());
            ExitUtil.exit(ExitType.UNKNOWN.getValue());
        }
        if (response == null || response.getStatus() == 404) {
            log.error("CRITICAL: Queue {} does not exist.", queue.getQueue_name());
            ExitUtil.exit(ExitType.CRITICAL.getValue());
        } else if (response.getStatus() > 299) {
            log.error("UNKNOWN: Unexpected API error : {}", response);
            ExitUtil.exit(ExitType.UNKNOWN.getValue());
        } else {
            QueueInfo info = response.readEntity(QueueInfo.class);
            if (!info.getAuto_delete().equals(queue.getAuto_delete())) {
                log.warn("WARN: Queue {} - auto_delete flag is NOT {}", queue.getQueue_name(), info.getAuto_delete());
                ExitUtil.exit(ExitType.WARN.getValue());
            }
            if (!info.getDurable().equals(queue.getDurable())) {
                log.warn("WARN: Queue {} - durable flag is NOT {}", queue.getQueue_name(), info.getDurable());
                ExitUtil.exit(ExitType.WARN.getValue());
            }
        }
        log.info("OK: Queue {} configured correctly.", queue.getQueue_name());
        ExitUtil.exit(ExitType.OK.getValue());
    }
}

4.检测队列配置的方法参数 CheckQueue.java @Data public class CheckQueue {

    private final String queue_name;

    private final Boolean auto_delete;

    private final Boolean durable;

    public CheckQueue(String queue_name, Boolean auto_delete, Boolean durable) {
        this.queue_name = queue_name;
        this.auto_delete = auto_delete;
        this.durable = durable;
    }
}

5.运行检测程序

@Test
public void testQueueConfig() {
    String queue_name = "springrabbitexercise";
    Boolean auto_delete = false;
    Boolean durable = true;
    String vhost = "/";
    CheckQueue queue = new CheckQueue(queue_name, auto_delete, durable);
    QueueConfigCheck.checkQueueConfig(vhost, queue);
}

可以看到监控正常运行:

11:38:23.286 [main] INFO com.lanxiang.rabbitmqmonitor.check.QueueConfigCheck - OK: Queue springrabbitexercise configured correctly.
11:38:23.289 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is OK

  这段RabbitMQ队列检测的程序有一处修改,如果健康检测程序无法连接到API服务器的话,会返回EXIT_UNKNOWN。前一章的API ping健康检测要么成功要么失败,故障代码之间没有区别,但是队列检测API方法在失败时通过HTTP状态码提供了更多信息。如果HTTP状态码是404就代表尝试验证的队列不存在,检测失败并返回EXIT_CRITICAL。对于其他大于299的HTTP状态码,退出代码为EXIT_UNKNOWN。
  
  在获取到RabbitMQ API的response之后,使用JSON进行解码,并且把得到的durable和auto_delete参数与期望的参数进行比较,如果参数和预期不相符的话,返回EXIT_WARNING或者EXIT_CRITICAL状态码。如果队列所有的配置都正确的话,那么就正确退出。
  
  在了解我们对RabbitMQ做监控的原理之后,可以根据RabbitMQ Management HTTP API定制更多的监控,例如:   
  
  * /api/nodes,可以获取集群中每个节点的数据

  * /api/queues/<vhost>/<queue>,可以获取队列的详细情况,例如消息处理的速率、积压的消息数量等。

  除此之外还有许多其他API,我们要做的就是根据自身的业务逻辑和这些API来设计合理的监控脚本。RabbitMQ监控系列就到此结束啦,还是很可惜没有实战的机会吧,因为最近在工作变动期间,看了一下RabbitMQ实战这本书,兴起想写一下博客试试。
  
  毕业快一年了,想养成写博客的习惯。正好最近也在工作变动中,能有闲暇时间尝试一下,博客写的比较水,多多包涵。

© 著作权归作者所有

大光头兰翔
粉丝 13
博文 5
码字总数 5403
作品 0
私信 提问
加载中

评论(8)

sunzhanpe
sunzhanpe

引用来自“sunzhanpe”的评论

就三篇就不写了?差评

引用来自“大光头兰翔”的评论

在公司最后一周就看了这么点啊,而且你知道一句话叫‘举一反三’吗?😤
ok
大光头兰翔
大光头兰翔

引用来自“sunzhanpe”的评论

就三篇就不写了?差评
在公司最后一周就看了这么点啊,而且你知道一句话叫‘举一反三’吗?😤
sunzhanpe
sunzhanpe
就三篇就不写了?差评
守恒的猫
守恒的猫

引用来自“守恒的猫”的评论

哎,比起RabbitMQ,beanstalkd扯蛋多了......

引用来自“大光头兰翔”的评论

刚搜了一下beanstalkd,好像比RabbitMQ多了延时队列和轮询系统。

感觉用啥的话还是根据业务场景来吧,之前这边直接用的Guava的EventBus,就是一个事件总线,都不是分布式的,偶尔会丢消息。后来改用RabbitMQ就是因为有ACK和持久化的机制。

引用来自“守恒的猫”的评论

没容灾没分布式你怕不怕

引用来自“大光头兰翔”的评论

哈哈,这样就专心消息的收发上,肯定效率贼高😎
没用过RabbitMQ,木有对比过...
大光头兰翔
大光头兰翔

引用来自“守恒的猫”的评论

哎,比起RabbitMQ,beanstalkd扯蛋多了......

引用来自“大光头兰翔”的评论

刚搜了一下beanstalkd,好像比RabbitMQ多了延时队列和轮询系统。

感觉用啥的话还是根据业务场景来吧,之前这边直接用的Guava的EventBus,就是一个事件总线,都不是分布式的,偶尔会丢消息。后来改用RabbitMQ就是因为有ACK和持久化的机制。

引用来自“守恒的猫”的评论

没容灾没分布式你怕不怕
哈哈,这样就专心消息的收发上,肯定效率贼高😎
守恒的猫
守恒的猫

引用来自“守恒的猫”的评论

哎,比起RabbitMQ,beanstalkd扯蛋多了......

引用来自“大光头兰翔”的评论

刚搜了一下beanstalkd,好像比RabbitMQ多了延时队列和轮询系统。

感觉用啥的话还是根据业务场景来吧,之前这边直接用的Guava的EventBus,就是一个事件总线,都不是分布式的,偶尔会丢消息。后来改用RabbitMQ就是因为有ACK和持久化的机制。
没容灾没分布式你怕不怕
大光头兰翔
大光头兰翔

引用来自“守恒的猫”的评论

哎,比起RabbitMQ,beanstalkd扯蛋多了......
刚搜了一下beanstalkd,好像比RabbitMQ多了延时队列和轮询系统。

感觉用啥的话还是根据业务场景来吧,之前这边直接用的Guava的EventBus,就是一个事件总线,都不是分布式的,偶尔会丢消息。后来改用RabbitMQ就是因为有ACK和持久化的机制。
守恒的猫
守恒的猫
哎,比起RabbitMQ,beanstalkd扯蛋多了......
消息中间件—RabbitMQ(集群监控篇1)

摘要:任何没有监控的系统上线,一旦在生产环境发生故障,那么排查和修复问题的及时性将无法得到保证 一、为何要对消息中间件进行监控? 上线的业务系统需要监控,然而诸如消息队列、数据库、...

癫狂侠
2018/05/28
0
0
zabbix自动发现rabbitmq

参考文档 http://blog.csdn.net/qq29778131/article/details/52537288?ticket=ST-77459-cUGNcZF1BJBtNuZoZe1i-passport.csdn.net #python脚本 一,实现功能 实现自动发现rabbitmq queue,并监......

typuc
2018/06/26
0
0
消息中间件—RabbitMQ(集群原理与搭建篇)

摘要:实际生产应用中都会采用消息队列的集群方案,如果选择RabbitMQ那么有必要了解下它的集群方案原理 一般来说,如果只是为了学习RabbitMQ或者验证业务工程的正确性那么在本地环境或者测试...

癫狂侠
2018/05/25
0
0
openstack 最简单的 RabbitMQ 监控方法

先来看张图: 这是 Nova 的架构图,我们可以看到有两个组件处于架构的中心位置:数据库和Queue。数据库保存状态信息,而几乎所有的 nova-* 服务都直接依赖于 Queue 实现服务之间的通信和调用...

zhongbeida_xue
2018/05/09
0
0
配置RabbitMQ默认群集模式

RabbitMQ是什么? MQ(Msaaage Queue,消息队列)是一种应用程序对应用程序的通信方式。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无须专用链接来连接它们。消息传递指...

cchenyz
2018/07/31
0
0

没有更多内容

加载失败,请刷新页面

加载更多

对话亲历者|鲁肃:我在支付宝“拧螺丝“的日子

摘要: 他是支付宝技术平台的奠基人之一,但是他总说“这还不是我心中最完美的架构”;他行事低调但却有着“此时此地,非我莫属”的豪气;他曾无数次充当救火大队长,但自评只是“没有掉队的...

阿里云云栖社区
23分钟前
2
0
设置 npm yarn 淘宝源

设置npm config set chromedriver_cdnurl=http://cdn.npm.taobao.org/dist/chromedriver设置yarn config set "chromedriver_cdnurl" "https://npm.taobao.org/mirrors/chromedriver"......

internetafei
32分钟前
2
0
Docker搭建Mysql集群、主从同步复制

1、创建数据挂载点: mkdir /opt/mysql-master/mysql、/opt/mysql-master/conf.d、/opt/mysql-slave/mysql、/opt/mysql-slave/conf.d 2、分别在master、slave节点文件目录conf.d下创建touch......

WALK_MAN
55分钟前
10
0
手把手教你做中间件开发(分布式缓存篇)-借助redis已有的网络相关.c和.h文件,半小时快速实现一个epoll异步网络框架,程序demo

本文档配合主要对如下demo进行配合说明: 借助redis已有的网络相关.c和.h文件,半小时快速实现一个epoll异步网络框架,程序demo 0. 手把手教你做中间件、高性能服务器、分布式存储技术交流群 ...

y123456yz
56分钟前
3
0
Spring-boot单元测试(私有方法测试)

Spring-boot的单元测试网上有了很多,当项目是可以使用spring-boot正常运行时,只要在测试类上添加如下配置就使用@Autowired的方式进行单元测试 @RunWith(SpringJUnit4ClassRunner.class)@...

琴兽
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部