文档章节

netty websocket 简单消息推送demo

爱喝貓的咖啡
 爱喝貓的咖啡
发布于 2014/11/03 17:08
字数 1012
阅读 8108
收藏 16

今天心情很不好!!! 原因保密。


这篇是基于"netty与websocket通信demo"。

错误想法:大量客户请求,共用一个worker,来实现推送。

正确作法:应该是对Channel对应的ChannelGroup进行操作,来实现推送。

一个Channel可以划分到多个ChannelGroup中。


PushServerChannelHandler和DynMessage这两个类最重要,其实类基本没变。


package org.sl.demo.chatserver;

import java.util.List;
import java.util.Map;

import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketFrame;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;

public class PushServerChannelHandler extends SimpleChannelHandler {
	static boolean debug = true;
	
	@Override
	public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e){
		if(debug){
			System.out.println("channelOpen");
		}
		DynMessage.addAudience(e.getChannel());
	}
	
	@Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception{
		Channel ch = e.getChannel();
		Object msg = e.getMessage();
		
		if(debug){
			System.out.println("---------------");
			System.out.println("message: "+msg.getClass());
		}
		try{
			if(msg instanceof HttpRequest){
				processHttpRequest(ch, (HttpRequest)msg);
			}else if(msg instanceof WebSocketFrame){
				processWebsocketRequest(ch,(WebSocketFrame)msg);
			}else{
				//未处理的请求类型
			}
		}catch(Exception ex){
			ch.close().sync();
		}
		super.messageReceived(ctx, e);
	}
	
	@Override
	public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e){
		if(debug){
			System.out.println("channelClosed");
		}
		if(e instanceof MessageEvent){
			MessageEvent me = (MessageEvent) e;			
		}
		DynMessage.removeAudience(e.getChannel());
		e.getChannel().close();
	}
	
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e){
		if(debug){
			System.out.println("channelClosed");
		}
		DynMessage.removeAudience(e.getChannel());
		e.getCause().printStackTrace();
		e.getChannel().close();
		try {
			super.exceptionCaught(ctx, e);
		} catch (Exception e1) {		
			e1.printStackTrace();
		}
	}
	
	void processHttpRequest(Channel channel,HttpRequest request){
		HttpHeaders headers = request.headers();
		if(debug){
			List<Map.Entry<String,String>> ls = headers.entries();
			for(Map.Entry<String,String> i: ls){
				System.out.println("header  "+i.getKey()+":"+i.getValue());
			}
		}	
		
		//non-get request
		if(!HttpMethod.GET.equals(request.getMethod())){
			DefaultHttpResponse resp = new DefaultHttpResponse(
					HttpVersion.HTTP_1_1,
					HttpResponseStatus.BAD_REQUEST);
			channel.write(resp);			
			channel.close();
			return;
		}
				
		WebSocketServerHandshakerFactory wsShakerFactory = new WebSocketServerHandshakerFactory(
				"ws://"+request.headers().get(HttpHeaders.Names.HOST),
				null,false );
		WebSocketServerHandshaker wsShakerHandler = wsShakerFactory.newHandshaker(request);
		if(null==wsShakerHandler){
			//无法处理的websocket版本
			wsShakerFactory.sendUnsupportedWebSocketVersionResponse(channel);
		}else{
			//向客户端发送websocket握手,完成握手
			//客户端收到的状态是101 sitching protocol
			wsShakerHandler.handshake(channel, request);
		}		
	}
	
	void processWebsocketRequest(Channel channel, WebSocketFrame request) throws Exception{		
		if(request instanceof CloseWebSocketFrame){
			DynMessage.removeAudience(channel);
			channel.close().sync();
		}else if(request instanceof PingWebSocketFrame){			
			channel.write(new PongWebSocketFrame(request.getBinaryData()));  
		}else if(request instanceof TextWebSocketFrame){
			//这个地方 可以根据需求,加上一些业务逻辑
			TextWebSocketFrame txtReq = (TextWebSocketFrame) request;		
			if(debug){ System.out.println("txtReq:"+txtReq.getText());}
			if("disconnect".equalsIgnoreCase(txtReq.getText())){
				DynMessage.removeAudience(channel);
				channel.close().sync();
				return;
			}
			//把符合条件的channel添加到DynMessage的channelGroup中
			DynMessage.addAudience(channel);
		}else{
			//WebSocketFrame还有一些
		}
	}
}

package org.sl.demo.chatserver;

import java.util.Random;

import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
*动态产生消息,并向Channel组推送。
*/
public class DynMessage implements Runnable{
	public static ChannelGroup audiences = new DefaultChannelGroup("msg-group");
	
	static public void addAudience(Channel ch){		
		audiences.add(ch);
	}
	
	static public void removeAudience(Channel ch){
		audiences.remove(ch);
	}
	
	static String[] names = {
		"Tom", "Jerry",
		"Terry", "Looney",
		"Merrie", "William",
		"Joseph", "Hanna",
		"Speike", "Tyke",
		"Tuffy", "Lightning",
	};
	static String message = "";
	
	public static String getMessage(){
		StringBuffer sb = new StringBuffer();
		sb.append("hello,my name is ");
		sb.append(names[new Random().nextInt(names.length)]);
		sb.append(".");		
		return sb.toString();
//		return message;
	}

	@Override
	public void run() {		
		System.out.println("DynMessage start");
		for(;;){
			String msg = getMessage();			
			radiate(msg);
			try{Thread.sleep(1000); }catch(Exception ex){}
		}
	}
	
	void radiate(String msg){
		audiences.write(new TextWebSocketFrame(msg));
	}
}

<html>
<head>
<script src="jquery-1.9.1.js"></script>
<script src="messagepush.js"></script>
<script >
function doStop(){
	stopMsgPush();
}

function doWsStart(){
	var  r6 = generateMixed(6);
	$("#txtReq").val(r6);
	var  params = $("#txtReq").val();
	doStop();
	
	wsMsgPush('127.0.0.1',params,
		function(data){
			$("#txtResp").val(data);			
		},
		function(){
			$("#txtResp").val("ws close...");
		} ,
		function(){
			$("#txtResp").val("ws error...");
		} );		
}
</script>
</head>

<body>
 
<br/>
<br/><br/>
send: <input id="txtReq" readonly="readonly" type="text" value="" />
<input type="button" value="start" onclick="doWsStart()">
<input type="button" value="stop" onclick="doStop()"/> 
<br/>

recv: <input id="txtResp" type="text" value=""  size="50"/>
</body>
</html>

var _mp_ws = null;
var _mp_ajax_it = null;

function msgPush(url, params,onmessage,onclose,onerror){
	wsMsgPush(url,params,onmessage,onclose,onerror);
	if(!_mp_ws){
		ajaxMsgPush(url,params,10000,onmessage,onclose,onerror);
	}
}

function old_wsMsgPush(url, params,onmessage,onclose,onerror){	
	var ws = new WebSocket("ws://"+url); 
	ws.onopen = function(){ws.send('1111')};
	ws.onmessage = function(evt){ onmessage(evt.data);};
}

function wsMsgPush(url, params,onmessage,onclose,onerror){	
	_mp_ws = new WebSocket("ws://"+url); 
	if(!_mp_ws){ return; }
		
	_mp_ws.onopen = function(){ 
		_mp_ws.send(params); 
	};
	if(onmessage) _mp_ws.onmessage = function(evt){ onmessage(evt.data); }
	if(onerror) _mp_ws.onerror = function (evt){ onerror(); }
	if(onclose) _mp_ws.onclose = function (evt){ onclose(); }	
}

function ajaxMsgPush(url, params,interval,onmessage,onclose,onerror){	
	function __getmsg(){
		$.ajax({
			url:				url,
			data:			params,
			cache:			true,
			type:			"get",
			dataType:		"text",		
			success:		function(data, textStatus, jqXHR){ 
				if(onmessage) onmessage(data);
			},
			error:			function(jqXHR, textStatus, errorThrown){
				if(onerror) onerror();
			},
			complete:		function(jqXHR, textStatus){
				if(onclose) onclose();
			}
		});
	}	
	
	_mp_ajax_it = setInterval("__getmsg()",interval);
}

function stopMsgPush(){
	if(_mp_ws){
		_mp_ws.send("disconnect");
		_mp_ws.close();
	}

	if(_mp_ajax_it){
		clearInterval(_mp_ajax_it);
	}
}

var chars = ['0','1','2','3','4','5','6','7','8','9','A','B','C','D','E','F','G','H','I','J','K','L','M','N','O','P','Q','R','S','T','U','V','W','X','Y','Z'];
function generateMixed(n) {
     var res = "";
     for(var i = 0; i < n ; i ++) {
         var id = Math.ceil(Math.random()*35);
         res += chars[id];
     }
     return res;
}

package org.sl.demo.chatserver;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.timeout.WriteTimeoutHandler;
import org.jboss.netty.util.HashedWheelTimer;

public class PushServerChannelPiplelineFactory  implements ChannelPipelineFactory{

	@Override
	public ChannelPipeline getPipeline() throws Exception {
		ChannelPipeline cp = Channels.pipeline();
		cp.addLast("decoder", new HttpRequestDecoder());
		cp.addLast("encoder", new HttpResponseEncoder());
		cp.addLast("writeTimeout", new WriteTimeoutHandler(new HashedWheelTimer(),10));
		cp.addLast("handler", new PushServerChannelHandler());
		return cp;
	}

}

package org.sl.demo.chatserver;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class PushServer implements Runnable{
	int port = 80;
	
	public PushServer(int port){
		this.port = port;
	}

	@Override
	public void run() {
		System.out.println("ChatServer "+port);
		
		ServerBootstrap b = new ServerBootstrap(
				new NioServerSocketChannelFactory(
						Executors.newCachedThreadPool(),
						Executors.newCachedThreadPool()));
		b.setOption("child.tcpNoDelay", true);  
		b.setOption("child.keepAlive", true);
		b.setPipelineFactory(new PushServerChannelPiplelineFactory());
		b.bind(new InetSocketAddress(port));
	}
	
	public static void main(String[] args){
		Thread t = new Thread(new DynMessage(),"DynMessage");
		t.start();
		new PushServer(80).run();
	}
}































© 著作权归作者所有

爱喝貓的咖啡
粉丝 17
博文 57
码字总数 31071
作品 0
朝阳
程序员
私信 提问
加载中

评论(4)

E
EmmaGong
前段时间研究了一下goeasy,代码简洁易读,服务稳定。后台推送只需要两行代码, js前端推送也只需要3,4行,而且文档齐全,还提供了后台查询信息收发情况,所以我觉得GoEasy推送服务是个不错的选择。官网: https://goeasy.io/
ly6635
ly6635
0
爱喝貓的咖啡
爱喝貓的咖啡

引用来自“郑楚彬”的评论

能提供整个DEMO的下载吗。。
文章里 就是完整的。
BinGo_91
BinGo_91
能提供整个DEMO的下载吗。。
netty 和ios通信 的问题

我这边是java netty服务端,需要向苹果手机推送信息,谁知道苹果手机如何通过websocket来连上netty呢? 按照网上的说法: WebSocket不同版本的三种握手方式 我用苹果手机来测试,他们所发的请...

天王盖地虎626
2014/07/23
4K
4
八问WebSocket协议:为你快速解答WebSocket热门疑问

本文由“小姐姐养的狗”原创发布于“小姐姐味道”公众号,原题《WebSocket协议 8 问》,收录时有优化和改动。感谢原作者的分享。 一、引言 WebSocket是一种比较新的协议,它是伴随着html5规范...

JackJiang2011
04/25
0
0
基于Netty实现的netty-socketio实现WebSocket

介绍 netty-socketio是socket.io使用Java语言基于Netty网络库编写的WebSocket库.功能非常强大,简单易用,稳定可靠. 后端使用Demo 1.配置SocketIOServer 2.连接,断开连接,推送消息,接收消...

hutaishi
2018/05/28
0
0
八问WebSocket协议:为你快速解答WebSocket热门疑问

一、引言 WebSocket是一种比较新的协议,它是伴随着html5规范而生的,虽然还比较年轻,但大多主流浏览器都已经支持。它使用方面、应用广泛,已经渗透到前后端开发的各种场景中。 对http一问一...

首席大胸器
04/25
0
0
[Java] Netty Websocket Server Javascript Client

WebSocket协议的出现无疑是 HTML5 中最令人兴奋的功能特性之一,它能够很好地替代Comet技术以及Flash的XmlSocket来实现基于HTTP协议的双向通信。目前主流的浏览器,如Chrome、Firefox、IE10、...

长平狐
2012/11/19
774
1

没有更多内容

加载失败,请刷新页面

加载更多

android6.0源码分析之Camera API2.0下的Preview(预览)流程分析

本文将基于android6.0的源码,对Camera API2.0下Camera的preview的流程进行分析。在文章android6.0源码分析之Camera API2.0下的初始化流程分析中,已经对Camera2内置应用的Open即初始化流程进...

天王盖地虎626
28分钟前
1
0
java 序列化和反序列化

1. 概述 序列恢复为Java对象的过程。 对象的序列化主要有两 首先我们介绍下序列化和反序列化的概念: 序列化:把Java对象转换为字节序列的过程。 反序列化:把字节序列恢复为Java对象的过程。...

edison_kwok
40分钟前
1
0
分布式数据一致性

狼王黄师傅
今天
2
0
经验

相信每位开发者在自己开发的过程中,都会反思一些问题,比如怎样提高编程能力、如何保持心态不砍产品经理、996 之后怎样恢复精力……最近开发者 Tomasz Łakomy 将他 7 年的开发生涯中学习到...

WinkJie
今天
4
0
从源码的角度来看SpringMVC

SpringMVC核心流程图 简单总结 首先请求进入DispatcherServlet 由DispatcherServlet 从HandlerMappings中提取对应的Handler 此时只是获取到了对应的Handle,然后得去寻找对应的适配器,即:H...

骚年锦时
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部