文档章节

tornado.gen 源码阅读

invictus_lee
 invictus_lee
发布于 2014/03/26 15:34
字数 1569
阅读 590
收藏 0
<pre class="brush:python;toolbar: true; auto-links: false;">&#39;&#39;&#39;
import tornado.gen
import time
import pdb 
import tornado.httpclient

@tornado.gen.engine
#@tornado.gen.coroutine
def genFun():
    http_client = tornado.httpclient.AsyncHTTPClient()
   # http_client.fetch(&#39;www.google.com&#39;, callback = (yield tornado.gen.Callback(&#39;key&#39;)))
   # ret =yield tornado.gen.Wait(&#39;key&#39;)
    ret = yield tornado.gen.Task(http_client.fetch,&#39;www.google.com&#39;)                                                                                                                                 
    print ret 
genFun()
tornado.ioloop.IOLoop.instance().start() 
&#39;&#39;&#39;
&#39;&#39;&#39;
当调用getFun()时实际是getFun返回engine.wrapper,然后是wrapper()
wrapper并不是gen包的关键,Runner,YieldPoint 才是最重要的

&#39;&#39;&#39;
from __future__ import absolute_import, division, print_function, with_statement

import collections
import functools
import itertools
import sys
import types

from tornado.concurrent import Future, TracebackFuture
from tornado.ioloop import IOLoop
from tornado.stack_context import ExceptionStackContext, wrap
class KeyReuseError(Exception):
    pass


class UnknownKeyError(Exception):
    pass


class LeakedCallbackError(Exception):
    pass


class BadYieldError(Exception):
    pass


class ReturnValueIgnoredError(Exception):
    pass


def engine(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        runner = None

        def handle_exception(typ, value, tb):
            # if the function throws an exception before its first &quot;yield&quot;
            # (or is not a generator at all), the Runner won&#39;t exist yet.
            # However, in that case we haven&#39;t reached anything asynchronous
            # yet, so we can just let the exception propagate.
            if runner is not None:
                return runner.handle_exception(typ, value, tb)
            return False
        with ExceptionStackContext(handle_exception) as deactivate:
            try:
            	&#39;&#39;&#39;
                获取生成器,也就是上面的genFun,然后生成Runner对象,调用其run方法。

            	&#39;&#39;&#39;
                result = func(*args, **kwargs)  
            except (Return, StopIteration) as e:
                result = getattr(e, &#39;value&#39;, None)
            else:
                if isinstance(result, types.GeneratorType):
                    def final_callback(value):
                        if value is not None:
                            raise ReturnValueIgnoredError(
                                &quot;@gen.engine functions cannot return values: &quot;
                                &quot;%r&quot; % (value,))
                        assert value is None
                        deactivate()
                    runner = Runner(result, final_callback)
                    runner.run()
                    return
            if result is not None:
                raise ReturnValueIgnoredError(
                    &quot;@gen.engine functions cannot return values: %r&quot; %
                    (result,))
            deactivate()
            # no yield, so we&#39;re done
    return wrapper


def coroutine(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        runner = None
        future = TracebackFuture()

        if &#39;callback&#39; in kwargs:
            callback = kwargs.pop(&#39;callback&#39;)
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        def handle_exception(typ, value, tb):
            try:
                if runner is not None and runner.handle_exception(typ, value, tb):
                    return True
            except Exception:
                typ, value, tb = sys.exc_info()
            future.set_exc_info((typ, value, tb))
            return True
        with ExceptionStackContext(handle_exception) as deactivate:
            try:
                result = func(*args, **kwargs)
            except (Return, StopIteration) as e:
                result = getattr(e, &#39;value&#39;, None)
            except Exception:
                deactivate()
                future.set_exc_info(sys.exc_info())
                return future
            else:
                if isinstance(result, types.GeneratorType):
                    def final_callback(value):
                        deactivate()
                        future.set_result(value)
                    runner = Runner(result, final_callback)
                    runner.run()
                    return future
            deactivate()
            future.set_result(result)
        return future
    return wrapper


class Return(Exception):
    def __init__(self, value=None):
        super(Return, self).__init__()
        self.value = value
class YieldPoint(object):
    def start(self, runner):
        raise NotImplementedError()

    def is_ready(self):
        raise NotImplementedError()

    def get_result(self):
        raise NotImplementedError()

class Callback(YieldPoint):
    def __init__(self, key):
        self.key = key

    def start(self, runner):
        self.runner = runner
        runner.register_callback(self.key)

    def is_ready(self):
        return True

    def get_result(self):
        return self.runner.result_callback(self.key)


class Wait(YieldPoint):
    def __init__(self, key):
        self.key = key

    def start(self, runner):
        self.runner = runner

    def is_ready(self):
        return self.runner.is_ready(self.key)

    def get_result(self):
        return self.runner.pop_result(self.key)


class WaitAll(YieldPoint):
    def __init__(self, keys):
        self.keys = keys

    def start(self, runner):
        self.runner = runner

    def is_ready(self):
        return all(self.runner.is_ready(key) for key in self.keys)

    def get_result(self):
        return [self.runner.pop_result(key) for key in self.keys]


class Task(YieldPoint):
    def __init__(self, func, *args, **kwargs):
        assert &quot;callback&quot; not in kwargs
        self.args = args
        self.kwargs = kwargs
        self.func = func

    def start(self, runner):
        self.runner = runner
        self.key = object()
        runner.register_callback(self.key)
        self.kwargs[&quot;callback&quot;] = runner.result_callback(self.key)
        self.func(*self.args, **self.kwargs)

    def is_ready(self):
        return self.runner.is_ready(self.key)

    def get_result(self):
        return self.runner.pop_result(self.key)


class YieldFuture(YieldPoint):
    def __init__(self, future, io_loop=None):
        self.future = future
        self.io_loop = io_loop or IOLoop.current()

    def start(self, runner):
        if not self.future.done():
            self.runner = runner
            self.key = object()
            runner.register_callback(self.key)
            self.io_loop.add_future(self.future, runner.result_callback(self.key))
        else:
            self.runner = None
            self.result = self.future.result()

    def is_ready(self):
        if self.runner is not None:
            return self.runner.is_ready(self.key)
        else:
            return True

    def get_result(self):
        if self.runner is not None:
            return self.runner.pop_result(self.key).result()
        else:
            return self.result


class Multi(YieldPoint):
    def __init__(self, children):
        self.keys = None
        if isinstance(children, dict):
            self.keys = list(children.keys())
            children = children.values()
        self.children = []
        for i in children:
            if isinstance(i, Future):
                i = YieldFuture(i)
            self.children.append(i)
        assert all(isinstance(i, YieldPoint) for i in self.children)
        self.unfinished_children = set(self.children)

    def start(self, runner):
        for i in self.children:
            i.start(runner)

    def is_ready(self):
        finished = list(itertools.takewhile(
            lambda i: i.is_ready(), self.unfinished_children))
        self.unfinished_children.difference_update(finished)
        return not self.unfinished_children

    def get_result(self):
        result = (i.get_result() for i in self.children)
        if self.keys is not None:
            return dict(zip(self.keys, result))
        else:
            return list(result)


class _NullYieldPoint(YieldPoint):
    def start(self, runner):
        pass

    def is_ready(self):
        return True

    def get_result(self):
        return None


_null_yield_point = _NullYieldPoint()


class Runner(object):
    def __init__(self, gen, final_callback):
        self.gen = gen
        self.final_callback = final_callback
        self.yield_point = _null_yield_point
        self.pending_callbacks = set()
        self.results = {}
        self.running = False
        self.finished = False
        self.exc_info = None
        self.had_exception = False

    def register_callback(self, key):
        &quot;&quot;&quot;Adds ``key`` to the list of callbacks.&quot;&quot;&quot;
        if key in self.pending_callbacks:
            raise KeyReuseError(&quot;key %r is already pending&quot; % (key,))
        self.pending_callbacks.add(key)

    def is_ready(self, key):
        &quot;&quot;&quot;Returns true if a result is available for ``key``.&quot;&quot;&quot;
        if key not in self.pending_callbacks:
            raise UnknownKeyError(&quot;key %r is not pending&quot; % (key,))
        return key in self.results     #key的结果是在results中

    def set_result(self, key, result):
        &quot;&quot;&quot;Sets the result for ``key`` and attempts to resume the generator.&quot;&quot;&quot;
        self.results[key] = result
        self.run()

    def pop_result(self, key):
        &quot;&quot;&quot;Returns the result for ``key`` and unregisters it.&quot;&quot;&quot;
        self.pending_callbacks.remove(key)
        return self.results.pop(key)

    def run(self):
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                if self.exc_info is None:
                    try:
                        if not self.yield_point.is_ready():    &#39;&#39;&#39; 初始的yield_point 为_NullYieldPoint,因此while的第一次循环这里总是成立,&#39;&#39;&#39; 
                            return
                        next = self.yield_point.get_result()   #get_result 返回None ,用None 来激活 生成器的y。
                        self.yield_point = None
                    except Exception:
                        self.exc_info = sys.exc_info()
                try:
                    if self.exc_info is not None:
                        self.had_exception = True
                        exc_info = self.exc_info
                        self.exc_info = None
                        yielded = self.gen.throw(*exc_info)
                    else:
                        yielded = self.gen.send(next)  #这里是激活生成器,获得生成器中yield返回的对象,按照上面给的例子这里返回的是Task的对象。
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.yield_point = _null_yield_point
                    if self.pending_callbacks and not self.had_exception:
                        # If we ran cleanly without waiting on all callbacks
                        # raise an error (really more of a warning).  If we
                        # had an exception then some callbacks may have been
                        # orphaned, so skip the check in that case.
                        raise LeakedCallbackError(
                            &quot;finished without waiting for callbacks %r&quot; %
                            self.pending_callbacks)
                    self.final_callback(getattr(e, &#39;value&#39;, None))
                    self.final_callback = None
                    return
                except Exception:
                    self.finished = True
                    self.yield_point = _null_yield_point
                    raise
                if isinstance(yielded, (list, dict)):
                    yielded = Multi(yielded)
                elif isinstance(yielded, Future):
                    yielded = YieldFuture(yielded)
                if isinstance(yielded, YieldPoint):
                    self.yield_point = yielded
                    try:
                        self.yield_point.start(self)  #调用YieldPoint的start方法。执行真正的工作。
                    except Exception:
                        self.exc_info = sys.exc_info()
                else:
                    self.exc_info = (BadYieldError(
                        &quot;yielded unknown object %r&quot; % (yielded,)),)
        finally:
            self.running = False

    def result_callback(self, key):
        def inner(*args, **kwargs):
            if kwargs or len(args) &gt; 1:
                result = Arguments(args, kwargs)
            elif args:
                result = args[0]
            else:
                result = None
            self.set_result(key, result)
        return wrap(inner)

    def handle_exception(self, typ, value, tb):
        if not self.running and not self.finished:
            self.exc_info = (typ, value, tb)
            self.run()
            return True
        else:
            return False

Arguments = collections.namedtuple(&#39;Arguments&#39;, [&#39;args&#39;, &#39;kwargs&#39;])

&#39;&#39;&#39;
整个核心的部分应该是Runner,还YieldPoint.
在Runner.run 方法中,构造出相应的YieldPoint子类的对象,yieldpoint的start方法中, 

def start(self, runner):
        self.runner = runner
        self.key = object()
        runner.register_callback(self.key)  -----------------------------&gt;  runner.pending_callbacks.add(key)
        self.kwargs[&quot;callback&quot;] = runner.result_callback(self.key)--------------------------&gt;生成一个callback函数,然后将callback传给真正的工作函数func,也就是开始例子中的fetch。如果返回了,就调用runer.set_result()
        self.func(*self.args, **self.kwargs)

 
class  Runner:       
  def result_callback(self, key):
        def inner(*args, **kwargs):
            if kwargs or len(args) &gt; 1:
                result = Arguments(args, kwargs)
            elif args:
                result = args[0]
            else:
                result = None
            self.set_result(key, result)
        return wrap(inner)


执行完成以上操作后然后接着执行while的第二次循环,当验证 if not self.yield_point.is_ready(): 
如果返回true,run方法结束。
      yield_point 的func执行完毕,调用runer.set_result()。runner.run(),进入while循环缺德取的结果next,send给genFun的yield,返回结果,进入下一次while循环,验证if not self.yield_point.is_ready()为True,结束循环。

如果是false,则表明已经拿到了最终的结果,send给genFun的yield,返回结果,进入下一次while循环,验证if not self.yield_point.is_ready()为True,结束循环。
&#39;&#39;&#39;</pre>
<p>
    <br/>
</p>


© 著作权归作者所有

上一篇: python 代码赏析
下一篇: 在说我自己
invictus_lee
粉丝 1
博文 141
码字总数 21423
作品 2
徐汇
程序员
私信 提问
Web服务器--Tornado

Tornado是使用Python开发的全栈式(full-stack)Web框架和异步网络库,最早由Friendfeed开发。通过使用非阻塞IO,Tornado可以处理数以万计的开放连接,是long polling、WebSockets和其他需要...

匿名
2008/09/10
68.6K
10
Tornado 4.2 发布,Python Web 服务器

Tornado 4.2 发布, 更新内容如下: 向后兼容改进: SSLIOStream.connectand IOStream.start_tls now validate certificates by default. Certificate validation will now use the system C......

sikkx
2015/05/28
3.2K
5
撸码工匠/vue-admin-block

Vuejs Admin Block 特性 基于 Vue, iView, Axios, Mock 企业级后台管理系统最佳实践。 技术栈 Axios @0.x iView @2.x Mockjs @1.x Vue @2.5.x Vue-Router @3.x Vuex @3.x ES6 SCSS 开发构建 ......

撸码工匠
2018/01/14
0
0
令人生畏的源码,到底该怎样看?

一个软件开发人员,工作到了一定的年限(一般是3、4年左右),如果他还没学会阅读源码,那么他就会遇到瓶颈。因为到了这个时候的开发,他应该不仅仅只会做那些 CURD 的业务逻辑,而应该会根据...

技术小能手
2018/09/05
0
0
【超实用】面对枯燥的源码,如何才能坚持看下去?

一个软件开发人员,工作到了一定的年限(一般是3、4年左右),如果他还没学会阅读源码,那么他就会遇到瓶颈。因为到了这个时候的开发,他应该不仅仅只会做那些 CURD 的业务逻辑,而应该会根据...

陈树义
2018/08/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

EDI 电子数据交换全解指南

EDI(Electronic Data Interchange,电子数据交换)技术使得企业与企业(B2B)实现通信自动化,帮助交易伙伴和组织更快更好地完成更多工作,并消除了人工操作带来的错误。从零售商到制造商、物...

EDI知行软件
58分钟前
3
0
CentOS7的LVM动态扩容

# 问题 CentOS7上面的磁盘空间有点紧张,需要扩容。 解决 查询当前磁盘状态 [root@xxx ~]# lsblkNAME MAJ:MIN RM SIZE RO TYPE MOUNTPOINTfd0 2:0 1 4K ...

亚林瓜子
今天
3
0
Kafka 0.8 Producer (0.9以前版本适用)

Kafka旧版本producer由scala编写,0.9以后已经废除 示例代码如下: import kafka.producer.KeyedMessage;import kafka.javaapi.producer.Producer;import kafka.producer.ProducerConfig;......

实时计算
今天
4
0
Giraph源码分析(八)—— 统计每个SuperStep中参与计算的顶点数目

作者|白松 目的:科研中,需要分析在每次迭代过程中参与计算的顶点数目,来进一步优化系统。比如,在SSSP的compute()方法最后一行,都会把当前顶点voteToHalt,即变为InActive状态。所以每次...

数澜科技
今天
6
0
Navicat 快捷键

操作 结果 ctrl+q 打开查询窗口 ctrl+/ 注释sql语句 ctrl+shift +/ 解除注释 ctrl+r 运行查询窗口的sql语句 ctrl+shift+r 只运行选中的sql语句 F6 打开一个mysql命令行窗口 ctrl+l 删除一行 ...

低至一折起
今天
13
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部