文档章节

常用aws2 python 和 boto代码片段

wzyuliyang
 wzyuliyang
发布于 2016/10/29 09:19
字数 3037
阅读 160
收藏 1

常用aws2 python片段

# Fetch one object from the S3-compatible gateway with an AWS signature v2
# (awsauth.S3Auth) and print the HTTP exchange for debugging.
import requests
from awsauth import S3Auth
# Required for dump.dump_all() below; without it this snippet raises NameError
# when run on its own.
from requests_toolbelt.utils import dump

host = 'yuliyangdebugweb68.tunnel.qydev.com'
access_key = 'admin'
secret_key = 'admin'
cmd = '/bucket/object'  # path-style resource: /<bucket>/<object>
url = 'http://%s%s' % (host,cmd)
response = requests.get(url, auth=S3Auth(access_key, secret_key,service_url=host))
# dump_all() yields bytes, so decode before printing.
data = dump.dump_all(response)
print(data.decode('utf-8'))


# -*- coding: utf-8 -*-
# GET a bucket using a hand-built AWS signature v2 "Authorization" header.
# Usage: <script> host bname
import hmac
import hashlib
import base64
import datetime
import sys
from email.utils import formatdate
import requests
from requests_toolbelt.utils import dump

if len(sys.argv) < 3:
    # sys.exit(<str>) writes the message to stderr and exits with status 1,
    # so callers can tell a usage error from a successful run.
    sys.exit('bad syntax, usage: {script_name} host bname'.format(script_name=sys.argv[0]))
host, bname = sys.argv[1], sys.argv[2]
access_key = 'admin'
secret_key = 'admin'
# formatdate() always emits English day/month names ("Sat, 29 Oct 2016
# 09:19:00 GMT"); strftime('%a'/'%b') would follow the process locale.
timestr = formatdate(usegmt=True)
# String to sign: verb, two empty lines (left blank in this request), the
# Date header value, then the resource path.
hstr = '\n'.join(['GET', '', '', timestr, '/' + bname])
key = secret_key.encode('utf-8')
# Signature = base64(HMAC-SHA1(secret_key, string_to_sign))
hres = hmac.new(key, hstr.encode('utf-8'), hashlib.sha1).digest()
hres = base64.b64encode(hres).decode('utf-8')
headers = {
    'Host': host,
    'Date': timestr,  # must be the same value that was signed above
    'Authorization': 'AWS ' + access_key + ':' + hres,
}
response = requests.get('http://' + host + '/' + bname,headers=headers)

data = dump.dump_all(response)
print(data.decode('utf-8'))


import requests
from requests_toolbelt.utils import dump
from awsauth import S3Auth
host = 'yuliyangdebugweb68.tunnel.qydev.com'
access_key = 'admin'
secret_key = 'admin'
serverurl = 'http://%s' % host

# Create one bucket per region, in order. Each PUT carries a
# CreateBucketConfiguration body whose LocationConstraint is
# ":<region>-placement" (presumably an RGW placement target -- confirm
# against the gateway configuration).
for region in ('beijing', 'suzhou', 'shanghai'):
    cmd = '/%s-placement-b5/' % region
    content = (
        '<CreateBucketConfiguration><LocationConstraint>:%s-placement'
        '</LocationConstraint></CreateBucketConfiguration>'
    ) % region
    url = 'http://{0}{1}'.format(host, cmd)
    signer = S3Auth(access_key, secret_key, serverurl)
    response = requests.put(url, auth=signer, data=content)
    # Print the HTTP exchange for this bucket before moving to the next.
    data = dump.dump_all(response)
    print(data.decode('utf-8'))









# -*- coding: utf-8 -*-
# PUT (create) a bucket using a hand-built AWS signature v2 "Authorization"
# header.
# Usage: <script> host bname
import hmac
import hashlib
import base64
import datetime
import sys
from email.utils import formatdate
import requests
from requests_toolbelt.utils import dump

if len(sys.argv) < 3:
    # sys.exit(<str>) writes the message to stderr and exits with status 1,
    # so callers can tell a usage error from a successful run.
    sys.exit('bad syntax, usage: {script_name} host bname'.format(script_name=sys.argv[0]))
host, bname = sys.argv[1], sys.argv[2]
access_key = 'admin'
secret_key = 'admin'
# formatdate() always emits English day/month names ("Sat, 29 Oct 2016
# 09:19:00 GMT"); strftime('%a'/'%b') would follow the process locale.
timestr = formatdate(usegmt=True)
# String to sign: verb, two empty lines (left blank in this request), the
# Date header value, then the resource path.
hstr = '\n'.join(['PUT', '', '', timestr, '/' + bname])
key = secret_key.encode('utf-8')
# Signature = base64(HMAC-SHA1(secret_key, string_to_sign))
hres = hmac.new(key, hstr.encode('utf-8'), hashlib.sha1).digest()
hres = base64.b64encode(hres).decode('utf-8')
headers = {
    'Host': host,
    'Date': timestr,  # must be the same value that was signed above
    'Authorization': 'AWS ' + access_key + ':' + hres,
}
response = requests.put('http://' + host + '/' + bname,headers=headers)

data = dump.dump_all(response)
print(data.decode('utf-8'))



import requests
import logging
from requests_toolbelt.utils import dump
from aws_requests_auth.aws_auth import AWSRequestsAuth
from awsauth import S3Auth

# Send a v2-signed GET to an admin endpoint of the gateway and print the
# HTTP exchange. Commented-out lines are alternative targets, credentials and
# paths kept for quick switching.
logging.basicConfig(level=logging.DEBUG)

# --- gateway under test ---
# host = 'yuliyangdebugwebhammer.tunnel.qydev.com'
# host = 'yuliyangdebugwebjewel.tunnel.qydev.com'
# host = 'yuliyangdebugweb68.tunnel.qydev.com'
host = '192.168.10.10:7480'

# --- credentials ---
# access_key = 'yly-test1'
# secret_key = 'yly-test1'
# access_key = 'D5K1G7NOD4385ITYCOPO'
# secret_key = 'm8dlnoy0xW69zcNcUSACBcXN7DfgDEYb9DDWiodq'
access_key, secret_key = 'admin', 'admin'

# --- admin path to query ---
# /admin/bucket?format=json&uid=admin&stats=True
# cmd = '/admin/user?info&uid=date1&stats=True'
# cmd = '/admin/bucket?info&uid=date1&stats=True'
# cmd = '/admin/bucket?info&stats=True&show-summary=True'
# cmd = '/admin/bucket?info&uid=date1'
cmd = '/admin/config'

url = 'http://' + host + cmd
# url = 'http://%s' % host
signer = S3Auth(access_key, secret_key, service_url=host)
response = requests.get(url, auth=signer)
data = dump.dump_all(response)
print(data.decode('utf-8'))





常用rgwadmin

# -*- coding: UTF-8 -*-
from rgwadmin import *
from requests_toolbelt.utils import dump
import logging
# Look up the metadata of one user through the rgwadmin client and print it.
# Commented-out lines are further example calls kept as a reference.
logging.basicConfig(level=logging.DEBUG)
rgw = RGWAdmin(access_key='admin', secret_key='admin', server='10.254.9.40:80',secure=False)
# resp = rgw.get_user(uid='admin')   #create_subuser(uid='admin',subuser='read-only',key_type='swift',generate_secret=True)
# print resp
# rgw = RGWAdmin(access_key='admin', secret_key='admin', server='10.254.3.81:8000',secure=False)
# rgw = RGWAdmin(access_key='admin', secret_key='admin', server='yuliyangdebugwebjewel.tunnel.qydev.com',secure=False)
# rsp = rgw.create_key(uid='27a6fbe45b354bda88007beb7a71f1bc',key_type='s3',access_key='82HA7EN6BDYZ26738UIE')
# rgw.get_bucket(uid='admin',stats=True)
# rsp = rgw.remove_key(uid='27a6fbe45b354bda88007beb7a71f1bc',key_type='s3',access_key='JBHCFUSM7OG0NHZ40IMR')
# rsp = rgw.create_bucket(bucket="sssdssssss")
# rsp =rgw.remove_bucket(bucket='lalala',purge_objects=True)
# rsp = rgw.create_user(uid='testquato2',display_name='testquato2',access_key='quta2',secret_key='quta2',max_buckets=10)
# rsp = rgw.set_quota(uid='testquato1',quota_type='bucket',max_size_kb=1024,enabled=True)
# rgw.set_quota()
# rsp = rgw.get_usage(uid='admin',show_summary=True,show_entries=True,start="2016-07-08 02:00:00")
rsp = rgw.get_metadata(metadata_type='user',key="testtest")
# NOTE(review): rgwadmin calls return the decoded reply rather than a
# requests.Response, so requests_toolbelt's dump.dump_all() cannot be applied
# to rsp; print the value directly. Confirm against the installed rgwadmin.
print(rsp)
# from datetime import *
# datetime.utcnow()
# import time
# print time.strftime("%Y-%m-%d  %H:%M:%S", time.gmtime())
# response = rgw.get_usage(uid='admin',show_summary=True,show_entries=True,start="2016-07-08 02:00:00")
# rgw.get_bucket(bucket="yuliyang_count",uid="admin",stats=True)
# rgw.create_subuser(uid='admin',subuser='admin-sub5')
# resp =  rgw.get_user(uid='admin');
# rgw.get_user(uid='admin')
# print resp
# response = rgw.trim_usage(uid="user2",start="2016-07-09 02:00:00",end="2016-07-10 03:00:00")
# response = rgw.get_users()
# rgw.create_user("testcreateuser5","testcreateuser5","testcreateuser5@qq.com",access_key="testcreateuser5",secret_key="testcreateuser5",user_caps="usage=write;users=write",generate_key=False,max_buckets=10,suspended=False)
# rgw.modify_user("testcreateuser5","testcreateuser5","testcreateuser5@qq.com",access_key="testcreateuser5",secret_key="testcreateuser5",user_caps="usage=write;users=read",max_buckets=10,suspended=False)
# response = rgw.modify_user("testcreateuser","testcreateuser","testcreateuser@163.com",access_key="testcreateuser",secret_key="testcreateuser",generate_key=False,max_buckets=20,suspended=False)
# rgw.remove_user('testcreateuser',purge_data=True)
# rgw.create_subuser(uid='admin',subuser="admin-subuser2",access='full',generate_secret=True)
# rgw.modify_subuser(uid='admin',subuser="admin-subuser2",access="write",secret="admin-subuser2",generate_secret=True)
# rgw.remove_subuser(uid='admin',subuser='admin-subuser2',purge_keys=True)
# rgw.create_key(uid='admin',generate_key=True,subuser='admin-subuser3',key_type='swift',access_key="admin-subuser3",secret_key='admin-subuser3')
# rgw.remove_key('admin-subuser3',uid='admin',subuser='admin-subuser3',key_type='swift')
# rgw.get_bucket(bucket='multi',stats=False)
# rgw.check_bucket_index('multi',check_objects=True,fix=True)
# rgw.remove_bucket('user1-003',purge_objects=True)
# rgw.unlink_bucket('user1-002',uid='user1')
# rgw.link_bucket('user1-002',uid='user2',bucket_id='default.14181.5')
# rgw.link_bucket('user1-002',uid='user2',bucket_id='31c33a77-df26-4fdb-a69f-49ed49d8615c.84107.5')
# rgw.remove_object(bucket='multi',object_name='init-multi')
# rgw.get_policy('acltest',object_name='aclobj')
# rgw.add_capability(uid='user1',user_caps='usage=read')
# rgw.remove_capability(uid='user1',user_caps='usage=read')
# rgw.get_quota(uid='user1',quota_type='user')
# rgw.get_user_quota(uid='user1')
# rgw.get_user_bucket_quota(uid='user1')
# rgw.set_quota(uid='user1',quota_type='user',max_size_kb=102400,max_objects=20)
# rgw.set_quota(uid='user1',quota_type='bucket',max_size_kb=1024000,max_objects=200)
# rgw.get_bucket('user1-002',stats=True)
# print response
# data = dump.dump_all(response)
# print(data.decode('utf-8'))
# u = RGWUser.create(user_id='testsdadasd', display_name='Test')
# u.user_quota.size = 1024 * 1024  # in bytes
# u.user_quota.enabled = True
# u.save()
#u.delete()
# -*- coding: UTF-8 -*-
from rgwadmin import *
from requests_toolbelt.utils import dump
import logging
import sys
from datetime import *
import math
# Issue a few read-only rgwadmin lookups and print the last result.
# Commented-out lines are further example calls kept as a reference.
logging.basicConfig(level=logging.DEBUG)
# print '{:%Y-%m-%d %H:%M:%S}'.format(datetime.now().replace(minute=0,second=0,microsecond=0)-timedelta(hours=1))
# print '{:%Y-%m-%d %H:%M:%S}'.format(datetime.now() - timedelta(hours=8))
# print '{:%Y-%m-%d %H:%M:%S}'.format(datetime.now() - timedelta(hours=16))
# print '{:%Y-%m-%d %H:%M:%S}'.format(datetime.now() - timedelta(hours=18))
# rgw = RGWAdmin(access_key='admin', secret_key='admin', server='yuliyangdebugwebjewel.tunnel.qydev.com',secure=False)
# rgw = RGWAdmin(access_key='admin', secret_key='admin', server='yuliyangdebugwebjewel.tunnel.qydev.com',secure=False)
# rgw = RGWAdmin(access_key='testcaps1', secret_key='testcaps1', server='yuliyangdebugweb68.tunnel.qydev.com',secure=False)
# response = rgw.get_usage(show_summary=True,show_entries=True,start='{:%Y-%m-%d %H:%M:%S}'.format(datetime.now().replace(minute=0,second=0,microsecond=0))
#                         )
# rgw = RGWAdmin(access_key='testcaps2', secret_key='testcaps2', server='yuliyangdebugweb68.tunnel.qydev.com',secure=False)
# response = rgw.get_usage(show_summary=True,show_entries=True,start='{:%Y-%m-%d %H:%M:%S}'.format(datetime.now().replace(minute=0,second=0,microsecond=0))
#                          )
# rgw = RGWAdmin(access_key='testcaps3', secret_key='testcaps3', server='yuliyangdebugweb68.tunnel.qydev.com',secure=False)
# response = rgw.get_usage(show_summary=True,show_entries=True,start='{:%Y-%m-%d %H:%M:%S}'.format(datetime.now().replace(minute=0,second=0,microsecond=0))
#                          )
# rgw = RGWAdmin(access_key='testcaps4', secret_key='testcaps4', server='yuliyangdebugweb68.tunnel.qydev.com',secure=False)
# response = rgw.get_usage(show_summary=True,show_entries=True,start='{:%Y-%m-%d %H:%M:%S}'.format(datetime.now().replace(minute=0,second=0,microsecond=0))
#                          )
# rgw = RGWAdmin(access_key='testcaps3', secret_key='testcaps3', server='yuliyangdebugweb68.tunnel.qydev.com',secure=False)
# # response = rgw.get_usage(show_summary=True,show_entries=True,start='{:%Y-%m-%d %H:%M:%S}'.format(datetime.now().replace(minute=0,second=0,microsecond=0))
# #                          )
# # response = rgw.get_metadata(metadata_type='bucket',key='bababa')
# response = rgw.set_metadata(metadata_type='bucket',key='bababa',json_string='{"key":"bucket:bababa","ver":{"tag":"_8KAo6w6VPo5fhGtzTvxwRaE","ver":1},"mtime":"2016-07-24 23:43:19.214419Z","data":{"bucket":{"name":"bababa","pool":"yuliyang","data_extra_pool":"default.rgw.buckets.non-ec","index_pool":"default.rgw.buckets.index","marker":"b74b128b-eac1-4f3a-a5ca-60536d190664.694099.2","bucket_id":"b74b128b-eac1-4f3a-a5ca-60536d190664.694099.2"},"owner":"date2","creation_time":"0.000000","linked":"true","has_bucket_info":"false"}}')
# #
# rgw = RGWAdmin(access_key='admin', secret_key='admin', server='yuliyangdebugweb68.tunnel.qydev.com',secure=False)
rgw = RGWAdmin(access_key='admin', secret_key='admin', server='10.254.9.40:80',secure=False)
# response = rgw.get_usage(show_summary=True,show_entries=True,start='{:%Y-%m-%d %H:%M:%S}'.format(datetime.now().replace(minute=0,second=0,microsecond=0))
#                          )
# response = rgw.get_metadata(metadata_type='bucket',key='bababa')
# response = rgw.set_metadata(metadata_type='bucket',key='bababa',json_string='{"key":"bucket:bababa","ver":{"tag":"_8KAo6w6VPo5fhGtzTvxwRaE","ver":1},"mtime":"2016-07-24 23:43:19.214419Z","data":{"bucket":{"name":"bababa","pool":"yuliyang","data_extra_pool":"default.rgw.buckets.non-ec","index_pool":"default.rgw.buckets.index","marker":"b74b128b-eac1-4f3a-a5ca-60536d190664.694099.2","bucket_id":"b74b128b-eac1-4f3a-a5ca-60536d190664.694099.2"},"owner":"date2","creation_time":"0.000000","linked":"true","has_bucket_info":"false"}}')
#
# All three calls are sent, but each assignment overwrites the previous one,
# so only the bucket metadata from the last call is printed below.
rsp = rgw.get_metadata(metadata_type='user',key="testtest")
rsp = rgw.get_user(uid="testtest")
rsp = rgw.get_metadata(metadata_type='bucket',key='testtesttest')
# import json
# import pprint
# parsed = json.loads(str(rsp)
# json.dumps(parsed, indent=4)
# data = dump.dump_all(rsp)
# print(data.decode('utf-8'))
# Single-argument print(...) prints the same text on Python 2 and also parses
# on Python 3, unlike the bare print statement.
print(rsp)
# response = rgw.get_usage(uid='date2', show_summary=True, show_entries=True,
#                          start='{:%Y-%m-%d %H:%M:%S}'.format(datetime.now() - timedelta(hours=16)))
# response = rgw.get_usage(uid='date2',show_summary=True,show_entries=True,start="2016-07-22 00:00:00")
# print response
# response = rgw.get_usage(uid='date2',show_summary=True,show_entries=True,start="2016-07-22 08:00:00")
# print response
#rep = rgw.create_key(uid='admin',generate_key=True,subuser='admin-subuser4',key_type='s3',access_key="admin-subuser4",secret_key='admin-subuser4')
# rgw.remove_subuser(uid='uid_test2',subuser='ngnn',purge_keys=True)
#
# rgw.get_users()
# response = rgw.create_subuser(uid='wc',subuser='wc:swift2',generate_secret=True,access='full')
# print response
# print sys.getsizeof(response)



