NIO-网络IO

1. 基础

  • 使用NIO完成网络通信的三个要素

  • 通道(Channel):负责连接

    • 实现了java.nio.channels.Channel接口,分为阻塞模式和非阻塞模式。
    • SelectableChannel
      • AbstractSelectableChannel
        • SocketChannel
        • ServerSocketChannel
        • DatagramChannel
        • Pipe.SinkChannel
        • Pipe.SourceChannel
        • SctpChannel
        • SctpMultiChannel
        • SctpServerChannel
  • 缓冲区(Buffer):负责数据的存取
  • 选择器(Selector):是SelectableChannel的多路复用器,用于监控SelectableChannel的IO状况

2. 阻塞式NIO(TCP)

案例1:客户端向服务端发送文件

public class BlockingTest{
    //客户端
    public void client(){
        Socket socket = new Socket();
        socket.connect(new InetSocketAddress("127.0.0.1",9898),10);
        SocketChannel sChannel = socket.getChannel();
        
        FileChannel inChannel = FileChannel.open(Paths.get("1.txt"),StandardOpenOption.READ);
        
        ByteBuffer byteBuf = ByteBuffer.allocate(1024);
        
        while(inChannel.read(byteBuf) != -1){
            byteBuf.flip();
            sChannel.write(byteBuf);
            byteBuf.clear();
        }
        inChannel.close();
        sChannel.close();
    }

    //服务端
    public void server(){
        //未绑定的
        ServerSocket serverSocket = new ServerSocket(); 
        serverSocket.bind(new InetSocketAddress(9898));  //直接用本地地址
        //直接获取绑定好的
        //
        //ServerSocket serverSocket = new ServerSocket(9898);
        //
        //获取通道
        ServerSocketChannel ssChannel = serverSocket.getChannel();
        //用通道接收返回通道
        SocketChannel sChannel = ssChannel.accept();
        
        //用套接字接收
        //Socket socket = serverSocket.accept();
        //SocketChannel sChannel = socket.getChannel();
        //
        
        FileChannel fileChannel = FileChannel.open(Paths.get("2.txt"),StandardOpenOption.WRITE,StandardOpenOption.CREATE);
        
        ByteBuffer byteBuf = ByteBuffer.allocate(1024);
        while(sChannel.read(byteBuf) != -1){
            byteBuf.flip();
            fileChannel.write(byteBuf);
            byteBuf.clear();
        }
        
        serverSocketChannel.close();
        sChannel.close();
        fileChannel.close();
    }
}

案例2:发送文件并发送接收反馈

public class BlockingTest{
    //客户端
    public void client(){
        Socket socket = new Socket();
        socket.connect(new InetSocketAddress("127.0.0.1",9898),10);
        SocketChannel sChannel = socket.getChannel();
        
        FileChannel inChannel = FileChannel.open(Paths.get("1.txt"),StandardOpenOption.READ);
        
        ByteBuffer byteBuf = ByteBuffer.allocate(1024);
        
        while(inChannel.read(byteBuf) != -1){
            byteBuf.flip();
            sChannel.write(byteBuf);
            byteBuf.clear();
        }
        sChannel.shutdownPutput();
        //接收服务端的反馈,并输出
        int len = 0;
        while((len = sChannel.read(byteBuf)) != -1){
            byteBuf.flip();
            System.out.println(new String(byteBuf.array(),0,len));
            byteBuf.clear();
        }
        inChannel.close();
        sChannel.close();
    }

    //服务端
    public void server(){
        //未绑定的
        ServerSocket serverSocket = new ServerSocket(); 
        //绑定到特定主机和端口号,默认为本地
        serverSocket.bind(new InetSocketAddress(9898));  //直接用本地地址
        //直接获取绑定好的
        //
        //ServerSocket serverSocket = new ServerSocket(9898);
        //
        //获取通道
        ServerSocketChannel ssChannel = serverSocket.getChannel();
        //用通道接收返回通道
        SocketChannel sChannel = ssChannel.accept();
        
        //用套接字接收
        //Socket socket = serverSocket.accept();
        //SocketChannel sChannel = socket.getChannel();
        //
        
        FileChannel fileChannel = FileChannel.open(Paths.get("2.txt"),StandardOpenOption.WRITE,StandardOpenOption.CREATE);
        
        ByteBuffer byteBuf = ByteBuffer.allocate(1024);
        while(sChannel.read(byteBuf) != -1){
            byteBuf.flip();
            fileChannel.write(byteBuf);
            byteBuf.clear();
        }
        
        //发送反馈给客户端
        byteBuf.put("收到客户端发来的文件。".getBytes());
        sChannel.write(byteBuf);
        serverSocketChannel.close();
        sChannel.close();
        fileChannel.close();
    }
}

3. 非阻塞式IO(TCP)

调用Channel接口的configureBlocking(Boolean blocking)来设置是否阻塞,true为阻塞,false为非阻塞,默认阻塞。

  • 选择器(Selector)是SelectableChannel对象的多路复用器,Selector可以同时监听多个SelectableChannel的IO状况,即利用Selector可以利用一个线程管理多个线程。

    • public abstract class Selector extends Object i mplements Closeable
  • 选择器(Selector)的使用

    • 当调用register(Selector selector,int ops)将通道注册到选择器时,选择器对通道的监听事件,需要第二个参数ops来指定。
      • SelectionKey register​(Selector sel, int ops)
      • abstract SelectionKey register​(Selector sel, int ops, Object att)
    • 可以监听的事件类型(可以使用SelectionKey的四个常量表示)
      • 读:SelectionKey.OP_READ
      • 写:SelectionKey.OP_WRITE
      • 连接: SelectionKey.OP_CONNECT
      • 接收:SelectionKey.OP_ACCEPT
    • 若注册时不仅监听一个事件,可以用位或( | )连接
  • SelectionKey是SelectableChannel和Selector之间的注册关系。每次向选择器注册通道时就会选择一个事件(选择键),选择键包含
    两个为整值得操作集,操作集的每一位都表示该键的通道所支持的一类可选择操作。

  • 案例

public class TestNonBlockingIO{
    public void client(){
        Socket socket = new Socket(new InetSocketAddress("127.0.0.1",9898));
        SocketChannel sChannel = socket.getChannel();
        
        sChannel.configureBlocking(false);
        ByteBuffer byteBuf = ByteBuffer.allocate(1024);
		Scanner in = new Scanner(System.in);
		while(in.hasNext()){
			String str = in.next();
			byteBuf.put((new LocalDate().getBytes() + "\n" + str).getBytes());
        	byteBuf.flip();
        	sChannel.write(byteBuf);
		}

        sChannel.close();
    }
    
    public void server(){
        //1.获取通道
        ServerSocket sSocket = new ServerSocket(9898); //直接绑定到本地9898端口
        ServerSocketChannel ssChannel = sSocket.getChannel();
        //2.切换非阻塞模式
        ssChannel.configureBlocking(flase);
        //3.获取选择器,Selector,打开一个多路复用器
        Selector selector = Selector.open();
        //4.将通道注册到选择器上,并制定监听某个事件,这个selectionKey是ssChannel和选择器之间的注册关系
        SelectionKey selectionKey = ssChannel.register(selector,SelectionKey.OP_ACCEPT | SelectionKey.OP_READ);
        //5.轮巡式的获取选择器上已经“准备就绪”的事件
        while(selector.select() > 0){
        selectorKeys()返回一个Set
        	Set<SelectionKey> set = selector.selectedKeys();
        	Iterator<SelectionKey> it = set.iterator(); //获取迭代器
        	//6.获取准备就绪的事件
        	while(it.hasNext()){
        		SelectionKey sk = it.next();
        		//7.判断是哪种时间就绪
        		if(sk.isAcceptable()){
        			SocketChannel sChannel = ssChannel.accept();
        			//8.将sChannel设置成非阻塞模式
        			sChannel.configureBlocking(flase);
        			//9.将该通道注册到选择器上
        			sChannel.register(selector,SelectionKey.READ);
        			
        		} else if(sk.isReadable()){
        			//获取读就绪的通道
        			SocketChannel sChannel = (SocketChannel)sk.channel();
        			
        			ByteBuffer byteBuf = ByteBuffer.allocate(1024);
        			
        			int len = 0;
        			while((len = sChannel.read(byteBuf)) > 0){
        				byteBuf.flip();
        				System.out.println(new String(byteBuf.array(),0,len));
        				byteBuf.clear();
        			}
        		}
        		//取消选择键
        		it.remove();
        	}
        	
        }
    }
}

4. 非阻塞IO(UDP)

  • 案例:客户端服务端发送消息
public class TestBlockingIO2{
	//发送端
	public void send(){
		DatagramSocket datagramSocket = new DatagramSocket();
		//datagramSocket.bind(new InetSocketAddress("127.0.0.1",9898));
		//datagramSocket.connect(new InetSocketAddress("127.0.0.1",9898));		
		DatagramChannel dsChannel = datagram.getChannel();
		dsChannel.configureBlocking(false);
		
		Scanner in = new Scanner(System.in);

		ByteBuffer byteBuf = ByteBuffer.allocate(1024);
		
		while(in.hasNext()){
			String str = in.next();
			byteBuf.put((new Data().toString() + "\n" + str).getBytes());
			byteBuf.flip();
			dsChannel.send(byteBuf,new InetSocetAddress("127.0.0.1",9898));
			byteBuf.clear();
		}
		dsChannel.close();
	}
	//服务端
	public void receive(){
		DatagramChannel dChannel = DatagramChannel.open();
		dChannel.configureBlocking(false);
		dChannel.bind(new InetSocketAddress(9898));
		
		Selector selector = Selector.open();
		
		dChannel.register(selector,SelectionKey.OP_READ);
		
		while(selector.select() > 0){
			Iterator<SelectionKey> it = selector.selectedKeys().iterator();
			while(it.hasNext()){
				SelectionKey sk = it.next();
				if(sk.isReadable()){
					DatagramChannel dc = (DatagramChannel)sk.channel();
					ByteBuffer byteBuf = ByteBuffer.allocate(1024);
					
					dc.receive(byteBuf);
					byteBuf.flip();
					System.out.println(new String(byteBuf.array(),0,byteBuf.limit()));
					byteBuf.clear();
					dc.close();
				}
			}
			//取消选择键
			it.remove();
		}
		dChannel.close();
	}
}

5. 管道(Pipe)

管道是两个线程之间的单向数据连接,Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。

  • 测试方法:
public void testPipe(){
	//获取管道
	Pipe pipe = Pipe.open();
	//写入
	ByteBuffer byteBuf = ByteBuffer.allocate(1024);
	//获取内部类写入通道
	Pipe.SinkChannel sink = pipe.sink();
	byteBuf.put("发送数据".getBytes());
	sink.write(byteBuf);
	
	//读取
	byteBuf.reset();
	Pipe.SourceChannel source = pipe.source();
	int len = source.read(byteBuf);
	System.out.println(new String(byteBuf.array(),0,len));
	
	sink.close();
	source.close();
}