Swift Proxy 调用流程(源码剖析)
阅读量:4090 次

本文共 27291 字,大约阅读时间需要 90 分钟。

  Swift 的 proxy 服务以 WSGI 中间件管道(pipeline)的形式组织,请求依次经过:catch_errors, proxy-logging, cache, authtoken, keystone, (slo), proxy-server。请求最终到达 proxy-server 的服务入口点,由其完成具体的处理和响应。


def __call__(self, env, start_response):
    """
    WSGI entry point for the proxy server.

    Lazily binds the memcache client from the WSGI environment, normalizes
    the incoming request via update_request(), and delegates the actual
    work to handle_request().

    :param env: WSGI environment (dict)
    :param start_response: WSGI start_response callable
    :returns: the WSGI response iterable
    """
    # Bind req before the try block: if Request() or update_request()
    # itself raises UnicodeError, the handler below would otherwise hit an
    # unbound local instead of returning the 412.
    req = None
    try:
        if self.memcache is None:
            # NOTE(review): per the original notes, the second argument makes
            # a missing 'swift.cache' entry in env an error -- confirm
            # against cache_from_env.
            self.memcache = cache_from_env(env, True)
        # Per the original notes, update_request() copies 'x-storage-token'
        # into 'x-auth-token' when only the former is present.
        req = self.update_request(Request(env))
        return self.handle_request(req)(env, start_response)
    except UnicodeError:
        err = HTTPPreconditionFailed(
            request=req, body='Invalid UTF8 or contains NULL')
        return err(env, start_response)
    except (Exception, Timeout):
        start_response('500 Server Error',
                       [('Content-Type', 'text/plain')])
        return ['Internal server error.\n']






