文档章节

t-io 集群解决方案以及源码解析

Javen
 Javen
发布于 06/09 23:08
字数 1631
阅读 1591
收藏 54
点赞 5
评论 8

t-io 集群解决方案以及源码解析

[TOC]

0x01 概要说明

本博客是基于老谭t-io showcase中的tio-websocket-showcase 示例来实现集群。看showcase 入门还是挺容易的,入坑(入门)请看老谭写的用t-io来写一个网页聊天室或客服是个怎样的体验。 要深入理解具体实现原理后续的业务扩展,把t-io玩6起来还需要耐心看看源码,看了之后我相信你一定会有收获的,祝你好运。

其实t-io2.4的版本中已加入的集群实现的逻辑代码,只是官方没有写文档以及完整的示例而已,在此不得不说t-io 是一个比较良心的开源项目,很多业务场景都有考虑到。你们有需求也可以去t-ioissues

0x02 已有的集群解决方案

实现思路就是基于redis来做一个发布/订阅的方式达到多节点协作的目的,t-io内置的集群也是使用的此解决方案。下面就来聊聊如何使用t-io的内置集群。

0x03 t-io的内置集群

t-io中是否开启集群是通过org.tio.core.GroupContext中的tioClusterConfig是否为空来判断的。

好了,闲话少说直接上菜(代码)

判断是否开启集群(org.tio.core.GroupContext)

/**
 * 是否是集群
 * @return true: 是集群
 * @author: tanyaowu
 */
public boolean isCluster() {
	return tioClusterConfig != null;
}

tio-websocket-showcase中增加集群解决方案

//实例化t-io集群配置
TioClusterConfig tioClusterConfig = TioClusterConfig.newInstance("Javen", RedissonTemplate.me().getRedissonClient());
//开启群组集群-默认不集群
tioClusterConfig.setCluster4group(true);
//配置t-io集群
serverGroupContext.setTioClusterConfig(tioClusterConfig);
  • TioClusterConfig 中为我们封装了各种场景下是否开启集群的参数配置、消息的发布与订阅以及添加消息监听
  • RedissonTemplate 是使用J-IM中的部分代码,目的是来实例化RedissonClient

RedissonTemplate 代码如下慢慢品读

package org.jim.common.cache.redis;

import java.io.Serializable;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * @author WChao
 * @date 2018年5月18日 下午2:46:55
 */
public class RedissonTemplate implements Serializable{

	private static final long serialVersionUID = -4528751601700736437L;
	private static final Logger logger = LoggerFactory.getLogger(RedissonTemplate.class);
	private static RedissonTemplate instance = null;
	private static RedisConfiguration redisConfig = null;
	private static final String REDIS = "redis";
	private static RedissonClient redissonClient = null;
	
	private RedissonTemplate(){};
	
	public static RedissonTemplate me() throws Exception{
		 if (instance == null) { 
	        	synchronized (RedissonTemplate.class) {
					if(instance == null){
						redisConfig = RedisConfigurationFactory.parseConfiguration();
						init();
						instance = new RedissonTemplate();
					}
				}
	     }
		 return instance;
	}
	
	private static final void init() throws Exception {
			String host = redisConfig.getHost();
			if(host == null) {
				logger.error("the server ip of redis  must be not null!");
				throw new Exception("the server ip of redis  must be not null!");
			}	
			int port = redisConfig.getPort();
			String password = redisConfig.getAuth();
			Config redissonConfig = new Config();
			SingleServerConfig singleServerConfig = redissonConfig.useSingleServer();
			singleServerConfig.setAddress(REDIS+"://"+host+":"+port).setPassword(password).setTimeout(redisConfig.getTimeout()).setRetryAttempts(redisConfig.getRetryNum());
			try {
			   redissonClient = Redisson.create(redissonConfig);
			} catch (Exception e) {
				logger.error("cann't create RedissonClient for server"+redisConfig.getHost());
				throw new Exception("cann't create RedissonClient for server"+redisConfig.getHost());
			}
			
	}
	/**
	 * 获取RedissonClient客户端;
	 * @return
	 */
	public final RedissonClient getRedissonClient(){
		return redissonClient;
	}
}

看到这里有人可能要问,在什么地方发布消息以及处理订阅消息!!!

  • 什么地方发布消息

    当然是发送消息的时候,调用Tio.sendXxx()系列方法的时候。在tio-websocket-showcase中主要实现的是群聊,调用的是Tio.sendToGroup(),具体实现代码如下:

    /**
    	 * 发消息到组
    	 * @param groupContext
    	 * @param group
    	 * @param packet
    	 * @param channelContextFilter
    	 * @author tanyaowu
    	 */
    	private static Boolean sendToGroup(GroupContext groupContext, String group, Packet packet, ChannelContextFilter channelContextFilter, boolean isBlock) {
    		try {
    			SetWithLock<ChannelContext> setWithLock = groupContext.groups.clients(groupContext, group);
    			if (setWithLock == null) {
    				log.debug("{}, 组[{}]不存在", groupContext.getName(), group);
    				return false;
    			}
    			Boolean ret = sendToSet(groupContext, setWithLock, packet, channelContextFilter, isBlock);
    			return ret;
    		} finally {
                //判断是否集群以及是不是集群通过topic转过来的消息包
    			if (groupContext.isCluster() && !packet.isFromCluster()) {
    				TioClusterConfig tioClusterConfig = groupContext.getTioClusterConfig();
    				//判断是否开启了群组集群
    				if (tioClusterConfig.isCluster4group()) {
    //					TioClusterVo tioClusterVo = new TioClusterVo(packet);
    //					tioClusterVo.setGroup(group);
    //					tioClusterConfig.publishAsyn(tioClusterVo);
                        //在集群环境下,把群组消息通知到集群中的其它机器
    					notifyClusterForGroup(groupContext, group, packet);
    				}
    			}
    		}
    	}
    
    /**
    	 * 在集群环境下,把群组消息通知到集群中的其它机器
    	 * @param groupContext
    	 * @param group
    	 * @param packet
    	 */
    	public static void notifyClusterForGroup(GroupContext groupContext, String group, Packet packet) {
    		TioClusterConfig tioClusterConfig = groupContext.getTioClusterConfig();
    		TioClusterVo tioClusterVo = new TioClusterVo(packet);
    		tioClusterVo.setGroup(group);
    		tioClusterConfig.publishAsyn(tioClusterVo);
    	}
    
  • 处理订阅消息

其实在t-io中有默认实现,具体的代码如下

public void setTioClusterConfig(TioClusterConfig tioClusterConfig) {
	this.tioClusterConfig = tioClusterConfig;
	if (this.tioClusterConfig != null) {
		this.tioClusterConfig.addMessageListener(new DefaultMessageListener(this));
	}
}

org.tio.core.cluster.DefaultMessageListener 有详细的注释慢慢品读

package org.tio.core.cluster;

import org.apache.commons.lang3.StringUtils;
import org.redisson.api.listener.MessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.tio.core.Tio;
import org.tio.core.GroupContext;
import org.tio.core.intf.Packet;
import org.tio.utils.json.Json;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicLong;

/**
 * 默认的集群消息监听类
 * 作者: 陈磊(Cooppor)
 * 日期: 2018-05-28 15:08
 */
public class DefaultMessageListener implements MessageListener<TioClusterVo> {

    private static Logger log = LoggerFactory.getLogger(DefaultMessageListener.class);

    /**
     * 收到了多少次topic
     */
    private static final AtomicLong RECEIVED_TOPIC_COUNT = new AtomicLong();

    private GroupContext groupContext;

    public DefaultMessageListener(GroupContext groupContext) {
        this.groupContext = groupContext;
    }

    @Override
    public void onMessage(String channel, TioClusterVo tioClusterVo) {
		log.info("收到topic:{}, count:{}, tioClusterVo:{}", channel, RECEIVED_TOPIC_COUNT.incrementAndGet(), Json.toJson(tioClusterVo));
		String clientid = tioClusterVo.getClientId();
		if (StringUtils.isBlank(clientid)) {
			log.error("clientid is null");
			return;
		}
		if (Objects.equals(TioClusterVo.CLIENTID, clientid)) {
			log.info("自己发布的消息,忽略掉,{}", clientid);
			return;
		}

		Packet packet = tioClusterVo.getPacket();
		if (packet == null) {
			log.error("packet is null");
			return;
		}
		packet.setFromCluster(true);
		
		//发送给所有
		boolean isToAll = tioClusterVo.isToAll();
		if (isToAll) {
			Tio.sendToAll(groupContext, packet);
		}

		//发送给指定组
		String group = tioClusterVo.getGroup();
		if (StringUtils.isNotBlank(group)) {
			Tio.sendToGroup(groupContext, group, packet);
		}

		//发送给指定用户
		String userid = tioClusterVo.getUserid();
		if (StringUtils.isNotBlank(userid)) {
			Tio.sendToUser(groupContext, userid, packet);
		}
		
		//发送给指定token
		String token = tioClusterVo.getToken();
		if (StringUtils.isNotBlank(token)) {
			Tio.sendToToken(groupContext, token, packet);
		}

		//发送给指定ip
		String ip = tioClusterVo.getIp();
		if (StringUtils.isNotBlank(ip)) {
			Tio.sendToIp(groupContext, ip, packet);
		}
		
		//发送给指定channelId
		String channelId = tioClusterVo.getChannelId();
		if (StringUtils.isNotBlank(channelId)) {
			Tio.sendToId(groupContext, channelId, packet);
		}
	}
}
0x05 配置redis

哥们,测试时别忘了配置Redis。

/tio-websocket-showcase/src/main/resources/redis.properties

#连接池连接不够用时,重试获取连接次数
retrynum = 100
#可用连接实例的最大数目,默认值为8;
maxactive = -1
#控制一个pool最多有多少个状态为idle(空闲的)的jedis实例,默认值也是8。
maxidle = 20
#等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。
maxwait = 5000
timeout = 2000
#redis所在机器ip
host = 127.0.0.1
#redis端口号
port = 6379
#redis密码
auth = 

开启两个端口测试 9326以及9327

9326端口

9327端口

到这里在t-io 中借助Redis来实现集群部署实现步骤就介绍完了,个人能力有限如有错误欢迎指正。你有更好的解决方案或者建议欢迎一起交流讨论,如有疑问欢迎留言。

Fork的源码地址 https://gitee.com/javen205/tio-websocket-showcase

0x06 广而告之
  • IJPay 让支付触手可及,封装了微信支付、支付宝支付、银联支付常用的支付方式以及各种常用的接口。不依赖任何第三方 mvc 框架,仅仅作为工具使用简单快速完成支付模块的开发,可轻松嵌入到任何系统里。
  • t-io 让天下没有难开发的网络编程
  • J-IM 是用JAVA语言,基于t-io开发的轻量、高性能、单机支持几十万至百万在线用户IM,主要目标降低即时通讯门槛,快速打造低成本接入在线IM系统。

© 著作权归作者所有

共有 人打赏支持
Javen

Javen

粉丝 36
博文 30
码字总数 4307
作品 3
深圳
高级程序员
加载中

评论(8)

lifes77
lifes77
有空得去了解
lifes77
lifes77
支持
Javen
Javen

引用来自“talent-tan”的评论

集群和拉黑的代码是在2.4放到码云的,但是在1.x版本就已经有了,只是没开源
大拇指就在一瞬间不知不觉的竖起来了
Javen
Javen

引用来自“SZCoder”的评论

:kissing_heart:感谢分享
谢谢支持,有问题随时反馈
SZCoder
SZCoder
:kissing_heart:感谢分享
talent-tan
talent-tan
集群和拉黑的代码是在2.4放到码云的,但是在1.x版本就已经有了,只是没开源
Javen
Javen

引用来自“talent-tan”的评论

文章可以看出博主已经很熟悉tio的源代码,并且用通俗易懂的语言表达出来,非常棒!
感谢谭总在百忙之中抽出时间过来看我写的博客,集群使用没有毛病我就放心了。
talent-tan
talent-tan
文章可以看出博主已经很熟悉tio的源代码,并且用通俗易懂的语言表达出来,非常棒!
hadoop 1.X资源管理机制缺陷分析和解决方案

一、概述 用hadoop1.x版本已经有一年多了,在使用的过程中发现hadoop1.X的资源管理机制存在诸多缺陷,甚至在这种资源管理机制下会造成服务器资源的严重浪费,负载过高或者过低。本文主要介绍...

zengzhaozheng ⋅ 2014/02/22 ⋅ 0

在Ignite上运行微服务:第一部分

在Ignite上运行微服务:第一部分 从本文开始,会通过一个系列的篇幅来介绍使用Apache Ignite内存数据组织平台来构建容错、可扩展的基于微服务的解决方案。 介绍 当前,很多公司都会将自己的应...

李玉珏 ⋅ 2016/12/15 ⋅ 2

t-io关于心跳、集群等技术问题咨询

1.看了下helloworld那个demo,client端有组特定心跳包的代码,但server端就一行关于心跳的代码, serverGroupContext.setHeartbeatTimeout(org.tio.examples.helloworld.common.Const.TIMEOU...

鸿鹄之志1984 ⋅ 03/21 ⋅ 0

消息中间件(Kafka/RabbitMQ)收录集

本篇主要整理工作中遇到的一些消息中间件的相关知识,包括Kafka, RabbitMQ, RocketMQ, ActiveMQ等,不排除收录其他消息中间件的可能。 这里会持续收录相关知识,包括安装、部署、使用示例、监...

u013256816 ⋅ 2017/01/26 ⋅ 0

k8s集群之日志收集EFK架构

k8s集群之日志收集EFK架构 参考文档 http://tonybai.com/2017/03/03/implement-kubernetes-cluster-level-logging-with-fluentd-and-elasticsearch-stack/ https://github.com/kubernetes/k......

minminmsn ⋅ 2017/03/16 ⋅ 0

DNS 服务器--dnspod-sr

DNSPod Security Recursive DNS Server 关于 dnspod-sr 是一个运行在 Linux 平台上的高性能的递归 DNS 服务器软件,具备高性能、高负载、易扩展的优势,非 BIND 等软件可以比拟。 特性 高性能...

DNSPod ⋅ 2012/06/01 ⋅ 12

分库分表的解决方案

分库分表的解决方案 思路: 1、完整阅读分库、分表策略,注意区分分库与分表的不同,撰写阅读笔记。 2、试验基于IBATIS+SPRING2.0的分库源码,注意思考路由的规则。 3、试验分表的源码实现,...

小报童 ⋅ 2013/10/28 ⋅ 0

分布式框架dubbo原理解析

dubbo原理解析 互联网架构演化 单一应用架构:网站初期,访问量小,只需一个应用,将所有功能都部署在一起,以减少部署节点和成本。 分布式服务架构:当网站访问量越来越多,系统升级越来越频...

jonathan_loda ⋅ 2016/06/17 ⋅ 4

常见的高可用MySQL解决方案

MySQL数据库作为最基础的数据存储服务之一,在整个系统中有着非常重要的地位,因此要求其具备高可用性是无可厚非的。有很多解决方案能实现不同的SLA(服务水平协定),这些方案可以保证数据库...

tianfuguoss ⋅ 2015/12/01 ⋅ 0

[喵咪Redis]Redis安装与介绍

[喵咪Redis]Redis安装与介绍 哈喽大家好啊,这次要来和大家一起来了解学习Redis的一系列技术,最终目的是搭建一个高可用redis集群自动负载灾备,那我们先从最基础的Redis的一些基本介绍以及安装...

喵了_个咪 ⋅ 2016/06/29 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

熊掌号收录比例对于网站原创数据排名的影响[图]

从去年下半年开始,我在写博客了,因为我觉得业余写写博客也还是很不错的,但是从2017年下半年开始,百度已经推出了原创保护功能和熊掌号平台,为此,我也提交了不少以前的老数据,而这些历史...

原创小博客 ⋅ 52分钟前 ⋅ 0

LVM讲解、磁盘故障小案例

LVM LVM就是动态卷管理,可以将多个硬盘和硬盘分区做成一个逻辑卷,并把这个逻辑卷作为一个整体来统一管理,动态对分区进行扩缩空间大小,安全快捷方便管理。 1.新建分区,更改类型为8e 即L...

蛋黄Yolks ⋅ 今天 ⋅ 0

Hadoop Yarn调度器的选择和使用

一、引言 Yarn在Hadoop的生态系统中担任了资源管理和任务调度的角色。在讨论其构造器之前先简单了解一下Yarn的架构。 上图是Yarn的基本架构,其中ResourceManager是整个架构的核心组件,它负...

p柯西 ⋅ 今天 ⋅ 0

uWSGI + Django @ Ubuntu

创建 Django App Project 创建后, 可以看到路径下有一个wsgi.py的问题 uWSGI运行 直接命令行运行 利用如下命令, 可直接访问 uwsgi --http :8080 --wsgi-file dj/wsgi.py 配置文件 & 运行 [u...

袁祾 ⋅ 今天 ⋅ 0

JVM堆的理解

在JVM中,我们经常提到的就是堆了,堆确实很重要,其实,除了堆之外,还有几个重要的模块,看下图: 大 多数情况下,我们并不需要关心JVM的底层,但是如果了解它的话,对于我们系统调优是非常...

不羁之后 ⋅ 昨天 ⋅ 0

推荐:并发情况下:Java HashMap 形成死循环的原因

在淘宝内网里看到同事发了贴说了一个CPU被100%的线上故障,并且这个事发生了很多次,原因是在Java语言在并发情况下使用HashMap造成Race Condition,从而导致死循环。这个事情我4、5年前也经历...

码代码的小司机 ⋅ 昨天 ⋅ 1

聊聊spring cloud gateway的RetryGatewayFilter

序 本文主要研究一下spring cloud gateway的RetryGatewayFilter GatewayAutoConfiguration spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/G......

go4it ⋅ 昨天 ⋅ 0

创建新用户和授予MySQL中的权限教程

导读 MySQL是一个开源数据库管理软件,可帮助用户存储,组织和以后检索数据。 它有多种选项来授予特定用户在表和数据库中的细微的权限 - 本教程将简要介绍一些选项。 如何创建新用户 在MySQL...

问题终结者 ⋅ 昨天 ⋅ 0

android -------- 颜色的半透明效果配置

最近有朋友问我 Android 背景颜色的半透明效果配置,我网上看资料,总结了一下, 开发中也是常常遇到的,所以来写篇博客 常用的颜色值格式有: RGB ARGB RRGGBB AARRGGBB 这4种 透明度 透明度...

切切歆语 ⋅ 昨天 ⋅ 0

CentOS开机启动subversion

建立自启动脚本: vim /etc/init.d/subversion 输入如下内容: #!/bin/bash## subversion startup script for the server## chkconfig: 2345 90 10# description: start the subve......

随风而飘 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部