手写一个Jdbc数据库连接池

Java访问mysql的时候,需要用到jdbc驱动,传统连接方式为:

1
2
3
4
5
6
7
8
9
10
11
try {
Driver mysqlDriver = (Driver) Class.forName("com.mysql.jdbc.Driver").newInstance();
DriverManager.registerDriver(mysqlDriver);
Connection connection = DriverManager.getConnection("jdbc:mysql://192.168.0.***:3306/rzframe?useSSL=false&serverTimezone=UTC", "root", "*******");
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from rz_user");//查询
connection.close();
} catch (Exception e) {
e.printStackTrace();
}

我们对上面的代码做一个简单的性能测试,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public static void main(String[] args) {
long start = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(100);
for (int i = 0; i < 1000; i++) {
try {
CountDownLatch finalCountDownLatch = countDownLatch;
Thread thread = new Thread(() -> {
try {
doJDBC();
} catch (Exception ex) {

} finally {
finalCountDownLatch.countDown();
}
});
thread.start();
if (i != 0 && i % 100 == 0) {
countDownLatch.await();
System.out.println(i);
countDownLatch = new CountDownLatch(100);
}
} catch (Exception ex) {

}
}
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start));

}

上面代码用了100个线程分批次去完成查询的动作,在我的机器上运行时间45s左右。

从上面的代码可以看出问题,Connection对象每一次都是重新创建,查询完成后,直接是调用close方法,如果不释放,会报连接数过多的异常。 如果查询多次,那浪费在创建Connection的时间就会很多,我们知道在程序优化的手段中,有一个池化可以很好的解决这个问题。

池化的概念就是先创建多个对方存在在一个容器中,当时候的时候可以直接拿出来时候,用完后再进行归还。 跟着这个思想,我们来创建自己的连接池。

编写思路

  1. 创建一个线程安全的容器(由于是多线程访问),队列或者是list,因为Connection的对象并不是有序的,所以可以使用list容器

  2. 对Connection的对象进行封装,增加一个isBusy变量,每次读取的时候就可以选出空闲的Connection对象

  3. 如果取的时候,没有可用的Connection对象,则可以再自动创建对象,可以自动扩容,直到扩容到允许的最大值。

封装的Connection类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class PooledConnection {

private boolean isBusy=false;
private Connection connection;

public PooledConnection(Connection connection, boolean b) {
this.isBusy=b;
this.connection=connection;
}

public boolean isBusy() {
return isBusy;
}

public void setBusy(boolean busy) {
isBusy = busy;
}

public Connection getConnection() {
return connection;
}

public void setConnection(Connection connection) {
this.connection = connection;
}
public void close() {
this.setBusy(false);
}
}

包装好Connection后,可以考虑如何对Connection进行创建和分配,需要有以下几个方法:

1
2
3
PooledConnection getPooledConnection();

void createPooledConnection();

为了更好的程序调试,先定义几个初始的参数变量:

1
2
3
4
5
6
7
8
9
10
11
12
//数据库相关参数
private static String jdbcDriver = null;
private static String jdbcUrl = null;
private static String userName = null;
private static String password = null;

//容器参数
private static int initCount;//初始数量
private static int stepSize;//每次扩容的数量
private static int poolMaxSize;//最大数量
//全局锁
private static Lock lock;

为了保证线程安全,使用线程安全的Vector集合。

获得对象方法

  1. 获得对象的方法,应该是先找到一个空闲的PooledConnection变量,如果有就直接返回。

  2. 如果没有空闲的变量,则尝试进行扩充,扩充由一个线程完成,其他线程则等待,或者尝试再次获取。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    public PooledConnection getPooledConnection() throws RuntimeException, SQLException {
    PooledConnection realConnection = getRealConnection();
    while (realConnection == null) {
    if (lock.tryLock()) {//尝试获取锁
    createConnections(stepSize);//只能让一个线程扩容 获得锁之后进行扩容
    lock.unlock();
    } else {
    try {
    Thread.sleep(200);//线程等待
    } catch (InterruptedException e) {
    }
    }
    realConnection = getRealConnection();//再次尝试获取
    if (realConnection != null) {
    return realConnection;
    }
    }
    System.out.println("线程池线程数量:" + PoolsConnections.size());
    return realConnection;
    }

    private PooledConnection getRealConnection() throws SQLException {
    for (PooledConnection pooledConnection : PoolsConnections) {
    try {
    if (pooledConnection.isBusy())
    continue;
    Connection connection = pooledConnection.getConnection();
    if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
    System.out.println("连接无效");
    Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
    pooledConnection.setConnection(validConnect);
    }
    pooledConnection.setBusy(true);
    return pooledConnection;
    } catch (SQLException e) {
    return null;
    }
    }
    return null;
    }

扩容方法对象

扩容的方法相对比较简单,判断当前对象数量有没有溢出,如果没有溢出,就进行扩容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException {
if (poolMaxSize <= 0) {
System.out.println("创建管道对象失败,最大值参数错误");
throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
}
//判断是否有溢出
boolean overFlow = isOverFlow(count);
if (overFlow) {
return;
}
System.out.println("扩容");
for (int i = 0; i < count; i++) {
try {
overFlow = isOverFlow(count);
if (overFlow)
return;
Connection connection = DriverManager.getConnection(jdbcUrl, userName, password);
PooledConnection pooledConnection = new PooledConnection(connection, false);
PoolsConnections.add(pooledConnection);
} catch (SQLException e) {
e.printStackTrace();
}
}
System.out.println("扩容数量:" + PoolsConnections.size());
}

private boolean isOverFlow(int count) {
if (PoolsConnections.size() + count >= poolMaxSize) {
return true;
}
return false;
}

上面的代码隐藏一个问题,我们增加对数据的查询方法,方便我们测试。 查询方法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public ResultSet querySql(String sql) {
try {
PooledConnection pooledConnection = getPooledConnection();
Connection connection = pooledConnection.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
Thread.sleep(1000);
pooledConnection.close();
return resultSet;
} catch (Exception e) {

}
return null;
}

我们对代码做性能测试同样的测试,在我的电脑运行时间为5s左右,大概快了10倍。 但经过多次测试,代码抛出了ConcurrentModificationException异常,这个异常的原因是因为在使用的时候,我们又修改了正在使用的对象。所以在使用的时候要对对象进行加一个读写锁。

为了锁不至于影响到锁的性能,我们把锁碎片化,采用针对每一个对象进行加锁,而不是全局加锁。修改后的封装对象:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class PooledConnection {
private boolean isBusy = false;
private Connection connection;
private ReentrantReadWriteLock reentrantReadWriteLock;

public PooledConnection(Connection connection, boolean b) {
this.connection = connection;
reentrantReadWriteLock = new ReentrantReadWriteLock();
}
public boolean isBusy() {
return isBusy;
}

public void setBusy(boolean busy) {
isBusy = busy;
}

public Connection getConnection() {
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public void close() {
this.setBusy(false);
}
public void shutDown() {
try {
this.connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}

//增加读写锁的操作
public void writeLock() {
this.reentrantReadWriteLock.writeLock().lock();
}
public void unWriteLock() {
this.reentrantReadWriteLock.writeLock().unlock();
}
public void readLock() {
this.reentrantReadWriteLock.readLock().lock();
}
public void unReadLock() {
this.reentrantReadWriteLock.readLock().unlock();
}
}

最终结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
public PooledConnection getPooledConnection() throws RuntimeException, SQLException {
if (poolMaxSize <= 0) {
System.out.println("创建管道对象失败,最大值参数错误");
throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
}
PooledConnection realConnection = getRealConnection();
while (realConnection == null) {
if (lock.tryLock()) {//尝试获取锁
createConnections(stepSize);//获得锁之后进行扩容
lock.unlock();
} else {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
}
realConnection = getRealConnection();
if (realConnection != null) {
return realConnection;
}
}

return realConnection;
}

private PooledConnection getRealConnection() {
for (PooledConnection pooledConnection : PoolsConnections) {
try {
if (pooledConnection.isBusy())
continue;
/*
此处要保证写的时候不能被读取,不然会报ConcurrentModificationException异常
*/
pooledConnection.writeLock();//读写互斥,写写互斥
Connection connection = pooledConnection.getConnection();
if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
pooledConnection.setConnection(validConnect);
}
pooledConnection.setBusy(true);
pooledConnection.unWriteLock();
return pooledConnection;
} catch (SQLException e) {

return null;
}
}
return null;
}
public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException {
if (poolMaxSize <= 0) {
System.out.println("创建管道对象失败,最大值参数错误");
throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
}
//判断是否有溢出
boolean overFlow = isOverFlow(count);
if (overFlow) {
return;
}
System.out.println("扩容");
for (int i = 0; i < count; i++) {
try {
overFlow = isOverFlow(count);
if (overFlow)
return;
Connection connection = DriverManager.getConnection(jdbcUrl, userName, password);
PooledConnection pooledConnection = new PooledConnection(connection, false);
PoolsConnections.add(pooledConnection);
} catch (SQLException e) {

}
}
System.out.println("扩容数量:" + PoolsConnections.size());
}

private boolean isOverFlow(int count) {
if (PoolsConnections.size() + count >= poolMaxSize) {
return true;
}
return false;
}

碰到问题

  1. 首先是无法控制连接最大的数量 ,问题出在扩容没有控制一个线程扩容,使用tryLock解决,代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    while (realConnection == null) {
    if (lock.tryLock()) {//尝试获取锁
    createConnections(stepSize);//只能让一个线程扩容 获得锁之后进行扩容
    lock.unlock();
    } else {
    try {
    Thread.sleep(200);
    } catch (InterruptedException e) {
    }
    }
    realConnection = getRealConnection();
    if (realConnection != null) {
    return realConnection;
    }
    }

  2. ConcurrentModificationException异常,在读取的使用的时候,对象有写入操作,需要保证读取可以并发,读写不能一起,写不同对象是可以并发,使用读写锁可以解决:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    reentrantReadWriteLock.writeLock().lock();//读写互斥,写写互斥
    if (!connection.isValid(2000)) {//是否有效,200ms 没有被超时
    System.out.println("连接无效");
    Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
    pooledConnection.setConnection(validConnect);
    }
    pooledConnection.setBusy(true);
    reentrantReadWriteLock.writeLock().unlock();

    1
    2
    3
    4
    reentrantReadWriteLock.readLock().lock();
    Statement statement = connection.createStatement();
    ResultSet resultSet = statement.executeQuery(sql);
    reentrantReadWriteLock.readLock().unlock();

    使用上面的代码会存在一个性能问题,就是在写入的时候,如果写入的是不同对象,写入也会进行排斥,所以应该对单个PooledConnection使用锁。

  1. 把锁进行碎片化优化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    public class PooledConnection {
    private boolean isBusy = false;
    private Connection connection;
    private boolean isUsing = false;
    private ReentrantReadWriteLock reentrantReadWriteLock;

    public PooledConnection(Connection connection, boolean b) {
    this.isBusy = b;
    this.connection = connection;
    this.isUsing = false;
    reentrantReadWriteLock = new ReentrantReadWriteLock();
    }
    public PooledConnection() {

    reentrantReadWriteLock = new ReentrantReadWriteLock();
    }
    public boolean isBusy() {
    return isBusy;
    }
    public void setBusy(boolean busy) {
    isBusy = busy;
    }
    public Connection getConnection() {
    this.isUsing = true;
    return connection;
    }
    public boolean isUsing() {
    return isUsing;
    }
    public void setUsing(boolean using) {
    isUsing = using;
    }
    public void setConnection(Connection connection) {
    this.connection = connection;
    }
    public void close() {

    this.isUsing = false;
    this.setBusy(false);
    }
    public void shutDown() {
    try {
    this.connection.close();
    } catch (SQLException e) {
    e.printStackTrace();
    }
    }
    public void writeLock() {
    this.reentrantReadWriteLock.writeLock().lock();
    }
    public void unWriteLock() {
    this.reentrantReadWriteLock.writeLock().unlock();
    }
    public void readLock() {
    this.reentrantReadWriteLock.readLock().lock();
    }
    public void unReadLock() {
    this.reentrantReadWriteLock.readLock().unlock();
    }
    }

读的时候加入读锁:

1
2
3
4
5
6
7
8
9
10
11
12
PooledConnection pooledConnection = getPooledConnection();
/*
此处要保证读的时候不能被修改,使用读锁
*/
pooledConnection.readLock();
Connection connection = pooledConnection.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
Thread.sleep(1000);
pooledConnection.close();
pooledConnection.unReadLock();
return resultSet;

写入加锁:

1
2
3
4
5
6
7
8
9
pooledConnection.writeLock();//读写互斥,写写互斥
Connection connection = pooledConnection.getConnection();
if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
pooledConnection.setConnection(validConnect);
}
pooledConnection.setBusy(true);
pooledConnection.unWriteLock();
return pooledConnection;

优化后耗时:耗时为:3692ms 。