文档章节

使用go sdk进行ceph s3 分片上传和分段下载

R
 RinChen
发布于 2017/03/20 20:03
字数 966
阅读 1101
收藏 1
package main

import (
	"bufio"
	"bytes"
	"errors"
	"fmt"
	"github.com/astaxie/beego"
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/credentials"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"io"
	"os"
)

type CephMgmt struct {
	host        string `ceph host`
	DoName      string `ceph doname`
	bucket_id   string `bucket_id`
	PathStyle   bool   `ceph url style, true means you can use host directly,  false means bucket_id.doname will be used`
	AccessKey   string `aws s3 aceessKey`
	SecretKey   string `aws s3 secretKey`
	block_size int64  `block_size`
}

var Ceph CephMgmt

type MyProvider struct{}

func (m *MyProvider) Retrieve() (credentials.Value, error) {

	return credentials.Value{
		AccessKeyID:     Ceph.AccessKey, //"9YGYCFH1V3QUT4B9KZD1",
		SecretAccessKey: Ceph.SecretKey, //"9hdiwSjCfyrZPKGObH8Kctdur8PiBPJu3B4zGYaZ",
	}, nil
}
func (m *MyProvider) IsExpired() bool { return false }

func (this *CephMgmt) Init() error {
	// "http://s3.devopscloud.com"
	this.host = beego.AppConfig.String("ceph::host")
	if len(this.host) <= 0 {
		return errors.New("ceph conf host is nil")
	}
	this.bucket_id = beego.AppConfig.String("ceph::bucket_id")
	if len(this.bucket_id) <= 0 {
		return errors.New("ceph conf bucket_id is nil")
	}
	this.block_size, _ = beego.AppConfig.Int64("ceph::block_size")
	if this.block_size <= 5*1024*1024 {
		this.block_size = 5 * 1024 * 1024
	}
	return nil
}


func (this *CephMgmt) Init2(host string, bucket string, accesskey string, secretkey string) error {

	this.host = host
	this.bucket_id = bucket
	this.AccessKey = accesskey
	this.SecretKey = secretkey
	this.PathStyle = true
	this.block_size = 8 * 1024 * 1024

	return nil
}

func (this *CephMgmt) connect() (*s3.S3, error) {
	///////////////创建连接
	sess := session.Must(session.NewSessionWithOptions(session.Options{
		Config: aws.Config{
			Region: aws.String("default"), //Required 目前尚未分区,填写default即可
			//EndpointResolver: endpoints.ResolverFunc(s3CustResolverFn),
			Endpoint:         &this.host,
			S3ForcePathStyle: &this.PathStyle,
			Credentials:      credentials.NewCredentials(&MyProvider{}),
		},
	}))
	// Create the S3 service client with the shared session. This will
	// automatically use the S3 custom endpoint configured in the custom
	// endpoint resolver wrapping the default endpoint resolver.
	return s3.New(sess), nil
}


func (this *CephMgmt) Download(src_name, dst_name string) error {
	s3Svc, _ := this.connect()
	// Operation calls will be made to the custom endpoint.
	resp, err := s3Svc.GetObject(&s3.GetObjectInput{
		Bucket: &this.bucket_id, //Required 可认为是cpid,也可认为是对于根目录而言的第一级目录.必须先创建,在存储。
		Key:    &src_name,       //Required 文件名,中间可以带着路径,格式如:{path}/{filename}
		//Range:  aws.String("bytes=0-499"),             //not must be Required 文件范围,如果没有则是全文件
	})
	if err != nil {
		// Print the error, cast err to awserr.Error to get the Code and
		// Message from an error.
		fmt.Println("hello , i met error")
		fmt.Println(err.Error())
		return err
	}

	fmt.Println("AcceptRanges :", *resp.AcceptRanges)
	fmt.Println("ContentLength:", *resp.ContentLength)
	//fmt.Println("ContentRange :", *resp.ContentRange)
	//fmt.Println("PartsCount   :", *resp.PartsCount)
	// Pretty-print the response data.
	//fmt.Println(resp)

	//数据都在resp.Body中,可以转储到文件,具体操作参考go的操作
	out, err := os.OpenFile(dst_name, os.O_CREATE|os.O_RDWR, 0666)
	if out == nil {
		fmt.Println("Open fail")
		return err
	}
	num, err := io.Copy(out, resp.Body)
	fmt.Printf("\n write %d err %v \n", num, err)
	return nil
}

func (this *CephMgmt) Upload(src_name, dst_name string) error {
	s3Svc, _ := this.connect()

	dst_full_name := fmt.Sprintf("/%s/%s", this.bucket_id, dst_name) //Notice: 如果要讲桶ID拼到全路径下,一定要在最前面加'/'

	//////////////////////////////////////// 存入 /////////////////////////////
	/////////////// 创建一个分片上传context
	param_init := &s3.CreateMultipartUploadInput{
		Bucket: aws.String(this.bucket_id), // Required {bucket}
		Key:    aws.String(dst_full_name),  // Required /{bucket}/{path}/{filename}
		/*
			Metadata: map[string]*string{
				"Key": aws.String("lasttime_if_need"), // Required 可以填充一些文件属性,ceph云存储不关心其中的内容
			},
		*/
	}

	resp_init, err := s3Svc.CreateMultipartUpload(param_init)
	if err != nil {
		// Print the error, cast err to awserr.Error to get the Code and
		// Message from an error.
		fmt.Println("I am create upload.", err.Error())
		return err

	}
	// Pretty-print the response data.
	up_id := resp_init.UploadId
	fmt.Println(resp_init) // 需要保存该resp , resp.UploadId 是对象网关创建的标识

	///////////////////////// 分片上传
	f, err := os.Open(src_name)
	if err != nil {
		return err
	}
	defer f.Close()

	bfRd := bufio.NewReader(f)
	buf := make([]byte, this.block_size) //TODO:一次读取多少个字节

	var part_num int64 = 0
	var completes []*s3.CompletedPart

	for {
		n, err := bfRd.Read(buf)
		if err == io.EOF {
			fmt.Println("read data finished")
			break
		}
		if int64(n) != this.block_size {
			data := make([]byte, n)
			data = buf[0:n]
			buf = data
		}
		//////////////////////// 每次上传一个分片,每次的PartNumber都要唯一
		part_num++
		param := &s3.UploadPartInput{
			Bucket:        aws.String(this.bucket_id), // Required bucket
			Key:           aws.String(dst_name),       // Required {path}/{filename}
			PartNumber:    aws.Int64(part_num),        // Required 每次的序号唯一且递增
			UploadId:      up_id,                      // Required 创建context时返回的值
			Body:          bytes.NewReader(buf),       // Required 数据内容
			ContentLength: aws.Int64(int64(n)),        // Required 数据长度
		}

		resp2, err := s3Svc.UploadPart(param)
		if err != nil {
			fmt.Printf("Hello ,i am wrong[%s][%d][%d]\n", dst_name, part_num, n)
			return err
		}
		fmt.Println(resp2) // 需要保存该resp,因为resp.Etag 需要在通知完成上传时使用

		var c s3.CompletedPart
		c.PartNumber = aws.Int64(part_num) // Required Etag对应的PartNumber, 上一步返回的
		c.ETag = resp2.ETag                // Required 上传分片时返回的值 Etag
		completes = append(completes, &c)
	}

	/////////////////////////// 结束上传
	params := &s3.CompleteMultipartUploadInput{
		Bucket:   aws.String(this.bucket_id), // Required {bucket}
		Key:      aws.String(dst_name),       // Required {path}/{filename}
		UploadId: up_id,                      // Required 创建context时返回的值
		MultipartUpload: &s3.CompletedMultipartUpload{
			Parts: completes,
		},
		//RequestPayer: aws.String("RequestPayer"),
	}
	resp_comp, err := s3Svc.CompleteMultipartUpload(params)
	if err != nil {
		// Print the error, cast err to awserr.Error to get the Code and
		// Message from an error.
		fmt.Println(err.Error())
		return err
	}
	// Pretty-print the response data.
	fmt.Println(resp_comp)
	return nil
}