# -*- coding: UTF-8 -*-
from rgwadmin import *
from requests_toolbelt.utils import dump
import logging
import sys
from datetime import *
import math
# Print a few candidate timestamps in the "%Y-%m-%d %H:%M:%S" form used by
# the start= arguments in the commented examples, then fetch and print the
# usage report for uid 'admin'.
logging.basicConfig(level=logging.DEBUG)
rgw = RGWAdmin(access_key='admin', secret_key='admin', server='10.254.3.68:7480',secure=False)


# Single-argument print(...) prints the same text on Python 2 and also parses
# on Python 3, unlike the bare print statement.
# Top of the previous hour, then 8 h and 16 h before now (local time).
print('{:%Y-%m-%d %H:%M:%S}'.format(datetime.now().replace(minute=0,second=0,microsecond=0)-timedelta(hours=1)))
print('{:%Y-%m-%d %H:%M:%S}'.format(datetime.now() - timedelta(hours=8)))
print('{:%Y-%m-%d %H:%M:%S}'.format(datetime.now() - timedelta(hours=16)))
# print '{:%Y-%m-%d %H:%M:%S}'.format(datetime.now() - timedelta(hours=18))
# rgw = RGWAdmin(access_key='admin', secret_key='admin', server='yuliyangdebugwebjewel.tunnel.qydev.com',secure=False)
# rgw = RGWAdmin(access_key='admin', secret_key='admin', server='yuliyangdebugwebjewel.tunnel.qydev.com',secure=False)
response = rgw.get_usage(show_summary=True,show_entries=True,uid='admin')


#
# response = rgw.get_usage(uid='date2', show_summary=True, show_entries=True,
#                          start='{:%Y-%m-%d %H:%M:%S}'.format(datetime.now().replace(minute=0,second=0,microsecond=0)-timedelta(hours=0)),
#                          )
print(response)
# response = rgw.get_usage(uid='date2', show_summary=True, show_entries=True,
#                          start='{:%Y-%m-%d %H:%M:%S}'.format(datetime.now() - timedelta(hours=16)))
# response = rgw.get_usage(uid='date2',show_summary=True,show_entries=True,start="2016-07-22 00:00:00")
# print response


# response = rgw.get_usage(uid='date2',show_summary=True,show_entries=True,start="2016-07-22 08:00:00")
# print response

#rep = rgw.create_key(uid='admin',generate_key=True,subuser='admin-subuser4',key_type='s3',access_key="admin-subuser4",secret_key='admin-subuser4')
# rgw.remove_subuser(uid='uid_test2',subuser='ngnn',purge_keys=True)
#

# rgw.get_users()

# response = rgw.create_subuser(uid='wc',subuser='wc:swift2',generate_secret=True,access='full')
# print response

# print sys.getsizeof(response)

boto

# Upload two objects with boto, make one public-read, then print a plain URL
# for the public object and a signed URL for the other.
import boto
import boto.s3.connection
from boto.s3.key import Key

boto.set_stream_logger('boto')
host = 'yuliyangdebugweb68.tunnel.qydev.com'  #J
access_key = 'admin'
secret_key = 'admin'
conn = boto.connect_s3(
                access_key,
                secret_key,
                host = host,
                is_secure = False,
                # port = 7480,
                port=80,
                calling_format = boto.s3.connection.OrdinaryCallingFormat(),
                )
bucket = conn.get_bucket("yuliyang")
hello_key = bucket.new_key('hello.txt')
hello_key.set_contents_from_string('Hello World!')
# hello_key = bucket.get_key('hello.txt')
hello_key.set_canned_acl('public-read')
plans_key = bucket.new_key('secret_plans.txt')
plans_key.set_contents_from_string('Hello World!')
# plans_key.set_canned_acl('private')
# key = bucket.get_key('perl_poetry.pdf')

# Single-argument print(...) prints the same text on Python 2 and also parses
# on Python 3, unlike the bare print statement.
# query_auth=False: the URL is generated without signature query parameters.
hello_key = bucket.get_key('hello.txt')
hello_url = hello_key.generate_url(600, query_auth=False, force_http=True)
print(hello_url)

# query_auth=True with expires_in=3600: signed URL for the second object.
plans_key = bucket.get_key('secret_plans.txt')
plans_url = plans_key.generate_url(3600, query_auth=True, force_http=True)
print(plans_url)


# bucket.set_acl('private')
#bucket.get_acl()
# bucket.get_acl('from-yly-test-full')
# keydown = bucket.get_key("from-yly-test-full")
# print keydown.get_contents_as_string()
# conn.get_all_buckets()
# bucket2 = conn.get_bucket("private-bucket")    #head
# bucket2.set_acl("private")
# bucket2.get_key("16K").set_acl("public-read")
# conn.delete_bucket("h-put-bucket-private")
# for bucket in conn.get_all_buckets():
#     print bucket.name


# Connect to the gateway with boto and look up one bucket.
import boto
import boto.s3.connection
import os
import math
from boto.s3.key import Key
import requests
import logging
from awsauth import S3Auth

boto.set_stream_logger('boto')
# boto.set_stream_logger('boto')
# The port is passed separately through port= below, so host is the bare
# address here, the same way the next snippet connects to this gateway.
# NOTE(review): with ':7480' also embedded in host, boto may build a doubled
# "host:7480:7480" Host header -- confirm against the boto version in use.
host = '10.254.3.68'  #J
# host = 'yuliyangdebugweb001.tunnel.qydev.com'   #H
access_key = 'yuliyangtest4'
secret_key = 'yuliyangtest4'
# access_key = 'yly-test1'
# secret_key = 'yly-test1'
# access_key = 'D5K1G7NOD4385ITYCOPO'         #yly-test1 write only
# secret_key = 'm8dlnoy0xW69zcNcUSACBcXN7DfgDEYb9DDWiodq'

# access_key = 'C28HCACVLHDV6FGHK91Y'   #read only
# secret_key = 'UNcGb834F8GAYsB7Ww0VpdZB2Er9ComoJX8UQoyL'


conn = boto.connect_s3(
                access_key,
                secret_key,
                host = host,
                is_secure = False,
                port = 7480,
                # port=80,
                calling_format = boto.s3.connection.OrdinaryCallingFormat(),
                )
bucket = conn.get_bucket("public-test")
# Single-argument print(...) prints the same text on Python 2 and also parses
# on Python 3, unlike the bare print statement.
print(bucket)

# keyv1 = bucket.get_all_versions()

# bucket.set_acl('private')
# bucket.get_acl()
# bucket.get_acl('from-yly-test-full')
# keydown = bucket.get_key("from-yly-test-full")
# print keydown.get_contents_as_string()
# conn.get_all_buckets()
# bucket2 = conn.get_bucket("private-bucket")    #head
# bucket2.set_acl("private")
# bucket2.get_key("16K").set_acl("public-read")
# conn.delete_bucket("h-put-bucket-private")
# for bucket in conn.get_all_buckets():
#     print bucket.name



import boto
import boto.s3.connection
from boto.s3.key import Key
# Connect to the gateway and print the location of an existing bucket.
host = '10.254.3.68'
access_key = 'yuliyangtest4'
secret_key = 'yuliyangtest4'
conn = boto.connect_s3(
                access_key,
                secret_key,
                host = host,
                is_secure = False,
                port = 7480,
                # port=80,
                calling_format = boto.s3.connection.OrdinaryCallingFormat(),
                )
# print conn.get_all_buckets()

# bucket = conn.create_bucket(bucket_name="xxxxxxxxxsadasd",location="shanghai")
# print bucket

# Single-argument print(...) prints the same text on Python 2 and also parses
# on Python 3, unlike the bare print statement.
print(conn.get_bucket("xxxxxxxxxsadasd").get_location())

# bucket = conn.get_bucket("multi")  #head  request
# k2 = bucket.get_all_multipart_uploads()
# Lighthouse = bucket.new_key("init-multi")
# Lighthouse.set_contents_from_filename("small.txt")
# copykey = bucket.get_key("small.txt")
# bucket.copy_key("n_small","objects","small")
# bucket.delete_key("n_small")
# save = bucket.get_key("small")

# save.get_contents_to_filename("save-small")
# bucket.get_versioning_status()
# print bucket.get_location()
# print bucket.set_acl("public-read")

