文档章节

Jgroups

山东和湖北
 山东和湖北
发布于 2016/05/11 18:15
字数 497
阅读 15
收藏 0

这是之前在网上看到的文章,跟着学习了一把。结合上一篇文章,再定义一个实体,cacheType,cacheKey,cacheValue,cacheSeconds就可以用jgroups+Guava cache实现分布式localCache的数据同步。 

package jgroups;

import org.jgroups.*;
import org.jgroups.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by ZhaoYun on 2016/5/8.
 * 节点
 */
public class Node extends ReceiverAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(Node.class);

    /**
     * 配置文件.
     */
    private static final String CONFIG_XML = "network-tcp.xml";

    /**
     * 集群名称.
     */
    private static final String CLUSTER_NAME = "ZY";

    /**
     * 节点通道.
     */
    private JChannel channel = null;

    /**
     * 以此作为节点间初始化的同步数据.
     */
    private Map<String, String> cacheData = new HashMap<String, String>();

    private ReentrantLock lock = new ReentrantLock();

    public Node(){
        InputStream is = this.getClass().getClassLoader().getResourceAsStream(CONFIG_XML);
        try {
            channel = new JChannel(is);
            channel.setReceiver(this);
            channel.connect(CLUSTER_NAME);
            channel.getState(null,50000);
        } catch (Exception e) {
            System.out.println("启动节点异常!" + e.getMessage());
            // 最好是自定义RuntimeException!
            throw new RuntimeException("启动节点异常!", e);
        }
    }

    /**
     *  发送消息给目标地址.
     * @param address 为空表示发给所有节点.
     * @param object 消息
     */
    public void sendMsg(Address address,Object object){
        Message message = new Message(address,null,object);
        try{
            channel.send(message);
        }catch (Exception e){
            System.out.println("send message error:"+e.getMessage());
            throw new RuntimeException("send message error!", e);
        }
    }

    /**
     *
     * @param output
     * @throws Exception
     */
    @Override
    public void getState(OutputStream output)throws Exception{
        lock.lock();
        try{
            Util.objectToStream(cacheData,new DataOutputStream(output));
        }catch (Exception e){
            System.out.println("get state error:"+ e.getMessage());
            throw new RuntimeException();
        }finally {
            lock.unlock();
        }
    }

    /**
     *
     * @param message
     */
    @Override
    public void receive(Message message){
        //当前节点不接收自己发送到通道当中的消息.
        if(message.getSrc().equals(channel.getAddress())){
            System.out.println(" self ");
            return;
        }
        System.out.println(message.getObject()+ "ZY"+message.getDest());
    }

    /**
     *
     * @param inputStream
     * @throws Exception
     */
    @Override
    public void setState(InputStream inputStream) throws Exception{
        lock.lock();
        try{
            Map<String, String> cacheData = (Map<String, String>) Util.objectFromStream(new DataInputStream(inputStream));
            this.cacheData.putAll(cacheData);
        }catch (Exception e){
            System.out.println("从主节点同步状态到当前节点发生异常!" + e.getMessage());
        }finally {
            lock.unlock();
        }
    }

    @Override
    public void viewAccepted(View view) {
        System.out.println("当前成员[" + this.channel.getAddressAsString() + "]");
        System.out.println(view.getCreator());
        System.out.println(view.getMembers());
        System.out.println("当前节点数据:" + cacheData);
    }

    /**
     * 提供一个简单的初始化数据的方法.
     * @param key
     * @param val
     */
    public void addData(String key,String val){
        if(key!=null&&!key.isEmpty()){
            cacheData.put(key, val);
        }
    }

Jgroups maven依赖:

<!-- jgroups start  用于多实例之间信息同步-->
        <dependency>
            <groupId>org.jgroups</groupId>
            <artifactId>jgroups</artifactId>
            <version>3.5.0.Final</version>
        </dependency>
        <!-- jgroups end -->

 

© 著作权归作者所有

共有 人打赏支持
山东和湖北
粉丝 0
博文 6
码字总数 4761
作品 0
昌平
程序员
bbossgroups 2.0-RC版本中如何通过JGroups来实现集群节点间远程服务调用,或者多服务器之间远程服务调用

bbossgroups 2.0-RC中对jgroups已经升级到Jgroups 2.10.0版本,因此对aop中基于JGroups的rpc也做了相应的调整,本文详细讲解新的使用方法: 1.配置文件目录调整: jgroups本身协议配置文件和...

bboss
2010/07/17
0
0
JGroups系列之介绍和体会

JGroups系列之介绍和体会 很早就想做这个JGroups系列,因为在分布式的系统中,各个部分经常需要相互通信。这些通信包括:信息需要同时发给集群中的某些或全部的worker;或者一个worker启动、...

引鸩怼孑
2015/07/28
0
1
JGroups - 02架构概述

群组通信使用“组和成员”的概念。一般来说,成员是组的一个部分,一个组中包括多个成员。或者可以理解为,成员是一个节点,一组是一个集群。 一个节点是一个进程,位于某个主机上。一个集群...

东皇巴顿
2017/03/21
0
0
JBoss 系列四十九:JBoss 7/WildFly 中端口使用列表

JBoss 7中端口使用列表 JBoss 7中所有配置都在一个文件中(standaone*.xml, domain.xml),和之前的JBoss相比JBoss 7用到的端口变少,我们将以表格的形式列出所有使用到的端口。 remoting 444...

无鸯
2014/02/04
0
0
Hibernate Search 5.10.0.Beta2 发布,数据检索框架

Hibernate Search 5.10.0.Beta1 已发布,这是 5.10 分支的第二个测试版本,此版本主要包括升级到适用于JBoss模块的WildFly 12,但它也增加了一些错误修正和改进。 以下是自Hibernate Search ...

周其
04/16
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

RabbitMQ在CentOS环境安装

1.废话不多说准备一台虚拟机,系统为centos,我这里使用的系统版本如下图所示:

凌晨一点
57分钟前
0
0
线程池相关

在java.util.concurrent包下,提供了一系列与线程池相关的类。 使用线程池的好处 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗; 提高响应速度。当任务到达时,任务...

edwardGe
59分钟前
0
0
学习大数据这三个关键技术是一定要掌握!

大数据时代全面来临,大数据、人工智能等技术引领科技创新潮流,获得国家政策大力支持,前景广阔。学习大数据技术的人自然是络绎不绝, 学习大数据虽然是一个趋势,但也要注意大数据培训课程...

董黎明
今天
0
0
jetbrains 上传代码到github

设置中找github 获取token 验证是否成功 测试git 生成key,一路回车即可 ssh-keygen -t rsa -C “youremail@example.com” 打开pub复制key,需要再次输入一次密码 验证是否成功,输入yes即可...

阿豪boy
今天
0
0
分布式服务框架(拾遗)

前言 现在的大部分工程都已经是基于分布式架构来处理。所以这里对分布式框架做一个简单的总结 常用的RPC框架 RPC框架原理 RPC(Remote Procedure Call,远程过程调用)一般用来实现部署在不同...

kukudeku
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部