import akka.actor._
import akka.routing._
import scala.annotation.tailrec

/** Companion of the `FibonacciRoutee` actor: its message protocol and `Props` factory. */
object FibonacciRoutee {
  /** Request to compute the Fibonacci number at index `nbr`. */
  case class FibonacciNumber(nbr: Int)

  /** `Props` used to create routee instances. */
  def props: Props = Props[FibonacciRoutee]
}
/**
 * Routee that answers every `FibonacciNumber` request by computing the
 * Fibonacci number and logging it together with its own actor name.
 */
class FibonacciRoutee extends Actor with ActorLogging {
  import FibonacciRoutee._

  override def receive: Receive = {
    case FibonacciNumber(nbr) =>
      val answer = fibonacci(nbr)
      log.info(s"${self.path.name}‘s answer: Fibonacci($nbr)=$answer")
  }

  /**
   * Computes the n-th Fibonacci number (Fibonacci(0) = 0, Fibonacci(1) = 1).
   *
   * The result is a `BigInt` because Fibonacci(47) already exceeds
   * `Int.MaxValue` and would silently wrap around in `Int` arithmetic.
   *
   * @param n index of the requested number; the only base case is 0, so `n`
   *          is expected to be non-negative
   */
  private def fibonacci(n: Int): BigInt = {
    // `a` is the current number and `b` the next one; each step shifts the pair.
    @tailrec
    def fib(n: Int, b: BigInt, a: BigInt): BigInt = n match {
      case 0 => a
      case _ => fib(n - 1, a + b, b)
    }
    fib(n, BigInt(1), BigInt(0))
  }
}
/**
 * Demo entry point: creates a router whose settings are read from the
 * configuration and sends it four Fibonacci requests.
 */
object RouterDemo extends App {
  import FibonacciRoutee._
  val routingSystem = ActorSystem("routingSystem")
  // The actor name has to match the deployment path `/balance-pool-router`
  // in the configuration so the router settings are picked up from there.
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props),
    "balance-pool-router"
  )

  router ! FibonacciNumber(10)
  router ! FibonacciNumber(13)
  router ! FibonacciNumber(15)
  router ! FibonacciNumber(17)
}





akka {
  prio-dispatcher {
    mailbox-type = "PriorityMailbox"
  }
  actor {
    deployment {
      # Settings for the router created under the actor name "balance-pool-router"
      /balance-pool-router {
        router = balancing-pool
        nr-of-instances = 3
        # Dispatcher settings for the routees of this pool
        pool-dispatcher {
          executor = "fork-join-executor"
          # Configuration for the fork join pool
          fork-join-executor {
            # Min number of threads to cap factor-based parallelism number to
            parallelism-min = 3
            # Parallelism (threads) ... ceil(available processors * factor)
            parallelism-factor = 2.0
            # Max number of threads to cap factor-based parallelism number to
            parallelism-max = 3
          }
          # Throughput defines the maximum number of messages to be
          # processed per actor before the thread jumps to the next actor.
          # Set to 1 for as fair as possible.
          throughput = 1
        }
      }
    }
  }
}



