RabbitMQ使用多路由,多队列来破除流控

原创
2019/09/06 00:10
阅读数 3.2K

流控机制是我们在使用RabbitMQ最头疼的问题,一旦并发激增时,消费者消费队列消息就像滴水一样慢。

现在我们下单后,需要给通知中心发送消息,让通知中心通知服务商收取订单,并确认提供服务。

我们先给Order接口添加一个发送消息的方法。

public interface Order {
    public void makeOrder(Order order);
    public OrderSuccessResult getResult(Order order);
    public void postOrder(Order order);
}

实现类实现该方法

@Data
@AllArgsConstructor
@NoArgsConstructor
@ServiceOrderVersion(value = 1)
@RequiredArgsConstructor
public class ServiceOrder extends AbstractOrder {
    private Long id;
    @NonNull
    private String code;
    @NonNull
    private Store store;
    @NonNull
    private ProviderService service;
    @NonNull
    private Car car;
    @NonNull
    private Date serviceDate;
    @NonNull
    private String contact;
    @NonNull
    private String contactTel;
    private AppUser user;
    @NonNull
    private String content;
    private int status;
    private Date createDate;


    @Override
    public void makeOrder(Order order) {
        ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class);
        IdService idService = SpringBootUtil.getBean(IdService.class);
        ((ServiceOrder)order).setId(idService.genId());
        ((ServiceOrder)order).setCode(getCodeInfo(idService));
        AppUser loginAppUser = AppUserUtil.getLoginAppUser();
        AppUser user = new AppUser();
        user.setId(loginAppUser.getId());
        user.setUsername(loginAppUser.getUsername());
        ((ServiceOrder)order).setUser(user);
        ((ServiceOrder)order).setStatus(1);
        ((ServiceOrder)order).setCreateDate(new Date());
        serviceOrderDao.save((ServiceOrder) order);
    }

    @Override
    public OrderSuccessResult getResult(Order order) {
        ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class);
        this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult();
        return this.orderSuccessResult.getResult(order);
    }

    @Override
    public void postOrder(Order order) {
        MessageSender sender = SpringBootUtil.getBean(MessageSender.class);
        CompletableFuture.runAsync(() ->
                sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER,
                        OwnerCarCenterMq.ROUTING_KEY_ORDER,
                        order)
        );
    }

    private String getCodeInfo(IdService idService) {
        String flow = String.valueOf(idService.genId());
        flow = flow.substring(14,flow.length());
        String pre = DateUtils.format(new Date(), DateUtils.pattern9);
        return pre + flow;
    }
}

其中我们定义了这么一组队列名,交换机,和路由

public interface OwnerCarCenterMq {
    /**
     * 队列名
     */
    String ORDER_QUEUE = "order";
    /**
     * 服务系统exchange名
     */
    String MQ_EXCHANGE_ORDER = "order.topic.exchange";

    /**
     * 服务添加routing key
     */
    String ROUTING_KEY_ORDER = "post.order";
}

为了避免流控,我们定义了10个队列,并全部绑定到一个交换机上。

@Configuration
public class RabbitmqConfig {

   @Bean
   public List<Queue> orderQueues() {
      List<Queue> queues = new ArrayList<>();
      for (int i = 1;i < 11;i++) {
         Queue queue = new Queue(OwnerCarCenterMq.ORDER_QUEUE + "_" + i);
         queues.add(queue);
      }
      return queues;
   }

   @Bean
   public TopicExchange orderExchange() {
      return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER);
   }


   @Bean
   public List<Binding> bindingOrders() {
      List<Binding> bindings = new ArrayList<>();
      for (int i = 1;i < 11;i++) {
         Binding binding = BindingBuilder.bind(orderQueues().get(i - 1)).to(orderExchange())
               .with(OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + i);
         bindings.add(binding);
      }
      return bindings;
   }
}

重新封装消息提供者,每次发送都随机选取一个路由来进行发送。

