文档章节

RabbitMq入门级教程之完全掌握5种开发方案

木九天
 木九天
发布于 08/22 15:07
字数 2533
阅读 238
收藏 3

1、安装

    windows安装rabbitmq

    mac安装rabbitmq 需要注意的是,mac安装rabbitmq,启动的时候命令前,需要加 sudo,不然会报错误 

2、rabbitmq 开发概念词

    2.1 Producer(生产者)

    2.2 Consumer(消费者)

    2.3 Exchange(交换机)

    2.4 Queue(队列)

    2.5 rountingKey(交换机与队列之间的关系)

3、RabbitMQ开发方案

官网的6中模式,可以点开这个网址,显示6中模式,第6中模式RPC远程调用我们不需要用该模式,所以我们只要关注前五种就可以了。

接下来我们就直接简单教学rabbitmq的简单使用

3.0 代码讲解

/**
 * 声明队列,五个参数列表,如果直接使用默认channel.queueDeclare("queue"),那么其他参数都会自动默认设置属性,所以一般我们几乎都默认它
 * String queue  队列名称
 * boolean durable 队列是需要持久化,意思就是rabbitmq重启的时候,如果不是持久化,那么该队列就会消失,默认true
 * boolean exclusive 如果你想创建一个只有自己可见的队列,不允许其它用户访问RabbitMQ允许你将一个Queue声明成为排他性的,只对首次声明它的连接(Connection)可见,会在其连接断开的时候自动删除。所以我们开发中一般不需要此操作,默认false
 * boolean autoDelete 消息是需要持久化,意思就是rabbitmq重启的时候,如果为true,那么关闭期间接受的消息就会自动消失,默认false
 * Map<String, Object> arguments 这个参数我们只会在使用延迟队列中才会用到,就是延迟队列的相关配置等属性
 * 方法channel.queueDelete("queue")等同于channel.queueDeclare("queue",true,false,false,null);
 */

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
/**
 * 绑定队列到交换机 
 * String queue 队列名
 * String exchange 交换机名
 * String routingKey 绑定关系 如 大头儿子,小头爸爸 他们的rountingKey就是父子
 */
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

3.1 pom.xml

版本随意,本博客任何版本高版本也行。

<dependency>
   <groupId>com.rabbitmq</groupId>
   <artifactId>amqp-client</artifactId>
   <version>3.4.1</version>
</dependency>
<dependency>
   <groupId>org.slf4j</groupId>
   <artifactId>slf4j-log4j12</artifactId>
   <version>1.7.7</version>
</dependency>
<dependency>
   <groupId>org.apache.commons</groupId>
   <artifactId>commons-lang3</artifactId>
   <version>3.3.2</version>
</dependency>
<dependency>
   <groupId>org.springframework.amqp</groupId>
   <artifactId>spring-rabbit</artifactId>
   <version>1.4.0.RELEASE</version>
</dependency>

3.2 工具类准备

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;

public class ConnectionUtil {
    public static Connection getConnection() throws Exception {
        //定义连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置服务地址
        factory.setHost("localhost");
        //端口
        factory.setPort(5672);
        //设置账号信息,用户名、密码、vhost
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 通过工程获取连接
        Connection connection = factory.newConnection();
        return connection;
    }
}

3.3 简单模式(simple)

理解:该rabbit服务器只有一个单一的队列

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String QUEUE_NAME = "test_queue";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        // 从连接中创建通道
        Channel channel = connection.createChannel();
        // 声明(创建)队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 消息内容
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        //关闭通道和连接
        channel.close();
        connection.close();
    }
}
public class Recv {

    private final static String QUEUE_NAME = "mujiutian_queue";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        //创建channel
        Channel channel = connection.createChannel();
        //创建队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        //监听队列,发送消息
        channel.basicConsume(QUEUE_NAME, true, consumer);

        // 获取消息,该线程一直进行
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("获取到消息"+message);
        }
    }
}

先打开rece的main函数,这样就可以执行send发送消息。

3.4 工作模式(work)

理解:rabbit服务器有一个exchange(交换机),该交换机下有两个队列,总共发送了100条消息,A队列效率高可以抢到80条消息给消费者,B队列只能抢到20条消息给发送者,他们的总和是100条,为工作模式。

先执行,先执行消费者main函数,在看结果图:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        for (int i = 0; i < 100; i++) {
            // 消息内容
            String message = "" + i;
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println(" [x] Sent '" + message + "'");
        }

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(10);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println("收到消息'" + message + "'");
            // 返回确认状态
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成状态
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

如图:

// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

跟这个有关系,这也就是我们所说的工作模式

3.5 订阅模式(Publish/Subribe)

理解:就是群发,该交换机下的所有队列,都会接受相同的所有交换机发来的消息,类似于qq群一样

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 消息内容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_fanout";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

3.6 路由模式(Routing)

理解就是:你是人,如果性别是女,请进女厕,是男性,请进男厕,如同,一个交换机下面的多个队列,根据rountingKey判断该接受的消息

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();
        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "direct");
        // 消息内容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_work";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_work2";

    private final static String EXCHANGE_NAME = "test_exchange_direct";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

3.7 通配符模式(Topics)

理解就是:路径aas.# A队列接受这种所有路径,B队列接受路径下的所有队列aab.#

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Send {

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {
        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明exchange
        channel.exchangeDeclare(EXCHANGE_NAME, "topic");

        // 消息内容
        String message = "Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "key.1", null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        channel.close();
        connection.close();
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv {

    private final static String QUEUE_NAME = "test_queue_topic_work";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;

public class Recv2 {

    private final static String QUEUE_NAME = "test_queue_topic_work2";

    private final static String EXCHANGE_NAME = "test_exchange_topic";

    public static void main(String[] argv) throws Exception {

        // 获取到连接以及mq通道
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 绑定队列到交换机
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");

        // 同一时刻服务器只会发一条消息给消费者
        channel.basicQos(1);

        // 定义队列的消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);
        // 监听队列,手动返回完成
        channel.basicConsume(QUEUE_NAME, false, consumer);

        // 获取消息
        while (true) {
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            String message = new String(delivery.getBody());
            System.out.println(" [x] Received '" + message + "'");
            Thread.sleep(10);

            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
        }
    }
}

 

 

 

© 著作权归作者所有

木九天

木九天

粉丝 204
博文 230
码字总数 178515
作品 0
海淀
程序员
私信 提问
PHP版 RabbitMQ官方中文入门教程

工作中用到 RabbitMQ,但是RabbitMQ官方的 Tutorials 是英文Python和Jave的,官网的教程很好,正好最近业余在撸Python,所以直接来搞个PHP版本的,仅供PHP菜鸟一起来入门学习,如有错误和问题...

Yuansir
2013/06/03
10.1K
0
【每日一博】RabbitMQ 入门

本文都是RabbitMQ的官方指南翻译过来的,由于本人水平有限难免有翻译不当的地方,如发现不对的地方,请联系下我,好及时改正。好了,正文开始: RabbitMQ 是一个消息代理。这主要的原理十分简...

oschina
2015/02/25
3K
9
php| 初探 rabbitmq

date: 2018-09-03 21:30:23 title: php| 初探 rabbitmq description: 零零散散折腾了 rabbitmq 几次, 归纳总结一下先 经常看到消息队列( MQ ), 实战中比较少, 说说我的一些粗线的理解: 引入消...

daydaygo
2018/09/05
0
0
消息中间件—RabbitMQ(初探篇)

文章摘要:本篇文章为RabbitMQ的入门文章,不像其他一些程序代码和应用实战性的文章会带着大家从一个“Hello World”的简单例子出发,在该篇幅中主要给大家讲下RabbitMQ消息队列的起源、为何...

癫狂侠
2018/05/23
0
0
伍哥原创之安装RabbitMQ

测试环境:CentOS 6.2 首先用root身份登录测试环境 1,安装erlang (R15B01) 安装完成以后,执行erl看是否能打开eshell,用’halt().’退出,注意后面的点号,那是erlang的结束符。 如下: ...

alex.wu
2012/08/06
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

【TencentOS tiny】深度源码分析(4)——消息队列

消息队列 在前一篇文章中【TencentOS tiny学习】源码分析(3)——队列 我们描述了TencentOS tiny的队列实现,同时也点出了TencentOS tiny的队列是依赖于消息队列的,那么我们今天来看看消息...

杰杰1号
6分钟前
1
0
Hive

这就是那个 JAVA 类 package cn.itcast.bigdata;import java.util.HashMap;import org.apache.hadoop.hive.ql.exec.UDF;public class PhoneNbrToArea extends UDF{privat......

Garphy
6分钟前
2
0
Springboot开发,第二天

SpringBoot学习,第二天 目录:1、Springboot整合Listener 2、Springboot访问静态资源 3、异常处理 4、热部署 一、SpringBoot整合Listener 两种方式完成组件的注册 1、通过注解扫描完成组件的...

有一个小阿飞
10分钟前
3
0
BeginnersBook Perl 教程

来源:ApacheCN BeginnersBook 翻译项目 译者:飞龙 协议:CC BY-NC-SA 4.0 贡献指南 本项目需要校对,欢迎大家提交 Pull Request。 请您勇敢地去翻译和改进翻译。虽然我们追求卓越,但我们并...

ApacheCN_飞龙
22分钟前
2
0
我的Java秋招面经大合集

阿里面经 阿里中间件研发面经 蚂蚁金服研发面经 岗位是研发工程师,直接找蚂蚁金服的大佬进行内推。 我参与了阿里巴巴中间件部门的提前批面试,一共经历了四次面试,拿到了口头offer。 然后我...

Java技术江湖
27分钟前
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部