手写一个jdbc数据库连接池

付威     2019-07-06   14698   41min  

Java访问mysql的时候,需要用到jdbc驱动,传统连接方式为:

 try {
    Driver mysqlDriver = (Driver) Class.forName("com.mysql.jdbc.Driver").newInstance();
    DriverManager.registerDriver(mysqlDriver);
    Connection connection = DriverManager.getConnection("jdbc:mysql://192.168.0.***:3306/rzframe?useSSL=false&serverTimezone=UTC", "root", "*******");
    Statement statement = connection.createStatement();
    ResultSet resultSet = statement.executeQuery("select * from rz_user");//查询
    connection.close();
  } catch (Exception e) {
    e.printStackTrace();
 }

我们对上面的代码做一个简单的性能测试,代码如下:

  public static void main(String[] args) {
        long start = System.currentTimeMillis();
        CountDownLatch countDownLatch = new CountDownLatch(100);
        for (int i = 0; i < 1000; i++) {
            try {
                CountDownLatch finalCountDownLatch = countDownLatch;
                Thread thread = new Thread(() -> {
                    try {
                       doJDBC();
                    } catch (Exception ex) {

                    } finally {
                        finalCountDownLatch.countDown();
                    }
                });
                thread.start();
                if (i != 0 && i % 100 == 0) {
                    countDownLatch.await();
                    System.out.println(i);
                    countDownLatch = new CountDownLatch(100);
                }
            } catch (Exception ex) {

            }
        }
        long end = System.currentTimeMillis();
        System.out.println("耗时:" + (end - start));

    }

上面代码用了100个线程分批次去完成查询的动作,在我的机器上运行时间45s左右。

从上面的代码可以看出问题,Connection对象每一次都是重新创建,查询完成后,直接是调用close方法,如果不释放,会报连接数过多的异常。 如果查询多次,那浪费在创建Connection的时间就会很多,我们知道在程序优化的手段中,有一个池化可以很好的解决这个问题。

池化的概念就是先创建多个对方存在在一个容器中,当时候的时候可以直接拿出来时候,用完后再进行归还。 跟着这个思想,我们来创建自己的连接池。

编写思路

  1. 创建一个线程安全的容器(由于是多线程访问),队列或者是list,因为Connection的对象并不是有序的,所以可以使用list容器

  2. 对Connection的对象进行封装,增加一个isBusy变量,每次读取的时候就可以选出空闲的Connection对象

  3. 如果取的时候,没有可用的Connection对象,则可以再自动创建对象,可以自动扩容,直到扩容到允许的最大值。

封装的Connection类:

public class PooledConnection {

    private boolean isBusy=false;
    private Connection connection;

    public PooledConnection(Connection connection, boolean b) {
        this.isBusy=b;
        this.connection=connection;
    }

    public boolean isBusy() {
        return isBusy;
    }

    public void setBusy(boolean busy) {
        isBusy = busy;
    }

    public Connection getConnection() {
        return connection;
    }

    public void setConnection(Connection connection) {
        this.connection = connection;
    }
    public void close() {
        this.setBusy(false);
    }
}

包装好Connection后,可以考虑如何对Connection进行创建和分配,需要有以下几个方法:

PooledConnection getPooledConnection();

void  createPooledConnection();

为了更好的程序调试,先定义几个初始的参数变量:

 //数据库相关参数
    private static String jdbcDriver = null;
    private static String jdbcUrl = null;
    private static String userName = null;
    private static String password = null;
    
    //容器参数
    private static int initCount;//初始数量
    private static int stepSize;//每次扩容的数量
    private static int poolMaxSize;//最大数量
    //全局锁
     private static Lock lock;

为了保证线程安全,使用线程安全的Vector集合。

获得对象方法

  1. 获得对象的方法,应该是先找到一个空闲的PooledConnection变量,如果有就直接返回。

  2. 如果没有空闲的变量,则尝试进行扩充,扩充由一个线程完成,其他线程则等待,或者尝试再次获取。

   public PooledConnection getPooledConnection() throws RuntimeException, SQLException {
        PooledConnection realConnection = getRealConnection();
        while (realConnection == null) {
            if (lock.tryLock()) {//尝试获取锁
                createConnections(stepSize);//只能让一个线程扩容 获得锁之后进行扩容
                lock.unlock();
            } else {
                try {
                    Thread.sleep(200);//线程等待
                } catch (InterruptedException e) {
                }
            }
            realConnection = getRealConnection();//再次尝试获取
            if (realConnection != null) {
                return realConnection;
            }
        }
        System.out.println("线程池线程数量:" + PoolsConnections.size());
        return realConnection;
    }

    private PooledConnection getRealConnection() throws SQLException {
        for (PooledConnection pooledConnection : PoolsConnections) {
            try {
                if (pooledConnection.isBusy())
                    continue;
                Connection connection = pooledConnection.getConnection();
                if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
                    System.out.println("连接无效");
                    Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
                    pooledConnection.setConnection(validConnect);
                }
                pooledConnection.setBusy(true);
                return pooledConnection;
            } catch (SQLException e) {
                return null;
            }
        }
        return null;
    }

扩容方法对象

扩容的方法相对比较简单,判断当前对象数量有没有溢出,如果没有溢出,就进行扩容

        public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException {
            if (poolMaxSize <= 0) {
                System.out.println("创建管道对象失败,最大值参数错误");
                throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
            }
            //判断是否有溢出
            boolean overFlow = isOverFlow(count);
            if (overFlow) {
                return;
            }
            System.out.println("扩容");
            for (int i = 0; i < count; i++) {
                try {
                    overFlow = isOverFlow(count);
                    if (overFlow)
                        return;
                    Connection connection = DriverManager.getConnection(jdbcUrl, userName, password);
                    PooledConnection pooledConnection = new PooledConnection(connection, false);
                    PoolsConnections.add(pooledConnection);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
            System.out.println("扩容数量:" + PoolsConnections.size());
        }

        private boolean isOverFlow(int count) {
            if (PoolsConnections.size() + count >= poolMaxSize) {
                return true;
            }
            return false;
        } 

上面的代码隐藏一个问题,我们增加对数据的查询方法,方便我们测试。 查询方法如下:

    public ResultSet querySql(String sql) {
        try {
            PooledConnection pooledConnection = getPooledConnection();
            Connection connection = pooledConnection.getConnection();
            Statement statement = connection.createStatement();
            ResultSet resultSet = statement.executeQuery(sql);
            Thread.sleep(1000);
            pooledConnection.close();
            return resultSet;
        } catch (Exception e) {

        }
        return null;
    }

我们对代码做性能测试同样的测试,在我的电脑运行时间为5s左右,大概快了10倍。 但经过多次测试,代码抛出了ConcurrentModificationException异常,这个异常的原因是因为在使用的时候,我们又修改了正在使用的对象。所以在使用的时候要对对象进行加一个读写锁。

为了锁不至于影响到锁的性能,我们把锁碎片化,采用针对每一个对象进行加锁,而不是全局加锁。修改后的封装对象:

    public class PooledConnection {
        private boolean isBusy = false;
        private Connection connection;
        private ReentrantReadWriteLock reentrantReadWriteLock;

        public PooledConnection(Connection connection, boolean b) {
            this.connection = connection;
            reentrantReadWriteLock = new ReentrantReadWriteLock();
        }
        public boolean isBusy() {
            return isBusy;
        }

        public void setBusy(boolean busy) {
            isBusy = busy;
        }

        public Connection getConnection() {
            return connection;
        }
        public void setConnection(Connection connection) {
            this.connection = connection;
        }
        public void close() {
            this.setBusy(false);
        }
        public void shutDown() {
            try {
                this.connection.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

    //增加读写锁的操作
        public void writeLock() {
            this.reentrantReadWriteLock.writeLock().lock();
        }
        public void unWriteLock() {
            this.reentrantReadWriteLock.writeLock().unlock();
        }
        public void readLock() {
            this.reentrantReadWriteLock.readLock().lock();
        }
        public void unReadLock() {
            this.reentrantReadWriteLock.readLock().unlock();
        }
    }

最终结果:

   public PooledConnection getPooledConnection() throws RuntimeException, SQLException {
        if (poolMaxSize <= 0) {
            System.out.println("创建管道对象失败,最大值参数错误");
            throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
        }
        PooledConnection realConnection = getRealConnection();
        while (realConnection == null) {
            if (lock.tryLock()) {//尝试获取锁
                createConnections(stepSize);//获得锁之后进行扩容
                lock.unlock();
            } else {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                }
            }
            realConnection = getRealConnection();
            if (realConnection != null) {
                return realConnection;
            }
        }

        return realConnection;
    }

    private PooledConnection getRealConnection() {
        for (PooledConnection pooledConnection : PoolsConnections) {
            try {
                if (pooledConnection.isBusy())
                    continue;
                /*
                此处要保证写的时候不能被读取,不然会报ConcurrentModificationException异常
                 */
                pooledConnection.writeLock();//读写互斥,写写互斥
                Connection connection = pooledConnection.getConnection();
                if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
                    Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
                    pooledConnection.setConnection(validConnect);
                }
                pooledConnection.setBusy(true);
                pooledConnection.unWriteLock();
                return pooledConnection;
            } catch (SQLException e) {

                return null;
            }
        }
        return null;
    }
    public void createConnections(int count) throws OutofMaxCountException, IllegalArgumentException {
        if (poolMaxSize <= 0) {
            System.out.println("创建管道对象失败,最大值参数错误");
            throw new IllegalArgumentException("创建管道对象失败,最大值参数错误");
        }
        //判断是否有溢出
        boolean overFlow = isOverFlow(count);
        if (overFlow) {
            return;
        }
        System.out.println("扩容");
        for (int i = 0; i < count; i++) {
            try {
                overFlow = isOverFlow(count);
                if (overFlow)
                    return;
                Connection connection = DriverManager.getConnection(jdbcUrl, userName, password);
                PooledConnection pooledConnection = new PooledConnection(connection, false);
                PoolsConnections.add(pooledConnection);
            } catch (SQLException e) {

            }
        }
        System.out.println("扩容数量:" + PoolsConnections.size());
    }

    private boolean isOverFlow(int count) {
        if (PoolsConnections.size() + count >= poolMaxSize) {
            return true;
        }
        return false;
    } 

碰到问题

  1. 首先是无法控制连接最大的数量 ,问题出在扩容没有控制一个线程扩容,使用tryLock解决,代码如下:

     while (realConnection == null) {
         if (lock.tryLock()) {//尝试获取锁
             createConnections(stepSize);//只能让一个线程扩容 获得锁之后进行扩容
             lock.unlock();
         } else {
             try {
                 Thread.sleep(200);
             } catch (InterruptedException e) {
             }
         }
         realConnection = getRealConnection();
         if (realConnection != null) {
             return realConnection;
         }
     }
    
    
  2. ConcurrentModificationException异常,在读取的使用的时候,对象有写入操作,需要保证读取可以并发,读写不能一起,写不同对象是可以并发,使用读写锁可以解决:

     reentrantReadWriteLock.writeLock().lock();//读写互斥,写写互斥
     if (!connection.isValid(2000)) {//是否有效,200ms 没有被超时
         System.out.println("连接无效");
         Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
         pooledConnection.setConnection(validConnect);
     }
     pooledConnection.setBusy(true);
     reentrantReadWriteLock.writeLock().unlock();
    
    
       reentrantReadWriteLock.readLock().lock();
       Statement statement = connection.createStatement();
       ResultSet resultSet = statement.executeQuery(sql);
       reentrantReadWriteLock.readLock().unlock();  
    

    使用上面的代码会存在一个性能问题,就是在写入的时候,如果写入的是不同对象,写入也会进行排斥,所以应该对单个PooledConnection使用锁。

  3. 把锁进行碎片化优化

     public class PooledConnection {
         private boolean isBusy = false;
         private Connection connection;
         private boolean isUsing = false;
         private ReentrantReadWriteLock reentrantReadWriteLock;
    
         public PooledConnection(Connection connection, boolean b) {
             this.isBusy = b;
             this.connection = connection;
             this.isUsing = false;
             reentrantReadWriteLock = new ReentrantReadWriteLock();
         }
         public PooledConnection() {
    
             reentrantReadWriteLock = new ReentrantReadWriteLock();
         }
         public boolean isBusy() {
             return isBusy;
         }
         public void setBusy(boolean busy) {
             isBusy = busy;
         }
         public Connection getConnection() {
             this.isUsing = true;
             return connection;
         }
         public boolean isUsing() {
             return isUsing;
         }
         public void setUsing(boolean using) {
             isUsing = using;
         }
         public void setConnection(Connection connection) {
             this.connection = connection;
         }
         public void close() {
    
             this.isUsing = false;
             this.setBusy(false);
         }
         public void shutDown() {
             try {
                 this.connection.close();
             } catch (SQLException e) {
                 e.printStackTrace();
             }
         }
         public void writeLock() {
             this.reentrantReadWriteLock.writeLock().lock();
         }
         public void unWriteLock() {
             this.reentrantReadWriteLock.writeLock().unlock();
         }
         public void readLock() {
             this.reentrantReadWriteLock.readLock().lock();
         }
         public void unReadLock() {
             this.reentrantReadWriteLock.readLock().unlock();
         }
     }
    
    

读的时候加入读锁:

    PooledConnection pooledConnection = getPooledConnection();
    /*
        此处要保证读的时候不能被修改,使用读锁
        */
    pooledConnection.readLock();
    Connection connection = pooledConnection.getConnection();
    Statement statement = connection.createStatement();
    ResultSet resultSet = statement.executeQuery(sql);
    Thread.sleep(1000);
    pooledConnection.close();
    pooledConnection.unReadLock();
    return resultSet;

写入加锁:

    pooledConnection.writeLock();//读写互斥,写写互斥
    Connection connection = pooledConnection.getConnection();
    if (!connection.isValid(200)) {//是否有效,200ms 没有被超时
        Connection validConnect = DriverManager.getConnection(jdbcUrl, userName, password);
        pooledConnection.setConnection(validConnect);
    }
    pooledConnection.setBusy(true);
    pooledConnection.unWriteLock();
    return pooledConnection;

优化后耗时:耗时为:3692ms 。

(本文完)

作者:付威

博客地址:http://blog.laofu.online

如果觉得对您有帮助,可以下方的RSS订阅,谢谢合作

如有任何知识产权、版权问题或理论错误,还请指正。

本文是付威的网络博客原创,自由转载-非商用-非衍生-保持署名,请遵循:创意共享3.0许可证

交流请加群113249828: 点击加群   或发我邮件 laofu_online@163.com

付威

获得最新的博主文章,请关注上方公众号