首页 > 代码库 > ActorLite: 一个轻量级Actor模型实现

ActorLite: 一个轻量级Actor模型实现

Actor模型

Actor模型为并行而生,具Wikipedia中的描述,它原本是为大量独立的微型处理器所构建的高性能网络而设计的模型。而目前,单台机器也有了多个独立的计算单元,这就是为什么在并行程序愈演愈烈的今天,Actor模型又重新回到了人们的视线之中了。Actor模型的理念非常简单:天下万物皆为Actor,Actor之间通过发送消息进行通信。Actor模型的执行方式有两个特点:

  1. 每个Actor,单线程地依次执行发送给它的消息。
  2. 不同的Actor可以同时执行它们的消息。

对于第1点至今还有一些争论,例如Actor是否可以并行执行它的消息,Actor是否应该保证执行顺序与消息到达的一致(祥见Wikipedia的相关词条)。而第2点是毋庸置疑的,因此Actor模型天生就带有强大的并发特性。我们知道,系统中执行任务的最小单元是线程,数量一定程度上是有限的,而过多的线程会占用大量资源,也无法带来最好的运行效率,因此真正在同时运行的Actor就会少很多。不过,这并不影响我们从概念上去理解“同一时刻可能有成千上万个Actor正在运行”这个观点。在这里,“正在运行”的含义是“处于运行状态”。

Actor模型的使用无处不在,即使有些地方并没有明确说采用的Actor模型:

  • Google提出的Map/Reduce分布式运算平台
  • C#,Java等语言中的lock互斥实现
  • 传统Email信箱的实现
  • ……

Actor模型的现有实现

提到Actor模型的实现就不得不提Erlang。Erlang专以Actor模型为准则进行设计,它的每个Actor被称作是“进程(Process)”,而进程之间唯一的通信方式便是相互发送消息。一个进程要做的,其实只是以下三件事情:

  • 创建其他进程
  • 向其他进程发送消息
  • 接受并处理消息

例如《Programming Erlang》中的一段代码:

loop() ->    receive        {From, {store, Key, Value}} ->            put(Key, {ok, Value}),            From ! {kvs, true},            loop();        {From, {lookup, Key}} ->            From ! {kvs, get(Key)},            loop()    end.

在Erlang中,大写开头的标识表示“变量(variable)”,而小写开头的标识表示“原子(atom)”,而大括号及其内部以逗号分割的数据结构,则被称作是“元组(tuple)”。以上代码的作用为一个简单的“名字服务(naming service)”,当接受到{From, {store, Key, Value}}的消息时,则表示从From这个进程发来一个store请求,要求把Value与Key进行映射。而接受到{From, {lookup, Key}}消息时,则表示从From这个进程发来一个请求,要求返回Key所对应的内容。服务本身,也是通过向消息来源进程(即From)发送消息来进行回复的。

从Erlang语言的设计并不复杂,其类型系统更加几乎可以用“简陋”来形容,这使得其抽象能力十分欠缺,唯一的复杂数据结构似乎只有“元组”一种而已——不过我们现在不谈其缺陷,谈其“优势”。Erlang语言设计的最大特点便是引入了“模式匹配(pattern matching)”,当且仅当受到的消息匹配了我们预设的结构(例如上面的{XXX, {store, YYY, ZZZ}}),则会进入相应的逻辑片断。其次便是其尾递归的特性,可见上面的代码中在loop方法的结尾再次调用了loop方法。

如果说Erlang语言专为Actor模型而设计,那么Scala语言(学Java的朋友们都去学Scala吧,那才是发展方向)中内置的Actor类库则是外部语言Actor模型实现的经典案例了:

class Pong extends Actor {  def act() {    var pongCount = 0    while (true) {      receive {        case Ping =>          if (pongCount % 1000 == 0)            Console.println("Pong: ping " + pongCount)          sender ! Pong          pongCount = pongCount + 1        case Stop =>          Console.println("Pong: stop")          exit()      }    }  }}

Pong类继承了Actor模型,并覆盖其act方法。由于没有Erlang的尾递归特性,Scala Actor使用一个while (true)进行不断的循环。获取到消息之后,将会使用case语句对消息进行判断,并执行相应逻辑。Scala的Actor类库充分利用了Scala的语法特性,让Actor模型好像是Scala内置功能一样,非常漂亮。

