Jgroups
Jgroups
山东和湖北 发表于2年前
Jgroups
  • 发表于 2年前
  • 阅读 12
  • 收藏 0
  • 点赞 2
  • 评论 0

标题:腾讯云 新注册用户域名抢购1元起>>>   

摘要: 分布式系统要求服务器间数据同步,因此采用了jgroups进行通信。

这是之前在网上看到的文章,跟着学习了一把。结合上一篇文章,再定义一个实体,cacheType,cacheKey,cacheValue,cacheSeconds就可以用jgroups+Guava cache实现分布式localCache的数据同步。 

package jgroups;

import org.jgroups.*;
import org.jgroups.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Created by ZhaoYun on 2016/5/8.
 * 节点
 */
public class Node extends ReceiverAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(Node.class);

    /**
     * 配置文件.
     */
    private static final String CONFIG_XML = "network-tcp.xml";

    /**
     * 集群名称.
     */
    private static final String CLUSTER_NAME = "ZY";

    /**
     * 节点通道.
     */
    private JChannel channel = null;

    /**
     * 以此作为节点间初始化的同步数据.
     */
    private Map<String, String> cacheData = new HashMap<String, String>();

    private ReentrantLock lock = new ReentrantLock();

    public Node(){
        InputStream is = this.getClass().getClassLoader().getResourceAsStream(CONFIG_XML);
        try {
            channel = new JChannel(is);
            channel.setReceiver(this);
            channel.connect(CLUSTER_NAME);
            channel.getState(null,50000);
        } catch (Exception e) {
            System.out.println("启动节点异常!" + e.getMessage());
            // 最好是自定义RuntimeException!
            throw new RuntimeException("启动节点异常!", e);
        }
    }

    /**
     *  发送消息给目标地址.
     * @param address 为空表示发给所有节点.
     * @param object 消息
     */
    public void sendMsg(Address address,Object object){
        Message message = new Message(address,null,object);
        try{
            channel.send(message);
        }catch (Exception e){
            System.out.println("send message error:"+e.getMessage());
            throw new RuntimeException("send message error!", e);
        }
    }

    /**
     *
     * @param output
     * @throws Exception
     */
    @Override
    public void getState(OutputStream output)throws Exception{
        lock.lock();
        try{
            Util.objectToStream(cacheData,new DataOutputStream(output));
        }catch (Exception e){
            System.out.println("get state error:"+ e.getMessage());
            throw new RuntimeException();
        }finally {
            lock.unlock();
        }
    }

    /**
     *
     * @param message
     */
    @Override
    public void receive(Message message){
        //当前节点不接收自己发送到通道当中的消息.
        if(message.getSrc().equals(channel.getAddress())){
            System.out.println(" self ");
            return;
        }
        System.out.println(message.getObject()+ "ZY"+message.getDest());
    }

    /**
     *
     * @param inputStream
     * @throws Exception
     */
    @Override
    public void setState(InputStream inputStream) throws Exception{
        lock.lock();
        try{
            Map<String, String> cacheData = (Map<String, String>) Util.objectFromStream(new DataInputStream(inputStream));
            this.cacheData.putAll(cacheData);
        }catch (Exception e){
            System.out.println("从主节点同步状态到当前节点发生异常!" + e.getMessage());
        }finally {
            lock.unlock();
        }
    }

    @Override
    public void viewAccepted(View view) {
        System.out.println("当前成员[" + this.channel.getAddressAsString() + "]");
        System.out.println(view.getCreator());
        System.out.println(view.getMembers());
        System.out.println("当前节点数据:" + cacheData);
    }

    /**
     * 提供一个简单的初始化数据的方法.
     * @param key
     * @param val
     */
    public void addData(String key,String val){
        if(key!=null&&!key.isEmpty()){
            cacheData.put(key, val);
        }
    }

Jgroups maven依赖:

<!-- jgroups start  用于多实例之间信息同步-->
        <dependency>
            <groupId>org.jgroups</groupId>
            <artifactId>jgroups</artifactId>
            <version>3.5.0.Final</version>
        </dependency>
        <!-- jgroups end -->

 

共有 人打赏支持
粉丝 0
博文 5
码字总数 4761
×
山东和湖北
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: