package cn.yangg.scala.akka.init
import akka.actor.Actor
import akka.actor.Props
import akka.actor.ActorRef
import akka.actor.Terminated
import akka.event.Logging
import akka.actor.ActorSystem
import akka.pattern.ask
import scala.util.Success
import scala.util.Failure
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.ExecutionContext
import akka.actor.Terminated
case class HeartBeat(taskid:Int,parent:String)
case class TaskFinished(taskid:Int)
case object JobFinished
case object JobStart
case class TaskFailure(taskid:Int)
case class TaskMessage
case class TaskRestartMessage
case class TaskFinishedMessage
class Parent(jobid:String,tasknum:Int) extends Actor{
val log=Logging(this.context.system,this)
var tasks=Array[ActorRef]()
var replySender=this.context.system.deadLetters
varcount=0;//存在问题
def receive={
case JobStart=>{
this.replySender=this.context.sender
tasks=(1 to tasknum).map(id=>this.context.actorOf(Props(new Child(id)))).toArray
tasks.foreach(actor=>(actor ! TaskMessage))
}
case heartBeat:HeartBeat=>{
println("taskid-0000"+heartBeat.taskid+",finished:"+heartBeat.parent)
}
case TaskFinished(taskid)=>{
println("taskid-0000"+taskid+"finished...")
this.self ! TaskFinishedMessage
}
case Terminated(actor)=>
println(actor.path.toString()+"stop")
case TaskFailure(taskid)=>{
//restart task
valrestartActor=this.context.actorOf(Props(new Child(taskid*10)))
restartActor ! TaskRestartMessage
}
case TaskFinishedMessage=>{
this.count+=1
if(this.count==tasknum){
this.replySender ! akka.actor.Status.Success("all task finished")
}
println(this.count)
}
}
}
class Child(taskid:Int) extends Actor{
val log=Logging(this.context.system,this)
def receive={
case TaskMessage=>{
Thread.sleep(1000)
this.context.parent ! HeartBeat(taskid,"10%")
Thread.sleep(2000)
this.context.parent ! HeartBeat(taskid,"70%")
//task failed
this.context.stop(this.self)
if(taskid%3==0){
this.context.parent ! TaskFailure(this.taskid)
log.info("taskid="+taskid+" task failed")
}else{
this.context.parent ! TaskFinished(this.taskid)
}
}
case TaskRestartMessage=>{
log.info(taskid+" restart...")
this.context.parent ! TaskFinished(this.taskid)
}
}
}
object StartMoon {
def main(args:Array[String]){
val system=ActorSystem("actorSystem")
val jobActor=system.actorOf(Props(new Parent("DataSplit-job",10)),"DataSplitJob")
val jobListener=ask(jobActor,JobStart)(10000)
jobListener.onComplete(result => resultmatch{
case Success(result)=>{
println("job finished...,message:"+result)
}
case Failure(result)=>{
println("job failed...,message:"+result.getMessage())
}
})
}
}
http://blog.csdn.net/yangguo_2011/article/details/27399431
相关推荐
响应式架构 消息模式Actor实现与Scala.Akka应用集成 响应式架构 消息模式Actor实现与Scala.Akka应用集成
消息模式Actor实现与Scala、Akka应用集成
响应式架构++消息模式Actor实现与Scala.Akka应用集成+,沃恩·弗农+
《响应式架构:消息模式Actor实现与Scala、Akka应用集成》由10章构成,详细介绍了使用Actor模型中的响应式消息传输模式的理论和实用技巧。其中包括:Actor模型和响应式软件的主要概念、Scala语言的基础知识、Akka...
响应式架构 消息模式Actor实现与Scala.Akka应用集成 高清扫描版
用Scala写的akka actor简单demo,已经打包成SBT程序,因为上传大小限制依赖包没上传,用户安装了sbt后只需要执行update命令即可
akka集群,scala函数式编程。
Akka scala 并发 actor 高清原版pdf 学习scala实现akka进行并发编程
响应式架构 消息模式Actor实现与Scala.Akka应用集成 ,沃恩·弗农
Scala Akka项目源码
akka scala 实现求连续平方和,分布式计算,快速理解分布式计算原理!
akka-js, Scala.js的Akka actor实现 但是,这个项目不被维护,另外一个项目 !目前并没有积极地开展这个项目。 在这里提供一个主动维护的到 Scala.js的端口,可以: https://github.com/unicredit/akka.js 。我们...
Akka是JAVA虚拟机JVM平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口。 Akka处理并发的方法基于Actor模型。在Akka里,Actor之间通信的唯一机制就是消息...
Akka是JAVA虚拟机JVM平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口。
Spark 的RPC是通过Akka类库实现的,Akka用Scala语言开发,基于Actor并发模型实现
IntelliJ IDEA使用SBT构建一个AKKA Scala程序
Akka是JAVA虚拟机JVM平台上构建高并发、分布式和容错应用的工具包和运行时。Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口。 Akka处理并发的方法基于Actor模型。在Akka里,Actor之间通信的唯一机制就是消息...