python tornado协程调度原理示例解析

 更新时间:2023年09月08日 09:34:43   作者:菜皮日记  
这篇文章主要为大家介绍了python tornado协程调度原理示例解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

tornado 的协程实现原理

本文讨论 tornado 的协程实现原理,简单做了一份笔记。

首先看一段最常见的 tornado web 代码:

import tornado
import tornado.web
import tornado.gen
from tornado.gen import coroutine
from tornado.httpclient import AsyncHTTPClient
class GenHandler(tornado.web.RequestHandler):
    @coroutine
    def get(self):
        url = 'http://www.baidu.com'
        http_client = AsyncHTTPClient()
        response = yield http_client.fetch(url)
        yield tornado.gen.sleep(5)
        self.write(response.body)
class MainHanler(tornado.web.RequestHandler):
    def get(self):
        self.write('root')
if __name__ == "__main__":
    application = tornado.web.Application([
        (r"/", MainHanler),
        (r"/gen_async/", GenHandler),
    ], autoreload=True)
    application.listen(8888)
    tornado.ioloop.IOLoop.current().start()

其中最后一行代码 tornado.ioloop.IOLoop.current().start() 启动服务。

带着几个问题往下看:

  • 知道 yield 可以暂存执行状态,等「合适的时机」重新恢复执行,那么保存的状态到哪去了?
  • 上一个问题中「合适的时机」是到底是什么时候?
  • 继续接上一个问题,具体是怎么恢复执行的?

IOLoop 类相当于是对多路复用的封装,起到事件循环的作用,调度整个协程执行过程。

查看 IOLoop 的源码,可以看到 IOLoop 继承自 Configurable,PollIOLoop 又继承自 IOLoop。当 IOLoop 启动时,会确定使用哪一种多路复用方式,epoll、kqueue 还是 select?

# IOLoop 类
# IOLoop 中的 configurable_default 方法是重写 Configurable 的
# 这里会确定使用哪种多路复用方式
@classmethod
def configurable_default(cls):
    if hasattr(select, "epoll"):
        from tornado.platform.epoll import EPollIOLoop
        return EPollIOLoop
    if hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        from tornado.platform.kqueue import KQueueIOLoop
      return KQueueIOLoop
    from tornado.platform.select import SelectIOLoop
  return SelectIOLoop
# PollIOLoop类
def initialize(self, impl, time_func=None, **kwargs):
    super(PollIOLoop, self).initialize(**kwargs)
    self._impl = impl
    if hasattr(self._impl, 'fileno'):
        set_close_exec(self._impl.fileno())
    self.time_func = time_func or time.time
    self._handlers = {}
    self._events = {}
    self._callbacks = []
    self._callback_lock = threading.Lock()
    self._timeouts = []
    self._cancellations = 0
    self._running = False
    self._stopped = False
    self._closing = False
    self._thread_ident = None
    self._blocking_signal_threshold = None
    self._timeout_counter = itertools.count()

    # Create a pipe that we send bogus data to when we want to wake
    # the I/O loop when it is idle
    self._waker = Waker()
    self.add_handler(self._waker.fileno(),
                     lambda fd, events: self._waker.consume(),
                     self.READ)

def add_handler(self, fd, handler, events):
    fd, obj = self.split_fd(fd)
    self._handlers[fd] = (obj, stack_context.wrap(handler))
    self._impl.register(fd, events | self.ERROR)

def update_handler(self, fd, events):
    fd, obj = self.split_fd(fd)
    self._impl.modify(fd, events | self.ERROR)

def remove_handler(self, fd):
    fd, obj = self.split_fd(fd)
    self._handlers.pop(fd, None)
    self._events.pop(fd, None)
    try:
        self._impl.unregister(fd)
    except Exception:
        gen_log.debug("Error deleting fd from IOLoop", exc_info=True)

PollIOLoop 中 initalize 方法中调用 add_handler 方法,注册对应事件的处理函数,如 socket 可读时,回调哪个函数去处理。

IOLoop 和协程之间的信使:Future

class Future(object):
    def __init__(self):
        self._result = None
        self._exc_info = None
        self._callbacks = []
        self.running = True
        
    def set_result(self, result):
        ...
        
    def set_exc_info(self, exce_info):
        ...
        
    def result(self):
        ...
    
    def exc_info(self):
        ...
        
    def add_done_callback(self, callback):
        self._callbacks.append(callback)

Future 对象起到“占位符”的作用,协程的执行结果会通过 set_result 方式写入其中,并调用通过 add_done_callback 设置的回调。

恢复唤醒协程的 Runner

class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.yield_point = None
        self.pending_callbacks = None
        self.results = None
        self.running = False
        self.finished = False
        self.had_exception = False
        self.io_loop = IOLoop.current()
        self.stack_context_deactivate = None
        # 上面一堆不需要看的初始化
        if self.handle_yield(first_yielded):
            gen = result_future = first_yielded = None
            self.run()
     
    
    def handle_yield(self, yielded):

        self.future = convert_yielded(yielded)

        if self.future is moment:
            self.io_loop.add_callback(self.run)
            return False
        elif not self.future.done():
            def inner(f):
                # Break a reference cycle to speed GC.
                f = None
                self.run()
            self.io_loop.add_future(
                self.future, inner)
            return False
        return True
    
    def run(self):
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None

                    try:
                        value = future.result()
                    except Exception:
                        self.had_exception = True
                        exc_info = sys.exc_info()
                    future = None
  
                    yielded = self.gen.send(value)

                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    if self.pending_callbacks and not self.had_exception:
                        raise LeakedCallbackError(
                            "finished without waiting for callbacks %r" %
                            self.pending_callbacks)
                    future_set_result_unless_cancelled(self.result_future,
_value_from_stopiteration(e))
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                except Exception:
                    # 一些结束操作
                    return
                if not self.handle_yield(yielded):
                    return
                yielded = None
        finally:
            self.running = False

协程每生成一个 Future,都会生成对应的一个 Runner,并将 Future 初始化注入都其中。Runner 的 run 方法中,通过 self.gen.send(Future) 来启动 Future,当 Future 完成时,将其设置成 done,并回调其预设的 callback。

第一个问题:协程的状态保存到哪去了

IOLoop 中通过 add_future 调用实现类 PollIOLoop 中的 add_callback 方法,其中通过 functools 生成偏函数,放入 _callbacks 列表,等待被回调执行。

# IOLoop 的add_future
def add_future(self, future, callback):
    """Schedules a callback on the ``IOLoop`` when the given
    `.Future` is finished.

    The callback is invoked with one argument, the
    `.Future`.
    """
    assert is_future(future)
    callback = stack_context.wrap(callback)
    future.add_done_callback(
        lambda future: self.add_callback(callback, future))

# PollIOLoop 的add_callback
def add_callback(self, callback, *args, **kwargs):
        if thread.get_ident() != self._thread_ident:
            with self._callback_lock:
                if self._closing:
                    return
                list_empty = not self._callbacks
                self._callbacks.append(functools.partial(
                    stack_context.wrap(callback), *args, **kwargs))
                if list_empty:
                    self._waker.wake()
        else:
            if self._closing:
                return
            self._callbacks.append(functools.partial(
                stack_context.wrap(callback), *args, **kwargs))

第二个问题:「合适的时机」是什么?

IOLoop 实际上就是对多路复用的封装,当底层 epoll_wait 事件发生时,即会通知 IOLoop 主线程。

这一段是 IOLoop 中等待多路复用的事件,以及处理事件。

try:
    # 等待事件
      event_pairs = self._impl.poll(poll_timeout)
except Exception as e:
      print("wait fail")
      if errno_from_exception(e) == errno.EINTR:
          continue
      else:
          raise
if self._blocking_signal_threshold is not None:
                    signal.setitimer(signal.ITIMER_REAL,
                                     self._blocking_signal_threshold, 0)
# 处理事件
self._events.update(event_pairs)
while self._events:
    fd, events = self._events.popitem()
    try:
        fd_obj, handler_func = self._handlers[fd]
        handler_func(fd_obj, events)
    except (OSError, IOError) as e:
        if errno_from_exception(e) == errno.EPIPE:
            pass
        else:
            self.handle_callback_exception(self._handlers.get(fd))
    except Exception:
        self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None

第三个问题:具体是怎么恢复的。

Runner 通过不断 check Future 的状态,最后调用 callback 来返回结果。

总结

首先 tornado 对多路复用系统调用做了封装,来实现非阻塞 web 服务。

其次 tornado 通过 yield+Future+Runner 实现了生成 Future,Runner 监控结果,回调 callback 来实现协程的执行。

参考:

https://www.jb51.net/python/2976505cr.htm

https://www.jb51.net/article/132918.htm

tornado的事件循环机制

以上就是python tornado协程调度原理示例解析的详细内容,更多关于python tornado协程调度的资料请关注脚本之家其它相关文章!

相关文章

  • 无惧面试,带你搞懂python 装饰器

    无惧面试,带你搞懂python 装饰器

    这篇文章主要介绍了python 装饰器的相关资料,帮助大家更好的理解和学习python,感兴趣的朋友可以了解下
    2020-08-08
  • Python的内存泄漏及gc模块的使用分析

    Python的内存泄漏及gc模块的使用分析

    这篇文章主要介绍了Python的内存泄漏及gc模块的使用分析,有助于读者进一步了解Python的内存分配及回收机制,增强代码编写的安全意识,需要的朋友可以参考下
    2014-07-07
  • pytorch教程之Tensor的值及操作使用学习

    pytorch教程之Tensor的值及操作使用学习

    这篇文章主要为大家介绍了pytorch教程中关于Tensor的操作使用,有需要的朋友可以借鉴参考下,希望可以有所帮助,祝大家升职加薪,共同进步
    2021-09-09
  • Django管理员账号和密码忘记的完美解决方法

    Django管理员账号和密码忘记的完美解决方法

    这篇文章主要给大家介绍了关于Django管理员账号和密码忘记的完美解决方法,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面来一起看看吧
    2018-12-12
  • Python 3.3实现计算两个日期间隔秒数/天数的方法示例

    Python 3.3实现计算两个日期间隔秒数/天数的方法示例

    这篇文章主要介绍了Python 3.3实现计算两个日期间隔秒数/天数的方法,结合实例形式较为详细的分析了基于Python3.3的日期时间转换与运算相关操作技巧,需要的朋友可以参考下
    2019-01-01
  • python openpyxl使用方法详解

    python openpyxl使用方法详解

    这篇文章主要介绍了python openpyxl使用方法详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
    2019-07-07
  • python中的_和__用法及说明

    python中的_和__用法及说明

    这篇文章主要介绍了python中的_和__用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教
    2023-12-12
  • Django框架model模型对象验证实现方法分析

    Django框架model模型对象验证实现方法分析

    这篇文章主要介绍了Django框架model模型对象验证实现方法,结合实例形式分析了Django框架model模型对象验证相关原理、实现步骤及操作注意事项,需要的朋友可以参考下
    2019-10-10
  • 浅谈怎么给Python添加类型标注

    浅谈怎么给Python添加类型标注

    今天给大家带来的文章是Python的相关知识,文章围绕着怎么给Python添加类型标注展开,文中有非常详细的介绍,需要的朋友可以参考下
    2021-06-06
  • 10个易被忽视但应掌握的Python基本用法

    10个易被忽视但应掌握的Python基本用法

    这篇文章主要介绍了10个易被忽视但应掌握的Python基本用法,如字典推导、内省工具等,主要针对Python3版本,需要的朋友可以参考下
    2015-04-04

最新评论