建一个网站需要多少时间表,产品定制网站开发,珠海网站建设网络公司,网页设计行业上次通信的时候用的是自带的编解码器#xff0c;今天自己实现一下自定义的。 1、自定义一下协议
//协议类
Data
public class ProtocolT implements Serializable {private Long id System.currentTimeMillis();private short msgType;// 假设1为请求 2为响应privat…上次通信的时候用的是自带的编解码器今天自己实现一下自定义的。 1、自定义一下协议
//协议类
Data
public class ProtocolT implements Serializable {private Long id System.currentTimeMillis();private short msgType;// 假设1为请求 2为响应private T body;}//消息请求体
Data
public class RequestMsg implements Serializable {private String msg;private String other;}//消息响应体
Data
public class ResponseMsg implements Serializable {private String result;private String error;}2、定义编解码器import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;//编码器
public class EnCodeMsg extends MessageToByteEncoderProtocolObject {Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, ProtocolObject msg, ByteBuf byteBuf) throws Exception {Serialization serialization new JdkSerialization();byte[] body serialization.serialize(msg.getBody());int length body.length;Long id msg.getId();short msgType msg.getMsgType();byteBuf.writeLong(id);byteBuf.writeShort(msgType);byteBuf.writeInt(length);byteBuf.writeBytes(body);}
}import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;import java.util.List;//解码器
public class DeCodeMsg extends ByteToMessageDecoder {Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, ListObject list) throws Exception {Serialization serialization new JdkSerialization();long id in.readLong();short msgType in.readShort();int bodyLength in.readInt();int i in.readableBytes();if(bodyLength!i){in.resetReaderIndex();return;}byte[] bytes new byte[bodyLength];in.readBytes(bytes);if(msgType(short)1){ProtocolRequestMsg requestMsgProtocol new Protocol();RequestMsg requestMsg serialization.deserialize(bytes, RequestMsg.class);requestMsgProtocol.setBody(requestMsg);requestMsgProtocol.setId(id);requestMsgProtocol.setMsgType(msgType);list.add(requestMsgProtocol);}else if(msgType(short)2){ProtocolResponseMsg responseMsgProtocol new Protocol();ResponseMsg responseMsg serialization.deserialize(bytes,ResponseMsg.class);responseMsgProtocol.setId(id);responseMsgProtocol.setMsgType(msgType);responseMsgProtocol.setBody(responseMsg);list.add(responseMsgProtocol);}else {return;}}
}
3、修改消息处理器 public class NettyClientHandler extends SimpleChannelInboundHandlerProtocolResponseMsg {private static final Logger logger LoggerFactory.getLogger(NettyClientHandler.class);private volatile Channel channel;private SocketAddress remotePeer;public Channel getChannel() {return channel;}public SocketAddress getRemotePeer() {return remotePeer;}/*** 注册* param ctx* throws Exception*/Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {logger.info(channelRegistered--------------);super.channelRegistered(ctx);this.channel ctx.channel();}/*** 激活* param ctx* throws Exception*/Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {super.channelActive(ctx);this.remotePeer this.channel.remoteAddress();logger.info(channelActive--------------);}Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext,ProtocolResponseMsg o) throws Exception {logger.info(channelRead0--------------Thread.currentThread().getName());logger.info(消费者接收到的消息为{}, JSONObject.toJSONString(o));}public void sendMsg(ProtocolRequestMsg message){channel.writeAndFlush(message);}public void close(){channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);}}public class NettyServerHandler extends SimpleChannelInboundHandlerProtocolRequestMsg {private static final Logger logger LoggerFactory.getLogger(NettyServerHandler.class);Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, ProtocolRequestMsg o) throws Exception {logger.info(服务端收到的消息为{}, JSONObject.toJSONString(o));ProtocolResponseMsg protocol new Protocol();ResponseMsg responseMsg new ResponseMsg();responseMsg.setResult(SUCCESS);responseMsg.setError(NO ERROR);protocol.setBody(responseMsg);protocol.setMsgType((short) 2);protocol.setId(o.getId());channelHandlerContext.channel().writeAndFlush(protocol);}
}
4、测试
public class NettyTest {public static void main(String[] args) {new Thread(()-{NettyServer.startNettyServer();}).start();new Thread(()-{NettyClient instance NettyClient.getInstance();try {while (true){Thread.sleep(2000);ProtocolRequestMsg protocol new Protocol();protocol.setMsgType((short)1);RequestMsg requestMsg new RequestMsg();requestMsg.setMsg(hello:System.currentTimeMillis());requestMsg.setOther(你好啊);protocol.setBody(requestMsg);instance.sendMsg(protocol);}} catch (Exception e) {e.printStackTrace();}}).start();}
}5、效果截图