RabbitMQ消息队列帮助类

2019/06/18 17:25
阅读数 294

调用

//消息队列发消息
            MqConfigInfo config = new MqConfigInfo();
            config.MQExChange = "DrawingOutput";
            config.MQQueueName = "DrawingOutput";
            config.MQRoutingKey = "DrawingOutput";
            MqHelper heper = new MqHelper(config);
            byte[] body = Encoding.UTF8.GetBytes("98K");//发送的内容
            heper.SendMsg(body);

消息队列帮助类MqHelper

using Newtonsoft.Json;
using RabbitMQ.Client;
using RabbitMQ.Client.Content;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace LogTest
{
    public class MqHelper : IDisposable
    {
        #region 消息队列的配置信息

        public IConnection MQConnection { get; set; }

        public IModel MQModel { get; set; }

        public MqConfigInfo MqConfigInfo { get; set; }

        #endregion
        public MqHelper(MqConfigInfo configInfo)
        {
            MqConfigInfo = configInfo;
            var username = "guest";//用户名
            //if (string.IsNullOrEmpty(username))
            //{
            //    throw new ConfigurationErrorsException("MQHelper配置节MQUserName错误");
            //}
            var password = "guest";//密码
            //if (string.IsNullOrEmpty(password))
            //{
            //    throw new ConfigurationErrorsException("MQHelper配置节MQPassWord错误");
            //}
            var virtualhost = "mq_test";//虚拟主机名
            //if (string.IsNullOrEmpty(virtualhost))
            //{
            //    throw new ConfigurationErrorsException("MQHelper配置节MQVirtualHost错误");
            //}

            var connectionFactory = new ConnectionFactory
            {
                UserName = username,
                Password = password,
                VirtualHost = virtualhost,
                RequestedHeartbeat = 0,
                HostName = "192.168.1.49",//消息队列的ip
                Port = 5672
            };

            try
            {
                MQConnection = connectionFactory.CreateConnection();
                MQModel = MQConnection.CreateModel();
                if (MqConfigInfo.MQExChangeType != null)
                {
                    MQModel.ExchangeDeclare(MqConfigInfo.MQExChange, MqConfigInfo.MQExChangeType);
                    QueueDeclareOk ok = MQModel.QueueDeclare(MqConfigInfo.MQQueueName, true, false, false, null);

                    MQModel.QueueBind(MqConfigInfo.MQQueueName, MqConfigInfo.MQExChange, MqConfigInfo.MQRoutingKey);
                }
            }
            catch (Exception ex)
            {
                throw new Exception("MQHelper创建连接失败", ex);
            }
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <typeparam name="T">消息类型</typeparam>
        /// <param name="message">消息主体</param>
        /// <returns></returns>
        public bool SendMsg(object message)
        {
            try
            {
                IMapMessageBuilder mmb = new MapMessageBuilder(MQModel);
                System.Collections.Generic.IDictionary<string, object> header = mmb.Headers;
                //header["Header"] =MqConfigInfo.MQHeader;

                string json = JsonConvert.SerializeObject(message);

                byte[] body = Encoding.UTF8.GetBytes(json);
                if (MqConfigInfo.MQPersistModel)
                {
                    ((IBasicProperties)mmb.GetContentHeader()).DeliveryMode = 2;
                }
                MQModel.BasicPublish(MqConfigInfo.MQExChange, MqConfigInfo.MQRoutingKey, (IBasicProperties)mmb.GetContentHeader(), body);
            }
            catch (Exception ex)
            {
                throw ex;
            }
            return true;
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="message">消息主体</param>
        /// <returns></returns>
        public bool SendMsg(byte[] message)
        {
            try
            {
                IMapMessageBuilder mmb = new MapMessageBuilder(MQModel);
                System.Collections.Generic.IDictionary<string, object> header = mmb.Headers;
                //header["Header"] =MqConfigInfo.MQHeader;
                if (MqConfigInfo.MQPersistModel)
                {
                    ((IBasicProperties)mmb.GetContentHeader()).DeliveryMode = 2;
                }
                MQModel.BasicPublish(MqConfigInfo.MQExChange, MqConfigInfo.MQRoutingKey, (IBasicProperties)mmb.GetContentHeader(), message);
            }
            catch (Exception ex)
            {
                throw ex;
            }
            return true;
        }

        /// <summary>
        /// 发送消息
        /// </summary>
        /// <param name="message">消息主体</param>
        /// <returns></returns>
        public bool SendMsg(string message)
        {
            try
            {
                IMapMessageBuilder mmb = new MapMessageBuilder(MQModel);
                System.Collections.Generic.IDictionary<string, object> header = mmb.Headers;
                //header["Header"] =MqConfigInfo.MQHeader;
                byte[] body = Encoding.UTF8.GetBytes(message);
                if (MqConfigInfo.MQPersistModel)
                {
                    ((IBasicProperties)mmb.GetContentHeader()).DeliveryMode = 2;
                }
                MQModel.BasicPublish(MqConfigInfo.MQExChange, MqConfigInfo.MQRoutingKey, (IBasicProperties)mmb.GetContentHeader(), body);
            }
            catch (Exception ex)
            {
                throw ex;
            }
            return true;
        }

        public void Dispose()
        {
            if (MQModel != null)
            {
                MQModel.Dispose();
            }

            if (MQConnection != null)
            {
                MQConnection.Dispose();
            }
        }
    }

    /// <summary>
    /// 消息队列配置信息
    /// </summary>
    public class MqConfigInfo
    {
        public MqConfigInfo()
        {
            MQExChangeType = "direct";
            MQPersistModel = true;
        }

        /// <summary>
        /// 交换机
        /// </summary>
        public string MQExChange { get; set; }

        /// <summary>
        /// 交换机类型(fanout,direct,topic, headers)默认direct
        /// </summary>
        public string MQExChangeType { get; set; }

        /// <summary>
        /// 路由Key
        /// </summary>
        public string MQRoutingKey { get; set; }

        /// <summary>
        /// 消息头
        /// </summary>
        public string MQHeader { get; set; }

        /// <summary>
        /// 消息的持久化
        /// </summary>
        public bool MQPersistModel { get; set; }

        /// <summary>
        /// 队列名称
        /// </summary>
        public string MQQueueName { get; set; }
    }
}

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部