此外,其他较为著名的Actor模型实现还有Io Language、Jetlang、以及.NET平台下的MS CCR和Retlang。后文中我们还会简单提到.NET下Actor Model实现,其他内容就需要感兴趣的朋友们自行挖掘了。

 

Actor模型中的任务调度

Actor模型的任务调度方式分为“基于线程(thread-based)的调度”以及“基于事件(event-based)的调度”两种。

基于线程的调度为每个Actor分配一个线程,在接受一个消息(如在Scala Actor中使用receive)时,如果当前Actor的“邮箱(mail box)”为空,则会阻塞当前线程直到获得消息为止。基于线程的调度实现起来较为简单,例如在.NET中可以通过Monitor.Wait/Pulse来轻松实现这样的生产/消费逻辑。不过基于线程的调度缺点也是非常明显的,由于线程数量受到操作系统的限制,把线程和Actor捆绑起来势必影响到系统中可以同时的Actor数量。而线程数量一多也会影响到系统资源占用以及调度,而在某些情况下大部分的Actor会处于空闲状态,而大量阻塞线程既是系统的负担,也是资源的浪费。因此基于线程的调度是一个拥有重大缺陷的实现,现有的Actor Model大都不会采取这种方式。

于是另一种Actor模型的任务调度方式便是基于事件的调度。“事件”在这里可以简单理解为“消息到达”事件,而此时才会为Actor的任务分配线程并执行。很容易理解,我们现在便可以使用少量的线程来执行大量Actor产生的任务,既保证了运算资源的充分占用,也不会让系统在同时进行的太多任务中“疲惫不堪”,这样系统便可以得到很好的伸缩性。在Scala Actor中也可以选择使用“react”而不是“recive”方法来使用基于事件的方式来执行任务。

现有的Actor Model一般都会使用基于事件的调度方式。不过某些实现,如MS CCR、Retlang、Jetlang等类库还需要客户指定资源分配方式,显式地指定Actor与资源池(即线程池)之间的对应关系。而如Erlang或Scala则隐藏了这方面的分配逻辑,由系统整体进行统一管理。前者与后者相比,由于进行了更多的人工干涉,其资源分配可以更加合理,执行效率也会更高——不过其缺点也很明显:会由此带来额外的复杂度。

我们即将实现的简单Actor Model类库,也将使用了基于事件的调度方式。同样为了简化资源分配的过程,我们将直接使用.NET自带的线程池来运行任务。

 

任务分配逻辑

如上文所述,这次要实现的是一个非常简单的Actor模型,使用基于事件的分配方式,直接把任务交给.NET自带的线程池去使用。不过我们又该什么时候把一个Actor推入线程池的执行队列呢?这其实取决于我们执行Actor的两个“基本原则”:

  • 如果Actor的邮箱中包含消息,那么要尽早执行。
  • 对于单个Actor对象来说,它的消息是顺序执行的。

因此,我们有两个“时机”可以把一个Actor交由线程池去执行:

  • 当Actor接收到一个消息(且该Actor处于“等待”状态)
  • 当Actor执行完一个消息(且Actor的邮箱中存在更多消息)

显然,在进行操作时需要小心处理并发造成的问题,因为一个“执行完”和多个“接受到”事件可能同时出现。如果操作不当,则容易出现各种错误的情况:

  • 某个Actor的邮箱未空,却已停止执行。
  • 同一个Actor的两个消息被并行地处理。
  • Actor的邮箱已经没有消息,却被要求再次执行。

至于并行控制的方式,就请关注下面的实现吧。

 

简单的Actor模型实现

Actor模型中最关键的莫过于Actor对象的实现。一个Actor的功能有如下三种:

  • 将消息放入邮箱
  • 接受并处理消息
  • 循环/退出循环

因此Actor抽象类对外的接口大致如下:

public abstract class Actor<T> : IActor{    protected abstract void Receive(T message);    protected void Exit() { ... }    public void Post(T message) { ... }}

三个方法的签名应该已经充分说明了各自的含义。不过IActor又是什么呢?请看它的定义:

internal interface IActor{    void Execute();    bool Existed { get; }    int MessageCount { get; }    ActorContext Context { get; }}

