jedis连接池使用
jedis连接池使用
yiqifendou 发表于1年前
jedis连接池使用
  • 发表于 1年前
  • 阅读 61
  • 收藏 1
  • 点赞 0
  • 评论 0
package com.heli.mybatis.page.servlet;

import java.io.IOException;
import java.util.List;
import java.util.Random;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.apache.commons.lang3.StringUtils;

import com.commnon.RedisAPI;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;

public class ReidsMatchServlet extends HttpServlet {
	public static JedisPool pool = RedisAPI.getPool();

	// RedisAPI.set("accountBalance", "999999999");// 标还剩999999999块钱

	private static final long serialVersionUID = 1L;

	protected void doGet(HttpServletRequest request, HttpServletResponse response)
			throws ServletException, IOException {
		Jedis jedis = pool.getResource();
		long start = System.currentTimeMillis();
		int flag = 0;
		try {
			flag = bid(request, response, jedis);
		} catch (Exception e) {
			e.printStackTrace();
			response.getWriter().write("fail buy");
		} finally {
			pool.returnBrokenResource(jedis);
			RedisAPI.returnResource(pool, jedis);
		}
		if (flag == 1) {
			response.getWriter().write("success buy");
		} else if (flag == 2) {
			response.getWriter().write("have buy");
		} else if (flag == 0) {
			response.getWriter().write("bid is zero ,you can not buy");
		}else{
			response.getWriter().write("fail buy");
		}
		long end = System.currentTimeMillis();
		System.out.println("--------------------------------------------请求耗时:" + (end - start) + "毫秒");
	}

	protected void doPost(HttpServletRequest request, HttpServletResponse response)
			throws ServletException, IOException {
		doGet(request, response);
	}

	private int bid(HttpServletRequest request, HttpServletResponse response, Jedis jedis) throws Exception {
		int flag = 0;// 1,成功,2已经购买,3已经没钱了,其他異常
		// 每个请求对应一个userId
		int userId = new Random().nextInt(999999);
		
		// 观察 总标值,每人抢购一元
		while ("OK".equals(jedis.watch("accountBalance"))) {
        		// 判断是否购买过
        		Boolean isBuy = RedisAPI.sismember("userIdSet", userId + "");
        		if (isBuy) {
        			flag = 2;
        			return flag;
        		}
        		//投资额
			int r = 1;// new Random().nextInt(2);
			int lastAccount = 0;
			String balance = RedisAPI.get("accountBalance");
			if (StringUtils.isNotBlank(balance)) {
				lastAccount = Integer.valueOf(balance) - r;
			}
			if (lastAccount < 0) {
				flag = 3;
				break;
			}
			Transaction tx = jedis.multi();
			tx.set("accountBalance", lastAccount + "");
			List<Object> result = tx.exec();
			if (result == null || result.isEmpty()) {
                                //需要释放watch
				jedis.unwatch();
			} else {
				System.out.println("恭喜您," + userId + "已经中标" + r + "元,标余额" + lastAccount + "元");
				RedisAPI.set(Thread.currentThread().getName(), r + "");
				RedisAPI.sadd("userIdSet", userId + "");
				flag = 1;
				break;
			}
		}
		return flag;
	}
}

jedis连接池工具类

所需jar:jedis-2.1.0.jar和commons-pool-1.5.4.jar Jedis操作步骤如下:

1->获取Jedis实例需要从JedisPool中获取;

2->用完Jedis实例需要返还给JedisPool;

3->如果Jedis在使用过程中出错,则也需要还给JedisPool;

单机模式

package com.commnon;

/**
 * Redis操作接口
 *
 * @author 林计钦
 * @version 1.0 2013-6-14 上午08:54:14
 */
public class RedisAPI {
    private static JedisPool pool = getPool();

    /**
     * 构建redis连接池
     * 
     * @param ip
     * @param port
     * @return JedisPool
     */
    private static JedisPool getPool() {
        if (pool == null) {
            ResourceBundle bundle = ResourceBundle.getBundle("redis");
            if (bundle == null) {
                throw new IllegalArgumentException(
                        "[redis.properties] is not found!");
            }
            JedisPoolConfig config = new JedisPoolConfig();
                        //控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;  
                        //如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。 
            config.setMaxActive(Integer.valueOf(bundle
                    .getString("redis.pool.maxActive")));
                        //控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。  
            config.setMaxIdle(Integer.valueOf(bundle
                    .getString("redis.pool.maxIdle")));
                        //表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException; 
            config.setMaxWait(Long.valueOf(bundle.getString("redis.pool.maxWait")));
                        //在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的
            config.setTestOnBorrow(Boolean.valueOf(bundle
                    .getString("redis.pool.testOnBorrow")));
            config.setTestOnReturn(Boolean.valueOf(bundle
                    .getString("redis.pool.testOnReturn")));
            pool = new JedisPool(config, bundle.getString("redis.ip"),
                    Integer.valueOf(bundle.getString("redis.port")));
        }
        return pool;
    }
    

    /**
     * 返还到连接池
     * 
     * @param pool
     * @param redis
     */
    private static void returnResource(JedisPool pool, Jedis redis) {
        if (redis != null) {
            pool.returnResource(redis);
        }
    }

    /**
     * 获取数据
     * 
     * @param key
     * @return
     */
    public static String get(String key) {
        String value = null;
        Jedis jedis = null;
        try {
        	//pool.getResource()线程并发是安全的
            jedis = pool.getResource();
            value = jedis.get(key);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放redis对象
            pool.returnBrokenResource(jedis);
            // 返还到连接池
            returnResource(pool, jedis);
        }
        return value;
    }

    /**
     * 赋值数据
     * 
     * @param key
     * @return
     */
    public static String set(String key, String value) {
        String result = null;
        Jedis jedis = null;
        try {
        	//pool.getResource()线程并发是安全的
            jedis = pool.getResource();
            result = jedis.set(key, value);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放redis对象
            pool.returnBrokenResource(jedis);
            // 返还到连接池
            returnResource(pool, jedis);
        }

        return result;
    }
    
   
    /**
     * 赋值数据
     * 
     * @param key
     * @return
     */
    public static Long sadd(String key, String value) {
        Long result = null;
        Jedis jedis = null;
        try {
        	//pool.getResource()线程并发是安全的
            jedis = pool.getResource();
            result = jedis.sadd(key, value);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放redis对象
            pool.returnBrokenResource(jedis);
            // 返还到连接池
            returnResource(pool, jedis);
        }

        return result;
    }

    /**
     * 判断set中是否有值
     * 
     * @param key
     * @return
     */
    public static Boolean sismember(String key, String member) {
        Boolean result = null;
        Jedis jedis = null;
        try {
        	//pool.getResource()线程并发是安全的
            jedis = pool.getResource();
            result = jedis.sismember(key, member);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放redis对象
            pool.returnBrokenResource(jedis);
            // 返还到连接池
            returnResource(pool, jedis);
        }

        return result;
    }

}

redis.properties

#\u6700\u5927\u5206\u914d\u7684\u5bf9\u8c61\u6570
redis.pool.maxActive=1024
#\u6700\u5927\u80fd\u591f\u4fdd\u6301idel\u72b6\u6001\u7684\u5bf9\u8c61\u6570
redis.pool.maxIdle=200
#\u5f53\u6c60\u5185\u6ca1\u6709\u8fd4\u56de\u5bf9\u8c61\u65f6\uff0c\u6700\u5927\u7b49\u5f85\u65f6\u95f4
redis.pool.maxWait=1000
#\u5f53\u8c03\u7528borrow Object\u65b9\u6cd5\u65f6\uff0c\u662f\u5426\u8fdb\u884c\u6709\u6548\u6027\u68c0\u67e5
redis.pool.testOnBorrow=true
#\u5f53\u8c03\u7528return Object\u65b9\u6cd5\u65f6\uff0c\u662f\u5426\u8fdb\u884c\u6709\u6548\u6027\u68c0\u67e5
redis.pool.testOnReturn=true
#IP
redis.ip=127.0.0.1
#Port
redis.port=6379

代码说明:
a、获取jedis实例时,实际上可能有两类错误。
一类是pool.getReource(),得不到可用的jedis实例;
另一类是jedis.set/get时出错也会抛出异常;
为了实现区分,所以根据instance是否为null来实现,如果为空就证明instance根本就没初始化,也就不用return给pool;如果instance不为null,则证明是需要返还给pool的;

b、在instance出错时,必须调用returnBrokenResource返还给pool,否则下次通过getResource得到的instance的缓冲区可能还存在数据,出现问题!

JedisPool的配置
JedisPool的配置参数很大程度上依赖于实际应用需求、软硬件能力。以前没用过commons-pool,所以这次花了一整天专门看这些参数的含义。。。JedisPool的配置参数大部分是由JedisPoolConfig的对应项来赋值的。

maxActive:控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted。

maxIdle:控制一个pool最多有多少个状态为idle(空闲)的jedis实例;

whenExhaustedAction:表示当pool中的jedis实例都被allocated完时,pool要采取的操作;默认有三种。

WHEN_EXHAUSTED_FAIL --> 表示无jedis实例时,直接抛出NoSuchElementException;

WHEN_EXHAUSTED_BLOCK --> 则表示阻塞住,或者达到maxWait时抛出JedisConnectionException;

WHEN_EXHAUSTED_GROW --> 则表示新建一个jedis实例,也就说设置的maxActive无用;

maxWait:表示当borrow一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;

testOnBorrow:在borrow一个jedis实例时,是否提前进行alidate操作;如果为true,则得到的jedis实例均是可用的;

testOnReturn:在return给pool时,是否提前进行validate操作;

testWhileIdle:如果为true,表示有一个idle object evitor线程对idle object进行扫描,如果validate失败,此object会被从pool中drop掉;这一项只有在timeBetweenEvictionRunsMillis大于0时才有意义;

timeBetweenEvictionRunsMillis:表示idle object evitor两次扫描之间要sleep的毫秒数;

numTestsPerEvictionRun:表示idle object evitor每次扫描的最多的对象数;

minEvictableIdleTimeMillis:表示一个对象至少停留在idle状态的最短时间,然后才能被idle object evitor扫描并驱逐;这一项只有在timeBetweenEvictionRunsMillis大于0时才有意义;

softMinEvictableIdleTimeMillis:在minEvictableIdleTimeMillis基础上,加入了至少minIdle个对象已经在pool里面了。如果为-1,evicted不会根据idle time驱逐任何对象。如果minEvictableIdleTimeMillis>0,则此项设置无意义,且只有在timeBetweenEvictionRunsMillis大于0时才有意义;

lifo:borrowObject返回对象时,是采用DEFAULT_LIFO(last in first out,即类似cache的最频繁使用队列),如果为False,则表示FIFO队列;

其中JedisPoolConfig对一些参数的默认设置如下:

testWhileIdle=true  
minEvictableIdleTimeMills=60000  
timeBetweenEvictionRunsMillis=30000  
numTestsPerEvictionRun=-1

分片模式(伪集群,非真正的集群,reids3.0以上版本才提供真正的集群)

一下内容来自网络,但是很多细节没有写出来,所以我经过自己琢磨,终于找到原因了。
Redis-2.4.15目前没有提供集群的功能,Redis作者在博客中说将在3.0中实现集群机制。目前Redis实现集群的方法主要是采用一致性哈稀分片(Shard),将不同的key分配到不同的redis server上,达到横向扩展的目的。下面来介绍一种比较常用的分布式场景:
在读写操作比较均匀且实时性要求较高,可以用下图的分布式模式:
在读操作远远多于写操作时,可以用下图的分布式模式:
对于一致性哈稀分片的算法,Jedis-2.0.0已经提供了,下面是使用示例代码(以ShardedJedisPool为例):

import java.util.ArrayList;
import java.util.List;

import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisShardInfo;
import redis.clients.jedis.ShardedJedis;
import redis.clients.jedis.ShardedJedisPool;
import redis.clients.util.Hashing;
import redis.clients.util.Sharded;

public class ShardedRedisAPI {
	static ShardedJedisPool pool;
	static{
		JedisPoolConfig config =new JedisPoolConfig();//Jedis池配置
		config.setMaxActive(500);//最大活动的对象个数
		config.setMaxIdle(1000 * 60);//对象最大空闲时间
		config.setMaxWait(1000 * 10);//获取对象时最大等待时间
		config.setTestOnBorrow(true);
		
		String hostA = "10.10.224.44";
		int portA = 6379;
		String hostB = "10.10.224.48";
		int portB = 6379;
		
		List<JedisShardInfo> jdsInfoList =new ArrayList<JedisShardInfo>(2);
		JedisShardInfo infoA = new JedisShardInfo(hostA, portA);
		infoA.setPassword("redis.360buy");
		JedisShardInfo infoB = new JedisShardInfo(hostB, portB);
		infoB.setPassword("redis.360buy");
		jdsInfoList.add(infoA);
		jdsInfoList.add(infoB);
		
		//传入连接池配置、分布式redis服务器主机信息、分片规则(存储到哪台redis服务器)
		pool =new ShardedJedisPool(config, jdsInfoList, Hashing.MURMUR_HASH,Sharded.DEFAULT_KEY_TAG_PATTERN);
	}
	
