微信订单网站模版,微网站背景图片,php可以做移动端网站,公司网站建设制作全包近30集的孙哥视频课程#xff0c;看完一集整理一集来的#xff0c;内容有点多#xff0c;请大家放心食用~ 1. 网络通讯的演变
1.1 多线程版网络通讯
在传统的开发模式中#xff0c;客户端发起一个 HTTP 请求的过程就是建立一个 socket 通信的过程#xff0c;服务端在建立… 近30集的孙哥视频课程看完一集整理一集来的内容有点多请大家放心食用~ 1. 网络通讯的演变
1.1 多线程版网络通讯
在传统的开发模式中客户端发起一个 HTTP 请求的过程就是建立一个 socket 通信的过程服务端在建立连接之后会创建单独的线程来处理当前请求。如下图所示 其中客户端示例代码如下
Socket socket new Socket(127.0.0.1,8080);
PrintWriter printWriter new PrintWriter(socket.getOutputStream());
printWriter.write(send data to server);服务端示例代码如下
ServerSocket serverSocket new ServerSocket(8080);
Socket socket null;
while (true) {socket serverSocket.accept();// 每一个消息都单独创建一个线程去处理new Thread(new MsgServerHandler(socket)).start();
}随着越来越多的请求发起按上述模式服务端会 对每一个请求单独创建线程 处理 在这种模式下会存在以下几个问题
线程创建开销线程是通过 JVM 调用操作系统来创建内存占用高线程是占用存储资源的CPU使用率高CPU轮转线程之间上下文切换
1.2 线程池版网络通讯
为了解决传统网络通讯开发所带来的问题可通过在服务端 创建线程池 的方式来使得线程的创建可控不能来一个请求就创建一个线程去处理 服务端示例代码如下
// 使用线程池预先创建线程
static{threadPoolExecutor new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), 20, 120L, TimeUnit.SECONDS, new ArrayBlockingQueue(1000));
}ServerSocket serverSocket new ServerSocket(8080);
Socket socket;
while (true){socket serverSocket.accept();// 通过提交到线程池处理threadPoolExecutor.execute(new MsgServerHandler(socket));
}这样一来就解决了传统网络开发中的3个问题但是又带来了新的问题 当连接池中的线程被占用由于客户端等待输入或其它操作导致完新的请求不能获取到线程需要进入到队列中进行 等待 ~
1.3 NIO非阻塞网络通讯
可使用 NIO 来解决上述阻塞的问题在整个数据传输过程中NIO 与上述两种通讯方式存在以下区别
传统的数据传输方式采用 流inputStream、outputStream 而 NIO 采用 管道channel 来进行数据传输NIO服务端 除了使用 ServerSocket 外在网络编程中NIO 还引入了 选择器selector 在引入 selector 之后服务端能对客户端的 Channel 进行监控如果能正常读写则分配线程处理反之发现某些客户端 阻塞 之后selector 可以释放已分配给当前 Channel 的线程供其它 Channel 使用 2. NIO的两个核心
2.1 Channel简介
Channel 是 IO 的通讯管道类似于 InputStream 和 OutputStream 但没有方向性流有方向性 常见的Channel 文件操作 FileChannel 读写文件中的数据 网络操作 SocketChannel 通过TCP读写网络中的数据ServerSocketChannel 监听新进来的 TCP 连接并对每一个连接都创建 SocketChannel DatagramChannel 通过 UDP 读写网络中的数据 2.2 Buffer简介 Channel 读取或写入的数据都要写到Buffer中才可以被程序操作
在文件读取的过程中由于 Channel 没有方向性所以 Buffer 为了区分读写引入了 读模式 、 写模式 进行区分均站在程序的角度来看文件通过管道 Channel 将数据存入缓存 Buffer 中供程序操作使用
读模式将文件数据读取到程序flip()写模式将程序中的数据保存到文件新创建clear()compact
读、写模式只能存在一个默认新创建为写模式 常见的Buffer ByteBuffer应用最广泛的BufferCharBufferDoubleBufferFloatBufferIntBufferLongBufferShortBufferMapperByteBufferByteBuffer的子类用于直接内存操作 3. 初识NIO程序
此处我们通过读取一个文件数据的程序来加深理解 Channel 和 Buffer ~
准备好我们的测试文件 data.txt 并写入以下的测试数据
1234567890创建 Channel 的方式有以下几种
FileInputStreamRandomAccessFileFileChannel.open()
3.1 FileInputStream实现
创建并运行以下测试代码
public class NIOTest {public static void main(String[] args) throws IOException {// 1. 创建Channel FileChannelFileChannel channel new FileInputStream(D:\\Rivamed\\awesome\\message\\data.txt).getChannel();// 2. 创建缓冲区此处分配了10字节ByteBuffer byteBuffer ByteBuffer.allocate(10);// 3. 将读取的数据放入缓冲区channel.read(byteBuffer);// 4. 通过程序读取Buffer中的内容设置缓冲区为读模式byteBuffer.flip();// 5. 循环读取缓冲区中的数据while (byteBuffer.hasRemaining()){byte b byteBuffer.get();System.out.println((char)b (char)b);}// 6. 读完之后设置为写模式byteBuffer.clear();}
}上述代码能正常运行并读取打印 data.txt 文件中的内容但如果 data.txt 中的内容超过10位如
1234567890abc则上述代码将不能打印出 abc 这三个字符原因是我们设置的 ByteBuffer 缓冲区大小为 10 个字节程序在第一次读取完之后就正常结束了显然不符合预期~我们需要改造成以下代码来循环读取
public class NIOTest {public static void main(String[] args) throws IOException {// 1. 创建Channel FileChannelFileChannel channel new FileInputStream(D:\\Rivamed\\awesome\\message\\data.txt).getChannel();// 2. 创建缓冲区此处分配了10字节ByteBuffer byteBuffer ByteBuffer.allocate(10);while (true){// 3. 将读取的数据放入缓冲区read为实际读取的字节数如果没有内容则返回 -1int read channel.read(byteBuffer);if(read -1) break;// 4. 通过程序读取Buffer中的内容设置缓冲区为读模式byteBuffer.flip();// 5. 循环读取缓冲区中的数据while (byteBuffer.hasRemaining()){byte b byteBuffer.get();System.out.println((char)b (char)b);}// 6. 读完之后设置为写模式byteBuffer.clear();}}
}3.2 RandomAccessFile实现
使用 RandomAccessFile 实现文件读取的代码如下所示并添加异常处理等
public class NIOTest2 {public static void main(String[] args) {FileChannel fileChannel null;try {// 1. 创建Channel FileChannelfileChannel new RandomAccessFile(D:\\Rivamed\\awesome\\message\\data.txt,rw).getChannel();// 2. 创建缓冲区此处分配了10字节ByteBuffer byteBuffer ByteBuffer.allocate(10);while (true){// 3. 将读取的数据放入缓冲区read为实际读取的字节数如果没有内容则返回 -1int read fileChannel.read(byteBuffer);if(read -1) break;// 4. 通过程序读取Buffer中的内容设置缓冲区为读模式byteBuffer.flip();// 5. 循环读取缓冲区中的数据while(byteBuffer.hasRemaining()){byte b byteBuffer.get();System.out.println((char)b (char)b);}// 6. 读完之后设置为写模式byteBuffer.clear();}}catch (Exception e){e.printStackTrace();}finally {// 7. 关闭通道if(fileChannel!null){try {fileChannel.close();} catch (IOException e) {throw new RuntimeException(e);}}}}
}3.3 FileChannel.open()实现
使用 FileChannel.open() 实现文件读取的代码如下所示
public class NIOTest3 {public static void main(String[] args) {FileChannel fileChannel null;try {// 1. 创建Channel FileChannelfileChannel FileChannel.open(Paths.get(D:\\Rivamed\\awesome\\message\\data.txt), StandardOpenOption.READ);// 2. 创建缓冲区此处分配了10字节ByteBuffer byteBuffer ByteBuffer.allocate(10);while (true){// 3. 将读取的数据放入缓冲区read为实际读取的字节数如果没有内容则返回 -1int read fileChannel.read(byteBuffer);if(read -1) break;// 4. 通过程序读取Buffer中的内容设置缓冲区为读模式byteBuffer.flip();// 5. 循环读取缓冲区中的数据while(byteBuffer.hasRemaining()){byte b byteBuffer.get();System.out.println((char)b (char)b);}// 6. 读完之后设置为写模式byteBuffer.clear();}}catch (Exception e){e.printStackTrace();}finally {// 7. 关闭通道if(fileChannel!null){try {fileChannel.close();} catch (IOException e) {throw new RuntimeException(e);}}}}
}3.4 使用try-resource重构
在 JDK1.7 之后引入了 try-resource 的机制它能帮我们自动完成在 finally 块中对资源的关闭操作如下为改造之后的代码示例
public class NIOTest4 {public static void main(String[] args) {// 1. 创建Channel FileChanneltry(FileChannel fileChannel FileChannel.open(Paths.get(D:\\Rivamed\\awesome\\message\\data.txt), StandardOpenOption.READ)) {// 2. 创建缓冲区此处分配了10字节ByteBuffer byteBuffer ByteBuffer.allocate(10);while (true){// 3. 将读取的数据放入缓冲区read为实际读取的字节数如果没有内容则返回 -1int read fileChannel.read(byteBuffer);if(read -1) break;// 4. 通过程序读取Buffer中的内容设置缓冲区为读模式byteBuffer.flip();// 5. 循环读取缓冲区中的数据while(byteBuffer.hasRemaining()){byte b byteBuffer.get();System.out.println((char)b (char)b);}// 6. 读完之后设置为写模式byteBuffer.clear();}} catch (IOException e) {throw new RuntimeException(e);}}
}注意 ByteBuffer byteBuffer ByteBuffer.allocate(10);ByteBuffer 一旦定义就不能动态调整 4. ByteBuffer详解
4.1 ByteBuffer 主要实现类
ByteBuffer 是 抽象类 它的主要实现类为 HeadByteBuffer用的是 JVM 内的堆内存受 GC 堆内存不够时的影响在 IO 操作中效率不高 MappedByteBuffer(DirectByteBuffer)用的操作系统上的内存一步操作文件系统不会受到 GC 影响但可能造成内存泄漏
内存泄漏和内存溢出的区别 内存泄漏 已分配的内存 没有正常释放 或者存在 内存碎片 导致后续处理过程中出现所需内存不足的情况 内存溢出 程序运行或者处理时需要用到的内存大于能提供的最大内存的情况
4.2 ByteBuffer 核心结构
ByteBuffer 是一个类似数组的结构在整个结构中包括三个主要的状态
Capacity 缓存的容量类似于数组中的sizePosition 当前缓存的下标在读取时记录当前读取的位置在写操作的时候记录写的位置从0开始每读取一次下标1Limit 读写限制在读写操作时帮我们限制了能读多少数据和还能写多少数据
读写的本质就是 Position 和 Limit 的相互作用如下如所示 不同写模式设置的区别 调用 byteBuffer.clear() 设置写模式 调用 compact() 设置写模式
4.3 ByteBuffer 核心API
使用 ByteBuffer 无非就是数据的存取即往 buffer 中写和从 buffer 中读 写入数据创建ByteBuffer、clear()、compact()的方法包含 1. channel 的 read 方法从文件、IO中往buffer中写数据1. channel.read(buffer)
2. buffer 的 put() 方法直接写入byte数据1. buffer.put(byte)2. buffer.put(byte[])读取数据(flip()) 1. channel 的 write 方法从buffer中读数据并往文件写与上述read相反
2. buffer 的 get() 方法每调用一次会影响 Position 的位置
3. rewind() 方法可以将Position重置为0用于复读数据
4. mark()reset() 方法通过mark标记Position通过reset方法调回标记从新执行
5. get(i) 方法获取特定Position位置上的数据但是不会对Position位置产生影响且不受读写模式的影响4.4 ByteBuffer 字符串操作 字符串存储到buffer中 public static void main(String[] args) {ByteBuffer byteBuffer ByteBuffer.allocate(10);// 调用 string 的 getBytes() 方法即可byteBuffer.put(Lannis.getBytes());// 设置读模式byteBuffer.flip();while (byteBuffer.hasRemaining()){System.out.println(byteBuffer (char)byteBuffer.get());}byteBuffer.clear();
}也可使用字符集编码处理 // 将字符串按指定字符集编码之后存储到 ByteBuffer 中
public static void main(String[] args) {//使用encode方法创建ByteBufferByteBuffer byteBuffer StandardCharsets.UTF_8.(lannis);// 如果使用encode方法时已经自动设置了读模式需要省略flip()// 如果此处加上flip(),limit会设置为上一次的position位置而上一次position为0进而导致数据获取不到// byteBuffer.flip();while (byteBuffer.hasRemaining()){byte b byteBuffer.get();System.out.println(byteBuffer (char)byteBuffer.get());}byteBuffer.clear();
}或者使用 ByteBuffer.wrap() 方法 public static void main(String[] args) {ByteBuffer byteBuffer ByteBuffer.wrap(lannis.getBytes());// 在使用wrap方法时已经自动设置了读模式此处需要省略flip()// 如果此处加上flip(),limit会设置为上一次的position位置而上一次position为0// byteBuffer.flip();while (byteBuffer.hasRemaining()){System.out.println(byteBuffer (char)byteBuffer.get());}byteBuffer.clear();
}Buffer中的数据转为字符串 public static void main(String[] args) {// 使用encode方法创建ByteBufferByteBuffer byteBuffer StandardCharsets.UTF_8.encode(文明和谐);// 使用decode方法解码CharBuffer decode StandardCharsets.UTF_8.decode(byteBuffer);System.out.println(decode decode);
}粘包和半包 粘包当前接受的数据包含下一次数据的内容 半包当前接受的数据不完整
5. NIO的开发使用
5.1 文件读取
读取文件的代码在上面第三节已经演示过此处不再复述
5.2 文件写入
以下为将数据写入文件的代码示例
public static void main(String[] args) throws IOException {// 1. 获得Channel可通过 FileOutputStream/RandomAccessFile 获得FileChannel data new FileOutputStream(data).getChannel();// 2. 获得bufferByteBuffer lannis StandardCharsets.UTF_8.encode(lannis);// 3. 写文件data.write(lannis);
}5.3 文件复制 使用输入输出流实现 public static void main(String[] args) throws IOException {//data1 - data2FileInputStream fileInputStream new FileInputStream(data1);FileOutputStream fileOutputStream new FileOutputStream(data2);byte[] buffer new byte[1024];while (true){int read fileInputStream.read(buffer);if(read -1)break;fileOutputStream.write(buffer,0,read);}
}使用commons-io实现 // 这里引入commons-io依赖
public static void main(String[] args) throws IOException {//data1 - data2FileInputStream fileInputStream new FileInputStream(data1);FileOutputStream fileOutputStream new FileOutputStream(data2);IOUtils.copy(fileInputStream,fileOutputStream)
}使用NIO方式实现零拷贝~效率高 public static void main(String[] args) throws IOException {FileChannel from new FileInputStream(data1).getChannel();FileChannel to new FileOutputStream(data2).getChannel();from.transferTo(0,from.size(),to);
}注意: 需要注意的是NIO传输存在文件大小上限最大支持 2G-1kb 当实际文件大小超过 2GB 之后只能进行分段拷贝 public static void main(String[] args) throws IOException {FileChannel from new FileInputStream(data1).getChannel();FileChannel to new FileOutputStream(data2).getChannel();// 还剩多少没有拷贝long left from.size();while (left 0){left left - from.transferTo(from.size()-left,left,to);}
}6. NIO网络编程
6.1 代码示例
在 NIO 网络编程中服务端 中用于接受请求的是 ServerSocketChannel 进行实际通信的是 SocketChannel 以下为创建服务端和客户端的相关代码
/*创建服务端*/
public class NIOServer {public static void main(String[] args) throws IOException {// 创建 ServerSocketChannelServerSocketChannel serverSocketChannel ServerSocketChannel.open();// 设置服务端的监听端口客户端通过网络进行访问的时候需要IP和端口serverSocketChannel.bind(new InetSocketAddress(8000));// 用于保存建立的连接ListSocketChannel socketChannels new ArrayList();ByteBuffer buffer ByteBuffer.allocate(20);// 接受客户端的连接while (true){System.out.println(等待客户端连接... );// socketChannel 代表服务端和客户端连接的一个通道【连接阻塞】SocketChannel socketChannel serverSocketChannel.accept();System.out.println(已于客户端建立连接...socketChannel);socketChannels.add(socketChannel);// 客户端与服务端通信过程for (SocketChannel channel : socketChannels) {System.out.println(开始接受处理客户端数据...);// 读取客户端提交的数据【IO阻塞】channel.read(buffer);// 设置读模式buffer.flip();// 打印输出接收到的消息System.out.println(客户端消息 StandardCharsets.UTF_8.decode(buffer));// 设置写模式buffer.clear();System.out.println(通信已结束...);}}}
}/*NIO客户端*/
public class NIOClient {public static void main(String[] args) throws IOException {// 创建 socketChannel 用于通信连接SocketChannel socketChannel SocketChannel.open();// 连接服务端socketChannel.connect(new InetSocketAddress(8000));socketChannel.write(StandardCharsets.UTF_8.encode(Test));// 此处断点服务端会出现IO阻塞的情况System.out.println(--------------------------------------------------------);}
}6.2 阻塞问题
在上述代码运行过程中服务端存在以下两个阻塞的情况 ServerSocketChannel 阻塞服务端等待客户端连接accept() 方法存在阻塞 // 可通过设置 ServerSocketChannel 为非阻塞
serverSocketChannel.configureBlocking(false);设置完之后serverSocketChannel.accept() 在没有客户端连接的时候返回值为 null : 同时如果 socketChannel 不为 null 的时候放入上述 list 中才有意义需要进行判断 ...
// 只有不为空的时候才添加客户端
if(socketChannel!null){socketChannels.add(socketChannel);
}
...SocketChannel 阻塞客户端IO通信的阻塞channel.read() 方法存在阻塞 // 设置 socketChannel 非阻塞
socketChannel.configureBlocking(false);修改调整过后的代码如下主要在服务端修改
public class NIOServer {public static void main(String[] args) throws IOException {// 创建 ServerSocketChannelServerSocketChannel serverSocketChannel ServerSocketChannel.open();// 设置 ServerSocketChannel 非阻塞serverSocketChannel.configureBlocking(false);// 设置服务端的监听端口客户端通过网络进行访问的时候需要IP和端口serverSocketChannel.bind(new InetSocketAddress(8000));// 用于保存建立的连接ListSocketChannel socketChannels new ArrayList();ByteBuffer buffer ByteBuffer.allocate(20);// 接受客户端的连接while (true){System.out.println(等待客户端连接... );// socketChannel 代表服务端和客户端连接的一个通道【连接阻塞】SocketChannel socketChannel serverSocketChannel.accept();System.out.println(已于客户端建立连接...socketChannel);if(socketChannel!null){// 设置 socketChannel 非阻塞socketChannel.configureBlocking(false);socketChannels.add(socketChannel);}// 客户端与服务端通信过程for (SocketChannel channel : socketChannels) {System.out.println(开始接受处理客户端数据...);// 读取客户端提交的数据【网络通讯IO阻塞】channel.read(buffer);// 设置读模式buffer.flip();// 打印输出接收到的消息System.out.println(客户端消息 StandardCharsets.UTF_8.decode(buffer));// 设置写模式buffer.clear();System.out.println(通信已结束...);}}}
}::: tip 存在的问题
上述代码虽然解决的 ServerSocketChannel 和 SocketChannel 阻塞的问题但是存在 空转、死循环 会进一步导致CPU占用过高的问题
故需要引入一个类似于 监管者 的角色也就是后文的 selector用来监管连接的创建和IO的通讯即 ServerSocketChannel 和 SocketChannel
:::
7. Selector
7.1 基础介绍
在引入 selector 之前需要对它有一个大概的了解。
selector 并不会实时监管所有的 ServerSocketChannel 和 SocketChannel 而是在以下常用的特定场景状态下才会被监管
accept()ServerSocketChannel 的连接建立read()SocketChannel 中的读操作write()SocketChannel 中的写操作connect()主要用于客户端中…
并且在实际使用时 selector 只有在非阻塞的情况下才生效也就是需要添加以下配置才生效
// 设置 ServerSocketChannel 为非阻塞
serverSocketChannel.configureBlocking(false);
// 设置 SocketChannel 为非阻塞
socketChannel.configureBlocking(false);另外还需要了解在 selector 中的两个重要属性 keys 将需要监控的所有的 Channel 都注册到这个 keys 属性中 通过 channel.register() 配置 selector;
通过 interestOps 配置需要监控的状态selectionKeys 存储的是实际发生以上监控状态的 Channel 通过 selector.select() 去监听发生的特定状态的Channel
当监听到特定事件之后会将 keys 中的 Channel 移动到 selectionKeys 中。
后续就可以通过 selectedKeys() 方法获取并处理特定 Channel 事件由于 SelectionKey 中存在的 Channel 可能是 ServerSocketChannel 或者 SocketChannel故在后续业务处理中需要使用以下方法进行区分 key.isAcceptable()如果返回true则表示当前 selectKey 缓存的是 ServerSocketChannel 对象 key.isReadable()如果返回true则表示当前 selectKey 缓存的是 SocketChannel 对象 key.isWritable()如果返回true则表示当前 selectKey 缓存的是 SocketChannel 对象
7.2 创建连接代码示例
为了进一步说明理解 selector 接下来我们将一步一步的结合代码进行测试演示首先创建服务端的代码并设置为非阻塞模式
/*服务端代码*/
public class NIOServer {public static void main(String[] args) throws IOException {// 创建 ServerSocketChanneltry (ServerSocketChannel serverSocketChannel ServerSocketChannel.open()) {// 设置 ServerSocketChannel 非阻塞serverSocketChannel.configureBlocking(false);// 设置服务端的监听端口客户端通过网络进行访问的时候需要IP和端口serverSocketChannel.bind(new InetSocketAddress(8000));}}
}接着引入 selector :
/*服务端代码*/
public static void main(String[] args) throws IOException {// 创建 ServerSocketChanneltry (ServerSocketChannel serverSocketChannel ServerSocketChannel.open()) {// 设置 ServerSocketChannel 非阻塞serverSocketChannel.configureBlocking(false);// 设置服务端的监听端口客户端通过网络进行访问的时候需要IP和端口serverSocketChannel.bind(new InetSocketAddress(8000));// 引入 selectorSelector selector Selector.open();// 将当前 ServerSocketChannel 注册到 selector 中返回 selectKeySelectionKey selectionKey serverSocketChannel.register(selector, 0, null);// 配置 selectKey 监听 accept 状态selectionKey.interestOps(SelectionKey.OP_ACCEPT);while (true) {// 【会阻塞】开始监控只有监控到有实际连接或读写操作才会处理selector.select();}}
}此时的状态
运行上述服务端代码之后在上述 12 行代码前后进行断点执行注册代码前 执行注册代码后 接着代码正常运行会在17行阻塞住一直等待客户端的连接
完成上述配置操作后我们可以启动客户端进行连接上述 selector.select() 方法在没有客户端连接发生的时候会一直处于等待的状态一但有连接发生它就会将 keys 中监控的当前连接 Channel 复制到 selectionKeys 中 接下来添加对 selectedKeys 中的数据进行处理的方法
/*服务端代码*/
public static void main(String[] args) throws IOException {// 创建 ServerSocketChanneltry (ServerSocketChannel serverSocketChannel ServerSocketChannel.open()) {// 设置 ServerSocketChannel 非阻塞serverSocketChannel.configureBlocking(false);// 设置服务端的监听端口客户端通过网络进行访问的时候需要IP和端口serverSocketChannel.bind(new InetSocketAddress(8000));// 引入 selectorSelector selector Selector.open();// 将当前 ServerSocketChannel 注册到 selector 中返回 selectKeyselector {WindowsSelectorImpl910}SelectionKey selectionKey serverSocketChannel.register(selector, 0, null);// 配置 selectKey 监听 accept 状态selectionKey.interestOps(SelectionKey.OP_ACCEPT);while (true) {// 【会阻塞】开始监控只有监控到有实际连接或读写操作才会处理selector.select();// 获取所有有效的SelectionKey需要使用iterator遍历因为后续会删除不能使用for循环for循环不能删除IteratorSelectionKey iterator selector.selectedKeys().iterator();// 只有在确认有有效状态的情况下才会进行以下循环避免了空转和死循环的问题while (iterator.hasNext()) {SelectionKey key iterator.next();// 用完之后务必删除否则会出现空指针iterator.remove();// 获取对应的 Channelif (key.isAcceptable()) {// 连接 ServerSocketChannelServerSocketChannel channel (ServerSocketChannel) key.channel();SocketChannel socketChannel channel.accept();// 或者直接使用上述创建好的 serverSocketChannel// SocketChannel socketChannel serverSocketChannel.accept();System.out.println(channel socketChannel);}}}}
}启动客户端之后服务端正常输出连接信息
7.3 服务端读消息代码示例
针对客户端的写事件需要在连接之后进行创建
/*服务端代码*/
public static void main(String[] args) throws IOException {// 创建 ServerSocketChanneltry (ServerSocketChannel serverSocketChannel ServerSocketChannel.open()) {// 设置 ServerSocketChannel 非阻塞serverSocketChannel.configureBlocking(false);// 设置服务端的监听端口客户端通过网络进行访问的时候需要IP和端口serverSocketChannel.bind(new InetSocketAddress(8000));// 引入 selectorSelector selector Selector.open();// 将当前 ServerSocketChannel 注册到 selector 中返回 selectKeyselector {WindowsSelectorImpl910}SelectionKey selectionKey serverSocketChannel.register(selector, 0, null);// 配置 selectKey 监听 accept 状态selectionKey.interestOps(SelectionKey.OP_ACCEPT);while (true) {// 【会阻塞】开始监控只有监控到有实际连接或读写操作才会处理selector.select();// 获取所有有效的SelectionKey需要使用iterator遍历因为后续会删除不能使用for循环for循环不能删除IteratorSelectionKey iterator selector.selectedKeys().iterator();// 只有在确认有有效状态的情况下才会进行以下循环避免了空转和死循环的问题while (iterator.hasNext()) {SelectionKey key iterator.next();// 用完之后务必删除否则会出现空指针iterator.remove();// 获取对应的 Channelif (key.isAcceptable()) {// 连接 ServerSocketChannelServerSocketChannel channel (ServerSocketChannel) key.channel();SocketChannel socketChannel channel.accept();// 或者直接只有上述创建好的 serverSocketChannel// SocketChannel socketChannel serverSocketChannel.accept();socketChannel.configureBlocking(false);SelectionKey register socketChannel.register(selector, 0, null);register.interestOps(SelectionKey.OP_READ);System.out.println(channel socketChannel);}else if(key.isReadable()){// 读 SocketChannelSocketChannel socketChannel (SocketChannel) key.channel();ByteBuffer byteBuffer ByteBuffer.allocate(20);socketChannel.read(byteBuffer);// 设置读模式byteBuffer.flip();System.out.println(msg StandardCharsets.UTF_8.decode(byteBuffer));}}}}
}正常运行客户端服务端之后服务端打印输出如下
客户端示例代码如下
/*NIO客户端*/
public class NIOClient {public static void main(String[] args) throws IOException {// 创建 socketChannel 用于通信连接SocketChannel socketChannel SocketChannel.open();// 连接服务端socketChannel.connect(new InetSocketAddress(8000));socketChannel.write(StandardCharsets.UTF_8.encode(Test));System.out.println(--------------------------------------------------------);}
}注意当客户端发送的数据长度大于服务端Buffer的长度时
客户端只发送一次数据服务端会多次调用 select() 方法多次处理直到当前消息处理完毕之后整个流程才算结束
在某些特殊操作下服务器端无法处理select() 方法就会频繁调用如在客户端非正常关闭会发送-1的状态服务端处理不了会一直进行 select() 方法的调用可通过调用 selectKey.cancel() 来调用修改调整以下代码
...
}else if(key.isReadable()){try{// 读 SocketChannelSocketChannel socketChannel (SocketChannel) key.channel();ByteBuffer byteBuffer ByteBuffer.allocate(20);int read socketChannel.read(byteBuffer);if(read -1){key.cancel();}else{// 设置读模式byteBuffer.flip();System.out.println(msg StandardCharsets.UTF_8.decode(byteBuffer));}}catch (Exception e){e.printStackTrace();key.cancel();}
}
...7.4 半包和粘包
一旦buffer缓冲区设置不合理就会出现半包和粘包的问题第6章节的代码亦是例如以下客户端像服务端发生Hello World消息的代码
/*客户端代码*/
public static void main(String[] args) throws IOException {// 创建 socketChannel 用于通信连接SocketChannel socketChannel SocketChannel.open();// 连接服务端socketChannel.connect(new InetSocketAddress(8000));// 发送数据socketChannel.write(StandardCharsets.UTF_8.encode(Hello World));socketChannel.close();
}为了能演示出效果此时服务端的代码如下所示
/*服务端代码*/
public static void main(String[] args) throws IOException {// 创建 ServerSocketChanneltry (ServerSocketChannel serverSocketChannel ServerSocketChannel.open()) {// 设置 ServerSocketChannel 非阻塞serverSocketChannel.configureBlocking(false);// 设置服务端的监听端口客户端通过网络进行访问的时候需要IP和端口serverSocketChannel.bind(new InetSocketAddress(8000));// 引入 selectorSelector selector Selector.open();// 将当前 ServerSocketChannel 注册到 selector 中返回 selectKeyselector {WindowsSelectorImpl910}SelectionKey selectionKey serverSocketChannel.register(selector, 0, null);// 配置 selectKey 监听 accept 状态selectionKey.interestOps(SelectionKey.OP_ACCEPT);while (true) {// 【会阻塞】开始监控只有监控到有实际连接或读写操作才会处理selector.select();// 获取所有有效的SelectionKey需要使用iterator遍历因为后续会删除不能使用for循环for循环不能删除IteratorSelectionKey iterator selector.selectedKeys().iterator();// 只有在确认有有效状态的情况下才会进行以下循环避免了空转和死循环的问题while (iterator.hasNext()) {SelectionKey key iterator.next();// 用完之后务必删除否则会出现空指针iterator.remove();// 获取对应的 Channelif (key.isAcceptable()) {// 连接 ServerSocketChannelServerSocketChannel channel (ServerSocketChannel) key.channel();SocketChannel socketChannel channel.accept();// 或者直接只有上述创建好的 serverSocketChannel// SocketChannel socketChannel serverSocketChannel.accept();socketChannel.configureBlocking(false);SelectionKey register socketChannel.register(selector, 0, null);register.interestOps(SelectionKey.OP_READ);System.out.println(channel socketChannel);}else if(key.isReadable()){// 读 SocketChannelSocketChannel socketChannel (SocketChannel) key.channel();ByteBuffer byteBuffer ByteBuffer.allocate(10);int read socketChannel.read(byteBuffer);if(read -1){key.cancel();}else{// 设置读模式byteBuffer.flip();System.out.println(msg StandardCharsets.UTF_8.decode(byteBuffer));}}}}}
}注意上述第 39 行代码我们分配的 ByteBuffer 大小为 10 个字节此时运行服务端和客户端之后服务端会打印输出以下内容 从运行结果分析可以发现客户端明明只发了一次消息但是服务端却打印出两条消息这显然是不符合业务要求的。
Hello World 为11个字节长度此处我们有两种解决方式可选
修改 ByteBuffer 的大小为11保证能接收到客户端发送的消息 ByteBuffer 一旦定义就不可修改故此方法不可靠通过分割符来甄别一条完整的消息以此解决半包和粘包的问题以下为详细的解决办法 为了能完整获取客户端发送的数据需要进行一些数据处理例如添加分隔符分隔符的目的是为了甄别一条完整的信息引入以下方法
/*解决半包粘包问题*/
private static void doLineSplit(ByteBuffer byteBuffer) {// 设置读模式byteBuffer.flip();for (int i 0; i byteBuffer.limit(); i) {if (\n byteBuffer.get(i)) {int length i 1 - byteBuffer.position();ByteBuffer target ByteBuffer.allocate(length);// 取数据for (int j 0; j length; j) {target.put(byteBuffer.get());}// 设置读模式target.flip();System.out.println(StandardCharsets.UTF_8.decode(target) StandardCharsets.UTF_8.decode(target));}}byteBuffer.compact();
}接着修改服务端读取客户端消息部分的方法
...
}else if(key.isReadable()){// 读 SocketChannelSocketChannel socketChannel (SocketChannel) key.channel();ByteBuffer byteBuffer ByteBuffer.allocate(10);int read socketChannel.read(byteBuffer);if(read -1){key.cancel();}else{doLineSplit(byteBuffer);}
}
...修改客户端发送代码在 Hello World 后面增加 \n 如下所示
public static void main(String[] args) throws IOException {// 创建 socketChannel 用于通信连接SocketChannel socketChannel SocketChannel.open();// 连接服务端socketChannel.connect(new InetSocketAddress(8000));// 发送数据socketChannel.write(StandardCharsets.UTF_8.encode(Hello World\n));socketChannel.close();
}运行修改过后的代码服务端输出结果如下 为什么输出的只有 d 前面的内容哪里去了别慌请听我狡辩
在上述的 doLineSplit() 方法中确实是能通过分隔符 \n 来获取完整的消息的但是有一个前提就是 ByteBuffer 必须是同一个。
但是在 select() 事件监听并处理的代码中每一次都是一个新的 ByteBuffer还记得下面的代码吗
在每次进入到 key.isReadable() 条件成立的方法后我们会新建 ByteBuffer:ByteBuffer byteBuffer ByteBuffer.allocate(10);
这样一来就会导致select()方法两次调用处理的ByteBuffer没有关联上第一次不会打印是因为没有读取到 \n 分隔符在第二次读取的时候亦没有获取到前一次读取的结果故只读取并打印到 d\n 字符当然这也有解决办法那就是将 ByteBuffer 和 Channel 绑定在一起保证一个 Channel 多次操作中 ByteBuffer 为同一个
还记得 SelectionKey.register(sql,ops,att) 方法吗这个方法中有三个参数
sql注册Channel的选择器ops设置要监听的状态att需要绑定的附件可以为空
我们可以通过如下设置 att 属性来给每一个 Channel 绑定一个 Channel 共享的 ByteBuffer 修改以下服务端代码
...if (key.isAcceptable()) {// 连接 ServerSocketChannelServerSocketChannel channel (ServerSocketChannel) key.channel();SocketChannel socketChannel channel.accept();socketChannel.configureBlocking(false);// 创建共享 ByteBufferByteBuffer byteBuffer ByteBuffer.allocate(20);SelectionKey register socketChannel.register(selector, 0, byteBuffer);register.interestOps(SelectionKey.OP_READ);System.out.println(channel socketChannel);}else if(key.isReadable()){// 读 SocketChannelSocketChannel socketChannel (SocketChannel) key.channel();// 获取共享 ByteBufferByteBuffer attachment (ByteBuffer) key.attachment();int read socketChannel.read(attachment);if(read -1){key.cancel();}else{doLineSplit(attachment);}}
...存在的问题在上述代码调整中我们创建了共享 ByteBuffer 来保证一个 Channel 中多次操作使用同一个 ByteBuffer 以此确保消息能够完整的处理
但是注意为了避免使用 compact() 方法之后ByteBuffer 中的内容超过容量大小的问题此处我是修改了 ByteBuffer 的容量大小哦
这样一来又会带来另外一个问题我不能动态修改 ByteBuffer 的容量大小如果传入的消息过长怎么办
那就需要找一个办法去扩容 ByteBuffer ~~~
7.5 ByteBuffer扩容 当我们调用上述 doLineSplit() 方法对客户端的消息处理完之后需要判断 Position 和 Limit 的值如果相等则说明当前 ByteBuffer 容量不够需要进行扩容处理反之则跳过示例代码如下
...
}else if(key.isReadable()){// 读 SocketChannelSocketChannel socketChannel (SocketChannel) key.channel();ByteBuffer byteBuffer ByteBuffer.allocate(10);int read socketChannel.read(byteBuffer);if(read -1){key.cancel();}else{doLineSplit(byteBuffer);if(byteBuffer.position() byteBuffer.limit()){//此时说明容量不够了需要进行扩容ByteBuffer newByteBuffer ByteBuffer.allocate(byteBuffer.capacity() * 2);// 将原始的ByteBuffer中的数据复制到新Buffer中newByteBuffer.put(byteBuffer);// 重新绑定新ByteBufferkey.attach(newByteBuffer);}}
}
...待优化和考虑的地方 ByteBuffer 容量不够的时候我们进行了扩容处理但是在后续请求中可能接受的数据长度远远小于扩容后的大小在多线程请求中会造成内存浪费除了扩容之外还需要考虑缩容 ByteBuffer 扩容时旧 Buffer 中的数据往新 Buffer 中的数据写时效率很低可通过零拷贝方式解决 为了避免频繁检索上述代码中的 \n 分隔符可以通过头体分离的方式来保证信息完整性
7.6 服务端写消息代码示例
上述代码已经完成了服务端创建连接并读取客户端发送的数据的代码示例接下来将继续完善服务端向客户端发送数据的功能
此处我们在服务端和客户端连接建立之后随即向客户端发送数据代码如下所示
/*NIO服务端*/
public class NIOServer {public static void main(String[] args) throws IOException {ServerSocketChannel serverSocketChannel ServerSocketChannel.open();serverSocketChannel.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress(8000));Selector selector Selector.open();serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while (true) {selector.select();IteratorSelectionKey iterator selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey selectionKey iterator.next();iterator.remove();if (selectionKey.isAcceptable()) {SocketChannel socketChannel serverSocketChannel.accept();socketChannel.configureBlocking(false);SelectionKey skey socketChannel.register(selector, SelectionKey.OP_READ);//准备数据StringBuilder sb new StringBuilder();for (int i 0; i 2000000; i) {sb.append(abcdabcd);}ByteBuffer buffer StandardCharsets.UTF_8.encode(sb.toString());while (buffer.hasRemaining()){int write socketChannel.write(buffer);System.out.println(write write);}}}}}
}/*NIO客户端*/
public class NIOClient {public static void main(String[] args) throws IOException {// 创建 socketChannel 用于通信连接try (SocketChannel socketChannel SocketChannel.open()) {// 连接服务端socketChannel.connect(new InetSocketAddress(8000));// 接受服务端数据ByteBuffer buffer ByteBuffer.allocate(1024);int read 0;while (true) {read socketChannel.read(buffer);System.out.println(read read);buffer.clear();}}}
}上述代码运行之后服务端和客户端控制台打印输出的结果如下图所示
通过上面的运行结果发现服务端发送了很多空数据这是因为受到了发生速率的限制为了解决这个问题这个时候我们就可使用 isWriteable() 方法来监听 write 的状态
...
if (selectionKey.isAcceptable()) {SocketChannel socketChannel serverSocketChannel.accept();socketChannel.configureBlocking(false);SelectionKey skey socketChannel.register(selector, SelectionKey.OP_READ);//准备数据StringBuilder sb new StringBuilder();for (int i 0; i 2000000; i) {sb.append(abcdabcd);}ByteBuffer buffer StandardCharsets.UTF_8.encode(sb.toString());// 先写一次int write socketChannel.write(buffer);System.out.println(write write);// 判断是否写完if (buffer.hasRemaining()) {//说明么有写完为当前的 SocketChannel 增加 write 的监听// READ 和 Writeskey.interestOps(skey.interestOps() SelectionKey.OP_WRITE);// 把当前操作传给下一个操作skey.attach(buffer);}
} else if (selectionKey.isWritable()) {// 获取客户端 ChannelSocketChannel socketChannel (SocketChannel) selectionKey.channel();// 获取 BufferByteBuffer buffer (ByteBuffer) selectionKey.attachment();// 写操作int write socketChannel.write(buffer);System.out.println(write write);if (!buffer.hasRemaining()) {//写完了selectionKey.attach(null);selectionKey.interestOps(selectionKey.interestOps() - SelectionKey.OP_WRITE);}
}
...这样一来运行改动过后的代码服务端就不会发生过多的空数据进而提高了服务端的处理消息的能力服务端输出结果如下
8. Reactor 模式
8.1 单线程模式 在单线程模式中客户端的连接以及后续的读写操作都是由一个线程来完成的存在效率低的问题
8.2 主从多线程模式 在这种模式下将客户端连接相关的交由一个独立的图中Boss线程处理后续读写操作交由其它图中Worker线程处理
8.3 代码实现
参照上述主从多线程模式的图例我们需要将IO的读写操作用单个 Worker 线程来处理故我们首先需要创建 Worker 线程类
// Worker 线程类
public class Worker implements Runnable {private final String name;private Selector selector;// 多线程环境下的状态需要增加 volatileprivate volatile boolean created;// 为了传递线程间的变量private final ConcurrentLinkedDequeRunnable concurrentLinkedDeque new ConcurrentLinkedDeque();public Worker(String name) {this.name name;}public void register(SocketChannel sc) throws IOException {if (!created){// 每个 Worker 创建一个线程Thread thread new Thread(this, name);selector Selector.open();thread.start();created true;}// 放到一个线程中保证有序执行concurrentLinkedDeque.add(()-{try {sc.register(selector, SelectionKey.OP_READ SelectionKey.OP_WRITE);} catch (ClosedChannelException e) {throw new RuntimeException(e);}});// 唤醒阻塞的select.select()selector.wakeup();}Overridepublic void run() {while (true) {try {selector.select();Runnable poll concurrentLinkedDeque.poll();if(poll!null){poll.run();}IteratorSelectionKey iterator selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey scKey iterator.next();iterator.remove();if (scKey.isReadable()) {SocketChannel socketChannel (SocketChannel) scKey.channel();ByteBuffer byteBuffer ByteBuffer.allocate(30);socketChannel.configureBlocking(false);socketChannel.read(byteBuffer);byteBuffer.flip();System.out.println(Message StandardCharsets.UTF_8.decode(byteBuffer));byteBuffer.clear();}}} catch (IOException e) {throw new RuntimeException(e);}}}
}接着修改服务端的代码
// 服务端代码
public class ReactorBossServer {public static void main(String[] args) throws IOException, InterruptedException {ServerSocketChannel ssc ServerSocketChannel.open();ssc.configureBlocking(false);ssc.bind(new InetSocketAddress(8000));Selector selector Selector.open();ssc.register(selector, SelectionKey.OP_ACCEPT);// 模拟多线程此处示例为2个Worker[] workers new Worker[2];for (int i 0; i workers.length; i) {workers[i] new Worker(workeri);}AtomicInteger index new AtomicInteger();while (true) {// 监控连接selector.select();IteratorSelectionKey iterator selector.selectedKeys().iterator();while (iterator.hasNext()) {SelectionKey selectionKey iterator.next();iterator.remove();if (selectionKey.isAcceptable()) {ServerSocketChannel serverSocketChannel (ServerSocketChannel) selectionKey.channel();SocketChannel socketChannel serverSocketChannel.accept();socketChannel.configureBlocking(false);// hash取模 x%2 结果0或1workers[index.getAndIncrement()%workers.length].register(socketChannel);}}}}
}最后运行当多个客户端连接之后服务端轮转进行处理此处自行进行代码测试
9. 零拷贝
在没有任何优化操作前以读取文件数据在到写数据的流程为例进行数据拷贝的分析如下图所示 在调用 Read() 方法之后JVM 会通知操作系统由操作系统调用操作文件相关的 API 来读取硬盘上的数据随后将数据存储在操作系统的内存 高速页缓存/内核缓冲区中进而过渡传递到 JVM 中的应用缓存【做了2次数据的拷贝】同理在调用 write() 写数据时也发生了两次数据拷贝整个操作下来发生了【4次数据拷贝】故此效率偏低
9.1 内存映射
在 NIO 中有个 内存映射 的概念通过内存映射可以将 高速页缓存 中的数据 共享 给 应用缓存 同时减少了数据拷贝的次数示例图如下 在代码中可使用以下代码创建直接缓冲区
ByteBuffer.allocateDirect(10);内存映射 主要用于文件的操作 使用直接内存的好处如上图所示就是减少了数据拷贝的次数但是带来的问题就是需要手动进行内存析构否则会造成内存浪费 9.2 零拷贝
零拷贝不涉及到虚拟机内存的拷贝
在 Linux2.1 和 Linux2.4 内核中存在 sendFile() 方法其两者的拷贝区别如下 可以看出在 Linux2.4 的内核中拷贝次数比 Linux2.1 又少了1次效率又提高了
在 Java 中使用 file.transferTo() 或 file.transferFrom() 方法即可调用 sendFile() 方法