Spring Boot 配置多个 Redis 数据源,并订阅 Redis 频道(Pub/Sub)

原创
2020/10/23 19:57
阅读数 14

在 application.yml 中配置多个数据源,每个数据源均为 Redis 集群

# Application settings declaring two Redis Cluster data sources.
spring:
  application:
    name: XXX
  # Primary Redis cluster: GenerateNewTemplateRedisConfig reads
  # spring.redis.cluster.nodes and spring.redis.password from this section.
  redis:
    cluster:
      # Comma-separated host:port entries, one per cluster node.
      nodes: Ip:Port,Ip:Port,Ip:Port
    password:
    test-on-borrow: true
    timeout: 10000 # set the timeout according to each service's own requirements
    lettuce:
      # This subtree is the binding prefix (spring.redis.lettuce.pool) of the
      # GenericObjectPoolConfig bean declared by GenerateNewTemplateRedisConfig.redisPool().
      pool:
        max-active: 200
        max-idle: 200
        max-wait: 5000
        min-idle: 10
  # Secondary Redis cluster: GenerateNewTemplateRedisConfig reads
  # spring.secondaryRedis.cluster.nodes and spring.secondaryRedis.password.
  secondaryRedis:
    cluster:
      nodes: Ip:Port,Ip:Port,Ip:Port
    password:
/**
 * Redis配置声明
 * 
 */
@Configuration
public class RedisConfig {
    //如果单个数据源,直接使用lettuceConnectionFactory,不要单独配置
    //@Autowired
    //public LettuceConnectionFactory lettuceConnectionFactory;

    //监听执行方法
    @Autowired
    IndexListener indexListener;

    //监听执行方法
    @Autowired
    SendIndexPriceListener sendIndexPriceListener;

    //声明temp
    @Resource(name = "secondaryRedisTemplate")
    private RedisTemplate secondaryRedisTemplate;

    //声明temp
    @Resource(name = "stringRedisTemplate")
    private RedisTemplate stringRedisTemplate;



    /**
     * 监听
     * @return
     */
    @Bean
    RedisMessageListenerContainer indexMessageListenerContainer(){
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(stringRedisTemplate.getConnectionFactory());
        redisMessageListenerContainer.setTaskExecutor(threadPool());
        redisMessageListenerContainer.addMessageListener(indexListener, new ChannelTopic("管道名称"));
        redisMessageListenerContainer.afterPropertiesSet();
        return redisMessageListenerContainer;
    }

    /**
     * 监听 指数推送
     * @return
     */
    @Bean
    RedisMessageListenerContainer sendIndexPriceListenerContainer(){

        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(secondaryRedisTemplate.getConnectionFactory());
        redisMessageListenerContainer.setTaskExecutor(threadPool());
        //监听器,管道名称
        redisMessageListenerContainer.addMessageListener(sendIndexPriceListener,
                new ChannelTopic("管道名称"));
        redisMessageListenerContainer.afterPropertiesSet();
        return redisMessageListenerContainer;
    }

    public ExecutorService threadPool(){
        return Executors.newFixedThreadPool(30);
    }
}

配置多个数据源:为每个 Redis 集群分别声明连接工厂和 StringRedisTemplate

/**
 * Declares two independent Redis Cluster data sources (primary and secondary), each with
 * its own {@link RedisClusterConfiguration}, {@link LettuceConnectionFactory} and
 * {@link StringRedisTemplate}. The primary beans are marked {@code @Primary}.
 *
 * <p>Primary settings are read from {@code spring.redis.*}, secondary settings from
 * {@code spring.secondaryRedis.*}. Both factories share the single pool configuration
 * bean {@link #redisPool()}.
 */
@Configuration
public class GenerateNewTemplateRedisConfig {

    @Autowired
    private Environment environment;

    /**
     * Lettuce connection-pool settings, bound from {@code spring.redis.lettuce.pool}.
     *
     * <p>{@code max-active} has no same-named property on {@link GenericObjectPoolConfig}
     * (the pool calls it {@code maxTotal}), so property binding alone would silently skip
     * it. {@code max-wait} is likewise unmapped on pool versions that only expose
     * {@code maxWaitMillis}. Both are therefore applied explicitly here; the remaining
     * keys ({@code max-idle}, {@code min-idle}) are bound by {@code @ConfigurationProperties}.
     *
     * @return the shared pool configuration
     */
    @Bean
    @Primary
    @ConfigurationProperties(prefix = "spring.redis.lettuce.pool")
    public GenericObjectPoolConfig redisPool() {
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        Integer maxActive = environment.getProperty("spring.redis.lettuce.pool.max-active", Integer.class);
        if (maxActive != null) {
            poolConfig.setMaxTotal(maxActive);
        }
        // NOTE(review): relies on the environment's conversion service turning a plain
        // number such as "5000" into a Duration in milliseconds (Spring Boot default).
        java.time.Duration maxWait =
                environment.getProperty("spring.redis.lettuce.pool.max-wait", java.time.Duration.class);
        if (maxWait != null) {
            poolConfig.setMaxWaitMillis(maxWait.toMillis());
        }
        return poolConfig;
    }

    /**
     * Cluster configuration of the primary data source ({@code spring.redis.*}).
     *
     * @return the primary cluster configuration
     * @throws IllegalStateException if {@code spring.redis.cluster.nodes} is not set
     */
    @Bean("redisClusterConfig")
    @Primary
    public RedisClusterConfiguration redisClusterConfig() {
        return clusterConfiguration("spring.redis");
    }

    /**
     * Connection factory of the primary data source. It is registered under the explicit
     * bean name {@code lettuceConnectionFactory} and marked {@code @Primary} so that it
     * can coexist with the secondary factory.
     *
     * @param redisPool          shared pool configuration
     * @param redisClusterConfig primary cluster configuration
     * @return the primary connection factory
     */
    @Bean("lettuceConnectionFactory")
    @Primary
    public LettuceConnectionFactory lettuceConnectionFactory(GenericObjectPoolConfig redisPool,
                                                             @Qualifier("redisClusterConfig") RedisClusterConfiguration redisClusterConfig) {
        return new LettuceConnectionFactory(redisClusterConfig, clientConfiguration(redisPool, "spring.redis"));
    }

    /**
     * {@link StringRedisTemplate} of the primary data source, bound to the
     * {@code lettuceConnectionFactory} bean and marked {@code @Primary}.
     *
     * @param redisConnectionFactory the primary connection factory
     * @return the primary template
     */
    @Bean("stringRedisTemplate")
    @Primary
    public StringRedisTemplate stringRedisTemplate(@Qualifier("lettuceConnectionFactory") RedisConnectionFactory redisConnectionFactory) {
        return getRedisTemplate(redisConnectionFactory);
    }

    /**
     * Cluster configuration of the secondary data source ({@code spring.secondaryRedis.*}).
     *
     * @return the secondary cluster configuration
     * @throws IllegalStateException if {@code spring.secondaryRedis.cluster.nodes} is not set
     */
    @Bean("secondaryRedisClusterConfig")
    public RedisClusterConfiguration secondaryRedisConfig() {
        return clusterConfiguration("spring.secondaryRedis");
    }

    /**
     * Connection factory of the secondary data source.
     *
     * @param redisPool                   shared pool configuration
     * @param secondaryRedisClusterConfig secondary cluster configuration
     * @return the secondary connection factory
     */
    @Bean("secondaryLettuceConnectionFactory")
    public LettuceConnectionFactory secondaryLettuceConnectionFactory(GenericObjectPoolConfig redisPool,
                                                                      @Qualifier("secondaryRedisClusterConfig") RedisClusterConfiguration secondaryRedisClusterConfig) {
        return new LettuceConnectionFactory(secondaryRedisClusterConfig,
                clientConfiguration(redisPool, "spring.secondaryRedis"));
    }

    /**
     * {@link StringRedisTemplate} of the secondary data source, bound to the
     * {@code secondaryLettuceConnectionFactory} bean.
     *
     * @param redisConnectionFactory the secondary connection factory
     * @return the secondary template
     */
    @Bean("secondaryRedisTemplate")
    public StringRedisTemplate secondaryRedisTemplate(@Qualifier("secondaryLettuceConnectionFactory") RedisConnectionFactory redisConnectionFactory) {
        return getRedisTemplate(redisConnectionFactory);
    }

    /** Creates a template and binds it to the given connection factory. */
    private StringRedisTemplate getRedisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate();
        template.setConnectionFactory(factory);
        return template;
    }

    /**
     * Builds a cluster configuration from {@code <prefix>.cluster.nodes} and
     * {@code <prefix>.password}. The node list is mandatory: a missing value fails fast
     * with a message naming the property instead of a NullPointerException later on.
     */
    private RedisClusterConfiguration clusterConfiguration(String prefix) {
        Map<String, Object> source = new HashMap<>(8);
        // RedisClusterConfiguration looks the node list up under this fixed key,
        // regardless of which prefix the value was configured with.
        source.put("spring.redis.cluster.nodes", environment.getRequiredProperty(prefix + ".cluster.nodes"));
        RedisClusterConfiguration configuration =
                new RedisClusterConfiguration(new MapPropertySource("RedisClusterConfiguration", source));
        configuration.setPassword(RedisPassword.of(environment.getProperty(prefix + ".password")));
        return configuration;
    }

    /**
     * Builds the pooled Lettuce client configuration. When {@code <prefix>.timeout} is
     * set it is applied as the command timeout; otherwise the Lettuce default is kept.
     */
    private LettuceClientConfiguration clientConfiguration(GenericObjectPoolConfig pool, String prefix) {
        LettucePoolingClientConfiguration.LettucePoolingClientConfigurationBuilder builder =
                LettucePoolingClientConfiguration.builder().poolConfig(pool);
        // NOTE(review): same Duration-conversion assumption as in redisPool().
        java.time.Duration timeout = environment.getProperty(prefix + ".timeout", java.time.Duration.class);
        if (timeout != null) {
            builder.commandTimeout(timeout);
        }
        return builder.build();
    }

}
/**
 * 监听执行方法
 * 
 */
@Slf4j
@Component
public class SendIndexPriceListener extends MessageListenerAdapter {

    @Autowired
    LocalCache localCache;

    @Autowired
    IndexPriceService indexPriceService;

    @Autowired
    private SingleContractStopCache singleContractStopCache;

    public SendIndexPriceListener() {
        super();
        super.afterPropertiesSet();
    }

    public void handleMessage(String message, String channel) {
        try {
            String currencyId = "";
            String contractIdStr = "";

            if (org.apache.commons.lang3.StringUtils.isEmpty(message)) {

                log.error("message error, message :: {} ", message);

            }

            SendIndexPriceDto sendIndexPriceDto = JSON.parseObject(message, SendIndexPriceDto.class);

            boolean isLostParam = (org.apache.commons.lang3.StringUtils.isEmpty(sendIndexPriceDto.getTradePair()));//判断币种名称
            boolean isLostIndexPrice = StringUtils.isEmpty(sendIndexPriceDto.getIndexPrice());//判断指数价格
            if (isLostParam || isLostIndexPrice) {
                log.error("isLostParam or isLostIndexPrice error, isLostParam or isLostIndexPrice :: {} ", "is null");
            }
            //获取全部数据
            List<Contract> contractList = 查询数据库或Redis数据;

            //业务逻辑
            String[] tradePairArray = sendIndexPriceDto.getTradePair().toUpperCase().split("_");
            if (tradePairArray.length > 0) {
                Map<String, String> tradePairMap = contractList.stream().collect(Collectors.toMap(a -> a.getCurrencyName().toUpperCase(), a -> a.getCurrencyId() + "_" + a.getSeq()));
                if (tradePairMap.containsKey(tradePairArray[0].toUpperCase())) {
                    String[] currencySeq = tradePairMap.get(tradePairArray[0].toUpperCase()).split("_");
                    currencyId = currencySeq[0];
                    contractIdStr = currencySeq[1];
                }
            } else {
                log.error("currencyName error, currencyName :: {}", "is null");
            }

            Long id = Long.valueOf(currencyId);
            BigDecimal price = new BigDecimal(sendIndexPriceDto.getIndexPrice());
            if (price.compareTo(BigDecimal.ZERO) <= 0) {
                log.error("price error, price :: {} < 0", price);
            }

            price = price.setScale(8, RoundingMode.DOWN);

            IndexPrice indexPriceBean = IndexPrice.builder()
                    .currencyId(id)
                    .price(price)
                    .createdDate(new Date())
                    .build();

            if (!CollectionUtils.isEmpty(contractList)) {
                Map<Long, Long> currencyContractMap = contractList.stream().collect(Collectors.toMap(a -> a.getCurrencyId(), a -> a.getSeq()));
                if (currencyContractMap.containsKey(id)) {
                    Long contractId = currencyContractMap.get(id);
                    if (singleContractStopCache.singleContractFlagStatus(contractId)) {
                        log.warn("contract is stop trade, indexPrice stop !!! contract ::: {}", currencyContractMap.get(id));
                    }
                    indexPriceBean.setContractId(currencyContractMap.get(id));
                }
            }
            indexPriceService.updIndexPrice(indexPriceBean);
            log.debug("update success,contract :: {}, index price :: {}, time :: {}", contractIdStr, sendIndexPriceDto.getIndexPrice(), sendIndexPriceDto.getCreateTime());
            } catch (Exception e) {
                log.error("deal channel :: 【{}】 message :: {} fail!", channel, message, e);
            }
        }
}
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部