文档章节

【原创】MySQL Proxy中socketpair的使用

摩云飞
 摩云飞
发布于 2013/04/09 20:36
字数 1371
阅读 1724
收藏 25

      学习 MySQL Proxy 0.8.3 的源码后可知,其全部事件处理线程均对全局 socketpair 的读端进行了监听,以实现通知管道的功能:threads->event_notify_fds[0] 。
int chassis_event_threads_init_thread(chassis_event_threads_t *threads, chassis_event_thread_t *event_thread, chassis *chas) {

	event_thread->event_base = event_base_new();
  ...
	// 设置当前线程监听 fd 为 socketpair 的读端 fd
	event_thread->notify_fd = dup(threads->event_notify_fds[0]);
  ...
	event_set(&(event_thread->notify_fd_event), event_thread->notify_fd, EV_READ | EV_PERSIST, chassis_event_handle, event_thread);
	event_base_set(event_thread->event_base, &(event_thread->notify_fd_event));
	event_add(&(event_thread->notify_fd_event), NULL);

	return 0;
}
该 socketpair 是在主线程初始化过程中创建的:
chassis_event_threads_t *chassis_event_threads_new() {
  ...
	threads = g_new0(chassis_event_threads_t, 1);

	/* create the ping-fds
	 *
	 * the event-thread write a byte to the ping-pipe to trigger a fd-event when
	 * something is available in the event-async-queues
	 */
	// 创建 socketpair
	if (0 != evutil_socketpair(AF_UNIX, SOCK_STREAM, 0, threads->event_notify_fds)) {
	...
	}
  ...
	/* make both ends non-blocking */
	evutil_make_socket_nonblocking(threads->event_notify_fds[0]);
	evutil_make_socket_nonblocking(threads->event_notify_fds[1]);

	return threads;
}
其中 evutil_socketpair 实现如下(取自 libevent 1.4.13): 
int
evutil_socketpair(int family, int type, int protocol, int fd[2])
{
#ifndef WIN32
	return socketpair(family, type, protocol, fd);
#else
	/* This code is originally from Tor.  Used with permission. */

	/* This socketpair does not work when localhost is down. So
	 * it's really not the same thing at all. But it's close enough
	 * for now, and really, when localhost is down sometimes, we
	 * have other problems too.
	 */
	int listener = -1;
	int connector = -1;
	int acceptor = -1;
	struct sockaddr_in listen_addr;
	struct sockaddr_in connect_addr;
	int size;
	int saved_errno = -1;

	if (protocol
#ifdef AF_UNIX
		|| family != AF_UNIX
#endif
		) {
		EVUTIL_SET_SOCKET_ERROR(WSAEAFNOSUPPORT);
		return -1;
	}
	if (!fd) {
		EVUTIL_SET_SOCKET_ERROR(WSAEINVAL);
		return -1;
	}

	// 创建作为listener 的socket
	listener = socket(AF_INET, type, 0);
	if (listener < 0)
		return -1;
	memset(&listen_addr, 0, sizeof(listen_addr));
	listen_addr.sin_family = AF_INET;
	listen_addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	listen_addr.sin_port = 0;	/* kernel chooses port.	 */
	// 进行绑定,内核会分配port
	if (bind(listener, (struct sockaddr *) &listen_addr, sizeof (listen_addr)) == -1)
		goto tidy_up_and_fail;
	// 宣告开始监听连接请求
	if (listen(listener, 1) == -1)
		goto tidy_up_and_fail;

       // 创建作为connector 的socket
	connector = socket(AF_INET, type, 0);
	if (connector < 0)
		goto tidy_up_and_fail;
	/* We want to find out the port number to connect to.  */
	size = sizeof(connect_addr);
	// 获取bind 后内核为listener 分配的port ( ip 为INADDR_LOOPBACK )
	if (getsockname(listener, (struct sockaddr *) &connect_addr, &size) == -1)
		goto tidy_up_and_fail;
	if (size != sizeof (connect_addr))
		goto abort_tidy_up_and_fail;
	// 从connector 向listener 发起连接,connect_addr 为连接目的地址
	if (connect(connector, (struct sockaddr *) &connect_addr, sizeof(connect_addr)) == -1)
		goto tidy_up_and_fail;

	size = sizeof(listen_addr);
	// 在套接字listener 上accept ,函数返回后listen_addr 中为对端地址
	acceptor = accept(listener, (struct sockaddr *) &listen_addr, &size);
	if (acceptor < 0)
		goto tidy_up_and_fail;
	if (size != sizeof(listen_addr))
		goto abort_tidy_up_and_fail;
	// 关闭listener
	EVUTIL_CLOSESOCKET(listener);
	/* Now check we are talking to ourself by matching port and host on the
	   two sockets.	 */
	// 获取connect 后内核为connector 分配的地址信息-- 自动绑定功能
	if (getsockname(connector, (struct sockaddr *) &connect_addr, &size) == -1)
		goto tidy_up_and_fail;
	// 将从两侧分别获得的地址地址进行比较
	if (size != sizeof (connect_addr)
		|| listen_addr.sin_family != connect_addr.sin_family
		|| listen_addr.sin_addr.s_addr != connect_addr.sin_addr.s_addr
		|| listen_addr.sin_port != connect_addr.sin_port)
		goto abort_tidy_up_and_fail;
	fd[0] = connector;
	fd[1] = acceptor;

	return 0;

 abort_tidy_up_and_fail:
	saved_errno = WSAECONNABORTED;
 tidy_up_and_fail:
	if (saved_errno < 0)
		saved_errno = WSAGetLastError();
	if (listener != -1)
		EVUTIL_CLOSESOCKET(listener);
	if (connector != -1)
		EVUTIL_CLOSESOCKET(connector);
	if (acceptor != -1)
		EVUTIL_CLOSESOCKET(acceptor);

	EVUTIL_SET_SOCKET_ERROR(saved_errno);
	return -1;
#endif
}
      从上述实现中可以看出,在非 WIN32 平台,直接就可以使用现成的 API 函数创建 socketpair ;在 WIN32 平台上,是通过创建两个本地 socket 相互连接建立的 socketpair 。

      实现上述功能的另外一种方式是,使用 pipe 。用法很简单,摘抄代码如下(摘自 memcached-1.4.14):
