Python定义一个Actor任务

 更新时间:2020年07月29日 11:33:54   作者:David Beazley  
这篇文章主要介绍了Python定义一个Actor任务,文中讲解非常细致,代码帮助大家更好的理解和学习,感兴趣的朋友可以了解下

问题

你想定义跟actor模式中类似“actors”角色的任务

解决方案

actor模式是一种最古老的也是最简单的并行和分布式计算解决方案。 事实上,它天生的简单性是它如此受欢迎的重要原因之一。 简单来讲,一个actor就是一个并发执行的任务,只是简单的执行发送给它的消息任务。 响应这些消息时,它可能还会给其他actor发送更进一步的消息。 actor之间的通信是单向和异步的。因此,消息发送者不知道消息是什么时候被发送, 也不会接收到一个消息已被处理的回应或通知。

结合使用一个线程和一个队列可以很容易的定义actor,例如:

from queue import Queue
from threading import Thread, Event

# Sentinel used for shutdown
class ActorExit(Exception):
  pass

class Actor:
  def __init__(self):
    self._mailbox = Queue()

  def send(self, msg):
    '''
    Send a message to the actor
    '''
    self._mailbox.put(msg)

  def recv(self):
    '''
    Receive an incoming message
    '''
    msg = self._mailbox.get()
    if msg is ActorExit:
      raise ActorExit()
    return msg

  def close(self):
    '''
    Close the actor, thus shutting it down
    '''
    self.send(ActorExit)

  def start(self):
    '''
    Start concurrent execution
    '''
    self._terminated = Event()
    t = Thread(target=self._bootstrap)

    t.daemon = True
    t.start()

  def _bootstrap(self):
    try:
      self.run()
    except ActorExit:
      pass
    finally:
      self._terminated.set()

  def join(self):
    self._terminated.wait()

  def run(self):
    '''
    Run method to be implemented by the user
    '''
    while True:
      msg = self.recv()

# Sample ActorTask
class PrintActor(Actor):
  def run(self):
    while True:
      msg = self.recv()
      print('Got:', msg)

# Sample use
p = PrintActor()
p.start()
p.send('Hello')
p.send('World')
p.close()
p.join()

这个例子中,你使用actor实例的 send() 方法发送消息给它们。 其机制是,这个方法会将消息放入一个队里中, 然后将其转交给处理被接受消息的一个内部线程。 close() 方法通过在队列中放入一个特殊的哨兵值(ActorExit)来关闭这个actor。 用户可以通过继承Actor并定义实现自己处理逻辑run()方法来定义新的actor。 ActorExit 异常的使用就是用户自定义代码可以在需要的时候来捕获终止请求 (异常被get()方法抛出并传播出去)。

如果你放宽对于同步和异步消息发送的要求, 类actor对象还可以通过生成器来简化定义。例如:

def print_actor():
  while True:

    try:
      msg = yield   # Get a message
      print('Got:', msg)
    except GeneratorExit:
      print('Actor terminating')

# Sample use
p = print_actor()
next(p)   # Advance to the yield (ready to receive)
p.send('Hello')
p.send('World')
p.close()

讨论

actor模式的魅力就在于它的简单性。 实际上,这里仅仅只有一个核心操作 send() . 甚至,对于在基于actor系统中的“消息”的泛化概念可以已多种方式被扩展。 例如,你可以以元组形式传递标签消息,让actor执行不同的操作,如下:

class TaggedActor(Actor):
  def run(self):
    while True:
       tag, *payload = self.recv()
       getattr(self,'do_'+tag)(*payload)

  # Methods correponding to different message tags
  def do_A(self, x):
    print('Running A', x)

  def do_B(self, x, y):
    print('Running B', x, y)

# Example
a = TaggedActor()
a.start()
a.send(('A', 1))   # Invokes do_A(1)
a.send(('B', 2, 3))  # Invokes do_B(2,3)
a.close()
a.join()

作为另外一个例子,下面的actor允许在一个工作者中运行任意的函数, 并且通过一个特殊的Result对象返回结果:

