首页 > 代码库 > scala Actor -03

scala Actor -03

  1.对于上一篇讲解的scala的一些补充

    val files = Array[String]("a.txt","b.txt","c.txt")

    for(f <- files){xxxx}

  目标一:熟悉Scala Actor并发编程

  目标二:为学习Akka做准备

    注:我们现在学的Scala Actor是scala 2.10.x版本及以前版本的Actor。

    Scala在2.11.x版本中将Akka加入其中,作为其默认的Actor,

    老版本的Actor已经废弃

   2.概念

    Scala中的Actor能够实现并行编程的强大功能,它是基于事件模型的

    并发机制,

    Scala是运用消息(message)的发送、接收来实现多线程的。

    使用Scala能够更容易地实现多线程应用的开发

   3.Actor方法执行顺序

    1.首先调用start()方法执行Actor

    2.调用start()方法后其act()方法会被执行

    3.向Actor发送消息

   4.wordCount的Actor的计算方法,虽然现在不用,但是思路还是有用的

    

package main.cn.wj.test
import scala.actors.{Actor, Future}
import scala.collection.immutable.HashSet
import scala.io.Source
import scala.collection.mutable.ListBuffer
/**
  * Created by WJ on 2016/12/22.
  */

class Task extends Actor{
  override def act(): Unit = {
      loop{
             react{
               case   SubmitTask(filename) =>{
                 val result = Source.fromFile(filename).getLines().flatMap(_.split(" ")).map((_,1)).toList.groupBy(_._1).mapValues(_.size)
                 sender ! ResultTask(result)
               }
               case StopTask =>{
                 exit()
               }
             }
      }
  }
}

case class SubmitTask(filename:String)

case class ResultTask (result:Map[String,Int])

case object StopTask

object ActorWordCount {
  def main(args: Array[String]): Unit = {
    var replySet = new HashSet[Future[Any]]()
    val resultList =  new ListBuffer[ResultTask]
    val files = Array[String]("E://Test/words.log", "E://Test/words.txt")
    for (f <- files) {
         val actor = new Task
         val reply = actor.start() !! SubmitTask(f)    //<reply 等同于Future>
         replySet  += reply
    }
    while(replySet.size > 0 ){
      val toCompute = replySet.filter(_.isSet)
      for(f <- toCompute) {
         val result = f.apply().asInstanceOf[ResultTask]
        resultList += result
        replySet -= f
      }
      Thread.sleep(100)
    }

    // reduce功能 ,汇总
    //List
    val fr = resultList.flatMap(_.result).groupBy((_._1)).mapValues(_.foldLeft(0)(_+_._2))
    println(fr)
  }
}

  5.看了上面的关于多线程相关的知识点,看看我们的线程池的代码

  

package main.cn.wj.test

import java.util.concurrent.{Executor, Executors}

/**
  * Created by WJ on 2016/12/22.
  */
object ThreadDemo {
  def main(args: Array[String]): Unit = {
       val pool = Executors.newFixedThreadPool(5);
    for (i <- 1 to 10){
      pool.execute(new Runnable {
        override def run(): Unit = {
          println(Thread.currentThread().getName)
          Thread.sleep(1000)
        }
      })
    }
  }
}

 

scala Actor -03