文档章节

tigase内部处理(2):packet流转

greki
 greki
发布于 2014/06/07 15:36
字数 1544
阅读 1009
收藏 4
点赞 0
评论 0


在tigase内部处理(1):启动 里有体现http://my.oschina.net/greki/blog/275256

看tigase源码你会发现所有的tigase处理都是基于多线程,每个component都有自己的in和out处理线程,线程间的数据传输通过queue

总的流程大致就是:

1.socket监听收到客户端报文

这里主要介绍下,socket收到tcp报文转化为tigase内部的对象packet的过程;

主处理component:c2s:ClientConnectionManager

c2s在启动的时候创建了3类线程(创建过程参考启动篇):

socketReadThread():负责读socket的数据;

socketWriteThread():负责写socket;

ResultsListener:负责componet执行结果的处理;


  • socketReadThread和socketWriteThread循环selector.select(),得到selectionKey--->XMPPIOService放到队列forCompletion;
  • forCompletion的XMPPIOService,通过completionService.submit(serv)提交执行,调用XMPPIOService.call();
  • XMPPIOService.call(),完成了接收的socket报文到packet,作为packet在内部处理的入口(后面介绍这些处理);
  • ResultsListener根据XMPPIOService.call()返回的XMPPIOService对象,如果对象不为空重新放回到waiting列表(用于重新注册到clientsSel(selector))这里有点绕,也没太明白,好像是为了回避一个jvm bug;

2.component对packet的处理

  •     从XMPPIOService.call();
processSocketData();
if ((receivedPackets() > 0) && (serviceListener != null)) {
	serviceListener.packetsReady(this);//serviceListener 这里是clientConnectManager;在connectionOpenTrhead里设置
}
  • processSocketData()把报文转成xml element;再把element转成对应的packet;
while ((elem = elems.poll()) != null) {
  Packet pack = Packet.packetInstance(elem);//Message、Presence、IQ、其他Packet
  addReceivedPacket(pack);//puts processing results(packet) to queue(receivedPackets),
   sendAck(pack);//ack 模式,收到后发送ack
}
  • serviceListener.packetsReady()--->clientConnectManager.packetsReady()--->processSocketData([这步设置了packet.to和from属性,在message_router的时候用到])-->addOutPacket(packet)-->放到c2s.out_queues;
  • c2s.out_queues由QueueListener线程来读取处理;
if ((packet = filterPacket(packet, outgoing_filters)) != null) {//out_filter处理	
    processOutPacket(packet);//
}
  • processOutPacket()//By default this method just copies the given packet between queue
if (parent != null) {//parent指message_router,在初始化的时候设置
     parent.addPacket(packet);---->clientConnectManager一定是走这里,把packet构建完成后,交给messageRouter 来处理,添加到它的in_queue; } else {
     // It may happen for MessageRouter and this is intentional
     addPacketNB(packet);
}


  •  messageRouter对他的in_queue里的packet,被QueueListener线程轮训,主要调用messageRouter.processPacket()



ServerComponent comp = getLocalComponent(packet.getTo());//根据to属性,查找local的component,普通聊天,一般是路由到SessionManager组件

if (comp != null) {  Queue<Packet> results = new ArrayDeque<Packet>();

	if (comp == this) {

		// This is addressed to the MessageRouter itself. Has to be processed
		// separately to avoid recurential calls by the packet processing
		// method.
		processPacketMR(packet, results);
	} else {

		// All other components process the packet the same way.
		comp.processPacket(packet, results);//调用component处理
	}
	if (results.size() > 0) {//结果有返回继续,添回答到in_queue
		for (Packet res : results) {

			// No more recurrential calls!!
			addOutPacketNB(res);

			// processPacket(res);
		}    // end of for ()
	}

	// If the component is found the processing ends here as there can be
	// only one component with specific ID.
	return;
}

// This packet is not processed yet
// The packet can be addressed to just a domain, one of the virtual hosts
// The code below finds all components which handle packets addressed
// to a virtual domains (implement VHostListener and return 'true' from
// handlesLocalDomains() method call)
String            host  = packet.getTo().getDomain();
ServerComponent[] comps = getComponentsForLocalDomain(host);//local找不到服务器组件,到对应的集群的其他host查询

if (comps == null) {

	// Still no component found, now the most expensive lookup.
	// Checking regex routings provided by the component.
	comps = getServerComponentsForRegex(packet.getTo().getBareJID().toString());
}
if ((comps == null) &&!isLocalDomain(host)) {

	// None of the component want to process the packet.
	// If the packet is addressed to non-local domain then it is processed by
	// all components dealing with external world, like s2s
	comps = getComponentsForNonLocalDomain(host);
}

// Ok, if any component has been found then process the packet in a standard
// way
if (comps != null) {

	// Processing packet and handling results out
	Queue<Packet> results = new ArrayDeque<Packet>();

	for (ServerComponent serverComponent : comps) {
		if (log.isLoggable(Level.FINEST)) {
			log.log(Level.FINEST, "2. Packet will be processed by: {0}, {1}",
					new Object[] { serverComponent.getComponentId(),
					packet });
		}
		serverComponent.processPacket(packet, results);
		if (results.size() > 0) {
			for (Packet res : results) {

				// No more recurrential calls!!
				addOutPacketNB(res);

				// processPacket(res);
			}    // end of for ()
		}
	}
} else {

	// No components for the packet, sending an error back
	if (log.isLoggable(Level.FINEST)) {
		log.finest("There is no component for the packet, sending it back");
	}
	try {
		addOutPacketNB(Authorization.SERVICE_UNAVAILABLE.getResponseMessage(packet,
				"There is no service found to process your request.", true));
	} catch (PacketErrorTypeException e) {

		// This packet is to local domain, we don't want to send it out
		// drop packet :-(
		log.warning("Can't process packet to local domain, dropping..." + packet
				.toStringSecure());
	}
}


3.session-manager组件对packet的处理【插件处理】

参考:http://my.oschina.net/greki/blog/209588

ProcessorWorkerThread 调用 

void process( Packet packet, XMPPResourceConnection session,
NonAuthUserRepository repo, Queue<Packet> results,
Map<String, Object> settings ) throws XMPPException;


附录:packet说明


Objects of this class carry a single XMPP packet (stanza). The XMPP stanza is carried as an XML element in DOM structure by the Packet object which contains some extra information and convenience methods to quickly access the most important stanza information.
// packet是xmpp stanza封装对象

The stanza is accessible directly through the getElement() method and then it can be handles as an XML object. 
Please note! Even though the Packet object and carried the stanza Element is not unmodifiable it should be treated as such. This particular Packet can be processed concurrently at the same time in different components or plugins of the Tigase server. Modifying it may lead to unexpected and hard to diagnose behaviors. Every time you want to change or update the object you should obtaina a copy of it using one of the utility methods: copyElementOnly(), swapFromTo(...), errorResult(...), okResult(...), swapStanzaFromTo(...)
//packet会被并发处理,修改都要先copy

There are no public constructors for the class, instead you have to use factory methods: packetInstance(...) which return instance of one of the classes: Iq, Message or Presence. While creating a new Packet instance JIDs are parsed and processed through the stringprep. Hence some of the factory methods may throw TigaseStringprepException exception. You can avoid this by using the methods which accept preparsed JIDs. Reusing preparsed JIDs is highly recommended. 
//只能通过factory方法创建packetInstance

//有3地址
There are 3 kinds of addresses available from the Packet object: PacketFrom/To, StanzaFrom/To and From/To.
Stanza addresses are the normal XMPP addresses parsed from the XML stanza and as a convenience are available through methods as JID objects. This is not only convenient to the developer but also this is important for performance reasons as parsing JID and processing it through stringprep is quite expensive operation so it is better to do it once and reuse the parsed objects. Please note that any of them can be null. Note also. You should avoid parsing stanza JIDs from the XML element in your code as this may impact the server performance. Reuse the JIDs provided from the Packet methods.
//1.xmpp 报文的jid,

Packet addresses are also JID objects but they may contain a different values from the Stanza addresses. These are the Tigase internal addresses used by the server and they usually contain Tigase component source and destination address. In most cases they are used between connection managers and session managers and can be ignored by other code. One advantage of setting PacketFrom address to address of your component (getComponentId()) address is that if there is a packet delivery problem it will be returned back to the sender with apropriate error message.
//2.tigase内部地址,组件路由

Simple From/To addresses contains values following the logic: If PacketFrom/To is not null then it contains PacketFrom/To values otherwise it contains StanzaFrom/To values. This is because the Tigase server tries always to deliver and process the Packet using PacketFrom/To addresses if they are null then Stanza addresses are used instead. So these are just convenience methods which allow avoiding extra IFs in the program code and also save some CPU cycles. Created: Tue Nov 22 07:07:11 2005
//3.如果PacketFrom/To不为空,就是PacketFrom/To;否则就是xmpp stanza的地址;目的是减少程序if else判断




© 著作权归作者所有

共有 人打赏支持
greki
粉丝 95
博文 109
码字总数 45236
作品 0
杭州
技术主管
Tigase组件第一节 – 概述和基础信息

本文翻译自 - http://www.tigase.org/content/component-implementation-lesson-1-basics Tigase组件是一个具有jid的实体。它可以接受/处理也可以产生packet。 举一些大家都知道的组件:MUC...

greki
2014/03/19
0
0
Tigase组件 – Packet过滤

本文翻译自 – http://www.tigase.org/content/packet-filtering-component Packet过滤API Tigase为所有组件都提供了一个packet过滤API。你可以分别过滤传入和传出packet。 通过学习过滤,我...

greki
2014/03/20
0
0
Tigase组件第三节 – 多线程

Tigase组件第三节 – 多线程 发表评论作者 储天行 on 2010/11/16 本文翻译自 – http://www.tigase.org/content/component-implementation-lesson-3-multi-threading 拥有多个CPU或多核CPU的...

greki
2014/03/20
0
0
Tigase插件 – 编写插件

本文翻译自 – http://www.tigase.org/content/writing-plugin-code 上一篇文章描述了XMPP stanza如何在session manager当中被处理。处理分为四个步骤,每个步骤都有相对应类型的插件负责处理...

greki
2014/03/19
0
0
Tigase组件第四节 – 服务发现

Tigase组件第四节 – 服务发现 发表评论作者 储天行 on 2010/11/17 本文翻译自 – http://www.tigase.org/content/component-implementation-lesson-4-service-discovery 新组件在服务发现列...

greki
2014/03/20
0
0
Tigase组件第六节 – 脚本支持

脚本支持是Tigase的一个基本内置API,不需要任何额外的代价就能让所有的组件都自动支持脚本。但它只能访问那些通过你的代码继承到的父类组件变量,所以你需要把你的数据传递给脚本API。这篇文...

greki
2014/03/20
0
0
Tigase插件 – packets是如何被session manager和plugins处理的

本文翻译自 – http://www.tigase.org/content/how-packets-are-processed-sm-and-plugins 理解插件是如何工作的对开发插件是非常重要的,在不同的场景下由不同类型的插件来负责处理packet。...

greki
2014/03/19
0
0
Tigase组件第二节 – 配置(TODO:配置API和init.properties需要链接到中文

Tigase组件第二节 – 配置(TODO:配置API和init.properties需要链接到中文文档) 发表评论作者 储天行 on 2010/11/15 本文翻译自 - http://www.tigase.org/content/component-implementati...

greki
2014/03/19
0
0
tigase的每个连接消息量限制配置

现象: 一个客户端一次循环发送了10000多条;tigase会断线,断线一段时间后,客户端自动重连; 每个连接限制值: tigase.server.ConnectionManager里有很多限制值,跟其中有2个限制值有关; ...

greki
2014/05/14
0
0
详解Zoosk千万用户实时通信背后的开源技术[转]

http://www.csdn.net/article/2012-09-10/2809790-zoosk-the-engineering-behind-real-time 摘要:Zoosk是一个具有5000万会员的浪漫的社交约会网站,为了更好的让用户体验实时通信,让他们获...

强子哥哥
2016/05/25
66
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

rabbitmq学习记录(三)

工作队列:一个生产者,多个消费者,生产者直接将消息发送到rabbitmq的队列之中 默认采用的是轮询分配:即不管消费者处理信息的效率,队列给所有消费者轮流发送一条信息,直至消息发送完毕 ...

人觉非常君
18分钟前
0
0
Java 之 反射

反射,剖析 Java类 中的 各个组成部分,映射成 一个个 Java对象,多用于 框架和组件,写出复用性高的通用程序。 测试类代码如下: class Person { private String name; public St...

绝世武神
22分钟前
0
0
华为nova3超级慢动作酷玩抖音,没有办法我就是这么强大

华为nova3超级慢动作酷玩抖音,没有办法我就是这么强大!华为nova3超级慢动作酷玩抖音,没有办法我就是这么强大! 在华为最新发布的nova 3手机上,抖音通过华为himedia SDK集成了60fps、超级...

华为终端开放实验室
28分钟前
0
0
多 SSH Key 实现同一台服务器部署多 Git 仓库

本文以以下需求为背景,介绍详细的做法: 需在同一台服务器同时部署两个不同的 Github 仓库(对 Bitbucket 等 git 服务同样适用) root 用户可在远程登录 SSH 后附上预期的 SSH Key 进行 gi...

yeahlife
30分钟前
0
0
003. es6数值的扩展

一、普通扩展 Number 方法,将字符串、数值转为十进制 : Number('0b111') Number.isFinite() 用来检查一个数值是否为有限的:Number.isFinite(15) Number.isNan() 用来检查一个值是否为NaN N...

秋季长青
44分钟前
0
0
C语言数组和指针的语法糖

对于C语言,我可以这样秀:比如当创建一个数组arr[n]之后,一般我们去遍历数组的时候是for (int i = 0; i < n; i++) { a[i]; }但是我知道下表访问符[]是个语法糖,也就是说a[i]在编译器看来是...

ustbgaofan
52分钟前
0
0
Call to undefined function bcmath()的解决方法

乐意黎的ECS主机环境,Centos7.2 + PHP7 由于使用了bcdiv()函数,运行时总在抛错。 Fatal error: Call to undefined function bcmath() in /usr/loca/apache/htdocs/... on line 4 一查得知:......

dragon_tech
58分钟前
0
0
css优先级

..

architect刘源源
今天
0
0
【转】Twitter的分布式自增ID算法snowflake

结构 snowflake的结构如下(每部分用-分开): 0 - 0000000000 0000000000 0000000000 0000000000 0 - 00000 - 00000 - 000000000000 第一位为未使用,接下来的41位为毫秒级时间(41位的长度可以...

talen
今天
0
0
hive支持行级修改

Hive从0.14版本开始支持事务和行级更新,但缺省是不支持的,需要一些附加的配置。要想支持行级insert、update、delete,需要配置Hive支持事务。 一、Hive具有ACID语义事务的使用场景 1. 流式...

hblt-j
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部