Netty02--Netty实现心跳

付威     2020-02-01   7257   20min  

心跳机制

心跳机制是常用的一个健康监测的机制,说白了就是每隔一段时间向服务器发送一个心跳的报文,服务收到报文后,就认为当前的客户端在活动的状态,否则会进入异常的机制,比如说主从切换

既然存在一个通信,就一定会有服务端和客户端。服务端开启监听,客户端发起心跳报文,然后服务就再次响应。

系统的设计

  1. 消息的类型

    在服务端和客户端进行通信的时候,需要区分消息的类型,根据消息的类型分别进行不同的处理。

      public enum MessageType {
      SERVICE_REQ((byte) 0),/*业务请求消息*/
      SERVICE_RESP((byte) 1), /*业务应答消息*/
      ONE_WAY((byte) 2), /*无需应答的消息*/
      LOGIN_REQ((byte) 3), /*登录请求消息*/
      LOGIN_RESP((byte) 4), /*登录响应消息*/
      HEARTBEAT_REQ((byte) 5), /*心跳请求消息*/
      HEARTBEAT_RESP((byte) 6);/*心跳应答消息*/
      private byte code;
    
      MessageType(byte code) {
           this.code = code;
      }
         
      public byte getValue() {
           return code;
      }
    
      public static MessageType getMessageType(String typeName){
           for (MessageType mt :MessageType.values()) {
                if(mt.toString().equals(typeName.trim())){
                     return mt;
                }
    
           }
           return null;
      }
      }
    
    
  2. 内容的类型

    在设计这个传输的模型的时候考虑的文件的传输(当然也可以作为消息的类型),所以还需要定义一个内容的类型

      public enum ContentType {
           Default((byte) 0),
           File((byte) 1),
           Other((byte) 2);
           private byte code;
           ContentType(byte code) {
                this.code = code;
           }
              
           public byte getValue() {
                return code;
           }
              
           public static ContentType getContentType(String typeName){
                for (ContentType mt :ContentType.values()) {
                     if(mt.toString().equals(typeName.trim())){
                          return mt;
                     }
                        
                }
                return null;
           }
      }
    
    
  3. 消息头

    消息头包含了消息的认证信息和长度,用来认证信息的合法来源和消息的截取。定义如下:

      public class MessageHead {
           private int headData = DEFAULT_MAGIC_START_CODE;//协议开始标志
           private int length;//包的长度
           private String token;//认证的Token,可以设置时效
           private LocalDateTime createDate;
           private String messageId;
           private MessageType messageType;
           private ContentType  contentType;
      }
    
  4. 自定义传输Encoder和Decoder

    在Netty中几乎所有的业务逻辑在Handler中,Encoder和Decoder是特殊的handler,用于对消息的编码和反编码。类似序列号和反序列号。

    Netty也有很多用于通信的Encoder,比如Kryo等序列号框架。这里我们使用的自定义的编码和解码的方式:

      public class RzEncoder extends MessageToByteEncoder<Message> {
           @Override
           protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
                // TODO Auto-generated method stub
                // 写入开头的标志
                out.writeInt(msg.getHeader().getHeadData());
                // 写入包的的长度
                out.writeInt(msg.getContent().length);
                /**
                * token定长50个字节
                *  第一个参数 原数组
                *  第二个参数 原数组位置
                *  第三个参数 目标数组
                *  第四个参数 目标数组位置
                *  第五个参数 copy多少个长度
                */
                byte[] indexByte = msg.getHeader().getToken().getBytes();
                writeByte(out, indexByte, 50);
                   
                   
                byte[] createTimeByte = msg.getHeader().getCreateDate().toString().getBytes();
                writeByte(out, createTimeByte, 50);
                   
                byte[] idByte = msg.getHeader().getMessageId().getBytes();
                writeByte(out, idByte, 50);
                   
                byte[] msgType = new byte[]{msg.getHeader().getMessageType().getValue()};
                out.writeBytes(msgType);
                byte[] contentType = new byte[]{msg.getHeader().getContentType().getValue()};
                out.writeBytes(contentType);
              
                   
                out.writeBytes(msg.getContent());
                   
           }
              
           private void writeByte(ByteBuf out, byte[] bytes, int length) {
                byte[] writeArr = new byte[length];
                /**
                *
                *  第一个参数 原数组
                *  第二个参数 原数组位置
                *  第三个参数 目标数组
                *  第四个参数 目标数组位置
                *  第五个参数 copy多少个长度
                */
                System.arraycopy(bytes, 0, writeArr, 0, bytes.length > writeArr.length ? writeArr.length : bytes.length);
                out.writeBytes(writeArr);
           }
              
           private void writeByte(ByteBuf out, String content, int length) {
                if (StringUtils.isEmpty(content)) {
                     content = "";
                }
                writeByte(out, content.getBytes(), length);
           }
              
      }
    
      public class RzDecoder extends ByteToMessageDecoder {
           private int BASE_LENGTH = 4 + 4 + 50 + 50 + 50 + 1 +1 ;//协议头 类型 int+length 4个字节+令牌和 令牌生成时间50个字节
           private int headData = DEFAULT_MAGIC_START_CODE;//协议开始标志
              
           @Override
           protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
                // 刻度长度必须大于基本长度
                if (buffer.readableBytes() >= BASE_LENGTH) {
                     /**
                     * 粘包 发送频繁 可能多次发送黏在一起 需要考虑  不过一个客户端发送太频繁也可以推断是否是攻击
                     */
                     //防止soket流攻击。客户端传过来的数据太大不合理
                     if (buffer.readableBytes() > 1024*1024*10) {
                          buffer.skipBytes(buffer.readableBytes());
                             
                     }
                }
                int beginIndex;//记录包开始位置
                while (true) {
                     // 获取包头开始的index
                     beginIndex = buffer.readerIndex();
                     //如果读到开始标记位置 结束读取避免拆包和粘包
                     if (buffer.readInt() == headData) {
                          break;
                     }
                        
                     //初始化读的index为0
                     buffer.resetReaderIndex();
                     // 当略过,一个字节之后,
                     //如果当前buffer数据小于基础数据 返回等待下一次读取
                     if (buffer.readableBytes() < BASE_LENGTH) {
                          return;
                     }
                }
                // 消息的长度
                int length = buffer.readInt();
                // 判断请求数据包数据是否到齐
                if ((buffer.readableBytes() - 100) < length) {
                     //没有到期 返回读的指针 等待下一次数据到期再读
                     buffer.readerIndex(beginIndex);
                     return;
                }
                //读取令牌
                byte[] tokenByte = new byte[50];
                buffer.readBytes(tokenByte);
                   
                   
                //读取令牌生成时间
                byte[] createDateByte = new byte[50];
                buffer.readBytes(createDateByte);
                   
                //读取Id
                byte[] messageIdByte = new byte[50];
                buffer.readBytes(messageIdByte);
                   
                byte[] messageTypeByte = new byte[1];
                buffer.readBytes(messageTypeByte);
                byte[] contentTypeByte = new byte[1];
                buffer.readBytes(contentTypeByte);
                ContentType contentType = ContentType.values()[contentTypeByte[0]];
    
                //读取content
                byte[] data = new byte[length];
                buffer.readBytes(data);
                MessageHead head = new MessageHead();
                head.setHeadData(headData);
                head.setToken(new String(tokenByte).trim());
                head.setCreateDate(LocalDateTime.parse(new String(createDateByte).trim()));
                head.setLength(length);
                head.setMessageId(new String(messageIdByte).trim());
                head.setMessageType(MessageType.values()[messageTypeByte[0]]);
                head.setContentType(contentType);
                Message message = new Message(head, data);
                //认证不通过
                if (!message.authorization(message.buidToken())) {
                     ctx.close();
                     return;
                }
                out.add(message);
                buffer.discardReadBytes();//回收已读字节
           }
      }
    
    
  5. 心跳的发送

    心跳的发送就只剩下生成消息和发送了,此处略。。

(本文完)

作者:付威

博客地址:http://blog.laofu.online

如果觉得对您有帮助,可以下方的RSS订阅,谢谢合作

如有任何知识产权、版权问题或理论错误,还请指正。

本文是付威的网络博客原创,自由转载-非商用-非衍生-保持署名,请遵循:创意共享3.0许可证

交流请加群113249828: 点击加群   或发我邮件 laofu_online@163.com

付威

获得最新的博主文章,请关注上方公众号