该系列译自 Rerun.me, 并在原作者的基础上有所改动。本篇为该系列的第二篇——Actor 的消息机制。

今天我们来探讨 Akka 的 Actor 消息机制,我们仍以之前讨论过的 学生-老师 作为范例。

回顾『学生-老师』的例子

我们考虑由应用 StudentSimulatorApp 单独给 TeacherActor 发送消息的情形,这里你可以认为主程序就是 StudentSimulatorApp.

TeacherRequestFlowSimulatedApp

从以上这张流程图我们可以得知:

  1. 『学生』首先初始化一个叫做 ActorSystem 的东西
  2. 然后它使用 ActorSystem 来创建一个叫做 ActorRef 的东西,并将 QuoteRequest 消息发送给 ActorRef(TeacherActor 的代理)
  3. ActorRef 将消息传送至Message Dispatcher 进行分发
  4. Dispatcher 将消息压入目标 Actor 的 MailBox 队列
  5. DispatcherMailBox 送入到某个线程去执行(后续章节介绍更多细节)
  6. MailBox 抛出一个message 并最终将其委托给真正的 Actor - Teacher 的 receive 方法处理。

下面我们将会对以上各个步骤的细节进行详解,你可以随时回来体会这6个步骤。

StudentSimulatorApp 程序

我们使用StudentSimulatorApp 程序启动 JVM 并初始化 ActorSystem.

StudentSimulatorApp

从图中我们可以发现,StudentSimulatorApp 主要完成以下三个步骤:

  1. 创建 ActorSystem
  2. 使用 ActorSystem 创建指向 Teacher Actor 的代理(ActorRef)
  3. 向 Teacher 的代理发送 QuoteRequest 消息

以下对这三点逐一剖析。

1. 创建 ActorSystem

ActorSystem 是通往 Actor 世界的入口,你可以通过 ActorSystems 创建或者停止 Actors, 甚至停止整个 Actor 环境。

从另一方面来讲,Actors 是层级化的并且对于所有的 Actors 来讲,ActorSystem 和 java.lang.Objectscala.Any 是类似的, 也就是说这是所有 Actors 的祖先。当你使用 ActorSystem 的 actorOf 方法创建 Actor 时,你创建的 Actor 总是在 ActorSystem 之下。

ActorSystemActorCreation

初始化 ActorSystem 的语句看起来像是如下这个样子:

val system = ActorSystem("UniversityMessageSystem")

UniversityMessageSystem 仅仅只是 ActorSystem 的一个名字。

2. 创建指向 TeacherActor 的代理

创建指向 TeacherActor 的代理的代码片段如下:

val teacherActorRef:ActorRef = actorSystem.actorOf(Props[TeacherActor])

actorOf 是 ActorSystem 中创建 Actor 的方法。但是正如你所见,它并不返回我们需要的 TeacherActor, 而是返回一个类型为 ActorRef 的东西。 ActorRef 扮演的角色类似实际 Actors 的一个代理,客户端并不与 Actor 直接通信。这正是 Actor 模型的处理方式:避免直接访问 TeacherActor 中任意自定义或者私有方法及变量。 再复述一遍,你只能将消息发送给 ActorRef, 它最终会到达实际的 Actor. 你永远都不能与 Actor 直接通信,如果你找到了某种方法做到了,其他人会恨你到死的。:-(

ActorRef

3. 向代理发送 QuoteRequest

这同样只需一行即可完成,你可以告诉 QuoteRequest 消息发送给 ActorRef. Actor 中的 tell 方法实际上是 !. (实际上也有tell 方法,简略起见这里使用 !)

//send a message to the Teacher Actor
teacherActorRef ! QuoteRequest

好了!这就是StudentSimulatorApp 的核心机制了。StudentSimulatorApp.scala 的源码如下:

package me.yuanbin.akkanotes.actormsg

import akka.actor.ActorSystem
import akka.actor.Props
import me.yuanbin.akkanotes.protocols.StudentProtocol._

/**
 * Let's have the student as a simple App now instead of an Actor for the first part.
 *
 */
object StudentSimulatorApp extends App {
  //Initialize the ActorSystem
  val actorSystem = ActorSystem("UniversityMessageSystem")

  //construct the Teacher Actor Ref
  val teacherActorRef = actorSystem.actorOf(Props[TeacherActor])

  //send a message to the Teacher Actor
  teacherActorRef ! QuoteRequest

  //Let's wait for a couple of seconds before we shut down the system
  Thread.sleep(2000)

  //Shut down the ActorSystem.
  actorSystem.shutdown()
}

在代码的结尾处,你需要关闭 ActorSystem, 否则 JVM 将会一直运行。主线程睡眠一小会以使得 TeacherActor 可以完成它的任务。当然你也可以使用一些测试类而不使主线程睡眠。

消息

我们在之前将 QuoteRequest 发送给了 ActorRef, 但是我们却根本没见着消息类!细思极恐! 为了便于组织,我们通常将消息封装为 scala 中的 object.

QuoteRequest 是 Student 发往 TeacherActor 的请求。TeacherActor 随后则回应一个 QuoteResponse.

StudentProtocol

package me.yuanbin.akkanotes.protocols

object StudentProtocol {
  /*
   * The Student sends this message to request for a Quotation
   *
   */
  case class QuoteRequest()
}

TeacherProtocol

package me.yuanbin.akkanotes.protocols

object TeacherProtocol {
  /*
   * The TeacherActor responds back to the Student with this message object
   * The actual quote string is wrapped inside the response.
   *
   */
  case class QuoteResponse(quoteString:String)
}

Dispatcher 和 Mailbox

ActorRef 将 message handling 的功能委托给 Dispatcher, 深入分析发现,当我们创建 ActorSystem 和 ActorRef 时,相应的 Dispatcher 和 Mailbox 也同时也创建。下面让我们看看他们到底是什么。

MessageDispatcherMailbox

MailBox

每个 Actor 都有一个 Mailbox(有一个特殊例子), 以我们的比喻为例,每个 Teacher 都有一个 mailbox. Teacher 需要检查邮箱并且处理其中的消息。在 Actor 的世界中,使用另一种方法——mailbox 在得到机会后使用 Actor 完成这项工作。mailbox 有一个 queue 以 FIFO 的形式来存储和处理消息,这和我们通常的信箱有点区别,我们通常的信箱通常是最近的信件放在最上面。

Dispatcher

分发器的实现比较精妙,咋一看,Dispatcher 仅仅只是从 ActorRef 中获取消息并将其传送至 Mailbox. 但实际上在其背后却大有玄机: 分发器封装了 ExecutorService (ForkJoinPool or ThreadPoolExecutor), 它依赖 ExecutorService 执行 MailBox 程序。

Akka 中 Dispatcher 相关的程序如下:

protected[akka] override def registerForExecution(mbox: Mailbox, ...): Boolean = {  
  ...
  try {
      executorService execute mbox
      true
  }
  ...
}

我们已经知道 MailBox 将所有 messages 保存在 Queue 中,由于需要 Executor 运行 Mailbox, 所以 MailBox 需要以线程的形式存在。Bingo! 这正是 MailBox 的声明和构造方式!

Mailbox 的签名如下:

private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
  extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable

Teacher Actor

TeacherActor

MailBox 启动了 run 方法时,将会从 message queue 中移除 message 并将其传输给 Actor 进行处理。当你把消息告诉给了 ActorRef 时,最终被调用的方法是目标 Actor 的 receive 方法。 TeacherActor 是一个比较原始的类,它有一组 quotes 并且由 receive 方法 handle 这些消息。

TeacherActor.scala

package me.yuanbin.akkanotes.actormsg

import akka.actor.Actor
import me.yuanbin.akkanotes.protocols.StudentProtocol._
import me.yuanbin.akkanotes.protocols.TeacherProtocol._
import scala.util.Random

/**
 * Your Teacher Actor class.
 *
 * The class could use refinement by way of
 * using ActorLogging which uses the EventBus of the Actor framework
 * instead of the plain old System out
 *
 */

class TeacherActor extends Actor {
  val quotes = List(
    "Moderation is for cowards",
    "Anything worth doing is worth overdoing",
    "The trouble is you think you have time",
    "You never gonna know if you never even try")

  def receive = {
    case QuoteRequest => {
      //Get a random Quote from the list and construct a response
      val quoteResponse = QuoteResponse(quotes(Random.nextInt(quotes.size)))
      println()
      println(quoteResponse)
    }
  }
}

TeacherActor 的 receive 方法通过模式匹配最终仅匹配一个消息——QuoteRequest (实际上我们应该为 default case 匹配,但这里有别的原因)

receive 方法做了如下事情:

  1. 模式匹配:QuoteRequest
  2. 从静态的 quotes list 中随机挑选一条 quote
  3. 构建 QuoteResponse
  4. QuoteResponse 打印到终端

源码

源码可以从 akka_notes/akka_messaging 找到。

运行方式,确保系统装好 sbt 0.13.8.

cd akka_notes/akka_messaging
sbt run

稍等片刻即可看到 TeacherActor 的输出。