文档章节

Jgroups

山东和湖北
 山东和湖北
发布于 2016/05/11 18:15
字数 497
阅读 15
收藏 0

这是之前在网上看到的文章,跟着学习了一把。结合上一篇文章,再定义一个实体,cacheType,cacheKey,cacheValue,cacheSeconds就可以用jgroups+Guava cache实现分布式localCache的数据同步。 

package jgroups;

import org.jgroups.*;
import org.jgroups.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by ZhaoYun on 2016/5/8.
 * 节点
 */
public class Node extends ReceiverAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(Node.class);

    /**
     * 配置文件.
     */
    private static final String CONFIG_XML = "network-tcp.xml";

    /**
     * 集群名称.
     */
    private static final String CLUSTER_NAME = "ZY";

    /**
     * 节点通道.
     */
    private JChannel channel = null;

    /**
     * 以此作为节点间初始化的同步数据.
     */
    private Map<String, String> cacheData = new HashMap<String, String>();

    private ReentrantLock lock = new ReentrantLock();

    public Node(){
        InputStream is = this.getClass().getClassLoader().getResourceAsStream(CONFIG_XML);
        try {
            channel = new JChannel(is);
            channel.setReceiver(this);
            channel.connect(CLUSTER_NAME);
            channel.getState(null,50000);
        } catch (Exception e) {
            System.out.println("启动节点异常!" + e.getMessage());
            // 最好是自定义RuntimeException!
            throw new RuntimeException("启动节点异常!", e);
        }
    }

    /**
     *  发送消息给目标地址.
     * @param address 为空表示发给所有节点.
     * @param object 消息
     */
    public void sendMsg(Address address,Object object){
        Message message = new Message(address,null,object);
        try{
            channel.send(message);
        }catch (Exception e){
            System.out.println("send message error:"+e.getMessage());
            throw new RuntimeException("send message error!", e);
        }
    }

    /**
     *
     * @param output
     * @throws Exception
     */
    @Override
    public void getState(OutputStream output)throws Exception{
        lock.lock();
        try{
            Util.objectToStream(cacheData,new DataOutputStream(output));
        }catch (Exception e){
            System.out.println("get state error:"+ e.getMessage());
            throw new RuntimeException();
        }finally {
            lock.unlock();
        }
    }

    /**
     *
     * @param message
     */
    @Override
    public void receive(Message message){
        //当前节点不接收自己发送到通道当中的消息.
        if(message.getSrc().equals(channel.getAddress())){
            System.out.println(" self ");
            return;
        }
        System.out.println(message.getObject()+ "ZY"+message.getDest());
    }

    /**
     *
     * @param inputStream
     * @throws Exception
     */
    @Override
    public void setState(InputStream inputStream) throws Exception{
        lock.lock();
        try{
            Map<String, String> cacheData = (Map<String, String>) Util.objectFromStream(new DataInputStream(inputStream));
            this.cacheData.putAll(cacheData);
        }catch (Exception e){
            System.out.println("从主节点同步状态到当前节点发生异常!" + e.getMessage());
        }finally {
            lock.unlock();
        }
    }

    @Override
    public void viewAccepted(View view) {
        System.out.println("当前成员[" + this.channel.getAddressAsString() + "]");
        System.out.println(view.getCreator());
        System.out.println(view.getMembers());
        System.out.println("当前节点数据:" + cacheData);
    }

    /**
     * 提供一个简单的初始化数据的方法.
     * @param key
     * @param val
     */
    public void addData(String key,String val){
        if(key!=null&&!key.isEmpty()){
            cacheData.put(key, val);
        }
    }

Jgroups maven依赖:

<!-- jgroups start  用于多实例之间信息同步-->
        <dependency>
            <groupId>org.jgroups</groupId>
            <artifactId>jgroups</artifactId>
            <version>3.5.0.Final</version>
        </dependency>
        <!-- jgroups end -->

 

© 著作权归作者所有

共有 人打赏支持
山东和湖北
粉丝 0
博文 6
码字总数 4761
作品 0
昌平
程序员
bbossgroups 2.0-RC版本中如何通过JGroups来实现集群节点间远程服务调用,或者多服务器之间远程服务调用

bbossgroups 2.0-RC中对jgroups已经升级到Jgroups 2.10.0版本,因此对aop中基于JGroups的rpc也做了相应的调整,本文详细讲解新的使用方法: 1.配置文件目录调整: jgroups本身协议配置文件和...

bboss
2010/07/17
0
0
JGroups系列之介绍和体会

JGroups系列之介绍和体会 很早就想做这个JGroups系列,因为在分布式的系统中,各个部分经常需要相互通信。这些通信包括:信息需要同时发给集群中的某些或全部的worker;或者一个worker启动、...

引鸩怼孑
2015/07/28
0
1
JGroups - 02架构概述

群组通信使用“组和成员”的概念。一般来说,成员是组的一个部分,一个组中包括多个成员。或者可以理解为,成员是一个节点,一组是一个集群。 一个节点是一个进程,位于某个主机上。一个集群...

东皇巴顿
2017/03/21
0
0
JBoss 系列四十九:JBoss 7/WildFly 中端口使用列表

JBoss 7中端口使用列表 JBoss 7中所有配置都在一个文件中(standaone*.xml, domain.xml),和之前的JBoss相比JBoss 7用到的端口变少,我们将以表格的形式列出所有使用到的端口。 remoting 444...

无鸯
2014/02/04
0
0
Hibernate Search 5.10.0.Beta2 发布,数据检索框架

Hibernate Search 5.10.0.Beta1 已发布,这是 5.10 分支的第二个测试版本,此版本主要包括升级到适用于JBoss模块的WildFly 12,但它也增加了一些错误修正和改进。 以下是自Hibernate Search ...

周其
04/16
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Shiro | 实现权限验证完整版

写在前面的话 提及权限,就会想到安全,是一个十分棘手的话题。这里只是作为学校Shiro的一个记录,而不是,权限就应该这样设计之类的。 Shiro框架 1、Shiro是基于Apache开源的强大灵活的开源...

冯文议
今天
1
0
linux 系统的运行级别

运行级别 运行级别 | 含义 0 关机 1 单用户模式,可以想象为windows 的安全模式,主要用于修复系统 2 不完全的命令模式,不含NFS服务 3 完全的命令行模式,就是标准的字符界面 4 系统保留 5 ...

Linux学习笔记
今天
2
0
学习设计模式——命令模式

任何模式的出现,都是为了解决一些特定的场景的耦合问题,以达到对修改封闭,对扩展开放的效果。命令模式也不例外: 命令模式是为了解决命令的请求者和命令的实现者之间的耦合关系。 解决了这...

江左煤郎
今天
3
0
字典树收集(非线程安全,后续做线程安全改进)

将500W个单词放进一个数据结构进行存储,然后进行快速比对,判断一个单词是不是这个500W单词之中的;来了一个单词前缀,给出500w个单词中有多少个单词是该前缀. 1、这个需求首先需要设计好数据结...

算法之名
昨天
15
0
GRASP设计模式

此文参考了这篇博客,建议读者阅读原文。 面向对象(Object-Oriented,OO)是当下软件开发的主流方法。在OO分析与设计中,我们首先从问题领域中抽象出领域模型,在领域模型中以适当的粒度归纳...

克虏伯
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部