指导思想:
1、利用RPC通信框架(AKKA)
2、定义2个类Master、Worker
———————————————-———————————————-———————————–
首先启动Master,然后启动所有的Worker
1、Worker启动后,在PreStart方法中与Master建立连接,向Master发送注册,将Worker的信息通过case class封装起来发送给Master。
2、Master接受到Worker的注册消息后将Worker的信息保存起来,然后向Worker反馈注册成功。
3、Worker定期向Master发送心跳,目的是为了报告存活状态。
4、Master会定时清理超时的Worker。
———————————————-———————————————-———————-————–
首先打开Idea软件,建立maven项目:
编写Pom文件,Pom文件的代码如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.allengao.akka</groupId> <artifactId>my-rpc</artifactId> <version>1.0</version> <properties> <maven.compiler.source>1.7</maven.compiler.source> <maven.compiler.target>1.7</maven.compiler.target> <encoding>UTF-8</encoding> <scala.version>2.10.6</scala.version> <scala.compat.version>2.10</scala.compat.version> </properties> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-actor_2.10</artifactId> <version>2.3.14</version> </dependency> <dependency> <groupId>com.typesafe.akka</groupId> <artifactId>akka-remote_2.10</artifactId> <version>2.3.14</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-make:transitive</arg> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>2.4.3</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"> <resource>reference.conf</resource> </transformer> <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>cn.allengao.rpc.Worker</mainClass> </transformer> </transformers> </configuration> </execution> </executions> </plugin> </plugins> </build> </project>
RemoteMessage的代码如下:
package cn.allengao.rpc /** * class_name: * package: * describe: TODO * creat_user: Allen Gao * creat_date: 2018/1/18 * creat_time: 16:36 **/ trait RemoteMessage extends Serializable //Worker -> Master case class RegisterWorker(id: String, host: String, port: Int, memory: Int, cores: Int) extends RemoteMessage case class Heartbeat(workId: String) extends RemoteMessage //Master -> Worker case class RegisteredWorker(masterUrl: String) extends RemoteMessage //Worker -> self case object SendHeartbeat // Master -> self case object CheckTimeOutWorker
WorkerInfo的代码如下:
package cn.allengao.rpc /** * class_name: * package: * describe: 把worker启动的信息封装起来,并且传给master。 * creat_user: Allen Gao * creat_date: 2018/1/18 * creat_time: 16:37 **/ class WorkerInfo(val id: String, val host:String, val port: Int, val memory: Int, val cores: Int) { //TODO 上一次心跳 var lastHeartbeatTime : Long = _ }
Master的代码如下:
package cn.allengao.rpc import akka.actor.{Actor, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.collection.mutable import scala.concurrent.duration._ /** * class_name: * package: * describe: TODO * creat_user: Allen Gao * creat_date: 2018/1/18 * creat_time: 16:36 **/ class Master(val host: String, val port: Int) extends Actor { // workerId -> WorkerInfo val idToWorker = new mutable.HashMap[String, WorkerInfo]() // WorkerInfo val workers = new mutable.HashSet[WorkerInfo]() //使用set删除快, 也可用linkList //超时检查的间隔 val CHECK_INTERVAL = 15000 //preStart方法在构造器执行构造方法以后,receive执行方法之前执行,并且只执行一次。 override def preStart(): Unit = { println("preStart invoked") //导入隐式转换,在preStart启动一个定时器,用于周期检查超时的Worker import context.dispatcher //使用timer太low了, 可以使用akka的, 使用定时器, 要导入这个包 context.system.scheduler.schedule(0 millis, CHECK_INTERVAL millis, self, CheckTimeOutWorker) } // 用于接收消息,receive方法循环执行。 override def receive: Receive = { case RegisterWorker(id, host, port, memory, cores) => { //判断一下,是不是已经注册过 if(!idToWorker.contains(id)){ //把Worker的信息封装起来保存到内存当中 val workerInfo = new WorkerInfo(id, host, port, memory, cores) idToWorker += (id -> workerInfo) //key是id,value是workInfo workers += workerInfo println("a worker registered") // sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/Master") sender ! RegisteredWorker(s"akka.tcp://${Master.MASTER_SYSTEM}" + s"@$host:$port/user/${Master.MASTER_ACTOR}")//通知worker注册 } } case Heartbeat(workerId) => { if(idToWorker.contains(workerId)){ val workerInfo = idToWorker(workerId) //报活 val current_time = System.currentTimeMillis() workerInfo.lastHeartbeatTime = current_time } } case CheckTimeOutWorker => { val currentTime = System.currentTimeMillis() val toRemove : mutable.HashSet[WorkerInfo] = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL) // for(w <- toRemove) { // workers -= w // idToWorker -= w.id // } toRemove.foreach(deadWorker =>{ idToWorker -= deadWorker.id workers -= deadWorker }) println("num of workers " + workers.size) } } } object Master { //声明两个变量MaterSystem和MasterActor val MASTER_SYSTEM = "MasterSystem" val MASTER_ACTOR = "Master" def main(args: Array[String]) { //解析传入的参数主机名和端口号 val host = args(0) val port = args(1).toInt // 准备配置信息:(|之间是解析数据,以“=”分割,“=”前面是参数(key),后面是数值(value)。) val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin //ConfigFactory配置文件信息类把配置文件信息传入 val config = ConfigFactory.parseString(configStr) //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的 val actorSystem = ActorSystem(MASTER_SYSTEM, config) //创建Actor actorSystem.actorOf(Props(new Master(host, port)), MASTER_ACTOR) //调用线程等待 actorSystem.awaitTermination() } }
Worker的代码如下:
package cn.allengao.rpc /** * class_name: * package: * describe: TODO * creat_user: Allen Gao * creat_date: 2018/1/18 * creat_time: 16:37 **/ import java.util.UUID import akka.actor.{Actor, ActorSelection, ActorSystem, Props} import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ /** * Created by root on 2016/5/13. */ class Worker(val host: String, val port: Int,val masterHost: String, val masterPort: Int, val memory: Int, val cores: Int) extends Actor{ val worker_id = UUID.randomUUID().toString var masterUrl : String = _ val HEART_INTERVAL = 10000 var master : ActorSelection = _ // override def preStart(): Unit = { //跟Master建立连接 // master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$masterPort/user/Master") master = context.actorSelection(s"akka.tcp://${Master.MASTER_SYSTEM}" + s"@$masterHost:$masterPort/user/${Master.MASTER_ACTOR}") //向Master发送注册消息 master ! RegisterWorker(worker_id,host,port, memory, cores) } override def receive: Receive = { case RegisteredWorker(masterUrl) => { println(masterUrl) //启动定时器发送心跳,心跳是一个case class //导入一个隐式转换,才能启动定时器 import context.dispatcher //多长时间后执行 单位,多长时间执行一次 单位, 消息的接受者 // (直接给master发不好, 先给自己发送消息, 以后可以做下判断, 什么情况下再发送消息), 信息 context.system.scheduler.schedule(0 millis, HEART_INTERVAL millis, self, SendHeartbeat) } case SendHeartbeat => { println("send heartbeat to master") //发送心跳之前要进行一些检查 master ! Heartbeat(worker_id) } } } object Worker { val WORKER_SYSTEM = "WorkerSystem" val WORKER_ACTOR = "Worker" def main(args: Array[String]) { val host = args(0) val port = args(1).toInt val masterHost = args(2) val masterPort = args(3).toInt //分析任务用到的内存,CPU核数 val memory = args(4).toInt val cores = args(5).toInt // 准备配置 val configStr = s""" |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = "$port" """.stripMargin val config = ConfigFactory.parseString(configStr) //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的 val actorSystem = ActorSystem(WORKER_SYSTEM, config) actorSystem.actorOf(Props(new Worker(host, port, masterHost, masterPort, memory, cores)), WORKER_ACTOR) //调用线程等待 actorSystem.awaitTermination() } }
运行Master的代码:模拟传入的参数如下
接着右键Run Master,效果如下:
运行Woker的程序:模拟传入的参数如下
右键运行Worker的代码,效果如下:
当停止worker运行时,可以从master的运行状态中看出检测到worker连接断开,连接数重新回到“0”。
至此整个akka的RPC通信小实例测试完毕。