手写一个jdbc数据库连接池
在Java
访问mysql的时候,需要用到jdbc驱动,传统连接方式为:
1 | try { |
上面代码用了100个线程分批次去完成查询的动作,在我的机器上运行时间45s左右。
从上面的代码可以看出问题,Connection
对象每一次都是重新创建,查询完成后,直接是调用close方法,如果不释放,会报连接数过多的异常。 如果查询多次,那浪费在创建Connection
的时间就会很多,我们知道在程序优化的手段中,有一个池化
可以很好的解决这个问题。
池化
的概念就是先创建多个对方存在在一个容器中,当时候的时候可以直接拿出来时候,用完后再进行归还。 跟着这个思想,我们来创建自己的连接池。
编写思路
创建一个线程安全的容器(由于是多线程访问),队列或者是list,因为Connection的对象并不是有序的,所以可以使用list容器
对Connection的对象进行封装,增加一个isBusy变量,每次读取的时候就可以选出空闲的Connection对象
如果取的时候,没有可用的Connection对象,则可以再自动创建对象,可以自动扩容,直到扩容到允许的最大值。
封装的Connection类
:
1 | public class PooledConnection { |
为了保证线程安全,使用线程安全的Vector
集合。
获得对象方法
获得对象的方法,应该是先找到一个空闲的PooledConnection变量,如果有就直接返回。
如果没有空闲的变量,则尝试进行扩充,扩充由一个线程完成,其他线程则等待,或者尝试再次获取。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242public PooledConnection getPooledConnection() throws RuntimeException, SQLException {
PooledConnection realConnection = getRealConnection();
while (realConnection == null) {
if (lock.tryLock()) {//尝试获取锁
createConnections(stepSize);//只能让一个线程扩容 获得锁之后进行扩容
lock.unlock();
} else {
try {
Thread.sleep(200);//线程等待
} catch (InterruptedException e) {
}
}
realConnection = getRealConnection();//再次尝试获取
if (realConnection != null) {
return realConnection;
}
}
System.out.println("线程池线程数量:" + PoolsConnections.size());
return realConnection;
}
private PooledConnection getRealConnection() throws SQLException {
for (PooledConnection pooledConnection : PoolsConnections) {
try {
if (pooledConnection.isBusy())
continue;
Connection connection = pooledConnection.getConnection();
if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
System.out.println("连接无效");
Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
pooledConnection.setConnection(validConnect);
}
pooledConnection.setBusy(true);
return pooledConnection;
} catch (SQLException e) {
return null;
}
}
return null;
}
```
#### 扩容方法对象
扩容的方法相对比较简单,判断当前对象数量有没有溢出,如果没有溢出,就进行扩容
``` Java
public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException {
if (poolMaxSize <= 0) {
System.out.println("创建管道对象失败,最大值参数错误");
throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
}
//判断是否有溢出
boolean overFlow = isOverFlow(count);
if (overFlow) {
return;
}
System.out.println("扩容");
for (int i = 0; i < count; i++) {
try {
overFlow = isOverFlow(count);
if (overFlow)
return;
Connection connection = DriverManager.getConnection(jdbcUrl, userName, password);
PooledConnection pooledConnection = new PooledConnection(connection, false);
PoolsConnections.add(pooledConnection);
} catch (SQLException e) {
e.printStackTrace();
}
}
System.out.println("扩容数量:" + PoolsConnections.size());
}
private boolean isOverFlow(int count) {
if (PoolsConnections.size() + count >= poolMaxSize) {
return true;
}
return false;
}
```
上面的代码隐藏一个问题,我们增加对数据的查询方法,方便我们测试。 查询方法如下:
``` Java
public ResultSet querySql(String sql) {
try {
PooledConnection pooledConnection = getPooledConnection();
Connection connection = pooledConnection.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
Thread.sleep(1000);
pooledConnection.close();
return resultSet;
} catch (Exception e) {
}
return null;
}
```
我们对代码做性能测试同样的测试,在我的电脑运行时间为5s左右,大概快了10倍。 但经过多次测试,代码抛出了ConcurrentModificationException异常,这个异常的原因是因为在使用的时候,我们又修改了正在使用的对象。所以在使用的时候要对对象进行加一个读写锁。
为了锁不至于影响到锁的性能,我们把锁碎片化,采用针对每一个对象进行加锁,而不是全局加锁。修改后的封装对象:
``` Java
public class PooledConnection {
private boolean isBusy = false;
private Connection connection;
private ReentrantReadWriteLock reentrantReadWriteLock;
public PooledConnection(Connection connection, boolean b) {
this.connection = connection;
reentrantReadWriteLock = new ReentrantReadWriteLock();
}
public boolean isBusy() {
return isBusy;
}
public void setBusy(boolean busy) {
isBusy = busy;
}
public Connection getConnection() {
return connection;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public void close() {
this.setBusy(false);
}
public void shutDown() {
try {
this.connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
//增加读写锁的操作
public void writeLock() {
this.reentrantReadWriteLock.writeLock().lock();
}
public void unWriteLock() {
this.reentrantReadWriteLock.writeLock().unlock();
}
public void readLock() {
this.reentrantReadWriteLock.readLock().lock();
}
public void unReadLock() {
this.reentrantReadWriteLock.readLock().unlock();
}
}
```
最终结果:
``` Java
public PooledConnection getPooledConnection() throws RuntimeException, SQLException {
if (poolMaxSize <= 0) {
System.out.println("创建管道对象失败,最大值参数错误");
throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
}
PooledConnection realConnection = getRealConnection();
while (realConnection == null) {
if (lock.tryLock()) {//尝试获取锁
createConnections(stepSize);//获得锁之后进行扩容
lock.unlock();
} else {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
}
realConnection = getRealConnection();
if (realConnection != null) {
return realConnection;
}
}
return realConnection;
}
private PooledConnection getRealConnection() {
for (PooledConnection pooledConnection : PoolsConnections) {
try {
if (pooledConnection.isBusy())
continue;
/*
此处要保证写的时候不能被读取,不然会报ConcurrentModificationException异常
*/
pooledConnection.writeLock();//读写互斥,写写互斥
Connection connection = pooledConnection.getConnection();
if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
pooledConnection.setConnection(validConnect);
}
pooledConnection.setBusy(true);
pooledConnection.unWriteLock();
return pooledConnection;
} catch (SQLException e) {
return null;
}
}
return null;
}
public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException {
if (poolMaxSize <= 0) {
System.out.println("创建管道对象失败,最大值参数错误");
throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
}
//判断是否有溢出
boolean overFlow = isOverFlow(count);
if (overFlow) {
return;
}
System.out.println("扩容");
for (int i = 0; i < count; i++) {
try {
overFlow = isOverFlow(count);
if (overFlow)
return;
Connection connection = DriverManager.getConnection(jdbcUrl, userName, password);
PooledConnection pooledConnection = new PooledConnection(connection, false);
PoolsConnections.add(pooledConnection);
} catch (SQLException e) {
}
}
System.out.println("扩容数量:" + PoolsConnections.size());
}
private boolean isOverFlow(int count) {
if (PoolsConnections.size() + count >= poolMaxSize) {
return true;
}
return false;
}
碰到问题
首先是无法控制连接最大的数量 ,问题出在扩容没有控制一个线程扩容,使用
tryLock
解决,代码如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16while (realConnection == null) {
if (lock.tryLock()) {//尝试获取锁
createConnections(stepSize);//只能让一个线程扩容 获得锁之后进行扩容
lock.unlock();
} else {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
}
realConnection = getRealConnection();
if (realConnection != null) {
return realConnection;
}
}ConcurrentModificationException异常,在读取的使用的时候,对象有写入操作,需要保证读取可以并发,读写不能一起,写不同对象是可以并发,使用读写锁可以解决:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117reentrantReadWriteLock.writeLock().lock();//读写互斥,写写互斥
if (!connection.isValid(2000)) {//是否有效,200ms 没有被超时
System.out.println("连接无效");
Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
pooledConnection.setConnection(validConnect);
}
pooledConnection.setBusy(true);
reentrantReadWriteLock.writeLock().unlock();
```
``` Java
reentrantReadWriteLock.readLock().lock();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
reentrantReadWriteLock.readLock().unlock();
```
使用上面的代码会存在一个性能问题,就是在写入的时候,如果写入的是不同对象,写入也会进行排斥,所以应该对单个`PooledConnection`使用锁。
3. 把锁进行碎片化优化
``` Java
public class PooledConnection {
private boolean isBusy = false;
private Connection connection;
private boolean isUsing = false;
private ReentrantReadWriteLock reentrantReadWriteLock;
public PooledConnection(Connection connection, boolean b) {
this.isBusy = b;
this.connection = connection;
this.isUsing = false;
reentrantReadWriteLock = new ReentrantReadWriteLock();
}
public PooledConnection() {
reentrantReadWriteLock = new ReentrantReadWriteLock();
}
public boolean isBusy() {
return isBusy;
}
public void setBusy(boolean busy) {
isBusy = busy;
}
public Connection getConnection() {
this.isUsing = true;
return connection;
}
public boolean isUsing() {
return isUsing;
}
public void setUsing(boolean using) {
isUsing = using;
}
public void setConnection(Connection connection) {
this.connection = connection;
}
public void close() {
this.isUsing = false;
this.setBusy(false);
}
public void shutDown() {
try {
this.connection.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
public void writeLock() {
this.reentrantReadWriteLock.writeLock().lock();
}
public void unWriteLock() {
this.reentrantReadWriteLock.writeLock().unlock();
}
public void readLock() {
this.reentrantReadWriteLock.readLock().lock();
}
public void unReadLock() {
this.reentrantReadWriteLock.readLock().unlock();
}
}
```
读的时候加入读锁:
``` Java
PooledConnection pooledConnection = getPooledConnection();
/*
此处要保证读的时候不能被修改,使用读锁
*/
pooledConnection.readLock();
Connection connection = pooledConnection.getConnection();
Statement statement = connection.createStatement();
ResultSet resultSet = statement.executeQuery(sql);
Thread.sleep(1000);
pooledConnection.close();
pooledConnection.unReadLock();
return resultSet;
```
写入加锁:
``` Java
pooledConnection.writeLock();//读写互斥,写写互斥
Connection connection = pooledConnection.getConnection();
if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
pooledConnection.setConnection(validConnect);
}
pooledConnection.setBusy(true);
pooledConnection.unWriteLock();
return pooledConnection;优化后耗时:耗时为:3692ms 。