Python定義一個Actor任務
問題
你想定義跟actor模式中類似“actors”角色的任務
解決方案
actor模式是一種最古老的也是最簡單的并行和分布式計算解決方案。 事實上,它天生的簡單性是它如此受歡迎的重要原因之一。 簡單來講,一個actor就是一個并發執行的任務,只是簡單的執行發送給它的消息任務。 響應這些消息時,它可能還會給其他actor發送更進一步的消息。 actor之間的通信是單向和異步的。因此,消息發送者不知道消息是什么時候被發送, 也不會接收到一個消息已被處理的回應或通知。
結合使用一個線程和一個隊列可以很容易的定義actor,例如:
from queue import Queuefrom threading import Thread, Event# Sentinel used for shutdownclass ActorExit(Exception): passclass Actor: def __init__(self): self._mailbox = Queue() def send(self, msg): ’’’ Send a message to the actor ’’’ self._mailbox.put(msg) def recv(self): ’’’ Receive an incoming message ’’’ msg = self._mailbox.get() if msg is ActorExit: raise ActorExit() return msg def close(self): ’’’ Close the actor, thus shutting it down ’’’ self.send(ActorExit) def start(self): ’’’ Start concurrent execution ’’’ self._terminated = Event() t = Thread(target=self._bootstrap) t.daemon = True t.start() def _bootstrap(self): try: self.run() except ActorExit: pass finally: self._terminated.set() def join(self): self._terminated.wait() def run(self): ’’’ Run method to be implemented by the user ’’’ while True: msg = self.recv()# Sample ActorTaskclass PrintActor(Actor): def run(self): while True: msg = self.recv() print(’Got:’, msg)# Sample usep = PrintActor()p.start()p.send(’Hello’)p.send(’World’)p.close()p.join()
這個例子中,你使用actor實例的 send() 方法發送消息給它們。 其機制是,這個方法會將消息放入一個隊里中, 然后將其轉交給處理被接受消息的一個內部線程。 close() 方法通過在隊列中放入一個特殊的哨兵值(ActorExit)來關閉這個actor。 用戶可以通過繼承Actor并定義實現自己處理邏輯run()方法來定義新的actor。 ActorExit 異常的使用就是用戶自定義代碼可以在需要的時候來捕獲終止請求 (異常被get()方法拋出并傳播出去)。
如果你放寬對于同步和異步消息發送的要求, 類actor對象還可以通過生成器來簡化定義。例如:
def print_actor(): while True: try: msg = yield # Get a message print(’Got:’, msg) except GeneratorExit: print(’Actor terminating’)# Sample usep = print_actor()next(p) # Advance to the yield (ready to receive)p.send(’Hello’)p.send(’World’)p.close()
討論
actor模式的魅力就在于它的簡單性。 實際上,這里僅僅只有一個核心操作 send() . 甚至,對于在基于actor系統中的“消息”的泛化概念可以已多種方式被擴展。 例如,你可以以元組形式傳遞標簽消息,讓actor執行不同的操作,如下:
class TaggedActor(Actor): def run(self): while True: tag, *payload = self.recv() getattr(self,’do_’+tag)(*payload) # Methods correponding to different message tags def do_A(self, x): print(’Running A’, x) def do_B(self, x, y): print(’Running B’, x, y)# Examplea = TaggedActor()a.start()a.send((’A’, 1)) # Invokes do_A(1)a.send((’B’, 2, 3)) # Invokes do_B(2,3)a.close()a.join()
作為另外一個例子,下面的actor允許在一個工作者中運行任意的函數, 并且通過一個特殊的Result對象返回結果:
from threading import Eventclass Result: def __init__(self): self._evt = Event() self._result = None def set_result(self, value): self._result = value self._evt.set() def result(self): self._evt.wait() return self._resultclass Worker(Actor): def submit(self, func, *args, **kwargs): r = Result() self.send((func, args, kwargs, r)) return r def run(self): while True: func, args, kwargs, r = self.recv() r.set_result(func(*args, **kwargs))# Example useworker = Worker()worker.start()r = worker.submit(pow, 2, 3)worker.close()worker.join()print(r.result())
最后,“發送”一個任務消息的概念可以被擴展到多進程甚至是大型分布式系統中去。 例如,一個類actor對象的 send() 方法可以被編程讓它能在一個套接字連接上傳輸數據 或通過某些消息中間件(比如AMQP、ZMQ等)來發送。
以上就是Python定義一個Actor任務的詳細內容,更多關于Python actor任務的資料請關注好吧啦網其它相關文章!
相關文章: