C3P0 Source Code Analysis (Part 1)
Posted by 关河 5 months ago

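Summary: c3p0 is an open-source JDBC connection pool. It implements DataSource and JNDI binding, and supports the JDBC3 specification and the JDBC2 standard extensions. Open-source projects that use it include Hibernate and Spring.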

 


Let's look at the code involved in obtaining a connection.

First, the configuration file:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
    <bean name="CHZ-MASTER" class="com.mchange.v2.c3p0.ComboPooledDataSource"
          destroy-method="close">
        <property name="driverClass" value="com.mysql.jdbc.Driver" />
        <property name="jdbcUrl" value="jdbc:mysql://xxxx:3306/db?characterEncoding=UTF-8&amp;zeroDateTimeBehavior=convertToNull" />
        <property name="user" value="user" />
        <property name="password" value="password" />
        <property name="minPoolSize" value="3" />
        <property name="maxPoolSize" value="20" />
        <property name="maxIdleTime" value="1800" />
        <property name="acquireIncrement" value="2" />
        <property name="maxStatements" value="0" />
        <property name="initialPoolSize" value="2" />
        <property name="idleConnectionTestPeriod" value="1800" />
        <property name="acquireRetryAttempts" value="30" />
        <property name="breakAfterAcquireFailure" value="true" />
        <property name="testConnectionOnCheckout" value="false" />
    </bean>
</beans>

The test class:

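import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import com.alibaba.fastjson.JSON; // assumption: the JSON class used below is fastjson's
import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.springframework.context.support.FileSystemXmlApplicationContext;

public class C3p0DBTest {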
    public static void main(String[] args) throws SQLException {
        FileSystemXmlApplicationContext applicationContext = new FileSystemXmlApplicationContext();
        String location = "D:\\workpro\\CHZ\\src\\test\\resources\\spring\\spring-res-test.xml";
        applicationContext.setConfigLocation(location);
        applicationContext.refresh();
        ComboPooledDataSource dataSource =
            (ComboPooledDataSource) applicationContext.getBean("CHZ-MASTER");
        Connection connection = dataSource.getConnection();
        String sql = "select count(*) as numCount from t_supplier_price_rule";
        PreparedStatement ps = connection.prepareStatement(sql);
        ResultSet rs = ps.executeQuery();
        if (rs.next()){
            System.out.println("rs:"+ JSON.toJSONString(rs.getString(1)));
        }
        Connection connectionNew = dataSource.getConnection();
        String sqlnew = "select count(*) as numCount from vendor_order";
        PreparedStatement psnew = connectionNew.prepareStatement(sqlnew);
        ResultSet rsnew = psnew.executeQuery();
        if (rsnew.next()){
            System.out.println("rsnew:"+ JSON.toJSONString(rsnew.getString(1)));
        }
    }
}

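The entry point into c3p0 here is dataSource.getConnection(). c3p0 initializes lazily: the pool is only set up, and physical connections are only acquired, on the first call to getConnection().

Stepping into dataSource.getConnection(), we see the following code: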

//implementation of javax.sql.DataSource
    public Connection getConnection() throws SQLException
    {
        PooledConnection pc = getPoolManager().getPool().checkoutPooledConnection();
        return pc.getConnection();
    }

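This code is not implemented in com.mchange.v2.c3p0.ComboPooledDataSource itself; it is inherited from its ancestor class AbstractPoolBackedDataSource.

Let's go through it step by step.

Obtaining the pool manager, getPoolManager():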

private synchronized C3P0PooledConnectionPoolManager getPoolManager() throws SQLException
    {
        if (poolManager == null)
        {
            ConnectionPoolDataSource cpds = assertCpds();
            poolManager = new C3P0PooledConnectionPoolManager(cpds, null, null, this.getNumHelperThreads(), this.getIdentityToken(), this.getDataSourceName());
            if (logger.isLoggable(MLevel.INFO))
                logger.info("Initializing c3p0 pool... " + this.toString( true )  /* + "; using pool manager: " + poolManager */);
        }
        return poolManager;	    
    }
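
getPoolManager() follows the usual synchronized lazy-initialization pattern. As a minimal sketch of that pattern only (LazyManagerHolder, Manager and the CREATED counter are hypothetical names for illustration, not c3p0 classes):

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LazyManagerHolder {
    // Counts constructions so the demo can show the manager is built only once.
    static final AtomicInteger CREATED = new AtomicInteger();

    // Hypothetical stand-in for an expensive object such as a pool manager.
    static class Manager {
        Manager() { CREATED.incrementAndGet(); }
    }

    private Manager manager; // guarded by this

    // Same shape as getPoolManager(): check-then-create while holding the lock.
    public synchronized Manager getManager() {
        if (manager == null)
            manager = new Manager();
        return manager;
    }

    public static void main(String[] args) throws InterruptedException {
        LazyManagerHolder holder = new LazyManagerHolder();
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(holder::getManager);
            threads[i].start();
        }
        for (Thread t : threads)
            t.join();
        // Every thread saw the same instance, so exactly one was constructed.
        System.out.println("managers created: " + CREATED.get());
    }
}
```

Because the null check and the assignment happen under the same lock, no two threads can both observe null and both construct a manager.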

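This method is synchronized, so the pool manager is created at most once even when several threads call getConnection() at the same time. The constructor it invokes: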

public C3P0PooledConnectionPoolManager(ConnectionPoolDataSource cpds, 
					   Map flatPropertyOverrides,     // Map of properties, usually null
					   Map forceUserOverrides,        // userNames to Map of properties, usually null
					   int num_task_threads,
					   String parentDataSourceIdentityToken,
					   String parentDataSourceName)
    throws SQLException
    {
        try
        {
            this.cpds = cpds;
            this.flatPropertyOverrides = flatPropertyOverrides;
            this.num_task_threads = num_task_threads;
            this.parentDataSourceIdentityToken = parentDataSourceIdentityToken;
	    this.parentDataSourceName = parentDataSourceName;
            DbAuth auth = null;
            if ( flatPropertyOverrides != null )
            {
                String overrideUser     = (String) flatPropertyOverrides.get("overrideDefaultUser");
                String overridePassword = (String) flatPropertyOverrides.get("overrideDefaultPassword");
                if (overrideUser == null)
                {
                    overrideUser     = (String) flatPropertyOverrides.get("user");
                    overridePassword = (String) flatPropertyOverrides.get("password");
                }
                if (overrideUser != null)
                    auth = new DbAuth( overrideUser, overridePassword );
            }
            if (auth == null)
                auth = C3P0ImplUtils.findAuth( cpds );
            this.defaultAuth = auth;
            Map tmp = new HashMap();
            BeanInfo bi = Introspector.getBeanInfo( cpds.getClass() );
            PropertyDescriptor[] pds = bi.getPropertyDescriptors();
            PropertyDescriptor pd = null;
            for (int i = 0, len = pds.length; i < len; ++i)
            {
                pd = pds[i];
                String name = pd.getName();
                Method m = pd.getReadMethod();
                if (m != null)
                    tmp.put( name, m );
            }
            this.propNamesToReadMethods = tmp;
            if (forceUserOverrides == null)
            {
                Method uom = (Method) propNamesToReadMethods.get( "userOverridesAsString" );
                if (uom != null)
                {
                    String uoas = (String) uom.invoke( cpds, (Object[]) null ); // cast to suppress inexact type warning
                    //System.err.println("uoas: " + uoas);
                    Map uo = C3P0ImplUtils.parseUserOverridesAsString( uoas );
                    this.userOverrides = uo;
                }
                else
                    this.userOverrides = Collections.EMPTY_MAP;
            }
            else
                this.userOverrides = forceUserOverrides;
            poolsInit();
        }
        catch (Exception e)
        {
            if (Debug.DEBUG)
                logger.log(MLevel.FINE, null, e);
            //e.printStackTrace();
            throw SqlUtils.toSQLException(e);
        }
    }

The part of this constructor that matters for us is the call to poolsInit(). Following it down, poolsInit() delegates to maybePrivilegedPoolsInit(), which in turn delegates to _poolsInit().

Here is _poolsInit():

private synchronized void _poolsInit()
    {
        String idStr = idString();
        // create the timer used for administrative tasks
        this.timer = new Timer(idStr + "-AdminTaskTimer", true );
        int matt = this.getMaxAdministrativeTaskTime();
        // this call is the important one; we analyze it in detail below
        this.taskRunner = createTaskRunner( num_task_threads, matt, timer, idStr + "-HelperThread" );
        //this.taskRunner = new RoundRobinAsynchronousRunner( num_task_threads, true );
        //this.rpfact = ResourcePoolFactory.createInstance( taskRunner, timer );
        int num_deferred_close_threads = this.getStatementCacheNumDeferredCloseThreads();
	
	if (num_deferred_close_threads > 0)
	    this.deferredStatementDestroyer = createTaskRunner( num_deferred_close_threads, matt, timer, idStr + "-DeferredStatementDestroyerThread" );
	else
	    this.deferredStatementDestroyer = null;
        if (POOL_EVENT_SUPPORT)
            this.rpfact = ResourcePoolFactory.createInstance( taskRunner, null, timer );
        else
            this.rpfact = BasicResourcePoolFactory.createNoEventSupportInstance( taskRunner, timer );
        this.authsToPools = new HashMap();
    }

_poolsInit() creates a Timer, but the most important call is createTaskRunner():

private ThreadPoolAsynchronousRunner createTaskRunner( int num_threads, int matt /* maxAdministrativeTaskTime */, Timer timer, String threadLabel )
    {
	ThreadPoolAsynchronousRunner out = null;
        if ( matt > 0 )
        {
            int matt_ms = matt * 1000;
            out = new ThreadPoolAsynchronousRunner( num_threads, 
						    true,        // daemon thread
						    matt_ms,     // wait before interrupt()
						    matt_ms * 3, // wait before deadlock declared if no tasks clear
						    matt_ms * 6, // wait before deadlock tasks are interrupted (again)
							         // after the hung thread has been cleared and replaced
							         // (in hopes of getting the thread to terminate for
							         // garbage collection)
						    timer,
						    threadLabel );
        }
        else
            out = new ThreadPoolAsynchronousRunner( num_threads, true, timer, threadLabel );
	return out;
    }

createTaskRunner() calls the public ThreadPoolAsynchronousRunner constructors, which end up in this private one:

private ThreadPoolAsynchronousRunner( int num_threads, 
					  boolean daemon, 
					  int max_individual_task_time,
					  int deadlock_detector_interval, 
					  int interrupt_delay_after_apparent_deadlock,
					  Timer myTimer,
					  boolean should_cancel_timer,
					  String threadLabel )
    {
        this.num_threads = num_threads;
        this.daemon = daemon;
        this.max_individual_task_time = max_individual_task_time;
        this.deadlock_detector_interval = deadlock_detector_interval;
        this.interrupt_delay_after_apparent_deadlock = interrupt_delay_after_apparent_deadlock;
        this.myTimer = myTimer;
        this.should_cancel_timer = should_cancel_timer;
	this.threadLabel = threadLabel;
        recreateThreadsAndTasks();
        myTimer.schedule( deadlockDetector, deadlock_detector_interval, deadlock_detector_interval );
    }

Two calls matter here: recreateThreadsAndTasks() and myTimer.schedule( deadlockDetector, deadlock_detector_interval, deadlock_detector_interval ).

myTimer.schedule() runs the DeadlockDetector task periodically, every deadlock_detector_interval milliseconds.
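
The three-argument form of java.util.Timer.schedule(task, delay, period) is what makes the check repeat. A minimal sketch of that scheduling style, where PeriodicCheckSketch, firesRepeatedly and the latch-based "check" are hypothetical and stand in for the real DeadlockDetector:

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class PeriodicCheckSketch {
    // Schedules a periodic task and reports whether it fired `times` times
    // before a generous timeout expired.
    static boolean firesRepeatedly(int times, long intervalMs) throws InterruptedException {
        // Daemon timer, so it never keeps the JVM alive on its own.
        Timer timer = new Timer("sketch-AdminTaskTimer", true);
        CountDownLatch latch = new CountDownLatch(times);
        TimerTask check = new TimerTask() {
            @Override
            public void run() {
                // A real detector would inspect the worker threads here.
                latch.countDown();
            }
        };
        // First run after intervalMs, then again every intervalMs.
        timer.schedule(check, intervalMs, intervalMs);
        try {
            return latch.await(intervalMs * times * 20 + 1000, TimeUnit.MILLISECONDS);
        } finally {
            timer.cancel();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("fired three times: " + firesRepeatedly(3, 50));
    }
}
```

Passing the same value for delay and period, as the constructor above does, means the first check also happens only after one full interval.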

Now recreateThreadsAndTasks():

// protected by ThreadPoolAsynchronousRunner.this' lock
    // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock (or is ctor)
    private void recreateThreadsAndTasks()
    {
        if ( this.managed != null)
        {
            Date aboutNow = new Date();
            for (Iterator ii = managed.iterator(); ii.hasNext(); )
            {
                PoolThread pt = (PoolThread) ii.next();
                pt.gentleStop();
                stoppedThreadsToStopDates.put( pt, aboutNow );
                ensureReplacedThreadsProcessing();
            }
        }
        this.managed = new HashSet();
        this.available = new HashSet();
        this.pendingTasks = new LinkedList();
        for (int i = 0; i < num_threads; ++i)
        {
            Thread t = new PoolThread(i, daemon);
            managed.add( t );
            available.add( t );
            t.start();
        }
    }

The main job of this method is to create num_threads PoolThread instances and start them. Here is PoolThread:

class PoolThread extends Thread
    {
        // protected by ThreadPoolAsynchronousRunner.this' lock
        Runnable currentTask;
        // protected by ThreadPoolAsynchronousRunner.this' lock
        boolean should_stop;
        // post ctor immutable
        int index;
        // not shared. only accessed by the PoolThread itself
        TimerTask maxIndividualTaskTimeEnforcer = null;
        PoolThread(int index, boolean daemon)
        {
            this.setName( (threadLabel == null ? this.getClass().getName() : threadLabel) + "-#" + index);
            this.setDaemon( daemon );
            this.index = index;
        }
        public int getIndex()
        { return index; }
        // protected by ThreadPoolAsynchronousRunner.this' lock
        // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
        void gentleStop()
        { should_stop = true; }
        // protected by ThreadPoolAsynchronousRunner.this' lock
        // BE SURE CALLER OWNS ThreadPoolAsynchronousRunner.this' lock
        Runnable getCurrentTask()
        { return currentTask; }
        // no need to sync. data not shared
        private /* synchronized */  void setMaxIndividualTaskTimeEnforcer()
        {
            this.maxIndividualTaskTimeEnforcer = new MaxIndividualTaskTimeEnforcer( this );
            myTimer.schedule( maxIndividualTaskTimeEnforcer, max_individual_task_time );
        }
        // no need to sync. data not shared
        private /* synchronized */ void cancelMaxIndividualTaskTimeEnforcer()
        {
            this.maxIndividualTaskTimeEnforcer.cancel();
            this.maxIndividualTaskTimeEnforcer = null;
        }
        // no need to sync. Timer threadsafe, no other data access
        private void purgeTimer()
        { 
	    myTimer.purge(); 
	    if ( logger.isLoggable( MLevel.FINER ) )
		logger.log(MLevel.FINER, this.getClass().getName() + " -- PURGING TIMER");
	}
        public void run()
        {
	    long checkForPurge = rnd.nextLong();
            try
            {
                thread_loop:
                    while (true)
                    {
                        Runnable myTask;
                        synchronized ( ThreadPoolAsynchronousRunner.this )
                        {
                            while ( !should_stop && pendingTasks.size() == 0 )
                                ThreadPoolAsynchronousRunner.this.wait( POLL_FOR_STOP_INTERVAL );
                            if (should_stop) 
                                break thread_loop;
                            if (! available.remove( this ) )
                                throw new InternalError("An unavailable PoolThread tried to check itself out!!!");
                            myTask = (Runnable) pendingTasks.remove(0);
                            currentTask = myTask;
                        }
                        try
                        { 
                            if (max_individual_task_time > 0)
                                setMaxIndividualTaskTimeEnforcer();
                            myTask.run(); 
                        }
                        catch ( RuntimeException e )
                        {
                            if ( logger.isLoggable( MLevel.WARNING ) )
                                logger.log(MLevel.WARNING, this + " -- caught unexpected Exception while executing posted task.", e);
                            //e.printStackTrace();
                        }
                        finally
                        {
                            if ( maxIndividualTaskTimeEnforcer != null )
				{
				    cancelMaxIndividualTaskTimeEnforcer();
				    // we stochastically purge the timer roughly every PURGE_EVERY cancels
				    // math below is an inline, fast, pseudorandom long generator.
				    // see com.mchange.v2.util.XORShiftRandomUtils
				    checkForPurge ^= (checkForPurge << 21);
				    checkForPurge ^= (checkForPurge >>> 35);
				    checkForPurge ^= (checkForPurge << 4);
				    if ( (checkForPurge % PURGE_EVERY ) == 0 )
					purgeTimer();
				}
                            synchronized ( ThreadPoolAsynchronousRunner.this )
                            {
                                if (should_stop)
                                    break thread_loop;
                                if ( available != null && ! available.add( this ) )
                                    throw new InternalError("An apparently available PoolThread tried to check itself in!!!");
                                currentTask = null;
                            }
                        }
                    }
            }
            catch ( InterruptedException exc )
            {
//              if ( Debug.TRACE > Debug.TRACE_NONE )
//              System.err.println(this + " interrupted. Shutting down.");
                if ( Debug.TRACE > Debug.TRACE_NONE && logger.isLoggable( MLevel.FINE ) )
                    logger.fine(this + " interrupted. Shutting down.");
            }
            catch (RuntimeException re)
            {
                if (logger.isLoggable(MLevel.WARNING))
                    logger.log(MLevel.WARNING, "An unexpected RuntimException is implicated in the closing of " + this, re);
                throw re;
            }
            catch (Error err)
            {
                if (logger.isLoggable(MLevel.WARNING))
                    logger.log(MLevel.WARNING, 
                               "An Error forced the closing of " + this + 
                               ". Will attempt to reconstruct, but this might mean that something bad is happening.", 
                               err);
                throw err;
            }
            finally
            {
                synchronized ( ThreadPoolAsynchronousRunner.this )
                { ThreadPoolAsynchronousRunner.this.shuttingDown( this ); }
            }
        }
    }

One part deserves attention:

while ( !should_stop && pendingTasks.size() == 0 )
                                ThreadPoolAsynchronousRunner.this.wait( POLL_FOR_STOP_INTERVAL );

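While should_stop is false and pendingTasks is empty, the thread stays in this loop, calling wait() and waking up every POLL_FOR_STOP_INTERVAL only to re-check the condition. So when does pendingTasks.size() become non-zero?

Let's set that question aside for now and move on to getPool().

Obtaining the pool, getPool():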

public C3P0PooledConnectionPool getPool()
    throws SQLException
    { return getPool( defaultAuth ); }
public synchronized C3P0PooledConnectionPool getPool(DbAuth auth)
    throws SQLException
    {
        C3P0PooledConnectionPool out = (C3P0PooledConnectionPool) authsToPools.get(auth);
        if (out == null)
        {
            out = createPooledConnectionPool(auth);
            authsToPools.put( auth, out );
	    if ( logger.isLoggable( MLevel.FINE ) )
		logger.log( MLevel.FINE, "Created new pool for auth, username (masked): '" + auth.getMaskedUserString() + "'." );
        }
        return out;
    }
// called only from sync'ed methods
    private C3P0PooledConnectionPool createPooledConnectionPool(DbAuth auth) throws SQLException
    {
        String userName = auth.getUser();
        String automaticTestTable = getAutomaticTestTable( userName );
        String realTestQuery;
        if (automaticTestTable != null)
        {
            realTestQuery = initializeAutomaticTestTable( automaticTestTable, auth );
            if (this.getPreferredTestQuery( userName ) != null)
            {
                if ( logger.isLoggable( MLevel.WARNING ) )
                {
                    logger.logp(MLevel.WARNING, 
                                    C3P0PooledConnectionPoolManager.class.getName(),
                                    "createPooledConnectionPool",
                                    "[c3p0] Both automaticTestTable and preferredTestQuery have been set! " +
                                    "Using automaticTestTable, and ignoring preferredTestQuery. Real test query is ''{0}''.",
                                    realTestQuery
                    );
                }
            }
        }
        else
        {
	    // when there is an automaticTestTable to be constructed, we
	    // have little choice but to grab a Connection on initialization
	    // to ensure that the table exists before the pool tries to
	    // test Connections. in c3p0-0.9.1-pre10, i added the check below
	    // to grab and destroy a cxn even when we don't
	    // need one, to ensure that db access params are correct before
	    // we start up a pool. a user who frequently creates and destroys
	    // PooledDataSources complained about the extra initialization
	    // time. the main use of this test was to prevent superfluous
	    // bad pools from being intialized when JMX users type bad
	    // authentification information into a query method. This is
	    // now prevented in AbstractPoolBackedDataSource. Still, it is
	    // easy for clients to start pools uselessly by asking for
	    // Connections with bad authentification information. We adopt
	    // the compromise position of "trusting" the DataSource's default
	    // authentification info (as defined by defaultAuth), but ensuring
	    // that authentification succeeds via the check below when non-default
	    // authentification info is provided.
	    if (! defaultAuth.equals( auth ))
		ensureFirstConnectionAcquisition( auth );
            realTestQuery = this.getPreferredTestQuery( userName );
        }
        C3P0PooledConnectionPool out =  new C3P0PooledConnectionPool( cpds,
								      auth,
								      this.getMinPoolSize( userName ),
								      this.getMaxPoolSize( userName ),
								      this.getInitialPoolSize( userName ),
								      this.getAcquireIncrement( userName ),
								      this.getAcquireRetryAttempts( userName ),
								      this.getAcquireRetryDelay( userName ),
								      this.getBreakAfterAcquireFailure( userName ),
								      this.getCheckoutTimeout( userName ),
								      this.getIdleConnectionTestPeriod( userName ),
								      this.getMaxIdleTime( userName ),
								      this.getMaxIdleTimeExcessConnections( userName ),
								      this.getMaxConnectionAge( userName ),
								      this.getPropertyCycle( userName ),
								      this.getUnreturnedConnectionTimeout( userName ),
								      this.getDebugUnreturnedConnectionStackTraces( userName ),
								      this.getForceSynchronousCheckins( userName ),
								      this.getTestConnectionOnCheckout( userName ),
								      this.getTestConnectionOnCheckin( userName ),
								      this.getMaxStatements( userName ),
								      this.getMaxStatementsPerConnection( userName ),
								      this.getConnectionTester( userName ),
								      this.getConnectionCustomizer( userName ),
								      realTestQuery,
								      rpfact,
								      taskRunner,
								      deferredStatementDestroyer,
								      parentDataSourceIdentityToken );
        return out;
    }

Next, the construction of C3P0PooledConnectionPool:

C3P0PooledConnectionPool( final ConnectionPoolDataSource cpds,
                    final DbAuth auth,
                    int min, 
                    int max, 
                    int start,
                    int inc,
                    int acq_retry_attempts,
                    int acq_retry_delay,
                    boolean break_after_acq_failure,
                    int checkoutTimeout, //milliseconds
                    int idleConnectionTestPeriod, //seconds
                    int maxIdleTime, //seconds
                    int maxIdleTimeExcessConnections, //seconds
                    int maxConnectionAge, //seconds
                    int propertyCycle, //seconds
                    int unreturnedConnectionTimeout, //seconds
                    boolean debugUnreturnedConnectionStackTraces,
                    boolean forceSynchronousCheckins,
                    final boolean testConnectionOnCheckout,
                    final boolean testConnectionOnCheckin,
                    int maxStatements,
                    int maxStatementsPerConnection,
		    /* boolean statementCacheDeferredClose,      */
                    final ConnectionTester connectionTester,
                    final ConnectionCustomizer connectionCustomizer,
                    final String testQuery,
                    final ResourcePoolFactory fact,
                    ThreadPoolAsynchronousRunner taskRunner,
		    ThreadPoolAsynchronousRunner deferredStatementDestroyer,
                    final String parentDataSourceIdentityToken) throws SQLException
                    {
            // ... (code omitted) ...
synchronized (fact)
            {
                fact.setMin( min );
                fact.setMax( max );
                fact.setStart( start );
                fact.setIncrement( inc );
                fact.setIdleResourceTestPeriod( idleConnectionTestPeriod * 1000);
                fact.setResourceMaxIdleTime( maxIdleTime * 1000 );
                fact.setExcessResourceMaxIdleTime( maxIdleTimeExcessConnections * 1000 );
                fact.setResourceMaxAge( maxConnectionAge * 1000 );
                fact.setExpirationEnforcementDelay( propertyCycle * 1000 );
                fact.setDestroyOverdueResourceTime( unreturnedConnectionTimeout * 1000 );
                fact.setDebugStoreCheckoutStackTrace( debugUnreturnedConnectionStackTraces );
                fact.setForceSynchronousCheckins( forceSynchronousCheckins );
                fact.setAcquisitionRetryAttempts( acq_retry_attempts );
                fact.setAcquisitionRetryDelay( acq_retry_delay );
                fact.setBreakOnAcquisitionFailure( break_after_acq_failure );
                rp = fact.createPool( manager );
            }
}
        catch (ResourcePoolException e)
        { throw SqlUtils.toSQLException(e); }
    }

I have omitted everything else and kept only the relevant part. The most important line is:

       rp = fact.createPool( manager );

Because fact is a BasicResourcePoolFactory, createPool() ultimately comes down to a call to new BasicResourcePool(...):

public BasicResourcePool(
		    Manager                  mgr, 
                    int                      start,
                    int                      min, 
                    int                      max, 
                    int                      inc,
                    int                      num_acq_attempts,
                    int                      acq_attempt_delay,
                    long                     check_idle_resources_delay,
                    long                     max_resource_age,
                    long                     max_idle_time,
                    long                     excess_max_idle_time,
                    long                     destroy_unreturned_resc_time,
                    long                     expiration_enforcement_delay,
                    boolean                  break_on_acquisition_failure,
                    boolean                  debug_store_checkout_exceptions,
		    boolean                  force_synchronous_checkins,
                    AsynchronousRunner       taskRunner,
                    RunnableQueue            asyncEventQueue,
                    Timer                    cullAndIdleRefurbishTimer,
                    BasicResourcePoolFactory factory)
    throws ResourcePoolException{
        // ... (code omitted) ...
        //start acquiring our initial resources
        ensureStartResources();
        // ... (code omitted) ...
}

Again only the relevant part is kept. ensureStartResources() eventually ends up in _recheckResizePool(), which does the actual work:

// must be called from synchronized method, idempotent
    private void _recheckResizePool()
    {
        assert Thread.holdsLock(this);
        if (! broken)
        {
            int msz = managed.size();
            //int expected_size = msz + pending_acquires - pending_removes;
//          System.err.print("target: " + target_pool_size);
//          System.err.println(" (msz: " + msz + "; pending_acquires: " + pending_acquires + "; pending_removes: " + pending_removes + ')');
            //new Exception( "_recheckResizePool() STACK TRACE" ).printStackTrace();
            int shrink_count;
            int expand_count;
            if ((shrink_count = msz - pending_removes - target_pool_size) > 0)
                shrinkPool( shrink_count );
            else if ((expand_count = target_pool_size - (msz + pending_acquires)) > 0)
                expandPool( expand_count );
        }
    }
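
The decision in _recheckResizePool() is plain integer arithmetic over four counters. A minimal sketch, pulled out into a hypothetical pure function (ResizeDecisionSketch, resizeDelta and its sign convention are illustrative, not part of c3p0):

```java
public class ResizeDecisionSketch {
    // Returns a positive number to expand by, a negative number to shrink by,
    // or zero when the pool is already on its way to the target size.
    static int resizeDelta(int managed, int pendingAcquires, int pendingRemoves, int target) {
        // Resources already scheduled for removal no longer count toward the size.
        int shrink = managed - pendingRemoves - target;
        if (shrink > 0)
            return -shrink;
        // Resources already being acquired must not be requested a second time.
        int expand = target - (managed + pendingAcquires);
        return Math.max(expand, 0);
    }

    public static void main(String[] args) {
        System.out.println(resizeDelta(0, 0, 0, 3)); // empty pool, target 3
        System.out.println(resizeDelta(2, 1, 0, 3)); // one acquire already in flight
        System.out.println(resizeDelta(5, 0, 0, 3)); // two more than the target
    }
}
```

With an empty pool and an illustrative target of 3, the result is 3, which corresponds to expandPool(3). The second call returns 0 because the missing resource is already being acquired, and the third returns -2, which corresponds to shrinkPool(2).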

Now the expandPool(expand_count) method:

private void expandPool(int count)
    {
        assert Thread.holdsLock(this);
        // XXX: temporary switch -- assuming no problems appear, we'll get rid of AcquireTask
        //      in favor of ScatteredAcquireTask
        if ( USE_SCATTERED_ACQUIRE_TASK )
        {
            for (int i = 0; i < count; ++i)
                taskRunner.postRunnable( new ScatteredAcquireTask() );
        }
        else
        {
            for (int i = 0; i < count; ++i)
                taskRunner.postRunnable( new AcquireTask() );
        }
    }

Here postRunnable() is called on the taskRunner object. Since taskRunner is the shared runner created earlier in _poolsInit(), postRunnable() is worth a close look:

public synchronized void postRunnable(Runnable r)
    {
        try
        {
            pendingTasks.add( r );
            this.notifyAll();
            
            if (Debug.DEBUG && logger.isLoggable(MLevel.FINEST))
                logger.log(MLevel.FINEST, this + ": Adding task to queue -- " + r);
        }
        catch ( NullPointerException e )
        {
            //e.printStackTrace();
            if ( Debug.DEBUG )
            {
                if ( logger.isLoggable( MLevel.FINE ) )
                    logger.log( MLevel.FINE, "NullPointerException while posting Runnable -- Probably we're closed.", e );
            }
            throw new ResourceClosedException("Attempted to use a ThreadPoolAsynchronousRunner in a closed or broken state.");
        }
    }

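postRunnable() modifies pendingTasks: it appends the ScatteredAcquireTask (or AcquireTask) to the queue and calls notifyAll(). The PoolThread instances that were waiting because pendingTasks was empty can now proceed.

So we go back to the PoolThread code.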
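
The handshake between PoolThread.run() and postRunnable() is a classic guarded wait on a shared monitor. A much-reduced sketch of it follows; MiniRunner, runsAll and POLL_MS are hypothetical names, and the real ThreadPoolAsynchronousRunner additionally tracks available threads, enforces task timeouts, and detects deadlocks:

```java
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class MiniRunner {
    private static final long POLL_MS = 100;       // plays the role of POLL_FOR_STOP_INTERVAL
    private final LinkedList<Runnable> pendingTasks = new LinkedList<>(); // guarded by this
    private boolean shouldStop = false;            // guarded by this

    public MiniRunner(int numThreads) {
        for (int i = 0; i < numThreads; i++) {
            Thread t = new Thread(this::workerLoop, "MiniRunner-#" + i);
            t.setDaemon(true);
            t.start();
        }
    }

    // Producer side: enqueue the task and wake up the waiting workers.
    public synchronized void postRunnable(Runnable r) {
        pendingTasks.add(r);
        notifyAll();
    }

    public synchronized void close() {
        shouldStop = true;
        notifyAll();
    }

    // Consumer side: wait while there is nothing to do, then take one task.
    private void workerLoop() {
        try {
            while (true) {
                Runnable task;
                synchronized (this) {
                    while (!shouldStop && pendingTasks.isEmpty())
                        wait(POLL_MS);
                    if (shouldStop)
                        return;
                    task = pendingTasks.removeFirst();
                }
                task.run(); // run outside the lock so other workers can proceed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Posts `tasks` tasks and reports whether all of them ran within 5 seconds.
    static boolean runsAll(int tasks) throws InterruptedException {
        MiniRunner runner = new MiniRunner(3);
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++)
            runner.postRunnable(done::countDown);
        boolean ok = done.await(5, TimeUnit.SECONDS);
        runner.close();
        return ok;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all tasks ran: " + runsAll(10));
    }
}
```

The timed wait lets a worker notice a stop request even if no task ever arrives, while notifyAll() in postRunnable() is what lets it pick up new work promptly.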
