`
wbj0110
  • 浏览: 1549337 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

scala akka 修炼之路3(基于java nio的socket编程)

    博客分类:
  • Akka
阅读更多

package cn.yangg.scala.base.init

 

import java.io.Closeable

import java.nio.channels.ServerSocketChannel

import java.net.InetSocketAddress

import java.nio.channels.SocketChannel

import java.nio.charset.Charset

import java.nio.ByteBuffer

import akka.actor.Actor

import java.nio.channels.Selector

import java.nio.channels.SelectionKey

import akka.actor.ActorSystem

import akka.actor.Props

 

case class StartService(port: Int)

class ServerActor extends Actor {

  val serverChannel = ServerSocketChannel.open()

  valserverSocket =serverChannel.socket()

  val selector = Selector.open()

 

  val buffer = ByteBuffer.allocate(1024)

  val charset = Charset.forName("UTF-8")

  val charDecoder =charset.newDecoder()

 

  def receive = {

    case StartService(port) =>

      this.serverListenerStart(port)

  }

  def serverListenerStart(port: Int) = {

    serverSocket.bind(new InetSocketAddress(port))

    serverChannel.configureBlocking(false)

    serverChannel.register(selector, SelectionKey.OP_ACCEPT)

    var n =0

    while (true) {

      n = selector.select()

      if (n >0) {

        val it =selector.selectedKeys().iterator()

        while (it.hasNext()) {

          val key =it.next()

          it.remove() ////删除已选key,防止重复处理

          if (key.isAcceptable()) {

            val server =key.channel().asInstanceOf[ServerSocketChannel]

            val channel =server.accept()

            println(channel)

            if (null !=channel) {

              channel.configureBlocking(false)

              channel.register(selector, SelectionKey.OP_READ)

            }

          } else if (key.isReadable()) {

            val socket =key.channel().asInstanceOf[SocketChannel]

            var size: Int =0

            println("read data ....." + key)

            buffer.clear()

            size = socket.read(buffer)

            while (size >0) {

              buffer.flip()

              charDecoder.decode(buffer.asReadOnlyBuffer())

              .toString().split("\\.\\.\\.\\.").foreach(println)

              buffer.clear()

              size = socket.read(buffer)

            }

            if (size == -1) { //当对端主动关闭后移除key,要不然selector会一直返回可读

              socket.close()

              selector.selectedKeys().remove(key)

            }

          }

        }

      }

    }

  }

}

class ClientActor extends Actor {

  val client = SocketChannel.open()

  val buffer = ByteBuffer.allocate(1024)

  def receive = {

    case StartService(port) =>

      clientStart(port)

  }

  def clientStart(port: Int) {

    client.connect(new InetSocketAddress(port))

    while (true) {

 

      for (i <-1 to5) {

        buffer.clear()

        buffer.put(("hello server" +i +"....").getBytes("utf8"))

        buffer.flip()

        client.write(buffer)

      }

 

      Thread.sleep(1000)

      println(System.currentTimeMillis() + "message to parent....")

    }

  }

}

object SocketStart {

  def main(args: Array[String]) {

    this.akkaSocketTest("socketActor",11111)

  }

  def akkaSocketTest(actorName: String, port: Int) = {

    val actorSystem = ActorSystem(actorName)

    val serverActor =actorSystem.actorOf(Props[ServerActor],"serverActor")

    val clientActor =actorSystem.actorOf(Props[ClientActor],"clientActor")

    val startCMD =new StartService(port)

 

    serverActor !startCMD

    clientActor !startCMD

  }

}

http://blog.csdn.net/yangguo_2011/article/details/28446807

 
 
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics