5. [Spring Cloud Alibaba] Message-Driven Microservices: Spring Cloud Alibaba RocketMQ

2019/04/10 10:10

Ways to implement asynchronous processing in Spring

image

Architecture evolution after introducing MQ

image

MQ use cases

  • Asynchronous processing
  • Traffic peak shaving and valley filling
  • Decoupling microservices
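
As a rough illustration of peak shaving, the sketch below uses an in-memory `BlockingQueue` in place of a real broker. The class `PeakShavingSketch` and its methods are hypothetical names introduced only for this example; with RocketMQ the queue would be a topic on the broker and the consumer would be a separate service.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: an in-memory queue stands in for the MQ broker.
final class PeakShavingSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Producer side: accept the message immediately without doing the slow work.
    void publish(String message) {
        queue.add(message);
    }

    // Consumer side: take at most maxPerTick messages per call, so the
    // downstream service sees a bounded rate regardless of the burst size.
    List<String> consumeTick(int maxPerTick) {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch, maxPerTick);
        return batch;
    }

    // Messages that are buffered and still waiting to be consumed.
    int backlog() {
        return queue.size();
    }
}
```

If ten messages are published in a burst and the consumer calls `consumeTick(3)`, it receives three messages and the remaining seven wait in the backlog. The producer never blocks on the consumer, which is also what makes the asynchronous-processing and decoupling cases work.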

Choosing an MQ

image

Detailed MQ comparison

image

Setting up MQ

Setup tutorial

Setting up the RocketMQ console

RocketMQ console installation tutorial

RocketMQ terms and concepts

image

image

Advanced RocketMQ

==See the official RocketMQ guide==

Message programming model 01: writing the producer

image

pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
application.yml

image

Code implementation
private final RocketMQTemplate rocketMQTemplate;

image

Message programming model 02: writing the consumer

image

pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
application.yml

image

Code implementation

image

Distributed transactions 01: flow analysis, concepts and terminology, transactional message states

image

image

image

Distributed transactions 02: implementation

Table creation
CREATE TABLE `rocketmq_transaction_log` (
`id`  int(11) NOT NULL AUTO_INCREMENT COMMENT 'Primary key' ,
`transaction_Id`  varchar(45) NOT NULL COMMENT 'Transaction ID' ,
`log`  varchar(45) NOT NULL COMMENT 'Log' ,
PRIMARY KEY (`id`)
);
ShareService
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
    // 1. Look up the share; throw if it does not exist or its audit_status is not NOT_YET
    Share share = this.shareMapper.selectByPrimaryKey(id);
    if (share == null) {
        throw new IllegalArgumentException("Invalid argument: the share does not exist!");
    }
    if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
        throw new IllegalArgumentException("Invalid argument: the share has already been approved or rejected!");
    }

    // 3. If the result is PASS, send a message to RocketMQ for the user center to consume, which adds bonus points for the author
    if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
        // Send the half message
        String transactionId = UUID.randomUUID().toString();

        this.rocketMQTemplate.sendMessageInTransaction(
                "tx-add-bonus-group",
                "add-bonus",
                MessageBuilder
                    .withPayload(
                        UserAddBonusMsgDTO.builder()
                            .userId(share.getUserId())
                            .bonus(50)
                            .build()
                    )
                    // Headers are useful too: they carry data to the transaction listener
                    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                    .setHeader("share_id", id)
                    .setHeader("dto", JSON.toJSONString(auditDTO))
                    .build(),
                auditDTO
            );
    }
    else {
        this.auditByIdInDB(id, auditDTO);
    }
    return share;
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
    Share share = Share.builder()
        .id(id)
        .auditStatus(auditDTO.getAuditStatusEnum().toString())
        .reason(auditDTO.getReason())
        .build();
    this.shareMapper.updateByPrimaryKeySelective(share);

    // 4. Write the share to the cache
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
    this.auditByIdInDB(id, auditDTO);

    this.rocketmqTransactionLogMapper.insertSelective(
        RocketmqTransactionLog.builder()
            .transactionId(transactionId)
            .log("Audit share...")
            .build()
    );
}
AddBonusTransactionListener
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders headers = msg.getHeaders();

        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.valueOf((String) headers.get("share_id"));

        ShareAuditDTO auditDTO = (ShareAuditDTO)arg;

        try {
            this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        // select * from xxx where transaction_id = xxx
        RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
            RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .build()
        );
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}
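
The half-message flow that `AddBonusTransactionListener` takes part in can be modelled without a broker. The sketch below is a simplified in-memory model and not RocketMQ's implementation: `TxMessageSketch`, `sendInTransaction`, and `checkBack` are hypothetical names. The `localTx` function plays the role of `executeLocalTransaction`, and the `checker` function plays the role of `checkLocalTransaction`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical in-memory model of the half-message flow; not RocketMQ's API.
final class TxMessageSketch {
    enum State { COMMIT, ROLLBACK, UNKNOWN }

    // Half messages: stored by the broker but not yet visible to consumers.
    private final Map<String, String> halfMessages = new HashMap<>();
    // Messages that consumers are allowed to see.
    private final List<String> delivered = new ArrayList<>();

    // Store the half message, run the local transaction, then apply its result.
    void sendInTransaction(String txId, String body, Function<String, State> localTx) {
        halfMessages.put(txId, body);
        State state;
        try {
            state = localTx.apply(txId);
        } catch (RuntimeException e) {
            // A failed local transaction means the message must never be delivered.
            state = State.ROLLBACK;
        }
        resolve(txId, state);
    }

    // For half messages that are still pending, ask the producer what happened.
    void checkBack(Function<String, State> checker) {
        for (String txId : new ArrayList<>(halfMessages.keySet())) {
            resolve(txId, checker.apply(txId));
        }
    }

    private void resolve(String txId, State state) {
        if (state == State.COMMIT) {
            delivered.add(halfMessages.remove(txId));
        } else if (state == State.ROLLBACK) {
            halfMessages.remove(txId);
        }
        // UNKNOWN: keep the half message and wait for the next check-back.
    }

    List<String> delivered() {
        return delivered;
    }

    int pending() {
        return halfMessages.size();
    }
}
```

In this model a `Set<String>` of transaction IDs can stand in for the `rocketmq_transaction_log` table: the local transaction adds the ID in the same step as the business update, and the checker answers COMMIT only if the ID is present. That is why the log row and the share update are written inside one `@Transactional` method in the listing above.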

Spring Cloud Stream

image

image

image

Spring Cloud Stream: writing the producer

pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
application.yml

image

Writing the code

==@EnableBinding({Source.class})==

// Packages to scan for MyBatis mapper interfaces
@MapperScan("com.itmuch.contentcenter.dao")
@SpringBootApplication
@EnableFeignClients// (defaultConfiguration = GlobalFeignConfiguration.class)
@EnableBinding({Source.class})
public class ContentCenterApplication {

image

==Change the log level of com.alibaba.nacos==

logging:
  level:
    com.itmuch.contentcenter.feignclient.UserCenterFeignClient: debug
    com.alibaba.nacos: error

Spring Cloud Stream: writing the consumer

pom.xml
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.0.3</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
application.yml

image

Writing the code

==@EnableBinding({Sink.class})==

// Packages to scan for MyBatis mapper interfaces
@MapperScan("com.itmuch.usercenter.dao")
@SpringBootApplication
//@EnableDiscoveryClient
@EnableBinding({Sink.class})
public class UserCenterApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserCenterApplication.class, args);
    }

}

image

Spring Cloud Stream custom interface: sending messages

image

image

image

image

Spring Cloud Stream custom interface: consuming messages

image

image

image

image

==Change the log level of com.alibaba.nacos==

logging:
  level:
    com.alibaba.nacos: error

Message filtering

Implementing filtered message consumption with Spring Cloud Stream

  • condition
  • Tags
  • SQL 92
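
To make the Tags option concrete, the sketch below mimics how a tag subscription expression such as `tagA || tagB` selects messages. It is a plain-Java illustration under stated assumptions, not RocketMQ or Spring Cloud Stream code: `TagFilterSketch` and `accepts` are hypothetical names, and the rule that `*` or an empty expression subscribes to every tag is written here as an assumption about the expression syntax.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper mimicking tag-expression matching; not RocketMQ code.
final class TagFilterSketch {
    private final boolean matchAll;
    private final Set<String> tags = new HashSet<>();

    TagFilterSketch(String expression) {
        String trimmed = expression == null ? "" : expression.trim();
        // Assumption: "*" or an empty expression means "subscribe to every tag".
        this.matchAll = trimmed.isEmpty() || trimmed.equals("*");
        if (!matchAll) {
            // Tags are separated by "||", for example "tagA || tagB".
            for (String tag : trimmed.split("\\|\\|")) {
                String t = tag.trim();
                if (!t.isEmpty()) {
                    tags.add(t);
                }
            }
        }
    }

    // True if a message carrying this tag would be handed to the consumer.
    boolean accepts(String messageTag) {
        return matchAll || (messageTag != null && tags.contains(messageTag));
    }
}
```

A filter built from `tagA || tagB` accepts messages tagged `tagA` or `tagB` and rejects everything else. The `condition` option works differently: it is an expression on `@StreamListener` that is evaluated against each message on the consumer side.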

Monitoring Spring Cloud Stream

Use the following ==three links to view Spring Cloud Stream== monitoring information:

image

application.yml
management:
  endpoints:
    web:
      exposure:
        include: '*'
  endpoint:
    health:
      show-details: always

Exception handling in Spring Cloud Stream

Global handling [general-purpose]
@StreamListener(value = Processor.INPUT)
public void handle(String body) {
    throw new RuntimeException("x");
}

@StreamListener("errorChannel")
public void error(Message<?> message) {
    ErrorMessage errorMessage = (ErrorMessage) message;
    System.out.println("Handling ERROR: " + errorMessage);
}

Distributed transactions with Spring Cloud Stream + RocketMQ 01: refactoring the producer

application.yml
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
        bindings:
          output:
            producer:
              transactional: true
              group: tx-add-bonus-group
      bindings:
        output:
          # Specifies the topic
          destination: add-bonus

==@EnableBinding({Source.class})==

@MapperScan("com.itmuch.contentcenter.dao")
@SpringBootApplication
@EnableFeignClients// (defaultConfiguration = GlobalFeignConfiguration.class)
@EnableBinding({Source.class})
public class ContentCenterApplication {
ShareService
public Share auditById(Integer id, ShareAuditDTO auditDTO) {
    // 1. Look up the share; throw if it does not exist or its audit_status is not NOT_YET
    Share share = this.shareMapper.selectByPrimaryKey(id);
    if (share == null) {
        throw new IllegalArgumentException("Invalid argument: the share does not exist!");
    }
    if (!Objects.equals("NOT_YET", share.getAuditStatus())) {
        throw new IllegalArgumentException("Invalid argument: the share has already been approved or rejected!");
    }

    // 3. If the result is PASS, send a message to RocketMQ for the user center to consume, which adds bonus points for the author
    if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {
        // Send the half message
        String transactionId = UUID.randomUUID().toString();

        this.source.output()
            .send(
                MessageBuilder
                    .withPayload(
                        UserAddBonusMsgDTO.builder()
                            .userId(share.getUserId())
                            .bonus(50)
                            .build()
                    )
                    // Headers are useful too: they carry data to the transaction listener
                    .setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId)
                    .setHeader("share_id", id)
                    .setHeader("dto", JSON.toJSONString(auditDTO))
                    .build()
            );
    }
    else {
        this.auditByIdInDB(id, auditDTO);
    }
    return share;
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {
    Share share = Share.builder()
        .id(id)
        .auditStatus(auditDTO.getAuditStatusEnum().toString())
        .reason(auditDTO.getReason())
        .build();
    this.shareMapper.updateByPrimaryKeySelective(share);

    // 4. Write the share to the cache
}

@Transactional(rollbackFor = Exception.class)
public void auditByIdWithRocketMqLog(Integer id, ShareAuditDTO auditDTO, String transactionId) {
    this.auditByIdInDB(id, auditDTO);

    this.rocketmqTransactionLogMapper.insertSelective(
        RocketmqTransactionLog.builder()
            .transactionId(transactionId)
            .log("Audit share...")
            .build()
    );
}
AddBonusTransactionListener
@RocketMQTransactionListener(txProducerGroup = "tx-add-bonus-group")
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {
    private final ShareService shareService;
    private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        MessageHeaders headers = msg.getHeaders();

        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);
        Integer shareId = Integer.valueOf((String) headers.get("share_id"));

        String dtoString = (String) headers.get("dto");
        ShareAuditDTO auditDTO = JSON.parseObject(dtoString, ShareAuditDTO.class);

        try {
            this.shareService.auditByIdWithRocketMqLog(shareId, auditDTO, transactionId);
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        MessageHeaders headers = msg.getHeaders();
        String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);

        // select * from xxx where transaction_id = xxx
        RocketmqTransactionLog transactionLog = this.rocketmqTransactionLogMapper.selectOne(
            RocketmqTransactionLog.builder()
                .transactionId(transactionId)
                .build()
        );
        if (transactionLog != null) {
            return RocketMQLocalTransactionState.COMMIT;
        }
        return RocketMQLocalTransactionState.ROLLBACK;
    }
}

Distributed transactions with Spring Cloud Stream + RocketMQ 02: refactoring the consumer

application.yml
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          name-server: 127.0.0.1:9876
      bindings:
        input:
          destination: add-bonus
          group: binder-group

==@EnableBinding({Sink.class})==

// Packages to scan for MyBatis mapper interfaces
@MapperScan("com.itmuch.usercenter.dao")
@SpringBootApplication
//@EnableDiscoveryClient
@EnableBinding({Sink.class})
public class UserCenterApplication {

    public static void main(String[] args) {
        SpringApplication.run(UserCenterApplication.class, args);
    }

}
AddBonusStreamConsumer
@Service
@RequiredArgsConstructor(onConstructor = @__(@Autowired))
@Slf4j
public class AddBonusStreamConsumer {
    private final UserService userService;

    @StreamListener(Sink.INPUT)
    public void receive(UserAddBonusMsgDTO message) {
        message.setEvent("CONTRIBUTE");
        message.setDescription("Bonus points for a contribution..");
        this.userService.addBonus(message);
    }
}
UserService
@Transactional(rollbackFor = Exception.class)
public void addBonus(UserAddBonusMsgDTO msgDTO) {
    // 1. Add bonus points to the user
    Integer userId = msgDTO.getUserId();
    Integer bonus = msgDTO.getBonus();
    User user = this.userMapper.selectByPrimaryKey(userId);

    user.setBonus(user.getBonus() + bonus);
    this.userMapper.updateByPrimaryKeySelective(user);

    // 2. Write a record to the bonus_event_log table
    this.bonusEventLogMapper.insert(
        BonusEventLog.builder()
            .userId(userId)
            .value(bonus)
            .event(msgDTO.getEvent())
            .createTime(new Date())
            .description(msgDTO.getDescription())
            .build()
    );
    log.info("Bonus points added...");
}

Spring Cloud Stream knowledge recap

Original source: https://www.cnblogs.com/xjknight/p/12349104.html