def handle_request(self, req):
    """
    Entry point for proxy server.
    Should return a WSGI-style callable (such as swob.Response).

    Validates the request, resolves the controller class from the request
    path, assigns a transaction id, runs the 'swift.authorize' hook (if
    present) and finally dispatches to the controller method named after
    req.method.

    :param req: swob.Request object
    :returns: a WSGI-style callable (normally a swob.Response)
    """
    try:
        self.logger.set_statsd_prefix('proxy-server')
        if req.content_length and req.content_length < 0:
            self.logger.increment('errors')
            return HTTPBadRequest(request=req,
                                  body='Invalid Content-Length')

        try:
            if not check_utf8(req.path_info):
                self.logger.increment('errors')
                return HTTPPreconditionFailed(
                    request=req, body='Invalid UTF8 or contains NULL')
        except UnicodeError:
            self.logger.increment('errors')
            return HTTPPreconditionFailed(
                request=req, body='Invalid UTF8 or contains NULL')

        try:
            # Map the request path to a controller class plus the keyword
            # arguments (version, account, container, object names) used to
            # construct it. Per the original notes: /info -> InfoController;
            # account + container + object -> ObjectController;
            # account + container -> presumably ContainerController;
            # account only -> AccountController.
            controller, path_parts = self.get_controller(req)
        except APIVersionError:
            self.logger.increment('errors')
            return HTTPBadRequest(request=req)
        except ValueError:
            self.logger.increment('errors')
            return HTTPNotFound(request=req)
        if not controller:
            self.logger.increment('errors')
            return HTTPPreconditionFailed(request=req, body='Bad URL')
        if self.deny_host_headers and \
                req.host.split(':')[0] in self.deny_host_headers:
            return HTTPForbidden(request=req, body='Invalid host header')

        self.logger.set_statsd_prefix('proxy-server.' +
                                      controller.server_type.lower())
        # Instantiate the controller class with the parsed path parts.
        controller = controller(self, **path_parts)
        if 'swift.trans_id' not in req.environ:
            # if this wasn't set by an earlier middleware, set it now
            trans_id_suffix = self.trans_id_suffix
            trans_id_extra = req.headers.get('x-trans-id-extra')
            if trans_id_extra:
                trans_id_suffix += '-' + trans_id_extra[:32]
            trans_id = generate_trans_id(trans_id_suffix)
            req.environ['swift.trans_id'] = trans_id
            self.logger.txn_id = trans_id
        req.headers['x-trans-id'] = req.environ['swift.trans_id']
        controller.trans_id = req.environ['swift.trans_id']
        self.logger.client_ip = get_remote_client(req)
        try:
            # Look up the controller method named after the HTTP verb; only
            # methods carrying a 'publicly_accessible' attribute may be
            # dispatched to.
            handler = getattr(controller, req.method)
            getattr(handler, 'publicly_accessible')
        except AttributeError:
            allowed_methods = getattr(controller, 'allowed_methods', set())
            return HTTPMethodNotAllowed(
                request=req, headers={'Allow': ', '.join(allowed_methods)})
        old_authorize = None
        if 'swift.authorize' in req.environ:
            # We call authorize before the handler, always. If authorized,
            # we remove the swift.authorize hook so isn't ever called
            # again. If not authorized, we return the denial unless the
            # controller's method indicates it'd like to gather more
            # information and try again later.
            resp = req.environ['swift.authorize'](req)
            if not resp and not req.headers.get('X-Copy-From-Account') \
                    and not req.headers.get('Destination-Account'):
                # No resp means authorized, no delayed recheck required.
                old_authorize = req.environ['swift.authorize']
            else:
                # Response indicates denial, but we might delay the denial
                # and recheck later. If not delayed, return the error now.
                # If resp is empty we only got here because of a
                # cross-account copy header: there is no denial to return,
                # so keep the hook installed and fall through to the
                # handler. (Returning the empty resp would hand None back to
                # the caller, which then tries to call it.)
                if resp and not getattr(handler, 'delay_denial', None):
                    return resp
        # Save off original request method (GET, POST, etc.) in case it
        # gets mutated during handling.  This way logging can display the
        # method the client actually sent.
        req.environ['swift.orig_req_method'] = req.method
        try:
            if old_authorize:
                req.environ.pop('swift.authorize', None)
            # Dispatch to the controller method selected above.
            return handler(req)
        finally:
            if old_authorize:
                req.environ['swift.authorize'] = old_authorize
    except HTTPException as error_response:
        return error_response
    except (Exception, Timeout):
        self.logger.exception(_('ERROR Unhandled exception in request'))
        return HTTPServerError(request=req)

  具体调用过程(以 Account 请求为例):请求到达 proxy-server 的服务入口点之后,handle_request() 通过 get_controller() 选出具体的控制器类(此处为 AccountController),并调用 /swift-kilo-eol/swift/proxy/controllers/account.py 中与请求方法对应的 GET/HEAD/PUT/POST/DELETE 等方法。这些方法将请求转发给后端的存储服务(AccountServer),再由 AccountServer 中同名的 GET/HEAD/PUT/POST/DELETE 等方法完成请求的实际处理,并返回响应。



