在《java message service》的第二章用sun提供的JMS API实现了一个简单的聊天场景。这个实现本身不难,这样的实现在局域网中可以用socket实现,在web实现上可以采用session或者其他如web socket等方式。而用消息系统来实现聊天程序确实也是一种很巧妙的办法。
先把代码贴出来,代码来自《java message service》第二章:
package ch02.chat;
import java.io.*;
import javax.jms.*;
import javax.naming.*;
public class Chat implements javax.jms.MessageListener {
private TopicSession pubSession;
private TopicPublisher publisher;
private TopicConnection connection;
private String username;
/* Constructor used to Initialize Chat */
public Chat(String topicFactory, String topicName, String username)
throws Exception {
// Obtain a JNDI connection using the jndi.properties file
InitialContext ctx = new InitialContext();
// Look up a JMS connection factory and create the connection
TopicConnectionFactory conFactory = (TopicConnectionFactory) ctx
.lookup(topicFactory);
TopicConnection connection = conFactory.createTopicConnection();
// Create two JMS session objects
TopicSession pubSession = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
TopicSession subSession = connection.createTopicSession(false,
Session.AUTO_ACKNOWLEDGE);
// Look up a JMS topic
Topic chatTopic = (Topic) ctx.lookup(topicName);
// Create a JMS publisher and subscriber. The additional parameters
// on the createSubscriber are a message selector (null) and a true
// value for the noLocal flag indicating that messages produced from
// this publisher should not be consumed by this publisher.
TopicPublisher publisher = pubSession.createPublisher(chatTopic);
TopicSubscriber subscriber = subSession.createSubscriber(chatTopic,
null, true);
// Set a JMS message listener
subscriber.setMessageListener(this);
// Intialize the Chat application variables
this.connection = connection;
this.pubSession = pubSession;
this.publisher = publisher;
this.username = username;
// Start the JMS connection; allows messages to be delivered
connection.start();
}
/* Receive Messages From Topic Subscriber */
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println(textMessage.getText());
} catch (JMSException jmse) {
jmse.printStackTrace();
}
}
/* Create and Send Message Using Publisher */
protected void writeMessage(String text) throws JMSException {
TextMessage message = pubSession.createTextMessage();
message.setText(username + ": " + text);
publisher.publish(message);
}
/* Close the JMS Connection */
public void close() throws JMSException {
connection.close();
}
/* Run the Chat Client */
public static void main(String[] args) {
try {
if (args.length != 3)
System.out.println("Factory, Topic, or username missing");
// args[0]=topicFactory; args[1]=topicName; args[2]=username
Chat chat = new Chat(args[0], args[1], args[2]);
// Read from command line
BufferedReader commandLine = new java.io.BufferedReader(
new InputStreamReader(System.in));
// Loop until the word "exit" is typed
while (true) {
String s = commandLine.readLine();
if (s.equalsIgnoreCase("exit")) {
chat.close();
System.exit(0);
} else
chat.writeMessage(s);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
当然这段代码还需要一个server,这个server可以用任意一个开源的符合JMS规范的MQ。这个代码本身只是实现了消息的发送端和接受端,我们不关注代码本身的实现,我们主要去了解实现的思路。
这是一个典型的sub/pub模式,消费者可以选择一个他关心的topic去订阅消息,producer也可以指定的将消息发送到某个topic下,这样的话,一般有消息到了server中,server就负责将数据分发到各个订阅者中。这样一说,这就是一个我们很熟悉的网络聊天室。
但是我们也应该想到其中的一些问题,首先也是最重要的,消息的顺序问题。特别是聊天程序对顺序的要求还是比较高的,那JMS是否能保证消息的顺序。这部分应该是server来保证,还是发送端或者接收端来保证。如果不能保证,那原因是什么?消息系统的搭建很多都用来支撑分布式系统,如果涉及到交易等对事务性要求很高的系统,JMS又怎么来保证事务,当然还有消息重复性的处理是如何实现的。其次,代码中用到了两个session,为什么不能发送者与接受者共用同一个session。共享一个session会产生线程安全的问题么?又比如,我们知道整个聊天的过程都是异步的,那JMS底层是如何实现消息的监听的,为什么一收到消息就能有所反应,这个event model是如何实现的。紧接着接收端是去pull好还是poll或者是server做push,这几种方式的不同应用场景又是什么。
这些问题都会在下篇文章中介绍,这也是JMS设计中一些核心的问题,再回到例子本身,我们可以考虑下对这个例子的扩展和应用。关于扩展,很容易的就可以想到怎么把单个发送端或者接收端进行横向的扩展形成集群,然后做到负载均衡,当然服务器端也应如此去做。这也是商业化MQ的实现细节。另外针对消息的过滤、存储等细节也是扩展的一部分。类似上面聊天程序这样的订阅模式其实是消息系统中运用最多的一种方式,这种方式可以实现系统之间的异步操作,并实现了系统间的解耦。