这是一个internal修饰的类型,这意味着它的访问级别被限制在程序集内部。IActor接口的作用是作为一个统一的类型,交给Dispatcher——也就是Actor模型的任务分发逻辑所使用的。IActor接口的前三个成员很容易从名称上理解其含义,那么ActorContext又是做什么用的呢?

internal class ActorContext{    public ActorContext(IActor actor)    {        this.Actor = actor;    }    public IActor Actor { get; private set; }    ...}public abstract class Actor<T> : IActor{    protected Actor()    {        this.m_context = new ActorContext(this);    }    private ActorContext m_context;    ActorContext IActor.Context    {        get        {            return this.m_context;        }    }    ...}

在多线程的环境中,进行一些同步控制是非常重要的事情。线程同步的常用手段是lock,不过如果要减小锁的粒度,那么势必会使用Interlocked类下的CAS等原子操作,而那些操作只能针对最基础的域变量,而不能针对经过封装的属性或方法等成员。ActorContext便包含了用于同步控制,以及其他直接表示Actor内部状态各种字段的对象。这样,我们便可以通过ActorContext对象来实现一个Lock-Free的链表或队列。您可以会说,那么为什么要用独立的ActorContext类型,而不直接把字段放置在统一的基类(例如ActorBase)中呢?这有两点原因,第一点是所谓的“统一控制”便于管理,而第二点才是更为关键的:后文会涉及到F#对这Actor模型的使用,只可惜F#在对待父类的internal成员时有一个bug,因此不得不把相关实现替换成接口(IActor)。不过这不是本文的主题,我们下次再讨论F#的问题。

ActorContext目前只有一个字段——没错,只需要一个,这个字段便是表示状态的m_status。

internal class ActorContext{    ...    public const int WAITING = 0;    public const int EXECUTING = 1;    public const int EXITED = 2;    public int m_status;}

m_status字段的类型为int,而不是枚举,这是为了可以使用Interlocked中的CAS操作。而对这个状态的操作,也正好形成了我们同步操作过程中的“壁垒”。我们的每个Actor在任意时刻都处于三种状态之一:

  • 等待(Waiting):邮箱为空,或刚执行完一个消息,正等待分配任务。
  • 执行(Executing):正在执行一个消息(确切地说,由于线程池的缘故,它也可能是还在队列中等待,不过从概念上理解,我们认为它“已经”执行了)。
  • 退出(Exited):已经退出,不会再执行任何消息。

显然,只有当m_status为WAITING时才能够为Actor分配运算资源(线程)以便执行,而分配好资源(将其推入.NET线程池)之后,它的状态就要变成EXECUTING。这恰好可以用一个原子操作形成我们需要的“壁垒”,可以让多个“请求”,“有且只有一个”成功,即“把Actor的执行任务塞入线程池”。如下:

internal class Dispatcher{    ...    public void ReadyToExecute(IActor actor)    {        if (actor.Existed) return;        int status = Interlocked.CompareExchange(            ref actor.Context.m_status,            ActorContext.EXECUTING,            ActorContext.WAITING);        if (status == ActorContext.WAITING)        {            ThreadPool.QueueUserWorkItem(this.Execute, actor);        }    }    ...}

CompareExchange方法返回这次原子操作前m_status的值,如果它为WAITING,那么这次操作(也仅有这次操作)成功地将m_status修改为EXECUTING。在这个情况下,Actor将会被放入线程池,将会由Execute方法来执行。从上述实现中我们可以发现,这个方法在多线程的情况下也能够正常工作。那么ReadyToExecute方法该在什么地方被调用呢?应该说是在任何“可能”让Actor开始执行的时候得到调用。按照文章开始的说法,其中一个情况便是“当Actor接收到一个消息时”:

public abstract class Actor<T> : IActor{    ...    private Queue<T> m_messageQueue = new Queue<T>();    ...    public void Post(T message)    {        if (this.m_exited) return;        lock (this.m_messageQueue)        {            this.m_messageQueue.Enqueue(message);        }        Dispatcher.Instance.ReadyToExecute(this);    }}

而另一个地方,自然是消息“执行完毕”,且Actor的邮箱中还拥有消息的时候,则再次为其分配运算资源。这便是Dispatcher.Execute方法的逻辑:

public abstract class Actor<T> : IActor{    ...    bool IActor.Existed    {        get        {            return this.m_exited;        }    }    int IActor.MessageCount    {        get        {            return this.m_messageQueue.Count;        }    }    void IActor.Execute()    {        T message;        lock (this.m_messageQueue)        {            message = this.m_messageQueue.Dequeue();        }        this.Receive(message);    }    private bool m_exited = false;    protected void Exit()    {        this.m_exited = true;    }    ...}internal class Dispatcher{    ...    private void Execute(object o)    {        IActor actor = (IActor)o;        actor.Execute();

当程序执行到此处时,actor的Execute方法已经从邮箱尾部获取了一条消息,并交由用户实现的Receive方法执行。同时,Actor的Exit方法也可能被调用,使它的Exited属性返回true。不过到目前为止,因为ActorContext.m_status一直保持为EXECUTING,因此这段时间中任意新消息所造成的ReadyToExecute方法的调用都不会为Actor再次分配运算资源。不过接下来,我们将会修改m_status,这可能会造成竞争。那么我们又该怎么处理呢?

如果用户调用了Actor.Exit方法,那么它的Exited属性则会返回true,我们可以将m_status设为EXITED,这样Actor再也不会回到WAITING状态,也就避免了无谓的资源分配:

         if (actor.Existed)        {            Thread.VolatileWrite(                ref actor.Context.m_status,                ActorContext.EXITED);        }        else        {

如果Actor没有退出,那么它会被短暂地切换为WAITING状态。此后如果Actor的邮箱中存在剩余的消息,那么我们会再次调用ReadyToExecute方法“尝试”再次为Actor分配运算资源:

            Thread.VolatileWrite(                ref actor.Context.m_status,                ActorContext.WAITING);            if (actor.MessageCount > 0)            {                this.ReadyToExecute(actor);            }        }    }}

显然,在VolatileWrite和ReadyToExecute方法之间,可能会到来一条新的消息,因而再次引发一次并行地ReadyToExecute调用。不过根据我们之前的分析,这样的竞争并不会造成问题,因此在这方面我们可以完全放心。

至此,我们已经完整地实现了一个简单的Actor模型,逻辑清晰,功能完整——而这一切,仅仅用了不到150行代码。不用怀疑,这的确是事实。

 

使用示例

Actor模型的关键在于消息传递形式(Message Passing Style)的工作方式,通信的唯一手段便是传递消息。在使用我们的Actor模型之前,我们需要继承Actor<T>类来构建一个真正的Actor类型。例如一个最简单的计数器:

public class Counter : Actor<int>{    private int m_value;    public Counter() : this(0) { }    public Counter(int initial)    {        this.m_value = http://www.mamicode.com/initial;>

当计数器收到-1以外的数值时,便会累加到它的计数器上,否则便会打印出当前的值并退出。这里无需做任何同步方面的考虑,因为对于单个Actor来说,所有的消息都是依次处理,不会出现并发的情况。Counter的使用自然非常简单:

static void Main(string[] args){    Counter counter = new Counter();    for (int i = 0; i < 10000; i++)    {        counter.Post(i);    }    counter.Post(-1);    Console.ReadLine();}

不过您可能会问,这样的调用又有什么作用,又能实现什么呢?您现在可以去网上搜索一些Actor模型解决问题的示例,或者您可以等待下一篇文章中,我们使用F#来操作这个Actor模型。您会发现,配合F#的一些特性,这个Actor模型会变得更加实用,更为有趣。

此外,在下一篇文章里我们也会对这个Actor模型进行简单的性能分析。如果您要把它用在生产环境中,那么可能还需要对它再进行一些细微地调整。

 

C#使用Actor模型的缺陷

在Erlang中,每个消息都使用模式匹配来限制其“结构”或“格式”,以此表达不同含义。C#类型系统的抽象能力远胜于Erlang,但是Erlang的“动态性”使得开发人员可以在程序中随意发送和接收任何类型,这种“自由”为Erlang带来了灵活。我们的Actor模型中,每个Actor对象都需要一种特定的消息格式,而这种消息格式承担了“表现Actor所有职责”的重任,但是一个Actor的职责是可能由任何数据组合而成。例如一段最简单的“聊天”程序,其Actor表示了一个“人”,用Erlang实现可能就会这么写:

loop() ->    receive        % 系统要求发起聊天,于是向对方打招呼        {start, Person} ->            Person ! {self(), {greeting, "你好")},            loop();        % 有人前来发起聊天,于是向对方说了点什么        {Person, {greeting, Message}} ->            Person ! {self(), {say, "..."}},            loop();        % 有人前来说话,于是拜拜        {Person, {say, Message}} ->            Person ! {self(), {bye, "..."}},            loop();        ...    end.

不同的元组(tuple)配合不同的原子(atom)便表示了一条消息的“含义”,但是使用C#您又该怎样来表现这些“命令”呢?您可能会使用:

