AIO(异步IO)-飞外

AIO是异步IO的缩写,即Asynchronized IO。虽然NIO在网络操作中,提供了非阻塞的方法,但是NIO的IO行为还是同步的,对于NIO来说,我们的业务线程是在IO操作准备好时,得到通知,接着就由这个线程自行进行IO操作,IO操作本身还是同步的。

但是对于AIO来说,则更加的进了一步,它不是在IO准备好时再通知线程,而是在IO操作已经完成后,再给线程发出通知。因此,AIO是完全不会阻塞的。此时,我们的业务逻辑将变成一个回调函数,等待IO操作完成后,由系统自动触发。

NIO和AIO的使用场景

NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,JDK1.4开始支持。

AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如HTTP服务器等,充分调用OS参与并发操作,JDK7开始支持

下面来通过AIO实现的服务器来加深了解AIO:

AIOEchoServer:

 1 public class AIOEchoServer { 2 public static final int PORT = 8000; 3 private AsynchronousServerSocketChannel server;//异步通道 5 public AIOEchoServer() throws IOException { 6 server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(PORT)); 9 //接收和处理10 public void start(){11 System.out.println("Server listen on " + PORT);12 server.accept(null, new CompletionHandler AsynchronousSocketChannel, Object () {13 final ByteBuffer buffer = ByteBuffer.allocate(1024);14 public void completed(AsynchronousSocketChannel result,Object attachment){15 System.out.println(Thread.currentThread().getName());16 Future Integer writeResult = null;17 buffer.clear();18 try {19 result.read(buffer).get(100, TimeUnit.SECONDS);20 buffer.flip();21 writeResult = result.write(buffer);22 } catch (InterruptedException e) {23 e.printStackTrace();24 } catch (ExecutionException e) {25 e.printStackTrace();26 } catch (TimeoutException e) {27 e.printStackTrace();28 }finally {30 server.accept(null,this);31 try {32 writeResult.get();33 result.close();34 } catch (InterruptedException e) {35 e.printStackTrace();36 } catch (ExecutionException e) {37 e.printStackTrace();38 } catch (IOException e) {39 e.printStackTrace();43 @Override44 public void failed(Throwable exc, Object attachment) {45 System.out.println("failed : " + exc);47 });48 }
49 }

异步IO(AIO)需要使用异步通道。这里使用的是AsynchronousServerSocketChannel。

上述代码定义的start()方法开启了服务器,值得注意的是,这里只是调用了一个函数server.accept()。之后,这一大堆的代码只是这个函数的参数。

AsynchronousServerSocketChannel.accept()方法会立即返回。它并不会真的等待客户端的到来,这里使用的accept()方法的签名是:

public final A void accept(A attachment,CompletionHandler AsynchronousSocketChannel,? super A handler)

它的第一个参数是一个附件,可以是任意类型,作用是让当前线程和后续的回调方法可以共享这个信息,它会在后续的调用中,传递给handler。它的第二个参数是CompletionHandler接口。这个接口有两个方法:

 void completed(V result,A attachment) void failed(Throwable exc,A attachment)

这两个方法分别在异步操作accept()成功调用completed()和失败调用failed()。

因此,AsynchronousServerSocketChannel.accept()实际上做了两件事,第一就是发起accept请求,告诉系统可以开始监听端口了。第二,注册CompletionHandler实例,告诉系统,一旦有客户端前来连接,如果连接成功,就去执行CompletionHandler.completed()方法;如果连接失败,就去执行CompletionHandler.failed()方法。

所以,server.accept()方法不会阻塞,它会立即返回。

到这里,上述代码的意思其实也就差不多明白了:当completed()被执行时,意味着已经有客户端连接成功了。在第19行,使用read()方法读取客户端的数据,这里需要注意,AsynchronousServerSocketChannel.read()方法也是异步的,换句话说,就是它不会等到数据读取完成了再返回,而是立即返回,返回的结果是一个Future对象,因此这里是Future模式的典型应用。在这里为了编程方便,直接调用了Future.get()方法(第32行),进行等待,将这个异步方法变为了同步方法。因此,在19行执行完成后,数据读取就已经完成了。

之后,将数据回写给客户端(第21行),这里调用的是AsynchronousServerSocketChannel.write()方法,这个方法也是异步的,同样的返回一个Future对象。

再之后,第30行,服务器进行下一个客户端的连接准备。同时关闭当前正在处理的客户端连接。但是在关闭之前,得先确认之前的write()操作已经完成,因此,使用Future.get()方法进行等待(第32行)。

接下来,我们只需要在main函数中调用这个start()方法就可以开启服务器了:

1 public static void main(String[] args) throws IOException, InterruptedException {2 new AIOEchoServer().start();3 while (true){4 Thread.sleep(1000);6 }

上述代码第2行,调用start()方法开启服务器。但是由于start()方法中使用的是异步方法,因此它会立即返回,它并不会像阻塞方法那样会进行等待,因此,如果想让程序驻守执行,第3~5行的等待语句是必须的。否则,在start()方法结束后,不等客户端到来,程序就已经运行完成,主线程就将退出。

AIOClient:

 1 public class AIOClient { 2 public static void main(String[] args) throws IOException, InterruptedException { 3 final AsynchronousSocketChannel client = AsynchronousSocketChannel.open(); 4 client.connect(new InetSocketAddress("localhost", 8000), null, new CompletionHandler Void, Object () { 5 @Override 6 public void completed(Void result, Object attachment) { 7 client.write(ByteBuffer.wrap("Hello!".getBytes()), null, new CompletionHandler Integer, Object () { 8 @Override 9 public void completed(Integer result, Object attachment) {10 ByteBuffer buffer = ByteBuffer.allocate(1024);11 client.read(buffer, buffer, new CompletionHandler Integer, ByteBuffer () {12 @Override13 public void completed(Integer result, ByteBuffer attachment) {14 buffer.flip();15 System.out.println(new String(buffer.array()));16 try {17 client.close();18 } catch (IOException e) {19 e.printStackTrace();22 @Override23 public void failed(Throwable exc, ByteBuffer attachment) {25 });27 @Override28 public void failed(Throwable exc, Object attachment) {30 });32 @Override33 public void failed(Throwable exc, Object attachment) {35 });36 //由于主线程会立即结束,所以这里等待上述处理全部完成37 Thread.sleep(1000);39 }

上述的AIOClient代码看起来很长,实际上只有三个语句:

第一个语句:代码第3行,打开AsynchronousSocketChannel通道。

第二个语句:代码第4~35行,它让客户端去连接指定的服务器,并注册了一系列事件。

第三个语句:代码第37行,让主线程进行等待。

代码的第4行,客户端进行网络连接,并注册了连接成功的回调函数CompletionHandler Void,Object 。待连接成功后,就会进入代码第7行。第7行进行数据写入,向服务端发送数据。这个过程是异步的,会很快返回,写入完成后,会通知回调接口CompletionHandler Integer,Object ,进入第10行。准备进行数据读取,从服务端读取回写的数据。当然代码的第11行的read()方法也是立即返回的,成功读取所有的数据后,会回调CompletionHandler Integer,ByteBuffer 接口,进入第14行。在第15行,打印接收到的数据。

AIO的特点

1.读完了再通知我

2.不会加快IO,只是在读完后进行通知

3.使用回调函数,进行业务处理

参考:《Java高并发程序设计》 葛一鸣 郭超 编著:


作者:Joe 出处:https://www.cnblogs.com/Joe-Go/ 努力了的才叫梦想,不努力的就是空想,努力并且坚持下去,毕竟这是我相信的力量