RabbitMQ

2018/08/06 09:39

I. Environment Setup

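RabbitMQ is written in Erlang and runs on the Erlang runtime, so the Erlang runtime must be installed before RabbitMQ Server. Installers for each platform can be downloaded from the Erlang website. If the runtime is missing, the RabbitMQ Server installer reports that Erlang must be installed first. After installation, make sure the Erlang installation path is registered in the system environment variables. The Erlang installer normally sets this automatically; if it did not, set it manually from a console running as administrator: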

  

ERLANG_HOME C:\Program Files (x86)\erl5.10.4

The Path variable also needs the Erlang bin directory:

  

Path C:\Program Files (x86)\erl5.10.4\bin;

Next, download the RabbitMQ Server package for your platform from the RabbitMQ website. Once it is installed, it is ready to use.

RabbitMQ Server can now be configured.

First, register the RabbitMQ Server installation directory in the environment variables:

RABBITMQ_SERVER  C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3

PATH: C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin;
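
As a quick sanity check, a small helper along the following lines could confirm that the variables set above are visible to a process. This is only a sketch: EnvCheck, FindMissing and Report are hypothetical names, and the variable names are simply the ones used in this article.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper; not part of RabbitMQ or Erlang.
public static class EnvCheck
{
    // Returns the names from 'required' for which 'lookup' yields null, empty or whitespace.
    public static List<string> FindMissing(IEnumerable<string> required, Func<string, string> lookup)
    {
        var missing = new List<string>();
        foreach (string name in required)
        {
            if (string.IsNullOrWhiteSpace(lookup(name)))
            {
                missing.Add(name);
            }
        }
        return missing;
    }

    // Checks the two variables from this article against the real process environment.
    public static void Report()
    {
        var missing = FindMissing(
            new[] { "ERLANG_HOME", "RABBITMQ_SERVER" },
            Environment.GetEnvironmentVariable);
        Console.WriteLine(missing.Count == 0
            ? "All variables are set"
            : "Missing: " + string.Join(", ", missing));
    }
}
```

Passing the lookup as a delegate keeps the check independent of the real environment, so it can be exercised with fake values.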

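By default RabbitMQ listens on port 5672. Install Erlang OTP first and RabbitMQ second; the default installer options are fine. The RabbitMQ installer also offers options to install RabbitMQ as a background service and to add service start and stop operations.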

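Enabling the RabbitMQ Management Plugin

The management plugin provides a web UI for inspecting the state of a RabbitMQ server instance. Open a command prompt, cd to the sbin directory under the installation directory (..\rabbitmq_server-3.2.3\sbin), and run the following command to enable the plugin: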

"C:\Program Files (x86)\RabbitMQ Server\rabbitmq_server-3.2.3\sbin\rabbitmq-plugins.bat" enable rabbitmq_management

Restart the service so that the plugin takes effect:

net stop RabbitMQ && net start RabbitMQ

Then open the monitoring page in a browser: http://localhost:15672 (default username and password: guest and guest)

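Configuring RabbitMQ User Permissions

RabbitMQ has its own users and permissions. The default user is guest with password guest, and it carries the administrator tag. To add a new user and grant it permissions, open a command prompt again and cd to the sbin directory under the installation directory:

User management commands: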

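:: Show the server status
rabbitmqctl status

:: List virtual hosts
rabbitmqctl list_vhosts

:: List users
rabbitmqctl list_users

:: Add a user with a password
rabbitmqctl  add_user  hao  abc123

:: Grant configure, write and read permissions on the default virtual host
rabbitmqctl  set_permissions  hao  ".*"  ".*"  ".*"

:: Assign the user a tag (role)
rabbitmqctl  set_user_tags hao administrator

:: Delete the guest user
rabbitmqctl delete_user guest

:: Change a user's password
rabbitmqctl change_password {username}  {newpassword}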

II. Using RabbitMQ in .NET

1. Install the RabbitMQ.Client library from NuGet, version 3.6.5; higher versions conflict with .NET Framework 4.5.

2. Write code against the RabbitMQ Client library

 

Producer:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;

namespace Clinet
{
    class Program
    {

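        /// <summary>
        /// Connection settings
        /// </summary>
        private static readonly ConnectionFactory RabbitMqFactory = new
            ConnectionFactory
        {
            // DefaultUser/DefaultPass are guest/guest. Note that RabbitMQ 3.3.0 and later
            // only accept the guest user from localhost unless the broker is reconfigured.
            UserName = ConnectionFactory.DefaultUser,
            Password = ConnectionFactory.DefaultPass,
            Port = 5672,
            VirtualHost = ConnectionFactory.DefaultVHost,
            HostName = "192.168.21.199", //"localhost"
            Protocol = Protocols.DefaultProtocol
        };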


        /// <summary>
        /// Direct exchange name
        /// </summary>
        const string ExchangeName = "cyw.exchange";

        // Queue bound to the direct exchange
        const string QueueName = "cyw.queue";

        /// <summary>
        /// Topic exchange name
        /// </summary>
        const string TopExchangeName = "topic.cyw.exchange";

        // Queue bound to the topic exchange
        const string TopQueueName = "topic.cyw.queue";

        static void Main(string[] args)
        {
            //DirectExchangeSendMsg();
            TopicExchangeSendMsg();
            Console.WriteLine("Press any key to exit");
            Console.ReadKey();
        }

        /// <summary>
        /// Direct exchange: a message reaches a queue only when its routing key exactly matches the binding key
        /// </summary>
        private static void DirectExchangeSendMsg()
        {
            using (IConnection conn = RabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    channel.ExchangeDeclare(ExchangeName, "direct", true, false, null);
                    channel.QueueDeclare(QueueName, true, autoDelete: false, exclusive: false, arguments: null);
                    channel.QueueBind(QueueName, ExchangeName, QueueName);
                    var props = channel.CreateBasicProperties();
                    props.Persistent = true;
                    string vadata = Console.ReadLine();
                    while (vadata != null && !string.Equals(vadata, "exit", StringComparison.InvariantCultureIgnoreCase))
                    {
                        var msgBody = Encoding.UTF8.GetBytes(vadata);
                        channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
                        vadata = Console.ReadLine();
                    }

                }
            }
        }


        /// <summary>
        /// Topic exchange: messages are routed by matching the routing key against binding patterns
        /// </summary>
        public static void TopicExchangeSendMsg()
        {
            using (IConnection conn = RabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    // Non-durable exchange and queue: neither survives a broker restart
                    channel.ExchangeDeclare(TopExchangeName, "topic", durable: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(TopQueueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                    // The binding key contains no wildcards, so only the exact routing key "topic.cyw.queue" matches
                    channel.QueueBind(TopQueueName, TopExchangeName, routingKey: TopQueueName);
                    //var props = channel.CreateBasicProperties();
                    //props.Persistent = true;
                    string vadata = Console.ReadLine();
                    // ReadLine returns null at end of input; stop then instead of passing null to GetBytes
                    while (vadata != null && vadata != "exit")
                    {
                        var msgBody = Encoding.UTF8.GetBytes(vadata);
                        channel.BasicPublish(exchange: TopExchangeName, routingKey: TopQueueName, basicProperties: null, body: msgBody);
                        Console.WriteLine(string.Format("***Sent at: {0}, message sent; type exit to stop sending", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")));
                        vadata = Console.ReadLine();
                    }
                }
            }
        }
    }
}
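
The topic example above binds its queue with the exact key "topic.cyw.queue", so it behaves just like the direct exchange. Topic bindings may also contain wildcards: `*` stands for exactly one dot-separated word and `#` for zero or more words. The following is a minimal sketch of that matching rule, written only to illustrate it. The broker performs the real matching internally, and TopicMatcher and Matches are hypothetical names that are not part of RabbitMQ.Client.

```csharp
using System;

// Illustrative only: RabbitMQ does this matching inside the broker.
public static class TopicMatcher
{
    // Returns true when routingKey satisfies the binding pattern.
    public static bool Matches(string pattern, string routingKey)
    {
        string[] p = pattern.Length == 0 ? new string[0] : pattern.Split('.');
        string[] k = routingKey.Length == 0 ? new string[0] : routingKey.Split('.');
        return Match(p, 0, k, 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: succeed only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may absorb zero or more words: try every possible continuation point.
            for (int next = ki; next <= k.Length; next++)
            {
                if (Match(p, pi + 1, k, next)) return true;
            }
            return false;
        }

        // Any other pattern word needs one key word to consume.
        if (ki == k.Length) return false;
        if (p[pi] == "*" || p[pi] == k[ki]) return Match(p, pi + 1, k, ki + 1);
        return false;
    }
}
```

Under this rule, binding the queue with a pattern such as "topic.#" instead of TopQueueName would let it receive every message whose routing key starts with the word "topic", while "topic.*" would only accept keys made of exactly two words.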

 

Consumer:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace Service
{
    internal class Program
    {
        /// <summary>
        /// Connection settings
        /// </summary>
        private static readonly ConnectionFactory RabbitMqFactory = new
            ConnectionFactory
            {
                UserName = ConnectionFactory.DefaultUser,
                Password = ConnectionFactory.DefaultPass,
                Port = 5672,
                VirtualHost = ConnectionFactory.DefaultVHost,
                HostName = "192.168.21.199", //"localhost",
                Protocol = Protocols.DefaultProtocol
            };

        /// <summary>
        /// Direct exchange name
        /// </summary>
        const string ExchangeName = "cyw.exchange";

        // Queue bound to the direct exchange
        const string QueueName = "cyw.queue";

        /// <summary>
        /// Topic exchange name
        /// </summary>
        const string TopExchangeName = "topic.cyw.exchange";

        // Queue bound to the topic exchange
        const string TopQueueName = "topic.cyw.queue";

        static void Main(string[] args)
        {
            //DirectAcceptExchange();
            //DirectAcceptExchangeEvent();
            //DirectAcceptExchangeTask();
            TopicAcceptExchange();
            Console.WriteLine("Press any key to exit");
            Console.ReadKey();
        }


        public static void DirectAcceptExchange()
        {
            using (IConnection conn = RabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
                    channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);
                    channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
                    while (true)
                    {
                        // Poll the queue; noAck: true lets the broker drop the message as soon as it is delivered
                        BasicGetResult msgResponse = channel.BasicGet(QueueName, noAck: true);
                        if (msgResponse != null)
                        {
                            var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                            Console.WriteLine(string.Format("***Received at: {0}, message: {1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        }

                        //BasicGetResult msgResponse2 = channel.BasicGet(QueueName, noAck: false);

                        ////process message ...

                        //channel.BasicAck(msgResponse2.DeliveryTag, multiple: false);
                        System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                    }
                }
            }
        }

        public static void DirectAcceptExchangeEvent()
        {
            using (IConnection conn = RabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
                    channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);
                    //channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var msgBody = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine(string.Format("***Received at: {0}, message: {1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    };
                    channel.BasicConsume(QueueName, true, consumer: consumer);

                    // Obsolete; use EventingBasicConsumer instead
                    //var consumer2 = new QueueingBasicConsumer(channel);
                    //channel.BasicConsume(QueueName, noAck: true, consumer: consumer2);
                    //var msgResponse = consumer2.Queue.Dequeue(); //blocking
                    //var msgBody2 = Encoding.UTF8.GetString(msgResponse.Body);

                    Console.WriteLine("Press any key to exit");
                    Console.ReadKey();
                }
            }
        }

        public static void DirectAcceptExchangeTask()
        {
            using (IConnection conn = RabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    //channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
                    channel.QueueDeclare(QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); // ask the broker to deliver at most one unacknowledged message at a time
                    //channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var msgBody = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine(string.Format("***Received at: {0}, message: {1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        // Simulate work: sleep one second per '.' in the message
                        int dots = msgBody.Split('.').Length - 1;
                        System.Threading.Thread.Sleep(dots * 1000);
                        Console.WriteLine(" [x] Done");
                        // Processing finished: tell the broker it can delete this message and deliver the next one
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    // noAck is false: the broker keeps the message after delivery until the consumer acknowledges it
                    channel.BasicConsume(QueueName,  false, consumer: consumer);

                    Console.WriteLine("Press any key to exit");
                    Console.ReadKey();
                }
            }
        }

        public static void TopicAcceptExchange()
        {
            using (IConnection conn = RabbitMqFactory.CreateConnection())
            {
                using (IModel channel = conn.CreateModel())
                {
                    channel.ExchangeDeclare(TopExchangeName, "topic", durable: false, autoDelete: false, arguments: null);
                    channel.QueueDeclare(TopQueueName, durable: false, autoDelete: false, exclusive: false, arguments: null);
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                    channel.QueueBind(TopQueueName, TopExchangeName, routingKey: TopQueueName);
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var msgBody = Encoding.UTF8.GetString(ea.Body);
                        Console.WriteLine(string.Format("***Received at: {0}, message: {1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        int dots = msgBody.Split('.').Length - 1;
                        System.Threading.Thread.Sleep(dots * 1000);
                        Console.WriteLine(" [x] Done");
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                    };
                    channel.BasicConsume(TopQueueName, false, consumer: consumer);

                    Console.WriteLine("Press any key to exit");
                    Console.ReadKey();
                }
            }
        }

    }
}
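
The manual-acknowledgement consumers above call BasicAck at the end of the handler, so if processing throws, the message is never acknowledged. One possible way to make the outcome explicit is to acknowledge on success and negatively acknowledge on failure. The sketch below shows only that decision logic, with the broker calls passed in as delegates so that it stays self-contained. AckPolicy and Process are hypothetical names that are not part of RabbitMQ.Client.

```csharp
using System;
using System.Text;

// Hypothetical helper; not part of RabbitMQ.Client.
public static class AckPolicy
{
    // Runs 'handle' on the UTF-8 decoded body, then calls 'ack' on success
    // or 'nack' if the handler throws. Returns true when the message was acknowledged.
    public static bool Process(byte[] body, ulong deliveryTag,
        Action<string> handle, Action<ulong> ack, Action<ulong> nack)
    {
        try
        {
            handle(Encoding.UTF8.GetString(body));
        }
        catch (Exception ex)
        {
            Console.WriteLine("Handler failed: " + ex.Message);
            nack(deliveryTag);
            return false;
        }
        // Acknowledge outside the try block so a failing ack is not mistaken for a handler failure.
        ack(deliveryTag);
        return true;
    }
}

// Possible wiring inside a Received handler, assuming the same channel and ea as in the consumers above:
//   AckPolicy.Process(ea.Body, ea.DeliveryTag, Console.WriteLine,
//       tag => channel.BasicAck(tag, multiple: false),
//       tag => channel.BasicNack(tag, multiple: false, requeue: false));
```

With requeue: false the broker discards the rejected message (or dead-letters it if the queue is configured for that); requeue: true would redeliver it instead, which can loop forever when the message itself is the cause of the failure.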

 

 

github:https://github.com/842549829/RabbitMQ
