0%

几个概念

broker:集群中的每一台服务器,称为Broker

topic或者subject:队列名

partition:一个队列中的消息可以存储到多台broker上面,一个broker中的分区,称为partition

kafka的架构

消息队列的流程从生产者生产消息,通过TCP协议的网络请求发送到Kafka集群,然后Consumers来进行消费,具体如下图:

Kafka架构

其中生产者和消费者都相当于客户端。

Topic/Log/Partition

阅读全文 »

kafka的文件系统

我们知道Kafka是一个Topic下有多个partition,具体结构如下:
Kafka架构

下面我们就探究下Kafka的partition的组成到底是什么。

partition的目录

假定我们在指定的一个集群中有两个Broker,有2个topic(testTopic,testTopic1),每个topic的都有2个partition,在不同的partition中互为对方的Leader。

Kafka架构

则会产生的文件目录应该为,partition的目录为:

 |---testTopic-0
 |---testTopic1-0
 |---testTopic-1
 |---testTopic1-1

从上面的分析中可以看出,kafka在文件的存储中,同一个topic下面有多个不同的partition,每一个partition对应为一个文件夹,partition的命名规则为topic+有序的序号。

阅读全文 »

Guava EventBus

EventBus 是Guava的一个发布订阅的模型,先看一个简单的实现:

  1. 定义一个Event的消息传递对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18

    public class TestEvent1 {
    private final int message;

    /**
    * 构造方法
    * @param message
    */
    public TestEvent1(int message) {
    this.message = message;
    // System.out.println("TestEvent1 事件message:"+message);
    }

    public int getMessage() {
    return message;
    }
    }

  1. 定义一个Listener

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    public class EventListener {
    public int lastMessage = 0;

    @Subscribe
    public void listen(TestEvent1 event) {
    lastMessage = event.getMessage();
    System.out.println("Message:"+lastMessage);
    }

    public int getLastMessage() {
    return lastMessage;
    }
    }

  2. 定义一个使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15

    public static void main(String[] args) {
    // 1.构造一个事件总线
    EventBus eventBus = new EventBus("test");

    // 2.构造一个事件监听器
    EventListener listener = new EventListener();

    // 3.把事件监听器注册到事件总线上
    eventBus.register(listener);

    // 4.事件总线发布事件,触发监听器方法
    eventBus.post(new TestEvent1(1));

    }

运行结果是:

1
Message:1

如果是多个方法需要调用,只需要在新方法上面加上@Subscribe注解。

1
2
3
4
5
@Subscribe         
public void listen2(TestEvent1 event) {
lastMessage = event.getMessage();
System.out.println("Message2:"+lastMessage);
}

如果想使用异步的方法处理,可以使用AsyncEventBus ,具体代码:

1
2
3

AsyncEventBus eventBus = new AsyncEventBus (Executors.newFixedThreadPool(3));

阅读全文 »

观察者模式

观察者模式是一个消息的派发的模式,是把被观察者的状态能够及时的通知给观察者。

比如一个超市的打折了,需要把消息通知给每一个超市的顾客,这样就可以把超市作为一个被观察者,而顾客是观察者。

实现逻辑

观察者模式实现的类图如下:

观察者模式

实现步骤:

  1. 定义一个Observer接口,约束观察者(顾客)需要实现的方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21

    public interface Observer {
    void reciveMessage(double price);
    }

    public class CusmtomerObserver1 implements Observer {
    @Override
    public void reciveMessage(double price) {

    System.out.println("CusmtomerObserver1 收到了价格消息:" + price);
    }
    }

    public class CusmtomerObserver2 implements Observer {
    @Override
    public void reciveMessage(double price) {

    System.out.println("CusmtomerObserver2 收到了价格消息:" + price);
    }
    }

  2. 实现被观察对象的方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    public class MarketSubject {
    private double price;
    public double getPrice() {
    return price;
    }

    public void setPrice(double price) {
    this.price = price;
    notifyObserver(price);
    }

    private ArrayList<Observer> observerList=new ArrayList<>();
    public void addObserver(Observer observer){
    observerList.add(observer);
    }

    public void removeObserver(Observer observer){
    observerList.remove(observer);
    }

    public void notifyObserver(double price){
    for (Observer ob : observerList) {
    ob.reciveMessage(price);
    }
    }
    }

  3. 实现方法的注册和通知

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    public class ObserverMain {
    public static void main(String[] args) {
    MarketSubject marketSubject=new MarketSubject();
    marketSubject.addObserver(new CusmtomerObserver1());
    marketSubject.addObserver(new CusmtomerObserver2());
    marketSubject.setPrice(100d);

    }
    }

    运行结果:

1
2
CusmtomerObserver1 收到了价格消息:100.0
CusmtomerObserver2 收到了价格消息:100.0
阅读全文 »

命令模式

命令模式是把对象的操作方法分成一个命令,分别去执行。在分布式环境中,熔断和降级组件使用的设计模式就是命令模式。

为了了解什么是设计模式,可以类比下设计一个万能遥控器的设置,遥控器只负责一个方法的调用,真正的方法实现都在对应的电器上面。

使用的时候,只需要对对应的命令和实体进行注册下就可以了。具体的设计类图如下:

IO

具体实现代码分下面几个步骤:

  1. 定义实体方法的约束,也就是当前类实体有哪些方法,比如控制灯和电视,都有开和关的方法

    1
    2
    3
    4
    5
    6
    7
    public interface CommandObj {

    void on() ;

    void off();
    }

  2. 定义对应的类实体具体的实现方法

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    public class Light implements CommandObj {
    public void on(){
    System.out.println("打开电灯。。。");
    }

    public void off(){
    System.out.println("关闭电灯。。。");
    }
    }

    public class TV implements CommandObj{
    public void on(){
    System.out.println("打开电视。。。");
    }

    public void off(){
    System.out.println("关闭电视。。。");
    }

    }

  3. 定义一个命令执行的约束,来约束所有的Command的执行者需要实现的方法

    此处需要注意,在是实现了同一个CommandObj接口的实体,不需要Command的约束,为了Demo的完整性,把Commad接口加上。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public interface Command {
    void execute();
    }

    public class WrapperOffCommand implements Command {
    CommandObj commandObj;

    public WrapperOffCommand(CommandObj commandObj){
    this.commandObj = commandObj;
    }

    @Override
    public void execute() {
    commandObj.off();
    }
    }

    public class WrapperOnCommand implements Command {
    CommandObj commandObj;

    public WrapperOnCommand(CommandObj commandObj){
    this.commandObj = commandObj;
    }

    @Override
    public void execute() {
    commandObj.on();
    }
    }

  4. 实现Controller的方法,即控制器本身

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class RemoteController {
    ConcurrentHashMap<String,Command> onCommands;
    ConcurrentHashMap<String,Command> offCommands;

    public RemoteController(){
    this.onCommands = new ConcurrentHashMap<>();
    this.offCommands = new ConcurrentHashMap<>();
    }

    public void registerCommand(String key, Command onCommand, Command offCommand){
    onCommands.put(key,onCommand);
    offCommands.put(key,offCommand);
    }

    // 按下开按钮
    public void onButtonPressed(String key){
    onCommands.get(key).execute();
    }

    // 按下关按钮
    public void offButtonPressed(String key){
    offCommands.get(key).execute();
    }
    }

  5. 控制器的使用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
       public static void main(String[] args) {
    CommandObj light = new Light();
    CommandObj tv = new TV();

    Command lightOn = new WrapperOnCommand(light);
    Command lightOff = new WrapperOffCommand(light);
    Command TVOn = new WrapperOnCommand(tv);
    Command TVOff = new WrapperOffCommand(tv);


    RemoteController remoteController = new RemoteController();
    //注册对应的命令
    remoteController.registerCommand(RemoteTypeEnum.Light.toString(), lightOn, lightOff);
    //注册对应的命令
    remoteController.registerCommand(RemoteTypeEnum.TV.toString(), TVOn, TVOff);


    remoteController.onButtonPressed(RemoteTypeEnum.Light.toString());
    remoteController.offButtonPressed(RemoteTypeEnum.Light.toString());
    remoteController.onButtonPressed(RemoteTypeEnum.TV.toString());
    remoteController.offButtonPressed(RemoteTypeEnum.TV.toString());
    }

阅读全文 »

什么是零拷贝

在操作系统中,从内核的形态区分,可以分为内核态(Kernel Space)和用户态(User Space)。

在传统的IO中,如果把数据通过网络发送到指定端的时候,数据需要经历下面的几个过程:

IO

  1. 当调用系统函数的时候,CPU执行一系列准备工作,然后把请求发送给DMA处理(DMA可以理解为专门处理IO的组件),DMA将硬盘数据通过总线传输到内存中。
  2. 当程序需要读取内存的时候,这个时候会执行CPU Copy,内存会有内核态写入用户的缓存区。
  3. 系统调用write()方法时,数据从用户态缓冲区写入到网络缓冲区(Socket Buffer), 由用户态编程内核态。
  4. 最后由DMA写入网卡驱动中,传输到网卡的驱动。

可以看到,传统的IO的读写,数据会经历4次内存的拷贝,这种拷贝拷贝会带来资源的浪费和效率的底下。


如何实现零拷贝


阅读全文 »

顺序IO和随机IO


对于磁盘的读写分为两种模式,顺序IO和随机IO。 随机IO存在一个寻址的过程,所以效率比较低。而顺序IO,相当于有一个物理索引,在读取的时候不需要寻找地址,效率很高。

网上盗了一个图(侵权删)
IO


Java中的随机读写


在Java中读写文件的方式有很多种,先总结以下3种方法:

  1. FileWriter和FileReader

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    public static void fileWrite(String filePath, String content) {
    File file = new File(filePath);
    //创建FileWriter对象
    FileWriter writer = null;
    try {
    //如果文件不存在,创建文件
    if (!file.exists())
    file.createNewFile();
    writer = new FileWriter(file);
    writer.write(content);//写入内容
    writer.flush();
    writer.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    public static void fileRead(String filePath) {
    File file = new File(filePath);
    if (file.exists()) {
    try {
    //创建FileReader对象,读取文件中的内容
    FileReader reader = new FileReader(file);
    char[] ch = new char[1];
    while (reader.read(ch) != -1) {
    System.out.print(ch);
    }
    reader.close();
    } catch (IOException ex) {
    ex.printStackTrace();
    }

    }
    }
阅读全文 »

心跳机制

心跳机制是常用的一个健康监测的机制,说白了就是每隔一段时间向服务器发送一个心跳的报文,服务收到报文后,就认为当前的客户端在活动的状态,否则会进入异常的机制,比如说主从切换

既然存在一个通信,就一定会有服务端和客户端。服务端开启监听,客户端发起心跳报文,然后服务就再次响应。

系统的设计

  1. 消息的类型

    在服务端和客户端进行通信的时候,需要区分消息的类型,根据消息的类型分别进行不同的处理。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public enum MessageType {
    SERVICE_REQ((byte) 0),/*业务请求消息*/
    SERVICE_RESP((byte) 1), /*业务应答消息*/
    ONE_WAY((byte) 2), /*无需应答的消息*/
    LOGIN_REQ((byte) 3), /*登录请求消息*/
    LOGIN_RESP((byte) 4), /*登录响应消息*/
    HEARTBEAT_REQ((byte) 5), /*心跳请求消息*/
    HEARTBEAT_RESP((byte) 6);/*心跳应答消息*/
    private byte code;

    MessageType(byte code) {
    this.code = code;
    }

    public byte getValue() {
    return code;
    }

    public static MessageType getMessageType(String typeName){
    for (MessageType mt :MessageType.values()) {
    if(mt.toString().equals(typeName.trim())){
    return mt;
    }

    }
    return null;
    }
    }

  2. 内容的类型

    在设计这个传输的模型的时候考虑的文件的传输(当然也可以作为消息的类型),所以还需要定义一个内容的类型

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    public enum ContentType {
    Default((byte) 0),
    File((byte) 1),
    Other((byte) 2);
    private byte code;
    ContentType(byte code) {
    this.code = code;
    }

    public byte getValue() {
    return code;
    }

    public static ContentType getContentType(String typeName){
    for (ContentType mt :ContentType.values()) {
    if(mt.toString().equals(typeName.trim())){
    return mt;
    }

    }
    return null;
    }
    }

  3. 消息头

    消息头包含了消息的认证信息和长度,用来认证信息的合法来源和消息的截取。定义如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public class MessageHead {
    private int headData = DEFAULT_MAGIC_START_CODE;//协议开始标志
    private int length;//包的长度
    private String token;//认证的Token,可以设置时效
    private LocalDateTime createDate;
    private String messageId;
    private MessageType messageType;
    private ContentType contentType;
    }
  1. 自定义传输Encoder和Decoder

    在Netty中几乎所有的业务逻辑在Handler中,Encoder和Decoder是特殊的handler,用于对消息的编码和反编码。类似序列号和反序列号。

    Netty也有很多用于通信的Encoder,比如Kryo等序列号框架。这里我们使用的自定义的编码和解码的方式:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    public class RzEncoder extends MessageToByteEncoder<Message> {
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
    // TODO Auto-generated method stub
    // 写入开头的标志
    out.writeInt(msg.getHeader().getHeadData());
    // 写入包的的长度
    out.writeInt(msg.getContent().length);
    /**
    * token定长50个字节
    * 第一个参数 原数组
    * 第二个参数 原数组位置
    * 第三个参数 目标数组
    * 第四个参数 目标数组位置
    * 第五个参数 copy多少个长度
    */
    byte[] indexByte = msg.getHeader().getToken().getBytes();
    writeByte(out, indexByte, 50);


    byte[] createTimeByte = msg.getHeader().getCreateDate().toString().getBytes();
    writeByte(out, createTimeByte, 50);

    byte[] idByte = msg.getHeader().getMessageId().getBytes();
    writeByte(out, idByte, 50);

    byte[] msgType = new byte[]{msg.getHeader().getMessageType().getValue()};
    out.writeBytes(msgType);
    byte[] contentType = new byte[]{msg.getHeader().getContentType().getValue()};
    out.writeBytes(contentType);


    out.writeBytes(msg.getContent());

    }

    private void writeByte(ByteBuf out, byte[] bytes, int length) {
    byte[] writeArr = new byte[length];
    /**
    *
    * 第一个参数 原数组
    * 第二个参数 原数组位置
    * 第三个参数 目标数组
    * 第四个参数 目标数组位置
    * 第五个参数 copy多少个长度
    */
    System.arraycopy(bytes, 0, writeArr, 0, bytes.length > writeArr.length ? writeArr.length : bytes.length);
    out.writeBytes(writeArr);
    }

    private void writeByte(ByteBuf out, String content, int length) {
    if (StringUtils.isEmpty(content)) {
    content = "";
    }
    writeByte(out, content.getBytes(), length);
    }

    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    public class RzDecoder extends ByteToMessageDecoder {
    private int BASE_LENGTH = 4 + 4 + 50 + 50 + 50 + 1 +1 ;//协议头 类型 int+length 4个字节+令牌和 令牌生成时间50个字节
    private int headData = DEFAULT_MAGIC_START_CODE;//协议开始标志

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) {
    // 刻度长度必须大于基本长度
    if (buffer.readableBytes() >= BASE_LENGTH) {
    /**
    * 粘包 发送频繁 可能多次发送黏在一起 需要考虑 不过一个客户端发送太频繁也可以推断是否是攻击
    */
    //防止soket流攻击。客户端传过来的数据太大不合理
    if (buffer.readableBytes() > 1024*1024*10) {
    buffer.skipBytes(buffer.readableBytes());

    }
    }
    int beginIndex;//记录包开始位置
    while (true) {
    // 获取包头开始的index
    beginIndex = buffer.readerIndex();
    //如果读到开始标记位置 结束读取避免拆包和粘包
    if (buffer.readInt() == headData) {
    break;
    }

    //初始化读的index为0
    buffer.resetReaderIndex();
    // 当略过,一个字节之后,
    //如果当前buffer数据小于基础数据 返回等待下一次读取
    if (buffer.readableBytes() < BASE_LENGTH) {
    return;
    }
    }
    // 消息的长度
    int length = buffer.readInt();
    // 判断请求数据包数据是否到齐
    if ((buffer.readableBytes() - 100) < length) {
    //没有到期 返回读的指针 等待下一次数据到期再读
    buffer.readerIndex(beginIndex);
    return;
    }
    //读取令牌
    byte[] tokenByte = new byte[50];
    buffer.readBytes(tokenByte);


    //读取令牌生成时间
    byte[] createDateByte = new byte[50];
    buffer.readBytes(createDateByte);

    //读取Id
    byte[] messageIdByte = new byte[50];
    buffer.readBytes(messageIdByte);

    byte[] messageTypeByte = new byte[1];
    buffer.readBytes(messageTypeByte);
    byte[] contentTypeByte = new byte[1];
    buffer.readBytes(contentTypeByte);
    ContentType contentType = ContentType.values()[contentTypeByte[0]];

    //读取content
    byte[] data = new byte[length];
    buffer.readBytes(data);
    MessageHead head = new MessageHead();
    head.setHeadData(headData);
    head.setToken(new String(tokenByte).trim());
    head.setCreateDate(LocalDateTime.parse(new String(createDateByte).trim()));
    head.setLength(length);
    head.setMessageId(new String(messageIdByte).trim());
    head.setMessageType(MessageType.values()[messageTypeByte[0]]);
    head.setContentType(contentType);
    Message message = new Message(head, data);
    //认证不通过
    if (!message.authorization(message.buidToken())) {
    ctx.close();
    return;
    }
    out.add(message);
    buffer.discardReadBytes();//回收已读字节
    }
    }

  2. 心跳的发送

    心跳的发送就只剩下生成消息和发送了,此处略。。

阅读全文 »

IO本质上是对数据缓冲区的读写,主要分为文件IO和网络IO,基本模型有很多,可以从两个方面去认识 同步和异步,阻塞和非阻塞。根据上面分类可以分为下面五类:

  1. 阻塞I/O(blocking I/O)
  2. 非阻塞I/O (nonblocking I/O)
  3. I/O复用(select 、poll和epoll) (I/O multiplexing)
  4. 信号驱动I/O (signal driven I/O (SIGIO))
  5. 异步I/O (asynchronous I/O )

阻塞IO

阻塞IO也是常说的BIO,是一个单线程的阻塞模型,在执行数据拷贝的时候,会阻塞主进程,直到数据拷贝完成。具体模式如下图:
BIO

从图中可以看出,这种模型的比较简单,及时性比较高,但是会阻塞用户进程,在使用的时候常常结合多线程或者多进程来使用。

如果在连接数比较多的情况下,多线程只能缓解,无法彻底解决。总之,多线程可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈。

非阻塞IO

由于BIO的执行效率和阻塞的问题,单机无法承载过多的数据处理。对于用户来说来说,其实不用等待数据的准备过程,只需要返回数据有没有准备好就行。

NIO

阅读全文 »

在多路复用的IO的模型中,存在三种机制,分别是selectpollepoll.为了便于理解,可以使用简单的伪代码来表示一个原始的IO的读写:

1
2
3
4
5
6
7
8
9
10
while(true)  
{
for(Stream i: streamArr)
{
if(i.isNotReady()){
continue;
}
doSomething();
}
}

select

时间复杂度O(n),它仅仅知道了,有I/O事件发生了,却并不知道是哪那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。所以select具有O(n)的无差别轮询复杂度,同时处理的流越多,无差别轮询时间就越长。 具体的伪代码如下:

1
2
3
4
5
6
7
8
9
10
11
while(true)  
{
getSelectReadyStream();//此处是同步方法,如果有准备好的数据才会向下走。
for(Stream i: streamArr)
{
if(i.isNotReady()){
continue;
}
doSomething();
}
}

select的缺点:

(1)每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大

(2)同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大

(3)select支持的文件描述符数量太小了,默认是1024

阅读全文 »