文档章节

RabbitMq入门级教程之完全掌握5种开发方案

木九天
 木九天
发布于 08/22 15:07
字数 2533
阅读 244
收藏 3

1、安装

    windows安装rabbitmq

    mac安装rabbitmq 需要注意的是,mac安装rabbitmq,启动的时候命令前,需要加 sudo,不然会报错误 

2、rabbitmq 开发概念词

    2.1 Producer(生产者)

    2.2 Consumer(消费者)

    2.3 Exchange(交换机)

    2.4 Queue(队列)

    2.5 rountingKey(交换机与队列之间的关系)

3、RabbitMQ开发方案

官网的6中模式,可以点开这个网址,显示6中模式,第6中模式RPC远程调用我们不需要用该模式,所以我们只要关注前五种就可以了。

接下来我们就直接简单教学rabbitmq的简单使用

3.0 代码讲解

/**
 * 声明队列,五个参数列表,如果直接使用默认channel.queueDeclare("queue"),那么其他参数都会自动默认设置属性,所以一般我们几乎都默认它
 * String queue  队列名称
 * boolean durable 队列是需要持久化,意思就是rabbitmq重启的时候,如果不是持久化,那么该队列就会消失,默认true
 * boolean exclusive 如果你想创建一个只有自己可见的队列,不允许其它用户访问RabbitMQ允许你将一个Queue声明成为排他性的,只对首次声明它的连接(Connection)可见,会在其连接断开的时候自动删除。所以我们开发中一般不需要此操作,默认false
 * boolean autoDelete 消息是需要持久化,意思就是rabbitmq重启的时候,如果为true,那么关闭期间接受的消息就会自动消失,默认false
 * Map<String, Object> arguments 这个参数我们只会在使用延迟队列中才会用到,就是延迟队列的相关配置等属性
 * 方法channel.queueDelete("queue")等同于channel.queueDeclare("queue",true,false,false,null);
 */

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
 * 绑定队列到交换机 
 * String queue 队列名
 * String exchange 交换机名
 * String routingKey 绑定关系 如 大头儿子,小头爸爸 他们的rountingKey就是父子
 */
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

3.1 pom.xml

版本随意,本博客任何版本高版本也行。

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>3.4.1</version>
</dependency>
<dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.7</version>
</dependency>
<dependency>
   <groupId>org.apache.commons</groupId>
   <artifactId>commons-lang3</artifactId>
   <version>3.3.2</version>
</dependency>
<dependency>
   <groupId>org.springframework.amqp</groupId>
   <artifactId>spring-rabbit</artifactId>
   <version>1.4.0.RELEASE</version>
</dependency>

3.2 工具类准备

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("localhost");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

3.3 简单模式(simple)

理解:该rabbit服务器只有一个单一的队列

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消息内容
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        //关闭通道和连接
        channel.close();
        connection.close();
    }
}
public class Recv {

    private final static String QUEUE_NAME = "mujiutian_queue";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //监听队列,发送消息
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 获取消息,该线程一直进行
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("获取到消息"+message);
        }
    }
}

先打开rece的main函数,这样就可以执行send发送消息。

3.4 工作模式(work)

理解:rabbit服务器有一个exchange(交换机),该交换机下有两个队列,总共发送了100条消息,A队列效率高可以抢到80条消息给消费者,B队列只能抢到20条消息给发送者,他们的总和是100条,为工作模式。

先执行,先执行消费者main函数,在看结果图:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 100; i++) {
            // 消息内容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(10);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("收到消息'" + message + "'");
            // 返回确认状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成状态
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

如图:

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

跟这个有关系,这也就是我们所说的工作模式

3.5 订阅模式(Publish/Subribe)

理解:就是群发,该交换机下的所有队列,都会接受相同的所有交换机发来的消息,类似于qq群一样

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 消息内容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

3.6 路由模式(Routing)

理解就是:你是人,如果性别是女,请进女厕,是男性,请进男厕,如同,一个交换机下面的多个队列,根据rountingKey判断该接受的消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 消息内容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

3.7 通配符模式(Topics)

理解就是:路径aas.# A队列接受这种所有路径,B队列接受路径下的所有队列aab.#

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 消息内容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_topic_work";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_topic_work2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

 

 

 

© 著作权归作者所有

木九天

木九天

粉丝 208
博文 242
码字总数 183695
作品 0
海淀
程序员
私信 提问
PHP版 RabbitMQ官方中文入门教程

工作中用到 RabbitMQ,但是RabbitMQ官方的 Tutorials 是英文Python和Jave的,官网的教程很好,正好最近业余在撸Python,所以直接来搞个PHP版本的,仅供PHP菜鸟一起来入门学习,如有错误和问题...

Yuansir
2013/06/03
10.1K
0
php| 初探 rabbitmq

date: 2018-09-03 21:30:23 title: php| 初探 rabbitmq description: 零零散散折腾了 rabbitmq 几次, 归纳总结一下先 经常看到消息队列( MQ ), 实战中比较少, 说说我的一些粗线的理解: 引入消...

daydaygo
2018/09/05
0
0
消息中间件—RabbitMQ(初探篇)

文章摘要:本篇文章为RabbitMQ的入门文章,不像其他一些程序代码和应用实战性的文章会带着大家从一个“Hello World”的简单例子出发,在该篇幅中主要给大家讲下RabbitMQ消息队列的起源、为何...

癫狂侠
2018/05/23
0
0
伍哥原创之安装RabbitMQ

测试环境:CentOS 6.2 首先用root身份登录测试环境 1,安装erlang (R15B01) 安装完成以后,执行erl看是否能打开eshell,用’halt().’退出,注意后面的点号,那是erlang的结束符。 如下: ...

alex.wu
2012/08/06
1K
0
使用 Docker 部署 RabbitMQ 集群

🏷 基础环境 虚拟机:node1 / node2 / node3 集群:docker swarm,node1 为 manager 节点,node2 / node3 为 worker 节点 集群搭建可参考:Docker Swarm 入门:单机创建 Swarm 集群 🏷 ...

Anoyi
04/11
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Sqoop之导入Mysql数据到Hive出现ASCII

问题是这样的,从Mysql中导入数据到Hive中,Mysql中数据是"T2",到Hive中后,发现变为"54 32",咦,怎么乱码了,感觉这像ASCII编码。 现象有了,之前都没遇到过这样,觉得很奇怪,迅速找了下...

克虏伯
7分钟前
2
0
深度科技受邀参加鲲鹏计算产业峰会·海南

10月22日,华为及生态伙伴共同举办了以“鲲鹏展翅,力算未来 共赢多样性计算时代”为主题的“鲲鹏计算产业峰会•海南”,来自各领域的300多名行业客户、专家及生态伙伴齐聚一堂,共同分享计算...

后浪涛涛
8分钟前
2
0
【Android JetPack系列】LifeCycles

一、简介 Lifecycles 是一个持有组件生命周期状态(如活动或片段)信息的类,并允许其他对象观察此状态。 生命周期使用两个主要枚举来跟踪其关联组件的生命周期状态: Event:从 框架 和 Li...

Agnes2017
10分钟前
2
0
配置码云git自动更新的webhook

配置项目提交到git的时候自动同步服务器代码 一、在服务器项目跟目录新建文件hook.php 代码如下: 1 2 3 4 5 6 7 8 9 10 11 12 13 <?php $json = file_get_contents("php://input"); $data ......

dragon_tech
16分钟前
2
0
一道题深度剖析this指向

< script > var num = 1 ; var myObject = { num : 2 , add : function (){ this . num = 3 ; ( function (){ // 这里的 this 是 window,因为自调用函数的调用者是windo,使用箭头函数则指向......

李超明
20分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部