该系列译自 Rerun.me, 并在原作者的基础上有所改动。本篇为该系列的第二篇——Actor 的消息机制。
今天我们来探讨 Akka 的 Actor 消息机制,我们仍以之前讨论过的 学生-老师 作为范例。
回顾『学生-老师』的例子
我们考虑由应用 StudentSimulatorApp
单独给 TeacherActor
发送消息的情形,这里你可以认为主程序就是 StudentSimulatorApp
.
从以上这张流程图我们可以得知:
- 『学生』首先初始化一个叫做
ActorSystem
的东西 - 然后它使用
ActorSystem
来创建一个叫做ActorRef
的东西,并将QuoteRequest
消息发送给ActorRef
(TeacherActor
的代理) ActorRef
将消息传送至Message Dispatcher
进行分发Dispatcher
将消息压入目标 Actor 的MailBox
队列Dispatcher
将MailBox
送入到某个线程去执行(后续章节介绍更多细节)MailBox
抛出一个message
并最终将其委托给真正的Actor
- Teacher 的 receive 方法处理。
下面我们将会对以上各个步骤的细节进行详解,你可以随时回来体会这6个步骤。
StudentSimulatorApp
程序
我们使用StudentSimulatorApp
程序启动 JVM 并初始化 ActorSystem
.
从图中我们可以发现,StudentSimulatorApp
主要完成以下三个步骤:
- 创建
ActorSystem
- 使用
ActorSystem
创建指向 Teacher Actor 的代理(ActorRef) - 向 Teacher 的代理发送
QuoteRequest
消息
以下对这三点逐一剖析。
1. 创建 ActorSystem
ActorSystem 是通往 Actor 世界的入口,你可以通过 ActorSystems
创建或者停止 Actors
, 甚至停止整个 Actor 环境。
从另一方面来讲,Actors 是层级化的并且对于所有的 Actors 来讲,ActorSystem 和 java.lang.Object
或 scala.Any
是类似的, 也就是说这是所有 Actors 的祖先。当你使用 ActorSystem 的 actorOf 方法创建 Actor 时,你创建的 Actor 总是在 ActorSystem 之下。
初始化 ActorSystem 的语句看起来像是如下这个样子:
val system = ActorSystem("UniversityMessageSystem")
UniversityMessageSystem
仅仅只是 ActorSystem 的一个名字。
2. 创建指向 TeacherActor 的代理
创建指向 TeacherActor 的代理的代码片段如下:
val teacherActorRef:ActorRef = actorSystem.actorOf(Props[TeacherActor])
actorOf
是 ActorSystem 中创建 Actor 的方法。但是正如你所见,它并不返回我们需要的 TeacherActor, 而是返回一个类型为 ActorRef 的东西。
ActorRef 扮演的角色类似实际 Actors 的一个代理,客户端并不与 Actor 直接通信。这正是 Actor 模型的处理方式:避免直接访问 TeacherActor 中任意自定义或者私有方法及变量。
再复述一遍,你只能将消息发送给 ActorRef, 它最终会到达实际的 Actor. 你永远都不能与 Actor 直接通信,如果你找到了某种方法做到了,其他人会恨你到死的。:-(
3. 向代理发送 QuoteRequest
这同样只需一行即可完成,你可以告诉 QuoteRequest 消息发送给 ActorRef. Actor 中的 tell
方法实际上是 !
. (实际上也有tell
方法,简略起见这里使用 !
)
//send a message to the Teacher Actor
teacherActorRef ! QuoteRequest
好了!这就是StudentSimulatorApp
的核心机制了。StudentSimulatorApp.scala
的源码如下:
package me.yuanbin.akkanotes.actormsg
import akka.actor.ActorSystem
import akka.actor.Props
import me.yuanbin.akkanotes.protocols.StudentProtocol._
/**
* Let's have the student as a simple App now instead of an Actor for the first part.
*
*/
object StudentSimulatorApp extends App {
//Initialize the ActorSystem
val actorSystem = ActorSystem("UniversityMessageSystem")
//construct the Teacher Actor Ref
val teacherActorRef = actorSystem.actorOf(Props[TeacherActor])
//send a message to the Teacher Actor
teacherActorRef ! QuoteRequest
//Let's wait for a couple of seconds before we shut down the system
Thread.sleep(2000)
//Shut down the ActorSystem.
actorSystem.shutdown()
}
在代码的结尾处,你需要关闭 ActorSystem, 否则 JVM 将会一直运行。主线程睡眠一小会以使得 TeacherActor 可以完成它的任务。当然你也可以使用一些测试类而不使主线程睡眠。
消息
我们在之前将 QuoteRequest 发送给了 ActorRef, 但是我们却根本没见着消息类!细思极恐!
为了便于组织,我们通常将消息封装为 scala 中的 object
.
QuoteRequest
是 Student 发往 TeacherActor 的请求。TeacherActor 随后则回应一个 QuoteResponse
.
StudentProtocol
package me.yuanbin.akkanotes.protocols
object StudentProtocol {
/*
* The Student sends this message to request for a Quotation
*
*/
case class QuoteRequest()
}
TeacherProtocol
package me.yuanbin.akkanotes.protocols
object TeacherProtocol {
/*
* The TeacherActor responds back to the Student with this message object
* The actual quote string is wrapped inside the response.
*
*/
case class QuoteResponse(quoteString:String)
}
Dispatcher 和 Mailbox
ActorRef 将 message handling 的功能委托给 Dispatcher, 深入分析发现,当我们创建 ActorSystem 和 ActorRef 时,相应的 Dispatcher 和 Mailbox 也同时也创建。下面让我们看看他们到底是什么。
MailBox
每个 Actor 都有一个 Mailbox(有一个特殊例子), 以我们的比喻为例,每个 Teacher 都有一个 mailbox. Teacher 需要检查邮箱并且处理其中的消息。在 Actor 的世界中,使用另一种方法——mailbox 在得到机会后使用 Actor 完成这项工作。mailbox 有一个 queue 以 FIFO 的形式来存储和处理消息,这和我们通常的信箱有点区别,我们通常的信箱通常是最近的信件放在最上面。
Dispatcher
分发器的实现比较精妙,咋一看,Dispatcher 仅仅只是从 ActorRef 中获取消息并将其传送至 Mailbox. 但实际上在其背后却大有玄机: 分发器封装了 ExecutorService (ForkJoinPool or ThreadPoolExecutor), 它依赖 ExecutorService 执行 MailBox 程序。
Akka 中 Dispatcher 相关的程序如下:
protected[akka] override def registerForExecution(mbox: Mailbox, ...): Boolean = {
...
try {
executorService execute mbox
true
}
...
}
我们已经知道 MailBox 将所有 messages 保存在 Queue 中,由于需要 Executor 运行 Mailbox, 所以 MailBox 需要以线程的形式存在。Bingo! 这正是 MailBox 的声明和构造方式!
Mailbox 的签名如下:
private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
extends ForkJoinTask[Unit] with SystemMessageQueue with Runnable
Teacher Actor
当 MailBox
启动了 run 方法时,将会从 message queue 中移除 message 并将其传输给 Actor 进行处理。当你把消息告诉给了 ActorRef 时,最终被调用的方法是目标 Actor 的 receive 方法。
TeacherActor 是一个比较原始的类,它有一组 quotes 并且由 receive 方法 handle 这些消息。
TeacherActor.scala
package me.yuanbin.akkanotes.actormsg
import akka.actor.Actor
import me.yuanbin.akkanotes.protocols.StudentProtocol._
import me.yuanbin.akkanotes.protocols.TeacherProtocol._
import scala.util.Random
/**
* Your Teacher Actor class.
*
* The class could use refinement by way of
* using ActorLogging which uses the EventBus of the Actor framework
* instead of the plain old System out
*
*/
class TeacherActor extends Actor {
val quotes = List(
"Moderation is for cowards",
"Anything worth doing is worth overdoing",
"The trouble is you think you have time",
"You never gonna know if you never even try")
def receive = {
case QuoteRequest => {
//Get a random Quote from the list and construct a response
val quoteResponse = QuoteResponse(quotes(Random.nextInt(quotes.size)))
println()
println(quoteResponse)
}
}
}
TeacherActor 的 receive 方法通过模式匹配最终仅匹配一个消息——QuoteRequest (实际上我们应该为 default case 匹配,但这里有别的原因)
receive 方法做了如下事情:
- 模式匹配:
QuoteRequest
- 从静态的 quotes list 中随机挑选一条 quote
- 构建
QuoteResponse
- 将
QuoteResponse
打印到终端
源码
源码可以从 akka_notes/akka_messaging 找到。
运行方式,确保系统装好 sbt 0.13.8.
cd akka_notes/akka_messaging
sbt run
稍等片刻即可看到 TeacherActor 的输出。