Netty02--Netty实现心跳
心跳机制
心跳机制是常用的一个健康监测的机制,说白了就是每隔一段时间向服务器发送一个心跳的报文,服务收到报文后,就认为当前的客户端在活动的状态,否则会进入异常的机制,比如说主从切换
。
既然存在一个通信,就一定会有服务端和客户端。服务端开启监听,客户端发起心跳报文,然后服务就再次响应。
系统的设计
消息的类型
在服务端和客户端进行通信的时候,需要区分消息的类型,根据消息的类型分别进行不同的处理。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29public enum MessageType {
SERVICE_REQ((byte) 0),/*业务请求消息*/
SERVICE_RESP((byte) 1), /*业务应答消息*/
ONE_WAY((byte) 2), /*无需应答的消息*/
LOGIN_REQ((byte) 3), /*登录请求消息*/
LOGIN_RESP((byte) 4), /*登录响应消息*/
HEARTBEAT_REQ((byte) 5), /*心跳请求消息*/
HEARTBEAT_RESP((byte) 6);/*心跳应答消息*/
private byte code;
MessageType(byte code) {
this.code = code;
}
public byte getValue() {
return code;
}
public static MessageType getMessageType(String typeName){
for (MessageType mt :MessageType.values()) {
if(mt.toString().equals(typeName.trim())){
return mt;
}
}
return null;
}
}内容的类型
在设计这个传输的模型的时候考虑的文件的传输(当然也可以作为消息的类型),所以还需要定义一个内容的类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24public enum ContentType {
Default((byte) 0),
File((byte) 1),
Other((byte) 2);
private byte code;
ContentType(byte code) {
this.code = code;
}
public byte getValue() {
return code;
}
public static ContentType getContentType(String typeName){
for (ContentType mt :ContentType.values()) {
if(mt.toString().equals(typeName.trim())){
return mt;
}
}
return null;
}
}消息头
消息头包含了消息的认证信息和长度,用来认证信息的合法来源和消息的截取。定义如下:
1
2
3
4
5
6
7
8
9public class MessageHead {
private int headData = DEFAULT_MAGIC_START_CODE;//协议开始标志
private int length;//包的长度
private String token;//认证的Token,可以设置时效
private LocalDateTime createDate;
private String messageId;
private MessageType messageType;
private ContentType contentType;
}
自定义传输Encoder和Decoder
在Netty中几乎所有的业务逻辑在Handler中,Encoder和Decoder是特殊的handler,用于对消息的编码和反编码。类似序列号和反序列号。
Netty也有很多用于通信的Encoder,比如Kryo等序列号框架。这里我们使用的自定义的编码和解码的方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58public class RzEncoder extends MessageToByteEncoder<Message> {
protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// TODO Auto-generated method stub
// 写入开头的标志
out.writeInt(msg.getHeader().getHeadData());
// 写入包的的长度
out.writeInt(msg.getContent().length);
/**
* token定长50个字节
* 第一个参数 原数组
* 第二个参数 原数组位置
* 第三个参数 目标数组
* 第四个参数 目标数组位置
* 第五个参数 copy多少个长度
*/
byte[] indexByte = msg.getHeader().getToken().getBytes();
writeByte(out, indexByte, 50);
byte[] createTimeByte = msg.getHeader().getCreateDate().toString().getBytes();
writeByte(out, createTimeByte, 50);
byte[] idByte = msg.getHeader().getMessageId().getBytes();
writeByte(out, idByte, 50);
byte[] msgType = new byte[]{msg.getHeader().getMessageType().getValue()};
out.writeBytes(msgType);
byte[] contentType = new byte[]{msg.getHeader().getContentType().getValue()};
out.writeBytes(contentType);
out.writeBytes(msg.getContent());
}
private void writeByte(ByteBuf out, byte[] bytes, int length) {
byte[] writeArr = new byte[length];
/**
*
* 第一个参数 原数组
* 第二个参数 原数组位置
* 第三个参数 目标数组
* 第四个参数 目标数组位置
* 第五个参数 copy多少个长度
*/
System.arraycopy(bytes, 0, writeArr, 0, bytes.length > writeArr.length ? writeArr.length : bytes.length);
out.writeBytes(writeArr);
}
private void writeByte(ByteBuf out, String content, int length) {
if (StringUtils.isEmpty(content)) {
content = "";
}
writeByte(out, content.getBytes(), length);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83public class RzDecoder extends ByteToMessageDecoder {
private int BASE_LENGTH = 4 + 4 + 50 + 50 + 50 + 1 +1 ;//协议头 类型 int+length 4个字节+令牌和 令牌生成时间50个字节
private int headData = DEFAULT_MAGIC_START_CODE;//协议开始标志
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
// 刻度长度必须大于基本长度
if (buffer.readableBytes() >= BASE_LENGTH) {
/**
* 粘包 发送频繁 可能多次发送黏在一起 需要考虑 不过一个客户端发送太频繁也可以推断是否是攻击
*/
//防止soket流攻击。客户端传过来的数据太大不合理
if (buffer.readableBytes() > 1024*1024*10) {
buffer.skipBytes(buffer.readableBytes());
}
}
int beginIndex;//记录包开始位置
while (true) {
// 获取包头开始的index
beginIndex = buffer.readerIndex();
//如果读到开始标记位置 结束读取避免拆包和粘包
if (buffer.readInt() == headData) {
break;
}
//初始化读的index为0
buffer.resetReaderIndex();
// 当略过,一个字节之后,
//如果当前buffer数据小于基础数据 返回等待下一次读取
if (buffer.readableBytes() < BASE_LENGTH) {
return;
}
}
// 消息的长度
int length = buffer.readInt();
// 判断请求数据包数据是否到齐
if ((buffer.readableBytes() - 100) < length) {
//没有到期 返回读的指针 等待下一次数据到期再读
buffer.readerIndex(beginIndex);
return;
}
//读取令牌
byte[] tokenByte = new byte[50];
buffer.readBytes(tokenByte);
//读取令牌生成时间
byte[] createDateByte = new byte[50];
buffer.readBytes(createDateByte);
//读取Id
byte[] messageIdByte = new byte[50];
buffer.readBytes(messageIdByte);
byte[] messageTypeByte = new byte[1];
buffer.readBytes(messageTypeByte);
byte[] contentTypeByte = new byte[1];
buffer.readBytes(contentTypeByte);
ContentType contentType = ContentType.values()[contentTypeByte[0]];
//读取content
byte[] data = new byte[length];
buffer.readBytes(data);
MessageHead head = new MessageHead();
head.setHeadData(headData);
head.setToken(new String(tokenByte).trim());
head.setCreateDate(LocalDateTime.parse(new String(createDateByte).trim()));
head.setLength(length);
head.setMessageId(new String(messageIdByte).trim());
head.setMessageType(MessageType.values()[messageTypeByte[0]]);
head.setContentType(contentType);
Message message = new Message(head, data);
//认证不通过
if (!message.authorization(message.buidToken())) {
ctx.close();
return;
}
out.add(message);
buffer.discardReadBytes();//回收已读字节
}
}心跳的发送
心跳的发送就只剩下生成消息和发送了,此处略。。