该系列译自 Rerun.me, 并在原作者的基础上有所改动。本篇为该系列的第四篇——Actor 的消息请求和响应。

前文 Akka Notes 2 - Actor Messaging 中已经对 Actor 的消息机制做了简介,在那我们可以看到一条 fire-n-forget 消息是如何发送的。(即我们向 Actor 发送一条消息但是并不期望得到它的回应)

从技术角度考虑,我们发送消息给 Actor 总是期望有副作用发生的,从设计上便是如此。目标 Actor 除了不作回应之外还可以对消息做如下事情:

  1. 返回一条回复给发送者(在我们的例子中,TeacherActor 回应一条名言给 StudentActor)
  2. 给其他有可能成为目标受众的 Actor 转发一条回应,这个 Actor 反过来会回应/转发而产生副作用。Routers 和 Supervisors 就是这一类例子。之后我们对这两类进行详解。

请求和响应

在本文中我们将集中在上述第一点:请求与响应 整个周期。一图胜千言,上图!

RequestReponse

这张图传达了我们将要达成的目标,简单起见,这张图并不展示 ActorSystem, Dispatcher 或者 Mailboxes.

  1. DriverApp 发送一条 InitSignal 消息给 StudentActor
  2. StudentActorInitSignal 消息作出回应并发送一条 QuoteRequest 消息给 TeacherActor
  3. 和我们第一次讨论的一样,TeacherActor 则回应一条 QuoteResponse
  4. StudentActor 记录 QuoteResponse 到 console/logger

我们接下来写一个测试用例来验证它。让我们对以上四点逐一剖析。

1. DriverApp 发送 InitSignal

DriverApp

如上图所见,你可能猜到这个 DriverApp 可能要干什么了,它做了如下四件事:

  1. 初始化 ActorSystem
//Initialize the ActorSystem
  val system = ActorSystem("UniversityMessageSystem")
  1. 创建 TeacherActor
//create the teacher actor
  val teacherRef = system.actorOf(Props[TeacherActor], "teacherActor")
  1. 创建 StudentActor
//create the Student Actor - pass the teacher actorref as a constructor parameter to StudentActor
  val studentRef = system.actorOf(Props(new StudentActor(teacherRef)), "studentActor")

你可能注意到我将 TeacherActor 的 ActorRef 作为构造参数传递给了 StudentActor, 这样 StudentActor 就能使用 ActorRef 发送消息给 TeacherActor 了。当然你也可以使用其他方法达到同样的效果(比如通过 Props)。但是通过构造参数的方式在我们后续使用 Supervisors 和 Routers 时将会非常方便。很快我们就会发现子 Actor 也能做到,但是这在这从名字上听起来不太适合—— 使用 Student 来生成 Teacher 听起来确实不太好。

  1. DriverApp 发送 InitSignal 给 StudentActor

DriverApp 发送 InitSignal 给 StudentActor 后 StudentActor 就能发送一条 QuoteRequest 给 TeacherActor 了。

//send a message to the Student Actor
  studentRef ! InitSignal

这就是 DriverClass 的核心了,Thread.sleep 和 ActorSystem.shutdown 只是用来等待数秒以使得消息在最终关闭 ActorSystem 之前发送完毕。

DriverApp.scala 程序如下:

package me.yuanbin.akkanotes.requestresponse

import akka.actor.ActorSystem
import akka.actor.Props
import me.yuanbin.akkanotes.actormsg.StudentActor
import me.yuanbin.akkanotes.actormsg.TeacherActor
import me.yuanbin.akkanotes.protocols.StudentProtocol._

object DriverApp extends App {
  //Initialize the ActorSystem
  val system = ActorSystem("UniversityMessageSystem")

  //create the teacher actor
  val teacherRef = system.actorOf(Props[TeacherActor], "teacherActor")

