文档章节

集群通讯组件Jgroups的配置参数动态实现

zjytk05
 zjytk05
发布于 2016/06/13 12:05
字数 993
阅读 187
收藏 5

一、背景

近期在实现websocket长连接集群,需要用到集群通讯的场景,不想用消息队列mq这种比较重的方式,找到了一个比较成熟的用于集群通讯的开源组件Jgroups,发现正好适合我们的需求场景,而且比较轻量,如图所示:

输入图片说明

当服务端向客户端C1发送消息时,假设RPC调用的服务正好落在S2上,而此时S2和C1并未建立连接,和C1连接的是S4。这个时候就需要服务集群之间进行通讯,S2需要通知S4 给C1发送消息,这个步骤让Jgroups来实现,方便可行。

二、Jgroups应用实践

Jgroups支持udp和tcp方式,笔者在尝试用udp方式时,由于环境问题,集群之间不能很好的通讯,于是改用了tcp方式。 tcp.xml的配置如下:

<!--
    TCP based stack, with flow control and message bundling. This is usually used when IP
    multicasting cannot be used in a network, e.g. because it is disabled (routers discard multicast).
    Note that TCP.bind_addr and TCPPING.initial_hosts should be set, possibly via system properties, e.g.
    -Djgroups.bind_addr=192.168.5.2 and -Djgroups.tcpping.initial_hosts=192.168.5.2[7800]
-->
<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.3.xsd">
    <TCP bind_addr="replace_bindlocalhost"
         bind_port="7800"
         loopback="false"
         recv_buf_size="${tcp.recv_buf_size:5M}"
         send_buf_size="${tcp.send_buf_size:640K}"
         max_bundle_size="64K"
         max_bundle_timeout="30"
         use_send_queues="true"
         sock_conn_timeout="300"

         timer_type="new3"
         timer.min_threads="4"
         timer.max_threads="10"
         timer.keep_alive_time="3000"
         timer.queue_max_size="500"

         thread_pool.enabled="true"
         thread_pool.min_threads="1"
         thread_pool.max_threads="10"
         thread_pool.keep_alive_time="5000"
         thread_pool.queue_enabled="false"
         thread_pool.queue_max_size="100"
         thread_pool.rejection_policy="discard"

         oob_thread_pool.enabled="true"
         oob_thread_pool.min_threads="1"
         oob_thread_pool.max_threads="8"
         oob_thread_pool.keep_alive_time="5000"
         oob_thread_pool.queue_enabled="false"
         oob_thread_pool.queue_max_size="100"
         oob_thread_pool.rejection_policy="discard"/>

    <TCPPING timeout="3000"
             initial_hosts="${jgroups.tcpping.initial_hosts:replace_hostcluster}"
             port_range="1"
             num_initial_members="10"/>
    <MERGE2  min_interval="10000"
             max_interval="30000"/>
    <FD_SOCK/>
    <FD timeout="3000" max_tries="3" />
    <VERIFY_SUSPECT timeout="1500"  />
    <BARRIER />
    <pbcast.NAKACK2 use_mcast_xmit="false"
                   discard_delivered_msgs="true"/>
    <UNICAST3 />
    <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
                   max_bytes="4M"/>
    <pbcast.GMS print_local_addr="true" join_timeout="3000"

                view_bundling="true"/>
    <MFC max_credits="2M"
         min_threshold="0.4"/>
    <FRAG2 frag_size="60K"  />
    <!--RSVP resend_interval="2000" timeout="10000"/-->
    <pbcast.STATE_TRANSFER/>
</config>

其中,TCP.bind_addr 和 TCPPING.initial_hosts是必须设置的。里面的replace_bindlocalhost和replace_hostcluster是需要被动态替换的地方。 由于以上两个参数,在测试环境和生产环境的地址不一样,所以如果直接写死显然不行。查看JChannel源码,也没有显示的提供更改以上2个参数的方法。于是,只能根据JChannel的内部实现,查看哪些地方可以动态去修改,发现了XmlConfigurator可以支持到,实现方式如下:

InputStream input=Thread.currentThread().getContextClassLoader().getResourceAsStream("tcp.xml");
XmlConfigurator conf=XmlConfigurator.getInstance(input);
String tmp=conf.getProtocolStackString();
String tmp2 = conf.replace(tmp, "replace_bindlocalhost", "172.241.12.184");//动态设置的地方1
String param = conf.replace(tmp2, "replace_hostcluster", "172.241.12.184[7800]");//动态设置的地方2
jChannel = new JChannel(param);
//			jChannel = new JChannel();
jChannel.setDiscardOwnMessages(true); //不接自己发送的消息
jChannel.setReceiver(this);
jChannel.connect("Jgroups-cluster");
logger.warn("BroadcastNode start success.");

如上2个标识的地方即可动态设置参数来构造JChannel。

基于Jgroups的发送和接收代码整体参考如下:

public class BroadcastNode extends ReceiverAdapter {

	private static Logger logger = LoggerFactory.getLogger(BroadcastNode.class);

	private static final String input_file = "tcp.xml";

	private static final String REPLACE_LOCALHOST = "replace_bindlocalhost";

	private static final String REPLACE_HOSTCLUSTER = "replace_hostcluster";

	private static final BroadcastNode INSTANCE = new BroadcastNode();

	private JChannel jChannel;

	private BroadcastNode(){

	}

	public static BroadcastNode getInstance() {
        return INSTANCE;
    }

	/**
	 * 初始化
	 */
	@SuppressWarnings("static-access")
	public void start(String hostcluster) {
		try {

			InputStream input=Thread.currentThread().getContextClassLoader().getResourceAsStream(input_file);
            XmlConfigurator conf=XmlConfigurator.getInstance(input);
            String tmp=conf.getProtocolStackString();
            String tmp2 = conf.replace(tmp, REPLACE_LOCALHOST, HostUtil.getIP());
            String param = conf.replace(tmp2, REPLACE_HOSTCLUSTER, hostcluster);
			jChannel = new JChannel(param);
//			jChannel = new JChannel();
			jChannel.setDiscardOwnMessages(true); //不接自己发送的消息
			jChannel.setReceiver(this);
			jChannel.connect("Jgroups-cluster");
			logger.warn("BroadcastNode start success.");
		} catch (Exception e) {
			logger.error("BroadcastNode start exception,"+e);
			if(jChannel != null){
				jChannel.close();
			}
			jChannel= null;
		}
	}

	/**
	 * 关闭
	 */
	public void stop() {
		if(jChannel != null){
			jChannel.close();
		}
		logger.warn("BroadcastNode stopped.");
	}

	/**
	 * 广播消息
	 * @param msgMap
	 */
	public void send(Map<String,String> msgMap){
		if(msgMap == null || msgMap.isEmpty()){
			return;
		}
		//第一个参数null,代表广播
		Message message = new Message(null,JSON.toJSONString(msgMap));
		try {
			jChannel.send(message);
		} catch (Exception e) {
			logger.error("BroadcastNode send message error,"+msgMap,e);
		}
	}

	@SuppressWarnings("unchecked")
	@Override
	public void receive(Message msg) {
		String msgStr = (String) msg.getObject();
		if(StringUtils.isBlank(msgStr)){
			return;
		}
		try{
		    //你的业务逻辑
		}catch(Exception e){
			logger.error("BroadcastNode receive msg deal error,"+msgStr,e);
		}

	}

	@Override
	public void viewAccepted(View new_view) {
		List<Address> addresses = new_view.getMembers();
		StringBuilder sb = new StringBuilder();
		for(Address ad : addresses){
			sb.append(ad.toString()).append(",");
		}
		logger.warn("BroadcastNode views changed,new member address:" + sb.toString());
	}

	@Override
	public void getState(OutputStream output) throws Exception {
		System.out.println("getState====>>" + output);
	}

	@Override
	public void suspect(Address mbr) {
		System.out.println("suspect===>>>" + mbr);
	}

}

三、参考链接

  1. http://www.cnblogs.com/fangfan/p/4042823.html
  2. http://www.jgroups.org/

© 著作权归作者所有

zjytk05
粉丝 30
博文 6
码字总数 7435
作品 0
长宁
高级程序员
私信 提问
bbossgroups 2.0-RC版本中如何通过JGroups来实现集群节点间远程服务调用,或者多服务器之间远程服务调用

bbossgroups 2.0-RC中对jgroups已经升级到Jgroups 2.10.0版本,因此对aop中基于JGroups的rpc也做了相应的调整,本文详细讲解新的使用方法: 1.配置文件目录调整: jgroups本身协议配置文件和...

bboss
2010/07/17
0
0
JGroups系列之介绍和体会

JGroups系列之介绍和体会 很早就想做这个JGroups系列,因为在分布式的系统中,各个部分经常需要相互通信。这些通信包括:信息需要同时发给集群中的某些或全部的worker;或者一个worker启动、...

引鸩怼孑
2015/07/28
127
1
JGroups - 02架构概述

群组通信使用“组和成员”的概念。一般来说,成员是组的一个部分,一个组中包括多个成员。或者可以理解为,成员是一个节点,一组是一个集群。 一个节点是一个进程,位于某个主机上。一个集群...

东皇巴顿
2017/03/21
75
0
Memcached的JGroups实现支持失败转移和JMX

Memcached 是一个分布式内存对象缓存系统, 用于动态Web应用以减轻数据库负载。它通过在内存中缓存数据和对象来减少读取数据库的次数,从而提高动态、数据库驱动网站的速度。Memcached基于的...

JavaGG
2008/10/15
958
0
JGroups 4.0.19 发布,经典 Java 组播框架

JGroups 4.0.19 已发布。新版本做了一些功能上的改进,具体如下: 在 JGroups 4.0.19 中,改变了 ASYM_ENCRYPT 向成员传播私有共享组密钥的方式,从 pull 转变为基于 push 的方法 [1] [2]。 ...

局长
03/19
1K
3

没有更多内容

加载失败,请刷新页面

加载更多

视频如何加水印?

很多视频制作者的视频都被他人盗用过,为了防止自己的劳动成果被他人窃取,给视频加水印对于视频制作者来说,是一件非常重要的事情。那么下面分享一个手机给视频加水印的方法,一起来看看吧!...

白米稀饭2019
40分钟前
6
0
004-Envelop-基于Blockstack的文件传输dapp

本篇文章主要介绍基于Blockstack的文件传输工具; ####A-链接地址 官网地址:https://envelop.app/ Github地址:https://github.com/envelop-app ####B-特性: 1: Share private files easil...

Riverzhou
43分钟前
9
0
SpringCloud——声明式调用Feign

Feign声明式调用 一、Feign简介 使用Ribbon和RestTemplate消费服务的时候,有一个最麻烦的点在于,每次都要拼接URL,组织参数,所以有了Feign声明式调用,Feign的首要目标是将Java HTTP客户端...

devils_os
49分钟前
9
0
《JAVA核心知识》学习笔记 (22. 数据结构)

22.1.1. 栈(stack) 栈( stack)是限制插入和删除只能在一个位置上进行的表,该位置是表的末端,叫做栈顶 (top)。它是后进先出(LIFO)的。对栈的基本操作只有 push(进栈)和 pop(出栈...

Shingfi
54分钟前
8
0
你对AJAX认知有多少(1)?

AJAX(一) AJAX技术对于前段或者后端工程师来说,都是必不可缺的 那我们这几期都来细细品味一下AJAX的相关知识,直接上干货喽~ 1、什么是AJAX,为什么要使用Ajax(请谈一下你对Ajax的认识) 什么...

理性思考
今天
15
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部