首页 > 代码库 > 初见akka-02:rpc框架
初见akka-02:rpc框架
1.RPC:简单点说,就是多线程之间的通信,我们今天用了scala以及akka
来简单的实现了
rpc框架的一些简单的内容,一脸包括了,心跳,间隔时间,
注册以及一些问题,
模式匹配的一些东西,虽然比较简单,但是属于麻雀虽小,五脏俱全
这个里面一共有有四个文件:
Master.scala
RemoteMessage.scala
Worker.scala
WorkerInfo
Master.scala
package cn.wj.rpc import akka.actor.{Actor, ActorSystem} import akka.actor.Actor.Receive import com.typesafe.config.ConfigFactory import akka.actor.Props import scala.concurrent.duration._ import scala.collection.mutable /** * Created by WJ on 2016/12/23. */ class Master(val host:String,val port:Int ) extends Actor { // workerId -> WorkerInfo val idToWorker = new mutable.HashMap[String,WorkerInfo]() val workers = new mutable.HashSet[WorkerInfo]() //时间间隔时间,超时检测的间隔 val CHECK_INTERVAL = 15000 //用于接收消息 override def receive: Receive = { case RegisterWorker(id,memory,cores) => { // println("a client connected") // sender ! "reply" //往发送给他消息的人回复一个消息 //判断一下是不是已经注册过了 if(!(idToWorker.contains(id))){ //把Worker的信息封装以前,保存到内存当中 val workerInfo = new WorkerInfo(id,memory,cores) idToWorker(id) = workerInfo //这个应该是scala的特定版本 workers += workerInfo sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master") } } case Heartbeat(id) =>{ if(idToWorker.contains(id)) { val workerInfo = idToWorker(id) //报活 //得到系统当前时间 val currentTime = System.currentTimeMillis() workerInfo.lastHeartbeatTime = currentTime } } case CheckTimeOutWorker => { val currentTime = System.currentTimeMillis() val toRemove = workers.filter(x => currentTime - x.lastHeartbeatTime > CHECK_INTERVAL) for(w <- toRemove){ workers -= w idToWorker -= w.id } println(workers.size) } } override def preStart(): Unit = { println("prestart invoked") //导入隐式转换的功能 import context.dispatcher context.system.scheduler.schedule(0 millis,CHECK_INTERVAL millis,self,CheckTimeOutWorker) } } object Master{ def main(args: Array[String]): Unit = { val host = args(0) val port = args(1).toInt // 准备配置 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的 val actorSystem = ActorSystem("MasterSystem",config ) //创建Actor val master = actorSystem.actorOf(Props(new Master(host,port)),"Master") actorSystem.awaitTermination() } }
Worker.scala
package cn.wj.rpc import java.util.UUID import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ /** * Created by WJ on 2016/12/23. */ class Worker(val masterHost:String,val masterPort:Int,val memory:Int,val cores:Int) extends Actor { var master : ActorSelection = _ val workerId = UUID.randomUUID().toString val HEART_INTERVAL = 10000 //preStart执行方法的时机:构造器之后,receive之前 //与Master(Actor)建立连接 override def preStart(): Unit = { //master已经是别的Master的引用了 ,这是跟master建立连接 master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master") //向Master发送注册消息 master ! RegisterWorker(workerId,memory,cores) } override def receive: Receive = { case RegisteredWorker(masterUrl) => { println(masterUrl) //启动定时器发送心跳 import context.dispatcher context.system.scheduler.schedule(0 millis,HEART_INTERVAL millis,self,SendHeartbeat) } case SendHeartbeat =>{ println("send heartbeat to master") master ! Heartbeat(workerId) } } } object Worker{ def main(args: Array[String]): Unit = { val host = args(0) val port = args(1).toInt val masterHost = args(2) val masterPort = args(3).toInt val memory = args(4).toInt val cores = args(5).toInt // 准备配置 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的 val actorSystem = ActorSystem("WorkerSystem",config ) //创建Actor,此时调用该(Actor)的prestart以及receive方法 actorSystem.actorOf(Props(new Worker(masterHost,masterPort,memory,cores)),"Worker") actorSystem.awaitTermination() } }
RemoteMessage.scala
package cn.wj.rpc /** * Created by WJ on 2016/12/25. */ trait RemoteMessage extends Serializable //Worker->Master(这个表明当master接受这个worker时的信息,是receive) case class RegisterWorker (id:String, memory:Int, cores:Int) extends RemoteMessage //Master -> Worker(这个是master收到workerd的注册信息,表明已经注册过这条信息,是sender ! xxx时候出现的) case class RegisteredWorker(masterUrl:String) extends RemoteMessage //这是进程之间自己给自己发送消息,所以采用case object,并且不需要实现Serializable //Worker -> Worker(self) case object SendHeartbeat //这个是work向master发送定时器,其中的id是work的id,因为要向master说明,是哪一个work给他发送的心跳 //Worker -> Master case class Heartbeat(id:String) extends RemoteMessage //Master -> self case object CheckTimeOutWorker
WorkerInfo.scala
package cn.wj.rpc /** * Created by WJ on 2016/12/25. */ class WorkerInfo(val id:String ,val memory :Int,val cores:Int) { //TODO 上一次心跳 var lastHeartbeatTime:Long = _ }
这个上面的四个就是简单的实现了RPC框架,其实就是一个Master监控多个Worker,
当一个Worker创建了,他就是需要在Master注册信息,其实这个Master个人感觉就像
是个Zookeeper,掌管Worker的信息,为其Worker分配一些资源,当Master接到Worker
的注册信息的时候,他就在自己的注册表添加上这个Worker,然后向Worker发送一个注册
成功的信息,此时这个Worker的收到这个注册信息,然后他就给Master发送心跳,这个的
作用是在告诉Master,我这个Worker是存活的(报活),当一个Worke发送心跳的时间间隔
过长,长过我们规定的时间,那么此时我们就需要主动杀死这个Worker,感觉hadoop的一些
分布式和这个原理差不多。
下面奉上原理图一张:
其中的receive是用于接受信息,因为继承Actor,
prestart这个方法是执行实在类实例之后,receive的方法之后
初见akka-02:rpc框架