def GETorHEAD(self, req):
    """
    Handler for HTTP GET/HEAD requests on an account (proxy side).

    Rejects over-long account names, fetches the account from the nodes the
    account ring assigns to it via GETorHEAD_base(), then post-processes the
    backend answer: a 404 flagged as deleted becomes HTTP_GONE, and a plain
    404 is replaced by a synthesized listing when account autocreate is on.
    Owner-only headers are stripped unless the request is from the owner.

    :param req: swob.Request object
    :returns: swob.Response
    """
    name_len = len(self.account_name)
    max_len = constraints.MAX_ACCOUNT_NAME_LENGTH
    if name_len > max_len:
        too_long = HTTPBadRequest(request=req)
        too_long.body = 'Account name length of %d longer than %d' % \
            (name_len, max_len)
        return too_long

    ring = self.app.account_ring
    # Partition that holds this account, then an iterator over the nodes
    # serving that partition.
    part = ring.get_part(self.account_name)
    nodes = self.app.iter_nodes(ring, part)
    resp = self.GETorHEAD_base(
        req, _('Account'), nodes, part,
        req.swift_entity_path.rstrip('/'))

    if resp.status_int == HTTP_NOT_FOUND:
        account_status = resp.headers.get('X-Account-Status', '')
        if account_status.lower() == 'deleted':
            resp.status = HTTP_GONE
        elif self.app.account_autocreate:
            # No broker is passed, so account_listing_response falls back
            # to a FakeAccountBroker.
            resp = account_listing_response(
                self.account_name, req, get_listing_content_type(req))

    if not req.environ.get('swift_owner'):
        # Non-owners must not see owner-only headers.
        for hdr in self.app.swift_owner_headers:
            resp.headers.pop(hdr, None)
    else:
        self.add_acls_from_sys_metadata(resp)
    return resp


def HEAD(self, req):
    """
    Handle HTTP HEAD request against the account server.

    Responds 204 with the account's stats, as produced by
    get_response_headers(), carried in the response headers. A deleted
    account yields a 404 via _deleted_response().
    """
    device, partition, acct = split_and_validate_path(req, 3)
    content_type = get_listing_content_type(req)
    mounted = not self.mount_check or check_mount(self.root, device)
    if not mounted:
        return HTTPInsufficientStorage(drive=device, request=req)
    broker = self._get_account_broker(device, partition, acct,
                                      pending_timeout=0.1,
                                      stale_reads_ok=True)
    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNotFound)
    # Per the original notes these include the container/object counts,
    # bytes used and the timestamps.
    stats = get_response_headers(broker)
    stats['Content-Type'] = content_type
    return HTTPNoContent(request=req, headers=stats, charset='utf-8')


def GET(self, req):
    """
    Handle HTTP GET request against the account server.

    Like HEAD, the account stats are returned as headers. In addition, the
    body carries the account's container listing (built by
    account_listing_response), filtered by the prefix, delimiter, limit,
    marker and end_marker query parameters.

    :param req: swob.Request object
    :returns: swob.Response; 412 for a bad delimiter or an over-large
              limit, 507 (HTTPInsufficientStorage) when the drive fails the
              mount check, 404 when the account is deleted
    """
    drive, part, account = split_and_validate_path(req, 3)
    prefix = get_param(req, 'prefix')
    delimiter = get_param(req, 'delimiter')
    if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254):
        # delimiters can be made more flexible later
        # Bind the response to the request, like every other response this
        # handler returns.
        return HTTPPreconditionFailed(request=req, body='Bad delimiter')
    limit = constraints.ACCOUNT_LISTING_LIMIT
    given_limit = get_param(req, 'limit')
    # A non-numeric limit is ignored and the default listing limit applies.
    if given_limit and given_limit.isdigit():
        limit = int(given_limit)
        if limit > constraints.ACCOUNT_LISTING_LIMIT:
            return HTTPPreconditionFailed(
                request=req,
                body='Maximum limit is %d' %
                constraints.ACCOUNT_LISTING_LIMIT)
    marker = get_param(req, 'marker', '')
    end_marker = get_param(req, 'end_marker')
    out_content_type = get_listing_content_type(req)
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    broker = self._get_account_broker(drive, part, account,
                                      pending_timeout=0.1,
                                      stale_reads_ok=True)
    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNotFound)
    return account_listing_response(account, req, out_content_type, broker,
                                    limit, marker, end_marker, prefix,
                                    delimiter)

