消息中间件之RabbitMQ初识

原创
2021/06/08 13:44
阅读数 3.6K

作者:threedayman 恒生LIGHT云社区

RabbitMQ是什么

RabbitMQ是部署最广泛的开源消息代理。RabbitMQ有轻量级且易部署的特点。支持多种消息协议。

为什么使用RabbitMQ

常见的使用场景有解耦、异步、削峰填谷。下面我们通过例子来感受下各自场景下使用MQ带来的效益。

解耦

假设有系统A,依赖系统B、系统C、系统D,依赖关系在代码中已经写死,结构如下图。

1621939617(1).png

假设此时又来了一个新需求,系统A需要调用系统E进行一些新的业务操作,那么系统A的程序员又免不了一顿操作,处理接入系统E的需求。同理如果要去掉某个系统的依赖比如系统C,也需要系统A负责的开发进行处理。

那么此时我们如果引入了MQ来看看会带来什么样的变化。

image-20210525185231787.png

系统A发送消息到MQ,系统B、C、D订阅对应的消息进行业务处理。那么我们再来看看之前的场景,假设需要增加一个依赖系统E,只需要系统E的开发人员进行对应的订阅消费即可,同理如果要取消系统C的依赖,只需要系统C取消订阅对应的消息。

异步

假设系统A操作耗时30ms,系统A还将同步调用系统B(300ms)、系统C(600ms)、系统D(200ms)那么这个请求的响应时间将会达到1130ms。过长的响应时间会给客户带来不好的用户体验。

1621940629(1).png

引入MQ之后我们看看会发生什么变化

image-20210525190839346.png

系统A将消息发送给MQ(7ms)之后就返回,系统B、C、D分别监听MQ进行业务处理。那么我们看到针对刚才长耗时的同步依赖,引入MQ进行异步处理后,总体的响应时间从1130ms降到了37ms。

削峰填谷

假设我们有个业务高峰期的请求量能够到达7000 /s而业务低谷流量只有100/s,但是我们的mysql数据库只能承受2000/s的请求。

1621941575(1).png

在这种情况下会导致在高峰期超过了mqsql最高的负载能力而直接打挂,而低峰期没有将mqsql的资源合理利用起来。

引入MQ之后我们看看会发生什么变化

image-20210525192439287.png

此时系统可以按照自己最大的消费能力2000/s去拉取消息,可以平稳度过业务高峰期,同时将一部分消息延迟到业务低谷时期进行处理。不至于出现由于高流量导致数据库被打挂,出现整体服务不可用的现象。

怎样使用RabbitMQ

本小节主要针对RabbitMQ的java客户端编写的几个常用的例子,如果您对使用RabbitMQ已熟练掌握,可跳过本小节。查看完整的RabbitMQ使用说明,请访问官方文档

Hello world

我们通过一个Hello world 的例子来感受下RabbitMQ。首先介绍下本例中使用到的术语

  • Producer:生产者,用来发送消息。
  • Queue:消息队列,用于存储消息,消息经由生产者投递到消息队列,最终被投递到消费者进行消费,消息队列收到机器内存和硬盘资源的限制。
  • Consumer:消费者,用于接收并处理消息。

本例中我们我们将生产Hello World的消息,通过消费者接受并打印出消息。

1621942986(1).png

生产者Send 关键步骤见注释说明

public class Send {
	//队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //创建和server之间的连接 connection、channel
        ConnectionFactory factory = new ConnectionFactory();
        //请设置实际部署节点ip
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //声明一个queue去发送消息
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!";
            //发布消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

完整Send代码查阅

消费者Recv 关键步骤见注释说明

public class Recv {
	//队列名称
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //创建和server之间的连接 connection、channel
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
		//声明要去消费的队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		//通过该类来处理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

完整Recv代码查阅

Work Queues

在本例中我们将介绍通过RabbitMQ分发耗时任务给多个工作者。RabbitMQ会通过轮询(round-robin)的方式将消息投递给消费者,这是的我们能够很容易的扩展消费能力。

1621944984(1).png

生产者NewTask 关键步骤见注释说明

public class NewTask {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //将队列设置成持久化
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

            String message = String.join(" ", argv);
			//将消息设置成持久化
            channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

}

完整NewTask代码查阅

消费者Woker 关键步骤见注释说明

public class Worker {

    private static final String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
		//将队列设置成持久化
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
		//一个消费者最多同时处理一个未确认的消息
        channel.basicQos(1);

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");

