在Java
访问mysql的时候,需要用到jdbc驱动,传统连接方式为:
1 2 3 4 5 6 7 8 9 10 11
| try { Driver mysqlDriver = (Driver) Class.forName("com.mysql.jdbc.Driver").newInstance(); DriverManager.registerDriver(mysqlDriver); Connection connection = DriverManager.getConnection("jdbc:mysql://192.168.0.***:3306/rzframe?useSSL=false&serverTimezone=UTC", "root", "*******"); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery("select * from rz_user"); connection.close(); } catch (Exception e) { e.printStackTrace(); }
|
我们对上面的代码做一个简单的性能测试,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public static void main(String[] args) { long start = System.currentTimeMillis(); CountDownLatch countDownLatch = new CountDownLatch(100); for (int i = 0; i < 1000; i++) { try { CountDownLatch finalCountDownLatch = countDownLatch; Thread thread = new Thread(() -> { try { doJDBC(); } catch (Exception ex) {
} finally { finalCountDownLatch.countDown(); } }); thread.start(); if (i != 0 && i % 100 == 0) { countDownLatch.await(); System.out.println(i); countDownLatch = new CountDownLatch(100); } } catch (Exception ex) {
} } long end = System.currentTimeMillis(); System.out.println("耗时:" + (end - start));
}
|
上面代码用了100个线程分批次去完成查询的动作,在我的机器上运行时间45s左右。
从上面的代码可以看出问题,Connection
对象每一次都是重新创建,查询完成后,直接是调用close方法,如果不释放,会报连接数过多的异常。 如果查询多次,那浪费在创建Connection
的时间就会很多,我们知道在程序优化的手段中,有一个池化
可以很好的解决这个问题。
池化
的概念就是先创建多个对方存在在一个容器中,当时候的时候可以直接拿出来时候,用完后再进行归还。 跟着这个思想,我们来创建自己的连接池。
编写思路
创建一个线程安全的容器(由于是多线程访问),队列或者是list,因为Connection的对象并不是有序的,所以可以使用list容器
对Connection的对象进行封装,增加一个isBusy变量,每次读取的时候就可以选出空闲的Connection对象
如果取的时候,没有可用的Connection对象,则可以再自动创建对象,可以自动扩容,直到扩容到允许的最大值。
封装的Connection类
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| public class PooledConnection {
private boolean isBusy=false; private Connection connection;
public PooledConnection(Connection connection, boolean b) { this.isBusy=b; this.connection=connection; }
public boolean isBusy() { return isBusy; }
public void setBusy(boolean busy) { isBusy = busy; }
public Connection getConnection() { return connection; }
public void setConnection(Connection connection) { this.connection = connection; } public void close() { this.setBusy(false); } }
|
包装好Connection
后,可以考虑如何对Connection
进行创建和分配,需要有以下几个方法:
1 2 3
| PooledConnection getPooledConnection();
void createPooledConnection();
|
为了更好的程序调试,先定义几个初始的参数变量:
1 2 3 4 5 6 7 8 9 10 11 12
| private static String jdbcDriver = null; private static String jdbcUrl = null; private static String userName = null; private static String password = null; private static int initCount; private static int stepSize; private static int poolMaxSize; private static Lock lock;
|
为了保证线程安全,使用线程安全的Vector
集合。
获得对象方法
获得对象的方法,应该是先找到一个空闲的PooledConnection变量,如果有就直接返回。
如果没有空闲的变量,则尝试进行扩充,扩充由一个线程完成,其他线程则等待,或者尝试再次获取。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public PooledConnection getPooledConnection() throws RuntimeException, SQLException { PooledConnection realConnection = getRealConnection(); while (realConnection == null) { if (lock.tryLock()) { createConnections(stepSize); lock.unlock(); } else { try { Thread.sleep(200); } catch (InterruptedException e) { } } realConnection = getRealConnection(); if (realConnection != null) { return realConnection; } } System.out.println("线程池线程数量:" + PoolsConnections.size()); return realConnection; }
private PooledConnection getRealConnection() throws SQLException { for (PooledConnection pooledConnection : PoolsConnections) { try { if (pooledConnection.isBusy()) continue; Connection connection = pooledConnection.getConnection(); if (!connection.isValid(200)) { System.out.println("连接无效"); Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password); pooledConnection.setConnection(validConnect); } pooledConnection.setBusy(true); return pooledConnection; } catch (SQLException e) { return null; } } return null; }
|
扩容方法对象
扩容的方法相对比较简单,判断当前对象数量有没有溢出,如果没有溢出,就进行扩容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException { if (poolMaxSize <= 0) { System.out.println("创建管道对象失败,最大值参数错误"); throw new IllegalArgumentException("创建管道对象失败,最大值参数错误"); } boolean overFlow = isOverFlow(count); if (overFlow) { return; } System.out.println("扩容"); for (int i = 0; i < count; i++) { try { overFlow = isOverFlow(count); if (overFlow) return; Connection connection = DriverManager.getConnection(jdbcUrl, userName, password); PooledConnection pooledConnection = new PooledConnection(connection, false); PoolsConnections.add(pooledConnection); } catch (SQLException e) { e.printStackTrace(); } } System.out.println("扩容数量:" + PoolsConnections.size()); }
private boolean isOverFlow(int count) { if (PoolsConnections.size() + count >= poolMaxSize) { return true; } return false; }
|
上面的代码隐藏一个问题,我们增加对数据的查询方法,方便我们测试。 查询方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public ResultSet querySql(String sql) { try { PooledConnection pooledConnection = getPooledConnection(); Connection connection = pooledConnection.getConnection(); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); Thread.sleep(1000); pooledConnection.close(); return resultSet; } catch (Exception e) {
} return null; }
|
我们对代码做性能测试同样的测试,在我的电脑运行时间为5s左右,大概快了10倍。 但经过多次测试,代码抛出了ConcurrentModificationException异常,这个异常的原因是因为在使用的时候,我们又修改了正在使用的对象。所以在使用的时候要对对象进行加一个读写锁。
为了锁不至于影响到锁的性能,我们把锁碎片化,采用针对每一个对象进行加锁,而不是全局加锁。修改后的封装对象:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class PooledConnection { private boolean isBusy = false; private Connection connection; private ReentrantReadWriteLock reentrantReadWriteLock;
public PooledConnection(Connection connection, boolean b) { this.connection = connection; reentrantReadWriteLock = new ReentrantReadWriteLock(); } public boolean isBusy() { return isBusy; }
public void setBusy(boolean busy) { isBusy = busy; }
public Connection getConnection() { return connection; } public void setConnection(Connection connection) { this.connection = connection; } public void close() { this.setBusy(false); } public void shutDown() { try { this.connection.close(); } catch (SQLException e) { e.printStackTrace(); } }
public void writeLock() { this.reentrantReadWriteLock.writeLock().lock(); } public void unWriteLock() { this.reentrantReadWriteLock.writeLock().unlock(); } public void readLock() { this.reentrantReadWriteLock.readLock().lock(); } public void unReadLock() { this.reentrantReadWriteLock.readLock().unlock(); } }
|
最终结果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| public PooledConnection getPooledConnection() throws RuntimeException, SQLException { if (poolMaxSize <= 0) { System.out.println("创建管道对象失败,最大值参数错误"); throw new IllegalArgumentException("创建管道对象失败,最大值参数错误"); } PooledConnection realConnection = getRealConnection(); while (realConnection == null) { if (lock.tryLock()) { createConnections(stepSize); lock.unlock(); } else { try { Thread.sleep(200); } catch (InterruptedException e) { } } realConnection = getRealConnection(); if (realConnection != null) { return realConnection; } }
return realConnection; }
private PooledConnection getRealConnection() { for (PooledConnection pooledConnection : PoolsConnections) { try { if (pooledConnection.isBusy()) continue;
pooledConnection.writeLock(); Connection connection = pooledConnection.getConnection(); if (!connection.isValid(200)) { Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password); pooledConnection.setConnection(validConnect); } pooledConnection.setBusy(true); pooledConnection.unWriteLock(); return pooledConnection; } catch (SQLException e) {
return null; } } return null; } public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException { if (poolMaxSize <= 0) { System.out.println("创建管道对象失败,最大值参数错误"); throw new IllegalArgumentException("创建管道对象失败,最大值参数错误"); } boolean overFlow = isOverFlow(count); if (overFlow) { return; } System.out.println("扩容"); for (int i = 0; i < count; i++) { try { overFlow = isOverFlow(count); if (overFlow) return; Connection connection = DriverManager.getConnection(jdbcUrl, userName, password); PooledConnection pooledConnection = new PooledConnection(connection, false); PoolsConnections.add(pooledConnection); } catch (SQLException e) {
} } System.out.println("扩容数量:" + PoolsConnections.size()); }
private boolean isOverFlow(int count) { if (PoolsConnections.size() + count >= poolMaxSize) { return true; } return false; }
|
碰到问题
首先是无法控制连接最大的数量 ,问题出在扩容没有控制一个线程扩容,使用tryLock
解决,代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| while (realConnection == null) { if (lock.tryLock()) { createConnections(stepSize); lock.unlock(); } else { try { Thread.sleep(200); } catch (InterruptedException e) { } } realConnection = getRealConnection(); if (realConnection != null) { return realConnection; } }
|
ConcurrentModificationException异常,在读取的使用的时候,对象有写入操作,需要保证读取可以并发,读写不能一起,写不同对象是可以并发,使用读写锁可以解决:
1 2 3 4 5 6 7 8 9
| reentrantReadWriteLock.writeLock().lock(); if (!connection.isValid(2000)) { System.out.println("连接无效"); Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password); pooledConnection.setConnection(validConnect); } pooledConnection.setBusy(true); reentrantReadWriteLock.writeLock().unlock();
|
1 2 3 4
| reentrantReadWriteLock.readLock().lock(); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); reentrantReadWriteLock.readLock().unlock();
|
使用上面的代码会存在一个性能问题,就是在写入的时候,如果写入的是不同对象,写入也会进行排斥,所以应该对单个PooledConnection
使用锁。
把锁进行碎片化优化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| public class PooledConnection { private boolean isBusy = false; private Connection connection; private boolean isUsing = false; private ReentrantReadWriteLock reentrantReadWriteLock;
public PooledConnection(Connection connection, boolean b) { this.isBusy = b; this.connection = connection; this.isUsing = false; reentrantReadWriteLock = new ReentrantReadWriteLock(); } public PooledConnection() {
reentrantReadWriteLock = new ReentrantReadWriteLock(); } public boolean isBusy() { return isBusy; } public void setBusy(boolean busy) { isBusy = busy; } public Connection getConnection() { this.isUsing = true; return connection; } public boolean isUsing() { return isUsing; } public void setUsing(boolean using) { isUsing = using; } public void setConnection(Connection connection) { this.connection = connection; } public void close() {
this.isUsing = false; this.setBusy(false); } public void shutDown() { try { this.connection.close(); } catch (SQLException e) { e.printStackTrace(); } } public void writeLock() { this.reentrantReadWriteLock.writeLock().lock(); } public void unWriteLock() { this.reentrantReadWriteLock.writeLock().unlock(); } public void readLock() { this.reentrantReadWriteLock.readLock().lock(); } public void unReadLock() { this.reentrantReadWriteLock.readLock().unlock(); } }
|
读的时候加入读锁:
1 2 3 4 5 6 7 8 9 10 11 12
| PooledConnection pooledConnection = getPooledConnection();
pooledConnection.readLock(); Connection connection = pooledConnection.getConnection(); Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); Thread.sleep(1000); pooledConnection.close(); pooledConnection.unReadLock(); return resultSet;
|
写入加锁:
1 2 3 4 5 6 7 8 9
| pooledConnection.writeLock(); Connection connection = pooledConnection.getConnection(); if (!connection.isValid(200)) { Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password); pooledConnection.setConnection(validConnect); } pooledConnection.setBusy(true); pooledConnection.unWriteLock(); return pooledConnection;
|
优化后耗时:耗时为:3692ms 。