[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-5] [akka://routingSystem/user/balance-pool-router/$b] $b‘s answer: Fibonacci(13)=233
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-7] [akka://routingSystem/user/balance-pool-router/$a] $a‘s answer: Fibonacci(10)=55
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-6] [akka://routingSystem/user/balance-pool-router/$c] $c‘s answer: Fibonacci(15)=610
[INFO] [05/30/2017 20:13:52.323] [routingSystem-BalancingPool-/balance-pool-router-6] [akka://routingSystem/user/balance-pool-router/$c] $c‘s answer: Fibonacci(17)=1597



/** Companion of the `FibonacciRoutee` actor: message protocol, failure type and `Props` factory. */
object FibonacciRoutee {
  /**
   * Request to compute the Fibonacci number at index `nbr` after a delay.
   * Despite its name, the actor passes `msDelay` to the scheduler as a number of seconds.
   */
  case class FibonacciNumber(nbr: Int, msDelay: Int)  // added delay parameter

  /** Internal message the routee schedules to itself once the delay has elapsed. */
  case class GetAnswer(nbr: Int)

  /** Failure thrown at random by the routee to exercise supervision. */
  class RouteeException extends Exception

  /** `Props` used to create routee instances. */
  def props: Props = Props[FibonacciRoutee]
}
/**
 * Routee that answers a `FibonacciNumber` request after the requested delay.
 * When the delayed `GetAnswer` arrives it either fails with a
 * `RouteeException` (chosen at random) or logs the computed number.
 */
class FibonacciRoutee extends Actor with ActorLogging {
  import FibonacciRoutee._
  import context.dispatcher

  override def receive: Receive = {
    case FibonacciNumber(nbr, ms) =>
      // Defer the computation: GetAnswer is delivered to this actor after `ms` seconds.
      context.system.scheduler.scheduleOnce(ms.seconds, self, GetAnswer(nbr))
    case GetAnswer(nbr) =>
      if (Random.nextBoolean())
        throw new RouteeException
      else {
        val answer = fibonacci(nbr)
        log.info(s"${self.path.name}‘s answer: Fibonacci($nbr)=$answer")
      }
  }

  /**
   * Computes the n-th Fibonacci number (Fibonacci(0) = 0, Fibonacci(1) = 1).
   *
   * The result is a `BigInt` because Fibonacci(47) already exceeds
   * `Int.MaxValue` and would silently wrap around in `Int` arithmetic.
   *
   * @param n index of the requested number; the only base case is 0, so `n`
   *          is expected to be non-negative
   */
  private def fibonacci(n: Int): BigInt = {
    // `a` is the current number and `b` the next one; each step shifts the pair.
    @tailrec
    def fib(n: Int, b: BigInt, a: BigInt): BigInt = n match {
      case 0 => a
      case _ => fib(n - 1, a + b, b)
    }
    fib(n, BigInt(1), BigInt(0))
  }

  /** Logs the restart and re-sends the message being processed so it is tried again. */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
    message foreach {m => self ! m}
    super.preRestart(reason, message)
  }

  /** Logs that the restart has completed. */
  override def postRestart(reason: Throwable): Unit = {
    log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
  }

  /** Logs that this routee has stopped. */
  override def postStop(): Unit = {
    log.info(s"Stopped ${self.path.name}!")
  }
}



/**
 * Demo entry point: builds a `BalancingPool` router in code so that a custom
 * supervisor strategy can be attached, then sends it four delayed requests.
 */
object RouterDemo extends App {
  import FibonacciRoutee._
  import scala.concurrent.ExecutionContext.Implicits.global
  val routingSystem = ActorSystem("routingSystem")
  /* cannot set SupervisorStrategy in config file
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props),
    "balance-pool-router"
  ) */
  // Restart a routee that failed with RouteeException.
  val routingDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: RouteeException => SupervisorStrategy.Restart
  }
  // NOTE(review): the decider argument was missing from the listing; falling back
  // to the default decider for other failures is a reconstruction - confirm.
  val routerSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(
    routingDecider.orElse(SupervisorStrategy.defaultDecider)
  )
  val router = routingSystem.actorOf(
    BalancingPool(nrOfInstances = 3
      ,supervisorStrategy=routerSupervisorStrategy    //set SupervisorStrategy here
    ).props(FibonacciRoutee.props)
  )

  // Arguments are (Fibonacci index, delay in seconds).
  router ! FibonacciNumber(10,5)
  router ! FibonacciNumber(13,2)
  router ! FibonacciNumber(15,3)
  router ! FibonacciNumber(17,1)
}





Akka中有些routing模式支持Router-Pool中Routee数量的自动增减。由于BalancingPool不支持此项功能,下面我们就用RoundRobinPool来做个示范。由于监管策略只能在代码中定义,Router必须在代码中构建,Resizer也就只能一并在代码中设置了:

 // Resizer that keeps the pool between 2 and 5 routees.
 val resizer = DefaultResizer(
    lowerBound = 2, upperBound = 5, pressureThreshold = 1
    ,rampupRate = 1, backoffRate = 0.25
    ,backoffThreshold = 0.25, messagesPerResize = 1
  )
  // Round-robin pool starting with 2 routees; resizer and supervisor strategy are set in code.
  val router = routingSystem.actorOf(
    RoundRobinPool(nrOfInstances = 2
    ,resizer = Some(resizer)
    ,supervisorStrategy = routerSupervisorStrategy)
      .props(FibonacciRoutee.props)
  )

以上resizer的设置为:Routee最少2个,最多可以自动增加到5个。运行后routingSystem自动增加了两个Routee:c、d。



import akka.actor._
import akka.routing._
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.Random

/** Companion of the `FibonacciRoutee` actor: message protocol, failure type and `Props` factory. */
object FibonacciRoutee {
  /**
   * Request to compute the Fibonacci number at index `nbr` after a delay.
   * Despite its name, the actor passes `msDelay` to the scheduler as a number of seconds.
   */
  case class FibonacciNumber(nbr: Int, msDelay: Int)  // added delay parameter

  /** Internal message the routee schedules to itself once the delay has elapsed. */
  case class GetAnswer(nbr: Int)

