package cn.allengao.actor import java.io.File import scala.actors.{Actor, Future} //旧版本还是使用actors的actor,而不是使用akka的actor import scala.collection.mutable import scala.io.Source class Task extends Actor { override def act(): Unit = { //希望程序线程池可以复用,效率会更高一些 loop { react { //首先提交任务,告诉程序要执行的文件 case SubmitTask(fileName) => { //getLines取出的是一个迭代器,可以实现从文件中取一些,读一些的操作。进行局部汇总。 val result = Source.fromFile(fileName).getLines().flatMap(_.split(" ")).map((_, 1)).toList.groupBy(_._1).mapValues(_.size) //把result的结果发送给case class ResultTask。 sender ! ResultTask(result) } //如果任务执行完就停止 case StopTask => { exit() } } } } } case class SubmitTask(fileName: String) case class ResultTask(result: Map[String, Int]) case object StopTask object MyWordCount { def main(args: Array[String]) { val files = Array[String]("d://files//word1.txt", "d://files//word2.log") //用HashSet可变集合将replySet装起来 val replySet = new mutable.HashSet[Future[Any]] val resultList = new mutable.ListBuffer[ResultTask] //在这里有多少个文件就new多少个actor for (f <- files) { val actor = new Task //使用异步发送消息,返回值是 Future[Any] val reply = actor.start() !! SubmitTask(f) replySet += reply } //通过while循环读取replySet的内容,如果replySet.size=0,程序停止计算。 while (replySet.size > 0) { //通过filter方法生成一个新的HashSet,里面装载的是任务完成的Future。 val toCumpute = replySet.filter(_.isSet) for (r <- toCumpute) { //将结果取出来,相当于java中的get方法,取出来的是一个[Any]。 val result = r.apply().asInstanceOf[ResultTask] resultList += result //将计算完的replaySet replySet -= r } //防止局部任务执行冲突,设置一个延迟 Thread.sleep(100) } //汇总的功能 //List(Map(),Map(),Map()),flat之后将Map()压平,生成List((hello,5),(tom,2),(hello,2),(jerry,2)) val finalResult = resultList.flatMap(_.result).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2)).toList.sortBy(_._2).reverse println(finalResult) } }