后端 扩展Python Gevent的Actor模型

afgfhjk · February 29, 2020 · 49 hits

什么是 Actor 模型?

Actor 模型中文版)是一种基于消息传递(message-passing)的并发(concurrent)计算模型。

它与 OOP 异同:

  • 它推崇 “一切皆为 Actor”,而 OOP 推崇 “一切皆为 Object”
  • 表面上,Actor 通过发送消息与其他 Actor 通信,OOP 的 Object 通过发送消息与其他 Object 通信。实际上,前者为发送结构化的数据,而后者为调用对方的方法。
  • 它的发送者与已经发送的消息解耦,它允许进行异步通信,从而实现发送者与接收者并发执行。而 OOP 的方法调用者(发送者)与方法被调用者(接收者)通常顺序执行,而且调用者与被调用者通常具有较强的耦合。
  • 它的消息接收者是通过地址区分的,有时也被称作 “邮件地址”。而 OOP 的 Object 通过引用(地址)来区分。
  • 它着重消息传递,而 OOP 着重于类与对象。

Gevent 的 Actor 实现

gevent中文版)是一个基于 libev 的并发库,它为各种并发和网络相关的任务提供了整洁的 API。

Actors中文版)章节已介绍了如何基于 Greenlet 和 Queue 实现

该实现存在的问题:发送者与接收者紧耦合,发送者持有接收者的对象引用。

解决办法

在此基础上,我利用message库将其扩展为发布 - 订阅者模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from gevent.queue import Queue
from gevent import Greenlet
from message import observable


class Actor(Greenlet):
    def __init__(self):
        self.inbox = Queue()
        Greenlet.__init__(self)

    def send(self, message):
        self.inbox.put(message)

    def receive(self, message):
        raise NotImplemented()

    def _run(self):
        self.running = True

        while self.running:
            message = self.inbox.get()
            self.receive(message)


@observable
class Publisher(Actor):
    def __init__(self, subject):
        self.subject(subject)
        Actor.__init__(self)

    def subcribe(self, observer):
        self.sub(self.subject, observer.send)

    def publish(self, message):
        self.pub(self.subject, message)

如此,不仅将发送者与接收者解耦,而且支持发送者发送 1 条消息时,多个接收者接收同 1 条消息。

类似 Ping-Pong 的示例,Pinger 对象订阅了 Ponger 对象的 evt.pong 事件,Ponger 对象订阅 Pinger 对象的 evt.ping 事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import gevent


class Pinger(Publisher):
    def receive(self, message):
        print(message)
        self.publish('ping')
        gevent.sleep(1)


class Ponger(Publisher):
    def receive(self, message):
        print(message)
        self.publish('pong')
        gevent.sleep(1)


ping = Pinger('evt.ping')
pong = Ponger('evt.pong')

ping.subcribe(pong)
pong.subcribe(ping)
ping.start()
pong.start()

ping.publish('start')
gevent.joinall([ping, pong])

接收消息超时 (timeout)

某些应用场景需要周期性激活 Actor,当 Actor 没有收到任何消息时。

基于上述代码,利用 gevent.queue.get 的超时功能来实现接收消息超时。如此,进一步加强 Actor 的并发能力。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
from gevent.queue import Queue
from gevent import Greenlet
from message import observable


class Actor(Greenlet):
    def __init__(self, receive_timeout=None):
        self.inbox = Queue()
        self.receive_timeout = receive_timeout
        Greenlet.__init__(self)

    def send(self, message):
        self.inbox.put(message)

    def receive(self, message):
        raise NotImplemented()

    def handle_timeout(self):
        pass

    def _run(self):
        self.running = True

        while self.running:
            try:
                message = self.inbox.get(True, self.receive_timeout)
            except Empty:
                self.handle_timeout()
            else:
                self.receive(message)


@observable
class Publisher(Actor):
    def __init__(self, subject, receive_timeout=None):
        self.subject = subject
        Actor.__init__(self, receive_timeout)

    def subcribe(self, observer):
        self.sub(self.subject, observer.send)

    def publish(self, message):
        self.pub(self.subject, message)
No Reply at the moment.
You need to Sign in before reply, if you don't have an account, please Sign up first.