C3P0源码解析二
博客专区 > 关河 的博客 > 博客详情
C3P0源码解析二
关河 发表于3个月前
C3P0源码解析二
  • 发表于 3个月前
  • 阅读 6
  • 收藏 1
  • 点赞 1
  • 评论 0

腾讯云实验室 1小时搭建人工智能应用,让技术更容易入门 免费体验 >>>   

摘要: C3P0是一个开源的JDBC连接池,它实现了数据源和JNDI绑定,支持JDBC3规范和JDBC2的标准扩展。目前使用它的开源项目有hibernate,spring等

本篇基于上篇内容

C3P0源码解析一  https://my.oschina.net/guanhe/blog/1488707

我们现在回过头来看PoolThread中的Run部分:

public void run()
        {
	    long checkForPurge = rnd.nextLong();
            try
            {
                thread_loop:
                    while (true)
                    {
                        Runnable myTask;
                        synchronized ( ThreadPoolAsynchronousRunner.this )
                        {
                            while ( !should_stop && pendingTasks.size() == 0 )
                                ThreadPoolAsynchronousRunner.this.wait( POLL_FOR_STOP_INTERVAL );
                            if (should_stop) 
                                break thread_loop;
                            if (! available.remove( this ) )
                                throw new InternalError("An unavailable PoolThread tried to check itself out!!!");
                            myTask = (Runnable) pendingTasks.remove(0);
                            currentTask = myTask;
                        }
                        try
                        { 
                            if (max_individual_task_time > 0)
                                setMaxIndividualTaskTimeEnforcer();
                            myTask.run(); 
                        }
.....................................................................

      这里我截取了最重要的部分,我们可以看到,当PendingTask的size不为0,即getPool修改了ThreadPoolAsynchronousRunner中PendingTask的内容时,即跳出循环,这里我们可以看到:

myTask = (Runnable) pendingTasks.remove(0);
                            currentTask = myTask;
                        }
                        try
                        { 
                            if (max_individual_task_time > 0)
                                setMaxIndividualTaskTimeEnforcer();
                            myTask.run(); 
                        }

        通过去除pendingTask中的内容,并运行,可以发现我们的ScatteredAcquireTask,在这里被启动运行了,这里我们去看ScatteredAcquireTask的run方法,这里我截取了相关的部分:

public void run()
        {
	    boolean recheck = false;
            try
            {
                boolean fkap;
		boolean bkn;
		synchronized( BasicResourcePool.this ) 
		{
		    fkap = BasicResourcePool.this.force_kill_acquires;
		    bkn  = BasicResourcePool.this.broken;
		}
                if (!bkn && !fkap)
                {
                    //we don't want this call to be sync'd
                    //on the pool, so that resource acquisition
                    //does not interfere with other pool clients.
                    BasicResourcePool.this.doAcquireAndDecrementPendingAcquiresWithinLockOnSuccess();
                }
		else
		{
		    decrementPendingAcquires();
		    recheck = true;
		}

主要是此方法:

BasicResourcePool.this.doAcquireAndDecrementPendingAcquiresWithinLockOnSuccess();
private void doAcquire( int decrement_policy ) throws Exception
    {
        assert !Thread.holdsLock( this );
        Object resc = mgr.acquireResource(); //note we acquire the resource while we DO NOT hold the pool's lock!
        boolean destroy = false;
        int msz;
        synchronized(this) //assimilate resc if we do need it
        {
	    try
		{
		    msz = managed.size();
		    if (!broken && msz < target_pool_size)
			assimilateResource(resc); 
		    else
			destroy = true;
		    
		    if (decrement_policy == DECREMENT_ON_SUCCESS)
			_decrementPendingAcquires();
		}
	    finally
		{
		    if (decrement_policy == DECREMENT_WITH_CERTAINTY)
			_decrementPendingAcquires();
		}
        }
        if (destroy)
        {
	    try
		{
		    mgr.destroyResource( resc, false ); //destroy resc if superfluous, without holding the pool's lock
		    if (logger.isLoggable( MLevel.FINER))
			logger.log(MLevel.FINER, "destroying overacquired resource: " + resc);
		}
	    catch (Exception e)
		{
		    if (logger.isLoggable( MLevel.FINE))
			logger.log(MLevel.FINE, "An exception occurred while trying to destroy an overacquired resource: " + resc, e);
		}
        }
    }

这个里面获取connection用到的就是我们的connection

class PooledConnectionResourcePoolManager implements ResourcePool.Manager
            {	
                //SynchronizedIntHolder totalOpenedCounter  = new SynchronizedIntHolder();
                //SynchronizedIntHolder connectionCounter   = new SynchronizedIntHolder();
                //SynchronizedIntHolder failedCloseCounter  = new SynchronizedIntHolder();
                final boolean connectionTesterIsDefault = (connectionTester instanceof DefaultConnectionTester);
 
                public Object acquireResource() throws Exception
                { 
                    PooledConnection out;
                    if ( connectionCustomizer == null)
                    {
                        out = (auth.equals( C3P0ImplUtils.NULL_AUTH ) ?
                               cpds.getPooledConnection() :
                               cpds.getPooledConnection( auth.getUser(), 
                                                         auth.getPassword() ) );
                    }
                    else
                    {

这个out便是我们获取到的真正的connection

由于PoolThread是多线程执行,所以doAcquire方法中通过加锁方法来进行修改相关的值的内容

synchronized(this) //assimilate resc if we do need it
        {
	    try
		{
		    msz = managed.size();
		    if (!broken && msz < target_pool_size)
			assimilateResource(resc); 
		    else
			destroy = true;
		    
		    if (decrement_policy == DECREMENT_ON_SUCCESS)
			_decrementPendingAcquires();
		}
	    finally
		{
		    if (decrement_policy == DECREMENT_WITH_CERTAINTY)
			_decrementPendingAcquires();
		}
        }

到这里:

private void assimilateResource( Object resc ) throws Exception
    {
        assert Thread.holdsLock( this );
        managed.put(resc, new PunchCard());
        unused.add(0, resc);
        //System.err.println("assimilate resource... unused: " + unused.size());
        asyncFireResourceAcquired( resc, managed.size(), unused.size(), excluded.size() );
        this.notifyAll();
        if (Debug.DEBUG && Debug.TRACE == Debug.TRACE_MAX) trace();
        if (Debug.DEBUG && exampleResource == null)
            exampleResource = resc;
    }

    我们可以看到unused.add(0,resc)这个方法,并在此做了motifyAll的操作,我们接下来看我们的主线程去获取connection的操作,这里的doAcquire操作全部都是在后台的PoolThread中运行了,和主线程分离,这里需要注意

我们来看获取connection的步骤:

//implementation of javax.sql.DataSource
    public Connection getConnection() throws SQLException
    {
        PooledConnection pc = getPoolManager().getPool().checkoutPooledConnection();
        return pc.getConnection();
    }

    其中在获取getpool的过程中,后台进行已经在做doAcquired的操作了

    主线程中我们通过getPool获取到了C3P0PooledConnectionPool线程池,我们来看checkoutPooledConnection方法:

public PooledConnection checkoutPooledConnection() throws SQLException
    { 
        //System.err.println(this + " -- CHECKOUT");
        try 
	    { 
		PooledConnection pc = (PooledConnection) this.checkoutAndMarkConnectionInUse(); 
		pc.addConnectionEventListener( cl );
		return pc;
	    }
        catch (TimeoutException e)
        { throw SqlUtils.toSQLException("An attempt by a client to checkout a Connection has timed out.", e); }
        catch (CannotAcquireResourceException e)
        { throw SqlUtils.toSQLException("Connections could not be acquired from the underlying database!", "08001", e); }
        catch (Exception e)
        { throw SqlUtils.toSQLException(e); }
    }
private Object checkoutAndMarkConnectionInUse() throws TimeoutException, CannotAcquireResourceException, ResourcePoolException, InterruptedException
    {
        Object out = null; 
	boolean success = false;
	while (! success)
	    {
		try
		    {
			out = rp.checkoutResource( checkoutTimeout );
			if (out instanceof AbstractC3P0PooledConnection)
			    {
				// cast should succeed, because effectiveStatementCache implies c3p0 pooled Connections
				AbstractC3P0PooledConnection acpc = (AbstractC3P0PooledConnection) out;
				Connection physicalConnection = acpc.getPhysicalConnection();
				success = tryMarkPhysicalConnectionInUse(physicalConnection);
			    }
			else
			    success = true; //we don't pool statements from non-c3p0 PooledConnections
		    }
		finally
		    {
			try { if (!success && out != null) rp.checkinResource( out );}
			catch (Exception e) { logger.log(MLevel.WARNING, "Failed to check in a Connection that was unusable due to pending Statement closes.", e); }
		    }
            }
        return out;
    }

    这里最重要的获取out操作便是rp.checkoutResource( checkoutTimeout ),我们知道rp是basicResourcePool,所以这里方法这里转换成以下方法:

public Object checkoutResource( long timeout )
	throws TimeoutException, ResourcePoolException, InterruptedException
    {
	try
	{
	    Object resc = prelimCheckoutResource( timeout );
	    
	    // best to do the recheckout while we don't hold this'
	    // lock, so we don't refurbish-on-checkout while holding.
	    boolean refurb = attemptRefurbishResourceOnCheckout( resc );
	    
	    synchronized( this )
	    {
		if (!refurb)
		{
		    if (Debug.DEBUG && logger.isLoggable( MLevel.FINER))
			logger.log( MLevel.FINER, "Resource [" + resc + "] could not be refurbished in preparation for checkout. Will try to find a better resource." );
		    removeResource( resc );
		    ensureMinResources();
		    resc = null;
		}
		else
.......................................................................

我们看prelimCheckoutResource方法,这个方法返回我们获取到的connection:

private synchronized Object prelimCheckoutResource( long timeout )
	throws TimeoutException, ResourcePoolException, InterruptedException
    {
        try
        {
            ensureNotBroken();
            int available = unused.size();
            if (available == 0)
            {
                int msz = managed.size();
                if (msz < max)
                {
                    // to cover all the load, we need the current size, plus those waiting already for acquisition, 
                    // plus the current client 
                    int desired_target = msz + acquireWaiters.size() + 1;
                    if (logger.isLoggable(MLevel.FINER))
                        logger.log(MLevel.FINER, "acquire test -- pool size: " + msz + "; target_pool_size: " + target_pool_size + "; desired target? " + desired_target);
                    if (desired_target >= target_pool_size)
                    {
                        //make sure we don't grab less than inc Connections at a time, if we can help it.
                        desired_target = Math.max(desired_target, target_pool_size + inc);
                        //make sure our target is within its bounds
                        target_pool_size = Math.max( Math.min( max, desired_target ), min );
                        _recheckResizePool();
                    }
                }
                else
                {
                    if (logger.isLoggable(MLevel.FINER))
                        logger.log(MLevel.FINER, "acquire test -- pool is already maxed out. [managed: " + msz + "; max: " + max + "]");
                }
                awaitAvailable(timeout); //throws timeout exception
            }
            Object  resc = unused.get(0);

..........................................................................

    这里我们可以看到awaitAvailable方法,这个里面当我们后台的poolThread还没有获取到相关的connection的时候会阻塞在这边:


................................................

while ((avail = unused.size()) == 0) 
            {
                // the if case below can only occur when 1) a user attempts a
                // checkout which would provoke an acquire; 2) this
                // increments the pending acquires, so we go to the
                // wait below without provoking postAcquireMore(); 3)
                // the resources are acquired; 4) external management
                // of the pool (via for instance unpoolResource() 
                // depletes the newly acquired resources before we
                // regain this' monitor; 5) we fall into wait() with
                // no acquires being scheduled, and perhaps a managed.size()
                // of zero, leading to deadlock. This could only occur in
                // fairly pathological situations where the pool is being
                // externally forced to a very low (even zero) size, but 
                // since I've seen it, I've fixed it.
                if (pending_acquires == 0 && managed.size() < max)
                    _recheckResizePool();
                this.wait(timeout);
                if (timeout > 0 && System.currentTimeMillis() - start > timeout)
                    throw new TimeoutException("A client timed out while waiting to acquire a resource from " + this + " -- timeout at awaitAvailable()");
                if (force_kill_acquires)
                    throw new CannotAcquireResourceException("A ResourcePool could not acquire a resource from its primary factory or source.", getLastAcquisitionFailure());
                ensureNotBroken();
            }

...................................

    this.wait(timeout),有没有发现和之前的this.notifyAll()对应起来了,没错这边就是阻塞在等待poolThread获取到相关的connection链接,然后唤醒之后通过unused.get(0)便可以获取到我们之前通过unused.add(0,resc)所放入的connection了。

这里的获取Connection就算完成了。

总结一下:

获取链接的过程:

ComboPooledDataSource.getConnection()——>
AbstractPoolBackedDataSource.getConnection()——>
getPoolManager().getPool().checkoutPooledConnection()

1、获取PoolManger

getPoolManager()——>
new C3P0PooledConnectionPoolManager()——>
poolsInit()——>
maybePrivilegedPoolsInit()——>
_poolsInit()——>
createTaskRunner()——>
new ThreadPoolAsynchronousRunner()——>
recreateThreadsAndTasks()  myTimer.schedule()【异步】——>
(new PoolThread()).start()

2、获取Pool

getPool()——>
createPooledConnectionPool()——>
new C3P0PooledConnectionPool()——>
fact.createPool()——>
BasicResourcePoolFactory.createPool()——>
new BasicResourcePool()——>
ensureStartResources()——>
recheckResizePool()——>
_recheckResizePool()——>
expandPool()——>
taskRunner.postRunnable()

【异步执行】

PoolThread.start()——>
pendingTasks.run()【这里详细看run方法】——>
ScatteredAcquireTask.run()——>
BasicResourcePool.this.doAcquireAndDecrementPendingAcquiresWithinLockOnSuccess()——>
doAcquire()——>
C3P0PooledConnectionPool.acquireResource()——>
BasicResourcePool.assimilateResource()——>
BasicResourcePool.notifyAll()【步骤aaaaa】


3、获取Connection

checkoutPooledConnection()——>
C3P0PooledConnectionPool.checkoutAndMarkConnectionInUse——>
BasicResourcePool.checkoutResource()——>
BasicResourcePool.prelimCheckoutResource——>
BasicResourcePool.awaitAvailable(timeout)——>
BasicResourcePool.wait()【步骤bbbbb】  ——>
Object  resc = unused.get(0);

注意aaaaa步骤和bbbbb步骤之间的联系

标签: C3p0 源码 Java
共有 人打赏支持
粉丝 9
博文 40
码字总数 51755
×
关河
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: