文档章节

rabbitmq学习(二)

hensemlee
 hensemlee
发布于 10/19 23:08
字数 560
阅读 4
收藏 0

生产者消费者初级案列

ChannelUtils

package com.hensemlee.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.HashMap;
import java.util.Map;

public class ChannelUtils {

    private static final String HOST = "192.168.1.5";
    private static final int PORT = 5672;

    public static Channel getChannel(String connectionDescription) {
        try {
            ConnectionFactory factory = getConnectionFactory();
            Connection connection = factory.newConnection(connectionDescription);
            return connection.createChannel();
        } catch (Exception e) {
            throw new RuntimeException("获取Channel连接失败");
        }
    }

    private static ConnectionFactory getConnectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        // 配置连接信息
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setUsername("root");
        factory.setPassword("root");
        // 网络异常自动连接恢复
        factory.setAutomaticRecoveryEnabled(true);
        // 每10秒尝试重试连接一次
        factory.setNetworkRecoveryInterval(10000);

        // 设置ConnectionFactory属性信息
        Map<String, Object> propertiesMap = new HashMap<>();
        propertiesMap.put("principal", "hensemlee");
        propertiesMap.put("description", "rabbitmq入门案例");
        propertiesMap.put("emailAddress", "hensemlee@163.com");
        factory.setClientProperties(propertiesMap);
        return factory;
    }
}

Producer

package com.hensemlee.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;

import java.io.IOException;

public class Producer {
    private static final String EXCHANGE_NAME = "hello_exchange";
    private static final String ROUTINGKEY_NAME = "hello_routingkey";

    public static void main(String[] args) throws IOException {
        Channel channel = ChannelUtils.getChannel("rabbitmq入门案例消息生产者");

        // 声明交换机 (交换机名, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机, 交换机属性);
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);

        // 设置消息属性 发布消息 (交换机名, Routing key, 可靠消息相关属性 后续会介绍, 消息属性, 消息体);
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2).contentType("UTF-8").build();
        channel.basicPublish(EXCHANGE_NAME, ROUTINGKEY_NAME, false, basicProperties, "Hello RabbitMQ".getBytes());
    }
}

Consumer

package com.hensemlee.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer {

    private static final String EXCHANGE_NAME = "hello_exchange";
    private static final String QUEUE_NAME = "hello_queue";
    private static final String ROUTINGKEY_NAME = "hello_routingkey";

    public static void main(String[] args) throws IOException {
        Channel channel = ChannelUtils.getChannel("rabbitmq入门案例消息消费者");

        // 声明队列 (队列名, 是否持久化, 是否排他, 是否自动删除, 队列属性);
        AMQP.Queue.DeclareOk declareOk = channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 声明交换机 (交换机名, 交换机类型, 是否持久化, 是否自动删除, 是否是内部交换机, 交换机属性);
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true, false, false, null);
        // 将队列Binding到交换机上 (队列名, 交换机名, Routing key, 绑定属性);
        channel.queueBind(declareOk.getQueue(), EXCHANGE_NAME, ROUTINGKEY_NAME, null);
        // 消费者订阅消息 监听如上声明的队列 (队列名, 是否自动应答(与消息可靠有关 后续会介绍), 消费者标签, 消费者)
        channel.basicConsume(declareOk.getQueue(), true, "rabbitmq入门案例消费者", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(consumerTag);
                System.out.println(envelope.toString());
                System.out.println(properties.toString());
                System.out.println("消息内容:" + new String(body));
            }
        });
    }
}

© 著作权归作者所有

共有 人打赏支持
hensemlee
粉丝 8
博文 65
码字总数 43176
作品 0
徐汇
程序员
私信 提问
#DDBMS#rabbitmq安装

考虑了下,决定以可靠性为前提,在DDBMS低层支撑架构中用上rabbitmq,像OpenStack一样以RPC为主交互 安装rabbitmq: apt-get install rabbitmq-server 安装pika库: pip install pika 相关的...

Hochikong
2015/02/28
0
0
RabbitMQ安装(CentOS 7 64位)

一、安装Erlang 详细的安装介绍在这里(https://www.erlang-solutions.com/downloads/download-erlang-otp) wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rp......

nibilly
2015/04/28
0
0
rabbitmq-server 安装

一,安装rabbitmq-server 1.安装erlang wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm rpm -Uvh erlang-solutions-1.0-1.noarch.rpm rpm --import https:/......

丿小贰丶
05/08
0
0
CentOS7 安装 rabbitmq

安装rabbitmq比较简单,但前提条件是已经装好了erlang环境,如果没安装erlang环境的请移步: --> https://my.oschina.net/u/1257739/blog/1553212 有了erlang环境后开始rabbitmq的安装,如下...

ax2472
2017/10/19
0
0
zabbix自动发现rabbitmq

参考文档 http://blog.csdn.net/qq29778131/article/details/52537288?ticket=ST-77459-cUGNcZF1BJBtNuZoZe1i-passport.csdn.net #python脚本 一,实现功能 实现自动发现rabbitmq queue,并监......

typuc
06/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

SpringBoot源码:启动过程分析(二)

接着上篇继续分析 SpringBoot 的启动过程。 SpringBoot的版本为:2.1.0 release,最新版本。 一.时序图 一样的,我们先把时序图贴上来,方便理解: 二.源码分析 回顾一下,前面我们分析到了下...

Jacktanger
昨天
0
0
Apache防盗链配置,Directory访问控制,FilesMatch进行访问控制

防盗链配置 通过限制referer来实现防盗链的功能 配置前,使用curl -e 指定referer [root@test-a test-webroot]# curl -e "http://www.test.com/1.html" -x127.0.0.1:80 "www.test.com/1.jpg......

野雪球
昨天
2
0
RxJava threading

因为Rx针对异步系统设计,并且Rx也自然支持多线程,所以新的Rx开发人员有时会假设Rx默认是多线程的。在其他任何事情之前,重要的是澄清Rx默认是单线程的。 除非另有说明,否则每次调用onNex...

woshixin
昨天
0
0
Python的安装及文件类型、变量

一、为什么学习python 服务于大数据、人工智能、自动化运维。 简单易学 代码简洁 薪资高 近几年越来越火 二、Python的安装 linux 系统默认安装, CentOS7 默认安装了python2.7 安装ipython y...

枫叶云
昨天
1
0
JeeSite 4.x 树形结构的表设计和用法

有些同仁对于 JeeSite 4 中的树表设计不太了解,本应简单的方法就可实现,却写了很多复杂的语句和代码,所以有了这篇文章。 在 JeeSite 4 中的树表设计我还是相对满意的,这种设计比较容易理...

ThinkGem
昨天
30
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部