0%

Topic 或者Subject

每一个生产者都需要向队列中生产消息,不同的生产者生产消息需要有所区别,供对应的消费者消费消息,这个是队列名称之为 Topic或者Subject

MessageLog,ConsumerLog

队列的消息需要一个存储的介质,Kafka的对应的存储为文件存储,生产者生产的消息存储在MessageLog, 然后根据不同的消费和路由规则路由,投递到对应的服务器上面,产生对应的ConsumerLog.

Partition

当投递的消息比较多的时候,就需要对ConsumerLog进行分片,分到不同的服务器上面,这个分片称之为partition,对于Kafka来说,一个Consumer一般和Partition成倍数关系,一个Consumer可以消费一个或者多个Partition.

Broken

Broken可以理解为消费者的服务器。

顺序IO

阅读全文 »

在程序设计中,需要保证一个只有一个对象实例,就是所谓的单例模式。在java中,有很多单例模式的实现,这篇博客是对这几种单例模式的优缺点进行分析和优化:

饿汉模式


所谓饿汗模式,就是优先创建对象,对象在类加载的时候就已经创建好了,具体的代码如下:

1
2
3
4
5
private static Singleton01 instance = new Singleton01();

public static Singleton01 getInstance() {
return instance;
}

饿汗模式优点是代码简单,由于是在类加载的时候就已经创建好了对象,所以不存在线程安全的问题。

缺点是:有的类在没有使用的时候就已经创建了对象,产生了多余的垃圾对象。


懒汉模式

阅读全文 »

对于Guava的认识来自于阅读QMQ源码的时候,有很多没有使用过的Java方法和集合,所以就查了查,发现是用的是Guava的类库,所以就有了下面的博客:

1. 字符串的拼接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 
private final List<String> stringList = Arrays.asList("Google", "Guava", "Java", "Scala", "Kafka");
private final List<String> stringListWithNullValue = Arrays.asList("Google", "Guava", "Java", "Scala", null);


//1. 字符串的拼接
@Test
public void Test() {
String join = Joiner.on("*").join(stringList);
System.out.println(join);
assertEquals(join, "Google*Guava*Java*Scala*Kafka");
}

@Test(expected = NullPointerException.class)
public void TestJoinValue() {
String join = Joiner.on("*").join(stringListWithNullValue);
System.out.println(join);
assertEquals(join, "Google*Guava*Java*Scala*Kafka");
}

2. 跳过空数据

1
2
3
4
5
6
7
//2. 跳过空数据
@Test
public void TestJoinSkipValue() {
String join = Joiner.on("*").skipNulls().join(stringListWithNullValue);
System.out.println(join);
assertEquals(join, "Google*Guava*Java*Scala");
}

3. 为空的数据指定一个默认值

1
2
3
4
5
6
7
 //3. 为空的数据指定一个默认值
@Test
public void TestJoinDefalutValue() {
String join = Joiner.on("*").useForNull("Defalut").join(stringListWithNullValue);
System.out.println(join);
assertEquals(join, "Google*Guava*Java*Scala*Defalut");
}

4. 追加到StringBuilder

1
2
3
4
5
6
7
8
9
//4. 追加到StringBuilder
@Test
public void TestJoinDefalutValueStringBuffer() {
StringBuilder stringBuilder = new StringBuilder();
StringBuilder thatBuilder = Joiner.on("*").useForNull("Defalut").appendTo(stringBuilder, stringListWithNullValue);
assertThat(thatBuilder, sameInstance(stringBuilder));
assertThat(stringBuilder.toString(), equalTo("Google*Guava*Java*Scala*Defalut"));
assertThat(thatBuilder.toString(), equalTo("Google*Guava*Java*Scala*Defalut"));
}
阅读全文 »

三次握手—连接

在通信的过程中,ClientServer建立TCP连接需要三次握手,为什么需要三次握手呢?又是怎么握手的过程?

TCP连接是可靠的通信方式,必须要保证两端都同时有效,且线路通畅。

如同两个人通话,但并不确定对方能不能听到,经历几次才能确保通信方式可靠呢?

1
2
3
4
5
A: 请求连接,收到请回复密码1
B:可以连接,密码是1。你听到了吗?听到回复密码2
A:我听到了,密码2。

A就绪,B就绪。线路通畅,可以通话

通过上面的分析,可以明确的知道为什么TCP需要三次握手,因为少了任何一次,都不能确保当前的网络是通畅的。

Q:使用两次握手会有什么问题?

A: 如果使用两次握手协议,B不知道A是否能够收到自己的消息,如果B此时进行补偿,就有可能多次发送。如果B此时忽略,A可能真的没有收到消息。
如果B的消息在传输的过程中丢失了,A也将不知道B有没有真正的准备好。

实际具体过程如下:

阅读全文 »

在不借助DB引擎情况下,想完成数据的持久化存储,最简单的方法写一个文件存在本地,读取的时候加载文件到内存,然后进行筛选。 存储一个user在本地。

1
2
3
4
id|userName|userCode|sex|phoneNumber
1,xiaoming,001,m,126xxx
2,xiaohong,002,w,123xxx
...

上面满足了我们的需求,看到这种数据格式,查询的时间复杂度为O(n).

而且在每次读取文件的时候,需要把所有的数据都加载到内存(全表扫描),对系统的IO读写也很高。

解决上面的问题的最先想到的办法,再增加加一个文件(索引文件),根据字段(id)构建一个Hash表映射,哈希表中存储的是哈希值和数据对应的地址。 由于没有具体的数据,所以这个文件小的多,加载的成本比源文件低。

每次查询先hash对应得字段的值,然后直接在Hask表中找到地址,加载数据返回查询结果,时间复杂度为O(1)这个就是哈希索引雏形。

hash的加入解决查询的问题,但是局限性也很多,如果精确查询id=1能够发挥hash表的优势,查询中如果有>,<<>等查询的时候,就无法使用Hash索引。

能够提高搜索效率的数据结构除了hash表外,还有树形结构,二叉搜索树,B树等。

采用二叉树时候,为了避免查询的深度太深,采用平衡二叉树。 假设user表有10条数据,然后把id作为索引构建成一个二叉平衡树,创建好的数据结构作为索引存储到文件中。

阅读全文 »

B树和B+树是很多数据库索引采用的数据结构,为什么会使用B树,而不采用更常见的二叉树的呢?

举个例子,有这么几个数字:1,2,3,4,5,6,7,8,9,0,分别生成AVL树,B树

AVL树结构
B树

二叉树生出的树的度为4,而3阶B树高度只是3.如果B树的阶数再多的话,就可以获得更小的高度度。

B树

(6阶的B树)

树的度带来更深入查询,会带来更多的IO读写。

除了二叉树的深度太深的原因,二叉树对于操作系统IO的读取也不是特别的友好。二叉树的节点过于简单,信息过于少。当操作系统进行IO操作的时候,最少读取的字节数是4K,为了保证IO的读取性能,也可能进行预读下个节点等等。

如果二叉树的一个节点只有1KB的话,操作系统每次读取二叉树的时候,只有1KB的有效信息,那这次IO操作的剩余的3KB就是无效的读取。

B树

B树又称为多路平衡查找树,在上面看到了B树的简单结构,在真正的文件存储结构如下:

阅读全文 »

昨天写了一个计数器的类,性能高于JDK,思考了很久,后来被同学点破。

 public void increase() {
    long before = unsafe.getLongVolatile(this, offset);
    while (!unsafe.compareAndSwapLong(this, offset, before, before + 1))
    {
        before = unsafe.getLongVolatile(this, offset);
        Thread.yield();
    }
}

有人怀疑是测试的代码问题,后来发现并不是,真正的原因是:在高并发的环境下,CAS修改旧值时经常被其他线程中断,就会进行重试,不断的重试的代价就很高。yield操作能够缓解这个情况,但是也会带来多次上下文切换,在并发没那么高的情况下,反而更浪费资源

所以JDK在写的Atomic的类型的时候,应该是考虑到重试一次的代价,小于线程的上下文切换,所以并没有采用yield操作。

阅读全文 »

Java是以安全著称,但在Java中有一个类是一个Bug级别的存在,那就是Unsafe. 前面已经说过Unsafe在java中的使用,此处我们直接说用法:

避免初始化

当你想跳过对象初始化的阶段,或者绕过构造函数的检查,去实例化没有任何公共构造函数的类,可以使用allocateInstance:

1
2
3
4
5
6
7
class A {
private long a; // not initialized value
public A() {
this.a = 1; // initialization
}
public long a() { return this.a; }
}

使用构造函数、反射和unsafe初始化它,将得到不同的结果。

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) throws IllegalAccessException, InstantiationException {
A o1 = new A(); // constructor
System.out.println(o1.a()); // prints 1

A o2 = A.class.newInstance(); // reflection
System.out.println(o2.a()); // prints 1

A o3 = (A) UnsafeUtils.getUnsafe().allocateInstance(A.class); // unsafe
System.out.println(o3.a()); // prints 0
}

内存崩溃(Memory corruption)

我们可以使用Unsafe去做一些绕过安全的技术:

1
2
3
4
5
6
7
class Guard {
private int ACCESS_ALLOWED = 1;

public boolean giveAccess() {
return 42 == ACCESS_ALLOWED;
}
}

当客户端调用giveAccess代码是,始终返回的都是false.使用Unsafe可以绕过权限:

阅读全文 »

反射动态代理

为了能够更好实现AOP的思想,在Java中有了动态代理概念,与动态代理相对应的就是静态代理,首先看下面的代码。

1
2
3
4
5
6
7
8
9
10
11
public interface IWoker {
String sayHello(String name,String code) ;
}

public class Worker implements IWoker{
@Override
public String sayHello(String name,String code) {
System.out.println("hello");
}
}

如果我们想在sayHello的方法前后分别都打印一条日志,实现AOP的思想,那使用静态代理的方法如下:

1
2
3
4
5
6
7
public class WorkerProxy {
public void aopWorker(IWoker doWoker){
System.out.println("before");
doWoker.sayHello("Rz","Mz");
System.out.println("end");
}
}

定义一个WorkProxy的代理类,在执行方法的时候,使用代理类来执行:

1
2
Worker worker = new Worker();
new WorkerProxy().aopWorker(worker);

静态代理很好立即,即是使用组合类的方式来实现代理模式。这种形式的就是过于单一,代理的接口多的时候,就对应会产生很多代理类。

而java的动态代理就是为了解决这个问题,动态代理实现的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//动态代理类
public class DynaminProxy implements InvocationHandler {
private Object subject;
public DynaminProxy(Object subject)
{
this.subject = subject;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("before");
Object obj = method.invoke(subject, args);
System.out.println("end");
return obj;
}
}

阅读全文 »

今天在学习CAS的时候,想手写一个CAS的计数器,与JDK中的Atomic(AtomicLong,AtomicInteger等)系列的做个比较,本想性能应该能比JDK要差一丢丢,但却加了一个让线程让出时间片的代码,性能反而更高。

由于使用java中的Unsafe类,存在安全问题,直接使用会抛出SecurityException异常,所以Unsage无法直接在代码中调用,有两种方法可以解决这个问题:

  1. 增加调用的参数,让JVM信任,运行程序时候,增加java -Xbootclasspath:/usr/jdkxxxx/jre/lib/rt.jar:. com.mishadoff.magic.UnsafeClient

  2. 使用反射窃取字段的值,也是这篇博客使用的方式,代码如下:

    1
    2
    3
    Field f = Unsafe.class.getDeclaredField("theUnsafe");
    f.setAccessible(true);
    unsafe = (Unsafe) f.get(null);

为了更好的对比计数器的性能,首先定义了一个接口:

1
2
3
4
public interface ICounter {
public void increase();
public long getCounter();
}

对应的测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private static void testCounter(ICounter counter, String counterName) throws InterruptedException {
int nThreads = 10;
int maxValue = 1000000;
ExecutorService service = Executors.newFixedThreadPool(nThreads);
// creating instance of specific counter
long before = System.currentTimeMillis();
for (int i = 0; i < nThreads; i++) {
service.submit(() -> {
for (int j = 0; j < maxValue; j++) {
counter.increase();
}
});
}

service.shutdown();
service.awaitTermination(1, TimeUnit.MINUTES);
long after = System.currentTimeMillis();
System.out.println(counterName + " Counter计算结果: " + counter.getCounter());
System.out.println(counterName + " Counter计算耗时:" + (after - before));
System.out.println("================================================");
}

分别写了各种不同的锁的版本的计数器:

  1. 线程不安全的计数器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private long count=0;
    @Override
    public void increase() {
    count++;
    }

    @Override
    public long getCounter() {
    return count;
    }

    运行结果:

    1
    2
    Counter Counter计算结果: 9824109
    Counter Counter计算耗时:120

  2. synchronized关键字计数器:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class SyncCounter implements ICounter {
    private long count=0;
    @Override
    public synchronized void increase() {
    count++;
    }

    @Override
    public long getCounter() {
    return count;
    }
    }

    运行结果:

    1
    2
    SyncCounter Counter计算结果: 100000000
    SyncCounter Counter计算耗时:4570

  3. 使用读写锁(ReentrantLockCounter)计数器:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public class ReentrantLockCounter  implements ICounter{
    private long counter = 0;
    private ReentrantReadWriteLock.WriteLock lock = new ReentrantReadWriteLock().writeLock();
    @Override
    public void increase() {
    lock.lock();
    counter++;
    lock.unlock();
    }

    @Override
    public long getCounter() {
    return counter;
    }
    }

    运行结果:

    1
    2
    ReentrantLockCounter Counter计算结果: 100000000
    ReentrantLockCounter Counter计算耗时:3734

  4. 使用JDK中的CAS方式的计数器:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class AutoCounter implements ICounter {
    private AtomicLong count=new AtomicLong(0);
    @Override
    public void increase() {
    count.incrementAndGet();
    }

    @Override
    public long getCounter() {
    return count.get();
    }
    }

    运行结果:

    1
    2
    AutoCounter Counter计算结果: 100000000
    AutoCounter Counter计算耗时:2930

  5. 手写CAS 方式计数器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class RzCounter implements ICounter {
    private volatile long counter = 0;
    private Unsafe unsafe;
    private long offset;

    public RzCounter() {
    try {
    Field f = Unsafe.class.getDeclaredField("theUnsafe");
    f.setAccessible(true);
    unsafe = (Unsafe) f.get(null);
    offset = unsafe.objectFieldOffset(RzCounter.class.getDeclaredField("counter"));
    } catch (Exception e) {
    e.printStackTrace();
    }

    }

    public void increase() {
    unsafe.getAndAddLong(this, offset, 1);
    }

    public long getCounter() {
    return counter;
    }
    }

    运行结果:

    1
    2
    RzCounter Counter计算结果: 100000000
    RzCounter Counter计算耗时:3139

  6. 手写CAS方式计数器2:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    public class RzCounter1 implements ICounter {
    private volatile long counter = 0;
    private Unsafe unsafe;
    private long offset;

    public RzCounter1() {
    try {
    Field f = Unsafe.class.getDeclaredField("theUnsafe");//
    f.setAccessible(true);
    unsafe = (Unsafe) f.get(null);
    offset = unsafe.objectFieldOffset(RzCounter1.class.getDeclaredField("counter"));
    } catch (Exception e) {
    e.printStackTrace();
    }

    }

    public void increase() {
    long before = unsafe.getLongVolatile(this, offset);
    while (!unsafe.compareAndSwapLong(this, offset, before, before + 1))
    {
    before = unsafe.getLongVolatile(this, offset);
    Thread.yield();
    }
    }

    public long getCounter() {
    return counter;
    }
    }

    运行结果:

    1
    2
    RzCounter1 Counter计算结果: 100000000
    RzCounter1 Counter计算耗时:1565

经过多次测试,性能的排序始终是经过多次测试,性能的排序始终是 CAS(RzCounter)<原生CAS<第二种CAS(RzCounter1) ,如果真是这样的话,jdk为什么不在incrementAndGet增加一个Thread.yield()

阅读全文 »