文档章节

Rabbitmq基本使用 SpringBoot整合Rabbit SpringCloud Stream+Rabbit

o
 osc_4nmshwhm
发布于 2018/08/06 20:02
字数 1416
阅读 15
收藏 0

精选30+云产品,助力企业轻松上云!>>>

https://blog.csdn.net/m0_37867405/article/details/80793601

四、docker中使用rabbitmq

1. 搭建和启动

使用地址:rabbitmq docker

#1. 拉去rabbitmq的镜像
docker pull hub.c.163.com/library/rabbitmq:3.6.11-management #2. 由于rabbitmq远程访问是不允许guest的,所以启动时候需要设置一个用户名和密码 docker run -d --hostname my-rabbit --name some-rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 hub.c.163.com/library/rabbitmq:3.6.11-management # 提示信息:d525b1c1004f50284ca5bab76e8e5e2fea55462b72d9f923cea0da1a29e9aa9dnetstat
  • 1
  • 2
  • 3
  • 4
  • 5

在浏览器中访问:192.168.186.135:15672然后输入用户名和密码

2. java Hello world

简单的一对一生产消息

这里写图片描述

官网地址示例

1.引入 pom.xml

<dependency>
      <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.0.0</version> </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2.消息提供者

package com.itcloud.concurrency.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class MessageProvider { private static final String HOST = "192.168.186.135"; private static final int PORT = 5672; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "hello world"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" 生产消息:'" + message + "'"); } }

3.消费端

package com.itcloud.concurrency.rabbitmq;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.AMQP.BasicProperties; public class MessageConsumer { private static final String HOST = "192.168.186.135"; private static final int PORT = 5672; // rabbitmq端口是5672 private static final String USERNAME = "admin"; private static final String PASSWORD = "admin"; private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST); factory.setPort(PORT); factory.setUsername(USERNAME); factory.setPassword(PASSWORD); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println("消费端接收消息:" + message); } }; //true 异步接收消息 channel.basicConsume(QUEUE_NAME, true, consumer); } }

4.先后启动生产者和消费者

3. 一对多

官网示例

这里写图片描述

4. SpringBoot整合RabbitMQ

第一种交换模式:Direct

1.引入依赖

        <dependency>
            <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>

 

application.yml

spring:
  rabbitmq:
    host: 192.168.186.135
    port: 5672 username: admin password: admin

2.配置org.springframework.amqp.core.Queue

​ 【MQConfig.java】

@Configuration
public class MQConfig { public static final String QUEUE = "queue"; @Bean public Queue queue() { return new Queue(QUEUE); } }

 

  1. 【MQSender.java】消息发送端
@Component
@Slf4j
public class MQSender { @Autowired private AmqpTemplate amqpTemplate; public void sendMsg(String msg) { log.info("发送消息:" + msg); amqpTemplate.convertAndSend(MQConfig.QUEUE, msg); } }

4.【MQReceiver.java】消息接收端

@Component
@Slf4j
public class MQReceiver { //绑定队列名称 @RabbitListener(queues = { MQConfig.QUEUE }) public void receive(String message) { log.info("springboot rabbitmq recevie message:" + message); }

5.简单的controller测试

    @PostMapping("/send")
    public String sendMsg(String msg) {
        mqSender.sendMsg(msg);
        return "success"; }

第二种交换模式:Topic

:biking_woman:特点:

:ballot_box_with_check: 可以根据routing_key自由的绑定不同的队列

:ballot_box_with_check:发送端不需要知道发送到哪个队列,由routing_key去分发到队列中

2.配置org.springframework.amqp.core.Queue

​ 【MQConfig.java】

    public static final String TOPIC_EXCHANGE = "topic_exchange";

    public static final String QUEUE_HELLO = "topic_hello"; public static final String QUEUE_WORLD = "topic_world"; public static final String ROUT_HELLO = "hello_key"; public static final String ROUT_WORLD = "world_key"; ... //=============定义两个队列======================== @Bean public Queue queueHello() { return new Queue(QUEUE_HELLO); } @Bean public Queue queueWorld() { return new Queue(QUEUE_WORLD); } //===================声明topinc交换模式========================= @Bean public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); } //队列绑定到交换模式上 @Bean @Autowired public Binding bindingHello(Queue queueHello, TopicExchange topicExchange) { return BindingBuilder.bind(queueHello).to(topicExchange).with(ROUT_HELLO); } @Bean @Autowired public Binding bindingWorld(Queue queueWorld, TopicExchange topicExchange) { return BindingBuilder.bind(queueWorld).to(topicExchange).with(ROUT_WORLD); }
  1. 【MQSender.java】消息发送端
    public void sendHello(String msg) {
        log.info("hello topic send messsgae:" + msg);
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, MQConfig.ROUT_HELLO, msg);
    }

    public void sendWorld(String msg) { log.info("world topic send messsgae:" + msg); amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, MQConfig.ROUT_WORLD, msg); }

4.【MQReceiver.java】消息接收端

    @RabbitListener(queues = MQConfig.QUEUE_HELLO)
    public void receiveHello(String msg) {
        log.info("springboot rabbitmq recevie message topicHello:" + msg); } @RabbitListener(queues = MQConfig.QUEUE_WORLD) public void receiveWorld(String msg) { log.info("springboot rabbitmq recevie message topicWorld:" + msg); }

5.简单的controller测试

    @PostMapping("/send/topic/hello")
    public String sendHello(String msg) {
        mqSender.sendHello(msg);
        return "success"; } @PostMapping("/send/topic/world") public String sendWorld(String msg) { mqSender.sendWorld(msg); return "success"; }

第三种交换:Fanout

:biking_woman: 特点:只要绑定该交换机的消费者都可以接受到消息

1.配置org.springframework.amqp.core.Queue

​ 【MQConfig.java】

    public static final String FANOUT_EXCHANGE = "fanout_exchange";
    public static final String QUEUE_FANOUT = "queue_fanout"; public static final String QUEUE_FANOUT2 = "queue_fanout2"; .... @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE); } @Bean public Queue queueFanout() { return new Queue(QUEUE_FANOUT); } @Bean public Queue queueFanout2() { return new Queue(QUEUE_FANOUT2); } @Bean @Autowired public Binding bindingFanout(FanoutExchange fanoutExchange, Queue queueFanout) { return BindingBuilder.bind(queueFanout).to(fanoutExchange); } @Bean @Autowired public Binding bindingFanout2(FanoutExchange fanoutExchange, Queue queueFanout2) { return BindingBuilder.bind(queueFanout2).to(fanoutExchange); } 

 

2.【MQSender.java】消息发送端

    public void sendFanout(String msg) {
        log.info("fanout send messsgae:" + msg);
        amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg); amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", "自定义消息"); }

3.【MQReceiver.java】消息接收端

    @RabbitListener(queues = MQConfig.QUEUE_FANOUT)
    public void receiveFanout(String msg) {
        log.info("springboot rabbitmq recevie message fanout:" + msg); } @RabbitListener(queues = MQConfig.QUEUE_FANOUT2) public void receiveFanout2(String msg) { log.info("springboot rabbitmq recevie message fanout2:" + msg); }

第四种交换模式:Headers

1.【MQConfig.java】

//======================headers交换模式============================

    public static final String HEADERS_EXCHANGE = "headers_exchange"; public static final String QUEUE_HEADERS = "queue_headers"; @Bean public HeadersExchange headersExchange() { return new HeadersExchange(HEADERS_EXCHANGE); } @Bean public Queue headersQueue() { return new Queue(QUEUE_HEADERS, true); } @Bean @Autowired public Binding headersBind(HeadersExchange headersExchange, Queue headersQueue ) { Map<String, Object> headers = new HashMap<>(); headers.put("key1", "value1"); headers.put("key2", "value2"); return BindingBuilder.bind(headersQueue).to(headersExchange).whereAny(headers).match();

2.【MQSender.java】消息发送端

    public void sendHeaders(String msg) {
        log.info("headers send messsgae:" + msg);
        MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("key1", "value1"); messageProperties.setHeader("key3", "value2"); Message message = new Message(msg.getBytes(), messageProperties); amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", message); }

3.【MQReceiver.java】消息接收端

    @RabbitListener(queues = MQConfig.QUEUE_HEADERS)
    public void receiveHeaders(byte [] bytes) { log.info("springboot rabbitmq recevie message headers:" + new String(bytes)); }

参考博客:

博客1 博客2

6. SpringCloud Stream

1. 创建生产者和消费者

1. 消息生产者:

新建【stream】springcloud项目

pom.xml

​ 其他依赖略

        <dependency>
            <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
//注意导包
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; //消息发送端接口 public interface SendMsg { public void send(Object obj); } //接口实现类 @EnableBinding(Source.class) public class SendMsgImpl implements SendMsg { @Autowired private MessageChannel output; @Override public void send(Object obj) { this.output.send(MessageBuilder.withPayload(obj).build()); } }

【application.yml】文件

server:
  port: 8083
spring:
  cloud:
    stream:
      binders: 
        defaultRabbit: 
          type: rabbit
          environment: #配置rabbimq连接环境
            spring: 
              rabbitmq:
                host: 192.168.186.135 username: admin password: admin virtual-host: / bindings: output: destination: myExchange #exchange名称,交换模式默认是topic content-type: application/json binder: defaultRabbit

 

//测试使用的Controller
@RestController
public class SendController { @Autowired private SendMsg sendMsg; @GetMapping("/send") public String msgSend(String msg) { this.sendMsg.send(msg); return "success"; } }

2 .消息消费者

新建**【input】**springlcloud项目

//接口
public interface ReceiveMsg { public void receive(Message<Object> message); } //实现类 import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; @Component @EnableBinding(Sink.class) public class ReceiveMsgImpl implements ReceiveMsg { @Override @StreamListener(Sink.INPUT) public void receive(Message<Object> message) { System.out.println("接收消息" + message.getPayload()); } }

 

 

application.yml

server:
  port: 8085

spring:
  cloud:
    stream:
      binders:
        defaultRabbit: 
          type: rabbit
          environment:
            spring: 
              rabbitmq:
                host: 192.168.186.135 username: admin password: admin virtual-host: / bindings: input: destination: myExchange content-type: application/json binder: defaultRabbit
o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。
SpringCloud学习(1)——SpringCloud概述

微服务架构 微服务架构是一种架构模式或者说是一种架构风格, 他提倡将单一应用程序划分成一组小的服务, 每个服务运行在其独立的进程中, 服务之间互相协调,互相配合, 为用户提供最终价值。服...

osc_1ik1t32r
2018/06/11
3
0
微服务技术栈有哪些

微服务条目 技术 备注 服务开发 Springboot、Spring、SpringMVC 服务配置与管理 Netflix公司的Archaius、阿里的Diamond等 服务注册与发现 Eureka、Consul、Zookeeper等 服务调用 REST、RPC、...

osc_z7ezpf37
2018/06/18
1
0
Spring Cloud构建微服务架构:消息驱动的微服务(入门)【Dalston版】

之前在写Spring Boot基础教程的时候写过一篇《Spring Boot中使用RabbitMQ》。在该文中,我们通过简单的配置和注解就能实现向RabbitMQ中生产和消费消息。实际上我们使用的对RabbitMQ的starter...

学亮编程手记
2019/08/17
9
0
每天学点SpringCloud(十三):SpringCloud-Stream整合RabbitMQ

我们知道,当微服务越来越来多的时候,仅仅是feign的http调用方式已经满足不了我们的使用场景了。这个时候系统就需要接入消息中间件了。相比较于传统的Spring项目、SpringBoot项目使用消息中...

osc_34byaiqc
2018/12/09
6
0
【Spring Cloud Stream】异步任务

一、前言 前两篇博客提高了用线程池和消息队列才实现异步任务。本篇博客谈一谈用SpringCloud Stream来实现异步任务。 Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架。它...

qq_26545305
2018/05/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

JIT的Profile神器JITWatch

点击上方的蓝字关注我吧 程序那些事 简介 老是使用命令行工具在现代化社会好像已经跟不上节奏了,尤其是在做JIT分析时,使用LogCompilation输出的日志实在是太大了,让人望而生畏。有没有什么...

flydean
07/04
0
0
运维基础--虚拟机的使用(一)

虚拟机的使用 开始使用Linux操作系统时,首先可能会接触到两个主要的界面:GUI和CLI,即图形界面个命令界面,而运维一般极少使用到图形界面。 一、命令提示符的格式:[root@mylab11~] # roo...

osc_9os5791s
9分钟前
0
0
以程序员的方式,尽绵薄之力

作为程序员,我们不能冲在第一线,参与病毒防疫工作,我们希望通过我们的方式,让更多的人获取到关于疫情的有用的消息,正确的消息 虽然github可能是个相对小众的平台,对于非程序员来说,可...

Jipson
01/26
0
0
Oracle 等待事件之 db file scattered read

db file scattered read 官网解释: This event signifies that the user process is reading buffers into the SGA buffer cache and is waiting for a physical I/O call to return. A db......

osc_qlj7m2h9
10分钟前
0
0
互联网+时代的畅想

封面的台风卫星照片,我认为很形象地可以看作互联网的那一波浪潮。在智能手机普及的初始阶段,还记得我们对于互联网的狂热,有人说要用互联网颠覆一切,亦有人要用互联网干一切事情,当然,这...

zd200572
2015/09/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部