from threading import Event
class Result:
  def __init__(self):
    self._evt = Event()
    self._result = None

  def set_result(self, value):
    self._result = value

    self._evt.set()

  def result(self):
    self._evt.wait()
    return self._result

class Worker(Actor):
  def submit(self, func, *args, **kwargs):
    r = Result()
    self.send((func, args, kwargs, r))
    return r

  def run(self):
    while True:
      func, args, kwargs, r = self.recv()
      r.set_result(func(*args, **kwargs))

# Example use
worker = Worker()
worker.start()
r = worker.submit(pow, 2, 3)
worker.close()
worker.join()
print(r.result())

最后,“发送”一个任务消息的概念可以被扩展到多进程甚至是大型分布式系统中去。 例如,一个类actor对象的 send() 方法可以被编程让它能在一个套接字连接上传输数据 或通过某些消息中间件(比如AMQP、ZMQ等)来发送。

以上就是Python定义一个Actor任务的详细内容,更多关于Python actor任务的资料请关注脚本之家其它相关文章!

相关文章

  • pandas创建series的三种方法小结

    pandas创建series的三种方法小结

    这篇文章主要介绍了pandas创建series的三种方法小结,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-05-05
  • Python模块学习 datetime介绍

    Python模块学习 datetime介绍

    Python提供了多个内置模块用于操作日期时间,像calendar,time,datetime。time模块我在之前的文章已经有所介绍,它提供的接口与C标准库time.h基本一致
    2012-08-08
  • Python自动生成代码 使用tkinter图形化操作并生成代码框架

    Python自动生成代码 使用tkinter图形化操作并生成代码框架

    这篇文章主要为大家详细介绍了Python自动生成代码,使用tkinter图形化操作并生成代码框架,具有一定的参考价值,感兴趣的小伙伴们可以参考一下
    2019-09-09
  • 使用scipy.optimize的fsolve,root函数求解非线性方程问题

    使用scipy.optimize的fsolve,root函数求解非线性方程问题

    这篇文章主要介绍了使用scipy.optimize的fsolve,root函数求解非线性方程问题,具有很好的参考价值,希望对大家有所帮助。如有错误或未考虑完全的地方,望不吝赐教
    2022-12-12
  • Python使用itchat 功能分析微信好友性别和位置

    Python使用itchat 功能分析微信好友性别和位置

    这篇文章主要介绍了 Python使用itchat 功能分析微信好友性别和位置 的相关资料,需要的朋友可以参考下
    2019-08-08
  • 浅析Python中的序列化存储的方法

    浅析Python中的序列化存储的方法

    这篇文章主要介绍了Python中的序列化存储的方法,序列化存储主要针对的是内存和硬盘之间的写入操作,需要的朋友可以参考下
    2015-04-04
  • Python+Selenium实现短视频自动上传与发布的实践

    Python+Selenium实现短视频自动上传与发布的实践

    本文主要介绍了Python+Selenium实现短视频自动上传与发布的实践,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
    2022-04-04
  • 浅谈Pycharm调用同级目录下的py脚本bug

    浅谈Pycharm调用同级目录下的py脚本bug

    今天小编就为大家分享一篇浅谈Pycharm调用同级目录下的py脚本bug,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
    2018-12-12
  • pycharm中选中一个单词替换所有重复单词的实现方法

    pycharm中选中一个单词替换所有重复单词的实现方法

    这篇文章主要介绍了pycharm中选中一个单词替换所有重复单词的实现方法,类似于sublime 里的ctrl+D功能,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧
    2020-11-11
  • Python远程创建docker容器的方法

    Python远程创建docker容器的方法

    这篇文章主要介绍了Python远程创建docker容器的方法,如果docker  ps找不到该容器,可以使用 docker ps -a查看所有的,然后看刚才创建的容器的STATUS是EXIT0还是EXIT1如果是1,那应该是有报错,使用 docker logs 容器id命令来查看日志,根据日志进行解决,需要的朋友可以参考下
    2024-04-04

最新评论