  1. 使用object[]作为消息类型,并检查其元素。
  2. 使用object作为消息类型,并判断消息的具体类型。
  3. 使用枚举或字符串代表“命令”,配合一个参数集合。

第1种做法十分麻烦;第2种则需要“先定义,后使用”也颇为不易;而第3种做法,平心而论,如果有一个“分发类库”的支持就会比较理想——可能比这篇文章中的F#还要理想。老赵正在努力实现这一功能,因为C#的这个特性会影响到.NET平台下所有Actor模型(如第一篇文章中所提到的CCR或Retlang)的使用。

而目前,我们先来看看F#是否可以略为缓解一下这方面的问题。

 

在F#中使用Actor模型

Erlang没有严谨的类型系统,其“消息类型”是完全动态的,因此非常灵活。那么F#又有什么“法宝”可以解决C#中所遇到的尴尬呢?在现在这个问题上,F#有三个领先于C#的关键:

  • 灵活的类型系统
  • 强大的模式匹配
  • 自由的语法

虽然F#也是强类型的编译型语言(这点和C#一致),但是F#的类型系统较C#灵活许多,例如在“聊天”这个示例中,我们就可以编写如下类型作为“消息”类型:

type Message = stringtype ChatMsg =     | Start of Person    | Greeting of Person * Message    | Say of Person * Message    | Bye of Person * Message

在这个定义中用到了F#类型系统中的三个特点:

  • 类型别名:即type Message = string。为一个已有的类型定义一个别名,可以得到更好的语义。与C#使用using定义别名不同的是,F#中的别名可以定义为全局性的,而不仅仅是“源代码”级别的别名。
  • Discriminated Unions:即type ChatMsg = …。Discriminated Unions可以为一个类型指定多个discriminator,每个discriminator由一个名称,以及另一种具体类型来表示。不同的discriminator的具体类型可以不同。
  • 元组(Tuple):即Person * Message。在F#中可以通过把现有类型按顺序进行任意组合来得到新的类型,这种类型便被称为“元组”。

在Actor模型中,我们便组合了F#的三个特别特性,定义了消息的具体类型。而在使用时,我们便可以使用“模式匹配”对不同的“消息”——其实是CharMsg的不同discriminator进行不同地处理。于是具体的Actor类型Person,便可以使用如下定义:

and Person(name: string) =     inherit ChatMsg Actor()        let GetRandom =         let r = new Random(DateTime.Now.Millisecond)        fun() -> r.NextDouble()    member self.Name = name        override self.Receive(message) =        match (message) with

Person类的构造函数接受一个name作为参数,并将其放置到Name属性中。我们同时定义了GetRandom函数,它会在内部构造一个System.Random对象,并每次返回NextDouble方法的值(请注意,无论调用多少次GetRandom方法,永远使用了同一个Random对象,因为他是在定义GetRandom方法时创建的)。而在override的Receive方法中,我们使用“模式匹配”对message对象进行处理:

        // 系统要求发起聊天        | Start(p) ->             Console.WriteLine("系统让{0}向{1}打招呼", self.Name, p.Name)            Greeting(self, "Hi, 有空不?") |> p.Post

请注意上述最后一行,原本我们使用p.Post(…)的调用方式,现在使用了“|>”符号代替。在F#中,x |> f便代表了f(x),它的本意是可以把f(g(h(x)))这样冗余的调用方式转变为清晰的“消息发送”形式:x |> h |> g |> f。而“消息发送”也恰好是我们所需要的“感觉”。因此,我们在接下来的代码中也使用这样的方式:

        // 打招呼        | Greeting(p, msg) ->            Console.WriteLine("{0}向{1}打招呼:{2}", p.Name, self.Name, msg)            if (GetRandom() < 0.8) then                Say(self, "好,聊聊。") |> p.Post            else                Bye(self, "没空,bye!") |> p.Post        // 进行聊天        | Say(p, msg) ->            Console.WriteLine("{0}向{1}说道:{2}", p.Name, self.Name, msg)            if (GetRandom() < 0.8) then                Say(self, "继续聊。") |> p.Post            else                Bye(self, "聊不动了,bye!") |> p.Post        // 结束        | Bye(p, msg) ->            Console.WriteLine("{0}向{1}再见:{2}", p.Name, self.Name, msg)

至此,Person类型定义完毕。我们构造三个Person对象,让它们随意聊天:

let startChat() =    let p1 = new Person("Tom")    let p2 = new Person("Jerry")    let p3 = new Person("老赵")    Start(p2) |> p1.Post    Start(p3) |> p2.PoststartChat()

结果如下(内容会根据随机结果不同而有所改变):

系统让Tom向Jerry打招呼系统让Jerry向老赵打招呼Jerry向老赵打招呼:Hi, 有空不?Tom向Jerry打招呼:Hi, 有空不?Jerry向Tom说道:好,聊聊。老赵向Jerry说道:好,聊聊。Jerry向老赵说道:继续聊。Tom向Jerry说道:继续聊。Jerry向Tom说道:继续聊。老赵向Jerry说道:继续聊。Jerry向老赵说道:继续聊。Tom向Jerry再见:聊不动了,bye!老赵向Jerry说道:继续聊。Jerry向老赵再见:聊不动了,bye!

使用Actor模型抓取网络数据

我们再来看一个略为“现实”一点的例子,需要多个Actor进行配合。首先,我们定义一个“抓取”数据用的Actor,它的唯一作用便是接受一个消息,并将抓取结果传回:

type Crawler() =    inherit ((obj Actor) * string) Actor()    override self.Receive(message) =        let (monitor, url) = message        let content = (new WebClient()).DownloadString(url)        (url, content) |> monitor.Post

再使用“单件”方式直接定义一个monitor对象:

let monitor =    { new obj Actor() with        override self.Receive(message) =            match message with            // crawling            | :? string as url -> (self, url) |> (new Crawler()).Post            // get crawled result            | :? (string * string) as p ->                let (url, content) = p                Console.WriteLine("{0} => {1}", url, content.Length)            // unrecognized message            | _ -> failwith "Unrecognized message" }

每次收到“抓取”消息时,monitor都会创建一个Crawler对象,并把url发送给它,并等待回复消息。而在使用时,只要把对象一个一个“发送”给monitor便可:

let urls = [    "http://www.live.com";    "http://www.baidu.com";    "http://www.google.com";    "http://www.cnblogs.com";    "http://www.microsoft.com"]List.iter monitor.Post urls

运行结果如下:

http://www.live.com => 18035http://www.google.com => 6942http://www.cnblogs.com => 62688http://www.microsoft.com => 1020http://www.baidu.com => 3402

性能分析

最后,我们再对这个Actor模型的性能作一点简单的分析。

如果从“锁”的角度来说,这个Actor模型唯一的锁是在消息队列的访问上,这基本上就是唯一的瓶颈。如果把它替换为lock-free的队列,那么整个Actor模型就是完全的lock-free实现,其“调度”性能可谓良好。

不过,从另一个角度来说,这个Actor模型的调度非常频繁,每次只执行一个消息。试想,如果执行一个消息只需要50毫秒,而进行一次调度就需要100毫秒,那么这个性能的瓶颈还是落在“调度”上。因此,如果我们需要进一步提高Actor模型的性能,则需要从Dispatcher.Execute方法上做文章,例如把每次执行一个消息修改为每次执行n个消息,或超过一个时间的阈值再进行下一次调度。减少调度,也是提高Actor模型性能的关键之一。

此外,如果觉得.NET自带的线程池性能不高,或者说会受到程序其他部分的影响,那么也可以使用独立的线程池进行替换。

自然,任何性能优化都不能只凭感觉下手,一切都要用数据说话,因此在优化时一定要先建立合适的Profile机制,保证每一步优化都是有效的。

ActorLite: 一个轻量级Actor模型实现