pgpool-II Analysis

人比黄花瘦太多
Published 2017/05/22 13:47

SQL Parsing in pgpool-II

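From the source code, the relevant functions call one another in the following order:

do_child -> pool_process_query -> read_packets_and_process -> ProcessFrontendResponse -> SimpleQuery -> pool_where_to_send -> send_to_where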

The do_child function is as follows:

/*                                    
* child main loop                                    
*/                                    
void do_child(int unix_fd, int inet_fd)                                    
{                                    
…                                    
    for (;;)                                
    {                                
        …                            
        /* perform accept() */                            
        frontend = do_accept(unix_fd, inet_fd, &timeout);                            
        if (frontend == NULL) /* connection request from frontend timed out */
        {                            
            /* check select() timeout */                        
            if (connected && pool_config->child_life_time > 0 &&                        
                timeout.tv_sec == 0 && timeout.tv_usec == 0)                    
            {                        
                pool_debug("child life %d seconds expired", pool_config->child_life_time);                    
                /*                    
                * Doesn't need to call this. child_exit() calls it.                    
                * send_frontend_exits();                    
                */                    
                child_exit(2);                    
            }                        
            continue;                        
        }                            
        …                            
        /*                            
        * Ok, negotiation with frontend has been done. Let's go to the
        * next step. Connect to backend if there's no existing                            
        * connection which can be reused by this frontend.                            
        * Authentication is also done in this step.                            
        */                            
        …                            
        /*                            
        * if there's no connection associated with user and database,                            
        * we need to connect to the backend and send the startup packet.                            
        */                            
        /* look for existing connection */                            
        found = 0;                            
        backend = pool_get_cp(sp->user, sp->database, sp->major, 1);                            
        …                            
        /* Mark this connection pool as connected from frontend */
        pool_coninfo_set_frontend_connected(pool_get_process_context()->proc_id, pool_pool_index());                            
        /* query process loop */                            
        for (;;)                            
        {                            
            POOL_STATUS status;                        
            status = pool_process_query(frontend, backend, 0);                        
            sp = MASTER_CONNECTION(backend)->sp;                        
            switch (status)                        
            {                        
                …                    
            }                        
            if (status != POOL_CONTINUE)                        
                break;                    
        }                            
        …                            
    }                                
    child_exit(0);                                
}                           

The query processing loop calls pool_process_query, the main query processing module. Its code is as follows:

/*                                    
* Main module for query processing                                    
* reset_request: if non 0, call reset_backend to execute reset queries                                    
*/                                    
POOL_STATUS pool_process_query(POOL_CONNECTION *frontend,                                    
                        POOL_CONNECTION_POOL *backend,            
                        int reset_request)            
{                                    
    …                                
    for (;;)                                
    {                                
        …                            
        /*                            
        * If we are processing a query, process it.
        */                            
        if (pool_is_query_in_progress())                            
        {                            
            status = ProcessBackendResponse(frontend, backend, &state, &num_fields);                        
            if (status != POOL_CONTINUE)                        
                return status;                    
        }                            
        /*                            
        * If frontend and all backends do not have any pending data in                            
        * the receiving data cache, then issue select(2) to wait for new                            
        * data arrival                            
        */                            
        else if (is_cache_empty(frontend, backend))                            
        {                            
            bool cont = true;                        
            status = read_packets_and_process(frontend, backend, reset_request,                        
                                    &state, &num_fields, &cont);
            if (status != POOL_CONTINUE)                        
                return status;                    
            else if (!cont) /* Detected admin shutdown */
                return status;                    
        }                            
        else                            
        {                            
            …                        
        }                            
        …                            
    }                                
    return POOL_CONTINUE;                                
}                                    
                          

The loop checks whether any data is pending. If there is none, it calls read_packets_and_process and waits for data to arrive. read_packets_and_process is defined as follows:

/*                                    
* Read packet from either frontend or backend and process it.                                    
*/                                    
static POOL_STATUS read_packets_and_process(POOL_CONNECTION *frontend,                                    
POOL_CONNECTION_POOL *backend, int reset_request, int *state, short *num_fields, bool *cont)                                    
{                                    
    …                                
    if (!reset_request)                                
    {                                
        if (FD_ISSET(frontend->fd, &exceptmask))                            
            return POOL_END;                        
        else if (FD_ISSET(frontend->fd, &readmask))                            
        {                            
            status = ProcessFrontendResponse(frontend, backend);                        
            if (status != POOL_CONTINUE)                        
                return status;                    
        }                            
    }                                
    …                                
    return POOL_CONTINUE;                                
}          

This calls ProcessFrontendResponse, which is defined as follows:

POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,                                    
                            POOL_CONNECTION_POOL *backend)        
{                                    
    …                                
    switch (fkind)                                
    {                                
        …                            
        case 'X': /* Terminate */                            
            free(contents);                        
            return POOL_END;                        
        case 'Q': /* Query */                            
            allow_close_transaction = 1;                        
            status = SimpleQuery(frontend, backend, len, contents);                        
            break;                        
        …                            
        default:                            
            pool_error("ProcessFrontendResponse: unknown message type %c(%02x)", fkind, fkind);                        
            status = POOL_ERROR;                        
    }                                
    free(contents);                                
    if (status != POOL_CONTINUE)                                
        status = POOL_ERROR;                            
    return status;                                
}          
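
The switch above keys on the kind byte of each frontend message. As a rough illustration, assuming the PostgreSQL version 3 wire format (a one-byte message type followed by a four-byte big-endian length that counts itself but not the type byte), the sketch below parses one message header and classifies it. The names frontend_msg, parse_frontend_msg and classify_msg are hypothetical and are not part of pgpool-II.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical types for illustration only; not pgpool-II's own. */
typedef enum { MSG_QUERY, MSG_TERMINATE, MSG_UNKNOWN } msg_class;

typedef struct {
    char        kind;      /* message type byte, e.g. 'Q' or 'X' */
    uint32_t    body_len;  /* payload length, excluding the 4-byte length field */
    const char *body;      /* points into the caller's buffer, not copied */
} frontend_msg;

/* Parse one message from buf[0..n). Returns 0 on success, or -1 when
 * the buffer is too short or the length field is inconsistent. */
int parse_frontend_msg(const unsigned char *buf, size_t n, frontend_msg *out)
{
    assert(buf != NULL && out != NULL);
    if (n < 5)
        return -1;                       /* need kind byte + length field */

    uint32_t len = ((uint32_t)buf[1] << 24) | ((uint32_t)buf[2] << 16) |
                   ((uint32_t)buf[3] << 8)  |  (uint32_t)buf[4];
    if (len < 4 || (size_t)len - 4 > n - 5)
        return -1;                       /* length must cover itself and fit */

    out->kind = (char)buf[0];
    out->body_len = len - 4;
    out->body = (const char *)(buf + 5);
    return 0;
}

/* Map the kind byte to the two cases shown in the listing above. */
msg_class classify_msg(char kind)
{
    switch (kind) {
    case 'Q': return MSG_QUERY;      /* simple query: body is an SQL string */
    case 'X': return MSG_TERMINATE;  /* frontend is closing the session */
    default:  return MSG_UNKNOWN;
    }
}
```

A caller would read bytes from the frontend socket, pass them to parse_frontend_msg, and branch on classify_msg(msg.kind), much as the switch on fkind does.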

In the switch statement, when the message kind is 'Q' (Query), SimpleQuery is called. Its code is as follows:

/*                                    
* Process Query('Q') message                                    
* Query messages include an SQL string.                                    
*/                                    
POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,                                    
                    POOL_CONNECTION_POOL *backend, int len, char *contents)                
{                                    
    …                                
    /* log query to log file if necessary */                                
    if (pool_config->log_statement)                                
    {                                
        pool_log("statement: %s", contents);                            
    }                                
    else                                
    {                                
        pool_debug("statement2: %s", contents);                            
    }                                
    …                                
    if (parse_tree_list != NIL)                                
    {                                
        …                            
        /*                            
        * Decide where to send query                            
        */                            
        pool_where_to_send(query_context, query_context->original_query,                            
                        query_context->parse_tree);            
        …                            
    }                                
    …                                
    /* switch memory context */                                
    pool_memory_context_switch_to(old_context);                                
    return POOL_CONTINUE;                                
}           

This calls pool_where_to_send, the function that decides where to send the query. It is defined as follows:

/*                                    
* Decide where to send queries(thus expecting response)                                    
*/                                    
void pool_where_to_send(POOL_QUERY_CONTEXT *query_context, char *query, Node *node)                                    
{                                    
    …                                
    /*                                
    * In raw mode, we send only to master node. Simple enough.                                
    */                                
    if (RAW_MODE)                                
    {                                
        pool_set_node_to_be_sent(query_context, REAL_MASTER_NODE_ID);                            
    }                                
    else if (MASTER_SLAVE && query_context->is_multi_statement)                                
    {                                
    …                                
    }                                
    else if (MASTER_SLAVE)                                
    {                                
        POOL_DEST dest;                            
        POOL_MEMORY_POOL *old_context;                            
        old_context = pool_memory_context_switch_to(query_context->memory_context);                            
        dest = send_to_where(node, query);                            
        pool_memory_context_switch_to(old_context);                            
        pool_debug("send_to_where: %d query: %s", dest, query);                            
        /* Should be sent to primary only? */                            
        if (dest == POOL_PRIMARY)                            
        {                            
            pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);                        
        }                            
        /* Should be sent to both primary and standby? */                            
        else if (dest == POOL_BOTH)                            
        {                            
            pool_setall_node_to_be_sent(query_context);                        
        }                            
        /*                            
        * Ok, we might be able to load balance the SELECT query.                            
        */                            
        else                            
        {                            
            …                        
        }                            
    }                                
    else if (REPLICATION || PARALLEL_MODE)                                
    {                                
        …                            
    }                                
    else                                
    {                                
        pool_error("pool_where_to_send: unknown mode");                            
        return;                            
    }                                
    …                                
    return;                                
}
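
The listing does not show how pool_set_node_to_be_sent and pool_setall_node_to_be_sent record their decision. One plausible shape, assuming (this is an assumption, not something the listing confirms) that the query context keeps one flag per backend node, is sketched below. The type query_targets, the constant MAX_NODES and all function names are hypothetical.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_NODES 8   /* hypothetical upper bound for this sketch */

/* Hypothetical stand-in for the destination part of a query context. */
typedef struct {
    int  num_nodes;                 /* number of configured backends */
    bool where_to_send[MAX_NODES];  /* true = this node receives the query */
} query_targets;

/* Start with no destination selected. */
void clear_targets(query_targets *t, int num_nodes)
{
    assert(num_nodes > 0 && num_nodes <= MAX_NODES);
    t->num_nodes = num_nodes;
    for (int i = 0; i < MAX_NODES; i++)
        t->where_to_send[i] = false;
}

/* Mark a single node, as for a primary-only statement. */
void target_node(query_targets *t, int node_id)
{
    assert(node_id >= 0 && node_id < t->num_nodes);
    t->where_to_send[node_id] = true;
}

/* Mark every node, as for a statement that goes to primary and standby. */
void target_all(query_targets *t)
{
    for (int i = 0; i < t->num_nodes; i++)
        t->where_to_send[i] = true;
}

/* Count how many nodes will receive the query. */
int count_targets(const query_targets *t)
{
    int n = 0;
    for (int i = 0; i < t->num_nodes; i++)
        if (t->where_to_send[i])
            n++;
    return n;
}
```

With a structure like this, the sender only has to walk the flags and forward the query to each marked node, and it knows how many responses to expect.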

In Master/Slave mode, send_to_where is called. Its code is as follows:

/*                                    
* From syntactically analysis decide the statement to be sent to the                                    
* primary, the standby or either or both in master/slave+HR/SR mode.                                    
*/                                    
static POOL_DEST send_to_where(Node *node, char *query)                                    
{                                    
    if (bsearch(&nodeTag(node), nodemap, sizeof(nodemap)/sizeof(nodemap[0]),                                
            sizeof(NodeTag), compare) != NULL)                        
    {                                
        /*                            
        * SELECT INTO                            
        * SELECT FOR SHARE or UPDATE                            
        */                            
        if (IsA(node, SelectStmt))                            
        {                            
            /* SELECT INTO or SELECT FOR SHARE or UPDATE ? */                        
            if (pool_has_insertinto_or_locking_clause(node))                        
                return POOL_PRIMARY;                    
            return POOL_EITHER;                        
        }                            
        …                            
        /*                            
        * Transaction commands                            
        */                            
        else if (IsA(node, TransactionStmt))                            
        {                            
            /*                        
            * Check "BEGIN READ WRITE" "START TRANSACTION READ WRITE"                        
            */                        
            if (is_start_transaction_query(node))                        
            {                        
                /* But actually, we send BEGIN to standby if it's                    
                BEGIN READ WRITE or START TRANSACTION READ WRITE */                    
                if (is_read_write((TransactionStmt *)node))                    
                    return POOL_BOTH;                
                /* Other TRANSACTION start commands are sent to both primary                    
                    and standby */                
                else                    
                    return POOL_BOTH;                
            }                        
            /* SAVEPOINT related commands are sent to both primary and standby */                        
            else if (is_savepoint_query(node))                        
                return POOL_BOTH;                    
            /*                        
            * 2PC commands                        
            */                        
            else if (is_2pc_transaction_query(node))                        
                return POOL_PRIMARY;                    
            else                        
            /* COMMIT etc. */                        
                return POOL_BOTH;                    
        }                            
        …                            
        /*                            
        * EXECUTE                            
        */                            
        else if (IsA(node, ExecuteStmt))                            
        {                            
            /* This is temporary decision. where_to_send will inherit                        
            * same destination AS PREPARE.                        
            */                        
            return POOL_PRIMARY;                        
        }                            
        …                            
        /*                            
        * Other statements are sent to primary                            
        */                            
        return POOL_PRIMARY;                            
    }                                
                                    
    /*                                
    * All unknown statements are sent to primary                                
    */                                
    return POOL_PRIMARY;                                
}               

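In send_to_where, when running in Master/Slave mode, statements that insert, delete, or update data are sent only to the primary database. Transaction-related statements such as BEGIN and COMMIT are sent to both the master and the slave. A plain SELECT returns POOL_EITHER, while SELECT INTO and SELECT ... FOR SHARE/UPDATE go to the primary only.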
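
These routing rules can be condensed into a small decision function. The sketch below is a simplified stand-in and not pgpool-II's code: the stmt_kind enum and route_statement function are hypothetical, and the three destinations only mirror the POOL_PRIMARY, POOL_EITHER and POOL_BOTH values that appear in the listing.

```c
#include <assert.h>
#include <stdbool.h>

/* Mirrors POOL_PRIMARY / POOL_EITHER / POOL_BOTH from the listing. */
typedef enum { DEST_PRIMARY, DEST_EITHER, DEST_BOTH } route_dest;

/* Hypothetical, much coarser classification than a real parse tree. */
typedef enum {
    STMT_SELECT, STMT_INSERT, STMT_UPDATE, STMT_DELETE,
    STMT_BEGIN, STMT_COMMIT, STMT_SAVEPOINT,
    STMT_TWO_PHASE,   /* 2PC commands such as PREPARE TRANSACTION */
    STMT_OTHER
} stmt_kind;

/* into_or_locking is only meaningful for STMT_SELECT: true for
 * SELECT INTO or SELECT ... FOR SHARE/UPDATE. */
route_dest route_statement(stmt_kind kind, bool into_or_locking)
{
    switch (kind) {
    case STMT_SELECT:
        /* A read-only SELECT may go to either node. */
        return into_or_locking ? DEST_PRIMARY : DEST_EITHER;
    case STMT_BEGIN:
    case STMT_COMMIT:
    case STMT_SAVEPOINT:
        /* Keep transaction state in step on both nodes. */
        return DEST_BOTH;
    case STMT_TWO_PHASE:
    case STMT_INSERT:
    case STMT_UPDATE:
    case STMT_DELETE:
    case STMT_OTHER:
    default:
        /* Writes, 2PC and anything unrecognized go to the primary. */
        return DEST_PRIMARY;
    }
}
```

Defaulting to the primary is the conservative choice: a statement sent there unnecessarily only costs some load balancing, whereas a write sent to a standby would fail.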

The pgpool-II Process Pool

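The configuration file pgpool.conf has an option num_init_children, which sets the number of pgpool-II server processes to prefork. The default is 32. num_init_children is also the maximum number of concurrent client connections that pgpool-II supports. If more than num_init_children clients try to connect to pgpool-II, the extra clients are blocked (not rejected) until a connection to any pgpool-II process is closed. Up to 2*num_init_children connections can be placed in the waiting queue.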
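
One way to picture the waiting queue, offered as an assumption and not something the excerpts here confirm, is as the backlog argument of listen(2) on the listening socket. The sketch below computes the queue length described in the text and applies it to an already bound socket. Both function names are hypothetical.

```c
#include <sys/socket.h>

/* Queue length described in the text: twice the number of
 * preforked child processes. */
int wait_queue_length(int num_init_children)
{
    return 2 * num_init_children;
}

/* Put an already bound socket into the listening state with that
 * backlog. Returns 0 on success, -1 on error (errno is set by listen).
 * The kernel may silently cap the backlog it actually honors. */
int start_listening(int bound_fd, int num_init_children)
{
    return listen(bound_fd, wait_queue_length(num_init_children));
}
```

With the default of 32 children this gives a queue of 64 pending connections; clients beyond the 32 being served wait in that queue until a child becomes free.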

Part of the main function is shown below:

/*                    
* pgpool main program                    
*/                    
int main(int argc, char **argv)                    
{                    
    ……                
    /* create unix domain socket */                
    unix_fd = create_unix_domain_socket(un_addr);                
                    
    /* create inet domain socket if any */                
    if (pool_config->listen_addresses[0])                
    {                
        inet_fd = create_inet_domain_socket
          (pool_config->listen_addresses, pool_config->port);            
    }                
                    
    ……                
    /*                
     * We need to block signal here. Otherwise child might send some               
     * signals, for example SIGUSR1(fail over).  Children will inherit             
     * signal blocking but they do unblock signals at the very beginning           
     * of process.  So this is harmless.                
     */                
    POOL_SETMASK(&BlockSig);                
                    
    /* fork the children */                
    for (i=0;i<pool_config->num_init_children;i++){                
        process_info[i].pid = fork_a_child(unix_fd, inet_fd, i);            
        process_info[i].start_time = time(NULL);            
    }                
                    
    /* set up signal handlers */                               
    pool_signal(SIGTERM, exit_handler);                
    pool_signal(SIGINT, exit_handler);                
    pool_signal(SIGQUIT, exit_handler);                
    pool_signal(SIGCHLD, reap_handler);                
    pool_signal(SIGUSR1, failover_handler);                
    pool_signal(SIGUSR2, wakeup_handler);                
    pool_signal(SIGHUP, reload_config_handler);                
                    
    /* create pipe for delivering event */                
    if (pipe(pipe_fds) < 0){                
        pool_error("failed to create pipe");            
        myexit(1);            
    }                
             
    pool_log("%s successfully started. version %s (%s)", 
               PACKAGE, VERSION, PGPOOLVERSION);                
                    
    …… ////main loop is here                
            
    pool_shmem_exit(0);                
}                    

In the "fork the children" for loop, one child process is forked per iteration, so the number of child processes equals num_init_children. fork_a_child is defined as follows:

/*                    
* fork a child                    
*/                    
pid_t fork_a_child(int unix_fd, int inet_fd, int id)                    
{                    
    pid_t pid;                
                    
    pid = fork();                
                    
    if (pid == 0)                
    {                
        ……         
        /* call child main */            
        POOL_SETMASK(&UnBlockSig);            
        reload_config_request = 0;            
        my_proc_id = id;            
        run_as_pcp_child = false;            
        do_child(unix_fd, inet_fd);            
    }                
    else if (pid == -1)                
    {                
        pool_error("fork() failed. reason: %s", strerror(errno));            
        myexit(1);            
    }                
    return pid;                
}        
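
The prefork pattern used by main and fork_a_child can be tried in isolation. The following is a minimal sketch under simplifying assumptions: each worker just runs a callback and exits, whereas a pgpool-II child enters do_child and serves clients. The names spawn_worker, prefork, reap_all and echo_id are hypothetical.

```c
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Fork one worker. The child runs work(id) and exits with its return
 * value; the parent gets the child's pid, or -1 if fork() failed. */
pid_t spawn_worker(int id, int (*work)(int))
{
    pid_t pid = fork();
    if (pid == 0)
        _exit(work(id));   /* child never returns into the parent's loop */
    return pid;
}

/* Start n workers up front and record their pids.
 * Returns the number of workers actually started. */
int prefork(int n, pid_t *pids, int (*work)(int))
{
    int started = 0;
    for (int i = 0; i < n; i++) {
        pids[i] = spawn_worker(i, work);
        if (pids[i] == -1)
            break;
        started++;
    }
    return started;
}

/* Wait for the first n workers and return the sum of their exit codes. */
int reap_all(int n, const pid_t *pids)
{
    int sum = 0;
    for (int i = 0; i < n; i++) {
        int status;
        if (waitpid(pids[i], &status, 0) == pids[i] && WIFEXITED(status))
            sum += WEXITSTATUS(status);
    }
    return sum;
}

/* Example worker: reports its slot id as the exit code. */
int echo_id(int id)
{
    return id;
}
```

Passing the slot index to each worker plays the role of my_proc_id in fork_a_child: the child knows which entry of the parent's process table describes it.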

fork_a_child calls do_child. Each forked process runs do_child once, and from that point the child process starts working. An abridged do_child is shown below:

/*                    
* child main loop                    
*/                    
void do_child(int unix_fd, int inet_fd)                    
{                
    ……                
                    
    for (;;)                
    {                
        ……            
        /* perform accept() */            
        frontend = do_accept(unix_fd, inet_fd, &timeout);            
                    
        if (frontend == NULL)/* connection request from frontend timed out */
        {            
            ……        
        }  
        ……          
        /* query process loop */            
        for (;;)            
        {            
            ……        
        }            
        ……            
    }                
    child_exit(0);                
}

Inside do_child, do_accept is called. When a client request arrives, the child process starts serving that client.
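
The body of do_accept is not shown here, but the "check select() timeout" comment in do_child suggests it waits on both listening sockets with select(2). The sketch below illustrates that waiting step under that assumption. The function name wait_readable is hypothetical, and a real caller would go on to call accept() on the descriptor it returns.

```c
#include <stddef.h>
#include <sys/select.h>
#include <unistd.h>   /* pipe(), read(), write() when trying this with pipes */

/* Wait until fd_a or fd_b becomes readable. For a listening socket,
 * "readable" means a connection is waiting to be accepted.
 * timeout_sec <= 0 means wait without a time limit.
 * Returns the ready descriptor (fd_a wins if both are ready),
 * or -1 on timeout or error. */
int wait_readable(int fd_a, int fd_b, int timeout_sec)
{
    fd_set readmask;
    struct timeval tv;
    int maxfd = fd_a > fd_b ? fd_a : fd_b;

    FD_ZERO(&readmask);
    FD_SET(fd_a, &readmask);
    FD_SET(fd_b, &readmask);

    tv.tv_sec = timeout_sec;
    tv.tv_usec = 0;

    int n = select(maxfd + 1, &readmask, NULL, NULL,
                   timeout_sec > 0 ? &tv : NULL);
    if (n <= 0)
        return -1;                 /* 0 = timed out, <0 = error */

    if (FD_ISSET(fd_a, &readmask))
        return fd_a;
    return fd_b;
}
```

In do_child the timeout corresponds to child_life_time: when the wait expires without any client, the idle child calls child_exit(2) so that a fresh process can take its place.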