  //create the Student Actor - pass the teacher actorref as a constructor parameter to StudentActor
  val studentRef = system.actorOf(Props(new StudentActor(teacherRef)), "studentActor")

  //send a message to the Student Actor
  studentRef ! InitSignal

  //Let's wait for a couple of seconds before we shut down the system
  Thread.sleep(2000)

  //Shut down the ActorSystem.
  system.shutdown()
}

StudentActor 响应 InitSignal 并发送 QuoteRequest 消息给 TeacherActor

StudentTeacher23

class StudentActor (teacherActorRef:ActorRef) extends Actor with ActorLogging {
  def receive = {
    /*
     * This InitSignal is received from the DriverApp. 
     * On receipt, the Student sends a message to the Teacher actor. 
     * The teacher actor on receipt of the QuoteRequest responds with a QuoteResponse 
     */
    case InitSignal=> {
      teacherActorRef ! QuoteRequest
    }
    ...

TeacherActor 回应一条 QuoteResponse

这部分代码和我们在 Akka Notes 2 - Actor Messaging 中的代码几乎一样。TeacherActor 在接收 QuoteRequest 消息后返回一条 QuoteResponse.

TeacherActor 的代码如下:

package me.yuanbin.akkanotes.actormsg

import akka.actor.{Actor, ActorLogging}
import me.yuanbin.akkanotes.protocols.StudentProtocol._
import me.yuanbin.akkanotes.protocols.TeacherProtocol._
import scala.util.Random

/*
 * The Philosophy Teacher
 *
 */
class TeacherActor extends Actor with ActorLogging {
  val quotes = List(
    "Moderation is for cowards",
    "Anything worth doing is worth overdoing",
    "The trouble is you think you have time",
    "You never gonna know if you never even try")

  def receive = {
    case QuoteRequest => {
      //Get a random Quote from the list and construct a response
      val quoteResponse = QuoteResponse(quotes(Random.nextInt(quotes.size)))

      log.info ("QuoteRequest received. Sending response to Student")
      //respond back to the Student who is the original sender of QuoteRequest
      sender ! quoteResponse
    }
  }
}

StudentActor 接收来自 TeacherActor 的 QuoteResponse 并记录到 console/logger

StudentLog

StudentActor 的完整代码如下:

package me.yuanbin.akkanotes.actormsg

import akka.actor.{Actor, ActorLogging, ActorRef}
import me.yuanbin.akkanotes.protocols.StudentProtocol._
import me.yuanbin.akkanotes.protocols.TeacherProtocol._

/*
 * The Student Actor class. 
 * 
 */
class StudentActor (teacherActorRef:ActorRef) extends Actor with ActorLogging {
  def receive = {
    /*
     * This InitSignal is received from the DriverApp. 
     * On receipt, the Student sends a message to the Teacher actor. 
     * The teacher actor on receipt of the QuoteRequest responds with a QuoteResponse 
     */
    case InitSignal=> {
      teacherActorRef ! QuoteRequest
    }

    /*
     * The Student simply logs the quote received from the TeacherActor
     * 
     */
    case QuoteResponse(quoteString) => {
      log.info("Received QuoteResponse from Teacher")
      log.info(s"Printing from Student Actor $quoteString")
    }
  }
}

测试

Now, our testcase would simulate the DriverApp. Since, the StudentActor just logs the message and we won't be able to assert on the QuoteResponse itself, we'll just assert the presence of the log message in the EventStream (just like we talked last time)

我们的测试应该去驱动 DriverApp, 因为 StudentActor 仅仅只是记录了消息并且我们并不能对 QuoteResponse 本身进行断言,我们仅对 EventStream 中日志消息出现的地方进行断言(正如我们在 Akka Notes 3 - Logging and Testing Actors) 中所提到的。

源码

源码可以从 akka_notes/akka_messaging_request_response 找到。

运行方式,确保系统装好 sbt 0.13.8.

cd akka_notes/akka_messaging_request_response
sbt run
sbt test