网站建设要做固定资产吗,设计网络网站建设,小说网站做编辑器,大连网站开发哪儿好薇文章目录 前言一、超时监测二、IdleStateHandler类三、ReadTimeoutHandler类四、WriteTimeoutHandler类五、实现心跳机制5.1. 定义心跳处理器5.2. 定义 ChannelInitializer5.3. 编写服务器5.4. 测试 结语 前言
回顾Netty系列文章#xff1a;
Netty 概述#xff08;一#… 文章目录 前言一、超时监测二、IdleStateHandler类三、ReadTimeoutHandler类四、WriteTimeoutHandler类五、实现心跳机制5.1. 定义心跳处理器5.2. 定义 ChannelInitializer5.3. 编写服务器5.4. 测试 结语 前言
回顾Netty系列文章
Netty 概述一Netty 架构设计二Netty Channel 概述三Netty ChannelHandler四ChannelPipeline源码分析五字节缓冲区 ByteBuf 六上字节缓冲区 ByteBuf七下Netty 如何实现零拷贝八Netty 程序引导类九Reactor 模型十工作原理详解十一Netty 解码器十二Netty 编码器十三Netty 编解码器十四自定义解码器、编码器、编解码器十五Future 源码分析十六Promise 源码分析十七一行简单的writeAndFlush都做了哪些事十八
一、超时监测
Netty 的超时类型 IdleState 主要分为以下3类
ALL_IDLE : 一段时间内没有数据接收或者发送。READER_IDLE 一段时间内没有数据接收。WRITER_IDLE 一段时间内没有数据发送。
针对上面的 3 类超时异常Netty 提供了 3 类ChannelHandler来进行监测。
IdleStateHandler 当 Channel 一段时间未执行读取、写入或者两者都未执行时触发 -IdleStateEvent 事件。ReadTimeoutHandler 在一定时间内未读取任何数据时引发 ReadTimeoutEvent 事件。WriteTimeoutHandler 当写操作在一定时间内无法完成时引发 WriteTimeoutEvent 事件。
二、IdleStateHandler类
IdleStateHandler 包括了读\写超时状态处理观察以下 IdleStateHandler 类的构造函数源码。
public IdleStateHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {this((long)readerIdleTimeSeconds, (long)writerIdleTimeSeconds, (long)allIdleTimeSeconds, TimeUnit.SECONDS);
}public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}public IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {this.writeListener new ChannelFutureListener() {public void operationComplete(ChannelFuture future) throws Exception {IdleStateHandler.this.lastWriteTime IdleStateHandler.this.ticksInNanos();IdleStateHandler.this.firstWriterIdleEvent IdleStateHandler.this.firstAllIdleEvent true;}};this.firstReaderIdleEvent true;this.firstWriterIdleEvent true;this.firstAllIdleEvent true;ObjectUtil.checkNotNull(unit, unit);this.observeOutput observeOutput;if (readerIdleTime 0L) {this.readerIdleTimeNanos 0L;} else {this.readerIdleTimeNanos Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);}if (writerIdleTime 0L) {this.writerIdleTimeNanos 0L;} else {this.writerIdleTimeNanos Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);}if (allIdleTime 0L) {this.allIdleTimeNanos 0L;} else {this.allIdleTimeNanos Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);}}在上述源码中构造函数可以接收以下参数 readerIdleTimeSecond指定读超时时间指定 0 表明为禁用。 writerIdleTimeSecond指定写超时时间指定 0 表明为禁用。 allIdleTimeSecond在指定读写超时时间指定 0 表明为禁用。
IdleStateHandler 使用示例
public class MyChannelInitializer extends ChannelInitializerChannel {Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(idleStateHandler,new IdleStateHandler(60,30,0));channel.pipeline().addLast(myHandler,new MyHandler());}
}public class MyHandler extends ChannelDuplexHandler {Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if(evt instanceof IdleStateEvent){IdleStateEvent e (IdleStateEvent) evt;if(e.state() IdleState.READER_IDLE){ctx.close();}else if(e.state() IdleState.WRITER_IDLE){ctx.writeAndFlush(new PingMessage());}}}
}在上述示例中IdleStateHandler 设置了读超时时间为 60 秒写超时时间为 30 秒。MyHandler 是针对超时事件 IdleStateEvent 的处理。
如果 30 秒内没有出站流量写超时时发送 ping 消息的示例。如果 60 秒内没有入站流量读超时时连接关闭。
三、ReadTimeoutHandler类
ReadTimeoutHandler 类包括了读超时状态处理。ReadTimeoutHandler 类的源码如下
public class ReadTimeoutHandler extends IdleStateHandler {private boolean closed;public ReadTimeoutHandler(int timeoutSeconds) {this((long)timeoutSeconds, TimeUnit.SECONDS);}public ReadTimeoutHandler(long timeout, TimeUnit unit) {super(timeout, 0L, 0L, unit);//禁用了写超时、读写超时}protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {assert evt.state() IdleState.READER_IDLE;//只处理读超时this.readTimedOut(ctx);}protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {if (!this.closed) {ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);//引发异常ctx.close();this.closed true;}}
}从上述源码可以看出ReadTimeoutHandler 继承自 IdleStateHandler并在构造函数中禁用了写超时、读写超时而且在处理超时时只会针对 READER_IDLE状态进行处理并引发 ReadTimeoutException 异常。 ReadTimeoutHandler 的使用示例如下
public class MyChannelInitializer extends ChannelInitializerChannel {Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(readTimeoutHandler,new ReadTimeoutHandler(30));channel.pipeline().addLast(myHandler,new MyHandler());}
}//处理器处理ReadTimeoutException
public class MyHandler extends ChannelDuplexHandler {Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if(cause instanceof ReadTimeoutException){//...}else {super.exceptionCaught(ctx,cause);}}
}在上述示例中ReadTimeoutHandler 设置了读超时时间是 30 秒。
四、WriteTimeoutHandler类
WriteTimeoutHandler 类包括了写超时状态处理。WriteTimeoutHandler 类的源码如下
public class WriteTimeoutHandler extends ChannelOutboundHandlerAdapter {private static final long MIN_TIMEOUT_NANOS;private final long timeoutNanos;private WriteTimeoutHandler.WriteTimeoutTask lastTask;private boolean closed;public WriteTimeoutHandler(int timeoutSeconds) {this((long)timeoutSeconds, TimeUnit.SECONDS);}public WriteTimeoutHandler(long timeout, TimeUnit unit) {ObjectUtil.checkNotNull(unit, unit);if (timeout 0L) {this.timeoutNanos 0L;} else {this.timeoutNanos Math.max(unit.toNanos(timeout), MIN_TIMEOUT_NANOS);}}public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {if (this.timeoutNanos 0L) {promise promise.unvoid();this.scheduleTimeout(ctx, promise);}ctx.write(msg, promise);}public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {WriteTimeoutHandler.WriteTimeoutTask task this.lastTask;WriteTimeoutHandler.WriteTimeoutTask prev;for(this.lastTask null; task ! null; task prev) {task.scheduledFuture.cancel(false);prev task.prev;task.prev null;task.next null;}}private void scheduleTimeout(ChannelHandlerContext ctx, ChannelPromise promise) {WriteTimeoutHandler.WriteTimeoutTask task new WriteTimeoutHandler.WriteTimeoutTask(ctx, promise);task.scheduledFuture ctx.executor().schedule(task, this.timeoutNanos, TimeUnit.NANOSECONDS);if (!task.scheduledFuture.isDone()) {this.addWriteTimeoutTask(task);promise.addListener(task);}}private void addWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {if (this.lastTask ! null) {this.lastTask.next task;task.prev this.lastTask;}this.lastTask task;}private void removeWriteTimeoutTask(WriteTimeoutHandler.WriteTimeoutTask task) {if (task this.lastTask) {assert task.next null;this.lastTask this.lastTask.prev;if (this.lastTask ! null) {this.lastTask.next null;}} else {if (task.prev null task.next null) {return;}if (task.prev null) {task.next.prev null;} else {task.prev.next task.next;task.next.prev task.prev;}}task.prev null;task.next null;}protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {if (!this.closed) {ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);ctx.close();this.closed true;}}//...
}从上述源码可以看出WriteTimeoutHandler 在处理超时时引发了 WriteTimeoutException 异常。 WriteTimeoutHandler 的使用示例如下
public class MyChannelInitializer extends ChannelInitializerChannel {Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(writeTimeoutHandler,new WriteTimeoutHandler(30));channel.pipeline().addLast(myHandler,new MyHandler());}
}//处理器处理ReadTimeoutException
public class MyHandler extends ChannelDuplexHandler {Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if(cause instanceof WriteTimeoutException ){//...}else {super.exceptionCaught(ctx,cause);}}
}在上述示例中WriteTimeoutHandler 设置了写超时时间是 30 秒。
五、实现心跳机制
针对超时的解决方案——心跳机制。 在程序开发中心跳机制是非常常见的。其原理是当连接闲置时可以发送一个心跳来维持连接。一般而言心跳就是一段小的通信。
5.1. 定义心跳处理器
public class HeartbeatServerHandler extends ChannelInboundHandlerAdapter {// 1心跳内容private static final ByteBuf HEARTBEAT_SEQUENCE Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(Heartbeat,CharsetUtil.UTF_8)); Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt)throws Exception {// 2判断超时类型if (evt instanceof IdleStateEvent) {IdleStateEvent event (IdleStateEvent) evt;String type ;if (event.state() IdleState.READER_IDLE) {type read idle;} else if (event.state() IdleState.WRITER_IDLE) {type write idle;} else if (event.state() IdleState.ALL_IDLE) {type all idle;}// 3发送心跳ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);System.out.println( ctx.channel().remoteAddress()超时类型 type);} else {super.userEventTriggered(ctx, evt);}}
}对上述代码说明 定义了心跳时要发送的内容。 判断是不是 IdleStateEvent 事件是则处理。 将心跳内容发送给客户端。
5.2. 定义 ChannelInitializer
HeartbeatHandlerInitializer用于封装各类ChannelHandler代码如下
public class HeartbeatHandlerInitializer extends ChannelInitializerChannel {private static final int READ_IDEL_TIME_OUT 4; // 读超时private static final int WRITE_IDEL_TIME_OUT 5;// 写超时private static final int ALL_IDEL_TIME_OUT 7; // 所有超时Overrideprotected void initChannel(Channel ch) throws Exception {ChannelPipeline pipeline ch.pipeline();pipeline.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.SECONDS)); // 1pipeline.addLast(new HeartbeatServerHandler()); // 2}
}对上述代码说明如下
添加了一个IdleStateHandler到 ChannelPipeline并分别设置了读、写超时的时间。为了方便演示将超时时间设置的比较短。添加了HeartbeatServerHandler用来处理超时时发送心跳。
5.3. 编写服务器
服务器代码比较简单启动后侦听 8083 端口。
public final class HeartbeatServer {static final int PORT 8083;public static void main(String[] args) throws Exception {// 配置服务器EventLoopGroup bossGroup new NioEventLoopGroup(1);EventLoopGroup workerGroup new NioEventLoopGroup();try {ServerBootstrap b new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new HeartbeatHandlerInitializer());// 启动ChannelFuture f b.bind(PORT).sync();f.channel().closeFuture().sync();} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}5.4. 测试
首先启动 HeartbeatServer客户端用操作系统自带的 Telnet 程序即可
telnet 127.0.0.1 8083可以看到客户端与服务器的交互效果如下图。
结语
文章如果对你有帮助看完记得点赞、关注、收藏。