文档章节

22_BasicTaskScheduler基本任务调度器(二)——Live555源码阅读(一)任务调

乌合之众
 乌合之众
发布于 2015/06/16 09:03
字数 1841
阅读 159
收藏 0

#22_BasicTaskScheduler基本任务调度器(二)——Live555源码阅读(一)任务调度相关类

[TOC]

这是Live555源码阅读的第二部分,包括了任务调度相关的三个类。任务调度是Live555源码中很重要的部分。

本文由乌合之众 lym瞎编,欢迎转载 my.oschina.net/oloroso

##SingleStep方法

这是这里最重要的一个方法。每一次调用都是一次真正的处理数据的过程。 前面的延时队列DelayQueue、处理程序链表HanlerSet、触发器数组fTriggeredEventHandlersfTriggeredEventClientDatas都是在这里被真正的调度起来的。 这一段的代码很长,过程有点多。要联系了前面讲过的内容来看才能比较好理解。这里要注意的fLastHandledSocketNum成员的操作。因为其在别的位置都没有修改过,只在这里轮询处理的时候,如果有处理了fHandlers中某个节点的时候才会去设置。再一个要思考的是,fHandlers中的元素是**从何而来的?**在BasicTaskScheduler的两个基类中都没有对fHandlers成员有相关的操作。

这个函数做了三件事情。

  1. 获取延时队列头结点的延时剩余时间,作为select操作的超时时间。调用select监控三个集合。 如果select调用成功了,那么就开始轮询HandlerSet对象fHandlers中的节点,有符合条件的就使用其内部保存的函数指针和数据指针以及条件掩码来调用函数。
  2. 处理等待触发事件集里面的事件。
  3. 处理延时队列中到达延时时间的节点。
