tornado.gen 源码阅读

原创
2014/03/26 15:34
阅读数 590
<pre class="brush:python;toolbar: true; auto-links: false;">&#39;&#39;&#39;
import tornado.gen
import time
import pdb 
import tornado.httpclient

@tornado.gen.engine
#@tornado.gen.coroutine
def genFun():
    http_client = tornado.httpclient.AsyncHTTPClient()
   # http_client.fetch('www.google.com', callback = (yield tornado.gen.Callback('key')))
   # ret =yield tornado.gen.Wait('key')
    ret = yield tornado.gen.Task(http_client.fetch,'www.google.com')
    print ret 
genFun()
tornado.ioloop.IOLoop.instance().start() 
&#39;&#39;&#39;
&#39;&#39;&#39;
当调用 genFun() 时,genFun 这个名字实际上已经被装饰器替换成 engine 返回的 wrapper,所以真正执行的是 wrapper()。
wrapper 并不是 gen 包的关键,Runner 和 YieldPoint 才是最重要的。

&#39;&#39;&#39;
from __future__ import absolute_import, division, print_function, with_statement

import collections
import functools
import itertools
import sys
import types

from tornado.concurrent import Future, TracebackFuture
from tornado.ioloop import IOLoop
from tornado.stack_context import ExceptionStackContext, wrap
class KeyReuseError(Exception):
    """Raised when a callback key is registered while it is still pending."""


class UnknownKeyError(Exception):
    """Raised when a key that is not pending is queried for readiness."""


class LeakedCallbackError(Exception):
    """Raised when a generator finishes while callbacks are still pending."""


class BadYieldError(Exception):
    """Thrown into a generator that yields an object the runner cannot handle."""


class ReturnValueIgnoredError(Exception):
    """Raised when an ``@engine`` function produces a non-None return value."""


def engine(func):
    """Decorator that drives a generator function with a ``Runner``.

    The returned wrapper calls ``func``; when the call produces a generator,
    a ``Runner`` is created for it and started.  The wrapper itself always
    returns ``None``, so the decorated function must not produce a value:
    a non-None result raises ``ReturnValueIgnoredError``.

    Exceptions captured by the surrounding ``ExceptionStackContext`` are
    forwarded to the runner once it exists.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        runner = None

        def handle_exception(typ, value, tb):
            # if the function throws an exception before its first "yield"
            # (or is not a generator at all), the Runner won't exist yet.
            # However, in that case we haven't reached anything asynchronous
            # yet, so we can just let the exception propagate.
            if runner is not None:
                return runner.handle_exception(typ, value, tb)
            return False
        with ExceptionStackContext(handle_exception) as deactivate:
            try:
                # Calling the decorated function yields the generator object
                # (e.g. genFun in the example); a Runner is then built around
                # it below and its run() method is invoked.
                result = func(*args, **kwargs)
            except (Return, StopIteration) as e:
                result = getattr(e, 'value', None)
            else:
                if isinstance(result, types.GeneratorType):
                    def final_callback(value):
                        # Invoked by the Runner when the generator finishes.
                        if value is not None:
                            raise ReturnValueIgnoredError(
                                "@gen.engine functions cannot return values: "
                                "%r" % (value,))
                        deactivate()
                    runner = Runner(result, final_callback)
                    runner.run()
                    return
            if result is not None:
                raise ReturnValueIgnoredError(
                    "@gen.engine functions cannot return values: %r" %
                    (result,))
            deactivate()
            # no yield, so we're done
    return wrapper


def coroutine(func):
    """Decorator that runs a generator function and returns a future.

    The wrapper always returns a ``TracebackFuture``.  The future is resolved
    with the value carried by ``Return``/``StopIteration`` (or the plain
    return value when ``func`` is not a generator), or with the exception
    info when the function fails.

    If the wrapper is called with a ``callback`` keyword argument, it is
    removed from ``kwargs`` and scheduled on the current ``IOLoop`` to be
    called with the future's result once the future completes.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        runner = None
        future = TracebackFuture()

        if 'callback' in kwargs:
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda f: callback(f.result()))

        def handle_exception(typ, value, tb):
            # Give the runner the first chance to deliver the exception to
            # the generator; anything it does not consume (or that it raises
            # while trying) ends up on the future.
            try:
                if runner is not None and runner.handle_exception(typ, value, tb):
                    return True
            except Exception:
                typ, value, tb = sys.exc_info()
            future.set_exc_info((typ, value, tb))
            return True
        with ExceptionStackContext(handle_exception) as deactivate:
            try:
                result = func(*args, **kwargs)
            except (Return, StopIteration) as e:
                result = getattr(e, 'value', None)
            except Exception:
                deactivate()
                future.set_exc_info(sys.exc_info())
                return future
            else:
                if isinstance(result, types.GeneratorType):
                    def final_callback(value):
                        # Invoked by the Runner when the generator finishes.
                        deactivate()
                        future.set_result(value)
                    runner = Runner(result, final_callback)
                    runner.run()
                    return future
            # Not a generator (or it returned early): resolve immediately.
            deactivate()
            future.set_result(result)
        return future
    return wrapper


class Return(Exception):
    """Exception used to carry a result value out of a generator.

    The payload is stored on ``value`` (default ``None``); the exception
    itself is initialised without arguments.
    """

    def __init__(self, value=None):
        Exception.__init__(self)
        self.value = value
class YieldPoint(object):
    """Interface for objects that may be yielded to a ``Runner``.

    Subclasses implement all three methods; the base versions only raise
    ``NotImplementedError``.
    """

    def start(self, runner):
        """Begin the operation; called by ``runner`` after the yield."""
        raise NotImplementedError()

    def is_ready(self):
        """Return whether the result can be fetched now."""
        raise NotImplementedError()

    def get_result(self):
        """Return the value to send back into the generator."""
        raise NotImplementedError()


class Callback(YieldPoint):
    """Yield point that registers ``key`` and evaluates to its callback.

    It is ready immediately; the value sent back into the generator is the
    function produced by ``runner.result_callback(key)``.
    """

    def __init__(self, key):
        self.key = key

    def start(self, runner):
        """Remember the runner and mark ``key`` as pending on it."""
        self.runner = runner
        runner.register_callback(self.key)

    def is_ready(self):
        """Always ready: nothing has to complete before resuming."""
        return True

    def get_result(self):
        """Return the callback function bound to ``key``."""
        make_callback = self.runner.result_callback
        return make_callback(self.key)


class Wait(YieldPoint):
    """Yield point that waits for the result stored under ``key``."""

    def __init__(self, key):
        self.key = key

    def start(self, runner):
        """Remember the runner; the key is expected to be registered already."""
        self.runner = runner

    def is_ready(self):
        """Return whether the runner holds a result for ``key``."""
        ready = self.runner.is_ready(self.key)
        return ready

    def get_result(self):
        """Remove and return the result stored under ``key``."""
        value = self.runner.pop_result(self.key)
        return value


class WaitAll(YieldPoint):
    """Yield point that waits for the results of several keys.

    The result is a list of values in the same order as ``keys``.
    """

    def __init__(self, keys):
        self.keys = keys

    def start(self, runner):
        """Remember the runner that owns the keys."""
        self.runner = runner

    def is_ready(self):
        """Return True only when every key has a result available."""
        for key in self.keys:
            if not self.runner.is_ready(key):
                return False
        return True

    def get_result(self):
        """Pop the result of each key, preserving the order of ``keys``."""
        values = []
        for key in self.keys:
            values.append(self.runner.pop_result(key))
        return values


class Task(YieldPoint):
    """Yield point that runs one callback-style asynchronous call.

    ``func`` is invoked as ``func(*args, callback=<generated>, **kwargs)``
    when the task starts; whatever is passed to that callback becomes the
    result sent back into the generator.  ``kwargs`` must not already
    contain ``callback``.
    """

    def __init__(self, func, *args, **kwargs):
        assert "callback" not in kwargs
        self.args = args
        self.kwargs = kwargs
        self.func = func

    def start(self, runner):
        """Register a fresh key and launch ``func`` with its callback."""
        self.runner = runner
        # A new anonymous object is a key nobody else can collide with.
        self.key = object()
        runner.register_callback(self.key)
        self.kwargs["callback"] = runner.result_callback(self.key)
        self.func(*self.args, **self.kwargs)

    def is_ready(self):
        """Return whether the callback has delivered a result yet."""
        return self.runner.is_ready(self.key)

    def get_result(self):
        """Remove and return the result delivered to the callback."""
        return self.runner.pop_result(self.key)