            System.out.println(" [x] Received '" + message + "'");
            try {
                doWork(message);
            } finally {
                System.out.println(" [x] Done");
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
	//模拟耗时任务,一个.代表耗时1S
    private static void doWork(String task) {
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException _ignored) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
}

完整Worker代码查阅

Publish/Subscribe

上面我们已经介绍过RabbitMQ的核心消息模型,生产者、消费者、队列,在本小节我们将接触到另一个消息模型exchange** ,它负责从生产者中接收消息,并把消息投递到队列中。exchage主要有以下几种类型**

  • direct
  • topic
  • headers
  • fanout

本例中我们将已fanout类型作为讲解,通过名称我们大概也能猜到此类型exchange会广播接收到的消息到其绑定的队列中。

1622082071(1).jpg

生产者EmitLog 关键步骤见注释说明

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //创建一个exchange 并指定类型
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = argv.length < 1 ? "info: Hello World!" :
                    String.join(" ", argv);
			//此处和之前发消息不一样,指定具体的exchange没有指定具体的queue
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

}

EmitLog完整代码查阅

消费者ReceiveLogs 关键步骤见注释说明

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
		//创建fanout类型的exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        //获取一个独有的,非持久化的,自动删除的队列
        String queueName = channel.queueDeclare().getQueue();
        //通过绑定方法将exchage和queue之间简历关系
        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

ReceiveLogs完整代码查阅

Routing

上一个例子中exchange将接收到的信息广播给了绑定的队列中,本例中我们将增加绑定的一些特定,使exchange有能力通过routingKey(全匹配)来投递不同的消息到不同的队列中。例如日常日志区分error日志进单独的队列。

image-20210526095222998.png

生产者EmitLogDirect 关键步骤见注释说明

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            //声明一个direct类型的exchange
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            String severity = getSeverity(argv);
            String message = getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
        }
    }

    private static String getSeverity(String[] strings) {
        if (strings.length < 1)
            return "info";
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) return "";
        if (length <= startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

EmitLogDirect完整代码查阅

消费者ReceiveLogsDirect 关键步骤见注释说明

public class ReceiveLogsDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
            System.exit(1);
        }

        for (String severity : argv) {
            //建立exchange和queue之间关系并设置routingKey
            channel.queueBind(queueName, EXCHANGE_NAME, severity);
        }
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

ReceiveLogsDirect完整代码查阅

Topics

提供更丰富的exchange到queue之间的路由规则。规则通过.分隔的routingKey,最高限制 255bytes。跟之前的全匹配routingKey不同,topic类型的exchange的routingKey主要增加了两个特性。

  • *代表一个单词**。**
  • **#** 代表0个或一个单词。

image-20210526101017541.png

生产者EmitLogTopic 关键步骤见注释说明

public class EmitLogTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String routingKey = getRouting(argv);
            String message = getMessage(argv);

            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
        }
    }

    private static String getRouting(String[] strings) {
        if (strings.length < 1)
            return "anonymous.info";
        return strings[0];
    }

    private static String getMessage(String[] strings) {
        if (strings.length < 2)
            return "Hello World!";
        return joinStrings(strings, " ", 1);
    }

    private static String joinStrings(String[] strings, String delimiter, int startIndex) {
        int length = strings.length;
        if (length == 0) return "";
        if (length < startIndex) return "";
        StringBuilder words = new StringBuilder(strings[startIndex]);
        for (int i = startIndex + 1; i < length; i++) {
            words.append(delimiter).append(strings[i]);
        }
        return words.toString();
    }
}

EmitLogTopic完整代码查阅

消费者ReceiveLogsTopic 关键步骤见注释说明

public class ReceiveLogsTopic {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        String queueName = channel.queueDeclare().getQueue();

        if (argv.length < 1) {
            System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
            System.exit(1);
        }

        for (String bindingKey : argv) {
            channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
        }

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    }
}

ReceiveLogsTopic完整代码查阅

引入RabbitMQ带来什么挑战

看到这,各位看官是不是越越欲试想在项目中引入RabbitMQ去优化现在的使用场景,那么是不是我们部署一个RabbitMQ服务,然后发送消息就高枕无忧了呢?其实在引入一个中间件时,同时伴随着一些问题,如果我们对这些问题了解不够深入或者全面,那恭喜你将进入挖坑选手序列。为了成为一个靠谱的程序员,我们要充分了解引入中间件给我们 项目带来的挑战,才能在之后的应用上从容应对。下面列了下消息中间件中常见的几类问题

  • 消息丢失
  • 消息重复
  • 消息堆积
  • RabbitMQ的可用性保证

之后的文章,我们将逐个去讲解上述问题的解决方案。 下一讲:RabbitMQ消息可靠性传输

参考文档

https://www.rabbitmq.com/ RabbitMQ官方文档

tips:作者个人经验有限,不足之处烦请指正。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
3 收藏
1
分享
返回顶部
顶部