# bucket.get_all_multipart_uploads()

# aclkey = bucket.get_key("aclobj")
# print aclkey.set_acl("public-read")

# key_new = bucket.initiate_multipart_upload("init-multi")
# print key_new





# key_new = bucket.initiate_multipart_upload("multi")
# # key_new.set_contents_from_filename("hello.py")
# fp = open('part1','rb')
# key_new.upload_part_from_file(fp,1)
# fp.close()
# fp = open('part2','rb')
# key_new.upload_part_from_file(fp,2)
# fp.close()
# # for part in key_new:
# #     print part.part_number,part.size
# fp = open('part3','rb')
# key_new.upload_part_from_file(fp,3)
# fp.close()
# for part in key_new:
#     print part.part_number,part.size
#
# key_new.complete_upload()


# for bucket in conn.get_all_buckets():
#     print bucket.name

# bucket.configure_versioning(True)

# print "=====================version test==========================="
# for key in bucket.list_versions():
#         print "{name}\t{modified}\t{version_id}".format(
#                 name = key.name,
#                 modified = key.last_modified,
#                 version_id=key.version_id,
#                 )

# bucket = conn.get_bucket('version-bucket')
# bucket.set_canned_acl("public-read")
# print bucket.get_location()

#bucket = conn.create_bucket('my-new-bucket')
#for bucket in conn.get_all_buckets():
#        print "{name}\t{created}".format(
#                name = bucket.name,
#                created = bucket.creation_date,
#)
#conn.delete_bucket('lyhtest')
#print bucket.get_acl()
#bucket.set_canned_acl('public-read')
#print bucket.get_acl()
#for key in bucket.list():
#        print "{name}\t{size}\t{modified}".format(
#                name = key.name,
#                size = key.size,
#                modified = key.last_modified,
#                )
#
# bucket.configure_versioning(False)
# bucket.configure_versioning(True)
# print "version status:",bucket.get_versioning_status()
# 
# print "=====================version test==========================="
# for key in bucket.list_versions():
#         print "{name}\t{modified}\t{version_id}".format(
#                 name = key.name,
#                 modified = key.last_modified,
#                 version_id=key.version_id,
#                 )
# key_del = bucket.get_key("version.txt")
# key_del.delete()
# key2 = bucket.get_key("version.txt")

# key2.delete()
# bucket.delete_key("version.txt",version_id="Sl6n929s6SYBpAt6gCzYSZ-hrIg7TK.")
# print key2.get_contents_as_string()
# all_bucket = conn.get_all_buckets()
# print "done"
# conn.create_bucket("bucket-j-create01")
# print bucket.get_policy()
# print "list:",bucket.list()
# key_new = bucket.initiate_multipart_upload("multi-part-upload.pdf")
# # key_new.set_contents_from_filename("hello.py")
# fp = open('xaa','rb')
# key_new.upload_part_from_file(fp,1)
# fp.close()
# fp = open('xab','rb')
# key_new.upload_part_from_file(fp,2)
# fp.close()
# fp = open('xac','rb')
# key_new.upload_part_from_file(fp,3)
# fp.close()
# for part in key_new:
#     print part.part_number,part.size
#
# key_new.complete_upload()
# multi-part upload
# filename = os.path.basename("s3test.tar.gz")
# k = bucket.new_key(filename)
# mp = bucket.initiate_multipart_upload(filename)
#
# source_size = os.stat("s3test.tar.gz").st_size
# bytes_per_chunk = 5 * 1024 * 1024 #5M per chunk
# chunks_count = int(math.ceil(source_size / float(bytes_per_chunk)))
#
# for i in range(chunks_count):
#     offset = i * bytes_per_chunk
#     remaining_bytes = source_size - offset
#     bytes = min([bytes_per_chunk, remaining_bytes])
#     part_num = i + 1
#     print "uploading part " + str(part_num) + " of " + str(chunks_count)
#     with open("s3test.tar.gz", 'rb') as fp:
#         fp.seek(offset)
#         mp.upload_part_from_file(fp=fp, part_num=part_num, size=bytes)
# if len(mp.get_all_parts()) == chunks_count:
#     mp.complete_upload()
#     print "upload_file done"
# else:
#     mp.cancel_upload()
#     print "upload_file failed"

# for key in bucket.list():
#     print key.name,key.version_id,key.last_modified
        #if key.version_id == "4Sf93SMkc3xJmfMIdx3ZRHwzMk2lewJ":
        #    print "get"
        #    key.get_contents_to_filename("res")
        #print key.get_contents_as_string()
# print bucket.get_key("version.txt",version_id="Sl6n929s6SYBpAt6gCzYSZ-hrIg7TKsss.").exists()

# k = bucket.get_key("s3test.tar.gz",version_id="9KwCNtPZfqfkedCbh27czFHRQGndzQo")
# bucket.set_acl("public-read","s3test.tar.gz",headers=None,version_id="9KwCNtPZfqfkedCbh27czFHRQGndzQo")
# # k = bucket.get_key("multi-part-upload.pdf")
# k.set_acl("public-read")
# print k.get_acl()
# # bucket.set_acl('public-read',"s3test.tar.gz")
# print k.get_acl()
# # k.copy("bucket-j-create01","s3test-cp.tar.gz")
# k.copy("grant-all","copy-form-admin-s3test2.tar.gz",preserve_acl=True)
#server = '192.168.28.81:7480'
#url = 'http://%s/version-bucket/version.txt' % server
#logging.basicConfig(level=logging.DEBUG)
# r = requests.get(url, auth=S3Auth(access_key, secret_key))
#print r.content



