首页 > 代码库 > 使用AKKA做分布式爬虫的思路

使用AKKA做分布式爬虫的思路

上周公司其他小组在讨论做分布式爬虫,我也思考了一下,提了一个方案,就是使用akka分布式rpc框架来做,自己写master和worker程序,client向master提交begin任务或者其它爬虫需求,master让worker去爬网页,worker都是kafka的同一个group然后从kafka里面拉取数据(URL),然后处理爬了的网页,解析内容,把爬下来的网页通过正则表达式匹配出嵌套的网页,然后请求actor判断是否爬过(防止生成有向图,让其变成树形结构)(这里应该是个单独的actor,这样多个请求过来不会出现线程同步问题),最后把没有爬的URL扔到Kafka,直到kafka的URL被拉去完

这里有个简单的图例:

技术分享

代码上面没有写爬虫的东西,也没有写checkActor,只是简单的做了下模拟,就写了个简单的分布式事例作为参考

代码结构如下:

技术分享

其中POM同这篇博客一样:http://blog.csdn.net/qq_20641565/article/details/65488828

Master的代码:

package com.lijie.scala.service

import scala.collection.mutable.HashMap
import scala.concurrent.duration.DurationInt

import com.lijie.scala.bean.WorkBean
import com.lijie.scala.utils.ActorUtils

import akka.actor.Actor
import akka.actor.Props
import akka.actor.actorRef2Scala
import com.lijie.scala.bean.WorkBeanInfo
import com.lijie.scala.bean.WorkBeanInfo
import com.lijie.scala.caseclass.Submit
import com.lijie.scala.caseclass.SubmitAble
import com.lijie.scala.caseclass.Hearbeat
import com.lijie.scala.caseclass.RegisterSucess
import com.lijie.scala.caseclass.CheckConn
import com.lijie.scala.caseclass.Register
import com.lijie.scala.caseclass.SubmitCrawler
import com.lijie.scala.caseclass.BeginCrawler

class Master(val masterHost: String, val masterPort: Int, val masterActorSystem: String, val masterName: String) extends Actor {

  //保存work的Actor连接
  var workerConn = new HashMap[String, WorkBean]

  //保存客户端的连接
  var clientConn = new HashMap[String, WorkBean]

  //超时时间
  val OVERTIME = 20000

  override def preStart(): Unit = {

    //隐式转换
    import context.dispatcher

    //启动的时候定时检查worker是否挂了,如果挂了就从workerConn移除
    context.system.scheduler.schedule(0 millis, OVERTIME millis, self, CheckConn)
  }

  def receive: Actor.Receive = {

    //注册
    case Register(workerId, workerHost, workerPort, workerActorSystem, workerName) => {

      //打印worker上线消息
      println(workerId + "," + workerHost + "," + workerPort + "," + workerActorSystem + "," + workerName)

      //获取Master的代理对象
      var workerRef = context.actorSelection(s"akka.tcp://$workerActorSystem@$workerHost:$workerPort/user/$workerName")

      //保存连接
      workerConn += (workerId -> new WorkBean(workerRef, 0))

      //给worker返回应答注册成功
      sender ! RegisterSucess

    }

    //接受心跳
    case Hearbeat(workerId) => {
      if (workerConn.contains(workerId)) {

        //取出workerBean
        var workBean = workerConn.get(workerId).get

        //重新设置时间
        workBean.time = System.currentTimeMillis()

        //移除之前的值
        workerConn -= workerId

        //将新值放入conn
        workerConn += (workerId -> workBean)
      }
    }

    //定时检查
    case CheckConn => {

      //得到超时的值
      var over = workerConn.filter(System.currentTimeMillis() - _._2.time > OVERTIME)

      //得到超时的值
      for (key <- over.keySet) {

        //将超时的从链接中移除
        workerConn -= key
      }

      //测试输出还有多少个链接
      val alive = workerConn.size
      println(s"还有$alive 个worker活着")
    }

    case Submit(clientId, clientHost, clientPort, clientActorSystem, clientName) => {
      //打印client上线消息
      println(clientId + "," + clientHost + "," + clientPort + "," + clientActorSystem + "," + clientName)

      //获取Master的代理对象
      var clientRef = context.actorSelection(s"akka.tcp://$clientActorSystem@$clientHost:$clientPort/user/$clientName")

      //保存连接
      clientConn += (clientId -> new WorkBean(clientRef, 0))

      //给client返回可以提交申请
      sender ! SubmitAble
    }

    //收到爬虫任务分发给worker
    case SubmitCrawler(kafka, redis, other) => {

      //让所有worker开始爬虫任务
      for (workerBean <- workerConn.values) {

        //向所有存活的worker发送爬虫任务
        workerBean.worker ! BeginCrawler(kafka, redis, other)
      }
    }
  }

}

object Master {

  def main(args: Array[String]): Unit = {
    val argss = Array[String]("127.0.0.1", "8080", "masterSystem", "actorMaster")

    val host = argss(0)

    val port = argss(1).toInt

    val actorSystem = argss(2)

    val actorName = argss(3)

    //获取master的actorSystem
    val masterSystem = ActorUtils.getActorSystem(host, port, actorSystem)

    val master = masterSystem.actorOf(Props(new Master(host, port, actorSystem, actorName)), actorName)

    masterSystem.awaitTermination()
  }
}

Worker代码如下:

package com.lijie.scala.service

import akka.actor.Actor
import akka.actor.ActorSelection
import java.util.UUID
import scala.concurrent.duration._
import com.lijie.scala.caseclass.SendHearbeat
import com.lijie.scala.utils.ActorUtils
import akka.actor.Props
import com.lijie.scala.caseclass.BeginCrawler
import com.lijie.scala.caseclass.Hearbeat
import com.lijie.scala.caseclass.RegisterSucess
import com.lijie.scala.caseclass.Register

class Worker(val workerHost: String, val workerPort: Int, val workerActorSystem: String, val workerName: String, val masterHost: String, val masterPort: Int, val masterActorSystem: String, val masterName: String) extends Actor {

  //master的代理对象
  var master: ActorSelection = _

  //每个worker的id
  val workerId = UUID.randomUUID().toString()

  override def preStart(): Unit = {

    //获取Master的代理对象
    master = context.actorSelection(s"akka.tcp://$masterActorSystem@$masterHost:$masterPort/user/$masterName")

    //向master注册
    master ! Register(workerId, workerHost, workerPort, workerActorSystem, workerName)
  }

  def receive: Actor.Receive = {

    //收到注册成功的消息,定时发送心跳
    case RegisterSucess => {
      println("收到注册成功的消息,开始发送心跳")

      //隐式转换
      import context.dispatcher

      //创建定时器,并发送心跳
      context.system.scheduler.schedule(0 millis, 10000 millis, self, SendHearbeat)

    }

    //发送心跳
    case SendHearbeat => {
      println("向master发送心跳")

      //发送心跳
      master ! Hearbeat(workerId)
    }

    //开始爬虫
    case BeginCrawler(kafka, redis, other) => {

      println("开始执行爬虫任务...")
      println("kafka和redis以及其他消息内容:" + kafka + "," + redis + "," + other)
      println("初始化kafka连接和redis连接...")
      println("从队列里面取出url...")
      println("开始爬数据...")
      println("如果失败重试几次...")
      println("............")
      println("解析这个网页的内容,解析出里面的url...")
      //请求actionCheck
      println("请求actionCheck...")
      println("检查是否爬过...")
      println("把该刚爬了的url扔到redis")
      println("把该网页解析的没有爬过的url扔到队列...")
      println("继续从队列里面拿url直到队列里面url被爬完...")
    }

  }
}

object Worker {

  def main(args: Array[String]): Unit = {
    val argss = Array[String]("127.0.0.1", "8088", "workSystem", "actorWorker", "127.0.0.1", "8080", "masterSystem", "actorMaster")

    //worker
    val host = argss(0)

    val port = argss(1).toInt

    val actorSystem = argss(2)

    val actorName = argss(3)

    //master
    val hostM = argss(4)

    val portM = argss(5).toInt

    val actorSystemM = argss(6)

    val actorNameM = argss(7)

    //获取woker的actorSystem
    val workerSystem = ActorUtils.getActorSystem(host, port, actorSystem)

    val worker = workerSystem.actorOf(Props(new Worker(host, port, actorSystem, actorName, hostM, portM, actorSystemM, actorNameM)), actorName)

    workerSystem.awaitTermination()
  }
}

ActionUtils代码如下:

package com.lijie.scala.utils

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Actor

object ActorUtils {

  //获取actor工具类
  def getActorSystem(host: String, port: Int, actorSystem: String) = {
    val conf = s"""
      |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
      |akka.remote.netty.tcp.hostname = "$host"
      |akka.remote.netty.tcp.port = "$port"
      """.stripMargin

    val config = ConfigFactory.parseString(conf)

    //创建注册worker的的ActorSystem
    val actorSys = ActorSystem(actorSystem, config)

    //返回actorSystem
    actorSys
  }
}

WorkBean代码如下:

package com.lijie.scala.bean

import akka.actor.ActorSelection

//封装worker的引用和当前时间戳
class WorkBean(var worker: ActorSelection, var time: Long)

class WorkBeanInfo(val workerId: String, val workerHost: String, val workerPort: Int, val workerActorSystem: String, val workerName: String, var time: Long)

caseClass代码如下:

package com.lijie.scala.caseclass

//开始提交任务 client2client
case object BeginSubmit
// client2client-------------------------------

//client提交任务  client2master
case class Submit(val clientId: String, val clientHost: String, val clientPort: Int, val clientActorSystem: String, val clientName: String) extends Serializable

//提交爬虫任务
case class SubmitCrawler(val kafkaInfo: String, val redisInfo: String, val otherInfo: String)
// client2master-------------------------------

//可以提交任务 master2client
case object SubmitAble
// master2client-------------------------------

//检查哪些worker挂了 master2master
case object CheckConn

//返回注册成功 master2worker
case object RegisterSucess extends Serializable
// master2worker-------------------------------

//worker注册 worker2master
case class Register(val workerId: String, val workerHost: String, val workerPort: Int, val workerActorSystem: String, val workerName: String) extends Serializable

//发送心跳 worker2master
case class Hearbeat(workId: String) extends Serializable
// worker2master-------------------------------

//发送心跳 worker2worker
case object SendHearbeat

//爬虫 worker2worker
case class BeginCrawler(val kafkaInfo: String, val redisInfo: String, val otherInfo: String)
// worker2worker-------------------------------

最后先运行master,然后运行worker,我这里运行的两个worker,最后运行client 结果如下

Master:

技术分享

Worker01:

技术分享

Worker02:

技术分享

Client:

技术分享

<script type="text/javascript"> $(function () { $(‘pre.prettyprint code‘).each(function () { var lines = $(this).text().split(‘\n‘).length; var $numbering = $(‘
    ‘).addClass(‘pre-numbering‘).hide(); $(this).addClass(‘has-numbering‘).parent().append($numbering); for (i = 1; i <= lines; i++) { $numbering.append($(‘
  • ‘).text(i)); }; $numbering.fadeIn(1700); }); }); </script>

    使用AKKA做分布式爬虫的思路