void thread_init(int nthreads, struct event_base *main_base) {
...
    // nthreads 为创建的工作线程数
    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {  // 使用pipe 作为工作线程获取任务的通道
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];  // 读端
        threads[i].notify_send_fd = fds[1];  // 写端

        // 设置用于每个工作线程的libevent 相关信息并创建CQ 结构
        setup_thread(&threads[i]);
        ...
    }

    /* Create threads after we've done all the libevent setup. */
    // 创建工作线程
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }
...
}
       至于用哪种更好,大家自己思考~~

====== 更新 2013-11-11 ======

      最近写 Modb 代码时,想要利用上面的线程间通信机制,所以使用了相对简单的 pipe 实现方案,但在 windows 下调试时总会遇到  “Unknown error 10038” 错误。查阅相关文档后发现,结论是  windows 下不能将 pipe 和 select 一起使用,因为会认为 pipe 不是一个合法的 socket 句柄,然后 linux 下是没有这个问题的。
解决方案:
  • 通过 socket 模拟 pipe 的实现;
  • 使用上面的 socketpair 实现;
      网上找到一份“为了 windows 上能够对 pipe 句柄进行 select” 而采用 socket 模拟 pipe 的 实现 。代码留存如下:
int pipe(int fildes[2])
{
    int tcp1, tcp2;
    sockaddr_in name;
    memset(&name, 0, sizeof(name));
    name.sin_family = AF_INET;
    name.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    int namelen = sizeof(name);
    tcp1 = tcp2 = -1;

    int tcp = socket(AF_INET, SOCK_STREAM, 0);
    if (tcp == -1){
        goto clean;
    }
    if (bind(tcp, (sockaddr*)&name, namelen) == -1){
        goto clean;
    }
    if (listen(tcp, 5) == -1){
        goto clean;
    }
    if (getsockname(tcp, (sockaddr*)&name, &namelen) == -1){
        goto clean;
    }
    tcp1 = socket(AF_INET, SOCK_STREAM, 0);
    if (tcp1 == -1){
        goto clean;
    }
    if (-1 == connect(tcp1, (sockaddr*)&name, namelen)){
        goto clean;
    }

    tcp2 = accept(tcp, (sockaddr*)&name, &namelen);
    if (tcp2 == -1){
        goto clean;
    }
    if (closesocket(tcp) == -1){
        goto clean;
    }
    fildes[0] = tcp1;
    fildes[1] = tcp2;
    return 0;
clean:
    if (tcp != -1){
        closesocket(tcp);
    }
    if (tcp2 != -1){
        closesocket(tcp2);
    }
    if (tcp1 != -1){
        closesocket(tcp1);
    }
    return -1;
}
原文作者指出有如下缺点: 
  • 效率低下(是否所有其他实现方式都比基于 socket 的方式高效?)
  • 占用了两个 TCP 端口(pipe 不会占用端口)
  • accept 的返回值未必就是 tcp1 连接过来的(多线程或者别的进程在干预), 所以最好通过发送数据进行确认(这个比较严重,在有多个连接同时进入的时候确实无法保证当前连接时正确的)
  • 由于不是匿名的, 所以可以在 netstat 里面看到(看到又怎样?)
