文档章节

Jgroups

山东和湖北
 山东和湖北
发布于 2016/05/11 18:15
字数 497
阅读 16
收藏 0

这是之前在网上看到的文章,跟着学习了一把。结合上一篇文章,再定义一个实体,cacheType,cacheKey,cacheValue,cacheSeconds就可以用jgroups+Guava cache实现分布式localCache的数据同步。 

package jgroups;

import org.jgroups.*;
import org.jgroups.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by ZhaoYun on 2016/5/8.
 * 节点
 */
public class Node extends ReceiverAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(Node.class);

    /**
     * 配置文件.
     */
    private static final String CONFIG_XML = "network-tcp.xml";

    /**
     * 集群名称.
     */
    private static final String CLUSTER_NAME = "ZY";

    /**
     * 节点通道.
     */
    private JChannel channel = null;

    /**
     * 以此作为节点间初始化的同步数据.
     */
    private Map<String, String> cacheData = new HashMap<String, String>();

    private ReentrantLock lock = new ReentrantLock();

    public Node(){
        InputStream is = this.getClass().getClassLoader().getResourceAsStream(CONFIG_XML);
        try {
            channel = new JChannel(is);
            channel.setReceiver(this);
            channel.connect(CLUSTER_NAME);
            channel.getState(null,50000);
        } catch (Exception e) {
            System.out.println("启动节点异常!" + e.getMessage());
            // 最好是自定义RuntimeException!
            throw new RuntimeException("启动节点异常!", e);
        }
    }

    /**
     *  发送消息给目标地址.
     * @param address 为空表示发给所有节点.
     * @param object 消息
     */
    public void sendMsg(Address address,Object object){
        Message message = new Message(address,null,object);
        try{
            channel.send(message);
        }catch (Exception e){
            System.out.println("send message error:"+e.getMessage());
            throw new RuntimeException("send message error!", e);
        }
    }

    /**
     *
     * @param output
     * @throws Exception
     */
    @Override
    public void getState(OutputStream output)throws Exception{
        lock.lock();
        try{
            Util.objectToStream(cacheData,new DataOutputStream(output));
        }catch (Exception e){
            System.out.println("get state error:"+ e.getMessage());
            throw new RuntimeException();
        }finally {
            lock.unlock();
        }
    }

    /**
     *
     * @param message
     */
    @Override
    public void receive(Message message){
        //当前节点不接收自己发送到通道当中的消息.
        if(message.getSrc().equals(channel.getAddress())){
            System.out.println(" self ");
            return;
        }
        System.out.println(message.getObject()+ "ZY"+message.getDest());
    }

    /**
     *
     * @param inputStream
     * @throws Exception
     */
    @Override
    public void setState(InputStream inputStream) throws Exception{
        lock.lock();
        try{
            Map<String, String> cacheData = (Map<String, String>) Util.objectFromStream(new DataInputStream(inputStream));
            this.cacheData.putAll(cacheData);
        }catch (Exception e){
            System.out.println("从主节点同步状态到当前节点发生异常!" + e.getMessage());
        }finally {
            lock.unlock();
        }
    }

    @Override
    public void viewAccepted(View view) {
        System.out.println("当前成员[" + this.channel.getAddressAsString() + "]");
        System.out.println(view.getCreator());
        System.out.println(view.getMembers());
        System.out.println("当前节点数据:" + cacheData);
    }

    /**
     * 提供一个简单的初始化数据的方法.
     * @param key
     * @param val
     */
    public void addData(String key,String val){
        if(key!=null&&!key.isEmpty()){
            cacheData.put(key, val);
        }
    }

Jgroups maven依赖:

<!-- jgroups start  用于多实例之间信息同步-->
        <dependency>
            <groupId>org.jgroups</groupId>
            <artifactId>jgroups</artifactId>
            <version>3.5.0.Final</version>
        </dependency>
        <!-- jgroups end -->

 

© 著作权归作者所有

共有 人打赏支持
上一篇: Guava String
下一篇: Guava Cache
山东和湖北
粉丝 0
博文 6
码字总数 4761
作品 0
昌平
程序员
私信 提问
bbossgroups 2.0-RC版本中如何通过JGroups来实现集群节点间远程服务调用,或者多服务器之间远程服务调用

bbossgroups 2.0-RC中对jgroups已经升级到Jgroups 2.10.0版本,因此对aop中基于JGroups的rpc也做了相应的调整,本文详细讲解新的使用方法: 1.配置文件目录调整: jgroups本身协议配置文件和...

bboss
2010/07/17
0
0
JGroups-3.4.6.Final 发布

JGroups-3.4.6.Final 发布了,官方找不到该版本的改进说明。源码下载地址: https://github.com/belaban/JGroups/releases/tag/JGroups-3.4.6.Final 二进制包下载: http://sourceforge.net...

oschina
2014/09/12
828
0
JGroups系列之介绍和体会

JGroups系列之介绍和体会 很早就想做这个JGroups系列,因为在分布式的系统中,各个部分经常需要相互通信。这些通信包括:信息需要同时发给集群中的某些或全部的worker;或者一个worker启动、...

引鸩怼孑
2015/07/28
0
1
JGroups 3.5.0.Beta2 发布

JGroups 3.5.0.Beta2 发布了,官方没有发布改进说明。下载地址: https://github.com/belaban/JGroups/archive/JGroups-3.5.0.Beta2.zip JGroups是一个可靠的群组通讯Java工具包。它基于IP组...

oschina
2014/03/29
1K
0
JGroups 发布 3.3.0 的首个 Alpha 版本

JGroups 刚刚上传 3.3.0 的首个 Alpha 版本到 SourceForge,下载地址:jgroups-3.3.0.Alpha1.jar (2.0 MB) 尚未发布更新记录。 JGroups是一个可靠的群组通讯Java工具包。它基于IP组播(IP mul...

oschina
2013/02/09
715
0

没有更多内容

加载失败,请刷新页面

加载更多

[LintCode] Serialize and Deserialize Binary Tree(二叉树的序列化和反序列化)

描述 设计一个算法,并编写代码来序列化和反序列化二叉树。将树写入一个文件被称为“序列化”,读取文件后重建同样的二叉树被称为“反序列化”。 如何反序列化或序列化二叉树是没有限制的,你...

honeymose
今天
5
0
java框架学习日志-7(静态代理和JDK代理)

静态代理 我们平时去餐厅吃饭,不是直接告诉厨师做什么菜的,而是先告诉服务员点什么菜,然后由服务员传到给厨师,相当于服务员是厨师的代理,我们通过代理让厨师炒菜,这就是代理模式。代理...

白话
今天
23
0
Flink Window

1.Flink窗口 Window Assigner分配器。 窗口可以是时间驱动的(Time Window,例如:每30秒钟),也可以是数据驱动的(Count Window,例如:每一百个元素)。 一种经典的窗口分类可以分成: 翻...

满小茂
今天
18
0
my.ini

1

architect刘源源
今天
16
0
docker dns

There is a opensource application that solves this issue, it's called DNS Proxy Server It's a DNS server that solves containers hostnames, if could not found a hostname that mat......

kut
今天
17
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部