
[Original] OpenStack Swift Source Code Analysis (6): The object Service

zhouxingxing
Published 2012/10/22 10:07

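    The object-server receives the object requests that users send, that is, requests on files such as upload, download and delete. Take an upload as an example: the proxy looks up in the ring which object-servers are responsible for the object (three replicas in a typical deployment; the ring maps the object name to them rather than picking them at random) and forwards the request to each of them. Each object-server then accepts the request and does the final processing, such as storing the file.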

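    The object-server's __call__ method is the WSGI entry point. Its main job is to look at the request method and run the handler of the same name.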


def __call__(self, env, start_response):
        """WSGI Application entry point for the Swift Object Server."""
        start_time = time.time()
        req = Request(env)
        self.logger.txn_id = req.headers.get('x-trans-id', None)

        if not check_utf8(req.path_info):  # reject paths that are not valid UTF-8
            res = HTTPPreconditionFailed(body='Invalid UTF8')
        else:
            try:
                # disallow methods which have not been marked 'public'
                try:  # check that the handler is marked public
                    method = getattr(self, req.method)
                    getattr(method, 'publicly_accessible')
                except AttributeError:
                    res = HTTPMethodNotAllowed()
                else:
                    res = method(req)  # run the handler, e.g. PUT
            except (Exception, Timeout):
                self.logger.exception(_('ERROR __call__ error with %(method)s'
                    ' %(path)s '), {'method': req.method, 'path': req.path})
                res = HTTPInternalServerError(body=traceback.format_exc())
        trans_time = time.time() - start_time
        if self.log_requests:
            log_line = '%s - - [%s] "%s %s" %s %s "%s" "%s" "%s" %.4f' % (
                req.remote_addr,
                time.strftime('%d/%b/%Y:%H:%M:%S +0000',
                              time.gmtime()),
                req.method, req.path, res.status.split()[0],
                res.content_length or '-', req.referer or '-',
                req.headers.get('x-trans-id', '-'),
                req.user_agent or '-',
                trans_time)
            if req.method == 'REPLICATE':
                self.logger.debug(log_line)
            else:
                self.logger.info(log_line)
        if req.method in ('PUT', 'DELETE'):
            slow = self.slow - trans_time
            if slow > 0:
                sleep(slow)
        return res(env, start_response)
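
The dispatch rule in __call__ can be isolated in a few lines. The following is a minimal sketch in Python 3, not Swift's code: `MiniServer`, `dispatch` and the tuple return values are hypothetical names made up for illustration, and `public` only mirrors the idea of Swift's @public decorator (set a marker attribute on the handler).

```python
def public(func):
    """Mark a handler as reachable over HTTP by setting a marker attribute."""
    func.publicly_accessible = True
    return func


class MiniServer:
    """Hypothetical stand-in for the object server, for illustration only."""

    @public
    def GET(self, path):
        return "200 OK", "got " + path

    def helper(self, path):
        # Not decorated, so dispatch() must refuse to call it.
        return "200 OK", "internal"

    def dispatch(self, method_name, path):
        try:
            # Fails if no such attribute exists on the server ...
            method = getattr(self, method_name)
            # ... or if the handler lacks the marker attribute.
            getattr(method, "publicly_accessible")
        except AttributeError:
            return "405 Method Not Allowed", ""
        return method(path)
```

With this rule an HTTP method name can never reach an internal helper, because only decorated methods carry the marker attribute.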

  

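    The DiskFile class in /swift/obj/server.py represents one object on disk and contains every operation on it. An uploaded file is first written to a temporary file under tmp and is moved to its final path only after the upload has completed. That path is computed when the DiskFile instance is initialized: it is derived from a hash of the object's name, which makes it unique and spreads objects evenly across directories. The hashes.pkl file in each partition directory records hashes for the suffix directories below it, and a write invalidates the affected entry so that it is recomputed later. Once the object has been created, the object-server notifies the corresponding container, and the container in turn notifies the corresponding account. At that point the upload of account/container/object is complete.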
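
As a rough illustration of how a name can be turned into an on-disk path, here is a sketch in Python 3. It rests on assumptions about this era of Swift that the article does not spell out: that the name hash is an MD5 over "/account/container/object" plus a per-cluster suffix, and that the directory layout is devices/device/objects/partition/last-three-hex-digits/hash. `HASH_PATH_SUFFIX` is given a placeholder value and `object_dir` is a hypothetical helper name.

```python
import hashlib
import os

HASH_PATH_SUFFIX = "changeme"  # placeholder; assumed to be a per-cluster secret


def hash_path(account, container, obj, suffix=HASH_PATH_SUFFIX):
    """Hash the full object name into a 32-character hex string."""
    name = "/" + "/".join((account, container, obj))
    return hashlib.md5((name + suffix).encode("utf-8")).hexdigest()


def object_dir(devices, device, partition, account, container, obj):
    """Build the directory that would hold the object's .data files."""
    name_hash = hash_path(account, container, obj)
    # The last three hex digits form an intermediate "suffix" directory.
    return os.path.join(devices, device, "objects", str(partition),
                        name_hash[-3:], name_hash)
```

Because the path depends only on the name, every request for the same object computes the same directory without consulting any index.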



@public
def PUT(self, request):
        """Handle HTTP PUT requests for the Swift Object Server."""
        start_time = time.time()
        try:
            # extract device, partition, account, container and obj from the path
            device, partition, account, container, obj = \
                split_path(unquote(request.path), 5, 5, True)
            validate_device_partition(device, partition)
        except ValueError, err:
            self.logger.increment('PUT.errors')
            return HTTPBadRequest(body=str(err), request=request,
                        content_type='text/plain')
        if self.mount_check and not check_mount(self.devices, device):  # optionally verify the device is mounted
            self.logger.increment('PUT.errors')
            return HTTPInsufficientStorage(drive=device, request=request)
        # reject a missing or malformed timestamp
        if 'x-timestamp' not in request.headers or \
                    not check_float(request.headers['x-timestamp']):
            self.logger.increment('PUT.errors')
            return HTTPBadRequest(body='Missing timestamp', request=request,
                        content_type='text/plain')
        # validate the object to be created, including its metadata; returns None if everything is fine
        error_response = check_object_creation(request, obj)
        if error_response:
            self.logger.increment('PUT.errors')
            return error_response
        new_delete_at = int(request.headers.get('X-Delete-At') or 0)
        if new_delete_at and new_delete_at < time.time():  # X-Delete-At must not be in the past
            self.logger.increment('PUT.errors')
            return HTTPBadRequest(body='X-Delete-At in past', request=request,
                                  content_type='text/plain')
        # all checks passed: instantiate the class that represents the file on disk
        file = DiskFile(self.devices, device, partition, account, container,
                        obj, self.logger, disk_chunk_size=self.disk_chunk_size)
        orig_timestamp = file.metadata.get('X-Timestamp')
        upload_expiration = time.time() + self.max_upload_time
        etag = md5()
        upload_size = 0
        last_sync = 0
        with file.mkstemp() as (fd, tmppath):  # create the temporary file
            if 'content-length' in request.headers:
                try:
                    fallocate(fd, int(request.headers['content-length']))  # preallocate disk space
                except OSError:
                    return HTTPInsufficientStorage(drive=device,
                                                   request=request)
            reader = request.environ['wsgi.input'].read
            for chunk in iter(lambda: reader(self.network_chunk_size), ''):  # read and write the body chunk by chunk
                upload_size += len(chunk)
                if time.time() > upload_expiration:
                    self.logger.increment('PUT.timeouts')
                    return HTTPRequestTimeout(request=request)
                etag.update(chunk)
                while chunk:
                    written = os.write(fd, chunk)
                    chunk = chunk[written:]
                # For large files sync every 512MB (by default) written
                if upload_size - last_sync >= self.bytes_per_sync:
                    tpool.execute(os.fdatasync, fd)
                    drop_buffer_cache(fd, last_sync, upload_size - last_sync)
                    last_sync = upload_size
                sleep()

            if 'content-length' in request.headers and \
                    int(request.headers['content-length']) != upload_size:
                return HTTPClientDisconnect(request=request)
            etag = etag.hexdigest()
            if 'etag' in request.headers and \
                            request.headers['etag'].lower() != etag:
                return HTTPUnprocessableEntity(request=request)
            metadata = {
                'X-Timestamp': request.headers['x-timestamp'],
                'Content-Type': request.headers['content-type'],
                'ETag': etag,
                'Content-Length': str(os.fstat(fd).st_size),
            }
            metadata.update(val for val in request.headers.iteritems()
                    if val[0].lower().startswith('x-object-meta-') and
                    len(val[0]) > 14)
            for header_key in self.allowed_headers:
                if header_key in request.headers:
                    header_caps = header_key.title()
                    metadata[header_caps] = request.headers[header_key]
            old_delete_at = int(file.metadata.get('X-Delete-At') or 0)
            if old_delete_at != new_delete_at:
                if new_delete_at:
                    self.delete_at_update('PUT', new_delete_at, account,
                        container, obj, request.headers, device)
                if old_delete_at:
                    self.delete_at_update('DELETE', old_delete_at, account,
                        container, obj, request.headers, device)
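            # finalize the temporary file, then rename it to its final location
            file.put(fd, tmppath, metadata)
        file.unlinkold(metadata['X-Timestamp'])  # remove older versions of the file
        # update the container unless a newer version was already on disk
        if not orig_timestamp or \
                orig_timestamp < request.headers['x-timestamp']: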
            self.container_update('PUT', account, container, obj,
                request.headers,
                {'x-size': file.metadata['Content-Length'],
                 'x-content-type': file.metadata['Content-Type'],
                 'x-timestamp': file.metadata['X-Timestamp'],
                 'x-etag': file.metadata['ETag'],
                 'x-trans-id': request.headers.get('x-trans-id', '-')},
                device)
        resp = HTTPCreated(request=request, etag=etag)
        self.logger.timing_since('PUT.timing', start_time)
        return resp
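
The core of PUT is the loop that reads the body in chunks, feeds each chunk to an MD5 digest, and writes it out while coping with partial writes. Below is a minimal sketch of that loop in Python 3, so the `iter` sentinel is `b""` rather than the `''` of the Python 2 excerpt. `stream_to_fd` and `receive` are hypothetical names, the status strings stand in for Swift's response classes, and the temporary file is simply discarded at the end because only the checks are being illustrated.

```python
import hashlib
import io
import os
import tempfile


def stream_to_fd(reader, fd, chunk_size=65536):
    """Copy a stream to fd and return (bytes_written, md5_hexdigest)."""
    etag = hashlib.md5()
    total = 0
    # iter(callable, sentinel) keeps calling reader() until it returns b"".
    for chunk in iter(lambda: reader(chunk_size), b""):
        total += len(chunk)
        etag.update(chunk)
        while chunk:
            # os.write may write fewer bytes than requested; retry the rest.
            written = os.write(fd, chunk)
            chunk = chunk[written:]
    return total, etag.hexdigest()


def receive(body, declared_length=None, declared_etag=None):
    """Run the length and ETag checks on an in-memory request body."""
    fd, tmppath = tempfile.mkstemp()
    try:
        size, etag = stream_to_fd(io.BytesIO(body).read, fd, chunk_size=4)
        if declared_length is not None and declared_length != size:
            return "499 Client Disconnect"
        if declared_etag is not None and declared_etag.lower() != etag:
            return "422 Unprocessable Entity"
        return "201 Created"
    finally:
        os.close(fd)
        os.unlink(tmppath)
```

Computing the digest while streaming means the server never has to reread the file to verify the client's ETag.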



DiskFile.put

def put(self, fd, tmppath, metadata, extension='.data'):
        metadata['name'] = self.name
        timestamp = normalize_timestamp(metadata['X-Timestamp'])
        write_metadata(fd, metadata)  # write the metadata onto the file
        if 'Content-Length' in metadata:
            self.drop_cache(fd, 0, int(metadata['Content-Length']))
        tpool.execute(os.fsync, fd)
        invalidate_hash(os.path.dirname(self.datadir))  # invalidate this suffix's entry in hashes.pkl
        renamer(tmppath, os.path.join(self.datadir, timestamp + extension))  # rename the temp file into place
        self.metadata = metadata
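
The write-then-rename pattern used by put can be sketched on its own. This Python 3 version is an illustration under assumptions, not Swift's implementation: `put_atomically` is a hypothetical helper, metadata handling is left out, and `normalize_timestamp` is assumed to zero-pad the timestamp to a fixed width with five decimal places so that file names sort chronologically.

```python
import os
import tempfile


def normalize_timestamp(timestamp):
    """Format a timestamp as a fixed-width, zero-padded string (assumed format)."""
    return "%016.05f" % float(timestamp)


def put_atomically(datadir, tmpdir, data, timestamp, extension=".data"):
    """Write data to a temp file, flush it to disk, then rename it into datadir."""
    os.makedirs(datadir, exist_ok=True)
    os.makedirs(tmpdir, exist_ok=True)
    fd, tmppath = tempfile.mkstemp(dir=tmpdir)
    with os.fdopen(fd, "wb") as fp:
        fp.write(data)
        fp.flush()
        os.fsync(fp.fileno())  # make sure the bytes are on disk before the rename
    final = os.path.join(datadir, normalize_timestamp(timestamp) + extension)
    # A rename within one filesystem is atomic, so readers see either the old
    # state or the complete new file, never a half-written one.
    os.replace(tmppath, final)
    return final
```

The temporary directory has to live on the same filesystem as the data directory, otherwise the rename is no longer a single atomic step.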



Notifying the corresponding container

def container_update(self, op, account, container, obj, headers_in,
                         headers_out, objdevice):
        host = headers_in.get('X-Container-Host', None)
        partition = headers_in.get('X-Container-Partition', None)
        contdevice = headers_in.get('X-Container-Device', None)
        if not all([host, partition, contdevice]):  # skip the update if any container header is missing
            return
        self.async_update(op, account, container, obj, host, partition,
                          contdevice, headers_out, objdevice)


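The async_update method returns as soon as the update succeeds. If the update fails, it writes the update data into async_dir, where it waits for the background updater to apply it later.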

def async_update(self, op, account, container, obj, host, partition,
                     contdevice, headers_out, objdevice):
        full_path = '/%s/%s/%s' % (account, container, obj)
        if all([host, partition, contdevice]):
            try:
                with ConnectionTimeout(self.conn_timeout):
                    ip, port = host.rsplit(':', 1)
                    conn = http_connect(ip, port, contdevice, partition, op,  # open the connection
                            full_path, headers_out)
                with Timeout(self.node_timeout):
                    response = conn.getresponse()
                    response.read()
                    if is_success(response.status):  # success: nothing more to do
                        return
                    else:
                        self.logger.error(_('ERROR Container update failed '
                            '(saving for async update later): %(status)d '
                            'response from %(ip)s:%(port)s/%(dev)s'),
                            {'status': response.status, 'ip': ip, 'port': port,
                             'dev': contdevice})
            except (Exception, Timeout):
                self.logger.exception(_('ERROR container update failed with '
                    '%(ip)s:%(port)s/%(dev)s (saving for async update later)'),
                    {'ip': ip, 'port': port, 'dev': contdevice})
        async_dir = os.path.join(self.devices, objdevice, ASYNCDIR)  # failure path: queue the update on disk
        ohash = hash_path(account, container, obj)
        self.logger.increment('async_pendings')
        write_pickle(
            {'op': op, 'account': account, 'container': container,
                'obj': obj, 'headers': headers_out},
            os.path.join(async_dir, ohash[-3:], ohash + '-' +
                normalize_timestamp(headers_out['x-timestamp'])),
            os.path.join(self.devices, objdevice, 'tmp'))
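
The "try now, otherwise queue on disk" behaviour of async_update can be reduced to the sketch below. It is written in Python 3 under simplifying assumptions: `update_or_queue` is a hypothetical helper, `send` is any callable that returns an HTTP status code or raises, and `write_pickle` here is a simplified stand-in for the Swift helper of the same name (write to a temporary file, then rename).

```python
import os
import pickle
import tempfile


def write_pickle(obj, dest, tmpdir):
    """Pickle obj to dest via a temp file so dest never appears half-written."""
    os.makedirs(tmpdir, exist_ok=True)
    os.makedirs(os.path.dirname(dest), exist_ok=True)
    fd, tmppath = tempfile.mkstemp(dir=tmpdir)
    with os.fdopen(fd, "wb") as fp:
        pickle.dump(obj, fp)
        fp.flush()
        os.fsync(fp.fileno())
    os.replace(tmppath, dest)


def update_or_queue(send, update, dest, tmpdir):
    """Send the update now; on any failure save it for a later retry."""
    try:
        status = send(update)
        if 200 <= status < 300:
            return "sent"
    except Exception:
        # A connection error or timeout is treated like a bad status.
        pass
    write_pickle(update, dest, tmpdir)
    return "queued"
```

The upload itself is not failed when the container is unreachable; the listing simply becomes consistent later, once a background process replays the queued file.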



© Copyright belongs to the author