  /** Failure thrown at random by the routee to exercise supervision. */
  class RouteeException extends Exception

  /** `Props` used to create routee instances. */
  def props: Props = Props[FibonacciRoutee]
}
/**
 * Routee that answers a `FibonacciNumber` request after the requested delay.
 * When the delayed `GetAnswer` arrives it either fails with a
 * `RouteeException` (chosen at random) or logs the computed number.
 */
class FibonacciRoutee extends Actor with ActorLogging {
  import FibonacciRoutee._
  import context.dispatcher

  override def receive: Receive = {
    case FibonacciNumber(nbr, ms) =>
      // Defer the computation: GetAnswer is delivered to this actor after `ms` seconds.
      context.system.scheduler.scheduleOnce(ms.seconds, self, GetAnswer(nbr))
    case GetAnswer(nbr) =>
      if (Random.nextBoolean())
        throw new RouteeException
      else {
        val answer = fibonacci(nbr)
        log.info(s"${self.path.name}‘s answer: Fibonacci($nbr)=$answer")
      }
  }

  /**
   * Computes the n-th Fibonacci number (Fibonacci(0) = 0, Fibonacci(1) = 1).
   *
   * The result is a `BigInt` because Fibonacci(47) already exceeds
   * `Int.MaxValue` and would silently wrap around in `Int` arithmetic.
   *
   * @param n index of the requested number; the only base case is 0, so `n`
   *          is expected to be non-negative
   */
  private def fibonacci(n: Int): BigInt = {
    // `a` is the current number and `b` the next one; each step shifts the pair.
    @tailrec
    def fib(n: Int, b: BigInt, a: BigInt): BigInt = n match {
      case 0 => a
      case _ => fib(n - 1, a + b, b)
    }
    fib(n, BigInt(1), BigInt(0))
  }

  /** Logs the restart and re-sends the message being processed so it is tried again. */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info(s"Restarting ${self.path.name} on ${reason.getMessage}")
    message foreach {m => self ! m}
    super.preRestart(reason, message)
  }

  /** Logs that the restart has completed. */
  override def postRestart(reason: Throwable): Unit = {
    log.info(s"Restarted ${self.path.name} on ${reason.getMessage}")
  }

  /** Logs that this routee has stopped. */
  override def postStop(): Unit = {
    log.info(s"Stopped ${self.path.name}!")
  }
}

/**
 * Demo entry point: builds a resizable `RoundRobinPool` router in code with a
 * custom supervisor strategy and sends it seven delayed Fibonacci requests.
 */
object RouterDemo extends App {
  import FibonacciRoutee._
  import scala.concurrent.ExecutionContext.Implicits.global
  val routingSystem = ActorSystem("routingSystem")
  /* cannot set SupervisorStrategy in config file
  val router = routingSystem.actorOf(
    FromConfig.props(FibonacciRoutee.props),
    "balance-pool-router"
  ) */
  // Restart a routee that failed with RouteeException.
  val routingDecider: PartialFunction[Throwable,SupervisorStrategy.Directive] = {
    case _: RouteeException => SupervisorStrategy.Restart
  }
  // NOTE(review): the decider argument was missing from the listing; falling back
  // to the default decider for other failures is a reconstruction - confirm.
  val routerSupervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 5.seconds)(
    routingDecider.orElse(SupervisorStrategy.defaultDecider)
  )
  /* does not support resizing routees
  val router = routingSystem.actorOf(
    BalancingPool(nrOfInstances = 3
      ,supervisorStrategy=routerSupervisorStrategy    //set SupervisorStrategy here
    ).props(FibonacciRoutee.props)
  ) */

  // Resizer that keeps the pool between 2 and 5 routees.
  val resizer = DefaultResizer(
    lowerBound = 2, upperBound = 5, pressureThreshold = 1
    ,rampupRate = 1, backoffRate = 0.25
    ,backoffThreshold = 0.25, messagesPerResize = 1
  )
  val router = routingSystem.actorOf(
    RoundRobinPool(nrOfInstances = 2
    ,resizer = Some(resizer)
    ,supervisorStrategy = routerSupervisorStrategy)
      .props(FibonacciRoutee.props)
  )

  // Arguments are (Fibonacci index, delay in seconds).
  router ! FibonacciNumber(10,5)
  router ! FibonacciNumber(13,2)
  router ! FibonacciNumber(15,3)
  router ! FibonacciNumber(17,1)
  router ! FibonacciNumber(27,1)
  router ! FibonacciNumber(37,1)
  router ! FibonacciNumber(47,1)
}
















