文档章节

RabbitMQ监控(二):使用REST API来检测

大光头兰翔
 大光头兰翔
发布于 2017/05/03 10:47
字数 1217
阅读 221
收藏 2

#RabbitMQ 监控(二)

  通过测试RabbitMQ是否能够接收新的请求和构造AMQP信道,可以用来验证RabbitMQ服务器是否健康。接下来,我们将检测消息通信的整个过程,向RabbitMQ发布消息然后消费该消息,来验证消息被正确地路由了。
  虽然可以通过简单的扩展AMQP健康检测程序来对路由过程进行完整的测试,但是检测程序会因此增添额外的复杂性。因为它需要创建队列,并确保如果健康检测程序没有完成的话消息不会建立。这里有其他选择,随RabbitMQ Management插件一同发布的REST API的特性之一,就是一个可以内部检测RabbitMQ服务器健康状态的API。aliveness-test,使用三个步骤来验证RabbitMQ服务器是否健康:
  
  * 创建一个队列来接收测试消息

  * 用队列名称作为消息路由键,将消息发往默认交换器

  * 当消息到达队列的时候就消费该消息,否则就报错

  由于检测程序(aliveness-test)运行在Erlang虚拟机内部,因此它不会受到网络问题的影响。如果在虚拟机外部的话,网络问题可能会阻止你连接到RabbitMQ的端口(5672)。因此,最好是结合RabbitMQ 监控(一)构建的AMQP健康检测和基于API的健康检测两种方式,可以确保对RabbitMQ服务器的全方位监控。特别需要注意的是aliveness-test API检测的过程不会删除创建的队列,这意味着你的健康检测程序可以以非常短的周期重复运行。
  那么如何使用aliveness-test API来编写健康检测程序呢?RabbitMQ自带的Management Plugin提供了一些REST API,在RabbitMQ Management页面可以看到latest HTTP API documentation here的链接,点击可以查看这些API。目前最新的是RabbitMQ Management HTTP API
  
  在上面的API页面可以看到关于aliveness-testAPI的描述:

GET request
Path:/api/aliveness-test/vhost
Description:
Declares a test queue, then publishes and consumes a message. Intended for use by monitoring tools. If everything is working correctly, will return HTTP status 200 with body:
{"status":"ok"}
Note: the test queue will not be deleted (to to prevent queue churn 
if this is repeatedly pinged).

  使用curl测试一下该API,这里的/%2F代表默认的vhost(/)

curl -u guest:guest http://127.0.0.1:15672/api/aliveness-test/%2F
response:{"status":"ok"}

  经测试API可用,因此我们现在要做的就是封装RabbitMQ Management HTTP API的方法。在这个DEMO中,我使用的是Jersey Client。需要看完整代码的请点击我的github

###清单2.1 针对RabbitMQ的基于REST API的健康检测程序

1.定义需要调用的接口 RMQResource.java

@Path("api")
@Consumes({MediaType.APPLICATION_JSON})
@Produces({MediaType.APPLICATION_JSON})
public interface RMQResource {

    [@GET](https://my.oschina.net/get)
    @Path("aliveness-test/{vhost}")
    Response testAliveness(@PathParam("vhost") String vhost);
    
}

2.aliveness-test的response CheckResp.java

/**
 * aliveness-test接口的返回值
 */
public class CheckResp {

    private String status;

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    @Override
    public String toString() {
        return "CheckResp{" +
                "status='" + status + '\'' +
                '}';
    }
}

3.封装RabbitMQ的API RMQAPI.java

/**
 * RabbitMQ的REST API
 */
public class RMQApi {

    private final static Logger log = LoggerFactory.getLogger(RMQApi.class);

    private static Map<Class<?>, Object> restInstance = new HashMap<>();

    static {
        RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig();
        String rmqUrl = config.getRmqUrl();
        String username = config.getUsername();
        String password = config.getPassword();
		
		//调用RabbitMQ Management HTTP API需要带上验证信息
        String authorization = "Basic " + Base64Util.base64Encode((username + ":" + password).getBytes());

        log.info("RabbitMQ monitor REST url {}", rmqUrl);

        ClientConfig clientConfig = new ClientConfig();
        PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
        connectionManager.setMaxTotal(10);
        connectionManager.setDefaultMaxPerRoute(10);
        connectionManager.setValidateAfterInactivity(60000);

        clientConfig.property(ApacheClientProperties.CONNECTION_MANAGER, connectionManager);

        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
        objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);



        WebTarget target = ClientBuilder.newClient(clientConfig)
                .register(JacksonFeature.class)
                .register(new JacksonJsonProvider(objectMapper))
                .target(rmqUrl);

        restInstance.put(RMQResource.class, bindService(RMQResource.class, target, authorization));
    }

    private static <T> T bindService(Class<T> clazz, WebTarget target, String authorization) {
        MultivaluedMap<String, Object> headers = new MultivaluedHashMap<>();
        headers.put("Authorization", Arrays.asList((Object) authorization));
        return WebResourceFactory.newResource(clazz, target, false, headers, new ArrayList<Cookie>(), new Form());
    }

    @SuppressWarnings("unchecked")
    public static <T> T getService(Class<T> tClass) {
        return (T) restInstance.get(tClass);
    }
}

4.检测RabbitMQ状态 APIPingCheck.java

/**
 * 基于RabbitMQ的REST API的检测
 */
public class APIPingCheck {

    private final static RMQResource rmqResource = RMQApi.getService(RMQResource.class);

    private final static Logger log = LoggerFactory.getLogger(APIPingCheck.class);

