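Python Frameworks: Tornado (Requests)
Overview
This article explains in detail how the Tornado server (the socket server) receives a user's request data, and how it processes the request according to the requested URL and returns data: in other words, all of the steps numbered 3.x in the figure above. As the figure shows, 【start】 is an infinite loop that uses epoll to watch the server socket's file descriptor. As soon as a client sends a request, the loop immediately calls the HTTPServer object's _handle_events method to handle it.
By function, the 3.x steps fall into four parts:
- Reading the user's request data (3.4 in the figure)
- Matching the request URL against the routing rules, so that a specific method handles the request (3.5~3.19 in the figure)
- Returning the processed data to the client (3.21~3.23 in the figure)
- Closing the client socket (3.24~3.26 in the figure)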
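3.1 The HTTPServer object's _handle_events method
This code has three main tasks:
1. socket.accept() accepts the client's connection.
2. It creates an IOStream instance that wraps the client socket and the IOLoop object (used later to read input and write output).
3. It creates an HTTPConnection object, which contains the logic that drives the rest of the request handling.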
```python
class HTTPServer(object):
    def _handle_events(self, fd, events):
        while True:
            try:
                #======== Accept the client connection =========#
                connection, address = self._socket.accept()
            except socket.error, e:
                # No more pending connections: return to the IOLoop
                if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            if self.ssl_options is not None:
                assert ssl, "Python 2.6+ and OpenSSL required for SSL"
                try:
                    connection = ssl.wrap_socket(connection,
                                                 server_side=True,
                                                 do_handshake_on_connect=False,
                                                 **self.ssl_options)
                except ssl.SSLError, err:
                    if err.args[0] == ssl.SSL_ERROR_EOF:
                        return connection.close()
                    else:
                        raise
                except socket.error, err:
                    if err.args[0] == errno.ECONNABORTED:
                        return connection.close()
                    else:
                        raise
            try:
                # Pick the stream class: SSLIOStream for HTTPS, IOStream for HTTP
                if self.ssl_options is not None:
                    stream = iostream.SSLIOStream(connection, io_loop=self.io_loop)
                else:
                    # Wrap the client socket and the IOLoop in an IOStream;
                    # the IOStream is used to read the request from the client socket
                    stream = iostream.IOStream(connection, io_loop=self.io_loop)
                # Create the HTTPConnection object.
                # address is the client's address (address[0] is its IP).
                # self.request_callback is the Application object, which holds
                # the URL mappings, settings, etc.
                # So the HTTPConnection constructor is where handling continues.
                HTTPConnection(stream, address, self.request_callback,
                               self.no_keep_alive, self.xheaders)
            except:
                logging.error("Error in connection callback", exc_info=True)
```
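For readers on Python 3, the accept loop at the top of _handle_events can be sketched as follows. This is a minimal illustration, not Tornado's code: accept_all and its on_connection callback are hypothetical names, with on_connection standing in for the IOStream/HTTPConnection step. On Python 3, a non-blocking accept() with an empty backlog raises BlockingIOError (errno EAGAIN/EWOULDBLOCK) rather than the Python 2 socket.error checked above.

```python
import socket


def accept_all(server_sock, on_connection):
    """Accept every pending connection on a non-blocking listening socket.

    Mirrors the while-True loop in HTTPServer._handle_events: keep calling
    accept() until the kernel reports that the accept queue is empty.
    `on_connection(conn, address)` is a hypothetical callback.
    """
    accepted = 0
    while True:
        try:
            conn, address = server_sock.accept()
        except BlockingIOError:
            # EAGAIN / EWOULDBLOCK: nothing left to accept, go back to the event loop
            return accepted
        # Client sockets are also driven by the event loop, so make them non-blocking
        conn.setblocking(False)
        on_connection(conn, address)
        accepted += 1
```

Returning on BlockingIOError instead of waiting is what lets a single thread drain all pending connections and then go back to watching other sockets.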
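3.2 IOStream's __init__ method
This code has two main tasks:
- Wrap the client socket and related state, so that the object's other methods can later read the client's request data and send the response.
- Add the client socket to epoll, and specify that IOStream's _handle_events method runs whenever the client socket's state changes (this is what eventually calls socket.send to return the response to the user).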
```python
class IOStream(object):
    def __init__(self, socket, io_loop=None, max_buffer_size=104857600,
                 read_chunk_size=4096):
        # The client socket object
        self.socket = socket
        self.socket.setblocking(False)
        self.io_loop = io_loop or ioloop.IOLoop.instance()
        self.max_buffer_size = max_buffer_size
        self.read_chunk_size = read_chunk_size
        self._read_buffer = collections.deque()
        self._write_buffer = collections.deque()
        self._write_buffer_frozen = False
        self._read_delimiter = None
        self._read_bytes = None
        self._read_callback = None
        self._write_callback = None
        self._close_callback = None
        self._connect_callback = None
        self._connecting = False
        self._state = self.io_loop.ERROR
        with stack_context.NullContext():
            # Register the client socket's fd with epoll, with IOStream._handle_events
            # as its handler, so the while loop in start() watches this socket.
            # When its state changes, the loop calls _handle_events, which
            # eventually sends the processed response back to the user.
            self.io_loop.add_handler(
                self.socket.fileno(), self._handle_events, self._state)
```
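The add_handler registration can be illustrated with the standard selectors module, which uses epoll on Linux. The sketch below is hypothetical: MiniIOLoop and poll_once are not Tornado names, and only the (fd, handler, events) shape of add_handler mirrors the call above. It shows how a file descriptor is mapped to a callback that the start() loop invokes when the descriptor becomes ready. One difference: selectors refuses an empty event mask, so this toy loop cannot register a socket with only the ERROR state the way the code above does.

```python
import selectors
import socket


class MiniIOLoop:
    """A toy reactor that maps file descriptors to callbacks, like IOLoop.add_handler."""

    READ = selectors.EVENT_READ
    WRITE = selectors.EVENT_WRITE

    def __init__(self):
        # DefaultSelector picks epoll on Linux and kqueue on BSD/macOS
        self._selector = selectors.DefaultSelector()
        self._handlers = {}

    def add_handler(self, fd, handler, events):
        """Watch `fd` for `events` and call handler(fd, events) when it is ready."""
        self._handlers[fd] = handler
        self._selector.register(fd, events)

    def remove_handler(self, fd):
        self._handlers.pop(fd, None)
        self._selector.unregister(fd)

    def poll_once(self, timeout=None):
        """One iteration of the start() loop: wait, then dispatch every ready fd."""
        for key, mask in self._selector.select(timeout):
            self._handlers[key.fd](key.fd, mask)
```

A real IOLoop also runs timeouts and callbacks between polls; the sketch keeps only the fd-to-handler dispatch.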
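3.3 HTTPConnection's __init__ method
This code has two main tasks:
- Read the request data
- Call _on_headers to continue processing the request
Reading the request data is done by IOStream's read_until method, which internally calls socket.recv(4096) to read the client's data and treats 【\r\n\r\n】 as the end of the request head (in an HTTP request, the headers and the body are separated by \r\n\r\n).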
```python
class HTTPConnection(object):
    def __init__(self, stream, address, request_callback, no_keep_alive=False,
                 xheaders=False):
        # stream: the IOStream wrapping the client socket and the IOLoop instance
        self.stream = stream
        # address: the client's address; address[0] is its IP
        self.address = address
        # request_callback: the Application object holding the URL mappings and settings
        self.request_callback = request_callback
        self.no_keep_alive = no_keep_alive
        self.xheaders = xheaders
        self._request = None
        self._request_finished = False
        # Read the request head (up to \r\n\r\n), then continue in
        # HTTPConnection._on_headers
        self._header_callback = stack_context.wrap(self._on_headers)
        self.stream.read_until("\r\n\r\n", self._header_callback)
```
Request data format:
```
GET / HTTP/1.1
Host: localhost:8888
Connection: keep-alive
Cache-Control: max-age=0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8
User-Agent: Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2272.118 Safari/537.36
Accept-Encoding: gzip, deflate, sdch
Accept-Language: zh-CN,zh;q=0.8,en;q=0.6,zh-TW;q=0.4
If-None-Match: "e02aa1b106d5c7c6a98def2b13005d5b84fd8dc8"
```
Detailed code walkthrough:
```python
class IOStream(object):
    def read_until(self, delimiter, callback):
        """Call callback when we read the given delimiter."""
        assert not self._read_callback, "Already reading"
        # The delimiter that ends the read: \r\n\r\n
        self._read_delimiter = delimiter
        # The callback, i.e. HTTPConnection._on_headers
        self._read_callback = stack_context.wrap(callback)
        while True:
            # Overview:
            #   first read data from the socket into the buffer,
            #   then take the data out of the buffer and pass it to the
            #   callback (HTTPConnection._on_headers).
            #   The buffer is a collections.deque, a double-ended queue
            #   whose append/popleft operations are thread-safe.
            # Try to complete the read from the buffer and run the callback.
            # Note: on the first pass the buffer is still empty.
            if self._read_from_buffer():
                return
            self._check_closed()
            # Read data from the socket into the buffer
            if self._read_to_buffer() == 0:
                break
        self._add_io_state(self.io_loop.READ)
```
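```python
class IOStream(object):
    def _read_to_buffer(self):
        # (error handling and buffer-size checks omitted)
        chunk = self._read_from_socket()
        if chunk is None:
            # Nothing was read, so report 0 bytes
            return 0
        self._read_buffer.append(chunk)
        return len(chunk)

    def _read_from_socket(self):
        # Receive data with the socket object's recv method;
        # read_chunk_size defaults to 4096 in the constructor
        chunk = self.socket.recv(self.read_chunk_size)
        if not chunk:
            # An empty read means the client closed the connection
            self.close()
            return None
        return chunk
```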
```python
class IOStream(object):
    def _read_from_buffer(self):
        """Attempts to complete the currently-pending read from the buffer.

        Returns True if the read was completed.
        """
        # None by default (set in the constructor); used by read_bytes()
        if self._read_bytes:
            if self._read_buffer_size() >= self._read_bytes:
                num_bytes = self._read_bytes
                callback = self._read_callback
                self._read_callback = None
                self._read_bytes = None
                self._run_callback(callback, self._consume(num_bytes))
                return True
        # Here _read_delimiter is \r\n\r\n
        elif self._read_delimiter:
            # Merge the buffer's chunks into its first element (see _merge_prefix)
            _merge_prefix(self._read_buffer, sys.maxint)
            # Index of \r\n\r\n within the buffer's first element
            loc = self._read_buffer[0].find(self._read_delimiter)
            if loc != -1:
                # \r\n\r\n was found in the request.
                # self._read_callback is HTTPConnection._on_headers
                callback = self._read_callback
                delimiter_len = len(self._read_delimiter)  # length of \r\n\r\n
                self._read_callback = None
                self._read_delimiter = None
                #============ Run HTTPConnection._on_headers ============
                # self._consume(loc + delimiter_len) removes and returns the request
                # head from the front of the buffer (the request data sits in
                # the buffer's first element)
                self._run_callback(callback, self._consume(loc + delimiter_len))
                return True
        return False
```
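The buffer logic of read_until, _read_to_buffer and _read_from_buffer can be condensed into a socket-free sketch. ReadBuffer, feed and this read_until are hypothetical names: feed plays the role of one recv() result, and joining the deque before searching mirrors what _merge_prefix(self._read_buffer, sys.maxint) achieves, so a delimiter split across two recv() calls is still found.

```python
import collections


class ReadBuffer:
    """Socket-free model of IOStream's read buffer (a deque of byte chunks)."""

    def __init__(self):
        self._chunks = collections.deque()

    def feed(self, chunk):
        """Append one recv() result, like _read_to_buffer."""
        self._chunks.append(chunk)
        return len(chunk)

    def read_until(self, delimiter):
        """Return data up to and including `delimiter`, or None if not received yet."""
        if not self._chunks:
            return None
        if len(self._chunks) > 1:
            # Merge all chunks into one, like _merge_prefix
            merged = b"".join(self._chunks)
            self._chunks.clear()
            self._chunks.append(merged)
        loc = self._chunks[0].find(delimiter)
        if loc == -1:
            return None  # keep waiting for more data
        end = loc + len(delimiter)
        # Like _consume(): remove the head, keep any unread remainder (e.g. the body)
        first = self._chunks.popleft()
        if first[end:]:
            self._chunks.appendleft(first[end:])
        return first[:end]
```

For example, feeding b"GET / HTTP/1.1\r\nHost: localhost:8888\r" and then b"\n\r\nbody" makes the first read_until(b"\r\n\r\n") call return None and the second return the complete head, leaving b"body" in the buffer.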
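3.4 HTTPConnection's _on_headers method (including 3.5)
This code has two main tasks:
- Parse the request data into the corresponding header key-value pairs, and wrap the information in an HTTPRequest object
- Call Application's __call__ method to continue processing the request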
```python
class HTTPConnection(object):
    def _on_headers(self, data):
        try:
            data = native_str(data.decode('latin1'))
            eol = data.find("\r\n")
            # The request's start line, e.g. GET / HTTP/1.1
            start_line = data[:eol]
            try:
                # Request method, request URI, HTTP version
                method, uri, version = start_line.split(" ")
            except ValueError:
                raise _BadRequestException("Malformed HTTP request line")
            if not version.startswith("HTTP/"):
                raise _BadRequestException("Malformed HTTP version in HTTP Request-Line")
            # Parse the headers (everything after the first line) into a dict-like object
            headers = httputil.HTTPHeaders.parse(data[eol:])
            # Wrap the request information in an HTTPRequest object.
            # Note: self._request is the HTTPRequest,
            #   the HTTPRequest holds this HTTPConnection,
            #   and the HTTPConnection holds the stream and the application.
            self._request = HTTPRequest(
                connection=self, method=method, uri=uri, version=version,
                headers=headers, remote_ip=self.address[0])
            # Read Content-Length from the headers
            content_length = headers.get("Content-Length")
            if content_length:
                content_length = int(content_length)
                if content_length > self.stream.max_buffer_size:
                    raise _BadRequestException("Content-Length too long")
                if headers.get("Expect") == "100-continue":
                    self.stream.write("HTTP/1.1 100 (Continue)\r\n\r\n")
                # Read the body, then continue in _on_request_body
                self.stream.read_bytes(content_length, self._on_request_body)
                return
            #********** Call Application.__call__: the entry point of the routing system **********
            self.request_callback(self._request)
        except _BadRequestException, e:
            logging.info("Malformed HTTP request from %s: %s",
                         self.address[0], e)
            self.stream.close()
            return
```
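The parsing steps in _on_headers can be reproduced with plain Python 3 to make the data flow concrete. parse_request_head is a hypothetical helper; unlike httputil.HTTPHeaders it stores headers in an ordinary dict keyed by lowercase names, ignores repeated or folded header lines, and raises ValueError where Tornado raises _BadRequestException. The default size limit reuses IOStream's max_buffer_size default of 104857600 bytes.

```python
def parse_request_head(data, max_body_size=104857600):
    """Split a raw request head into (method, uri, version, headers, content_length).

    Follows the same steps as HTTPConnection._on_headers: decode as latin-1,
    split off the start line at the first CRLF, then parse "Name: value" lines.
    """
    text = data.decode("latin1")
    eol = text.find("\r\n")
    start_line = text[:eol]
    try:
        method, uri, version = start_line.split(" ")
    except ValueError:
        raise ValueError("Malformed HTTP request line")
    if not version.startswith("HTTP/"):
        raise ValueError("Malformed HTTP version in HTTP Request-Line")
    headers = {}
    for line in text[eol:].split("\r\n"):
        if not line:
            continue  # skip the empty strings around CRLF pairs
        name, _, value = line.partition(":")
        headers[name.strip().lower()] = value.strip()
    content_length = headers.get("content-length")
    if content_length is not None:
        content_length = int(content_length)
        if content_length > max_body_size:
            raise ValueError("Content-Length too long")
    return method, uri, version, headers, content_length
```

When content_length is not None, the caller would next read exactly that many body bytes, which is what read_bytes(content_length, self._on_request_body) does above.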
```python
class HTTPRequest(object):
    def __init__(self, method, uri, version="HTTP/1.0", headers=None,
                 body=None, remote_ip=None, protocol=None, host=None,
                 files=None, connection=None):
        self.method = method
        self.uri = uri
        self.version = version
        self.headers = headers or httputil.HTTPHeaders()
        self.body = body or ""
        if connection and connection.xheaders:
            # Squid uses X-Forwarded-For, others use X-Real-Ip
            self.remote_ip = self.headers.get(
                "X-Real-Ip", self.headers.get("X-Forwarded-For", remote_ip))
            # AWS uses X-Forwarded-Proto
            self.protocol = self.headers.get(
                "X-Scheme", self.headers.get("X-Forwarded-Proto", protocol))
            if self.protocol not in ("http", "https"):
                self.protocol = "http"
        else:
            self.remote_ip = remote_ip
            if protocol:
                self.protocol = protocol
            elif connection and isinstance(connection.stream,
                                           iostream.SSLIOStream):
                self.protocol = "https"
            else:
                self.protocol = "http"
        self.host = host or self.headers.get("Host") or "127.0.0.1"
        self.files = files or {}
        self.connection = connection
        self._start_time = time.time()
        self._finish_time = None

        scheme, netloc, path, query, fragment = urlparse.urlsplit(uri)
        self.path = path
        self.query = query
        arguments = cgi.parse_qs(query)
        self.arguments = {}
        for name, values in arguments.iteritems():
            values = [v for v in values if v]
            if values:
                self.arguments[name] = values
```
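The attributes HTTPRequest derives from its inputs (client IP behind a proxy, protocol, path, query arguments) can be sketched with Python 3's urllib.parse, which replaces the Python 2 urlparse and cgi.parse_qs used above. describe_request is a hypothetical helper; headers are assumed to be a plain dict with exactly the header names shown, and the SSLIOStream check for HTTPS is left out.

```python
from urllib.parse import parse_qs, urlsplit


def describe_request(uri, headers, remote_ip, xheaders=False):
    """Return (remote_ip, protocol, path, arguments) the way HTTPRequest computes them."""
    if xheaders:
        # Trust proxy headers: X-Real-Ip first, then X-Forwarded-For
        remote_ip = headers.get("X-Real-Ip", headers.get("X-Forwarded-For", remote_ip))
        protocol = headers.get("X-Scheme", headers.get("X-Forwarded-Proto", "http"))
        if protocol not in ("http", "https"):
            protocol = "http"
    else:
        protocol = "http"  # this sketch ignores SSL connections
    _scheme, _netloc, path, query, _fragment = urlsplit(uri)
    arguments = {}
    for name, values in parse_qs(query).items():
        values = [v for v in values if v]  # drop empty values, as HTTPRequest does
        if values:
            arguments[name] = values
    return remote_ip, protocol, path, arguments
```

Note that query arguments are always lists, because a name such as a can appear several times in one query string.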
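3.6 Application's __call__ method (including 3.7, 3.8 and 3.9)
This code has three main tasks:
- Match the request URL against the URL mappings stored in the Application object to find the Handler for that URL. Note: "Handler" refers generically to any class that inherits from RequestHandler.
- Create the Handler object, i.e. run the Handler's __init__ method.
- Call the Handler object's _execute method.
Notes:
1. When Application's __call__ method runs, its request argument is the HTTPRequest object (which holds the HTTPConnection, the stream, the Application object and the request headers).
2. "Handler" means the class we define to handle requests; it inherits from RequestHandler.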
```python
class Application(object):
    def __call__(self, request):
        """Called by HTTPServer to execute the request."""
        transforms = [t(request) for t in self.transforms]
        handler = None
        args = []
        kwargs = {}
        # Match the request's target host against the host patterns to get its handlers
        handlers = self._get_host_handlers(request)
        if not handlers:
            handler = RedirectHandler(
                self, request, url="http://" + self.default_host + "/")
        else:
            for spec in handlers:
                match = spec.regex.match(request.path)
                if match:
                    # None-safe wrapper around url_unescape to handle
                    # unmatched optional groups correctly
                    def unquote(s):
                        if s is None:
                            return s
                        return escape.url_unescape(s, encoding=None)
                    # Create the RequestHandler object
                    handler = spec.handler_class(self, request, **spec.kwargs)
                    # Pass matched groups to the handler.  Since
                    # match.groups() includes both named and unnamed groups,
                    # we want to use either groups or groupdict but not both.
                    # Note that args are passed as bytes so the handler can
                    # decide what encoding to use.
                    kwargs = dict((k, unquote(v))
                                  for (k, v) in match.groupdict().iteritems())
                    if kwargs:
                        args = []
                    else:
                        args = [unquote(s) for s in match.groups()]
                    break
            if not handler:
                handler = ErrorHandler(self, request, status_code=404)

        # In debug mode, re-compile templates and reload static files on every
        # request so you don't need to restart to see changes
        if self.settings.get("debug"):
            if getattr(RequestHandler, "_templates", None):
                for loader in RequestHandler._templates.values():
                    loader.reset()
            RequestHandler._static_hashes = {}

        #==== Call RequestHandler._execute ====
        handler._execute(transforms, *args, **kwargs)
        return handler
```
```python
class Application(object):
    def _get_host_handlers(self, request):
        # Match the request's host (lower-cased, port stripped) against the
        # host patterns in handlers
        host = request.host.lower().split(':')[0]
        for pattern, handlers in self.handlers:
            if pattern.match(host):
                return handlers
        # Look for default host if not behind load balancer (for debugging)
        if "X-Real-Ip" not in request.headers:
            for pattern, handlers in self.handlers:
                if pattern.match(self.default_host):
                    return handlers
        return None
```
```python
class RequestHandler(object):
    SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PUT", "OPTIONS")

    def __init__(self, application, request, **kwargs):
        self.application = application
        self.request = request
        self._headers_written = False
        self._finished = False
        self._auto_finish = True
        self._transforms = None  # will be set in _execute
        # Collect the ui_methods and ui_modules configured in the application
        self.ui = _O((n, self._ui_method(m)) for n, m in
                     application.ui_methods.iteritems())
        self.ui["modules"] = _O((n, self._ui_module(n, m)) for n, m in
                                application.ui_modules.iteritems())
        # Set the default Server and Content-Type headers (and, for
        # HTTP/1.0 keep-alive clients, the Connection header)
        self.clear()
        # Check since connection is not available in WSGI:
        # only register a close callback when there is a real connection
        if hasattr(self.request, "connection"):
            self.request.connection.stream.set_close_callback(
                self.on_connection_close)
        self.initialize(**kwargs)

    def initialize(self):
        pass

    def clear(self):
        """Resets all headers and content for this response."""
        self._headers = {
            "Server": "TornadoServer/%s" % tornado.version,
            "Content-Type": "text/html; charset=UTF-8",
        }
        if not self.request.supports_http_1_1():
            if self.request.headers.get("Connection") == "Keep-Alive":
                self.set_header("Connection", "Keep-Alive")
        self._write_buffer = []
        self._status_code = 200
```
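In the process above, the requested URL is first matched against the routing rules; as soon as a rule matches, an instance of that rule's handler is created. For example, if the requested URL 【/index/11】 matches a rule mapped to IndexHandler, an IndexHandler instance is created and its _execute method is then called. Since every xxxHandler class derives from RequestHandler, this call runs RequestHandler's _execute method by default.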
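The matching performed by _get_host_handlers and __call__ can be sketched without Tornado. compile_table, route, the handler names and the (host_pattern, [(path_pattern, handler), ...]) table layout are hypothetical. The sketch keeps the rules visible in the source: the host is lower-cased and its port stripped before matching, patterns are anchored at the end with "$", named groups become keyword arguments, and unnamed groups become positional arguments, never both. Where Tornado would redirect or return a 404 ErrorHandler, this sketch simply returns None as the handler.

```python
import re
from urllib.parse import unquote


def compile_table(table):
    """Compile (host_pattern, [(path_pattern, handler), ...]) entries, anchored with '$'."""
    return [
        (re.compile(host + "$"), [(re.compile(path + "$"), h) for path, h in specs])
        for host, specs in table
    ]


def route(table, host, path, default_host="localhost"):
    """Return (handler, args, kwargs); handler is None when no rule matches."""
    host = host.lower().split(":")[0]  # "LocalHost:8888" -> "localhost"
    handlers = next((hs for pat, hs in table if pat.match(host)), None)
    if handlers is None:
        # Fall back to the default host's rules, as _get_host_handlers does
        handlers = next((hs for pat, hs in table if pat.match(default_host)), None)
    for regex, handler in handlers or []:
        match = regex.match(path)
        if match:
            def unq(s):
                # None-safe, for optional groups that did not match
                return s if s is None else unquote(s)
            kwargs = {k: unq(v) for k, v in match.groupdict().items()}
            # Named groups -> kwargs; otherwise unnamed groups -> positional args
            args = [] if kwargs else [unq(s) for s in match.groups()]
            return handler, args, kwargs
    return None, [], {}
```

With a rule (r"/index/(\d+)", "IndexHandler"), the URL /index/11 resolves to "IndexHandler" with args ["11"], matching the example above.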
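3.10 RequestHandler's _execute method (including 3.11, 3.12 and 3.13)
This code has three main tasks:
- Provide an extension point: self.prepare is an empty method by default, so it can be overridden to run code before the request is handled
- Call the Handler's get/post/put/delete (etc.) method via reflection
- Call the finish method once the request has been handled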
```python
class RequestHandler(object):
    def _execute(self, transforms, *args, **kwargs):
        """Executes this request with the given output transforms."""
        self._transforms = transforms
        with stack_context.ExceptionStackContext(
                self._stack_context_handle_exception):
            if self.request.method not in self.SUPPORTED_METHODS:
                raise HTTPError(405)
            # If XSRF cookies are turned on, reject form submissions without
            # the proper cookie
            if self.request.method not in ("GET", "HEAD") and \
                    self.application.settings.get("xsrf_cookies"):
                self.check_xsrf_cookie()
            self.prepare()
            if not self._finished:
                # Use reflection to call the subclass's get/post/put... method
                getattr(self, self.request.method.lower())(*args, **kwargs)
                if self._auto_finish and not self._finished:
                    self.finish()
```
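The reflection-based dispatch in _execute boils down to getattr(self, method.lower()). Below is a minimal, Tornado-free sketch: MiniHandler, execute and this HTTPError class are hypothetical stand-ins, and the XSRF check and exception stack context are omitted. As in RequestHandler, the base class answers every supported verb with 405 until a subclass overrides that verb's method.

```python
class HTTPError(Exception):
    """Hypothetical stand-in for tornado.web.HTTPError."""

    def __init__(self, status_code):
        super().__init__(status_code)
        self.status_code = status_code


class MiniHandler:
    SUPPORTED_METHODS = ("GET", "HEAD", "POST", "DELETE", "PUT", "OPTIONS")

    def __init__(self, method):
        self.method = method
        self._finished = False
        self._auto_finish = True
        self.output = []

    def _method_not_allowed(self, *args, **kwargs):
        raise HTTPError(405)

    # Default verb methods; subclasses override the ones they support
    head = get = post = delete = put = options = _method_not_allowed

    def prepare(self):
        """Extension point: empty by default, override to run code before get/post."""

    def finish(self):
        self._finished = True

    def execute(self, *args, **kwargs):
        if self.method not in self.SUPPORTED_METHODS:
            raise HTTPError(405)
        self.prepare()
        if not self._finished:  # prepare() may already have finished the request
            # Reflection: "GET" -> self.get(*args, **kwargs)
            getattr(self, self.method.lower())(*args, **kwargs)
            if self._auto_finish and not self._finished:
                self.finish()


class IndexHandler(MiniHandler):
    def get(self, item_id):
        self.output.append("item " + item_id)
```

For example, IndexHandler("GET").execute("11") appends "item 11" and then auto-finishes, while a POST to the same handler raises HTTPError(405).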
Example: the user sends a GET request
```python
class MyHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")
```
```python
class RequestHandler(object):
    def write(self, chunk):
        assert not self._finished
        if isinstance(chunk, dict):
            chunk = escape.json_encode(chunk)
            self.set_header("Content-Type", "text/javascript; charset=UTF-8")
        chunk = _utf8(chunk)
        self._write_buffer.append(chunk)
```
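When RequestHandler's write method runs, as shown above, the data is stored in the Handler object's _write_buffer list. Later, when finish runs, the data is written into the IOStream object's _write_buffer field, which is a double-ended queue, collections.deque().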
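The two-stage buffering described above (a list on the handler, a deque on the stream) can be sketched as follows. BufferedResponse and this simplified finish(stream_buffer) are hypothetical: json.dumps stands in for escape.json_encode, and the real finish also writes the status line and headers and applies output transforms, all of which are omitted here.

```python
import collections
import json


class BufferedResponse:
    def __init__(self):
        self.headers = {"Content-Type": "text/html; charset=UTF-8"}
        self._write_buffer = []  # handler-side buffer, like RequestHandler._write_buffer
        self._finished = False

    def write(self, chunk):
        assert not self._finished
        if isinstance(chunk, dict):
            chunk = json.dumps(chunk)
            self.headers["Content-Type"] = "text/javascript; charset=UTF-8"
        if isinstance(chunk, str):
            chunk = chunk.encode("utf-8")  # like _utf8()
        self._write_buffer.append(chunk)

    def finish(self, stream_buffer):
        """Join the buffered chunks and hand them to the stream's deque in one piece."""
        stream_buffer.append(b"".join(self._write_buffer))
        self._write_buffer = []
        self._finished = True
```

Buffering on the handler means several write() calls cost nothing on the network until finish, when the joined body is queued for the IOLoop to send.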