# import requests
# import logging
# from requests_toolbelt.utils import dump
# from aws_requests_auth.aws_auth import AWSRequestsAuth
# from awsauth import S3Auth
#
# # host = 'yuliyangdebugwebhammer.tunnel.qydev.com'
# # host = 'yuliyangdebugwebjewel.tunnel.qydev.com'
# logging.basicConfig(level=logging.DEBUG)
# # host = 'yuliyangdebugweb68.tunnel.qydev.com'
# host = '10.254.9.13:7480'
# # host = 'yuliyangdebugweb68.tunnel.qydev.com'
# # access_key = 'yly-test1'
# # secret_key = 'yly-test1'
#
# # access_key = 'D5K1G7NOD4385ITYCOPO'
# # secret_key = 'm8dlnoy0xW69zcNcUSACBcXN7DfgDEYb9DDWiodq'
# access_key = 'yuliyang'
# secret_key = 'yuliyang'
# cmd = '/yuliyangtest2?location'
# # /admin/bucket?format=json&uid=admin&stats=True
# # cmd = '/admin/user?info&uid=admin&stats=True'
# # cmd = '/admin/metadata/bucket?format=json&key=testtesttest&stats=True'
# # cmd = '/admin/bucket?info&uid=yuliyang'

# # cmd = '/admin/metadata/user?format=json'
# # cmd = '/admin/user?info&uid=yuliyang&stats=True'
# # cmd = '/admin/bucket?index&format=json&bucket=yuliyang_b1'
# # cmd = '/admin/user?format=json&uid=yuliyang_emailtest2&display-name=yuliyang_emailtest2&email=yuliyang_emailtest@test.com'
# # cmd = '/admin/bucket?format=json&stats=True'
#
# # cmd = '/admin/bucket?info&uid=admin'
# # cmd = '/admin/user?info&uid=admin&stats=True'
# # cmd = '/admin/usage?format=json&uid=admin&start=2016-06-10%2001:00:00&end=2016-10-10%2001:00:00&show-entries=False&show-summary=True'
# # cmd = '/admin/usage?format=json&uid=yly&start=2016-06-10%2001:00:00&end=2016-08-10%2001:00:00&show-entries=False&show-summary=True'
# # cmd = '/admin/user?uid=admin&stats=True'
# # cmd = '/admin/realm/period'
# # cmd = '/admin/zonegroup?rgw-zonegroup=us'
# # cmd = '/admin/realm?zonegroup'
# # cmd = '/admin/zonegroup?list'
# # cmd = '/admin/bucket?info&uid=date1&stats=True'
# # cmd = '/admin/bucket?info&stats=True&show-summary=True'
# # cmd = '/admin/bucket?info&uid=date1'
# url = 'http://%s%s' % (host,cmd)
# # url = 'http://%s' % host
# response = requests.get(url, auth=S3Auth(access_key, secret_key,service_url=host))
# data = dump.dump_all(response)
# print response.content
# print(data.decode('utf-8'))
# import json
# # print json.loads(response.content)['stats']
#
# import sys
#
# # import requests
# # from awsauth import S3Auth
# # host = 'yuliyangdebugweb68.tunnel.qydev.com'
# # access_key = 'admin'
# # secret_key = 'admin'
# # cmd = '/bucket/object'
# # url = 'http://%s%s' % (host,cmd)
# # response = requests.get(url, auth=S3Auth(access_key, secret_key,service_url=host))
# # data = dump.dump_all(response)
# # print(data.decode('utf-8'))

# # import requests
# # import logging
# # from requests_toolbelt.utils import dump
# # from aws_requests_auth.aws_auth import AWSRequestsAuth
# # from awsauth import S3Auth
# # logging.basicConfig(level=logging.DEBUG)
# # host = '10.254.9.13:7480'
# # access_key = 'yuliyang'
# # secret_key = 'yuliyang'
# # cmd = '/yuliyangtest2?location'
# # url = 'http://%s%s' % (host,cmd)
# # # url = 'http://%s' % host
# # response = requests.get(url, auth=S3Auth(access_key, secret_key,service_url=host))
# # data = dump.dump_all(response)
# # print response.content
# # print(data.decode('utf-8'))

# # import requests
# import requesocks as requests
# session = requests.session()
# session.proxies = {
# 'http': 'socks5://127.0.0.1:1080',
# 'https': 'socks5://127.0.0.1:1080'
# }
# from requests_toolbelt.utils import dump
# from aws_requests_auth.aws_auth import AWSRequestsAuth
# from awsauth import S3Auth
# # host = '10.254.9.13:7480'
# # host = '45.32.28.128:7480'
# #host = "yuliyangdebugweb7480.tunnel.qydev.com"
# #host = "s3.amazonaws.com"
# host = "s3-us-west-2.amazonaws.com"
# access_key = '<AWS_ACCESS_KEY_ID>'      # redacted: never publish real AWS credentials
# secret_key = '<AWS_SECRET_ACCESS_KEY>'
# cmd = '/yuliyangtest2?location'
# cmd = '/jwj-test-1?location'
# cmd = '/regionj1?location'
# # cmd = '/'
# url = 'http://%s%s' % (host,cmd)
# response = session.get(url, auth=S3Auth(access_key, secret_key,service_url=host))
# print response.content
# data = dump.dump_all(response)
# print response.content
# print(data.decode('utf-8'))
# access_key = '<AWS_ACCESS_KEY_ID>'      # redacted: never publish real AWS credentials
# secret_key = '<AWS_SECRET_ACCESS_KEY>'
# import requests
# print(requests.get('https://www.google.com', timeout=24).text)



import requests
import logging
from requests_toolbelt.utils import dump
from awsauth import S3Auth
# Upload a small string as an object with a v2-signed PUT and print the HTTP
# exchange.
logging.basicConfig(level=logging.DEBUG)

host = '10.254.3.68:7480'
access_key, secret_key = 'admin', 'admin'
cmd = '/permission2-b1/from-admin-full'  # /<bucket>/<object>
url = 'http://' + host + cmd
# url = 'http://%s' % host
# response = requests.put(url, data="ssssssssssssssss",auth=S3Auth(access_key, secret_key,service_url=host))

payload = "new key"  # object body
signer = S3Auth(access_key, secret_key, service_url=host)
response = requests.put(url, data=payload, auth=signer)
data = dump.dump_all(response)
print(data.decode('utf-8'))

import requests
import logging
from requests_toolbelt.utils import dump
from awsauth import S3Auth
# Set the user-level quota of uid 'admin' with a PUT, then read it back with
# a GET; the HTTP exchange of each call is printed.
logging.basicConfig(level=logging.DEBUG)
host = '192.168.10.146:7480'
access_key = 'admin2'
secret_key = 'admin2'

# (requests function, admin path) pairs, executed in order.
steps = (
    (requests.put, '/admin/user?quota&format=json&uid=admin&quota-type=user&max-size-kb=1002400&max-objects=2200&enabled=true'),
    (requests.get, '/admin/user?quota&format=json&uid=admin&quota-type=user'),
)
for send, cmd in steps:
    url = 'http://{0}{1}'.format(host, cmd)
    response = send(url, auth=S3Auth(access_key, secret_key, service_url=host))
    data = dump.dump_all(response)
    print(data.decode('utf-8'))

© 著作权归作者所有

上一篇: 安装dns服务器
wzyuliyang

wzyuliyang

粉丝 4
博文 20
码字总数 16746
作品 1
苏州
程序员
私信 提问
加载中

评论(0)

利用AWS boto实现EC2 存储卷的自动快照

boto是AWS的Python SDK,可以利用boto自动生成ec2的存储卷快照,实现ec2数据的备份 1.安装boto pip install boto ,如果没有安装pip,需要提前安装 2.配置boto配置文件 ~/.aws/credentials #设...

weidabao123
2017/05/06
0
0
如何使用boto,ec2.py,ec2.ini采集亚马逊的云主机信息

众所周知,亚马逊是目前最好的云服务,但也是最贵的,现在我们使用 首先应该先装好ansible,boto, 如果你是亚马逊中国的用户,需要修改ec2.ini的2个配置 改为以上配置 地区列表请参考 http...

jastme
2016/06/22
262
0
python调用aws接口添加安全组策略

介绍: 1.aws安全组策略:协议、端口、流入流量、流出流量2.aws 控制python库: boto3,需先安装。3.脚本作用:获取本地外网IP-----》添加到指定安全组 代码:...

Forver_Liu
2018/01/03
0
0
利用bucket location实现rgw集群扩容

扩容思路:ceph集群容量不足的时候,新加的OSD组成新的pool,制定新的rule规则,新建的bucket都存放在这些新加入的pool里面,注意扩容仅限于新加的bucket,已有的bucket扩容不适用。 1.新建p...

秦牧羊
2016/07/07
523
1
WebRtc最新编译,最新版本2017

系统环境:win10 vs版本:vs2015 一:在客户端使用ShadowSocks作为代理! 我直接购买的ShadowSocks的服务器版本!10元人民币一个月左右。可以慢慢下!足够用了! 如果实在不想买,也可以自己搭...

rootusers
2017/01/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Argo 项目加入 CNCF 孵化器 | 云原生生态周报 Vol. 45

作者 | 陈洁、高相林、陈有坤、敖小剑 业界要闻 Argo 项目加入 CNCF 孵化器 Argo 项目是一组 Kubernetes 原生工具,用于运行和管理 Kubernetes 上的作业和应用程序。目前由 Argo Workflows,...

阿里巴巴云原生
刚刚
0
0
css-方形边框四角

项目中遇到下图这种样式,刚开始想切图解决 后来想到更好的解决办法,代码如下: HTML: <div class="BoxWrap"> <div class="horn"> <div class="lt"></div> <div c......

osc_vo0yi5f8
1分钟前
0
0
离职10天,面挂4家公司!

作者:莫那鲁道 来源:http://1t.click/U4g 楼主离职已有 10 天,这段时间里除了看源码,就是投简历面试了。一共面试了 4 家,说说感受。 # XX 汇 XX 汇是一家小型的电商网站,由于楼主的技术...

Java技术栈
1分钟前
3
0
Navicat12 for Mysql激活

1 下载 注册机和Navicat网盘下载地址 链接:https://pan.baidu.com/s/1AFpQIlHCXVHc8OuBZ9PAlA 提取码:xvi2 2 安装 2.1 安装Navicat 安装完成后,将Navicat_Keygen_Patch_v4.8_By_DFoX复制到...

osc_mra0q9h6
2分钟前
0
0
学习第一周

监督学习(他们中有标签加以区分) 回归算法 我们给出一个数据集,里面包含了正确的答案,假如我们给他一个房价的数据集,在这个数据集中的每个样本,我们都给出正确的答案(房子的实际价格)...

osc_ocl8o73l
3分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部