文档章节

tigase内部处理(2):packet流转

greki
 greki
发布于 2014/06/07 15:36
字数 1544
阅读 1034
收藏 4


在tigase内部处理(1):启动 里有体现http://my.oschina.net/greki/blog/275256

看tigase源码你会发现所有的tigase处理都是基于多线程,每个component都有自己的in和out处理线程,线程间的数据传输通过queue

总的流程大致就是:

1.socket监听收到客户端报文

这里主要介绍下,socket收到tcp报文转化为tigase内部的对象packet的过程;

主处理component:c2s:ClientConnectionManager

c2s在启动的时候创建了3类线程(创建过程参考启动篇):

socketReadThread():负责读socket的数据;

socketWriteThread():负责写socket;

ResultsListener:负责componet执行结果的处理;


  • socketReadThread和socketWriteThread循环selector.select(),得到selectionKey--->XMPPIOService放到队列forCompletion;
  • forCompletion的XMPPIOService,通过completionService.submit(serv)提交执行,调用XMPPIOService.call();
  • XMPPIOService.call(),完成了接收的socket报文到packet,作为packet在内部处理的入口(后面介绍这些处理);
  • ResultsListener根据XMPPIOService.call()返回的XMPPIOService对象,如果对象不为空重新放回到waiting列表(用于重新注册到clientsSel(selector))这里有点绕,也没太明白,好像是为了回避一个jvm bug;

2.component对packet的处理

  •     从XMPPIOService.call();
processSocketData();
if ((receivedPackets() > 0) && (serviceListener != null)) {
	serviceListener.packetsReady(this);//serviceListener 这里是clientConnectManager;在connectionOpenTrhead里设置
}
  • processSocketData()把报文转成xml element;再把element转成对应的packet;
while ((elem = elems.poll()) != null) {
  Packet pack = Packet.packetInstance(elem);//Message、Presence、IQ、其他Packet
  addReceivedPacket(pack);//puts processing results(packet) to queue(receivedPackets),
   sendAck(pack);//ack 模式,收到后发送ack
}
  • serviceListener.packetsReady()--->clientConnectManager.packetsReady()--->processSocketData([这步设置了packet.to和from属性,在message_router的时候用到])-->addOutPacket(packet)-->放到c2s.out_queues;
  • c2s.out_queues由QueueListener线程来读取处理;
if ((packet = filterPacket(packet, outgoing_filters)) != null) {//out_filter处理	
    processOutPacket(packet);//
}
  • processOutPacket()//By default this method just copies the given packet between queue
if (parent != null) {//parent指message_router,在初始化的时候设置
     parent.addPacket(packet);---->clientConnectManager一定是走这里,把packet构建完成后,交给messageRouter 来处理,添加到它的in_queue; } else {
     // It may happen for MessageRouter and this is intentional
     addPacketNB(packet);
}


  •  messageRouter对他的in_queue里的packet,被QueueListener线程轮训,主要调用messageRouter.processPacket()



ServerComponent comp = getLocalComponent(packet.getTo());//根据to属性,查找local的component,普通聊天,一般是路由到SessionManager组件

if (comp != null) {  Queue<Packet> results = new ArrayDeque<Packet>();

	if (comp == this) {

		// This is addressed to the MessageRouter itself. Has to be processed
		// separately to avoid recurential calls by the packet processing
		// method.
		processPacketMR(packet, results);
	} else {

		// All other components process the packet the same way.
		comp.processPacket(packet, results);//调用component处理
	}
	if (results.size() > 0) {//结果有返回继续,添回答到in_queue
		for (Packet res : results) {

			// No more recurrential calls!!
			addOutPacketNB(res);

			// processPacket(res);
		}    // end of for ()
	}

	// If the component is found the processing ends here as there can be
	// only one component with specific ID.
	return;
}

// This packet is not processed yet
// The packet can be addressed to just a domain, one of the virtual hosts
// The code below finds all components which handle packets addressed
// to a virtual domains (implement VHostListener and return 'true' from
// handlesLocalDomains() method call)
String            host  = packet.getTo().getDomain();
ServerComponent[] comps = getComponentsForLocalDomain(host);//local找不到服务器组件,到对应的集群的其他host查询

if (comps == null) {

	// Still no component found, now the most expensive lookup.
	// Checking regex routings provided by the component.
	comps = getServerComponentsForRegex(packet.getTo().getBareJID().toString());
}
if ((comps == null) &&!isLocalDomain(host)) {

	// None of the component want to process the packet.
	// If the packet is addressed to non-local domain then it is processed by
	// all components dealing with external world, like s2s
	comps = getComponentsForNonLocalDomain(host);
}

// Ok, if any component has been found then process the packet in a standard
// way
if (comps != null) {

	// Processing packet and handling results out
	Queue<Packet> results = new ArrayDeque<Packet>();

	for (ServerComponent serverComponent : comps) {
		if (log.isLoggable(Level.FINEST)) {
			log.log(Level.FINEST, "2. Packet will be processed by: {0}, {1}",
					new Object[] { serverComponent.getComponentId(),
					packet });
		}
		serverComponent.processPacket(packet, results);
		if (results.size() > 0) {
			for (Packet res : results) {

				// No more recurrential calls!!
				addOutPacketNB(res);

				// processPacket(res);
			}    // end of for ()
		}
	}
} else {

	// No components for the packet, sending an error back
	if (log.isLoggable(Level.FINEST)) {
		log.finest("There is no component for the packet, sending it back");
	}
	try {
		addOutPacketNB(Authorization.SERVICE_UNAVAILABLE.getResponseMessage(packet,
				"There is no service found to process your request.", true));
	} catch (PacketErrorTypeException e) {

		// This packet is to local domain, we don't want to send it out
		// drop packet :-(
		log.warning("Can't process packet to local domain, dropping..." + packet
				.toStringSecure());
	}
}


3.session-manager组件对packet的处理【插件处理】

参考:http://my.oschina.net/greki/blog/209588

ProcessorWorkerThread 调用 

void process( Packet packet, XMPPResourceConnection session,
NonAuthUserRepository repo, Queue<Packet> results,
Map<String, Object> settings ) throws XMPPException;


附录:packet说明


Objects of this class carry a single XMPP packet (stanza). The XMPP stanza is carried as an XML element in DOM structure by the Packet object which contains some extra information and convenience methods to quickly access the most important stanza information.
// packet是xmpp stanza封装对象

The stanza is accessible directly through the getElement() method and then it can be handles as an XML object. 
Please note! Even though the Packet object and carried the stanza Element is not unmodifiable it should be treated as such. This particular Packet can be processed concurrently at the same time in different components or plugins of the Tigase server. Modifying it may lead to unexpected and hard to diagnose behaviors. Every time you want to change or update the object you should obtaina a copy of it using one of the utility methods: copyElementOnly(), swapFromTo(...), errorResult(...), okResult(...), swapStanzaFromTo(...)
//packet会被并发处理,修改都要先copy

There are no public constructors for the class, instead you have to use factory methods: packetInstance(...) which return instance of one of the classes: Iq, Message or Presence. While creating a new Packet instance JIDs are parsed and processed through the stringprep. Hence some of the factory methods may throw TigaseStringprepException exception. You can avoid this by using the methods which accept preparsed JIDs. Reusing preparsed JIDs is highly recommended. 
//只能通过factory方法创建packetInstance

//有3地址
There are 3 kinds of addresses available from the Packet object: PacketFrom/To, StanzaFrom/To and From/To.
Stanza addresses are the normal XMPP addresses parsed from the XML stanza and as a convenience are available through methods as JID objects. This is not only convenient to the developer but also this is important for performance reasons as parsing JID and processing it through stringprep is quite expensive operation so it is better to do it once and reuse the parsed objects. Please note that any of them can be null. Note also. You should avoid parsing stanza JIDs from the XML element in your code as this may impact the server performance. Reuse the JIDs provided from the Packet methods.
//1.xmpp 报文的jid,

Packet addresses are also JID objects but they may contain a different values from the Stanza addresses. These are the Tigase internal addresses used by the server and they usually contain Tigase component source and destination address. In most cases they are used between connection managers and session managers and can be ignored by other code. One advantage of setting PacketFrom address to address of your component (getComponentId()) address is that if there is a packet delivery problem it will be returned back to the sender with apropriate error message.
//2.tigase内部地址,组件路由

Simple From/To addresses contains values following the logic: If PacketFrom/To is not null then it contains PacketFrom/To values otherwise it contains StanzaFrom/To values. This is because the Tigase server tries always to deliver and process the Packet using PacketFrom/To addresses if they are null then Stanza addresses are used instead. So these are just convenience methods which allow avoiding extra IFs in the program code and also save some CPU cycles. Created: Tue Nov 22 07:07:11 2005
//3.如果PacketFrom/To不为空,就是PacketFrom/To;否则就是xmpp stanza的地址;目的是减少程序if else判断




© 著作权归作者所有

共有 人打赏支持
greki
粉丝 98
博文 109
码字总数 45236
作品 0
杭州
技术主管
Tigase组件第一节 – 概述和基础信息

本文翻译自 - http://www.tigase.org/content/component-implementation-lesson-1-basics Tigase组件是一个具有jid的实体。它可以接受/处理也可以产生packet。 举一些大家都知道的组件:MUC...

greki
2014/03/19
0
0
Tigase组件 – Packet过滤

本文翻译自 – http://www.tigase.org/content/packet-filtering-component Packet过滤API Tigase为所有组件都提供了一个packet过滤API。你可以分别过滤传入和传出packet。 通过学习过滤,我...

greki
2014/03/20
0
0
Tigase组件第三节 – 多线程

Tigase组件第三节 – 多线程 发表评论作者 储天行 on 2010/11/16 本文翻译自 – http://www.tigase.org/content/component-implementation-lesson-3-multi-threading 拥有多个CPU或多核CPU的...

greki
2014/03/20
0
0
Tigase插件 – 编写插件

本文翻译自 – http://www.tigase.org/content/writing-plugin-code 上一篇文章描述了XMPP stanza如何在session manager当中被处理。处理分为四个步骤,每个步骤都有相对应类型的插件负责处理...

greki
2014/03/19
0
0
Tigase组件第四节 – 服务发现

Tigase组件第四节 – 服务发现 发表评论作者 储天行 on 2010/11/17 本文翻译自 – http://www.tigase.org/content/component-implementation-lesson-4-service-discovery 新组件在服务发现列...

greki
2014/03/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

马太效应

马太效应

yizhichao
14分钟前
0
0
69.for while循环 continue break exit

20.10 for循环 20.11/20.12 while循环 20.13 break跳出循环 20.14 continue结束本次循环 20.15 exit退出整个脚本 扩展 select用法 http://www.apelearn.com/bbs/thread-7950-1-1.html 20.10......

王鑫linux
23分钟前
0
0
完整的软件开发流程是怎样的

在it圈混迹了这么久,做过各种各样的工作。但是我确一直不知道一个软件从无到有到底是怎么开发的。于是就产生了强烈的好奇心:一个软件产品的结果为什么是这样?为什么开发的速度不能再快一点...

TreasureWe
29分钟前
0
0
深度学习与图像处理之:人像背景虚化

简单实现思路: 对图像内容进行分割,提取人像 对图像背景进行模糊化处理 将人像和背景重新合成 在这里,使用DeepLabV3模型对图像内容进行分割并提取人像,实现的代码如下: import numpy a...

IOTService
31分钟前
0
0
20180918上课截图

小丑鱼00
39分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部