	private static int index = 1;
	
	public static String generateKey(){
		return String.valueOf(Thread.currentThread().getId())+"_"+(index++);
	}
	     
	public static void main(String[] args) {
		for(int i=0; i<100; i++){
           String key =generateKey();
           //key += "{aaa}";
           ShardedJedis jds =null;
           try {
               jds =pool.getResource();
               System.out.println(key+":"+jds.getShard(key).getClient().getHost());
               System.out.println(jds.set(key,"1111111111111111111111111111111"));
           }catch (Exception e) {
               e.printStackTrace();
           }
           finally{
                //借鉴网上的并自行修改了下
                pool.returnBrokenResource(jds);
                if(jds != null){                    
                    pool.returnResource(jds);
                }               
           }
		}
	}

}

从运行结果中可以看到,不同的key被分配到不同的Redis-Server上去了。
总结: 客户端jedis的一致性哈稀进行分片原理:初始化ShardedJedisPool的时候,会将上面程序中的jdsInfoList数据进行一个算法技术,主要计算依据为list中的index位置来计算,我大概看了一下其源码如下: jedis源码中ShardedJedis实现sharding

(如果亲还是不信的话,可以将上面程序中的 jdsInfoList在add的时候,先add第二个,在add第一个,绝对取不出数据,原因很简单,第一次set值的时候,是按list下标来hash计算出一个服务器的,所以取值的时候,list顺序不能变动)
实际上,上面的集群模式还存在两个问题:

  1.   扩容问题:  
    

因为使用了一致性哈稀进行分片,那么不同的key分布到不同的Redis-Server上,当我们需要扩容时,需要增加机器到分片列表中,这时候会使得同样的key算出来落到跟原来不同的机器上,这样如果要取某一个值,会出现取不到的情况,对于这种情况,Redis的作者提出了一种名为Pre-Sharding的方式:
Pre-Sharding方法是将每一个台物理机上,运行多个不同断口的Redis实例,假如有三个物理机,每个物理机运行三个Redis实际,那么我们的分片列表中实际有9个Redis实例,当我们需要扩容时,增加一台物理机,步骤如下:
A. 在新的物理机上运行Redis-Server;
B. 该Redis-Server从属于(slaveof)分片列表中的某一Redis-Server(假设叫RedisA);
C. 等主从复制(Replication)完成后,将客户端分片列表中RedisA的IP和端口改为新物理机上Redis-Server的IP和端口;
D. 停止RedisA。
这样相当于将某一Redis-Server转移到了一台新机器上。Prd-Sharding实际上是一种在线扩容的办法,但还是很依赖Redis本身的复制功能的,如果主库快照数据文件过大,这个复制的过程也会很久,同时会给主库带来压力。所以做这个拆分的过程最好选择为业务访问低峰时段进行。
再总结一下这里的扩容:其实这里的扩容很简单的思想:就是前期我们可能只用到两三个服务器,但是但是担心后期要扩容,所以前期就现在每一个机器上面再装两个redis,这样就有9个redis嘛,后面如果确实服务器不够,需要扩容,就重新找一台新机来代替9个中的一个redis,有人说,这样不还是9个么,是的,但是以前服务器上面有三个redis,压力很大的,这样做,相当于单独分离出来并且将数据一起copy给新的服务器。值得注意的是,还需要修改客户端被代替的redis的IP和端口为现在新的服务器,只要顺序不变,不会影响一致性哈希分片(刚才上面刚说了哈)。
2. 单点故障问题:
还是用到Redis主从复制的功能,两台物理主机上分别都运行有Redis-Server,其中一个Redis-Server是另一个的从库,采用双机热备技术,客户端通过虚拟IP访问主库的物理IP,当主库宕机时,切换到从库的物理IP。只是事后修复主库时,应该将之前的从库改为主库(使用命令slaveof no one),主库变为其从库(使命令slaveof IP PORT),这样才能保证修复期间新增数据的一致性。

标签: jedis 连接池
共有 人打赏支持
粉丝 5
博文 49
码字总数 8974
×
yiqifendou
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: