文档章节

消息队列、OSS常用操作封装

深圳大道
 深圳大道
发布于 2016/12/29 15:37
字数 1901
阅读 31
收藏 0
public class MessageQueue
    {
        #region Private Properties
        private const string _accessKeyId = "";
        private const string _secretAccessKey = "";
        private const string _endpoint = "";

        private static string _queueName;
        private const int _receiveTimes = 1;
        private const int _receiveInterval = 2;
        private const int batchSize = 6;

        #endregion

        #region 设置队列名称
        public string queueName
        {
            set
            {
                _queueName = value;

            }
            get
            {
                return _queueName;
            }
        }
        #endregion

        #region 判断消息队列是否存在
        /// <summary>
        /// 判断消息队列是否存在
        /// </summary>
        /// <returns></returns>
        public static bool QueueIsExist()
        {
            bool flag = false;
            try
            {
                using (IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint))
                {
                    var nativeQueue = client.GetNativeQueue(_queueName);
                    var getQueueAttributesResponse = nativeQueue.GetAttributes();
                    flag = true;
                }
            }
            catch (Exception ex) { Console.WriteLine(ex.ToString()); }
            return flag;
        }
        #endregion

        #region 创建消息队列
        /// <summary>
        /// 创建消息队列
        /// </summary>
        /// <returns></returns>
        public static string CreateQueue()
        {
            /*
                DelaySeconds	发送到该 Queue 的所有消息默认将以DelaySeconds参数指定的秒数延后可被消费,单位为秒。	0-604800秒(7天)范围内某个整数值,默认值为0
                MaximumMessageSize	发送到该Queue的消息体的最大长度,单位为byte。	1024(1KB)-65536(64KB)范围内的某个整数值,默认值为65536(64KB)。
                MessageRetentionPeriod	消息在该 Queue 中最长的存活时间,从发送到该队列开始经过此参数指定的时间后,不论消息是否被取出过都将被删除,单位为秒。	60 (1分钟)-1296000 (15 天)范围内某个整数值,默认值345600 (4 天)
                VisibilityTimeout	消息从该 Queue 中取出后从Active状态变成Inactive状态后的持续时间,单位为秒。	1-43200(12小时)范围内的某个值整数值,默认为30(秒)
                PollingWaitSeconds	当 Queue 中没有消息时,针对该 Queue 的 ReceiveMessage 请求最长的等待时间,单位为秒。	0-30秒范围内的某个整数值,默认为0(秒)
                LoggingEnabled	是否开启日志管理功能,True表示启用,False表示停用	True/False,默认为False             
             */
            string queueName = string.Empty;
            var createQueueRequest = new CreateQueueRequest
            {
                QueueName = _queueName,
                Attributes =
                {
                    DelaySeconds = 0,
                    MaximumMessageSize = 65536,
                    MessageRetentionPeriod = 345600,
                    VisibilityTimeout = 3600,
                    PollingWaitSeconds = 3
                }
            };
            try
            {
                using (IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint))
                {
                    var queue = client.CreateQueue(createQueueRequest);
                    queueName = queue.QueueName;
                }
            }
            catch (Exception ex) { Console.WriteLine(ex.ToString()); }
            Thread.Sleep(2000);
            return queueName;
        }
        #endregion

        #region 删除消息队列
        /// <summary>
        /// 删除消息队列
        /// </summary>
        /// <returns></returns>
        public static bool DeleteQueue()
        {
            bool flag = false;
            var deleteQueueRequest = new DeleteQueueRequest(_queueName);
            deleteQueueRequest.AddHeader("Accept", "IE6"); //Add extra request headers
            //deleteQueueRequest.AddParameter("param1", "value1"); //InvalidQueryString
            try
            {
                using (IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint))
                {
                    var deleteQueueResponse = client.DeleteQueue(deleteQueueRequest);
                    flag = true;
                }
            }
            catch (Exception ex) { Console.WriteLine(ex.ToString()); }
            return flag;
        }
        #endregion

        #region 发送消息(单条或多条)
        /// <summary>
        /// 发送消息(单条或多条)
        /// </summary>
        /// <param name="models">SendMessageRequest集合</param>
        /// <returns></returns>
        public static bool BathSendMessage(List<SendMessageRequest> models)
        {
            bool flag = false;
            try
            {
                using (IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint))
                {
                    var nativeQueue = client.GetNativeQueue(_queueName);
                    List<SendMessageRequest> requests = new List<SendMessageRequest>();
                    for (int i = 0; i < models.Count; i++)
                    {
                        requests.Add(models[i]);
                    }
                    BatchSendMessageRequest batchSendRequest = new BatchSendMessageRequest()
                    {
                        Requests = requests
                    };
                    var sendMessageResponse = nativeQueue.BatchSendMessage(batchSendRequest);
                    flag = true;
                }
            }
            catch (Exception ex) { Console.WriteLine(ex.ToString()); }
            return flag;
        }
        #endregion

        #region 消费消息(单条或多条)
        /// <summary>
        /// 消费消息(单条或多条)
        /// </summary>
        /// <param name="itemNum">数目</param>
        /// <returns></returns>
        public static List<Message> ReceiveMessage(int itemNum)
        {
            List<Message> lists = new List<Message>();
            try
            {
                using (IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint))
                {
                    var nativeQueue = client.GetNativeQueue(_queueName);
                    for (int i = 0; i < itemNum; i++)
                    {
                        var receiveMessageResponse = nativeQueue.ReceiveMessage();
                        Message message = receiveMessageResponse.Message;
                        lists.Add(message);
                    }
                }
            }
            catch (Exception ex) { Console.WriteLine(ex.ToString()); }
            return lists;
        }
        #endregion

        #region 删除消息
        /// <summary>
        /// 删除消息
        /// </summary>
        /// <param name="receiptHandle">receiptHandle</param>
        /// <returns></returns>
        public static bool DeleteMessage(string receiptHandle)
        {
            bool flag = false;
            var deletedReceiptHandle = receiptHandle;
            try
            {
                using (IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint))
                {
                    var nativeQueue = client.GetNativeQueue(_queueName);
                    var deleteMessageResponse = nativeQueue.DeleteMessage(receiptHandle);
                    flag = true;
                }
            }
            catch (Exception ex) { Console.WriteLine(ex.ToString()); }
            return flag;
        }
        #endregion

        #region 修改消息可见时间
        /// <summary>
        /// 修改消息可见时间
        /// </summary>
        /// <param name="receiptHandle">receiptHandle</param>
        /// <param name="visibilityTimeout">从现在到下次可被用来消费的时间间隔</param>
        /// <returns></returns>
        public static bool ChangeMessageVisibility(string receiptHandle, int visibilityTimeout)
        {
            bool flag = false;
            var deletedReceiptHandle = receiptHandle;
            try
            {
                using (IMNS client = new Aliyun.MNS.MNSClient(_accessKeyId, _secretAccessKey, _endpoint))
                {
                    var nativeQueue = client.GetNativeQueue(_queueName);
                    var changeMessageVisibilityRequest = new ChangeMessageVisibilityRequest
                    {
                        ReceiptHandle = receiptHandle,
                        VisibilityTimeout = visibilityTimeout
                    };
                    var changeMessageVisibilityResponse = nativeQueue.ChangeMessageVisibility(changeMessageVisibilityRequest);
                    flag = true;
                }
            }
            catch (Exception ex) { Console.WriteLine(ex.ToString()); }
            return flag;
        }
        #endregion
    }
using System;
using System.IO;
using System.Text;
using System.Threading;
using System.Security.Cryptography;
using Aliyun.OSS.Common;

namespace Aliyun.OSS.Samples
{
    /// <summary>
    /// 获取OSS对象
    /// </summary>
   public static class GetObjectSample
   {
       static string accessKeyId = Config.AccessKeyId;
       static string accessKeySecret = Config.AccessKeySecret;
       static string endpoint = Config.Endpoint;
       static OssClient client = new OssClient(endpoint, accessKeyId, accessKeySecret);

       static string key = "123456.jpg";
       static string fileToUpload = Config.FileToUpload;
       static string dirToDownload = Config.DirToDownload;

       static AutoResetEvent _event = new AutoResetEvent(false);

       public static void GetObjects(string bucketName)
       {
           GetObject(bucketName); //获取文件

           GetObjectByRequest(bucketName);

           AsyncGetObject(bucketName); //异步方式获取文件
       }

       public static void GetObject(string bucketName)
       {
           try
           {
               client.PutObject(bucketName, key, fileToUpload);

               var result = client.GetObject(bucketName, key);

               using (var requestStream = result.Content)
               {
                   using (var fs = File.Open(Path.Combine(dirToDownload, key), FileMode.OpenOrCreate))
                   {
                       int length = 4 * 1024;
                       var buf = new byte[length];
                       do
                       {
                           length = requestStream.Read(buf, 0, length);
                           fs.Write(buf, 0, length);
                       } while (length != 0);
                   }
               }

               Console.WriteLine("Get object succeeded");
           }
           catch (OssException ex)
           {
                Console.WriteLine("Failed with error code: {0}; Error info: {1}. \nRequestID:{2}\tHostID:{3}", 
                    ex.ErrorCode, ex.Message, ex.RequestId, ex.HostId);
           }
           catch (Exception ex)
           {
                Console.WriteLine("Failed with error info: {0}", ex.Message);
           }
        }

       public static void GetObjectByRequest(string bucketName)
       {
           try
           {
               client.PutObject(bucketName, key, fileToUpload);

               var request = new GetObjectRequest(bucketName, key);
               request.SetRange(0, 100);

               var result = client.GetObject(request);

               Console.WriteLine("Get object succeeded, length:{0}", result.Metadata.ContentLength);
           }
           catch (OssException ex)
           {
               Console.WriteLine("Failed with error code: {0}; Error info: {1}. \nRequestID:{2}\tHostID:{3}",
                   ex.ErrorCode, ex.Message, ex.RequestId, ex.HostId);
           }
           catch (Exception ex)
           {
               Console.WriteLine("Failed with error info: {0}", ex.Message);
           }
       }

       public static void AsyncGetObject(string bucketName)
       {
           const string key = "AsyncGetObject";
           try
           {
               client.PutObject(bucketName, key, fileToUpload);

               string result = "Notice user: put object finish";
               client.BeginGetObject(bucketName, key, GetObjectCallback, result.Clone());

               _event.WaitOne();
           }
           catch (OssException ex)
           {
               Console.WriteLine("Failed with error code: {0}; Error info: {1}. \nRequestID:{2}\tHostID:{3}",
                   ex.ErrorCode, ex.Message, ex.RequestId, ex.HostId);
           }
           catch (Exception ex)
           {
               Console.WriteLine("Failed with error info: {0}", ex.Message);
           }
       }

       private static void GetObjectCallback(IAsyncResult ar)
       {
           try
           {
               var result = client.EndGetObject(ar);

               using (var requestStream = result.Content)
               {
                   using (var fs = File.Open(dirToDownload + "/sample2.data", FileMode.OpenOrCreate))
                   {
                       int length = 4 * 1024;
                       var buf = new byte[length];
                       do
                       {
                           length = requestStream.Read(buf, 0, length);
                           fs.Write(buf, 0, length);
                       } while (length != 0);
                   }
               }

               Console.WriteLine(ar.AsyncState as string);
           }
           catch (Exception ex)
           {
               Console.WriteLine(ex.Message);
           }
           finally
           {
               _event.Set();
           }
       }
    }
}
using System;
using System.Collections.Generic;
using Aliyun.OSS.Common;

namespace Aliyun.OSS.Samples
{
    /// <summary>
    /// 删除OSS对象
    /// </summary>
    public static class DeleteObjectsSample
    {
        static string accessKeyId = Config.AccessKeyId;
        static string accessKeySecret = Config.AccessKeySecret;
        static string endpoint = Config.Endpoint;
        static OssClient client = new OssClient(endpoint, accessKeyId, accessKeySecret);

        public static void DeleteObject(string bucketName)
        {
            try
            {
                string key = null;
                var listResult = client.ListObjects(bucketName);
                foreach (var summary in listResult.ObjectSummaries)
                {
                    key = summary.Key;
                    break;
                }

                client.DeleteObject(bucketName, key);

                Console.WriteLine("Delete object succeeded");
            }
            catch (OssException ex)
            {
                Console.WriteLine("Failed with error code: {0}; Error info: {1}. \nRequestID:{2}\tHostID:{3}",
                    ex.ErrorCode, ex.Message, ex.RequestId, ex.HostId);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed with error info: {0}", ex.Message);
            }
        }

        public static void DeleteObjects(string bucketName)
        {
            try
            {
                var keys = new List<string>();
                var listResult = client.ListObjects(bucketName);
                foreach (var summary in listResult.ObjectSummaries)
                {
                    keys.Add(summary.Key);
                    break; //不跳出删除全部
                }
                var request = new DeleteObjectsRequest(bucketName, keys, false);
                client.DeleteObjects(request);

                Console.WriteLine("Delete objects succeeded");
            }
            catch (OssException ex)
            {
                Console.WriteLine("Failed with error code: {0}; Error info: {1}. \nRequestID:{2}\tHostID:{3}", 
                    ex.ErrorCode, ex.Message, ex.RequestId, ex.HostId);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed with error info: {0}", ex.Message);
            }
        }
    }
}
using System;
using System.IO;
using System.Threading;
using Aliyun.OSS.Common;
using System.Text;
using Aliyun.OSS.Util;

namespace Aliyun.OSS.Samples
{
    /// <summary>
    /// 上传文件或对象到OSS
    /// </summary>
    public static class PutObjectSample
    {
        static string accessKeyId = Config.AccessKeyId;
        static string accessKeySecret = Config.AccessKeySecret;
        static string endpoint = Config.Endpoint;
        static OssClient client = new OssClient(endpoint, accessKeyId, accessKeySecret);

        static string fileToUpload = Config.FileToUpload;

        static AutoResetEvent _event = new AutoResetEvent(false);

        /// <summary>
        /// sample for put object to oss
        /// </summary>
        public static void PutObject(string bucketName)
        {
            PutObjectFromFile(bucketName); //上传文件

            PutObjectFromString(bucketName); //上传String

            PutObjectWithDir(bucketName); //创建目录上传

            PutObjectWithMd5(bucketName); //MD5验证上传

            PutObjectWithHeader(bucketName); //设置Header上传

            AsyncPutObject(bucketName); //异步上传
        }

        public static void PutObjectFromFile(string bucketName)
        {
            const string key = "PutObjectFromFile";
            try
            {
                client.PutObject(bucketName, key, fileToUpload);
                Console.WriteLine("Put object:{0} succeeded", key);
            }
            catch (OssException ex)
            {
                Console.WriteLine("Failed with error code: {0}; Error info: {1}. \nRequestID:{2}\tHostID:{3}",
                    ex.ErrorCode, ex.Message, ex.RequestId, ex.HostId);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed with error info: {0}", ex.Message);
            }
        }

        public static void PutObjectFromString(string bucketName)
        {
            const string key = "PutObjectFromString";
            const string str = "Aliyun OSS SDK for C#";

            try
            {
                byte[] binaryData = Encoding.ASCII.GetBytes(str); 
                var stream = new MemoryStream(binaryData);
                
                client.PutObject(bucketName, key, stream);
                Console.WriteLine("Put object:{0} succeeded", key);
            }
            catch (OssException ex)
            {
                Console.WriteLine("Failed with error code: {0}; Error info: {1}. \nRequestID:{2}\tHostID:{3}",
                    ex.ErrorCode, ex.Message, ex.RequestId, ex.HostId);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed with error info: {0}", ex.Message);
            }
        }

        public static void PutObjectWithDir(string bucketName)
        {
            const string key = "folder/sub_folder/PutObjectFromFile";

            try
            {
                client.PutObject(bucketName, key, fileToUpload);
                Console.WriteLine("Put object:{0} succeeded", key);
            }
            catch (OssException ex)
            {
                Console.WriteLine("Failed with error code: {0}; Error info: {1}. \nRequestID:{2}\tHostID:{3}",
                    ex.ErrorCode, ex.Message, ex.RequestId, ex.HostId);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed with error info: {0}", ex.Message);
            }
        }

        public static void PutObjectWithMd5(string bucketName)
        {
            const string key = "PutObjectWithMd5";

            string md5;
            using (var fs = File.Open(fileToUpload, FileMode.Open))
            {
                md5 = OssUtils.ComputeContentMd5(fs, fs.Length);
            }

            var meta = new ObjectMetadata() { ContentMd5 = md5 };
            try
            {
                client.PutObject(bucketName, key, fileToUpload, meta);

                Console.WriteLine("Put object:{0} succeeded", key);
            }
            catch (OssException ex)
            {
                Console.WriteLine("Failed with error code: {0}; Error info: {1}. \nRequestID:{2}\tHostID:{3}",
                    ex.ErrorCode, ex.Message, ex.RequestId, ex.HostId);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed with error info: {0}", ex.Message);
            }
        }

        public static void PutObjectWithHeader(string bucketName)
        {
            const string key = "PutObjectWithHeader";
            try
            {
                using (var content = File.Open(fileToUpload, FileMode.Open))
                {
                    var metadata = new ObjectMetadata();                    
                    metadata.ContentLength = content.Length;

                    metadata.UserMetadata.Add("github-account", "qiyuewuyi");

                    client.PutObject(bucketName, key, content, metadata);

                    Console.WriteLine("Put object:{0} succeeded", key);
                }
            }
            catch (OssException ex)
            {
                Console.WriteLine("Failed with error code: {0}; Error info: {1}. \nRequestID:{2}\tHostID:{3}",
                    ex.ErrorCode, ex.Message, ex.RequestId, ex.HostId);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed with error info: {0}", ex.Message);
            }
        }

        public static void AsyncPutObject(string bucketName)
        {
            const string key = "AsyncPutObject";
            try
            {
                // 1. put object to specified output stream
                using (var fs = File.Open(fileToUpload, FileMode.Open))
                {
                    var metadata = new ObjectMetadata();
                    metadata.UserMetadata.Add("mykey1", "myval1");
                    metadata.UserMetadata.Add("mykey2", "myval2");
                    metadata.CacheControl = "No-Cache";
                    metadata.ContentType = "text/html";

                    string result = "Notice user: put object finish";
                    client.BeginPutObject(bucketName, key, fs, metadata, PutObjectCallback, result.ToCharArray());

                    _event.WaitOne();
                }
            }
            catch (OssException ex)
            {
                Console.WriteLine("Failed with error code: {0}; Error info: {1}. \nRequestID:{2}\tHostID:{3}",
                    ex.ErrorCode, ex.Message, ex.RequestId, ex.HostId);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Failed with error info: {0}", ex.Message);
            }
        }

        private static void PutObjectCallback(IAsyncResult ar)
        {
            try
            {
                client.EndPutObject(ar);

                Console.WriteLine(ar.AsyncState as string);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
            finally
            {
                _event.Set();
            }
        }
    }
}

本文转载自:http://blog.csdn.net/smartsmile2012/article/details/52994186

深圳大道
粉丝 3
博文 877
码字总数 0
作品 0
深圳
架构师
私信 提问
【MPS最佳实践】媒体工作流转码

摘要: 背景 1个输入文件对应多个输出文件(不同分辨率,不同格式等),通过控制台的图形化界面,快速搭建常用视频处理流程。 优势 简单易用,视频上传完成自动触发转码任务。 功能强大,支持...

aliyunyunqi
2018/02/28
0
0
分布式架构开发套件 - jeesuite-libs

简介 jeesuite-libs分布式架构开发套件。包括缓存(一二级缓存、自动缓存管理)、队列、分布式定时任务、文件服务(七牛、阿里云OSS、fastDFS)、日志、搜索、代码生成、API网关、配置中心、统一...

vakinge
2018/11/27
0
0
如何使用API提交转码任务?

当常规的转码工作流无法满足用户的场景时,需用户自己判断业务逻辑,并使用API提交转码任务。例如:并不是所有的视频都需要转码,不同视频需要设置不同的转码配置。本文将介绍API提交转码任务...

樰篱
2017/12/29
0
0
.NetCore利用BlockingCollection实现简易消息队列

消息队列现今的应用场景越来越大,常用的有RabbmitMQ和KafKa。 我们用BlockingCollection来实现简单的消息队列。 实现消息队列 用Vs2017创建一个控制台应用程序。创建DemoQueueBlock类,封装...

范存威
2018/08/31
0
0
Android Handler异步通信:深入详解Handler机制源码

前言 在开发的多线程应用场景中,机制十分常用 今天,我将手把手带你深入分析 机制的源码,希望你们会喜欢 目录 1. Handler 机制简介 在多线程的应用场景中,将工作线程中需更新的操作信息 ...

carson_ho
2018/05/21
0
0

没有更多内容

加载失败,请刷新页面

加载更多

springmvc集成cas,并解决前后端分离情况

1.最近项目需要集成已经存在的cas系统。 但是目前已集成的系统都是jsp。而我们项目是前后端分离开发(伪),没有分开部署。 2.cas原理就不介绍了 网上例子很多。基本都是使用302重定向实现的...

起名字什么的太麻烦了
35分钟前
3
0
HDFS-原理

1. 写操作

叶枫啦啦
今天
2
0
聊聊elasticsearch的MembershipAction

序 本文主要研究一下elasticsearch的MembershipAction MembershipAction elasticsearch-6.7.1/server/src/main/java/org/elasticsearch/discovery/zen/MembershipAction.java public class M......

go4it
今天
3
0
Redis集群

Redis cluster tutorial Redis集群提供一种方式自动将数据分布在多个Redis节点上。 Redis Cluster provides a way to run a Redis installation where data is automatically sharded acros......

OSC首席混子
今天
4
0
AWS codecommit 触发jenkins工作

在gitlab和github上面都有直接发送webhook的配置,但是在AWS上面是没有的直接配置webhook触发jenkins构建的设置的。 通过查看AWS多个service的官方文档,找到了解决方案,方案如下: (1)在c...

守护-创造
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部