void BasicTaskScheduler::SingleStep(unsigned maxDelayTime) {
	//拷贝三个集合去给select调用做参数
	fd_set readSet = fReadSet; // make a copy for this select() call
	fd_set writeSet = fWriteSet; // ditto
	fd_set exceptionSet = fExceptionSet; // ditto
	//获取延时队列头结点的延时剩余时间(作为select超时时间)
	DelayInterval const& timeToDelay = fDelayQueue.timeToNextAlarm();
	struct timeval tv_timeToDelay;
	tv_timeToDelay.tv_sec = timeToDelay.seconds();
	tv_timeToDelay.tv_usec = timeToDelay.useconds();
	// Very large "tv_sec" values cause select() to fail.
	// Don't make it any larger than 1 million seconds (11.5 days)
	// 控制在1百万秒以内
	const long MAX_TV_SEC = MILLION;
	if (tv_timeToDelay.tv_sec > MAX_TV_SEC) {
		tv_timeToDelay.tv_sec = MAX_TV_SEC;
	}
	// Also check our "maxDelayTime" parameter (if it's > 0):
	if (maxDelayTime > 0 &&
		(tv_timeToDelay.tv_sec > (long)maxDelayTime / MILLION ||
		(tv_timeToDelay.tv_sec == (long)maxDelayTime / MILLION &&
		tv_timeToDelay.tv_usec > (long)maxDelayTime%MILLION))) {
		tv_timeToDelay.tv_sec = maxDelayTime / MILLION;
		tv_timeToDelay.tv_usec = maxDelayTime%MILLION;
	}

	//调用select来监控集合
	int selectResult = select(fMaxNumSockets, &readSet, &writeSet, &exceptionSet, &tv_timeToDelay);
//------------------------------------------------------------------------------------------
	//select出错返回,处理错误
	if (selectResult < 0) {
#if defined(__WIN32__) || defined(_WIN32)
		int err = WSAGetLastError();
		// For some unknown reason, select() in Windoze sometimes fails with WSAEINVAL if
		// it was called with no entries set in "readSet".  If this happens, ignore it:
		if (err == WSAEINVAL && readSet.fd_count == 0) {
			err = EINTR;
			// To stop this from happening again, create a dummy socket:
			int dummySocketNum = socket(AF_INET, SOCK_DGRAM, 0);
			FD_SET((unsigned)dummySocketNum, &fReadSet);
		}
		if (err != EINTR) {
#else
		if (errno != EINTR && errno != EAGAIN) {
#endif
			// Unexpected error - treat this as fatal:
#if !defined(_WIN32_WCE)
			perror("BasicTaskScheduler::SingleStep(): select() fails");
#endif
			//内部错误,调用abort()
			internalError();
		}
	}
//-----------------------------------------------------------------------------
	//开始处理
	// Call the handler function for one readable socket:
	HandlerIterator iter(*fHandlers);
	HandlerDescriptor* handler;
	// To ensure forward progress through the handlers, begin past the last
	// socket number that we handled:
	//注意fLastHandledSocketNum如果不为-1,说明已经调度过某些任务了
	if (fLastHandledSocketNum >= 0) {
		while ((handler = iter.next()) != NULL) {
			//从链表中找上一次最后调度的处理程序描述对象
			if (handler->socketNum == fLastHandledSocketNum) break;
		}
		if (handler == NULL) {
			fLastHandledSocketNum = -1;	//没有找到
			iter.reset(); // start from the beginning instead	迭代器回到起点
		}
	}
	//轮询处理
	//如果上面最后一个Handle == NULL成立了,那么这里不会进入,iter.next()还是会返回NULL
	//也就是说上次最后被调度的对象被找到了,这里的循环才会进入
	//这是为了提高效率,因为找到了最后一个被调度的元素,那么其之前的元素就都已经被调度过了
	while ((handler = iter.next()) != NULL) {
		int sock = handler->socketNum; // alias 别名
		int resultConditionSet = 0;		// 结果条件(状态)集合
		if (FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet)/*sanity理智 check*/) resultConditionSet |= SOCKET_READABLE; //添加可读属性
		if (FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet)/*sanity check*/) resultConditionSet |= SOCKET_WRITABLE; //添加可写属性
		if (FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet)/*sanity check*/) resultConditionSet |= SOCKET_EXCEPTION;	   //添加异常属性
		if ((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL) {
			fLastHandledSocketNum = sock;
			// Note: we set "fLastHandledSocketNum" before calling the handler,
			// in case the handler calls "doEventLoop()" reentrantly.
			//调用相关处理
			(*handler->handlerProc)(handler->clientData, resultConditionSet);
			break;
		}
	}
	//如果没有找到上次最后被调度的对象,并且fLastHandledSocketNum标识存在
	if (handler == NULL && fLastHandledSocketNum >= 0) {
		// We didn't call a handler, but we didn't get to check all of them,
		// so try again from the beginning:
		// 我们没有给一个处理程序,但我们没有去检查所有这些,所以试着重新开始:
		iter.reset();	//回到链表头
		//从链表第头开始轮询处理
		while ((handler = iter.next()) != NULL) {
			int sock = handler->socketNum; // alias
			int resultConditionSet = 0;
			if (FD_ISSET(sock, &readSet) && FD_ISSET(sock, &fReadSet)/*sanity check*/) resultConditionSet |= SOCKET_READABLE;
			if (FD_ISSET(sock, &writeSet) && FD_ISSET(sock, &fWriteSet)/*sanity check*/) resultConditionSet |= SOCKET_WRITABLE;
			if (FD_ISSET(sock, &exceptionSet) && FD_ISSET(sock, &fExceptionSet)/*sanity check*/) resultConditionSet |= SOCKET_EXCEPTION;

			if ((resultConditionSet&handler->conditionSet) != 0 && handler->handlerProc != NULL) {
				//设置fLastHandledSocketNum为最后一个被调用的处理程序的标识
				fLastHandledSocketNum = sock;
				// Note: we set "fLastHandledSocketNum" before calling the handler,
				// in case the handler calls "doEventLoop()" reentrantly.
				(*handler->handlerProc)(handler->clientData, resultConditionSet);
				break;
			}
		}
		// 没有一个合适的处理程序被调用
		if (handler == NULL) fLastHandledSocketNum = -1;//because we didn't call a handler
	}
//==========================================================================================

	// Also handle any newly-triggered event (Note that we do this *after* calling a socket handler,
	// in case the triggered event handler modifies The set of readable sockets.)
	// 处理等待触发的事件,这个在fTriggersAwaitingHandling中被标识
	if (fTriggersAwaitingHandling != 0) {
		if (fTriggersAwaitingHandling == fLastUsedTriggerMask) {
		//只有一个等待触发的事件
			// Common-case optimization for a single event trigger:
			fTriggersAwaitingHandling = 0;
			if (fTriggeredEventHandlers[fLastUsedTriggerNum] != NULL) {
				//函数调用
				(*fTriggeredEventHandlers[fLastUsedTriggerNum])(fTriggeredEventClientDatas[fLastUsedTriggerNum]);
			}
		}
		else {
			// 有多个等待触发的事件
			// Look for an event trigger that needs handling (making sure that we make forward progress through all possible triggers):
			unsigned i = fLastUsedTriggerNum;
			EventTriggerId mask = fLastUsedTriggerMask;

			do {
				i = (i + 1) % MAX_NUM_EVENT_TRIGGERS;
				mask >>= 1;
				if (mask == 0) mask = 0x80000000;

				if ((fTriggersAwaitingHandling&mask) != 0) {
					fTriggersAwaitingHandling &= ~mask;
					if (fTriggeredEventHandlers[i] != NULL) {
						(*fTriggeredEventHandlers[i])(fTriggeredEventClientDatas[i]);
					}

					fLastUsedTriggerMask = mask;
					fLastUsedTriggerNum = i;
					break;
				}
			} while (i != fLastUsedTriggerNum);
		}
	}
//======================================================================================
	// Also handle any delayed event that may have come due.
	// 处理延时队列中已经到时间的延时任务
	fDelayQueue.handleAlarm();
}

##setBackgroundHandling方法(添加后台处理程序)

setBackgroundHandling方法用于添加或更新一个处理程序到fHandlers链表。如果conditionSet为0,就将socketNum标识的节点从fHandlers中移除。否则若socketNum标识的节点存在,就更新,否则就添加一个节点。

void BasicTaskScheduler
::setBackgroundHandling(int socketNum, int conditionSet, BackgroundHandlerProc* handlerProc, void* clientData) {
	if (socketNum < 0) return;	//标识不合法
	FD_CLR((unsigned)socketNum, &fReadSet);		//不监控此套接口的可读状态
	FD_CLR((unsigned)socketNum, &fWriteSet);	//写
	FD_CLR((unsigned)socketNum, &fExceptionSet);//异常
	if (conditionSet == 0) {	//不监控任何可操作状态
		fHandlers->clearHandler(socketNum);	//从链表中移除
		if (socketNum + 1 == fMaxNumSockets) {	//最大socket数减1,效率提升
			--fMaxNumSockets;
		}
	}
	else {
		//更新链表,分配处理程序
		fHandlers->assignHandler(socketNum, conditionSet, handlerProc, clientData);
		if (socketNum + 1 > fMaxNumSockets) {
			fMaxNumSockets = socketNum + 1;	//更新最大socket数
		}
		//设置要监控的状态
		if (conditionSet&SOCKET_READABLE) FD_SET((unsigned)socketNum, &fReadSet);
		if (conditionSet&SOCKET_WRITABLE) FD_SET((unsigned)socketNum, &fWriteSet);
		if (conditionSet&SOCKET_EXCEPTION) FD_SET((unsigned)socketNum, &fExceptionSet);
	}
}

##moveSocketHandling方法(转移socket处理)

这个方法名不怎么好翻译,有点类似C++11 move操作。都是转移操作。这里是将原本对oldSocketNum套接口操作的处理程序转移到去操作newSocketNum套接口。如果原本oldSocketNum就不再链表fHandler中呢?那就相当于仅仅把对oldSocketNum的监控给移除了。注意,这里设置了对newSocketNum的监控,而无论其是否被加入到fHandler链表。

void BasicTaskScheduler::moveSocketHandling(int oldSocketNum, int newSocketNum) {
	if (oldSocketNum < 0 || newSocketNum < 0) return; // sanity check完整性检查
	//清理三个集合中对oldSocketNum的监控
	if (FD_ISSET(oldSocketNum, &fReadSet)) { FD_CLR((unsigned)oldSocketNum, &fReadSet); FD_SET((unsigned)newSocketNum, &fReadSet); }
	if (FD_ISSET(oldSocketNum, &fWriteSet)) { FD_CLR((unsigned)oldSocketNum, &fWriteSet); FD_SET((unsigned)newSocketNum, &fWriteSet); }
	if (FD_ISSET(oldSocketNum, &fExceptionSet)) { FD_CLR((unsigned)oldSocketNum, &fExceptionSet); FD_SET((unsigned)newSocketNum, &fExceptionSet); }
	//替换socketNum
	fHandlers->moveHandler(oldSocketNum, newSocketNum);

	if (oldSocketNum + 1 == fMaxNumSockets) {
		--fMaxNumSockets;
	}
	if (newSocketNum + 1 > fMaxNumSockets) {
		fMaxNumSockets = newSocketNum + 1;
	}
}

© 著作权归作者所有

乌合之众
粉丝 14
博文 90
码字总数 79438
作品 1
海淀
程序员
私信 提问
Live555学习基础类

Live555主要用到的类介绍: 1.UsageEnvironment类:抽象基类 class UsageEnvironment {public:void reclaim(); TaskScheduler& taskScheduler() const {return fScheduler;}//返回TaskSched......

rootusers
2015/01/08
0
0
RTSP学习笔记(1)live555

1、UsageEnvironment UsageEnvironment代表了整个程序的运行环境,其中保存了一些全局变量,定义了一些输出函数(纯虚函数)。从这些输出函数就可以看出该类主要用于充当程序的全局上下文,并...

Sean-x
2016/02/23
95
0
基于Mesos的作业云 Elastic-Job-Cloud 源码分析 —— 作业调度(一)

本文基于 Elastic-Job V2.1.5 版本分享 Elastic-Job-Cloud 源码分析系列(6篇)传送门 1. 概述 2. 作业执行类型 3. Producer 发布任务 3.1 常驻作业 3.2 瞬时作业 3.3 小结 4. TaskLaunchSch...

芋道源码掘金Java群217878901
2017/09/07
0
0
taobao-pamirs-schedule-2.0源码分析——类设计

使用方法 首先学习一个开源项目,一定要先学习该开源项目的使用方法。该项目的使用方法本文不再详述。请参考博文: http://pinsir.iteye.com/blog/882275 http://pinsir.iteye.com/blog/882...

杨武兵
2016/03/02
1K
8
Live555源代码解读(3)

四、计划任务(TaskScheduler) 我们且把三种任务命名为:socket handler,event handler,delay task。这三种任务的特点是,前两个加入执行队列后会一直存在,而delay task在执行完一次后会立...

Sean-x
2016/02/24
48
0

没有更多内容

加载失败,请刷新页面

加载更多

VMware vSphere ESXi主机的访问控制

在vShpere中,访问ESXi主机的途径很多,如下: ESXi DCUI ESXi Shell ESXi SSH ESXi Host Client vCenter --> vSphere web client / vSphere Client VMware vSphere ESXi主机的访问控制,除了......

大别阿郎
33分钟前
4
0
大神讲解CGI、FastCGI和PHP-FPM关系图解

参考资料 概念了解:CGI,FastCGI,PHP-CGI与PHP-FPM:http://www.nowamagic.net/librarys/veda/detail/1319 php中fastcgi和php-fpm是什么东西:https://www.zybuluo.com/phper/note/50231 ......

网络小虾米
42分钟前
4
0
《DNS攻击防范科普系列3》 -如何保障 DNS 操作安全

引言 前两讲我们介绍了 DNS 相关的攻击类型,以及针对 DDoS 攻击的防范措施。这些都是更底层的知识,有同学就来问能否讲讲和我们的日常操作相关的知识点,今天我们就来说说和我们日常 DNS 操...

Mr_zebra
42分钟前
4
0
zk中ServerCnxn

实现接口Stats, Watcher 内部类 DisconnectReason CloseRequestException EndOfStreamException(流关闭) 属性 方法 getSessionTimeout 获取session失效时间 sendResponse 发送回复数据 se......

writeademo
47分钟前
4
0
如何将 Redis 用于微服务通信的事件存储

来源:Redislabs 作者:Martin Forstner 翻译:Kevin (公众号:中间件小哥) 以我的经验,将某些应用拆分成更小的、松耦合的、可协同工作的独立逻辑业务服务会更易于构建和维护。这些服务(也...

中间件小哥
51分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部