NIO-网络IO
1. 基础
-
使用NIO完成网络通信的三个要素
-
通道(Channel):负责连接
- 实现了java.nio.channels.Channel接口,分为阻塞模式和非阻塞模式。
- SelectableChannel
- AbstractSelectableChannel
- SocketChannel
- ServerSocketChannel
- DatagramChannel
- Pipe.SinkChannel
- Pipe.SourceChannel
- SctpChannel
- SctpMultiChannel
- SctpServerChannel
- AbstractSelectableChannel
- 缓冲区(Buffer):负责数据的存取
- 选择器(Selector):是SelectableChannel的多路复用器,用于监控SelectableChannel的IO状况
2. 阻塞式NIO(TCP)
案例1:客户端向服务端发送文件
public class BlockingTest{
//客户端
public void client(){
Socket socket = new Socket();
socket.connect(new InetSocketAddress("127.0.0.1",9898),10);
SocketChannel sChannel = socket.getChannel();
FileChannel inChannel = FileChannel.open(Paths.get("1.txt"),StandardOpenOption.READ);
ByteBuffer byteBuf = ByteBuffer.allocate(1024);
while(inChannel.read(byteBuf) != -1){
byteBuf.flip();
sChannel.write(byteBuf);
byteBuf.clear();
}
inChannel.close();
sChannel.close();
}
//服务端
public void server(){
//未绑定的
ServerSocket serverSocket = new ServerSocket();
serverSocket.bind(new InetSocketAddress(9898)); //直接用本地地址
//直接获取绑定好的
//
//ServerSocket serverSocket = new ServerSocket(9898);
//
//获取通道
ServerSocketChannel ssChannel = serverSocket.getChannel();
//用通道接收返回通道
SocketChannel sChannel = ssChannel.accept();
//用套接字接收
//Socket socket = serverSocket.accept();
//SocketChannel sChannel = socket.getChannel();
//
FileChannel fileChannel = FileChannel.open(Paths.get("2.txt"),StandardOpenOption.WRITE,StandardOpenOption.CREATE);
ByteBuffer byteBuf = ByteBuffer.allocate(1024);
while(sChannel.read(byteBuf) != -1){
byteBuf.flip();
fileChannel.write(byteBuf);
byteBuf.clear();
}
serverSocketChannel.close();
sChannel.close();
fileChannel.close();
}
}
案例2:发送文件并发送接收反馈
public class BlockingTest{
//客户端
public void client(){
Socket socket = new Socket();
socket.connect(new InetSocketAddress("127.0.0.1",9898),10);
SocketChannel sChannel = socket.getChannel();
FileChannel inChannel = FileChannel.open(Paths.get("1.txt"),StandardOpenOption.READ);
ByteBuffer byteBuf = ByteBuffer.allocate(1024);
while(inChannel.read(byteBuf) != -1){
byteBuf.flip();
sChannel.write(byteBuf);
byteBuf.clear();
}
sChannel.shutdownPutput();
//接收服务端的反馈,并输出
int len = 0;
while((len = sChannel.read(byteBuf)) != -1){
byteBuf.flip();
System.out.println(new String(byteBuf.array(),0,len));
byteBuf.clear();
}
inChannel.close();
sChannel.close();
}
//服务端
public void server(){
//未绑定的
ServerSocket serverSocket = new ServerSocket();
//绑定到特定主机和端口号,默认为本地
serverSocket.bind(new InetSocketAddress(9898)); //直接用本地地址
//直接获取绑定好的
//
//ServerSocket serverSocket = new ServerSocket(9898);
//
//获取通道
ServerSocketChannel ssChannel = serverSocket.getChannel();
//用通道接收返回通道
SocketChannel sChannel = ssChannel.accept();
//用套接字接收
//Socket socket = serverSocket.accept();
//SocketChannel sChannel = socket.getChannel();
//
FileChannel fileChannel = FileChannel.open(Paths.get("2.txt"),StandardOpenOption.WRITE,StandardOpenOption.CREATE);
ByteBuffer byteBuf = ByteBuffer.allocate(1024);
while(sChannel.read(byteBuf) != -1){
byteBuf.flip();
fileChannel.write(byteBuf);
byteBuf.clear();
}
//发送反馈给客户端
byteBuf.put("收到客户端发来的文件。".getBytes());
sChannel.write(byteBuf);
serverSocketChannel.close();
sChannel.close();
fileChannel.close();
}
}
3. 非阻塞式IO(TCP)
调用Channel接口的configureBlocking(Boolean blocking)来设置是否阻塞,true为阻塞,false为非阻塞,默认阻塞。
-
选择器(Selector)是SelectableChannel对象的多路复用器,Selector可以同时监听多个SelectableChannel的IO状况,即利用Selector可以利用一个线程管理多个线程。
public abstract class Selector extends Object i mplements Closeable
-
选择器(Selector)的使用
- 当调用register(Selector selector,int ops)将通道注册到选择器时,选择器对通道的监听事件,需要第二个参数ops来指定。
SelectionKey register(Selector sel, int ops)
abstract SelectionKey register(Selector sel, int ops, Object att)
- 可以监听的事件类型(可以使用SelectionKey的四个常量表示)
- 读:SelectionKey.OP_READ
- 写:SelectionKey.OP_WRITE
- 连接: SelectionKey.OP_CONNECT
- 接收:SelectionKey.OP_ACCEPT
- 若注册时不仅监听一个事件,可以用位或( | )连接
- 当调用register(Selector selector,int ops)将通道注册到选择器时,选择器对通道的监听事件,需要第二个参数ops来指定。
-
SelectionKey是SelectableChannel和Selector之间的注册关系。每次向选择器注册通道时就会选择一个事件(选择键),选择键包含
两个为整值得操作集,操作集的每一位都表示该键的通道所支持的一类可选择操作。 -
案例
public class TestNonBlockingIO{
public void client(){
Socket socket = new Socket(new InetSocketAddress("127.0.0.1",9898));
SocketChannel sChannel = socket.getChannel();
sChannel.configureBlocking(false);
ByteBuffer byteBuf = ByteBuffer.allocate(1024);
Scanner in = new Scanner(System.in);
while(in.hasNext()){
String str = in.next();
byteBuf.put((new LocalDate().getBytes() + "\n" + str).getBytes());
byteBuf.flip();
sChannel.write(byteBuf);
}
sChannel.close();
}
public void server(){
//1.获取通道
ServerSocket sSocket = new ServerSocket(9898); //直接绑定到本地9898端口
ServerSocketChannel ssChannel = sSocket.getChannel();
//2.切换非阻塞模式
ssChannel.configureBlocking(flase);
//3.获取选择器,Selector,打开一个多路复用器
Selector selector = Selector.open();
//4.将通道注册到选择器上,并制定监听某个事件,这个selectionKey是ssChannel和选择器之间的注册关系
SelectionKey selectionKey = ssChannel.register(selector,SelectionKey.OP_ACCEPT | SelectionKey.OP_READ);
//5.轮巡式的获取选择器上已经“准备就绪”的事件
while(selector.select() > 0){
selectorKeys()返回一个Set
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> it = set.iterator(); //获取迭代器
//6.获取准备就绪的事件
while(it.hasNext()){
SelectionKey sk = it.next();
//7.判断是哪种时间就绪
if(sk.isAcceptable()){
SocketChannel sChannel = ssChannel.accept();
//8.将sChannel设置成非阻塞模式
sChannel.configureBlocking(flase);
//9.将该通道注册到选择器上
sChannel.register(selector,SelectionKey.READ);
} else if(sk.isReadable()){
//获取读就绪的通道
SocketChannel sChannel = (SocketChannel)sk.channel();
ByteBuffer byteBuf = ByteBuffer.allocate(1024);
int len = 0;
while((len = sChannel.read(byteBuf)) > 0){
byteBuf.flip();
System.out.println(new String(byteBuf.array(),0,len));
byteBuf.clear();
}
}
//取消选择键
it.remove();
}
}
}
}
4. 非阻塞IO(UDP)
- 案例:客户端服务端发送消息
public class TestBlockingIO2{
//发送端
public void send(){
DatagramSocket datagramSocket = new DatagramSocket();
//datagramSocket.bind(new InetSocketAddress("127.0.0.1",9898));
//datagramSocket.connect(new InetSocketAddress("127.0.0.1",9898));
DatagramChannel dsChannel = datagram.getChannel();
dsChannel.configureBlocking(false);
Scanner in = new Scanner(System.in);
ByteBuffer byteBuf = ByteBuffer.allocate(1024);
while(in.hasNext()){
String str = in.next();
byteBuf.put((new Data().toString() + "\n" + str).getBytes());
byteBuf.flip();
dsChannel.send(byteBuf,new InetSocetAddress("127.0.0.1",9898));
byteBuf.clear();
}
dsChannel.close();
}
//服务端
public void receive(){
DatagramChannel dChannel = DatagramChannel.open();
dChannel.configureBlocking(false);
dChannel.bind(new InetSocketAddress(9898));
Selector selector = Selector.open();
dChannel.register(selector,SelectionKey.OP_READ);
while(selector.select() > 0){
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey sk = it.next();
if(sk.isReadable()){
DatagramChannel dc = (DatagramChannel)sk.channel();
ByteBuffer byteBuf = ByteBuffer.allocate(1024);
dc.receive(byteBuf);
byteBuf.flip();
System.out.println(new String(byteBuf.array(),0,byteBuf.limit()));
byteBuf.clear();
dc.close();
}
}
//取消选择键
it.remove();
}
dChannel.close();
}
}
5. 管道(Pipe)
管道是两个线程之间的单向数据连接,Pipe有一个source通道和一个sink通道。数据会被写到sink通道,从source通道读取。
- 测试方法:
public void testPipe(){
//获取管道
Pipe pipe = Pipe.open();
//写入
ByteBuffer byteBuf = ByteBuffer.allocate(1024);
//获取内部类写入通道
Pipe.SinkChannel sink = pipe.sink();
byteBuf.put("发送数据".getBytes());
sink.write(byteBuf);
//读取
byteBuf.reset();
Pipe.SourceChannel source = pipe.source();
int len = source.read(byteBuf);
System.out.println(new String(byteBuf.array(),0,len));
sink.close();
source.close();
}