class YieldFuture(YieldPoint):
    """Yield point that adapts a future to the ``YieldPoint`` interface.

    ``io_loop`` defaults to ``IOLoop.current()`` when not supplied.
    """

    def __init__(self, future, io_loop=None):
        self.future = future
        if not io_loop:
            io_loop = IOLoop.current()
        self.io_loop = io_loop

    def start(self, runner):
        """Capture the result now if done, otherwise wait via the runner."""
        if self.future.done():
            # ``runner is None`` marks the "already finished" fast path.
            self.runner = None
            self.result = self.future.result()
            return
        self.runner = runner
        self.key = object()
        runner.register_callback(self.key)
        self.io_loop.add_future(self.future, runner.result_callback(self.key))

    def is_ready(self):
        """Return True on the fast path, else ask the runner about our key."""
        if self.runner is None:
            return True
        return self.runner.is_ready(self.key)

    def get_result(self):
        """Return the future's result (calling ``result()`` on the slow path)."""
        if self.runner is None:
            return self.result
        finished_future = self.runner.pop_result(self.key)
        return finished_future.result()


class Multi(YieldPoint):
    """Yield point that waits on several yield points at once.

    ``children`` is an iterable or a dict of ``YieldPoint``/``Future``
    objects; futures are wrapped in ``YieldFuture``.  The result is a list
    in child order, or a dict with the original keys when a dict was given.
    """

    def __init__(self, children):
        self.keys = None
        if isinstance(children, dict):
            self.keys = list(children.keys())
            children = children.values()
        self.children = [
            YieldFuture(child) if isinstance(child, Future) else child
            for child in children]
        assert all(isinstance(i, YieldPoint) for i in self.children)
        self.unfinished_children = set(self.children)

    def start(self, runner):
        """Start every child on the same runner."""
        for child in self.children:
            child.start(runner)

    def is_ready(self):
        """Discard children that have finished; True once none remain."""
        done = []
        # Stop probing at the first child that is not ready yet.
        for child in self.unfinished_children:
            if not child.is_ready():
                break
            done.append(child)
        self.unfinished_children.difference_update(done)
        return not self.unfinished_children

    def get_result(self):
        """Collect every child's result as a list, or a dict if keyed."""
        values = [child.get_result() for child in self.children]
        if self.keys is None:
            return values
        return dict(zip(self.keys, values))

class _NullYieldPoint(YieldPoint):
    """Placeholder yield point: starts nothing, is ready, yields ``None``."""

    def start(self, runner):
        """Do nothing; there is no operation to begin."""
        return None

    def is_ready(self):
        """Always ready."""
        return True

    def get_result(self):
        """Always ``None``."""
        return None


# Shared placeholder instance; Runner uses it as its yield point before the
# generator has yielded anything and again after the generator has finished.
_null_yield_point = _NullYieldPoint()


class Runner(object):
    """Drives a generator, resuming it as its yield points become ready.

    ``gen`` is the generator to run.  ``final_callback`` is called once,
    with the generator's result value (``None`` if it has none), when the
    generator finishes.
    """

    def __init__(self, gen, final_callback):
        self.gen = gen
        self.final_callback = final_callback
        self.yield_point = _null_yield_point
        self.pending_callbacks = set()
        self.results = {}
        self.running = False
        self.finished = False
        self.exc_info = None
        self.had_exception = False

    def register_callback(self, key):
        """Adds ``key`` to the list of callbacks."""
        if key in self.pending_callbacks:
            raise KeyReuseError("key %r is already pending" % (key,))
        self.pending_callbacks.add(key)

    def is_ready(self, key):
        """Returns true if a result is available for ``key``."""
        if key not in self.pending_callbacks:
            raise UnknownKeyError("key %r is not pending" % (key,))
        # A key is ready once its result has been stored in ``results``.
        return key in self.results

    def set_result(self, key, result):
        """Sets the result for ``key`` and attempts to resume the generator."""
        self.results[key] = result
        self.run()

    def pop_result(self, key):
        """Returns the result for ``key`` and unregisters it."""
        self.pending_callbacks.remove(key)
        return self.results.pop(key)

    def run(self):
        """Starts or resumes the generator.

        Runs until the generator finishes or until it reaches a yield point
        that is not ready.  Does nothing when already running or finished.
        """
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                if self.exc_info is None:
                    try:
                        # The initial yield point is _null_yield_point, whose
                        # is_ready() is always True, so the very first
                        # iteration never returns here.
                        if not self.yield_point.is_ready():
                            return
                        # For the initial null yield point this is None,
                        # which is the value used to start the generator.
                        send_value = self.yield_point.get_result()
                        self.yield_point = None
                    except Exception:
                        self.exc_info = sys.exc_info()
                try:
                    if self.exc_info is not None:
                        self.had_exception = True
                        exc_info = self.exc_info
                        self.exc_info = None
                        yielded = self.gen.throw(*exc_info)
                    else:
                        # Resume the generator and collect the object it
                        # yields next (a Task instance in the example).
                        yielded = self.gen.send(send_value)
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.yield_point = _null_yield_point
                    if self.pending_callbacks and not self.had_exception:
                        # If we ran cleanly without waiting on all callbacks
                        # raise an error (really more of a warning).  If we
                        # had an exception then some callbacks may have been
                        # orphaned, so skip the check in that case.
                        raise LeakedCallbackError(
                            "finished without waiting for callbacks %r" %
                            self.pending_callbacks)
                    self.final_callback(getattr(e, 'value', None))
                    self.final_callback = None
                    return
                except Exception:
                    self.finished = True
                    self.yield_point = _null_yield_point
                    raise
                if isinstance(yielded, (list, dict)):
                    yielded = Multi(yielded)
                elif isinstance(yielded, Future):
                    yielded = YieldFuture(yielded)
                if isinstance(yielded, YieldPoint):
                    self.yield_point = yielded
                    try:
                        # start() is where the yield point kicks off the
                        # actual work.
                        self.yield_point.start(self)
                    except Exception:
                        self.exc_info = sys.exc_info()
                else:
                    # Stored as a 1-tuple so that gen.throw(*exc_info) above
                    # receives just the exception instance.
                    self.exc_info = (BadYieldError(
                        "yielded unknown object %r" % (yielded,)),)
        finally:
            self.running = False

    def result_callback(self, key):
        """Returns a wrapped callback that stores its arguments under ``key``.

        No arguments store ``None``, a single positional argument is stored
        as-is, and anything else is stored as an ``Arguments`` tuple.
        """
        def inner(*args, **kwargs):
            if kwargs or len(args) > 1:
                result = Arguments(args, kwargs)
            elif args:
                result = args[0]
            else:
                result = None
            self.set_result(key, result)
        return wrap(inner)

    def handle_exception(self, typ, value, tb):
        """Feeds an exception to the generator if it is currently suspended.

        Returns True when the exception was handed to the generator, False
        when the runner is running or already finished.
        """
        if not self.running and not self.finished:
            self.exc_info = (typ, value, tb)
            self.run()
            return True
        else:
            return False

# Container for a callback's positional and keyword arguments.
Arguments = collections.namedtuple('Arguments', ['args', 'kwargs'])

&#39;&#39;&#39;
整个核心的部分应该是 Runner,还有 YieldPoint。
在 Runner.run 方法中,得到相应的 YieldPoint 子类的对象后,会调用它的 start 方法。以 Task 为例,start 方法如下:

def start(self, runner):
        self.runner = runner
        self.key = object()
        runner.register_callback(self.key)  -----------------------------> runner.pending_callbacks.add(key)
        self.kwargs["callback"] = runner.result_callback(self.key)--------------------------> 生成一个 callback 函数,然后将 callback 传给真正的工作函数 func,也就是开始例子中的 fetch。fetch 完成后会调用这个 callback,进而调用 runner.set_result()
        self.func(*self.args, **self.kwargs)

 
class  Runner:       
  def result_callback(self, key):
        def inner(*args, **kwargs):
            if kwargs or len(args) > 1:
                result = Arguments(args, kwargs)
            elif args:
                result = args[0]
            else:
                result = None
            self.set_result(key, result)
        return wrap(inner)


执行完以上操作后,接着进入 while 的第二次循环,此时会检查 if not self.yield_point.is_ready():
如果条件为 True(结果还没有准备好),run 方法直接返回。
      等 yield_point 的 func 执行完毕后,会通过 callback 调用 runner.set_result(),后者再次调用 runner.run();这次进入 while 循环就能取得结果 next,并 send 给 genFun 的 yield。genFun 拿到返回结果后继续执行直到结束,抛出 StopIteration,run 调用 final_callback 后返回。

如果条件为 False(结果已经准备好),则表明已经拿到了最终的结果,直接 send 给 genFun 的 yield,之后的流程同上。
&#39;&#39;&#39;</pre>
<p>
    <br/>
</p>


展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部