    public static void checkAPIPing(String vhost) {
        RMQConfig config = RMQConfig.Singleton.INSTANCE.getRmqConfig();
        String host = config.getHost();
        Response response = null;
        try {
            response = rmqResource.testAliveness(vhost);
        } catch (Exception e) {
            log.error("CRITICAL: Could not connect to {}, cause {}", host, e.getMessage());
            ExitUtil.exit(ExitType.CRITICAL.getValue());
        }
        if (response == null || response.getStatus() > 299) {
            log.error("CRITICAL: Broker not alive : {}", response);
            ExitUtil.exit(ExitType.CRITICAL.getValue());
        } else {
            log.info("OK: Broker alive: {}", response.readEntity(CheckResp.class));
            ExitUtil.exit(ExitType.OK.getValue());
        }
    }
}

5.运行检测程序

@Test
public void alivenessTest() {
    String vhost = "/";
    System.out.println(rmqResource.testAliveness(vhost));
}

可以看到监控程序正常运行

10:43:54.516 [main] INFO com.lanxiang.rabbitmqmonitor.check.APIPingCheck - OK: Broker alive: CheckResp{status='ok'}
10:43:54.517 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is OK

现在尝试将RabbitMQ关掉,再运行监控程序

rabbitmqctl stop_app

可以看到程序报错,无法连接到RabbitMQ

14:48:06.105 [main] ERROR com.lanxiang.rabbitmqmonitor.check.APIPingCheck - CRITICAL: Could not connect to 127.0.0.1, cause java.net.ConnectException: Connection refused
14:48:06.106 [main] INFO com.lanxiang.rabbitmqmonitor.terminate.ExitUtil - Status is CRITICAL

现在你已经可以监控RabbitMQ是否能接收连接,同时也能检测它是否能成功路由消息。但是如果有人将队列的持久化属性修改为非持久化,导致消息更容易丢失的话,该如何保护RabbitMQ配置免遭危险的修改?

##下一章将编写一个监控队列(或者交换器)配置的健康检测程序。

© 著作权归作者所有

大光头兰翔
粉丝 13
博文 5
码字总数 5403
作品 0
私信 提问
加载中

评论(2)

大光头兰翔
大光头兰翔

引用来自“sunzhanpe”的评论

加油!!!!沙发!!
😘
sunzhanpe
sunzhanpe
加油!!!!沙发!!
zabbix自动发现rabbitmq

参考文档 http://blog.csdn.net/qq29778131/article/details/52537288?ticket=ST-77459-cUGNcZF1BJBtNuZoZe1i-passport.csdn.net #python脚本 一,实现功能 实现自动发现rabbitmq queue,并监......

typuc
2018/06/26
0
0
MQ-RabbitMq部署安装配置

环境准备 本次实验使用的是VMvare虚拟机。详情如下 hostname: node1.server ip地址:192.168.0.150 网卡:eth0,eth1 系统及硬件:CentOS 7.2 内存2G,硬盘50G 一、 什么是RabbitMq 消息队列又...

linuxzkq
2018/06/26
0
0
在CentOS上安装rabbitmq

转自:http://flyingdutchman.iteye.com/blog/1887283 这文章写得很好,除了安装软件编译时间比较长之外,安装这个几乎没出现什么错误。现在去配置下rabbitmq,马上就可以使用了。 在本节中我...

mac_zhao
2014/09/28
0
0
【原创】管理 RabbitMQ 服务器的几种方式

1.通过 rabbitmqctl 脚本 rabbitmqctl 是 shell 脚本,其通过 exec 调用了 erl 程序。会间接调用到如下两个 shell 脚本: rabbitmq-env rabbitmq-defaults在使用该脚本时允许用户定制的环境变...

摩云飞
2013/10/24
0
2
消息中间件—RabbitMQ(集群监控篇1)

摘要:任何没有监控的系统上线,一旦在生产环境发生故障,那么排查和修复问题的及时性将无法得到保证 一、为何要对消息中间件进行监控? 上线的业务系统需要监控,然而诸如消息队列、数据库、...

癫狂侠
2018/05/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

IT兄弟连 Java语法教程 Java语言的跨平台特性

什么是平台 Java是可以跨平台的编程语言,那么首先我们需要知道什么是平台,通常我们把CPU与操作系统的整体称为平台。 CPU大家都知道,是计算机的大脑,它既负责思维运算,又负责计算机中各种...

老码农的一亩三分地
7分钟前
0
0
http传值问题

这两天遇到一个问题 ,与一个渠道联调接口,http请求,展示ptf 的需求,服务方以一个二进制的方式返回。 当时我们在一开始开发的时候,我们按照读取文件的方式处理,本地存一个ptf 的方式 ,...

鬼才王
15分钟前
1
0
【面试】如果你这样回答“什么是线程安全”,面试官都会对你刮目相看

不是线程的安全 面试官问:“什么是线程安全”,如果你不能很好的回答,那就请往下看吧。 论语中有句话叫“学而优则仕”,相信很多人都觉得是“学习好了可以做官”。然而,这样理解却是错的。...

中关村的老男孩
16分钟前
4
0
5.01- Druid数据源配置

1、配置项 配置 缺省值 说明 name 无 配置这个属性的意义在于,如果存在多个数据源,监控的时候 可以通过名字来区分开来。如果没有配置,将会生成一个名字, 格式是:"DataSource-" + Syste...

静以修身2025
20分钟前
0
0
itop4412开发板-Linux内核的编译

本篇文章基于itop4412开发板 5.3.2.1源码目录 Linux 内核源码在光盘“06_源码_uboot 和 kernel”目录下,如下图所示。 5.3.2.2 编译器 内核的编译器和 uboot 的编译器一样,参考“5.3.1.2 编...

书白
24分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部