Netty02--Netty实现心跳

心跳机制

心跳机制是常用的一个健康监测的机制,说白了就是每隔一段时间向服务器发送一个心跳的报文,服务收到报文后,就认为当前的客户端在活动的状态,否则会进入异常的机制,比如说主从切换

既然存在一个通信,就一定会有服务端和客户端。服务端开启监听,客户端发起心跳报文,然后服务就再次响应。

系统的设计

  1. 消息的类型

    在服务端和客户端进行通信的时候,需要区分消息的类型,根据消息的类型分别进行不同的处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
         public enum MessageType {
    SERVICE_REQ((byte) 0),/*业务请求消息*/
    SERVICE_RESP((byte) 1), /*业务应答消息*/
    ONE_WAY((byte) 2), /*无需应答的消息*/
    LOGIN_REQ((byte) 3), /*登录请求消息*/
    LOGIN_RESP((byte) 4), /*登录响应消息*/
    HEARTBEAT_REQ((byte) 5), /*心跳请求消息*/
    HEARTBEAT_RESP((byte) 6);/*心跳应答消息*/
    private byte code;

    MessageType(byte code) {
    this.code = code;
    }

    public byte getValue() {
    return code;
    }

    public static MessageType getMessageType(String typeName){
    for (MessageType mt :MessageType.values()) {
    if(mt.toString().equals(typeName.trim())){
    return mt;
    }

    }
    return null;
    }
    }

    ```

    2. 内容的类型

    在设计这个传输的模型的时候考虑的文件的传输(当然也可以作为消息的类型),所以还需要定义一个内容的类型

    ``` java
    public enum ContentType {
    Default((byte) 0),
    File((byte) 1),
    Other((byte) 2);
    private byte code;
    ContentType(byte code) {
    this.code = code;
    }

    public byte getValue() {
    return code;
    }

    public static ContentType getContentType(String typeName){
    for (ContentType mt :ContentType.values()) {
    if(mt.toString().equals(typeName.trim())){
    return mt;
    }

    }
    return null;
    }
    }

  2. 消息头

    消息头包含了消息的认证信息和长度,用来认证信息的合法来源和消息的截取。定义如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public class MessageHead {
    private int headData = DEFAULT_MAGIC_START_CODE;//协议开始标志
    private int length;//包的长度
    private String token;//认证的Token,可以设置时效
    private LocalDateTime createDate;
    private String messageId;
    private MessageType messageType;
    private ContentType contentType;
    }
  1. 自定义传输Encoder和Decoder

    在Netty中几乎所有的业务逻辑在Handler中,Encoder和Decoder是特殊的handler,用于对消息的编码和反编码。类似序列号和反序列号。

    Netty也有很多用于通信的Encoder,比如Kryo等序列号框架。这里我们使用的自定义的编码和解码的方式:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    public class RzEncoder extends MessageToByteEncoder<Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
    // TODO Auto-generated method stub
    // 写入开头的标志
    out.writeInt(msg.getHeader().getHeadData());
    // 写入包的的长度
    out.writeInt(msg.getContent().length);
    /**
    * token定长50个字节
    * 第一个参数 原数组
    * 第二个参数 原数组位置
    * 第三个参数 目标数组
    * 第四个参数 目标数组位置
    * 第五个参数 copy多少个长度
    */
    byte[] indexByte = msg.getHeader().getToken().getBytes();
    writeByte(out, indexByte, 50);


    byte[] createTimeByte = msg.getHeader().getCreateDate().toString().getBytes();
    writeByte(out, createTimeByte, 50);

    byte[] idByte = msg.getHeader().getMessageId().getBytes();
    writeByte(out, idByte, 50);

    byte[] msgType = new byte[]{msg.getHeader().getMessageType().getValue()};
    out.writeBytes(msgType);
    byte[] contentType = new byte[]{msg.getHeader().getContentType().getValue()};
    out.writeBytes(contentType);


    out.writeBytes(msg.getContent());

    }

    private void writeByte(ByteBuf out, byte[] bytes, int length) {
    byte[] writeArr = new byte[length];
    /**
    *
    * 第一个参数 原数组
    * 第二个参数 原数组位置
    * 第三个参数 目标数组
    * 第四个参数 目标数组位置
    * 第五个参数 copy多少个长度
    */
    System.arraycopy(bytes, 0, writeArr, 0, bytes.length > writeArr.length ? writeArr.length : bytes.length);
    out.writeBytes(writeArr);
    }

    private void writeByte(ByteBuf out, String content, int length) {
    if (StringUtils.isEmpty(content)) {
    content = "";
    }
    writeByte(out, content.getBytes(), length);
    }

    }
    ```

    ``` java
    public class RzDecoder extends ByteToMessageDecoder {
    private int BASE_LENGTH = 4 + 4 + 50 + 50 + 50 + 1 +1 ;//协议头 类型 int+length 4个字节+令牌和 令牌生成时间50个字节
    private int headData = DEFAULT_MAGIC_START_CODE;//协议开始标志

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
    // 刻度长度必须大于基本长度
    if (buffer.readableBytes() >= BASE_LENGTH) {
    /**
    * 粘包 发送频繁 可能多次发送黏在一起 需要考虑 不过一个客户端发送太频繁也可以推断是否是攻击
    */
    //防止soket流攻击。客户端传过来的数据太大不合理
    if (buffer.readableBytes() > 1024*1024*10) {
    buffer.skipBytes(buffer.readableBytes());

    }
    }
    int beginIndex;//记录包开始位置
    while (true) {
    // 获取包头开始的index
    beginIndex = buffer.readerIndex();
    //如果读到开始标记位置 结束读取避免拆包和粘包
    if (buffer.readInt() == headData) {
    break;
    }

    //初始化读的index为0
    buffer.resetReaderIndex();
    // 当略过,一个字节之后,
    //如果当前buffer数据小于基础数据 返回等待下一次读取
    if (buffer.readableBytes() < BASE_LENGTH) {
    return;
    }
    }
    // 消息的长度
    int length = buffer.readInt();
    // 判断请求数据包数据是否到齐
    if ((buffer.readableBytes() - 100) < length) {
    //没有到期 返回读的指针 等待下一次数据到期再读
    buffer.readerIndex(beginIndex);
    return;
    }
    //读取令牌
    byte[] tokenByte = new byte[50];
    buffer.readBytes(tokenByte);


    //读取令牌生成时间
    byte[] createDateByte = new byte[50];
    buffer.readBytes(createDateByte);

    //读取Id
    byte[] messageIdByte = new byte[50];
    buffer.readBytes(messageIdByte);

    byte[] messageTypeByte = new byte[1];
    buffer.readBytes(messageTypeByte);
    byte[] contentTypeByte = new byte[1];
    buffer.readBytes(contentTypeByte);
    ContentType contentType = ContentType.values()[contentTypeByte[0]];

    //读取content
    byte[] data = new byte[length];
    buffer.readBytes(data);
    MessageHead head = new MessageHead();
    head.setHeadData(headData);
    head.setToken(new String(tokenByte).trim());
    head.setCreateDate(LocalDateTime.parse(new String(createDateByte).trim()));
    head.setLength(length);
    head.setMessageId(new String(messageIdByte).trim());
    head.setMessageType(MessageType.values()[messageTypeByte[0]]);
    head.setContentType(contentType);
    Message message = new Message(head, data);
    //认证不通过
    if (!message.authorization(message.buidToken())) {
    ctx.close();
    return;
    }
    out.add(message);
    buffer.discardReadBytes();//回收已读字节
    }
    }

  2. 心跳的发送

    心跳的发送就只剩下生成消息和发送了,此处略。。

作者

付威

发布于

2020-02-01

更新于

2020-08-10

许可协议

评论