func main() {
    //既可以使用域名,也可以使用ip+端口的形式
	Ceph.Init2( /*"http://172.16.20.15"*/ "http://s3.devopscloud.com:80", "new-bucket-5de4d39a", "9YGYCFH1V3QUT4B9KZD1", "9hdiwSjCfyrZPKGObH8Kctdur8PiBPJu3B4zGYaZ")
	Ceph.Download("1.mp4", "./1.mp4")
	Ceph.Upload("1.mp4", "api/rest/2017-3-16-10-38.mp4")
}

© 著作权归作者所有

R
粉丝 1
博文 1
码字总数 966
作品 0
廊坊
私信 提问
RGW S3 Multipart解析

S3分段上传技术主要应用在大文件的数据上传上,通常在S3客户端会对上传的大文件做一次分片操作。在RGW内部还会对S3客户端发送过来的数据再进行一次分片处理,RGW默认分片大小是4MB。下面就M...

linuxhunter
2016/04/13
621
1
干货 | 基于Go SDK操作京东云对象存储OSS的入门指南

前言 本文介绍如何使用Go语言对京东云对象存储OSS进行基本的操作,帮助客户快速通过Go SDK接入京东云对象存储,提高应用开发的效率。 在实际操作之前,我们先看一下京东云OSS的API接口支持范...

京东云技术新知
07/02
23
0
Ceph RGW bucket 自动分片介绍和存在的问题

工作中存储集群使用了 Ceph 技术,所用的是版本是 Luminous 12.2.4,因为刚刚上手 Ceph,不少概念和问题也都是头一次听说,比如这次的自动分片(auto resharding)。不得不说,Ceph 对象存储...

blackpiglet
2018/08/14
0
0
rgw object read and write

一、Get Object。 1、读取Object的主要处理流程。 RGWGetObj::execute() |创建RGWGetObjCB类实例,其中handledata()函数为回调函数,该函数会调用RGWGetObj::getdatacb()函数,而该函数最终会...

linuxhunter
2016/04/12
912
4
S3 协议兼容的分布式对象存储系统 - Yig

Yet another Index Gateway Yig 是 S3 协议兼容的分布式对象存储系统。它脱胎于开源软件 ceph ,在多年的商业化运维中, 针对运维中出现的问题和功能上的新需求,重新实现了一遍 radosgw 用于...

匿名
2018/05/23
2.4K
3

没有更多内容

加载失败,请刷新页面

加载更多

c语言实现Sqlite3的创建db和增删改查db操作

SQLite,是一款轻型的数据库,而且目前已经在很多嵌入式产品中使用了它,它占用资源非常的低,在嵌入式设备中使用广泛,现在准备学习一下sqlite3的使用方法并写一个测试demo,后面在项目智能...

jorin_zou
20分钟前
3
0
【2019年8月版本】OCP 071认证考试最新版本的考试原题-第2题

choose three Which three are true about the CREATE TABLE command? A) It can include the CREATE...INDEX statement for creating an index to enforce the primary key constraint. B) ......

oschina_5359
23分钟前
3
0
如何在二维码中循环批量插入图片

现在二维码种类比较多,为了突出二维码的个性及吸引客户,很多朋友都喜欢在二维码上插入图片。想要每个二维码都与众不同,但是有的时候需要批量插入图片数量有限,如果制作的二维码比较多的话...

中琅软件
24分钟前
4
0
LTR那点事—AUC及其与线上点击率的关联详解

LTR(Learning To Rank)学习排序是一种监督学习(SupervisedLearning)的排序方法,现已经广泛应用于信息索引,内容推荐,自然语言处理等多个领域。以推荐系统为例,推荐一般使用多个子策略...

达观数据
24分钟前
3
0
IntelliJ 如何显示代码的代码 docs

希望能够在 IntelliJ 代码上面显示方法的 docs。 如何进行显示? 你可以使用 Ctrl + Q 这个快捷键来查看方法的 Docs。 https://blog.ossez.com/archives/3061...

honeymoose
27分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部