首页 > 代码库 > 初见akka-02:rpc框架

初见akka-02:rpc框架

  1.RPC:简单点说,就是多线程之间的通信,我们今天用了scala以及akka

   来简单的实现了

   rpc框架的一些简单的内容,一脸包括了,心跳,间隔时间,

   注册以及一些问题,

   模式匹配的一些东西,虽然比较简单,但是属于麻雀虽小,五脏俱全

   这个里面一共有有四个文件:

   Master.scala

   RemoteMessage.scala

   Worker.scala

   WorkerInfo

  

   Master.scala

package cn.wj.rpc

import akka.actor.{Actor, ActorSystem}
import akka.actor.Actor.Receive
import com.typesafe.config.ConfigFactory
import akka.actor.Props
import scala.concurrent.duration._

import scala.collection.mutable

/**
  * Created by WJ on 2016/12/23.
  */
class Master(val host:String,val port:Int ) extends Actor {

  // workerId -> WorkerInfo
  val idToWorker = new mutable.HashMap[String,WorkerInfo]()
  val workers = new mutable.HashSet[WorkerInfo]()

  //时间间隔时间,超时检测的间隔
  val CHECK_INTERVAL = 15000
  //用于接收消息
  override def receive: Receive = {
    case RegisterWorker(id,memory,cores) => {
//      println("a client connected")
//      sender ! "reply"  //往发送给他消息的人回复一个消息
      //判断一下是不是已经注册过了
    if(!(idToWorker.contains(id))){
      //把Worker的信息封装以前,保存到内存当中
      val workerInfo = new WorkerInfo(id,memory,cores)
      idToWorker(id) = workerInfo  //这个应该是scala的特定版本
      workers += workerInfo
      sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master")
    }

    }
    case Heartbeat(id) =>{
      if(idToWorker.contains(id)) {
        val workerInfo = idToWorker(id)
        //报活
        //得到系统当前时间
        val currentTime = System.currentTimeMillis()
        workerInfo.lastHeartbeatTime = currentTime
      }
    }

    case CheckTimeOutWorker => {
      val currentTime = System.currentTimeMillis()
      val toRemove = workers.filter(x => currentTime - x.lastHeartbeatTime > CHECK_INTERVAL)
      for(w <- toRemove){
        workers -= w
        idToWorker -= w.id
      }
      println(workers.size)
    }
  }

  override def preStart(): Unit = {
     println("prestart invoked")
     //导入隐式转换的功能
     import context.dispatcher
     context.system.scheduler.schedule(0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker)
  }
}

object Master{
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    // 准备配置
    val configStr =
      s"""
         |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
         |akka.remote.netty.tcp.hostname = "$host"
         |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
    val actorSystem = ActorSystem("MasterSystem",config )
    //创建Actor
    val master = actorSystem.actorOf(Props(new Master(host,port)),"Master")
    actorSystem.awaitTermination()
  }
}

  Worker.scala

package cn.wj.rpc

import java.util.UUID

import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
/**
  * Created by WJ on 2016/12/23.
  */
class Worker(val masterHost:String,val masterPort:Int,val memory:Int,val cores:Int) extends Actor {

  var master : ActorSelection = _
  val workerId = UUID.randomUUID().toString
  val HEART_INTERVAL = 10000
  //preStart执行方法的时机:构造器之后,receive之前
  //与Master(Actor)建立连接
  override def preStart(): Unit = {
    //master已经是别的Master的引用了 ,这是跟master建立连接
    master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master")
    //向Master发送注册消息
    master ! RegisterWorker(workerId,memory,cores)
  }

  override def receive: Receive = {

    case RegisteredWorker(masterUrl) => {
      println(masterUrl)
      //启动定时器发送心跳
      import context.dispatcher
      context.system.scheduler.schedule(0 millis,HEART_INTERVAL millis,self,SendHeartbeat)
    }

    case SendHeartbeat =>{
      println("send heartbeat to master")
      master ! Heartbeat(workerId)
    }

  }
}

object Worker{
  def main(args: Array[String]): Unit = {
    val host = args(0)
    val port = args(1).toInt
    val masterHost = args(2)
    val masterPort = args(3).toInt
    val memory = args(4).toInt
    val cores = args(5).toInt

    // 准备配置
    val configStr =
    s"""
       |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
       |akka.remote.netty.tcp.hostname = "$host"
       |akka.remote.netty.tcp.port = "$port"
       """.stripMargin
    val config = ConfigFactory.parseString(configStr)
    //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
    val actorSystem = ActorSystem("WorkerSystem",config )
    //创建Actor,此时调用该(Actor)的prestart以及receive方法
      actorSystem.actorOf(Props(new Worker(masterHost,masterPort,memory,cores)),"Worker")
     actorSystem.awaitTermination()
  }
}

  RemoteMessage.scala

package cn.wj.rpc

/**
  * Created by WJ on 2016/12/25.
  */
trait RemoteMessage extends Serializable

//Worker->Master(这个表明当master接受这个worker时的信息,是receive)
 case class RegisterWorker (id:String, memory:Int, cores:Int) extends RemoteMessage

//Master -> Worker(这个是master收到workerd的注册信息,表明已经注册过这条信息,是sender ! xxx时候出现的)
case class RegisteredWorker(masterUrl:String) extends RemoteMessage

//这是进程之间自己给自己发送消息,所以采用case object,并且不需要实现Serializable
//Worker -> Worker(self)
case object SendHeartbeat

//这个是work向master发送定时器,其中的id是work的id,因为要向master说明,是哪一个work给他发送的心跳
//Worker -> Master
case class Heartbeat(id:String)  extends RemoteMessage

//Master -> self
case object CheckTimeOutWorker

  WorkerInfo.scala

package cn.wj.rpc

/**
  * Created by WJ on 2016/12/25.
  */
class WorkerInfo(val id:String ,val memory :Int,val cores:Int) {
    //TODO 上一次心跳
  var lastHeartbeatTime:Long = _
}

  这个上面的四个就是简单的实现了RPC框架,其实就是一个Master监控多个Worker,

  当一个Worker创建了,他就是需要在Master注册信息,其实这个Master个人感觉就像

  是个Zookeeper,掌管Worker的信息,为其Worker分配一些资源,当Master接到Worker

  的注册信息的时候,他就在自己的注册表添加上这个Worker,然后向Worker发送一个注册

  成功的信息,此时这个Worker的收到这个注册信息,然后他就给Master发送心跳,这个的

  作用是在告诉Master,我这个Worker是存活的(报活),当一个Worke发送心跳的时间间隔

  过长,长过我们规定的时间,那么此时我们就需要主动杀死这个Worker,感觉hadoop的一些

  分布式和这个原理差不多。

  下面奉上原理图一张:

  技术分享

  其中的receive是用于接受信息,因为继承Actor,

  prestart这个方法是执行实在类实例之后,receive的方法之后

  

 

初见akka-02:rpc框架