文档章节

RabbitMQ(8)-集群架构知识的补充以及java实现

你我他有个梦
 你我他有个梦
发布于 2015/12/22 10:30
字数 1383
阅读 368
收藏 0

由于RabbitMQ集群对延迟非常敏感,所以只适合在本地局域网内使用

一.知识补充

1.设计目标:

允许生产者和消费者在RabbitMQ节点崩溃的情况下继续运行;

通过添加更多的节点来线性扩展消息通信吞吐量。

2.RabbitMQ会始终记录四种类型的内部元数据:

队列元数据-----队列名称和属性(持久化、自动删除);

交换器元数据------交换器名称、类型和属性;

绑定元数据-----一张简单的表格展示了如何将消息路由到队列

vhost元数据-----vhost内的队列、交换器和绑定提供命名空间和安全属性

引入集群时,rabbitmq需要追踪新的元数据类型

3.RabbitMQ默认不会将队列和状态复制到所有节点的原因:

存储空间----添加新的节点不会带来更多的存储空间;

性能----消息的发布需要将消息复制到集群每个节点,对于持久化消息,每一条消息都会引发磁盘活动,每次增加节点,网络和磁盘负载都会增加

4.分布交换器:

交换器只是一个名称,一个队列的绑定列表(一张查询队列的表),消息的路由是信道将消息上的路由键同交换器的绑定列表进行比较然后 路由消息。

5.消息丢失的解决办法

场景:AMQPbasic.publish命令不会返回消息状态,当信道崩溃时有可能生产者还在不断地创建消息

a.AMQP事务:消息路由到队列之前一直阻塞

b.发送方确认模式

6.内存节点与磁盘节点

内存节点会将所有的队列、交换器、绑定、用户、权限和vhost的元数据定义都存储于内存中,磁盘节点会将其保存在磁盘上,内存节点就可以提供出色的性能,磁盘节点能保障集群配置信息丢失于重启的操作上,所以磁盘节点至少为一个以上,碰巧磁盘节点宕机,还可以路由消息,只是不能创建队列、交换器、绑定、用户、权限和vhost

7.内存节点在重启之后如何获得元数据和状态信息

新节点加入集群时,你必须列出集群的所有磁盘节点并作为运行集群的命令参数

8.镜像队列以及工作原理

镜像队列的主拷贝仅存在于一个节点上(主节点master),在集群中的其他节点拥有从队列的拷贝,主节点不可用时最老的从节点会被选举为新的主队列

工作原理:

信道负责将消息路由到合适的队列上,并将消息队列投递到镜像队列的从队列上

RabbitMQ不能区分故障转移中丢失的确认消息和那些尚未得到确认的消息,已经消费但尚未被确认的消息会重新入队到原来的位置,如果说客户端不支持消息取消通知,应该避免使用镜像队列,那么队列会源源不断的接收投递过来的消息,当然在AMQP2.4开始已经添加这个取消通知的扩展

9.升级集群节点:

9.1通过RabbitMQ Management插件来备份当前配置

9.2关闭所有生产者等待消费者消费完队列中所有消息

9.3关闭节点,并解压新版到现在的安装目录

9.4选择其中一个磁盘节点作为升级节点,启动时会将持久化的集群数据升级到新版本,然后启动其他磁盘节点,它们会获取升级后的集群数据

9.5启动集群内存节点,所有的元数据和配置信息都会被保留



2.代码实现

Consumer:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ClusterTestConsumer {

    private static final String EXCHANGE_NAME="cluster_test";
    private static final String QUEUE_NAME="cluster_test";
    private static final String ROUTING_KEY="cluster_test";
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = null;
        //haproxy监听其他三台rabbitmq服务,连接haproxy即可进行负载均衡和故障转移等操作
        factory.setPort(5670);
        factory.setHost("192.168.111.131");
        factory.setUsername("admin");
        factory.setPassword("admin");
        while(true) {
            try {
                connection = factory.newConnection();
                final Channel channel = connection.createChannel();
                channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,false,null);
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
                System.out.println("Ready for testing!");
                Consumer cluster_test = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String msg = new String(body,"UTF-8");
                        System.out.println("Receive message:........"+msg);
                        //通过投递消息的标记DeliveryTag向服务器确认该消息被成功消费
                        channel.basicAck(envelope.getDeliveryTag(),false);
                    }
                };
                //消息消费
                channel.basicConsume(QUEUE_NAME,false,"cluster_test",cluster_test);
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
    }
}

Producer:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.TimeoutException;

public class ClusterTestProducer {
    private static final String EXCHANGE_NAME = "cluster_test";
    private static final String ROUTING_KEY = "cluster_test";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        Connection connection = null;
        //haproxy监听其他三台rabbitmq服务,连接haproxy即可进行负载均衡和故障转移等操作
        factory.setPort(5670);
        factory.setHost("192.168.111.131");
        factory.setUsername("admin");
        factory.setPassword("admin");
        try {
            connection = factory.newConnection();
            final Channel channel = connection.createChannel();
            String msg = "{\"context\":\"cluster_test\",\"time\":" + new Date().getTime() + "}";
            channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8"));
            System.out.println("Sent Cluster test message!!!");
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
    }
}

测试时可断开其中一台rabbitmq服务,之后消费者便会重新连接到集群中正常工作

© 著作权归作者所有

你我他有个梦

你我他有个梦

粉丝 96
博文 130
码字总数 109764
作品 0
通州
程序员
私信 提问
Spring Boot 配置多源的 RabbitMQ

简介 是开发中很平常的中间件,本文讲述的是怎么在一个项目中配置多源的,这里不过多的讲解的相关知识点。如果你也有遇到需要往多个中发送消息的需求,希望本文可以帮助到你。 环境 rabbitmq...

innerpeacez
07/19
41
0
RabbitMQ 3.6.5 Milestone 2 发布

RabbitMQ 3.6.5 Milestone 2 发布了,RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继...

凝小紫
2016/08/30
916
5
java B2B2C Springboot电子商城系统-消息队列之 RabbitMQ

常见的消息队列 需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码请加企鹅求求: :二一四七七七五六三三 目前业界有四款常用的消息队列,它们分别是RabbitMQ、Roc...

it菲菲
2018/12/14
0
0
RabbitMQ 3.6.6 Milestone 5 发布

RabbitMQ 3.6.6 Milestone 5 发布了,RabbitMQ 是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成,因此也是继...

淡漠悠然
2016/10/08
2K
2
阿里大牛带你轻松实现RabbitMQ 延时消息

RabbitMQ 延时消息的实现(上) 我们在实际业务中有一些需要延时发送消息的场景,例如: 家里有一台智能热水器,需要在30分钟后启动 未付款的订单,15分钟后关闭 注意这里的场景是延时,不是...

Java架构
01/23
0
0

没有更多内容

加载失败,请刷新页面

加载更多

新建作业20191011121223

2.编写一个程序,发出一声警报,然后打印下面的文本: Startled by the sudden sound,Sally shouted,"By the Great Pumpkin,what was that!" #include<stdio.h>int main(){printf("\a");......

电子197朱妍
22分钟前
3
0
家庭作业——苗钰婷

2 编写一个程序,发出一声警报,然后打印下面的文本: Startled by the sudden sound, Sally shouted, "By the Great Pumpkin, what was that! #include<stdio.h>int main(){......

OSC_Okruuv
43分钟前
6
0
经典系统设计面试题解析:如何设计TinyURL(一)

原文链接: https://www.educative.io/courses/grokking-the-system-design-interview/m2ygV4E81AR 编者注:本文以一道经典的系统设计面试题:《如何设计TinyURL》的参考答案和解析为例,帮助...

APEMESH
44分钟前
5
0
2.面向对象设计原则(7条)

开闭原则 开闭原则的含义是:当应用的需求改变时,在不修改软件实体的源代码或者二进制代码的前提下,可以扩展模块的功能,使其满足新的需求。 实现方法 可以通过“抽象约束、封装变化”来实...

Eappo_Geng
46分钟前
9
0
8086汇编基础 debug P命令 一步完成loop循环

    IDE : Masm for Windows 集成实验环境 2015     OS : Windows 10 x64 typesetting : Markdown    blog : my.oschina.net/zhichengjiu    gitee : gitee.com/zhichengjiu   ......

志成就
51分钟前
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部