@Slf4j
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String exchange,String routingKey,Object content) {
        log.info("send content=" + content);
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        ThreadLocalRandom random = ThreadLocalRandom.current();
        this.rabbitTemplate.convertAndSend(exchange,routingKey + "_" + random.nextInt(1,11),serialize(content));
    }

    /**
     * 确认后回调:
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.info("send ack fail, cause = " + cause);
        } else {
            log.info("send ack success");
        }
    }

    /**
     * 失败后return回调:
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
    }

    /**
     * 对消息对象进行二进制序列化
     * @param o
     * @return
     */
    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Output output = new Output(stream);
        kryo.writeObject(output, o);
        output.close();
        return stream.toByteArray();
    }
}

我们可以看到在ServiceOrder里,我们是通过异步来进行发送到。

Controller如下

@Slf4j
@RestController
public class OrderController {
    private ThreadLocal<OrderFactory> orderFactory = new ThreadLocal<>();
    private ThreadLocal<Order> orderService = new ThreadLocal<>();
    @Autowired
    private OrderBean orderBean;

    @Transactional
    @SuppressWarnings("unchecked")
    @PostMapping("/makeeorder")
    public Result<OrderSuccessResult> makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) {
        log.info(orderStr);
        Order order = setOrderFactory(orderStr,type);
        orderService.get().makeOrder(order);
        orderService.get().postOrder(order);
        return Result.success(orderService.get().getResult(order));
    }

    /**
     * 判断是哪一种类型的订单来获取哪一种类型的具体订单工厂
     * @param orderStr
     * @return
     */
    private Order setOrderFactory(String orderStr,String type) {
        Class<?> classType = orderBean.getOrderMap().get(type);
        Object order = JSONObject.parseObject(orderStr, classType);
//        if (orderStr.contains("service")) {
//            order = JSON.parseObject(orderStr, ServiceOrder.class);
//        }else if (orderStr.contains("product")) {
//            order = JSON.parseObject(orderStr, ProductOrder.class);
//        }
        Class<?> classFactoryType = orderBean.getOrderFactoryMap().get(type + "Factory");
        this.orderFactory.set((OrderFactory) SpringBootUtil.getBean(classFactoryType));
//        if (order instanceof ServiceOrder) {
//            this.orderFactory.set(SpringBootUtil.getBean(ServiceOrderFactory.class));
//        }else if (order instanceof ProductOrder) {
//            this.orderFactory.set(SpringBootUtil.getBean(ProductOrderFactory.class));
//        }
        orderService.set(orderFactory.get().getOrder());
        return (Order) order;
    }
}

最后是在我们的通知中心模块接收消息,同时对这10个队列实行监控

@Slf4j
@Component
@RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 2,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 3,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 4,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 5,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 6,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 7,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 8,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 9,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 10})
public class ServiceOrderConsummer {
    @Getter
    private Queue<ServiceOrder> serviceOrders = new ConcurrentLinkedDeque<>();
    @RabbitHandler
    public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException {
        try {
            //告诉服务器收到这条消息 已经被我消费了 可以在队列删掉;否则消息服务器以为这条消息没处理掉 后续还会在发
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            ServiceOrder order = unSerialize(data);
            this.serviceOrders.add(order);
            log.info(String.valueOf(order));
        } catch (IOException e) {
            e.printStackTrace();
            //丢弃这条消息
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
            log.info("receiver fail");
        }
    }

    /**
     * 反序列化
     * @param data
     * @return
     */
    private ServiceOrder unSerialize(byte[] data) {
        Input input = null;
        try {
            Kryo kryo = new Kryo();
            input = new Input(new ByteArrayInputStream(data));
            return kryo.readObject(input,ServiceOrder.class);
        }
        finally {
            input.close();
        }
    }
}

项目启动后,我们可以看到rabbitmq的情况如下

现我们来对其进行压测,启动Jmeter,我们使用1000线程来进行压测测试。各配置如下

保存文件上传服务器,因为本人是华为云的服务器,故在服务器上进行压测,不进行远程压测

在服务器的jmeter的bin目录下输入

./jmeter -n -t model/rabbit.jmx -l log.jtl

这里-n为不启动图形界面,-t使用我们上传的配置文件,-l记录日志

压测结果如下

我们在压测过程中来看一下rabbitmq的UI界面

消费基本上是实时的,没有出现流控积压现象。

展开阅读全文
加载中

作者的其它热门文章

打赏
0
1 收藏
分享
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部