德惠市建设局网站,苏州做网站0512jinyan,潮州营销型网站建设推广,南海佛山网站建设介绍下netty
Netty 是一个基于 Java 的异步事件驱动的网络应用框架#xff0c;提供了用于快速开发高性能、高可扩展性的协议服务器和客户端的工具
BIO、NIO、AIO 的区别是什么
BIO
blocking io#xff0c;同步阻塞IO#xff0c;比较简单#xff0c;为每个请求分配一个线…介绍下netty
Netty 是一个基于 Java 的异步事件驱动的网络应用框架提供了用于快速开发高性能、高可扩展性的协议服务器和客户端的工具
BIO、NIO、AIO 的区别是什么
BIO
blocking io同步阻塞IO比较简单为每个请求分配一个线程处理基于stream流
缺点
性能问题 并发量大需要创建大量线程导致系统开销增加性能下降资源浪费 每个连接需要一个线程处理连接空闲时线程也占用系统资源浪费资源
因此jdk1.4引入了NIO
NIO
NIO基于channel和buffer的非阻塞IO
特点
非阻塞 channel是双向的可读可写而传统的stream是单向的设置非阻塞后线程在没有数据可读可写时立即返回而不是阻塞等待通道和缓冲区 缓冲区提高数据的读写效率并支持直接内存访问direct buffer避免JVM内存和系统内存之间的复制选择器selector同时监控多个通道的IO事件使得一个线程可以管理多个通道减少线程的数量和上下文切换的开销
AIO
NIO的selector在做轮询的时候如果没有事件发生也会阻塞如何优化
jdk1.7引入AIO真正异步IO可以提交读写操作立刻返回无需等待操作完成操作系统会通知IO操作完会回调相应的处理器不需要线程阻塞等待。
netty的应用场景
分布式系统中的 RPC 框架网络通信工具、HTTP 服务器、即时通讯系统、消息推送系统等
你们用在什么场景 用作服务比如agi服务提高通话服务的并发
netty的核心组件
包括 ByteBuf 字节容器、Bootstrap 和 ServerBootstrap 启动引导类、Channel 网络操作抽象类、EventLoop 事件循环等
channel
Netty 中用于网络 I/O 操作的基本构件类似于传统的 Java NIO 中的 Channel。它代表了一个打开的连接可以执行读、写、连接和绑定等操作
NioSocketChannel基于 NIO 的 Socket 通道实现了客户端连接。NioServerSocketChannel基于 NIO 的服务器 Socket 通道实现了服务器端的监听。NioDatagramChannel基于 NIO 的 UDP 通道用于无连接的数据报传输
EventLoop 和 EventLoopGroup
EventLoop 是一个处理所有 I/O 事件的核心抽象负责在其生命周期内处理一个或多个 Channel 的 I/O 操作。EventLoopGroup 是一组 EventLoop用于管理和调度多个 EventLoop。
NioEventLoopGroup使用 NIO Selector 实现的 EventLoopGroup。EpollEventLoopGroup使用 epoll 实现的 EventLoopGroup适用于 Linux 平台。
Bootstrap 和 ServerBootstrap
Netty 提供的辅助类用于简化客户端和服务器的启动配置
ChannelFuture
代表一个异步的 I/O 操作结果提供了检查操作是否完成的方法并可以注册监听器在操作完成时得到通知。
ChannelHandler 和 ChannelPipeline
ChannelHandler 是处理 I/O 事件或拦截 I/O 操作的核心组件。ChannelPipeline 是一个处理 ChannelHandler 链的容器负责将 I/O 事件按顺序传递给链中的各个处理器。
常见的 ChannelHandler
ChannelInboundHandler处理入站 I/O 事件。ChannelOutboundHandler处理出站 I/O 操作
ByteBuf
ByteBuf 是 Netty 提供的用于数据读写的缓冲区比 JDK 的 ByteBuffer 更加灵活和高效。它支持动态扩展和各种操作如切片、复制和聚合
ChannelInitializer
ChannelInitializer 是一个特殊的 ChannelInboundHandler用于在 Channel 注册到 EventLoop 后初始化 ChannelPipeline。
Reactor 线程模型
一种并发编程模型定义了三种角色 Reactor负责监听和分配事件将I/O事件分派给对应的Handler。新的事件包含连接建立就绪、读就绪、写就绪等。 Acceptor处理客户端新连接并分派请求到处理器链中。 Handler将自身与事件绑定执行非阻塞读/写任务完成channel的读入完成处理业务逻辑后负责将结果写出channel
三类 单reactor单线程
![[Pasted image 20240626220034.png]] 单reactor多线程 ![[Pasted image 20240626220130.png]] 多reactor多线程主从reactor也称为1MN 线程模式被nginx、netty、memcached等使用
MainReactor 只负责监听客户端连接请求和客户端建立连接之后将连接交由SubReactor 监听后面的 IO 事件 ![[Pasted image 20240626220154.png]]
netty的reactor实现
![[Pasted image 20240626221336.png]]
Netty 的高性能体现在哪些方面
1. 异步非阻塞 I/O (NIO)
Netty 基于 Java NIO 库构建使用异步非阻塞 I/O 模型有效地利用了系统资源。与传统的阻塞 I/O 模型相比NIO 可以在同一个线程中处理多个连接减少了线程切换和上下文切换的开销。
2. 高效的线程模型
Netty 提供了灵活的线程模型通过事件循环 (EventLoop) 和工作线程池来处理 I/O 事件和任务。默认情况下Netty 使用主从 Reactor 模型主线程组处理客户端连接工作线程组处理读写和业务逻辑。这样的设计避免了线程之间的竞争提高了性能。
3. 零拷贝 (Zero-Copy)
Netty 使用了多种零拷贝技术来减少数据在内存中的拷贝次数提高 I/O 效率。例如
FileRegion 用于直接将文件内容传输到网络中。使用 DirectBuffer 直接进行 I/O 操作而不需要将数据从用户空间复制到内核空间。
4. 内存管理
Netty 提供了高效的内存管理机制包括 PooledByteBufAllocator 和 UnpooledByteBufAllocator。通过池化的方式来分配和管理内存减少了频繁的内存分配和回收的开销从而提高了性能。
5. Pipeline 和 Handler 机制
Netty 使用了责任链模式通过 ChannelPipeline 和 ChannelHandler 来处理网络事件。每个 ChannelHandler 只关注自己的处理逻辑避免了复杂的逻辑集中在一个地方。这样的设计不仅提高了代码的可维护性还通过流水线方式提升了处理效率。
6. 事件驱动模型
Netty 的事件驱动模型使得它能够高效地处理网络事件。所有的 I/O 操作都是非阻塞的通过事件通知机制来触发相应的操作而不是通过轮询的方式。这种方式减少了不必要的系统调用和 CPU 占用。
7. 支持多种协议
Netty 支持多种协议的编解码器可以方便地处理各种网络协议如 HTTP, WebSocket, FTP 等。这些编解码器经过优化能够高效地进行协议解析和数据处理减少了开发者自己实现的负担。
8. 高度可定制
Netty 提供了高度可定制的 API可以根据具体应用的需求进行优化。例如可以自定义线程池、事件循环、内存分配器等从而在不同的场景下实现最佳性能。
拆包和粘包
使用 TCP 协议时。它们分别指数据包在传输过程中被拆分成多个小包拆包或者多个数据包被合并成一个大包粘包的现象
拆包
拆包是指一个完整的数据包在传输过程中被拆分成多个小包。例如发送端发送了一个较大的数据包但接收端只能接收到一部分数据然后再接收剩下的数据。
粘包
粘包是指多个数据包在传输过程中被合并成一个大包。例如发送端连续发送多个小数据包但接收端在一次接收操作中接收到多个数据包的数据。
解决方案 固定长度法 每个数据包的长度是固定的接收端每次按照固定的长度进行读取。缺点是可能会浪费带宽因为长度是固定的无论实际数据量多少都要填充到固定长度。 特殊分隔符法 在数据包之间使用特殊分隔符接收端通过分隔符来区分数据包。常见的分隔符有换行符 \n、自定义分隔符等。 包头加包体法 在数据包的头部增加一个固定长度的包头包头中包含数据包的长度信息。接收端首先读取包头根据包头中的长度信息再读取相应长度的包体 Netty 提供了多种解码器来解决粘包问题比如固定长度解码、分隔符解码、长度字段解码等。
使用固定长度帧解码器 (FixedLengthFrameDecoder)
public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new FixedLengthFrameDecoder(20)); // 假设每个消息长度为20 ch.pipeline().addLast(new NettyServerHandler()); }使用行分隔符解码器 (LineBasedFrameDecoder)
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));// 设定最大帧长度为1024使用定界符解码器 (DelimiterBasedFrameDecoder)
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.wrappedBuffer(new byte[]{|})));使用基于长度字段的解码器 (LengthFieldBasedFrameDecoder) 在消息头部添加一个长度字段用于指示消息体的长度
server:
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));client:
ch.pipeline().addLast(new LengthFieldPrepender(4));Netty 的长连接和心跳机制是如何工作
长连接是指客户端和服务器之间建立的连接在一次创建后能够长期保持,Netty 默认使用的就是长连接模式。可以通过设置 ChannelOption.SO_KEEPALIVE 为 true 来启用 TCP 层的心跳检测,通常我们还会在应用层实现自己的心跳机制以确保更细粒度的控制。
实现步骤
添加心跳处理器在服务器端处理心跳请求在客户端定时发送心跳请求
server:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class HeartbeatServer {public static void main(String[] args) throws InterruptedException {EventLoopGroup bossGroup new NioEventLoopGroup(1);EventLoopGroup workerGroup new NioEventLoopGroup();try {ServerBootstrap b new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new IdleStateHandler(60, 0, 0)); // 60秒内没有读操作则触发IdleStateEventch.pipeline().addLast(new HeartbeatServerHandler());}});b.bind(8080).sync().channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}static class HeartbeatServerHandler extends SimpleChannelInboundHandlerObject {Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {// 处理其他消息}Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event (IdleStateEvent) evt;switch (event.state()) {case READER_IDLE:System.out.println(读超时关闭连接);ctx.close();break;default:break;}} else {super.userEventTriggered(ctx, evt);}}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}}
}
client:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.concurrent.ScheduledFuture;import java.util.concurrent.TimeUnit;public class HeartbeatClient {public static void main(String[] args) throws InterruptedException {EventLoopGroup group new NioEventLoopGroup();try {Bootstrap b new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).handler(new ChannelInitializerSocketChannel() {Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new IdleStateHandler(0, 30, 0)); // 30秒内没有写操作则触发IdleStateEventch.pipeline().addLast(new HeartbeatClientHandler());}});ChannelFuture f b.connect(localhost, 8080).sync();f.channel().closeFuture().sync();} finally {group.shutdownGracefully();}}static class HeartbeatClientHandler extends SimpleChannelInboundHandlerObject {private ScheduledFuture? heartBeat;Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {heartBeat ctx.executor().scheduleAtFixedRate(() - {ctx.writeAndFlush(HEARTBEAT);System.out.println(发送心跳);}, 0, 30, TimeUnit.SECONDS);}Overrideprotected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {// 处理服务器响应}Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent event (IdleStateEvent) evt;switch (event.state()) {case WRITER_IDLE:System.out.println(写超时发送心跳);ctx.writeAndFlush(HEARTBEAT);break;default:break;}} else {super.userEventTriggered(ctx, evt);}}Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {if (heartBeat ! null) {heartBeat.cancel(false);}}}
}
Netty 支持哪些序列化协议
Netty 支持多种序列化协议用于不同场景下的数据传输需求。以下是一些常见的序列化协议及其在 Netty 中的实现方式
1. Java 序列化
Java 序列化使用 Java 内置的 ObjectOutputStream 和 ObjectInputStream可以将 Java 对象转换为字节流。
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;// 服务端
ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new ObjectEncoder());// 客户端
ch.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new ObjectEncoder());
2. JSON 序列化
JSON 序列化使用文本格式易于阅读和调试。可以使用 Jackson 或 Gson 进行 JSON 序列化。
使用 Jackson
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.handler.codec.json.JsonObjectDecoder;
import io.netty.handler.codec.MessageToMessageDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;public class JsonEncoder extends MessageToMessageEncoderObject {private final ObjectMapper objectMapper new ObjectMapper();Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, ListObject out) throws Exception {byte[] bytes objectMapper.writeValueAsBytes(msg);out.add(Unpooled.wrappedBuffer(bytes));}
}public class JsonDecoder extends MessageToMessageDecoderByteBuf {private final ObjectMapper objectMapper new ObjectMapper();private final Class? clazz;public JsonDecoder(Class? clazz) {this.clazz clazz;}Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf msg, ListObject out) throws Exception {byte[] bytes new byte[msg.readableBytes()];msg.readBytes(bytes);Object obj objectMapper.readValue(bytes, clazz);out.add(obj);}
}// 服务端和客户端
ch.pipeline().addLast(new JsonObjectDecoder());
ch.pipeline().addLast(new JsonDecoder(MyClass.class));
ch.pipeline().addLast(new JsonEncoder());
3. Protobuf 序列化
ProtobufProtocol Buffers是 Google 开发的一种高效的二进制序列化协议适用于高性能场景。
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;// 服务端和客户端
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(new ProtobufDecoder(MyProtoClass.getDefaultInstance()));
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
5. Kryo 序列化
Kryo 是一个快速、高效的对象图序列化框架适用于需要高性能序列化的 Java 应用
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import io.netty.handler.codec.MessageToByteEncoder;
import io.netty.handler.codec.ByteToMessageDecoder;// 自定义 Kryo 解码器和编码器
public class KryoEncoder extends MessageToByteEncoderObject {private final Kryo kryo new Kryo();Overrideprotected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {ByteArrayOutputStream baos new ByteArrayOutputStream();Output output new Output(baos);kryo.writeClassAndObject(output, msg);output.close();out.writeBytes(baos.toByteArray());}
}public class KryoDecoder extends ByteToMessageDecoder {private final Kryo kryo new Kryo();Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, ListObject out) throws Exception {byte[] bytes new byte[in.readableBytes()];in.readBytes(bytes);Input input new Input(new ByteArrayInputStream(bytes));Object obj kryo.readClassAndObject(input);out.add(obj);}
}// 服务端和客户端
ch.pipeline().addLast(new KryoEncoder());
ch.pipeline().addLast(new KryoDecoder());
bytebuf相对butebuffer的优点
1. 更丰富的 API
读写索引分离ByteBuf 将读索引和写索引分离提供了 readerIndex 和 writerIndex而 ByteBuffer 只有一个位置指针通过 flip 和 clear 方法来切换读写模式。链式调用ByteBuf 的大多数方法都返回 this允许链式调用使代码更加简洁和流畅
2. 容量和动态扩展
容量管理ByteBuf 支持动态扩展容量而 ByteBuffer 的容量是固定的一旦分配不能改变。容量检查ByteBuf 提供了多种方法来检查可读字节数和可写字节数如 readableBytes() 和 writableBytes()可以避免越界错误。
3. 内存管理
池化机制ByteBuf 支持池化可以重用缓冲区减少内存分配和垃圾回收的开销。Netty 的 PooledByteBufAllocator 是一个高效的内存池实现而 ByteBuffer 只能依赖 JVM 的垃圾回收。零拷贝ByteBuf 支持零拷贝操作如 slice()、duplicate() 和 composite buffer减少数据拷贝提高性能。
4. 引用计数和生命周期管理
引用计数ByteBuf 使用引用计数来管理其生命周期通过 retain() 和 release() 方法可以精细控制内存的释放。而 ByteBuffer 只能依赖垃圾回收无法手动管理内存的释放。
5. 更灵活的缓冲区类型
堆内缓冲区和直接缓冲区ByteBuf 支持堆内缓冲区heap buffer和直接缓冲区direct buffer可以根据需要选择合适的类型。而 ByteBuffer 需要通过 ByteBuffer.allocate() 和 ByteBuffer.allocateDirect() 分别创建。复合缓冲区ByteBuf 提供了 CompositeByteBuf允许多个缓冲区组合成一个逻辑缓冲区避免数据复制。