def account_listing_response(account, req, response_content_type, broker=None,
                             limit='', marker='', end_marker='', prefix='',
                             delimiter=''):
    """
    Build the container-listing response for an account.

    The body format depends on response_content_type: JSON list of dicts,
    XML document, or (default) one container name per line. An empty
    plain-text listing yields 204 instead of 200.

    :param account: account name (used in the XML root element)
    :param req: swob.Request object
    :param response_content_type: negotiated listing content type
    :param broker: account broker to list from; a FakeAccountBroker is used
                   when None
    :param limit, marker, end_marker, prefix, delimiter: listing filters
                   passed straight to broker.list_containers_iter()
    :returns: swob.Response (HTTPOk, or HTTPNoContent for an empty
              plain-text listing)
    """
    if broker is None:
        broker = FakeAccountBroker()

    resp_headers = get_response_headers(broker)

    # Each entry is unpacked below as
    # (name, object_count, bytes_used, is_subdir).
    account_list = broker.list_containers_iter(limit, marker, end_marker,
                                               prefix, delimiter)
    if response_content_type == 'application/json':
        data = []
        for (name, object_count, bytes_used, is_subdir) in account_list:
            if is_subdir:
                data.append({'subdir': name})
            else:
                data.append({'name': name, 'count': object_count,
                             'bytes': bytes_used})
        account_list = json.dumps(data)
    elif response_content_type.endswith('/xml'):
        output_list = ['<?xml version="1.0" encoding="UTF-8"?>',
                       '<account name=%s>' % saxutils.quoteattr(account)]
        for (name, object_count, bytes_used, is_subdir) in account_list:
            if is_subdir:
                output_list.append(
                    '<subdir name=%s />' % saxutils.quoteattr(name))
            else:
                item = '<container><name>%s</name><count>%s</count>' \
                       '<bytes>%s</bytes></container>' % \
                       (saxutils.escape(name), object_count, bytes_used)
                output_list.append(item)
        output_list.append('</account>')
        account_list = '\n'.join(output_list)
    else:
        # NOTE(review): this emptiness check assumes list_containers_iter
        # returns a list; a lazy iterator would always be truthy here.
        if not account_list:
            resp = HTTPNoContent(request=req, headers=resp_headers)
            resp.content_type = response_content_type
            resp.charset = 'utf-8'
            return resp
        account_list = '\n'.join(r[0] for r in account_list) + '\n'
    ret = HTTPOk(body=account_list, request=req, headers=resp_headers)
    ret.content_type = response_content_type
    ret.charset = 'utf-8'
    return ret


def PUT(self, req):
    """
    HTTP PUT request handler for an account (proxy side).

    Only permitted when allow_account_management is enabled. After the
    metadata and account-name-length checks, one backend PUT per node
    returned by the account ring is issued through make_requests(), whose
    chosen response is returned with ACLs added from sys metadata.
    """
    if not self.app.allow_account_management:
        allow = ', '.join(self.allowed_methods)
        return HTTPMethodNotAllowed(request=req, headers={'Allow': allow})
    meta_error = check_metadata(req, 'account')
    if meta_error:
        return meta_error
    name_len = len(self.account_name)
    max_len = constraints.MAX_ACCOUNT_NAME_LENGTH
    if name_len > max_len:
        bad = HTTPBadRequest(request=req)
        bad.body = 'Account name length of %d longer than %d' % \
            (name_len, max_len)
        return bad
    ring = self.app.account_ring
    partition, nodes = ring.get_nodes(self.account_name)
    # Backend headers are derived from the client request; cached account
    # info is dropped before the backend requests go out.
    backend_headers = self.generate_request_headers(req, transfer=True)
    clear_info_cache(self.app, req.environ, self.account_name)
    resp = self.make_requests(req, ring, partition, 'PUT',
                              req.swift_entity_path,
                              [backend_headers] * len(nodes))
    self.add_acls_from_sys_metadata(resp)
    return resp

def make_requests(self, req, ring, part, method, path, headers,
                  query_string='', overrides=None):
    """
    Sends an HTTP request to multiple nodes and aggregates the results.
    It attempts the primary nodes concurrently, then iterates over the
    handoff nodes as needed.

    :param req: a request sent by the client
    :param ring: the ring used for finding backend servers
    :param part: the partition number
    :param method: the method to send to the backend
    :param path: the path to send to the backend
                 (full path ends up being  /<$device>/<$part>/<$path>)
    :param headers: a list of dicts, where each dict represents one
                    backend request that should be made.
    :param query_string: optional query string to send to the backend
    :param overrides: optional return status override map used to override
                      the returned status of a request.
    :returns: a swob.Response object
    """
    # Fan the request out, collect backend answers, and let best_response()
    # pick the final response by quorum vote.
    # Primary nodes of the partition; their count sizes the pile, the
    # quorum check, and the padding below.
    start_nodes = ring.get_part_nodes(part)
    nodes = GreenthreadSafeIterator(self.app.iter_nodes(ring, part))
    # Pool of green threads sized by the number of primary nodes.
    pile = GreenAsyncPile(len(start_nodes))
    for head in headers:
        # One backend request per headers dict; every spawned call shares
        # the `nodes` iterator (presumably each draws its target node from
        # it).
        pile.spawn(self._make_request, nodes, part, method, path,
                   head, query_string, self.app.logger.thread_locals)
    response = []
    statuses = []
    for resp in pile:
        if not resp:
            continue
        # resp is a (status, reason, headers, body) tuple.
        response.append(resp)
        statuses.append(resp[0])
        # Stop collecting as soon as the statuses seen so far form a quorum.
        if self.have_quorum(statuses, len(start_nodes)):
            break
    # give any pending requests *some* chance to finish
    # (bounded by post_quorum_timeout -- this does NOT wait for every
    # request to return)
    finished_quickly = pile.waitall(self.app.post_quorum_timeout)
    for resp in finished_quickly:
        if not resp:
            continue
        response.append(resp)
        statuses.append(resp[0])
    # Pad with 503s so there is one entry per primary node.
    while len(response) < len(start_nodes):
        response.append((HTTP_SERVICE_UNAVAILABLE, '', '', ''))
    statuses, reasons, resp_headers, bodies = zip(*response)
    # Choose the final response by quorum vote; the "<server_type> <method>"
    # label is what best_response uses as server_type in its 503 log line.
    return self.best_response(req, statuses, reasons, bodies,
                              '%s %s' % (self.server_type, req.method),
                              overrides=overrides, headers=resp_headers)

def best_response(self, req, statuses, reasons, bodies, server_type,
                  etag=None, headers=None, overrides=None,
                  quorum_size=None):
    """
    Given a list of responses from several servers, choose the best to
    return to the API.

    :param req: swob.Request object
    :param statuses: list of statuses returned
    :param reasons: list of reasons for each status
    :param bodies: bodies of each response
    :param server_type: type of server the responses came from
    :param etag: etag
    :param headers: headers of each response
    :param overrides: overrides to apply when lacking quorum
    :param quorum_size: quorum size to use
    :returns: swob.Response object with the correct status, body, etc. set
    """
    # Pick the response by quorum via _compute_quorum_response.
    if quorum_size is None:
        quorum_size = self._quorum_size(len(statuses))
    resp = self._compute_quorum_response(
        req, statuses, reasons, bodies, etag, headers,
        quorum_size=quorum_size)
    if overrides and not resp:
        # No quorum: substitute overridden statuses and vote again, passing
        # the substituted entries' indices as indices_to_avoid.
        # NOTE(review): zip() below requires `headers` to be a sequence;
        # callers passing overrides must also pass headers.
        faked_up_status_indices = set()
        transformed = []
        for (i, (status, reason, hdrs, body)) in enumerate(zip(
                statuses, reasons, headers, bodies)):
            if status in overrides:
                faked_up_status_indices.add(i)
                transformed.append((overrides[status], '', '', ''))
            else:
                transformed.append((status, reason, hdrs, body))
        statuses, reasons, headers, bodies = zip(*transformed)
        resp = self._compute_quorum_response(
            req, statuses, reasons, bodies, etag, headers,
            indices_to_avoid=faked_up_status_indices,
            quorum_size=quorum_size)

    if not resp:
        resp = Response(request=req)
        self.app.logger.error(_('%(type)s returning 503 for %(statuses)s'),
                              {'type': server_type, 'statuses': statuses})
        # 503's standard reason phrase is "Service Unavailable";
        # "Internal Server Error" is the phrase for 500.
        resp.status = '503 Service Unavailable'
    return resp


def PUT(self, req):
    """Handle HTTP PUT request."""
    # Account-server PUT:
    # - if the path includes a container, record that container's info in
    #   the account database;
    # - otherwise create the account database, or update the account's
    #   metadata.
    drive, part, account, container = split_and_validate_path(req, 3, 4)
    if self.mount_check and not check_mount(self.root, drive):
        return HTTPInsufficientStorage(drive=drive, request=req)
    if container:   # put account container
        if 'x-timestamp' not in req.headers:
            timestamp = Timestamp(time.time())
        else:
            timestamp = valid_timestamp(req)
        pending_timeout = None
        container_policy_index = \
            req.headers.get('X-Backend-Storage-Policy-Index', 0)
        if 'x-trans-id' in req.headers:
            pending_timeout = 3
        broker = self._get_account_broker(drive, part, account,
                                          pending_timeout=pending_timeout)
        # Initialize the account DB on the fly when its file does not exist
        # yet and the account name carries the auto-create prefix.
        if account.startswith(self.auto_create_account_prefix) and \
                not os.path.exists(broker.db_file):
            try:
                broker.initialize(timestamp.internal)
            except DatabaseAlreadyExists:
                # Someone else created it concurrently; that's fine.
                pass
        if req.headers.get('x-account-override-deleted', 'no').lower() != \
                'yes' and broker.is_deleted():
            return HTTPNotFound(request=req)
        # Store the container's row in the account database.
        broker.put_container(container, req.headers['x-put-timestamp'],
                             req.headers['x-delete-timestamp'],
                             req.headers['x-object-count'],
                             req.headers['x-bytes-used'],
                             container_policy_index)
        # NOTE(review): the two header values are compared directly as
        # strings -- presumably relies on normalized fixed-width timestamps;
        # confirm.
        if req.headers['x-delete-timestamp'] > \
                req.headers['x-put-timestamp']:
            return HTTPNoContent(request=req)
        else:
            return HTTPCreated(request=req)
    else:   # put account
        timestamp = valid_timestamp(req)
        broker = self._get_account_broker(drive, part, account)
        if not os.path.exists(broker.db_file):
            try:
                broker.initialize(timestamp.internal)
                created = True
            except DatabaseAlreadyExists:
                created = False
        # The DB's status says deleted: refuse with 403 'Recently deleted'.
        elif broker.is_status_deleted():
            return self._deleted_response(broker, req, HTTPForbidden,
                                          body='Recently deleted')
        else:
            # An account that currently reports deleted counts as
            # "created" if bumping its put timestamp revives it.
            created = broker.is_deleted()
            broker.update_put_timestamp(timestamp.internal)
            # Still deleted after the bump: report a conflict.
            if broker.is_deleted():
                return HTTPConflict(request=req)
        # Update the account's metadata from the sys/user meta headers.
        metadata = {}
        metadata.update((key, (value, timestamp.internal))
                        for key, value in req.headers.iteritems()
                        if is_sys_or_user_meta('account', key))
        if metadata:
            broker.update_metadata(metadata, validate_metadata=True)
        if created:
            return HTTPCreated(request=req)
        else:
            return HTTPAccepted(request=req)


def POST(self, req):
    """
    HTTP POST request handler for an account (proxy side).

    Validates the account name length and metadata, then issues one backend
    POST per node the account ring returns. If the result is 404 and account
    autocreate is enabled, the account is auto-created and the POST is sent
    once more. ACLs from sys metadata are added to the final response.
    """
    max_len = constraints.MAX_ACCOUNT_NAME_LENGTH
    if len(self.account_name) > max_len:
        bad = HTTPBadRequest(request=req)
        bad.body = 'Account name length of %d longer than %d' % \
            (len(self.account_name), max_len)
        return bad
    meta_error = check_metadata(req, 'account')
    if meta_error:
        return meta_error

    ring = self.app.account_ring
    partition, nodes = ring.get_nodes(self.account_name)
    backend_headers = self.generate_request_headers(req, transfer=True)
    clear_info_cache(self.app, req.environ, self.account_name)
    per_node_headers = [backend_headers] * len(nodes)
    resp = self.make_requests(req, ring, partition, 'POST',
                              req.swift_entity_path, per_node_headers)
    if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate:
        # Account is missing: create it, then retry the POST.
        self.autocreate_account(req, self.account_name)
        resp = self.make_requests(req, ring, partition, 'POST',
                                  req.swift_entity_path, per_node_headers)
    self.add_acls_from_sys_metadata(resp)
    return resp


def POST(self, req):
    """
    Handle HTTP POST request against the account server.

    Updates the account's metadata from the request's sys/user meta headers,
    stamped with the request timestamp. A deleted account yields 404;
    success yields 204.
    """
    device, partition, acct = split_and_validate_path(req, 3)
    req_timestamp = valid_timestamp(req)
    if self.mount_check and not check_mount(self.root, device):
        return HTTPInsufficientStorage(drive=device, request=req)
    broker = self._get_account_broker(device, partition, acct)
    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNotFound)
    # header name -> (value, timestamp) for every sys/user meta header
    metadata = {key: (value, req_timestamp.internal)
                for key, value in req.headers.iteritems()
                if is_sys_or_user_meta('account', key)}
    if metadata:
        broker.update_metadata(metadata, validate_metadata=True)
    return HTTPNoContent(request=req)


def DELETE(self, req):
    """
    HTTP DELETE request handler for an account (proxy side).

    Rejects requests that carry a query string, and requires
    allow_account_management. Otherwise one backend DELETE is issued per
    node the account ring returns, and make_requests()'s chosen response is
    returned.
    """
    # Extra safety in case someone typos a query string for an
    # account-level DELETE request that was really meant to be caught by
    # some middleware.
    if req.query_string:
        return HTTPBadRequest(request=req)
    if not self.app.allow_account_management:
        allow = ', '.join(self.allowed_methods)
        return HTTPMethodNotAllowed(request=req, headers={'Allow': allow})
    partition, nodes = self.app.account_ring.get_nodes(self.account_name)
    backend_headers = self.generate_request_headers(req)
    clear_info_cache(self.app, req.environ, self.account_name)
    return self.make_requests(
        req, self.app.account_ring, partition, 'DELETE',
        req.swift_entity_path, [backend_headers] * len(nodes))


def DELETE(self, req):
    """
    Handle HTTP DELETE request against the account server.

    Marks the account database deleted at the request timestamp and answers
    via _deleted_response(); an account that is already deleted yields 404.
    """
    device, partition, acct = split_and_validate_path(req, 3)
    if self.mount_check and not check_mount(self.root, device):
        return HTTPInsufficientStorage(drive=device, request=req)
    stamp = valid_timestamp(req)
    broker = self._get_account_broker(device, partition, acct)
    if broker.is_deleted():
        return self._deleted_response(broker, req, HTTPNoContent
                                      if False else HTTPNotFound)
    # Only marks the DB deleted here; per the original notes the actual
    # cleanup is done later by the account reaper (not visible here).
    broker.delete_db(stamp.internal)
    return self._deleted_response(broker, req, HTTPNoContent)



