本文共 27291 字,大约阅读时间需要 90 分钟。
Swift启用WSGI服务的事件循环队列pipeline: catch_errors, proxy-logging, cache, authtoken, keystone, (slo), proxy-server。通过proxy-server的服务入口点,实现请求的具体处理和响应。
proxy-server服务入口点(/swift-kilo-eol/swift/proxy/server.py)
def __call__(self, env, start_response): """ WSGI 服务入口点 :param env: WSGI 环境变量(字典类型) :param start_response: WSGI 可调用对象 """ # 获取环境变量env中的'swift.cache',若为None,直接报错返回 # 若req.headers中只有'x-storage-token',则用req.headers的'x-auth-token'来更新 # 进而调用handle_request进行具体的执行和转发 try: if self.memcache is None: self.memcache = cache_from_env(env, True) req = self.update_request(Request(env)) return self.handle_request(req)(env, start_response) except UnicodeError: err = HTTPPreconditionFailed( request=req, body='Invalid UTF8 or contains NULL') return err(env, start_response) except (Exception, Timeout): start_response('500 Server Error', [('Content-Type', 'text/plain')]) return ['Internal server error.\n']
事实上真正调用的是handle_request()来完成请求的具体处理: 1.根据req.path的信息返回对应的控制器类;
2.实例化具体的控制器类对象;
3.获取控制器类中由req.method指定的方法;
4.执行具体的方法并返回。
def handle_request(self, req): """ Entry point for proxy server. Should return a WSGI-style callable (such as swob.Response). :param req: swob.Request object """ try: self.logger.set_statsd_prefix('proxy-server') if req.content_length and req.content_length < 0: self.logger.increment('errors') return HTTPBadRequest(request=req, body='Invalid Content-Length') try: if not check_utf8(req.path_info): self.logger.increment('errors') return HTTPPreconditionFailed( request=req, body='Invalid UTF8 or contains NULL') except UnicodeError: self.logger.increment('errors') return HTTPPreconditionFailed( request=req, body='Invalid UTF8 or contains NULL') try: # 根据req.path的信息返回对应的控制器类和字典(由版本、Account、Container和Object名组成) # 若req.path是/info,则返回InfoController; # 若req.path中account、container、object都存在,则返回ObjectController # 若req.path中只有account、container,则返回ObjectController # 若req.path中只有account,则返回AccountController controller, path_parts = self.get_controller(req) p = req.path_info if isinstance(p, unicode): p = p.encode('utf-8') except APIVersionError: self.logger.increment('errors') return HTTPBadRequest(request=req) except ValueError: self.logger.increment('errors') return HTTPNotFound(request=req) if not controller: self.logger.increment('errors') return HTTPPreconditionFailed(request=req, body='Bad URL') if self.deny_host_headers and \ req.host.split(':')[0] in self.deny_host_headers: return HTTPForbidden(request=req, body='Invalid host header') self.logger.set_statsd_prefix('proxy-server.' + controller.server_type.lower()) # 用get_controller返回的字典 实例化具体的控制器类对象 controller = controller(self, **path_parts) if 'swift.trans_id' not in req.environ: # if this wasn't set by an earlier middleware, set it now trans_id_suffix = self.trans_id_suffix trans_id_extra = req.headers.get('x-trans-id-extra') if trans_id_extra: trans_id_suffix += '-' + trans_id_extra[:32] trans_id = generate_trans_id(trans_id_suffix) req.environ['swift.trans_id'] = trans_id self.logger.txn_id = trans_id req.headers['x-trans-id'] = req.environ['swift.trans_id'] controller.trans_id = req.environ['swift.trans_id'] self.logger.client_ip = get_remote_client(req) try: # 获取具体控制器类中由request指定的方法 handler = getattr(controller, req.method) getattr(handler, 'publicly_accessible') except AttributeError: allowed_methods = getattr(controller, 'allowed_methods', set()) return HTTPMethodNotAllowed( request=req, headers={'Allow': ', '.join(allowed_methods)}) old_authorize = None if 'swift.authorize' in req.environ: # We call authorize before the handler, always. If authorized, # we remove the swift.authorize hook so isn't ever called # again. If not authorized, we return the denial unless the # controller's method indicates it'd like to gather more # information and try again later. resp = req.environ['swift.authorize'](req) if not resp and not req.headers.get('X-Copy-From-Account') \ and not req.headers.get('Destination-Account'): # No resp means authorized, no delayed recheck required. old_authorize = req.environ['swift.authorize'] else: # Response indicates denial, but we might delay the denial # and recheck later. If not delayed, return the error now. if not getattr(handler, 'delay_denial', None): return resp # Save off original request method (GET, POST, etc.) in case it # gets mutated during handling. This way logging can display the # method the client actually sent. req.environ['swift.orig_req_method'] = req.method try: if old_authorize: req.environ.pop('swift.authorize', None) # 执行具体控制器类中由request指定的方法 return handler(req) finally: if old_authorize: req.environ['swift.authorize'] = old_authorize except HTTPException as error_response: return error_response except (Exception, Timeout): self.logger.exception(_('ERROR Unhandled exception in request')) return HTTPServerError(request=req)
具体调用过程:请求到达proxy-server的服务入口点之后,在handle_request()中获取具体的控制器类(AccountController), 接着调用/swift-kilo-eol/swift/controllers/account.py下面的GET/HEAD/PUT/POST/DELETE等方法实现与具体存储服务(AccountServer)的连接,进而调用具体的GET/HEAD/PUT/POST/DELETE等方法实现请求的处理和相应
下述是GET/HEAD/PUT/POST/DELETE等方法在源码中的具体调用过程,详细过程已在源码中注释
/swift-kilo-eol/swift/proxy/controller/account.py
def GETorHEAD(self, req): """Handler for HTTP GET/HEAD requests.""" # 处理HTTP GET/HEAD请求 if len(self.account_name) > constraints.MAX_ACCOUNT_NAME_LENGTH: resp = HTTPBadRequest(request=req) resp.body = 'Account name length of %d longer than %d' % \ (len(self.account_name), constraints.MAX_ACCOUNT_NAME_LENGTH) return resp # 调用get_part获取经过一致性Hash取值和移位之后的Object存放的分区号 # 调用iter_nodes获取排除了HandOff以及Error的Object存放的节点号 # 调用GETorHEAD_base处理HTTP GET/HEAD请求 返回swob.Response对象 partition = self.app.account_ring.get_part(self.account_name) node_iter = self.app.iter_nodes(self.app.account_ring, partition) resp = self.GETorHEAD_base( req, _('Account'), node_iter, partition, req.swift_entity_path.rstrip('/')) if resp.status_int == HTTP_NOT_FOUND: if resp.headers.get('X-Account-Status', '').lower() == 'deleted': resp.status = HTTP_GONE elif self.app.account_autocreate: resp = account_listing_response(self.account_name, req, get_listing_content_type(req)) if req.environ.get('swift_owner'): self.add_acls_from_sys_metadata(resp) else: for header in self.app.swift_owner_headers: resp.headers.pop(header, None) return resp
/swift-kilo-eol/swift/account/server.py
def HEAD(self, req): """Handle HTTP HEAD request.""" # 处理HEAD请求,返回Account的基本信息,以KV的形式保存在HEAD中 drive, part, account = split_and_validate_path(req, 3) out_content_type = get_listing_content_type(req) # 进行mount检查 if self.mount_check and not check_mount(self.root, drive): return HTTPInsufficientStorage(drive=drive, request=req) # 返回一个AccountBroker实例,用于对sqlite数据的查询操作 broker = self._get_account_broker(drive, part, account, pending_timeout=0.1, stale_reads_ok=True) if broker.is_deleted(): return self._deleted_response(broker, req, HTTPNotFound) # get_response_headers内部调用get_info()获取Account的基本信息,并更新res的HEAD('X-Account-Container-Count','X-Account-Object-Count': info['object_count'], # 'X-Account-Bytes-Used', 'X-Timestamp', 'X-PUT-Timestamp') headers = get_response_headers(broker) headers['Content-Type'] = out_content_type return HTTPNoContent(request=req, headers=headers, charset='utf-8')
/swift-kilo-eol/swift/account/server.py
def GET(self, req): """Handle HTTP GET request.""" # 处理GET请求,返回Account的基本信息,以KV的形式保存在HEAD中,但与HEAD不一样的是GET方法中获取了 # 指定Account下的Container列表,存储在Body中 # 调用机制与HEAD类似 drive, part, account = split_and_validate_path(req, 3) prefix = get_param(req, 'prefix') delimiter = get_param(req, 'delimiter') if delimiter and (len(delimiter) > 1 or ord(delimiter) > 254): # delimiters can be made more flexible later return HTTPPreconditionFailed(body='Bad delimiter') limit = constraints.ACCOUNT_LISTING_LIMIT given_limit = get_param(req, 'limit') if given_limit and given_limit.isdigit(): limit = int(given_limit) if limit > constraints.ACCOUNT_LISTING_LIMIT: return HTTPPreconditionFailed( request=req, body='Maximum limit is %d' % constraints.ACCOUNT_LISTING_LIMIT) marker = get_param(req, 'marker', '') end_marker = get_param(req, 'end_marker') out_content_type = get_listing_content_type(req) if self.mount_check and not check_mount(self.root, drive): return HTTPInsufficientStorage(drive=drive, request=req) broker = self._get_account_broker(drive, part, account, pending_timeout=0.1, stale_reads_ok=True) if broker.is_deleted(): return self._deleted_response(broker, req, HTTPNotFound) # 获取Account对应的Container列表,每个Container信息包括(name, object_count, bytes_used, 0),以list形式返回 # 以不同的形式返回Account对应的Container列表,最普通的是放在body中 return account_listing_response(account, req, out_content_type, broker, limit, marker, end_marker, prefix, delimiter)
def account_listing_response(account, req, response_content_type, broker=None, limit='', marker='', end_marker='', prefix='', delimiter=''): if broker is None: broker = FakeAccountBroker() resp_headers = get_response_headers(broker) # 获取Account对应的Container列表,每个Container信息包括(name, object_count, bytes_used, 0),以list形式返回 account_list = broker.list_containers_iter(limit, marker, end_marker, prefix, delimiter) # 以不同的形式返回Account对应的Container列表,最普通的是放在body中 if response_content_type == 'application/json': data = [] for (name, object_count, bytes_used, is_subdir) in account_list: if is_subdir: data.append({'subdir': name}) else: data.append({'name': name, 'count': object_count, 'bytes': bytes_used}) account_list = json.dumps(data) elif response_content_type.endswith('/xml'): output_list = [' ', '' % saxutils.quoteattr(account)] for (name, object_count, bytes_used, is_subdir) in account_list: if is_subdir: output_list.append( ' ') account_list = '\n'.join(output_list) else: if not account_list: resp = HTTPNoContent(request=req, headers=resp_headers) resp.content_type = response_content_type resp.charset = 'utf-8' return resp account_list = '\n'.join(r[0] for r in account_list) + '\n' ret = HTTPOk(body=account_list, request=req, headers=resp_headers) ret.content_type = response_content_type ret.charset = 'utf-8' return ret' % saxutils.quoteattr(name)) else: item = ' ' % \ (saxutils.escape(name), object_count, bytes_used) output_list.append(item) output_list.append(' %s %s ' \ '%s
/swift-kilo-eol/swift/proxy/controller/account.py
def PUT(self, req): """HTTP PUT request handler.""" # 处理HTTP PUT请求 if not self.app.allow_account_management: return HTTPMethodNotAllowed( request=req, headers={'Allow': ', '.join(self.allowed_methods)}) error_response = check_metadata(req, 'account') if error_response: return error_response if len(self.account_name) > constraints.MAX_ACCOUNT_NAME_LENGTH: resp = HTTPBadRequest(request=req) resp.body = 'Account name length of %d longer than %d' % \ (len(self.account_name), constraints.MAX_ACCOUNT_NAME_LENGTH) return resp # 获取Account对应的分区号,及其对应的节点号(节点以元祖形式返回) account_partition, accounts = \ self.app.account_ring.get_nodes(self.account_name) # 根据原始请求头信息调用generate_request_headers生成新格式的请求头 headers = self.generate_request_headers(req, transfer=True) clear_info_cache(self.app, req.environ, self.account_name) # 调用make_requests,迭代发送多个HTTP请求到多个节点,并汇聚所有返回的相应结果;根据所有的响应信息,通过投票机制返回最佳响应信息 resp = self.make_requests( req, self.app.account_ring, account_partition, 'PUT', req.swift_entity_path, [headers] * len(accounts)) self.add_acls_from_sys_metadata(resp) return resp
def make_requests(self, req, ring, part, method, path, headers, query_string='', overrides=None): """ Sends an HTTP request to multiple nodes and aggregates the results. It attempts the primary nodes concurrently, then iterates over the handoff nodes as needed. :param req: a request sent by the client :param ring: the ring used for finding backend servers :param part: the partition number :param method: the method to send to the backend :param path: the path to send to the backend (full path ends up being /<$device>/<$part>/<$path>) :param headers: a list of dicts, where each dict represents one backend request that should be made. :param query_string: optional query string to send to the backend :param overrides: optional return status override map used to override the returned status of a request. :returns: a swob.Response object """ # 迭代发送多个HTTP请求到多个节点,并汇聚所有返回的相应结果;根据所有的响应信息,通过投票机制返回最佳响应信息 # 调用get_part_nodes返回分区号对应的所有分区号 start_nodes = ring.get_part_nodes(part) nodes = GreenthreadSafeIterator(self.app.iter_nodes(ring, part)) # 创建协程池 pile = GreenAsyncPile(len(start_nodes)) for head in headers: # 从协程池中获取一个协程发送请求到一个远程节点(根据选定的备份数) pile.spawn(self._make_request, nodes, part, method, path, head, query_string, self.app.logger.thread_locals) response = [] statuses = [] for resp in pile: if not resp: continue response.append(resp) statuses.append(resp[0]) if self.have_quorum(statuses, len(start_nodes)): break # give any pending requests *some* chance to finish # 等到所有请求都返回 finished_quickly = pile.waitall(self.app.post_quorum_timeout) for resp in finished_quickly: if not resp: continue response.append(resp) statuses.append(resp[0]) while len(response) < len(start_nodes): response.append((HTTP_SERVICE_UNAVAILABLE, '', '', '')) statuses, reasons, resp_headers, bodies = zip(*response) # 通过投票机制返回最佳响应信息 return self.best_response(req, statuses, reasons, bodies, '%s %s' % (self.server_type, req.method), overrides=overrides, headers=resp_headers)
def best_response(self, req, statuses, reasons, bodies, server_type, etag=None, headers=None, overrides=None, quorum_size=None): """ Given a list of responses from several servers, choose the best to return to the API. :param req: swob.Request object :param statuses: list of statuses returned :param reasons: list of reasons for each status :param bodies: bodies of each response :param server_type: type of server the responses came from :param etag: etag :param headers: headers of each response :param overrides: overrides to apply when lacking quorum :param quorum_size: quorum size to use :returns: swob.Response object with the correct status, body, etc. set """ # 调用_compute_quorum_response,根据Response的状态等选出最佳响应 if quorum_size is None: quorum_size = self._quorum_size(len(statuses)) resp = self._compute_quorum_response( req, statuses, reasons, bodies, etag, headers, quorum_size=quorum_size) if overrides and not resp: faked_up_status_indices = set() transformed = [] for (i, (status, reason, hdrs, body)) in enumerate(zip( statuses, reasons, headers, bodies)): if status in overrides: faked_up_status_indices.add(i) transformed.append((overrides[status], '', '', '')) else: transformed.append((status, reason, hdrs, body)) statuses, reasons, headers, bodies = zip(*transformed) resp = self._compute_quorum_response( req, statuses, reasons, bodies, etag, headers, indices_to_avoid=faked_up_status_indices, quorum_size=quorum_size) if not resp: resp = Response(request=req) self.app.logger.error(_('%(type)s returning 503 for %(statuses)s'), {'type': server_type, 'statuses': statuses}) resp.status = '503 Internal Server Error' return resp
/swift-kilo-eol/swift/account/server.py
def PUT(self, req): """Handle HTTP PUT request.""" # 处理PUT请求 # 若url中包括,新建数据库中的Container信息 # 若url中不包括 ,更新数据库中的Account的metadata信息 drive, part, account, container = split_and_validate_path(req, 3, 4) if self.mount_check and not check_mount(self.root, drive): return HTTPInsufficientStorage(drive=drive, request=req) if container: # put account container if 'x-timestamp' not in req.headers: timestamp = Timestamp(time.time()) else: timestamp = valid_timestamp(req) pending_timeout = None container_policy_index = \ req.headers.get('X-Backend-Storage-Policy-Index', 0) if 'x-trans-id' in req.headers: pending_timeout = 3 # 调用_get_account_broker返回AccountBroker实例 broker = self._get_account_broker(drive, part, account, pending_timeout=pending_timeout) # 检查account对应的数据库不存在,则初始化AccountBroker的数据库 if account.startswith(self.auto_create_account_prefix) and \ not os.path.exists(broker.db_file): try: broker.initialize(timestamp.internal) except DatabaseAlreadyExists: pass if req.headers.get('x-account-override-deleted', 'no').lower() != \ 'yes' and broker.is_deleted(): return HTTPNotFound(request=req) # 将Container的信息存入数据库 broker.put_container(container, req.headers['x-put-timestamp'], req.headers['x-delete-timestamp'], req.headers['x-object-count'], req.headers['x-bytes-used'], container_policy_index) if req.headers['x-delete-timestamp'] > \ req.headers['x-put-timestamp']: return HTTPNoContent(request=req) else: return HTTPCreated(request=req) else: # put account timestamp = valid_timestamp(req) broker = self._get_account_broker(drive, part, account) if not os.path.exists(broker.db_file): try: broker.initialize(timestamp.internal) created = True except DatabaseAlreadyExists: created = False # 检查Account是否被标记为删除 elif broker.is_status_deleted(): return self._deleted_response(broker, req, HTTPForbidden, body='Recently deleted') # 检查Account是否已被删除 else: created = broker.is_deleted() broker.update_put_timestamp(timestamp.internal) if broker.is_deleted(): return HTTPConflict(request=req) # 更新数据库中的Account 的metadata信息 metadata = {} metadata.update((key, (value, timestamp.internal)) for key, value in req.headers.iteritems() if is_sys_or_user_meta('account', key)) if metadata: broker.update_metadata(metadata, validate_metadata=True) if created: return HTTPCreated(request=req) else: return HTTPAccepted(request=req)
/swift-kilo-eol/swift/proxy/controller/account.py
def POST(self, req): """HTTP POST request handler.""" # 处理HTTP POST请求 if len(self.account_name) > constraints.MAX_ACCOUNT_NAME_LENGTH: resp = HTTPBadRequest(request=req) resp.body = 'Account name length of %d longer than %d' % \ (len(self.account_name), constraints.MAX_ACCOUNT_NAME_LENGTH) return resp error_response = check_metadata(req, 'account') if error_response: return error_response # 获取Account对应的分区号,及其对应的节点号 account_partition, accounts = \ self.app.account_ring.get_nodes(self.account_name) # 根据原始请求头信息调用generate_request_headers生成新格式的请求头 headers = self.generate_request_headers(req, transfer=True) clear_info_cache(self.app, req.environ, self.account_name) # 调用make_requests,迭代发送多个HTTP请求到多个节点,并汇聚所有返回的相应结果;根据所有的响应信息,通过投票机制返回最佳响应信息 resp = self.make_requests( req, self.app.account_ring, account_partition, 'POST', req.swift_entity_path, [headers] * len(accounts)) # 如果指定的account_name不存在,则先新建Account,再调用make_requests if resp.status_int == HTTP_NOT_FOUND and self.app.account_autocreate: self.autocreate_account(req, self.account_name) resp = self.make_requests( req, self.app.account_ring, account_partition, 'POST', req.swift_entity_path, [headers] * len(accounts)) self.add_acls_from_sys_metadata(resp) return resp
/swift-kilo-eol/swift/account/server.py
def POST(self, req): """Handle HTTP POST request.""" # 处理POST请求 # 更新Account的matadata drive, part, account = split_and_validate_path(req, 3) req_timestamp = valid_timestamp(req) if self.mount_check and not check_mount(self.root, drive): return HTTPInsufficientStorage(drive=drive, request=req) # 调用_get_account_broker返回AccountBroker实例 broker = self._get_account_broker(drive, part, account) if broker.is_deleted(): return self._deleted_response(broker, req, HTTPNotFound) # 利用Head中的metadata更新数据库中Account的metadata metadata = {} metadata.update((key, (value, req_timestamp.internal)) for key, value in req.headers.iteritems() if is_sys_or_user_meta('account', key)) if metadata: broker.update_metadata(metadata, validate_metadata=True) return HTTPNoContent(request=req)
/swift-kilo-eol/swift/proxy/controller/account.py
def DELETE(self, req): """HTTP DELETE request handler.""" # 处理HTTP DELETE请求 # Extra safety in case someone typos a query string for an # account-level DELETE request that was really meant to be caught by # some middleware. if req.query_string: return HTTPBadRequest(request=req) if not self.app.allow_account_management: return HTTPMethodNotAllowed( request=req, headers={'Allow': ', '.join(self.allowed_methods)}) # 获取Account对应的分区号,及其对应的节点号 account_partition, accounts = \ self.app.account_ring.get_nodes(self.account_name) # 根据原始请求头信息调用generate_request_headers生成新格式的请求头 headers = self.generate_request_headers(req) clear_info_cache(self.app, req.environ, self.account_name) # 调用make_requests resp = self.make_requests( req, self.app.account_ring, account_partition, 'DELETE', req.swift_entity_path, [headers] * len(accounts)) return resp
/swift-kilo-eol/swift/account/server.py
def DELETE(self, req): """Handle HTTP DELETE request.""" # 处理DELETE请求 drive, part, account = split_and_validate_path(req, 3) if self.mount_check and not check_mount(self.root, drive): return HTTPInsufficientStorage(drive=drive, request=req) req_timestamp = valid_timestamp(req) broker = self._get_account_broker(drive, part, account) # 检查当前Account是否已经被删除,如果已经被删除则直接返回 if broker.is_deleted(): return self._deleted_response(broker, req, HTTPNotFound) # 将数据库中当前Account标记为删除状态,由AccountReaper服务来完成真正的清理工作 broker.delete_db(req_timestamp.internal) return self._deleted_response(broker, req, HTTPNoContent)
对于ContainerController和ObjectController的调用过程类似,详情有空再分析。本文还有许多可改进的地方,比如有些具体方法调用过程分析地不够详细
转载地址:http://pmbii.baihongyu.com/