文档章节

tornado 源码初识

国夫君
 国夫君
发布于 2015/07/08 21:39
字数 2107
阅读 2167
收藏 33

##序言 最近闲暇无事,阅读了一下tornado的源码,对整体的结构有了初步认识,与大家分享

##ioloop iolooptornado的核心模块,也是个调度模块,各种异步事件都是由他调度的,所以必须弄清他的执行逻辑

###源码分析 而ioloop的核心部分则是 while True这个循环内部的逻辑,贴上他的代码如下

   def start(self):
        if self._running:
            raise RuntimeError("IOLoop is already running")
        self._setup_logging()
        if self._stopped:
            self._stopped = False
            return
        old_current = getattr(IOLoop._current, "instance", None)
        IOLoop._current.instance = self
        self._thread_ident = thread.get_ident()
        self._running = True

        old_wakeup_fd = None
        if hasattr(signal, 'set_wakeup_fd') and os.name == 'posix':
           
            try:
                old_wakeup_fd = signal.set_wakeup_fd(self._waker.write_fileno())
                if old_wakeup_fd != -1:
                   
                    signal.set_wakeup_fd(old_wakeup_fd)
                    old_wakeup_fd = None
            except ValueError:
                
                old_wakeup_fd = None

        try:
            while True:
                
                with self._callback_lock:
                    callbacks = self._callbacks
                    self._callbacks = []

                due_timeouts = []

                if self._timeouts:
                    now = self.time()
                    while self._timeouts:
                        if self._timeouts[0].callback is None:
                        
                            heapq.heappop(self._timeouts)
                            self._cancellations -= 1
                        elif self._timeouts[0].deadline <= now:
                            due_timeouts.append(heapq.heappop(self._timeouts))
                        else:
                            break
                    if (self._cancellations > 512
                            and self._cancellations > (len(self._timeouts) >> 1)):
                        self._cancellations = 0
                        self._timeouts = [x for x in self._timeouts
                                          if x.callback is not None]
                        heapq.heapify(self._timeouts)

                for callback in callbacks:
                    self._run_callback(callback)
                for timeout in due_timeouts:
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)

                callbacks = callback = due_timeouts = timeout = None

                if self._callbacks:
                   
                    poll_timeout = 0.0
                elif self._timeouts:
                   
                    poll_timeout = self._timeouts[0].deadline - self.time()
                    poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
                else:
                   
                    poll_timeout = _POLL_TIMEOUT

                if not self._running:
                    break

                if self._blocking_signal_threshold is not None:
                   
                    signal.setitimer(signal.ITIMER_REAL, 0, 0)

                try:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:
                    
                    if errno_from_exception(e) == errno.EINTR:
                        continue
                    else:
                        raise

                if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)

                self._events.update(event_pairs)
                while self._events:
                    fd, events = self._events.popitem()
                    try:
                        fd_obj, handler_func = self._handlers[fd]
                        handler_func(fd_obj, events)
                    except (OSError, IOError) as e:
                        if errno_from_exception(e) == errno.EPIPE:
                            
                            pass
                        else:
                            self.handle_callback_exception(self._handlers.get(fd))
                    except Exception:
                        self.handle_callback_exception(self._handlers.get(fd))
                fd_obj = handler_func = None

        finally:
        
            self._stopped = False
            if self._blocking_signal_threshold is not None:
                signal.setitimer(signal.ITIMER_REAL, 0, 0)
            IOLoop._current.instance = old_current
            if old_wakeup_fd is not None:
                signal.set_wakeup_fd(old_wakeup_fd)
除去注释,代码其实没多少行. 由while 内部代码可以看出ioloop主要由三部分组成:

###1.回调 callbacks

他是ioloop回调的基础部分,通过IOLoop.instance().add_callback()添加到self._callbacks 他们将在每一次loop中被运行.

主要用途是将逻辑分块,在适合时机将包装好的callback添加到self._callbacks让其执行.

例如ioloop中的add_future

def add_future(self, future, callback):
        """Schedules a callback on the ``IOLoop`` when the given
        `.Future` is finished.

        The callback is invoked with one argument, the
        `.Future`.
        """
        assert is_future(future)
        callback = stack_context.wrap(callback)
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))

future对象得到result的时候会调用future.add_done_callback添加的callback,再将其转至ioloop执行

###2.定时器 due_timeouts

这是定时器,在指定的事件执行callback. 跟1中的callback类似,通过IOLoop.instance().add_callback

在每一次循环,会计算timeouts回调列表里的事件,运行已到期的callback. 当然不是无节操的循环.

因为poll操作会阻塞到有io操作发生,所以只要计算最近的timeout, 然后用这个时间作为self._impl.poll(poll_timeout)poll_timeout , 就可以达到按时运行了

但是,假设poll_timeout的时间很大时,self._impl.poll一直在堵塞中(没有io事件,但在处理某一个io事件), 那添加刚才1中的callback不是要等很久才会被运行吗? 答案当然是不会. ioloop中有个waker对象,他是由两个fd组成,一个读一个写. ioloop在初始化的时候把waker绑定到epoll里了,add_callback时会触发waker的读写. 这样ioloop就会在poll中被唤醒了,接着就可以及时处理timeout callback

用这样的方式也可以自己封装一个小的定时器功能玩玩

###3.io事件的event loop

处理epoll事件的功能 通过IOLoop.instance().add_handler(fd, handler, events)绑定fd event的处理事件 在httpserver.listen的代码内, netutil.py中的netutil.pyadd_accept_handler绑定accept handler处理客户端接入的逻辑

如法炮制,其他的io事件也这样绑定,业务逻辑的分块交由ioloopcallbackfuture处理

关于epoll的用法的内容.详情见我第一篇文章吧,哈哈

###总结 ioloop由callback(业务分块), timeout callback(定时任务) io event(io传输和解析) 三块组成,互相配合完成异步的功能,构建gen,httpclient,iostream等功能

串联大致的流程是,tornado 绑定io event,处理io传输解析,传输完成后(结合Future)回调(callback)业务处理的逻辑和一些固定操作 . 定时器则是较为独立的模块

##Futrue

个人认为Futuretornado仅此ioloop重要的模块,他贯穿全文,所有异步操作都有他的身影 顾名思义,他主要是关注日后要做的事,类似jqueryDeferred

一般的用法是通过ioloopadd_future定义futuredone callback, 当futureset_result的时候,futuredone callback就会被调用. 从而完成Future的功能.

具体可以参考gen.coroutine的实现,本文后面也会讲到

他的组成不复杂,只有几个重要的方法 最重要的是 add_done_callback , set_result

tornadoFutureioloop,yield实现了gen.coroutine

###1. add_done_callback

ioloopcallback类似 , 存储事件完成后的callbackself._callbacks

def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

###2.set_result

设置事件的结果,并运行之前存储好的callback

def set_result(self, result):
        self._result = result
        self._set_done()

def _set_done(self):
        self._done = True
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                app_log.exception('Exception in callback %r for %r',
                                  cb, self)
        self._callbacks = None

为了验证之前所说的,上一段测试代码

#! /usr/bin/env python
#coding=utf-8

import tornado.web
import tornado.ioloop

from tornado.gen import coroutine
from tornado.concurrent import Future


def test():
	def pp(s):
		print s

	future = Future()
	iol = tornado.ioloop.IOLoop.instance()

	print 'init future %s'%future

	iol.add_future(future, lambda f: pp('ioloop callback after future done,future is %s'%f))

	#模拟io延迟操作
	iol.add_timeout(iol.time()+5,lambda:future.set_result('set future is done'))

	print 'init complete'
	tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
	test()	

运行结果:

输入图片说明

gen.coroutine

接着继续延伸,看看coroutine的实现 gen.coroutine实现的功能其实是将原来的callback的写法,用yield的写法代替. 即以yield为分界,将代码分成两部分. 如:

#! /usr/bin/env python
#coding=utf-8

import tornado.ioloop
from tornado.gen import coroutine
from tornado.httpclient import AsyncHTTPClient

@coroutine
def cotest():
	client = AsyncHTTPClient()
	res = yield client.fetch("http://www.segmentfault.com/")
	print res

if __name__ == "__main__":
	f = cotest()	
	print f #这里返回了一个future哦
	tornado.ioloop.IOLoop.instance().start()

运行结果:

输入图片说明

###源码分析 接下来分析下coroutine的实现

def _make_coroutine_wrapper(func, replace_callback):
    
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = TracebackFuture()

        if replace_callback and 'callback' in kwargs:
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            result = getattr(e, 'value', None)
        except Exception:
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, types.GeneratorType):
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    future.set_result(getattr(e, 'value', None))
                except Exception:
                    future.set_exc_info(sys.exc_info())
                else:
                    Runner(result, future, yielded)
                try:
                    return future
                finally:
                    future = None
        future.set_result(result)
        return future
    return wrapper

如源码所示,func运行的结果是GeneratorType ,yielded = next(result), 运行至原函数的yield位置,返回的是原函数func内部 yield 右边返回的对象(必须是FutureFuturelist)给yielded. 经过Runner(result, future, yielded) 对yielded进行处理. 在此就 贴出Runner的代码了. Runner初始化过程,调用handle_yield, 查看yielded是否已done了,否则add_future运行Runnerrun方法, run方法中如果yielded对象已完成,用对它的gen调用send,发送完成的结果. 所以yielded在什么地方被set_result非常重要, 当被set_result的时候,才会send结果给原func,完成整个异步操作

详情可以查看tornado 中重要的对象 iostream,源码中iostream的 _handle_connect,如此设置了连接的result.

def _handle_connect(self):
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            self.error = socket.error(err, os.strerror(err))
            if self._connect_future is None:
                gen_log.warning("Connect error on fd %s: %s",
                                self.socket.fileno(), errno.errorcode[err])
            self.close()
            return
        if self._connect_callback is not None:
            callback = self._connect_callback
            self._connect_callback = None
            self._run_callback(callback)
        if self._connect_future is not None:
            future = self._connect_future
            self._connect_future = None
            future.set_result(self)
        self._connecting = False

最后贴上一个简单的测试代码,演示coroutine,future的用法

import tornado.ioloop
from tornado.gen import coroutine
from tornado.concurrent import Future

@coroutine
def asyn_sum(a, b):
	print("begin calculate:sum %d+%d"%(a,b))
	future = Future()
	future2 = Future()
	iol = tornado.ioloop.IOLoop.instance()
	
	print future
	
	def callback(a, b):
		print("calculating the sum of %d+%d:"%(a,b))
		future.set_result(a+b)

		iol.add_timeout(iol.time()+3,lambda f:f.set_result(None),future2)
	iol.add_timeout(iol.time()+3,callback, a, b)

	result = yield future

	print("after yielded")
	print("the %d+%d=%d"%(a, b, result))

	yield future2

	print 'after future2'

def main():
	f =  asyn_sum(2,3)
	
	print ''
	print f
	tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
	main()

运行结果:

输入图片说明

为什么代码中个yield都起作用了? 因为Runner.run里,最后继续用handle_yield处理了send后返回的yielded对象,意思是func里可以有n干个yield操作

if not self.handle_yield(yielded):
                    return

###总结 至此,已完成tornado中重要的几个模块的流程,其他模块也是由此而来.写了这么多,越写越卡,就到此为止先吧,

##最后的最后的最后

啊~~好想有份工作女朋友~

© 著作权归作者所有

共有 人打赏支持
上一篇: epoll 简单用法
下一篇: docker 初试经验
国夫君
粉丝 7
博文 5
码字总数 7443
作品 0
惠州
私信 提问
加载中

评论(7)

国夫君
国夫君

引用来自“国夫君”的评论

对了,忘了说,0看的源码是tornado (4.2)版本的

引用来自“陈亦”的评论

国夫群,求带

引用来自“国夫君”的评论

girl神又来嘲笑我了 准备再写一篇,貌似这篇讲的有点泛?

引用来自“红白机”的评论

国夫,阿力??
不是阿力
红白机
红白机

引用来自“国夫君”的评论

对了,忘了说,0看的源码是tornado (4.2)版本的

引用来自“陈亦”的评论

国夫群,求带

引用来自“国夫君”的评论

girl神又来嘲笑我了 准备再写一篇,貌似这篇讲的有点泛?
国夫,阿力??
国夫君
国夫君

引用来自“国夫君”的评论

对了,忘了说,0看的源码是tornado (4.2)版本的

引用来自“陈亦”的评论

国夫群,求带
girl神又来嘲笑我了 准备再写一篇,貌似这篇讲的有点泛?
陈亦
陈亦

引用来自“国夫君”的评论

对了,忘了说,0看的源码是tornado (4.2)版本的
国夫君,求带
陈亦
陈亦

引用来自“国夫君”的评论

对了,忘了说,0看的源码是tornado (4.2)版本的
国夫群,求带
Tony_zhangl
Tony_zhangl
000
国夫君
国夫君
对了,忘了说,0看的源码是tornado (4.2)版本的
【Python Web学习路线】干货整理,不谈虚的,让你短时间高效学好它!

无论是大数据、人工智能还是机器学习,Python都是最热门的首选语言。 学好Python,可以从事Python Web工程师、Python数据分析、人工智能专家等岗位。本期专题,分享的主题是“如何成为一枚优...

Eddie_yang
2018/12/24
0
0
为什么要阅读Tornado的源代码

为什么要阅读Tornado的源代码 Tornado由前google员工开发, 代码非常精练, 实现也很轻巧, 加上清晰的注释和丰富的demo, 我们可以很容易的阅读分析tornado. 通过阅读Tornado的源码, 你将学到:...

山下狮子
2014/06/05
0
0
tornado常见的异步非堵塞写法

非堵塞和异步有什么区别? 非堵塞 在tornado的框架中非堵塞一般指得是网络I/O层面的socket数据接收模式(select或者epoll),不论用哪个模式,最终程序都会收到数据并处理数据(这个数据要么被...

极光火狐狸
2016/07/21
1K
2
Tornado学习笔记(二)

我一直用python2.x,python2.x内置的字符编码方式是unicode,这就对中文的处理造成了一些困扰,尤其是在用tornado写json接口的时候,如果不做处理,出来的没有中文,都是x4d5f之类的东西。所...

Slaytanic
2015/07/13
0
0
[原]tornado源码分析系列(一)[tornado简介]

引言: tornado是由Facebook开源的一个服务器“套装",适合于做python的web或者使用其本身提供的可扩展的功能,完成了不完整的wsgi协议,可用于做快速的web开发,封装了epoll性能较好。文章主...

长平狐
2012/11/14
845
0

没有更多内容

加载失败,请刷新页面

加载更多

取变量的地址赋值给另一个变量,C通过,C++编译出错

取变量的地址赋值给另一个变量,C通过。正常运行,C++编译出错。 代码如下: #include <stdio.h>int main(int argc, char *argv[]){int x = 3;int *p = &x;int y = p;/*c ...

SamXIAO
今天
1
0
利用隐写术实施攻击

尽管隐写术是一种低频攻击途径,但网络犯罪分子已经开始利用它结合社交媒体的普遍性和快速传播性来传递恶意有效负载。 低调但有效的隐写技术虽然是旧把戏,但将代码隐藏在看似正常的图像中,...

Linux就该这么学
今天
4
0
YII2的乐观锁和悲观锁

乐观锁与悲观锁¶ Web应用往往面临多用户环境,这种情况下的并发写入控制, 几乎成为每个开发人员都必须掌握的一项技能。 在并发环境下,有可能会出现脏读(Dirty Read)、不可重复读(Unrep...

echojson
今天
2
0
UCOS线程切换原理

黑客画家
今天
3
0
最牛Java架构师进阶路线(年薪80W)

1、源码分析专题 详细介绍源码中所用到的经典设计思想,看看大牛是如何写代码的,提升技术审美、提高核心竞争力。 帮助大家寻找分析源码的切入点,在思想上来一次巨大的升华。知其然,并知其...

别打我会飞
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部