首页 > 代码库 > 初见akka-01

初见akka-01

  最近在学习akka,在看rpc相关的东西,有点脑子疼,哈哈

  1.需求:

    目前大多数分布式架构底层通信是通过RPC实现的,RPC框架非常多,

    比如我们学过的Hadoop项目的RPC通信框架,但是Hadoop在设计之初就

    是为了运行长达数小时的批量而设计的,在某些极端的情况下,

    任务提交的延迟很高,所有Hadoop的RPC显得有些笨重。

  2.特点

    Spark 的RPC是通过Akka类库实现的,Akka用Scala语言开发,

    基于Actor并发模型实现,

    Akka具有高可靠、高性能、可扩展等特点,使用Akka可以轻松

    实现分布式RPC功能

  3.Akka简介

    Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、

    弹性的(Resilient)、快速响应的(Responsive)应用程序的平台

  4.一句话描述RPC:

    不同进程之间的通信调用叫做RPC,只要有网络通信即可

  5.进程和线程之间的关系

    一个进程包含多个线程,因为启动一个进程就相当于启动了

    一个jvm(虚拟机)

  6.重要的类的描述

    ActorSystem是这个进程中Actor的老大,负责和监控所有的actor,

    我们可以使用这个,ActorSystem创建很多个Actor,通常是

    一个单例对象,Actor负责通信

  7.关于一个简单的akka的小例子,自己给自己发送信息

package cn.xx.rpc

import akka.actor.{Actor, ActorSystem}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory
import akka.actor.Props

/**
  * Created by XX on 2016/12/23.
  */
class Master extends Actor {

  println("constructor invoked")

  //用于接收消息
  override def receive: Receive = {
    case "connect" => {
      println("a client connected")
    }
    case "hello" =>{
      println("hello")
    }
  }

  override def preStart(): Unit = {
     println("prestart invoked")
  }
}

object Master{
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    // 准备配置
    val configStr =
      s"""                                                //这个s要确定,只有这样才能加入变量
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
    val actorSystem = ActorSystem("MasterSystem",config )
    //创建Actor
    val master = actorSystem.actorOf(Props[Master],"Master")
    master ! "hello"
    actorSystem.awaitTermination()
  }
}

  8.简单的不同通信之间的RPC的通行

  Master.scala

package cn.wj.rpc

import akka.actor.{Actor, ActorSystem}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory
import akka.actor.Props

/**
  * Created by WJ on 2016/12/23.
  */
class Master extends Actor {

  println("constructor invoked")

  //用于接收消息
  override def receive: Receive = {
    case "connect" => {
      println("a client connected")
      sender ! "reply"  //往发送给他消息的人回复一个消息
    }
    case "hello" =>{
      println("hello")
    }
  }

  override def preStart(): Unit = {
     println("prestart invoked")
  }
}

object Master{
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    // 准备配置
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
    val actorSystem = ActorSystem("MasterSystem",config )
    //创建Actor
    val master = actorSystem.actorOf(Props[Master],"Master")
    master ! "hello"
    actorSystem.awaitTermination()
  }
}

  Worker.scala

package cn.wj.rpc

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory

/**
  * Created by WJ on 2016/12/23.
  */
class Worker(val mastHost:String,val mastPort:Int) extends Actor {

  var master : ActorSelection = _

  //preStart执行方法的时机:构造器之后,receive之前
  //与Master(Actor)建立连接
  override def preStart(): Unit = {
    //master已经是别的Master的引用了
//    master = context.actorSelection(s"akka.tcp://MasterSystem@$mastHost:$mastPort/user/Master")
    master = context.actorSelection(s"akka.tcp://MasterSystem@192.168.109.1:8888/user/Master")
    //akka.tcp://MasterSystem@192.168.109.1:8888
    master ! "connect"
  }

  override def receive: Receive = {
    case "reply" => {
      println("a reply form master")
    }
  }
}

object Worker{
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    // 准备配置
    val configStr =
    s"""
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
    val actorSystem = ActorSystem("WorkerSystem",config )
    //创建Actor,此时调用该(Actor)的prestart以及receive方法
      actorSystem.actorOf(Props(new Worker(masterHost,masterPort)),"Worker")
     actorSystem.awaitTermination()
  }
}

  9.通信业务逻辑

    首先启动Master,然后启动所有的Worker
    1.Worker启动后,在preStart方法中与Master建立连接,

     向Master发送注册,将Worker的信息

     通过case class封装起来发送给Master

    2.Master接受Worker的注册消息后将Worker的消息保存起来
    3.Worker定期向Master发送心跳,为了报活

初见akka-01