手写一个jdbc数据库连接池

Java访问mysql的时候,需要用到jdbc驱动,传统连接方式为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
 try {
Driver mysqlDriver = (Driver) Class.forName("com.mysql.jdbc.Driver").newInstance();
DriverManager.registerDriver(mysqlDriver);
Connection connection = DriverManager.getConnection("jdbc:mysql://192.168.0.***:3306/rzframe?useSSL=false&serverTimezone=UTC", "root", "*******");
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery("select * from rz_user");//查询
connection.close();
} catch (Exception e) {
e.printStackTrace();
}

```
我们对上面的代码做一个简单的性能测试,代码如下:

``` Java
public static void main(String[] args) {
long start = System.currentTimeMillis();
CountDownLatch countDownLatch = new CountDownLatch(100);
for (int i = 0; i < 1000; i++) {
try {
CountDownLatch finalCountDownLatch = countDownLatch;
Thread thread = new Thread(() -> {
try {
doJDBC();
} catch (Exception ex) {

} finally {
finalCountDownLatch.countDown();
}
});
thread.start();
if (i != 0 && i % 100 == 0) {
countDownLatch.await();
System.out.println(i);
countDownLatch = new CountDownLatch(100);
}
} catch (Exception ex) {

}
}
long end = System.currentTimeMillis();
System.out.println("耗时:" + (end - start));

}

上面代码用了100个线程分批次去完成查询的动作,在我的机器上运行时间45s左右。

从上面的代码可以看出问题,Connection对象每一次都是重新创建,查询完成后,直接是调用close方法,如果不释放,会报连接数过多的异常。 如果查询多次,那浪费在创建Connection的时间就会很多,我们知道在程序优化的手段中,有一个池化可以很好的解决这个问题。

池化的概念就是先创建多个对方存在在一个容器中,当时候的时候可以直接拿出来时候,用完后再进行归还。 跟着这个思想,我们来创建自己的连接池。

编写思路

  1. 创建一个线程安全的容器(由于是多线程访问),队列或者是list,因为Connection的对象并不是有序的,所以可以使用list容器

  2. 对Connection的对象进行封装,增加一个isBusy变量,每次读取的时候就可以选出空闲的Connection对象

  3. 如果取的时候,没有可用的Connection对象,则可以再自动创建对象,可以自动扩容,直到扩容到允许的最大值。

封装的Connection类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class PooledConnection {

private boolean isBusy=false;
private Connection connection;

public PooledConnection(Connection connection, boolean b) {
this.isBusy=b;
this.connection=connection;
}

public boolean isBusy() {
return isBusy;
}

public void setBusy(boolean busy) {
isBusy = busy;
}

public Connection getConnection() {
return connection;
}

public void setConnection(Connection connection) {
this.connection = connection;
}
public void close() {
this.setBusy(false);
}
}

```

包装好`Connection`后,可以考虑如何对`Connection`进行创建和分配,需要有以下几个方法:

``` Java
PooledConnection getPooledConnection();

void createPooledConnection();
```

为了更好的程序调试,先定义几个初始的参数变量:
``` Java
//数据库相关参数
private static String jdbcDriver = null;
private static String jdbcUrl = null;
private static String userName = null;
private static String password = null;

//容器参数
private static int initCount;//初始数量
private static int stepSize;//每次扩容的数量
private static int poolMaxSize;//最大数量
//全局锁
private static Lock lock;

为了保证线程安全,使用线程安全的Vector集合。

获得对象方法

  1. 获得对象的方法,应该是先找到一个空闲的PooledConnection变量,如果有就直接返回。

  2. 如果没有空闲的变量,则尝试进行扩充,扩充由一个线程完成,其他线程则等待,或者尝试再次获取。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
       public PooledConnection getPooledConnection() throws RuntimeException, SQLException {
    PooledConnection realConnection = getRealConnection();
    while (realConnection == null) {
    if (lock.tryLock()) {//尝试获取锁
    createConnections(stepSize);//只能让一个线程扩容 获得锁之后进行扩容
    lock.unlock();
    } else {
    try {
    Thread.sleep(200);//线程等待
    } catch (InterruptedException e) {
    }
    }
    realConnection = getRealConnection();//再次尝试获取
    if (realConnection != null) {
    return realConnection;
    }
    }
    System.out.println("线程池线程数量:" + PoolsConnections.size());
    return realConnection;
    }

    private PooledConnection getRealConnection() throws SQLException {
    for (PooledConnection pooledConnection : PoolsConnections) {
    try {
    if (pooledConnection.isBusy())
    continue;
    Connection connection = pooledConnection.getConnection();
    if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
    System.out.println("连接无效");
    Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
    pooledConnection.setConnection(validConnect);
    }
    pooledConnection.setBusy(true);
    return pooledConnection;
    } catch (SQLException e) {
    return null;
    }
    }
    return null;
    }

    ```

    #### 扩容方法对象

    扩容的方法相对比较简单,判断当前对象数量有没有溢出,如果没有溢出,就进行扩容

    ``` Java
    public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException {
    if (poolMaxSize <= 0) {
    System.out.println("创建管道对象失败,最大值参数错误");
    throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
    }
    //判断是否有溢出
    boolean overFlow = isOverFlow(count);
    if (overFlow) {
    return;
    }
    System.out.println("扩容");
    for (int i = 0; i < count; i++) {
    try {
    overFlow = isOverFlow(count);
    if (overFlow)
    return;
    Connection connection = DriverManager.getConnection(jdbcUrl, userName, password);
    PooledConnection pooledConnection = new PooledConnection(connection, false);
    PoolsConnections.add(pooledConnection);
    } catch (SQLException e) {
    e.printStackTrace();
    }
    }
    System.out.println("扩容数量:" + PoolsConnections.size());
    }

    private boolean isOverFlow(int count) {
    if (PoolsConnections.size() + count >= poolMaxSize) {
    return true;
    }
    return false;
    }

    ```

    上面的代码隐藏一个问题,我们增加对数据的查询方法,方便我们测试。 查询方法如下:

    ``` Java
    public ResultSet querySql(String sql) {
    try {
    PooledConnection pooledConnection = getPooledConnection();
    Connection connection = pooledConnection.getConnection();
    Statement statement = connection.createStatement();
    ResultSet resultSet = statement.executeQuery(sql);
    Thread.sleep(1000);
    pooledConnection.close();
    return resultSet;
    } catch (Exception e) {

    }
    return null;
    }
    ```

    我们对代码做性能测试同样的测试,在我的电脑运行时间为5s左右,大概快了10倍。 但经过多次测试,代码抛出了ConcurrentModificationException异常,这个异常的原因是因为在使用的时候,我们又修改了正在使用的对象。所以在使用的时候要对对象进行加一个读写锁。

    为了锁不至于影响到锁的性能,我们把锁碎片化,采用针对每一个对象进行加锁,而不是全局加锁。修改后的封装对象:

    ``` Java
    public class PooledConnection {
    private boolean isBusy = false;
    private Connection connection;
    private ReentrantReadWriteLock reentrantReadWriteLock;

    public PooledConnection(Connection connection, boolean b) {
    this.connection = connection;
    reentrantReadWriteLock = new ReentrantReadWriteLock();
    }
    public boolean isBusy() {
    return isBusy;
    }

    public void setBusy(boolean busy) {
    isBusy = busy;
    }

    public Connection getConnection() {
    return connection;
    }
    public void setConnection(Connection connection) {
    this.connection = connection;
    }
    public void close() {
    this.setBusy(false);
    }
    public void shutDown() {
    try {
    this.connection.close();
    } catch (SQLException e) {
    e.printStackTrace();
    }
    }

    //增加读写锁的操作
    public void writeLock() {
    this.reentrantReadWriteLock.writeLock().lock();
    }
    public void unWriteLock() {
    this.reentrantReadWriteLock.writeLock().unlock();
    }
    public void readLock() {
    this.reentrantReadWriteLock.readLock().lock();
    }
    public void unReadLock() {
    this.reentrantReadWriteLock.readLock().unlock();
    }
    }
    ```

    最终结果:

    ``` Java
    public PooledConnection getPooledConnection() throws RuntimeException, SQLException {
    if (poolMaxSize <= 0) {
    System.out.println("创建管道对象失败,最大值参数错误");
    throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
    }
    PooledConnection realConnection = getRealConnection();
    while (realConnection == null) {
    if (lock.tryLock()) {//尝试获取锁
    createConnections(stepSize);//获得锁之后进行扩容
    lock.unlock();
    } else {
    try {
    Thread.sleep(200);
    } catch (InterruptedException e) {
    }
    }
    realConnection = getRealConnection();
    if (realConnection != null) {
    return realConnection;
    }
    }

    return realConnection;
    }

    private PooledConnection getRealConnection() {
    for (PooledConnection pooledConnection : PoolsConnections) {
    try {
    if (pooledConnection.isBusy())
    continue;
    /*
    此处要保证写的时候不能被读取,不然会报ConcurrentModificationException异常
    */
    pooledConnection.writeLock();//读写互斥,写写互斥
    Connection connection = pooledConnection.getConnection();
    if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
    Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
    pooledConnection.setConnection(validConnect);
    }
    pooledConnection.setBusy(true);
    pooledConnection.unWriteLock();
    return pooledConnection;
    } catch (SQLException e) {

    return null;
    }
    }
    return null;
    }
    public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException {
    if (poolMaxSize <= 0) {
    System.out.println("创建管道对象失败,最大值参数错误");
    throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
    }
    //判断是否有溢出
    boolean overFlow = isOverFlow(count);
    if (overFlow) {
    return;
    }
    System.out.println("扩容");
    for (int i = 0; i < count; i++) {
    try {
    overFlow = isOverFlow(count);
    if (overFlow)
    return;
    Connection connection = DriverManager.getConnection(jdbcUrl, userName, password);
    PooledConnection pooledConnection = new PooledConnection(connection, false);
    PoolsConnections.add(pooledConnection);
    } catch (SQLException e) {

    }
    }
    System.out.println("扩容数量:" + PoolsConnections.size());
    }

    private boolean isOverFlow(int count) {
    if (PoolsConnections.size() + count >= poolMaxSize) {
    return true;
    }
    return false;
    }

碰到问题

  1. 首先是无法控制连接最大的数量 ,问题出在扩容没有控制一个线程扩容,使用tryLock解决,代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    while (realConnection == null) {
    if (lock.tryLock()) {//尝试获取锁
    createConnections(stepSize);//只能让一个线程扩容 获得锁之后进行扩容
    lock.unlock();
    } else {
    try {
    Thread.sleep(200);
    } catch (InterruptedException e) {
    }
    }
    realConnection = getRealConnection();
    if (realConnection != null) {
    return realConnection;
    }
    }

  2. ConcurrentModificationException异常,在读取的使用的时候,对象有写入操作,需要保证读取可以并发,读写不能一起,写不同对象是可以并发,使用读写锁可以解决:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
        reentrantReadWriteLock.writeLock().lock();//读写互斥,写写互斥
    if (!connection.isValid(2000)) {//是否有效,200ms 没有被超时
    System.out.println("连接无效");
    Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
    pooledConnection.setConnection(validConnect);
    }
    pooledConnection.setBusy(true);
    reentrantReadWriteLock.writeLock().unlock();

    ```

    ``` Java
    reentrantReadWriteLock.readLock().lock();
    Statement statement = connection.createStatement();
    ResultSet resultSet = statement.executeQuery(sql);
    reentrantReadWriteLock.readLock().unlock();
    ```
    使用上面的代码会存在一个性能问题,就是在写入的时候,如果写入的是不同对象,写入也会进行排斥,所以应该对单个`PooledConnection`使用锁。




    3. 把锁进行碎片化优化

    ``` Java
    public class PooledConnection {
    private boolean isBusy = false;
    private Connection connection;
    private boolean isUsing = false;
    private ReentrantReadWriteLock reentrantReadWriteLock;

    public PooledConnection(Connection connection, boolean b) {
    this.isBusy = b;
    this.connection = connection;
    this.isUsing = false;
    reentrantReadWriteLock = new ReentrantReadWriteLock();
    }
    public PooledConnection() {

    reentrantReadWriteLock = new ReentrantReadWriteLock();
    }
    public boolean isBusy() {
    return isBusy;
    }
    public void setBusy(boolean busy) {
    isBusy = busy;
    }
    public Connection getConnection() {
    this.isUsing = true;
    return connection;
    }
    public boolean isUsing() {
    return isUsing;
    }
    public void setUsing(boolean using) {
    isUsing = using;
    }
    public void setConnection(Connection connection) {
    this.connection = connection;
    }
    public void close() {

    this.isUsing = false;
    this.setBusy(false);
    }
    public void shutDown() {
    try {
    this.connection.close();
    } catch (SQLException e) {
    e.printStackTrace();
    }
    }
    public void writeLock() {
    this.reentrantReadWriteLock.writeLock().lock();
    }
    public void unWriteLock() {
    this.reentrantReadWriteLock.writeLock().unlock();
    }
    public void readLock() {
    this.reentrantReadWriteLock.readLock().lock();
    }
    public void unReadLock() {
    this.reentrantReadWriteLock.readLock().unlock();
    }
    }

    ```

    读的时候加入读锁:

    ``` Java
    PooledConnection pooledConnection = getPooledConnection();
    /*
    此处要保证读的时候不能被修改,使用读锁
    */
    pooledConnection.readLock();
    Connection connection = pooledConnection.getConnection();
    Statement statement = connection.createStatement();
    ResultSet resultSet = statement.executeQuery(sql);
    Thread.sleep(1000);
    pooledConnection.close();
    pooledConnection.unReadLock();
    return resultSet;
    ```

    写入加锁:

    ``` Java
    pooledConnection.writeLock();//读写互斥,写写互斥
    Connection connection = pooledConnection.getConnection();
    if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
    Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
    pooledConnection.setConnection(validConnect);
    }
    pooledConnection.setBusy(true);
    pooledConnection.unWriteLock();
    return pooledConnection;

    优化后耗时:耗时为:3692ms 。

作者

付威

发布于

2019-07-06

更新于

2020-08-10

许可协议

评论