Java如何实现零拷贝

什么是零拷贝

在操作系统中,从内核的形态区分,可以分为内核态(Kernel Space)和用户态(User Space)。

在传统的IO中,如果把数据通过网络发送到指定端的时候,数据需要经历下面的几个过程:

IO

  1. 当调用系统函数的时候,CPU执行一系列准备工作,然后把请求发送给DMA处理(DMA可以理解为专门处理IO的组件),DMA将硬盘数据通过总线传输到内存中。
  2. 当程序需要读取内存的时候,这个时候会执行CPU Copy,内存会有内核态写入用户的缓存区。
  3. 系统调用write()方法时,数据从用户态缓冲区写入到网络缓冲区(Socket Buffer), 由用户态编程内核态。
  4. 最后由DMA写入网卡驱动中,传输到网卡的驱动。
阅读更多

Java中实现顺序IO

顺序IO和随机IO


对于磁盘的读写分为两种模式,顺序IO和随机IO。 随机IO存在一个寻址的过程,所以效率比较低。而顺序IO,相当于有一个物理索引,在读取的时候不需要寻找地址,效率很高。

网上盗了一个图(侵权删)
IO

阅读更多

ServiceLoader的使用

获得接口的实现类有点困难

在Java中,由于反射的局限性,无法直接获取一个接口的所有实现子类,所以为了能够实现一个接口动态的注入实现的子类对象,需要借助ServiceLoader

简单的Demo使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

public interface IService {
void doSomeThing();
}

public class DefalutService implements IService{
@Override
public void doSomeThing() {
RzLogger.info("默认服务");
}
}

public class LogService implements IService {
@Override
public void doSomeThing() {
RzLogger.info("日志服务");
}
}


public static void main(String[] args) {
ServiceLoader<IService> loader = ServiceLoader.load(IService.class);
for (IService service : loader) {
service.doSomeThing();
}
}

阅读更多

一个有效的收拾程序运行残局的方法--ShutdownHook

发现addShutdownHook

在阅读QMQ的源码的时候,在Server端启动的时候,注册了一个shutdown的代码,具体的代码如下:

1
2
Runtime.getRuntime().addShutdownHook(new Thread(wrapper::destroy));

addShutdownHook作用

addShutdownHook方法可以添加一个指定的线程来在Java程序退出的时候做一些事情,在以下几个场景会被调用:

  1. 程序运行完成后退出
  2. 使用Ctrl+C时终端退出
  3. 调用系统退出的方法, System.exit(0)
阅读更多

Unsage类的使用

Java是以安全著称,但在Java中有一个类是一个Bug级别的存在,那就是Unsafe. 前面已经说过Unsafe在java中的使用,此处我们直接说用法:

避免初始化

当你想跳过对象初始化的阶段,或者绕过构造函数的检查,去实例化没有任何公共构造函数的类,可以使用allocateInstance:

1
2
3
4
5
6
7
class A {
private long a; // not initialized value
public A() {
this.a = 1; // initialization
}
public long a() { return this.a; }
}

使用构造函数、反射和unsafe初始化它,将得到不同的结果。

1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) throws IllegalAccessException, InstantiationException {
A o1 = new A(); // constructor
System.out.println(o1.a()); // prints 1

A o2 = A.class.newInstance(); // reflection
System.out.println(o2.a()); // prints 1

A o3 = (A) UnsafeUtils.getUnsafe().allocateInstance(A.class); // unsafe
System.out.println(o3.a()); // prints 0
}

内存崩溃(Memory corruption)

我们可以使用Unsafe去做一些绕过安全的技术:

1
2
3
4
5
6
7
class Guard {
private int ACCESS_ALLOWED = 1;

public boolean giveAccess() {
return 42 == ACCESS_ALLOWED;
}
}

当客户端调用giveAccess代码是,始终返回的都是false.使用Unsafe可以绕过权限:

1
2
3
4
5
Guard guard=new Guard();
System.out.println("修改前:"+guard.giveAccess());
Field f = guard.getClass().getDeclaredField("ACCESS_ALLOWED");
unsafe.putInt(guard, unsafe.objectFieldOffset(f), 42); // memory corruption
System.out.println("修改后:"+guard.giveAccess());

计算对象的大小

sizeOf是返回对象的自身内存大小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static long sizeOf(Object o) {
Unsafe u = UnsafeUtils.getUnsafe();
HashSet<Field> fields = new HashSet<Field>();
Class c = o.getClass();
while (c != Object.class) {
for (Field f : c.getDeclaredFields()) {
if ((f.getModifiers() & Modifier.STATIC) == 0) {
fields.add(f);
}
}
c = c.getSuperclass();
}
// get offset
long maxSize = 0;
for (Field f : fields) {
long offset = u.objectFieldOffset(f);
if (offset > maxSize) {
maxSize = offset;
}
}

return ((maxSize/8) + 1) * 8; // padding
}

如果只是对象类的结构大小,那么可以更简单的实现:

1
2
3
4
5
6
7
8
9
public static long sizeOf(Object object){
return UnsafeUtils.getUnsafe().getAddress(
normalize(UnsafeUtils.getUnsafe().getInt(object, 4L)) + 12L);
}
private static long normalize(int value) {
if(value >= 0) return value;
return (~0L >>> 32) & value;
}

浅拷贝(Shallow copy)

为了计算自身内存的大小,可以简单的添加拷贝的对象方法,标准的解决方案是使用Cloneable修改代码,或者实现自定义的拷贝方法.

浅拷贝:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class ShallowCopy {
static Unsafe unsafe = UnsafeUtils.getUnsafe();
public static Object shallowCopy(Object obj) {
long size = sizeOf(obj);
long start = toAddress(obj);
long address = unsafe.allocateMemory(size);
unsafe.copyMemory(start, address, size);
return fromAddress(address);
}
//toAddress和fromAddress将对象转换为其在内存中的地址,反之亦然。
static long toAddress(Object obj) {
Object[] array = new Object[] {obj};
long baseOffset = unsafe.arrayBaseOffset(Object[].class);
return normalize(unsafe.getInt(array, baseOffset));
}

static Object fromAddress(long address) {
Object[] array = new Object[] {null};
long baseOffset = unsafe.arrayBaseOffset(Object[].class);
unsafe.putLong(array, baseOffset, address);
return array[0];
}
public static long sizeOf(Object o) {

HashSet<Field> fields = new HashSet<Field>();
Class c = o.getClass();
while (c != Object.class) {
for (Field f : c.getDeclaredFields()) {
if ((f.getModifiers() & Modifier.STATIC) == 0) {
fields.add(f);
}
}
c = c.getSuperclass();
}

// get offset
long maxSize = 0;
for (Field f : fields) {
long offset = unsafe.objectFieldOffset(f);
if (offset > maxSize) {
maxSize = offset;
}
}
return ((maxSize/8) + 1) * 8; // padding
}
//normalize是一个为了正确内存地址使用,将有符号的int类型强制转换成无符号的long类型的方法。
private static long normalize(int value) {
if(value >= 0) return value;
return (~0L >>> 32) & value;
}
}

隐藏密码(Hide Password)

Unsafe类中可以删除内存中的对象,比如密码信息。用户的密码大多数都是byte[]或char[]数组,为什么要使用数组呢?

这个是出于安全的考虑,因为我们可以删除不需要的数组元素,如果是一个字符串对象的话,这可以像一个对象在内存中保存,删除该对象只是删除了引用,真正的数据还保存在内存中。

1
2
3
4
5
6
7
8
9
10
11
12

String password = new String("l00k@myHor$e");
String fake = new String(password.replaceAll(".", "?"));
System.out.println(password); // l00k@myHor$e
System.out.println(fake); // ????????????

unsafe.copyMemory(
fake, 0L, null, toAddress(password), sizeOf(password));

System.out.println(password); // ????????????
System.out.println(fake); // ????????????

手写JDK动态代理

反射动态代理

为了能够更好实现AOP的思想,在Java中有了动态代理概念,与动态代理相对应的就是静态代理,首先看下面的代码。

1
2
3
4
5
6
7
8
9
10
11
public interface IWoker {
String sayHello(String name,String code) ;
}

public class Worker implements IWoker{
@Override
public String sayHello(String name,String code) {
System.out.println("hello");
}
}

如果我们想在sayHello的方法前后分别都打印一条日志,实现AOP的思想,那使用静态代理的方法如下:

1
2
3
4
5
6
7
public class WorkerProxy {
public void aopWorker(IWoker doWoker){
System.out.println("before");
doWoker.sayHello("Rz","Mz");
System.out.println("end");
}
}

定义一个WorkProxy的代理类,在执行方法的时候,使用代理类来执行:

1
2
Worker worker = new Worker();
new WorkerProxy().aopWorker(worker);

静态代理很好立即,即是使用组合类的方式来实现代理模式。这种形式的就是过于单一,代理的接口多的时候,就对应会产生很多代理类。

而java的动态代理就是为了解决这个问题,动态代理实现的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
//动态代理类
public class DynaminProxy implements InvocationHandler {
private Object subject;
public DynaminProxy(Object subject)
{
this.subject = subject;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("before");
Object obj = method.invoke(subject, args);
System.out.println("end");
return obj;
}
}

代码调用:

1
2
3
4
5
6
7

IWoker proxy = (IWoker) Proxy.newProxyInstance(
IWoker.class.getClassLoader(),
new Class[]{IWoker.class},
new DynaminProxy(worker));
proxy.sayHello();

从上面的代码可以看出,动态代理返回来的proxy对象很奇怪,通过调试可以看到对应的类型为:

Proxy对象

对于Proxy对象可以通过java语法分析如下:

  1. Proxy对象的类一定是实现了IWorker接口
  2. 执行方法的时候,是通过调用DynaminProxyinvoke方法来调用真正的方法

根据上面的推断,proxy的对象的代码应该是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class $Proxy implements IWoker {
private DynaminProxy h;
public $Proxy(DynaminProxy dynaminProxy) {
this.h = dynaminProxy;
}
@Override
public String sayHello(String name,String code) {
try {
Method m = IWoker.class.getDeclaredMethod("sayHello", name.getClass(),code.getClass);
Object invoke = this.h.invoke(null, m, new Object[]{name,code});
} catch (Throwable ex) {
ex.printStackTrace();
}
}
}

使用代码进行的调用的时候,运行结果一致。在刚刚的代码中,如何实现的这个功能的呢?

1
Proxy.newProxyInstance(IWoker.class.getClassLoader(), new Class[]{IWoker.class}, new DynaminProxy(worker));

再观察动态代理的创建方式,有三个参数ClassLoader,Class的数组和最终的是执行的代码。

根据上面的代码,可以先确定代码实现的思路:

  1. 动态创建动态代理类$Proxy.java
  2. 编译成$Proxy.class
  3. 使用ClassLoader加载对应的类,创建对象返回。

根据上面的思路,可以编写代码如下:

创建$Proxy.java类,实现动态生成类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
StringBuilder classContent = new StringBuilder();
classContent.append("package com.rz.frame.dynamicproxy;");
Class targetInfo = clazz[0];
String targetInfoName = targetInfo.getSimpleName();
String dynamicName = obj.getClass().getSimpleName();
classContent.append(" import ").append(targetInfo.getName()).append(";");
classContent.append("import java.lang.reflect.Method;");
classContent.append("public class $Proxy implements ").append(targetInfoName);
classContent.append("{private ").append(dynamicName).append(" target;");
classContent.append("public $Proxy(").append(dynamicName).append(" target){this.target=target;}");
Method[] declaredMethods = targetInfo.getDeclaredMethods();
for (Method m : declaredMethods) {
String methodName = m.getName();
Class returnTYpe = m.getReturnType();
Class<?>[] parameterTypes = m.getParameterTypes();
StringBuilder argContent = new StringBuilder();
StringBuilder argsName = new StringBuilder();
StringBuilder argsClass = new StringBuilder();
int i = 0;
for (Class param : parameterTypes) {
String sName = param.getSimpleName();
argContent.append(sName).append(" var").append(i).append(",");
argsName.append(" var").append(i).append(",");
argsClass.append(" var").append(i).append(".getClass(),");

i++;
}
if (argContent.length() > 0) {
argContent = new StringBuilder(argContent.substring(0, argContent.lastIndexOf(",")));
argsName = new StringBuilder(argsName.substring(0, argsName.lastIndexOf(",")));
argsClass = new StringBuilder(argsClass.substring(0, argsClass.lastIndexOf(",")));
}
classContent.append("public ").append(returnTYpe.getSimpleName()).append(" ").append(methodName).append("(").append(argContent).append("){");
classContent.append(" try{Method m =").append(targetInfoName).append(".class.getDeclaredMethod(\"").append(methodName).append("\",").append(argsClass).append(");");

classContent.append(" Object invoke = this.target.invoke(").append("this,m,new Object[]{ ").append(argsName).append("});");
classContent.append("return (").append(returnTYpe.getSimpleName()).append(")invoke;} catch (Throwable ex) {return null;}}}");
}

生成的类结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.rz.frame.dynamicproxy;

import com.rz.frame.dynamicproxy.IWoker;

import java.lang.reflect.Method;

public class $Proxy implements IWoker {
private DynaminProxy target;

public $Proxy(DynaminProxy target) {
this.target = target;
}

public String sayHello(String var0, String var1) {
try {
Method m = IWoker.class.getDeclaredMethod("sayHello", var0.getClass(), var1.getClass());
Object invoke = this.target.invoke(this, m, new Object[]{var0, var1});
return (String) invoke;
} catch (Throwable ex) {
return null;
}
}
}

把类写入文件,调用编译器编译成.class文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14

File file = new File(this.getClass().getResource("").getPath().concat("$Proxy.java"));
FileWriter fw = new FileWriter(file);
fw.write(classContent.toString());
fw.flush();
fw.close();

JavaCompiler javaCompiler = ToolProvider.getSystemJavaCompiler();
StandardJavaFileManager fileManager = javaCompiler.getStandardFileManager(null, null, null);
Iterable unit = fileManager.getJavaFileObjects(file);
JavaCompiler.CompilationTask task = javaCompiler.getTask(null, fileManager, null, null, null, unit);
task.call();
fileManager.close();

使用classloader加载class

1
2
3
4
5
6

Class<?> aClass = classLoader.loadClass("com.rz.frame.dynamicproxy.$Proxy");
Constructor<?> constructor = aClass.getConstructor(obj.getClass());
Object o = constructor.newInstance(obj);
return o;

手写一个jdbc数据库连接池

Java访问mysql的时候,需要用到jdbc驱动,传统连接方式为:

1
2
3
4
5
6
7
8
9
10
11
try {
Driver mysqlDriver = (Driver) Class.forName("com.mysql.jdbc.Driver").newInstance();
DriverManager.registerDriver(mysqlDriver);
Connection connection = DriverManager.getConnection("jdbc:mysql://192.168.0.***:3306/rzframe?useSSL=false&serverTimezone=UTC", "root", "*******");
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from rz_user");//查询
connection.close();
} catch (Exception e) {
e.printStackTrace();
}

我们对上面的代码做一个简单的性能测试,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) {
long start = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(100);
for (int i = 0; i < 1000; i++) {
try {
CountDownLatch finalCountDownLatch = countDownLatch;
Thread thread = new Thread(() -> {
try {
doJDBC();
} catch (Exception ex) {

} finally {
finalCountDownLatch.countDown();
}
});
thread.start();
if (i != 0 && i % 100 == 0) {
countDownLatch.await();
System.out.println(i);
countDownLatch = new CountDownLatch(100);
}
} catch (Exception ex) {

}
}
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start));

}

上面代码用了100个线程分批次去完成查询的动作,在我的机器上运行时间45s左右。

从上面的代码可以看出问题,Connection对象每一次都是重新创建,查询完成后,直接是调用close方法,如果不释放,会报连接数过多的异常。 如果查询多次,那浪费在创建Connection的时间就会很多,我们知道在程序优化的手段中,有一个池化可以很好的解决这个问题。

池化的概念就是先创建多个对方存在在一个容器中,当时候的时候可以直接拿出来时候,用完后再进行归还。 跟着这个思想,我们来创建自己的连接池。

编写思路

  1. 创建一个线程安全的容器(由于是多线程访问),队列或者是list,因为Connection的对象并不是有序的,所以可以使用list容器

  2. 对Connection的对象进行封装,增加一个isBusy变量,每次读取的时候就可以选出空闲的Connection对象

  3. 如果取的时候,没有可用的Connection对象,则可以再自动创建对象,可以自动扩容,直到扩容到允许的最大值。

封装的Connection类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class PooledConnection {

private boolean isBusy=false;
private Connection connection;

public PooledConnection(Connection connection, boolean b) {
this.isBusy=b;
this.connection=connection;
}

public boolean isBusy() {
return isBusy;
}

public void setBusy(boolean busy) {
isBusy = busy;
}

public Connection getConnection() {
return connection;
}

public void setConnection(Connection connection) {
this.connection = connection;
}
public void close() {
this.setBusy(false);
}
}

包装好Connection后,可以考虑如何对Connection进行创建和分配,需要有以下几个方法:

1
2
3
PooledConnection getPooledConnection();

void createPooledConnection();

为了更好的程序调试,先定义几个初始的参数变量:

1
2
3
4
5
6
7
8
9
10
11
12
//数据库相关参数
private static String jdbcDriver = null;
private static String jdbcUrl = null;
private static String userName = null;
private static String password = null;

//容器参数
private static int initCount;//初始数量
private static int stepSize;//每次扩容的数量
private static int poolMaxSize;//最大数量
//全局锁
private static Lock lock;

为了保证线程安全,使用线程安全的Vector集合。

获得对象方法

  1. 获得对象的方法,应该是先找到一个空闲的PooledConnection变量,如果有就直接返回。

  2. 如果没有空闲的变量,则尝试进行扩充,扩充由一个线程完成,其他线程则等待,或者尝试再次获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public PooledConnection getPooledConnection() throws RuntimeException, SQLException {
PooledConnection realConnection = getRealConnection();
while (realConnection == null) {
if (lock.tryLock()) {//尝试获取锁
createConnections(stepSize);//只能让一个线程扩容 获得锁之后进行扩容
lock.unlock();
} else {
try {
Thread.sleep(200);//线程等待
} catch (InterruptedException e) {
}
}
realConnection = getRealConnection();//再次尝试获取
if (realConnection != null) {
return realConnection;
}
}
System.out.println("线程池线程数量:" + PoolsConnections.size());
return realConnection;
}

private PooledConnection getRealConnection() throws SQLException {
for (PooledConnection pooledConnection : PoolsConnections) {
try {
if (pooledConnection.isBusy())
continue;
Connection connection = pooledConnection.getConnection();
if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
System.out.println("连接无效");
Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
pooledConnection.setConnection(validConnect);
}
pooledConnection.setBusy(true);
return pooledConnection;
} catch (SQLException e) {
return null;
}
}
return null;
}

扩容方法对象

扩容的方法相对比较简单,判断当前对象数量有没有溢出,如果没有溢出,就进行扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException {
if (poolMaxSize <= 0) {
System.out.println("创建管道对象失败,最大值参数错误");
throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
}
//判断是否有溢出
boolean overFlow = isOverFlow(count);
if (overFlow) {
return;
}
System.out.println("扩容");
for (int i = 0; i < count; i++) {
try {
overFlow = isOverFlow(count);
if (overFlow)
return;
Connection connection = DriverManager.getConnection(jdbcUrl, userName, password);
PooledConnection pooledConnection = new PooledConnection(connection, false);
PoolsConnections.add(pooledConnection);
} catch (SQLException e) {
e.printStackTrace();
}
}
System.out.println("扩容数量:" + PoolsConnections.size());
}

private boolean isOverFlow(int count) {
if (PoolsConnections.size() + count >= poolMaxSize) {
return true;
}
return false;
}

上面的代码隐藏一个问题,我们增加对数据的查询方法,方便我们测试。 查询方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ResultSet querySql(String sql) {
try {
PooledConnection pooledConnection = getPooledConnection();
Connection connection = pooledConnection.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
Thread.sleep(1000);
pooledConnection.close();
return resultSet;
} catch (Exception e) {

}
return null;
}

我们对代码做性能测试同样的测试,在我的电脑运行时间为5s左右,大概快了10倍。 但经过多次测试,代码抛出了ConcurrentModificationException异常,这个异常的原因是因为在使用的时候,我们又修改了正在使用的对象。所以在使用的时候要对对象进行加一个读写锁。

为了锁不至于影响到锁的性能,我们把锁碎片化,采用针对每一个对象进行加锁,而不是全局加锁。修改后的封装对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class PooledConnection {
private boolean isBusy = false;
private Connection connection;
private ReentrantReadWriteLock reentrantReadWriteLock;

public PooledConnection(Connection connection, boolean b) {
this.connection = connection;
reentrantReadWriteLock = new ReentrantReadWriteLock();
}
public boolean isBusy() {
return isBusy;
}

public void setBusy(boolean busy) {
isBusy = busy;
}

public Connection getConnection() {
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public void close() {
this.setBusy(false);
}
public void shutDown() {
try {
this.connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}

//增加读写锁的操作
public void writeLock() {
this.reentrantReadWriteLock.writeLock().lock();
}
public void unWriteLock() {
this.reentrantReadWriteLock.writeLock().unlock();
}
public void readLock() {
this.reentrantReadWriteLock.readLock().lock();
}
public void unReadLock() {
this.reentrantReadWriteLock.readLock().unlock();
}
}

最终结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
public PooledConnection getPooledConnection() throws RuntimeException, SQLException {
if (poolMaxSize <= 0) {
System.out.println("创建管道对象失败,最大值参数错误");
throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
}
PooledConnection realConnection = getRealConnection();
while (realConnection == null) {
if (lock.tryLock()) {//尝试获取锁
createConnections(stepSize);//获得锁之后进行扩容
lock.unlock();
} else {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
}
realConnection = getRealConnection();
if (realConnection != null) {
return realConnection;
}
}

return realConnection;
}

private PooledConnection getRealConnection() {
for (PooledConnection pooledConnection : PoolsConnections) {
try {
if (pooledConnection.isBusy())
continue;
/*
此处要保证写的时候不能被读取,不然会报ConcurrentModificationException异常
*/
pooledConnection.writeLock();//读写互斥,写写互斥
Connection connection = pooledConnection.getConnection();
if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
pooledConnection.setConnection(validConnect);
}
pooledConnection.setBusy(true);
pooledConnection.unWriteLock();
return pooledConnection;
} catch (SQLException e) {

return null;
}
}
return null;
}
public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException {
if (poolMaxSize <= 0) {
System.out.println("创建管道对象失败,最大值参数错误");
throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
}
//判断是否有溢出
boolean overFlow = isOverFlow(count);
if (overFlow) {
return;
}
System.out.println("扩容");
for (int i = 0; i < count; i++) {
try {
overFlow = isOverFlow(count);
if (overFlow)
return;
Connection connection = DriverManager.getConnection(jdbcUrl, userName, password);
PooledConnection pooledConnection = new PooledConnection(connection, false);
PoolsConnections.add(pooledConnection);
} catch (SQLException e) {

}
}
System.out.println("扩容数量:" + PoolsConnections.size());
}

private boolean isOverFlow(int count) {
if (PoolsConnections.size() + count >= poolMaxSize) {
return true;
}
return false;
}

碰到问题

  1. 首先是无法控制连接最大的数量 ,问题出在扩容没有控制一个线程扩容,使用tryLock解决,代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    while (realConnection == null) {
    if (lock.tryLock()) {//尝试获取锁
    createConnections(stepSize);//只能让一个线程扩容 获得锁之后进行扩容
    lock.unlock();
    } else {
    try {
    Thread.sleep(200);
    } catch (InterruptedException e) {
    }
    }
    realConnection = getRealConnection();
    if (realConnection != null) {
    return realConnection;
    }
    }

  2. ConcurrentModificationException异常,在读取的使用的时候,对象有写入操作,需要保证读取可以并发,读写不能一起,写不同对象是可以并发,使用读写锁可以解决:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    reentrantReadWriteLock.writeLock().lock();//读写互斥,写写互斥
    if (!connection.isValid(2000)) {//是否有效,200ms 没有被超时
    System.out.println("连接无效");
    Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
    pooledConnection.setConnection(validConnect);
    }
    pooledConnection.setBusy(true);
    reentrantReadWriteLock.writeLock().unlock();

    1
    2
    3
    4
    reentrantReadWriteLock.readLock().lock();
    Statement statement = connection.createStatement();
    ResultSet resultSet = statement.executeQuery(sql);
    reentrantReadWriteLock.readLock().unlock();

    使用上面的代码会存在一个性能问题,就是在写入的时候,如果写入的是不同对象,写入也会进行排斥,所以应该对单个PooledConnection使用锁。

  3. 把锁进行碎片化优化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    public class PooledConnection {
    private boolean isBusy = false;
    private Connection connection;
    private boolean isUsing = false;
    private ReentrantReadWriteLock reentrantReadWriteLock;

    public PooledConnection(Connection connection, boolean b) {
    this.isBusy = b;
    this.connection = connection;
    this.isUsing = false;
    reentrantReadWriteLock = new ReentrantReadWriteLock();
    }
    public PooledConnection() {

    reentrantReadWriteLock = new ReentrantReadWriteLock();
    }
    public boolean isBusy() {
    return isBusy;
    }
    public void setBusy(boolean busy) {
    isBusy = busy;
    }
    public Connection getConnection() {
    this.isUsing = true;
    return connection;
    }
    public boolean isUsing() {
    return isUsing;
    }
    public void setUsing(boolean using) {
    isUsing = using;
    }
    public void setConnection(Connection connection) {
    this.connection = connection;
    }
    public void close() {

    this.isUsing = false;
    this.setBusy(false);
    }
    public void shutDown() {
    try {
    this.connection.close();
    } catch (SQLException e) {
    e.printStackTrace();
    }
    }
    public void writeLock() {
    this.reentrantReadWriteLock.writeLock().lock();
    }
    public void unWriteLock() {
    this.reentrantReadWriteLock.writeLock().unlock();
    }
    public void readLock() {
    this.reentrantReadWriteLock.readLock().lock();
    }
    public void unReadLock() {
    this.reentrantReadWriteLock.readLock().unlock();
    }
    }

读的时候加入读锁:

1
2
3
4
5
6
7
8
9
10
11
12
PooledConnection pooledConnection = getPooledConnection();
/*
此处要保证读的时候不能被修改,使用读锁
*/
pooledConnection.readLock();
Connection connection = pooledConnection.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
Thread.sleep(1000);
pooledConnection.close();
pooledConnection.unReadLock();
return resultSet;

写入加锁:

1
2
3
4
5
6
7
8
9
pooledConnection.writeLock();//读写互斥,写写互斥
Connection connection = pooledConnection.getConnection();
if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
pooledConnection.setConnection(validConnect);
}
pooledConnection.setBusy(true);
pooledConnection.unWriteLock();
return pooledConnection;

优化后耗时:耗时为:3692ms 。

Java的垃圾回收

Java内存模型

Java把内存大概分为:方法区,虚拟机栈,本地方法栈,堆和程序计数器5个区。

程序计数器 是一块比较小的内存空间,可以看成是一个线程的运行的指示器,是一个线程私有的内存。由于JVM的运行时通过线程抢占CPU的时间片实现的(在任何一个时刻,一个CPU只能执行一个指令),如果存在线程的上下文切换,每个线程在被移除时间片的时候都需要记录执行的位置。如果当前正在执行的是Native方法,则不需要计数器来标记。

虚拟机栈也是线程私有的内存,Java代码在运行的时候,会对每一个方法创建一个栈帧,用于存储局部变量,操作数据,动态链接等数据。

本地方法栈 和虚拟机栈作用类似,本地方法栈是为了Java调用Native方法服务。

Java堆 是Java常用的引用对象的真实存储的区域,也是修改和访问最为频繁的区域。

Java的内存模型分类大致如下:

Java内存结构

垃圾回收算法


标记清除方法

标记清除算法是最基础的算法,算法分为“标记”和“清除”两个阶段: 先标记出需要回收的对象,然后再统一回收所有标记的对象。

标记清除算法的缺点主要有两个:一个效率比较慢,一个是会产生内存碎片。碎片太多会导致在分配较大对象的时候,无法找到连续的空间,而不得不提前触发一次垃圾回收。

复制算法

为了解决标记清除法的效率问题,出现了复制算法。把内存分为大小相同的两个部分。使用的时候,只是用其中的一块,每次进行垃圾回收的时候,就把存活的对象复制到另外一块内存,然后把当前的内存区域直接清空。

复制算法比较简单高效,但是带来的后果是内存的浪费,可使用的内存只有原来的半。

标记整理算法

结合上面两种算法的有点和缺点,标记整理的算法就能够很好的解决浪费和内存的碎片问题。

标记整理的算法,标记过程与标记清除算法相同,只是在回收的时候,把存活得对象都向一侧移动,然后直接清理掉边界以外的内存。

标记整理虽然解决了内存的碎片化的问题,但还是没有解决性能问题。

分代收集算法

分代收集算法是一个综合算法,根据不同对象的生存周期而采用上面三种算法。

Java一般把堆内存分为老年代和新生代,新生代存活率低,只需要少量存活得空间就可以了回收,则可以使用复制算法。

老年代存活率高,没有额外的空间担保,就只能是由标记整理或者标记清除方法。

HotSpot垃圾回收算法

在Hotspot的虚拟机中,如何实现垃圾内存的标记,首先我们需要理解一个概念:GC Root节点和可达性分析

GC Root节点包含以下几种类型:

 1. 虚拟机栈(栈帧中的本地变量表)中引用的对象。

 2. 本地方法栈中JNI(即一般说的native方法)引用的对象。

 3. 方法区中的静态变量和常量引用的对象。  

在Java中判断一个对象的是否存活,都是通过判断这个对象是否具有可达性,即变量是否被GC Root节点引用。

在HotSpot的虚拟机中,使用一组ooPMap的数据结构来记录对象的引用关系。

Java中常用的收集器


  1. Serial收集器

    Serial是最基本的,时间最长的垃圾收集器,采用复制算法回收。Serial在JDK1.3以前都已经在使用了,从名字可以看出Serial收集器是单线程工作的。单线程带来的问题是在程序在垃圾回收的时候,会出现停顿。 Serial收集器在有的场景中的有点点也很多,由于没有cpu上下文的切换,是的Serial收集器相对比较简单高效,短暂的暂停只要不是过于频繁,还是能够被接受的。

  2. ParNew收集器

    ParNew是Serial收集器的多线程版本,可以通过XX:ParallelGCThread参数来控制会受到的线程数,在单个CPU的环境下,由于存在线程的上下文的切换,所以性能不一定能保证优于Serial收集器。

  3. Parallel Scavenge 收集器

    Parallel Scavenge 收集器是新生代的收集器,也是采用复制算法。和其他收集器不同的是,Parallel Scavenge 收集器只关心吞吐量,而不关心GC的暂停时间。
    举一个简单的场景,如果一个垃圾回收过程,一次GC需要1s,如果分成4次,每次需要0.5s,两次GC的时间分别是1s和2s,对于程序的体验来后,后者的GC时间的停顿间隔低于前者,大多数GC回收期都会采用后面的回收机制,而对于Parallel Scavenge 收集器会选择前者,而不会选多次回收来降低GC的停顿时间。

    Parallel Scavenge 收集器 提供了两个参数来控制吞吐量,XX:MaxGCPausmillis控制停顿时间,-XX:GCTimeRatio设置吞吐量的大小。

  4. Serial Old 收集器

    Serial Old 收集器是Serial收集器的老年代的版本,同样是一个单线程版本,采用标记整理算法。

  5. Parallel Old收集器

    Parallel Old收集器是 Parallel Scavenge 收集器的一个老年代版本,采用标记整理算法。在JDK1.6版本中才开始提供,在此之前,如果新生代采用了Parallel Scavenge收集器,老年代回收除了Serial收集器之外别无选择。由于Serial收集器的效率拖累,所以Parallel Old收集器应运而生。

  6. CMS收集器

    CMS收集器是一种以获取最短回收停顿时间为目标的收集器,大部分运行在BS的服务端。CMS收集器主要有4个步骤:

    1. 初始标记
    2. 并发标记
    3. 重新标记
    4. 并发清除

    初始标记是仅仅标记一下GC Roots直接关联到的对象,速度很快。并发标记就是进行GC Roots Tracing的过程,重新标记是为了修正有变动的的对象。在初始标记和重新标记的时候,需要“Stop the world”。

  7. G1 收集器

    G1收集器是最新的收集器,也是面向服务端的垃圾收集器,G1具有一下几个特点:

    1. 并发和并行
    2. 分代手机
    3. 空间整合
    4. 可预测的停顿

    G1 收集器大致分为下面几个步骤:

    1. 初始标记
    2. 并发标记
    3. 最终标记
    4. 筛选回收

    与CMS对比,最终标记和CMS重新标记相同,不同的在于筛选回收,筛选回收是先对各个回收的Region的回收价值和成本进行综合排序,根据用户的期望来进行回收。

内存分配和垃圾回收的策略


在Java的堆内存中可以简单把内存分为如下结构:

Java内存结构

  1. 对象优先在Eden分配

    在大多数情况下,对象的分配优先在新生代Eden中分配,当Eden没有足够的空间进行分配的时候虚拟机触发一次Minor GC

  2. 大对象直接进入老年代

    大对象通常是指很长字符串和数组,新生代的内存无法安置,就直接进入老年代空间尽心存放,大对象内存的阈值可以用-XX:PretenureSizeThreshold参数来定义。

  3. 长期存活的对象进入老年代

    对象在Eden中出生,并经历了一次MinorGC后仍然存活,并且能够被Survivor存放的话,将会把对象从Eden区域移动到S1区域,年代就记为1岁。当年龄增加到一定(默认是15岁)就会被晋升到老年代,这个阈值可以通过-XX:MaxTenuringThreshold设置。

    为了更好的适应不同程序的内存情况,虚拟机并不是简单的要求对象的年龄必须达到某个阈值,如果在Survivor空间中相同年龄所有对象的大小综合大于Survivor空间的一半,则年龄大于或等于这个年龄的对象可以直接进入老年代。

  4. 空间分配担保

    为了保证Minor GC能够顺利执行,虚拟机会在MinorGC 回收前检查老年代最大可用的连续内存空间是否大于新生代所有对象总和,如果条件成立,该次MinorGC可以安全执行。

    如果不成立,虚拟机会查看是否允许担保失败,如果允许,则虚拟机会继续检查可用空间是否大于历次晋升到老年代的平均水平,如果大于,则尝试进行一次MinorGC,显然这次回收是有风险的,如果分配失败则会重新触发一次FULL GC。

    如果虚拟机设置不允许担保失败,则会进行一次FULL GC。

newFixedThreadPool线程池导致线程泄漏

现象问题

最近看到线上的项目线程数过大的报警,开始还是不知道什么原因,因为很多项目都使用同样的线程池管理代码,认为那个项目是偶然的因素造成的,后来经过分析,发现线程数每天都在增加。其他的项目由于发布导致线程会从零开始计算,所以没有那么快达到报警值。 触发报警的代码大概如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
boolean start=true;
public void doSomeThing(){
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
Thread thread = new Thread(() -> {
while (start) {
try {
if(.//判断能否执行){
Thread.sleep(100);
return;
}
executorService.execute(() -> {
try {
//....your code
} catch (Exception e) {

} finally {

}
});
} catch (Exception e) {
}
}
});
thread.start();
}

public int getMaxThreadCount(){
return ....;
}
public void stop(){
this.start=false;
}

上面的代码存在两个问题:

  1. start是个主线程的变量,在主线程修改值,子线程的while循环不会停止

    上述代码能够停止,因为在内部调用`Thread.sleep方法,导致线程内的变量刷新

  2. newFixedThreadPool 线程池没有调用shutdown方法,导致线程不会被回收。

改正方法:

  1. start 设置成线程共享变量volatile类型

  2. 在最后调用停止的时候,让线程池进行回收

修改后代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

volatile boolean start=true;
ExecutorService executorService;
Thread thread ;
public void doSomeThing(){
executorService = Executors.newFixedThreadPool(nThreads);
thread = new Thread(() -> {
while (start) {
try {
if(.//判断能否执行){
Thread.sleep(100);
return;
}
executorService.execute(() -> {
try {
//....your code
} catch (Exception e) {

} finally {

}
});
} catch (Exception e) {
}
}
});
thread.start();
}

public int getMaxThreadCount(){
return ....;
}
public void stop(){
this.start=false;
if (executorService != null) {
executorService.shutdown();
executorService.awaitTermination(MaxConcurrency() * 3, TimeUnit.SECONDS);
for (int i = 0; i < 10 && !executorService.isTerminated(); i++) {
Thread.sleep(1000);
}
}
if (thread != null && thread.isAlive()) {
thread.interrupt();
thread.join(2000);
}
}

最后的疑问

线程池在最后不使用后,为什么线程没有被释放?GC为什么没有把线程池对象回收?是怎么做到的?

目前还没有找到问题的答案,等找到后回来更新。

Java反射和注解

反射

反射是指在运行的状态,对于任意一个类,都能够知道类里面的所有的属性和方法,并能够进行属性的赋值和方法的调用 。 在Java中使用Java.lang下面的Class来表示**类型的”类” ** ,在JDK中定义接口如下

阅读更多
Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×