Linux网络I/O模型介绍(一)
[TOC]
1.概念
Linux的内核将所有外部设备都看作一个文件来操作,对一个文件的读写操作会调用内核提供的系统指令,返回一个file desc(fd 文件描述符)
对一个socket的读写也会有相应的描述符(指向内核中的一个结构体:文件路径/数据区等信息)
即:内核空间和磁盘控制器的组合概念
2.五种I/O模型
- 阻塞I/O模型
- 非阻塞I/O模型
- I/O复用模型
- 信号驱动I/O模型
- 异步I/O模型
1.阻塞I/O模型:
最常见的I/O模型,缺省情况下,所有文件的操作都是阻塞的,一条线完成
2.非阻塞I/O模型
如java1.4更新的NIO,增加缓冲区:
recvfrom从应用层到内核时,如果该缓冲区没有数据的话,则直接返回一个EWOULDBLOCK错误状态,
一般非阻塞使用轮询检查这个状态,来判断内核是否有数据
3.I/O复用模型
Linux提供select/poll,进程将通过一个或多个fd传递给select或poll系统调用,阻塞在select操作上,这样就可以检测多个fd的准备状态
select/poll的缺点是顺序扫描fd状态,而且数量有限
因此Linux提供了epoll系统调用改进,使用驱动事件方式代替顺序扫描,当有fd准备好后,立刻返回rollback,性能更强大,Netty借鉴部分
4.信号驱动I/O模型
首先开启套接口信号驱动I/O功能,通过系统调用sigaction执行一个信号处理函数然后返回.(非阻塞)
当数据准备好后会出发该函数生成SIGIO信号,通过信号通知应用程序调用recvfrom来读取数据,并通知主循环函数处理数据
该模型告诉应用程序什么时候可以开始I/O操作
5.异步IO模型
告知内核启动某个操作,并让内核在整个操作完成后通知应用程序
告诉应用什么时候I/O操作完成
3.I/O多路复用技术
特点:
多路复用最大的优势是系统开销小,不需要创建新的额外进程活鲜城,也不需要维护这些线程.节约资源
场景:
服务器需要同时处理多个处于监听状态或者多个连接状态的套接字
服务器需要同时处理多种协议的套接字
支持I/O多路复用技术的系统调用包括:select/pselect/poll/epoll
目前epoll替代select,原因如下:
- 支持一个进程打开socket描述符(fd)的数量不受限制(仅受操作系统最大文件句柄数影响)
- I/O效率不会随着FD数目的增加而线性下降(事件驱动机制)
- 使用mmap加速内核与用户空间的消息传递(epoll是通过内核和用户空间mmap同一块内存实现)
- epoll的API更加简单
4.BIO Socket示例
服务器端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| /** * @ClassMame: bio_socket_server * @Description: TODO * @Author 宝全 * @Date 2018/10/26 17:16 * @Version 1.0 */ public class DemoSocketServer { public static void main(String[] args) throws IOException { Integer port = 8080; ServerSocket serverSocket = new ServerSocket(port); System.out.println("服务启动成功!"); while (true){ System.out.println(Arrays.toString(args)); Socket socket = serverSocket.accept(); System.out.println("接入一个Socket客户端"); new Thread(new DemoSocketClientListening(socket)).start(); } } }
class DemoSocketClientListening implements Runnable {
private Socket socket;
public DemoSocketClientListening(Socket socket) { this.socket = socket; }
@Override public void run() { BufferedReader br = null; PrintWriter printWriter = null; try {
br = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); while (true) { String s = br.readLine(); if (s == null) return; System.out.println(s); printWriter = new PrintWriter(this.socket.getOutputStream(), true); printWriter.println("发送成功!"); } } catch (IOException e) { e.printStackTrace(); } finally { try { if (br != null) { br.close(); } if (printWriter != null) { printWriter.close(); } if (socket != null) { socket.close(); } } catch (IOException e) { e.printStackTrace(); } } } }
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| /** * @ClassMame: DemoSocketClient * @Description: TODO * @Author 宝全 * @Date 2018/10/26 17:28 * @Version 1.0 */ public class DemoSocketClient {
public static void main(String[] args) { Socket socket = null; BufferedReader br = null; try { socket = new Socket("localhost", 8080);
System.out.println("客户端连接成功"); Scanner scanner = new Scanner(System.in); br = new BufferedReader(new InputStreamReader(socket.getInputStream())); while (true) { System.out.println("输入信息:↓\n\r"); String msg = scanner.nextLine(); PrintStream ps = new PrintStream(socket.getOutputStream(), true); System.out.println("发送中..."); ps.println(msg); if (socket.getInputStream().available() == 0) { System.out.println(br.readLine()); } } } catch (IOException e) { e.printStackTrace(); } finally { try { if (br != null) { br.close(); } if (socket!=null){ socket.close(); } } catch (IOException e) { e.printStackTrace(); } } } }
|
同步Socket服务器产生的问题是阻塞,和线程的创建
每一个接入的客户端都需要创建一个新的线程维护
伪异步I/O
使用一个Task封装socket,通过两个线程池来维护线程
伪异步IO只是对同步IO的简单优化,本质还是同步I/O,并不能解决问题
- 服务器处理缓慢,返回应答消息消耗60s(平时只需要10ms)
- 采用伪异步I/O的线程正在读取故障节点的响应,由于读取输入流是阻塞的,将会被同步阻塞60s
- 当所有可用线程都被故障服务器阻塞,那么后续的大量I/O都在排排队
- 由于采用阻塞队列线程池,当队列集满后,后续入队的操作也会被阻塞
- 由于前段只有一个Accptor线程接收客户端接入,它被阻塞在线程池的同步阻塞队列之后,新的客户端请求将被拒绝,客户端发生大量连接超时
- 由于几乎所有连接都超时,调用者认为该系统已经崩溃,无法接收新的请求
BIO搭建服务器模型/模拟浏览器请求实例
https://blog.csdn.net/sunxing007/article/details/4305956
5.NIO多路复用器(反应堆模式)select模式
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152
| /** * @ClassMame: DemoServerSocket * @Description: TODO * @Author 宝全 * @Date 2018/10/29 16:20 * @Version 1.0 */ public class DemoTimeServer { public static void main(String[] args) { Integer port = 8000; MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start(); } }
/** * IO多路复用器 */ class MultiplexerTimeServer implements Runnable {
private Selector selector; //选择器(多路复用器) private ServerSocketChannel serverChannel; //可选择通道 private volatile boolean stop;
/** * 通过构造初始化资源 * @param port */ public MultiplexerTimeServer(Integer port) { try {
selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port), 1024); serverChannel.register(selector, SelectionKey.OP_ACCEPT); //注册连接监听操作 System.out.println("时间服务器在" + port + "端口启动"); } catch (Exception e) { e.printStackTrace(); System.exit(1); //如果初始化失败(如端口冲突),则退出程序 } }
public void stop() { this.stop = true; }
@Override public void run() { while (!stop) { //自旋循环 try { selector.select(1000);//多路复用器每隔1秒激活一次准备 Set<SelectionKey> selectionKeys = selector.selectedKeys(); //查看所有selectionKey Iterator<SelectionKey> iterator = selectionKeys.iterator(); SelectionKey key = null; while (iterator.hasNext()) { key = iterator.next();//如果多路复用器中有注册,那么赋值给SelectionKey,并移除原有的key iterator.remove(); try { handleInput(key); //将Key传到IO事件中使用(包含通道信息) } catch (Exception e) { // e.printStackTrace(); System.out.println("一个连接断开"); if (key != null) { key.channel(); } if (key.channel() != null) { key.channel().close(); } } } } catch (IOException e) { e.printStackTrace(); } }
//多路复用器关闭以后,所有上面注册的通道都会关闭,不需要重复释放 if (selector!=null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } }
/** * IO事件处理器 * @param key * @throws IOException */ private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { //判断该键是否有效
//判断连接权限,如果连接成功,则将该Socket作为服务监听Socket,给该链接附加读操作权限 if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel socketChannel = ssc.accept(); socketChannel.configureBlocking(false);; socketChannel.register(selector, SelectionKey.OP_READ); } //---------------------上面两个判断相当于完成了TCP的三次握手,TCP物理链路层正式建立(可以对该链接的配置做更为细致的设置)
//判断读权限,如果该链接具有读权限,则将该Socket作为客户端Socket,并进行读操作 if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int read = socketChannel.read(byteBuffer);
//因为socketChannel的read已经是非阻塞线程,所以使用返回值判断即可 if (read > 0) { //翻转 byteBuffer.flip(); byte[] bytes = new byte[byteBuffer.remaining()]; byteBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("时间服务器接收到信息:" + body);
String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(socketChannel, currentTime); } else if (read < 0) { //断开初测关系,并关闭通道 key.cancel(); socketChannel.close(); } else { ; //读取为0时不处理 } } } }
/** * 写操作(可以使用hasRemaining()(检测缓冲区是否写干净)来做'写半包'判定(此处没有做,但是会发生该情况)) * @param channel * @param response * @throws IOException */ private void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length); byteBuffer.put(bytes);
byteBuffer.flip(); channel.write(byteBuffer); } }
}
|
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76
| public void doConnect() throws IOException { socketChannel.connect(new InetSocketAddress(address, port)); if (socketChannel.isConnected()) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } else if (socketChannel.isConnectionPending()){ if (socketChannel.finishConnect()){ socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } } else { socketChannel.register(selector, SelectionKey.OP_CONNECT); } socketChannel.configureBlocking(false); }
public void handleSocket(SelectionKey key) throws IOException { if (key.isValid()) { SocketChannel sc = (SocketChannel) key.channel(); if (key.isConnectable()) { if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); System.out.println(key.isReadable()); //??不可读 false System.out.println(key.isConnectable()); //已连接 true System.out.println(sc.isRegistered()); //已注册 true doWrite(sc); } else { System.exit(1); //链接失败 } } System.out.println(key.isReadable()); if (key.isReadable()) { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int read = socketChannel.read(byteBuffer);
if (read > 0) { byteBuffer.flip(); byte[] bytes = new byte[byteBuffer.remaining()]; byteBuffer.get(bytes);
String body = new String(bytes, "UTF-8"); System.out.println("客户端接收消息:" + body); stop(); } else if (read < 0) { key.cancel(); socketChannel.close(); } else { ; }
}
} }
public void doWrite(SocketChannel socketChannel) { byte[] bytes = "QUERY TIME ORDER".getBytes(); ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length); byteBuffer.put(bytes); byteBuffer.flip();
try { socketChannel.write(byteBuffer); if (!byteBuffer.hasRemaining()) { //半包写检测(非处理方案) System.out.println("Send order 2 server succeed"); } } catch (IOException e) { e.printStackTrace(); }
}
}
|
6.AIO
NIO2.0的异步IO,是真正的异步非阻塞IO,
对应Unix中的事件驱动I/O(AIO)
不需要通过多路复用器对注册通道进行了轮询即可实现异步读写
从而简化了NIO的编程模型
因为是事件驱动的被动模式,因此通过被动回调(JDK底层通过线程池ThreadPoolExecutor执行)即可,不需要单独的创建线程