优点只有一个, 可以使用 select 调用。

      将该 pipe 实现和上面的 socketpair 的实现进行对比,发现两者根本就是同一个东东,并且 pipe 的实现没有 libevent 中 socketpair 实现写的好。所以 pipe 实现的作者指出的那些缺点,本人持保留意见。看客自己斟酌。

补充:由于上面的 socketpair 是基于  INADDR_LOOPBACK 的,所以如果 lo 必须处于 up 状态才行。

© 著作权归作者所有

摩云飞
粉丝 372
博文 534
码字总数 952694
作品 0
徐汇
程序员
私信 提问
【原创】modb 功能设计之“跨线程通信”

在《 modb 功能设计之“支持多消费者单生产者” 》中曾提到,需要解决“rabbitmq 线程与 sql 执行线程之间的跨线程通信手段”。本文针对这个问题进行一些说明。 【第一个版本】 使用 pipe 作...

摩云飞
2013/12/16
0
0
(转载)低成本和高性能MySQL云数据的架构探索

转载自Erlang非业余研究 本文链接地址: 低成本和高性能MySQL云数据的架构探索 原文地址:http://www.alibabatech.org/article/detail/3405/0?ticket=d69f07f8-b60b-43f8-9572-7d795bb8429d ...

景德真人
2012/10/26
819
2
【原创】MySQL Proxy - 概况

【15.7. MySQL Proxy】 MySQL Proxy 是一种在网络上使用 MySQL 网络协议进行通信的应用,提供了一或多个 MySQL server 与一或多个 MySQL client 之间的通信功能。由于 MySQL Proxy 使用的是 ...

摩云飞
2013/03/05
0
0
【原创】MySQL Proxy - 命令行选项

【 15.7.3. MySQL Proxy Command Options】 直接从命令行上启动 MySQL Proxy : shell> mysql-proxy 在大多数情况下,你至少应该确定 MySQL Proxy 应该将 query 转发到哪个 backend MySQL se...

摩云飞
2013/03/05
0
0
【原创】MySQL Proxy - 内部结构

【15.7.4.2. Internal Structures】 在 MySQL Proxy 的脚本元素中有一些基本的内部结构需要知道。其中最主要的结构就是 proxy ,其提供了访问贯穿脚本中的许多公共结构的接口,例如连接列表和...

摩云飞
2013/03/05
0
0

没有更多内容

加载失败,请刷新页面

加载更多

NIO基于长度域的报文在Netty下的解码

1, 先复习一下粘包/拆包 1.1, 粘包/拆包的含义 TCP是个“流”协议, 并不了解上层业务数据的具体含义, 它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP...

老菜鸟0217
今天
8
0
从零开始搭建spring-cloud(2) ----ribbon

在微服务架构中,业务都会被拆分成一个独立的服务,服务与服务的通讯是基于http restful的。Spring cloud有两种服务调用方式,一种是ribbon+restTemplate,另一种是feign。 其实我们已经在上...

Vincent-Duan
今天
19
0
get和post的区别?

doGet:路径传参。效率高,安全性差(get的传送数据量有限制,不能大于2Kb) doPOST:实体传参。效率低,安全性好 建议: 1、get方式的安全性较Post方式要差些,包含机密信息的话,建议用Pos...

花无谢
昨天
4
0
当谈论迭代器时,我谈些什么?

当谈论迭代器时,我谈些什么? 花下猫语:之前说过,我对于编程语言跟其它学科的融合非常感兴趣,但我还说漏了一点,就是我对于 Python 跟其它编程语言的对比学习,也很感兴趣。所以,我一直...

豌豆花下猫
昨天
14
0
10天学Python直接做项目,我做了这5件事

初学者如何尽快上手python? 市面上关于如何学python的资料很多,但是讲的都太复杂。 我就是很简单的几句话,从小白到开发工程师,我只做了五件事。 我觉得任何商业计划书如果不能用几句话讲...

Python派森
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部