文档章节

聊聊artemis的maxDeliveryAttempts

go4it
 go4it
发布于 01/27 10:38
字数 1000
阅读 143
收藏 0

本文主要研究一下artemis的maxDeliveryAttempts

maxDeliveryAttempts

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java

public class AddressSettings implements Mergeable<AddressSettings>, Serializable, EncodingSupport {

   //......

   public static final int DEFAULT_MAX_DELIVERY_ATTEMPTS = 10;

   private Integer maxDeliveryAttempts = null;

   private SimpleString deadLetterAddress = null;

   //......

   public int getMaxDeliveryAttempts() {
      return maxDeliveryAttempts != null ? maxDeliveryAttempts : AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS;
   }

   public AddressSettings setMaxDeliveryAttempts(final int maxDeliveryAttempts) {
      this.maxDeliveryAttempts = maxDeliveryAttempts;
      return this;
   }

   public SimpleString getDeadLetterAddress() {
      return deadLetterAddress;
   }

   public AddressSettings setDeadLetterAddress(final SimpleString deadLetterAddress) {
      this.deadLetterAddress = deadLetterAddress;
      return this;
   }

   //......
}   
  • AddressSettings定义了maxDeliveryAttempts及deadLetterAddress属性,其getMaxDeliveryAttempts方法在maxDeliveryAttempts为null时返回AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS,默认值为10

checkRedelivery

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   public Pair<Boolean, Boolean> checkRedelivery(final MessageReference reference,
                                  final long timeBase,
                                  final boolean ignoreRedeliveryDelay) throws Exception {
      Message message = reference.getMessage();

      if (internalQueue) {
         if (logger.isTraceEnabled()) {
            logger.trace("Queue " + this.getName() + " is an internal queue, no checkRedelivery");
         }
         // no DLQ check on internal queues
         return new Pair<>(true, false);
      }

      if (!internalQueue && reference.isDurable() && isDurable() && !reference.isPaged()) {
         storageManager.updateDeliveryCount(reference);
      }

      AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());

      int maxDeliveries = addressSettings.getMaxDeliveryAttempts();
      int deliveryCount = reference.getDeliveryCount();

      // First check DLA
      if (maxDeliveries > 0 && deliveryCount >= maxDeliveries) {
         if (logger.isTraceEnabled()) {
            logger.trace("Sending reference " + reference + " to DLA = " + addressSettings.getDeadLetterAddress() + " since ref.getDeliveryCount=" + reference.getDeliveryCount() + "and maxDeliveries=" + maxDeliveries + " from queue=" + this.getName());
         }
         boolean dlaResult = sendToDeadLetterAddress(null, reference, addressSettings.getDeadLetterAddress());

         return new Pair<>(false, dlaResult);
      } else {
         // Second check Redelivery Delay
         long redeliveryDelay = addressSettings.getRedeliveryDelay();
         if (!ignoreRedeliveryDelay && redeliveryDelay > 0) {
            redeliveryDelay = calculateRedeliveryDelay(addressSettings, deliveryCount);

            if (logger.isTraceEnabled()) {
               logger.trace("Setting redeliveryDelay=" + redeliveryDelay + " on reference=" + reference);
            }

            reference.setScheduledDeliveryTime(timeBase + redeliveryDelay);

            if (!reference.isPaged() && reference.isDurable() && isDurable()) {
               storageManager.updateScheduledDeliveryTime(reference);
            }
         }

         decDelivering(reference);

         return new Pair<>(true, false);
      }
   }

   //......
}
  • QueueImpl的checkRedelivery方法会对比deliveryCount与maxDeliveries,当deliveryCount大于0且大于等于maxDeliveries时会执行sendToDeadLetterAddress

sendToDeadLetterAddress

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {
   
   //......

   private boolean sendToDeadLetterAddress(final Transaction tx,
                                        final MessageReference ref,
                                        final SimpleString deadLetterAddress) throws Exception {
      if (deadLetterAddress != null) {
         Bindings bindingList = postOffice.lookupBindingsForAddress(deadLetterAddress);

         if (bindingList == null || bindingList.getBindings().isEmpty()) {
            ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress);
            ref.acknowledge(tx, AckReason.KILLED, null);
         } else {
            ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
            move(tx, deadLetterAddress, null, ref, false, AckReason.KILLED, null);
            return true;
         }
      } else {
         ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(ref, name);

         ref.acknowledge(tx, AckReason.KILLED, null);
      }

      return false;
   }

   private void move(final Transaction originalTX,
                     final SimpleString address,
                     final Binding binding,
                     final MessageReference ref,
                     final boolean rejectDuplicate,
                     final AckReason reason,
                     final ServerConsumer consumer) throws Exception {
      Transaction tx;

      if (originalTX != null) {
         tx = originalTX;
      } else {
         // if no TX we create a new one to commit at the end
         tx = new TransactionImpl(storageManager);
      }

      Message copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);

      copyMessage.setAddress(address);

      postOffice.route(copyMessage, tx, false, rejectDuplicate, binding);

      acknowledge(tx, ref, reason, consumer);

      if (originalTX == null) {
         tx.commit();
      }
   }

   //......
}
  • sendToDeadLetterAddress方法在bindingList不为空的情况下会执行move操作,move到deadLetterAddress,其AckReason为AckReason.KILLED

incrementDeliveryCount

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

public class ServerConsumerImpl implements ServerConsumer, ReadyListener {

   //......

   public HandleStatus handle(final MessageReference ref) throws Exception {
      // available credits can be set back to null with a flow control option.
      AtomicInteger checkInteger = availableCredits;
      if (callback != null && !callback.hasCredits(this) || checkInteger != null && checkInteger.get() <= 0) {
         if (logger.isDebugEnabled()) {
            logger.debug(this + " is busy for the lack of credits. Current credits = " +
                            availableCredits +
                            " Can't receive reference " +
                            ref);
         }

         return HandleStatus.BUSY;
      }

      synchronized (lock) {
         // If the consumer is stopped then we don't accept the message, it
         // should go back into the
         // queue for delivery later.
         // TCP-flow control has to be done first than everything else otherwise we may lose notifications
         if ((callback != null && !callback.isWritable(this, protocolContext)) || !started || transferring) {
            return HandleStatus.BUSY;
         }

         // If there is a pendingLargeMessage we can't take another message
         // This has to be checked inside the lock as the set to null is done inside the lock
         if (largeMessageDeliverer != null) {
            if (logger.isDebugEnabled()) {
               logger.debug(this + " is busy delivering large message " +
                               largeMessageDeliverer +
                               ", can't deliver reference " +
                               ref);
            }
            return HandleStatus.BUSY;
         }
         final Message message = ref.getMessage();

         if (!message.acceptsConsumer(sequentialID())) {
            return HandleStatus.NO_MATCH;
         }

         if (filter != null && !filter.match(message)) {
            if (logger.isTraceEnabled()) {
               logger.trace("Reference " + ref + " is a noMatch on consumer " + this);
            }
            return HandleStatus.NO_MATCH;
         }

         if (logger.isTraceEnabled()) {
            logger.trace("ServerConsumerImpl::" + this + " Handling reference " + ref);
         }
         if (!browseOnly) {
            if (!preAcknowledge) {
               deliveringRefs.add(ref);
            }

            ref.handled();

            ref.setConsumerId(this.id);

            ref.incrementDeliveryCount();

            // If updateDeliveries = false (set by strict-update),
            // the updateDeliveryCountAfterCancel would still be updated after c
            if (strictUpdateDeliveryCount && !ref.isPaged()) {
               if (ref.getMessage().isDurable() && ref.getQueue().isDurable() &&
                  !ref.getQueue().isInternalQueue() &&
                  !ref.isPaged()) {
                  storageManager.updateDeliveryCount(ref);
               }
            }

            if (preAcknowledge) {
               if (message.isLargeMessage()) {
                  // we must hold one reference, or the file will be deleted before it could be delivered
                  ((LargeServerMessage) message).incrementDelayDeletionCount();
               }

               // With pre-ack, we ack *before* sending to the client
               ref.getQueue().acknowledge(ref, this);
               acks++;
            }

            if (message.isLargeMessage() && this.supportLargeMessage) {
               largeMessageDeliverer = new LargeMessageDeliverer((LargeServerMessage) message, ref);
            }

         }

         pendingDelivery.countUp();

         return HandleStatus.HANDLED;
      }
   }

   //......
}
  • ServerConsumerImpl的handle方法会在非browseOnly的情况下会调用ref.incrementDeliveryCount()来增加deliveryCount;必要的时候会执行storageManager.updateDeliveryCount(ref)

updateDeliveryCount

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java

public abstract class AbstractJournalStorageManager extends CriticalComponentImpl implements StorageManager {

   //......

   public void updateDeliveryCount(final MessageReference ref) throws Exception {
      // no need to store if it's the same value
      // otherwise the journal will get OME in case of lots of redeliveries
      if (ref.getDeliveryCount() == ref.getPersistedCount()) {
         return;
      }

      ref.setPersistedCount(ref.getDeliveryCount());
      DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(), ref.getDeliveryCount());

      readLock();
      try {
         messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(), JournalRecordIds.UPDATE_DELIVERY_COUNT, updateInfo, syncNonTransactional, getContext(syncNonTransactional));
      } finally {
         readUnLock();
      }
   }

   //......
}
  • AbstractJournalStorageManager的updateDeliveryCount方法会更新persistedCount到storage

小结

AddressSettings定义了maxDeliveryAttempts及deadLetterAddress属性,其getMaxDeliveryAttempts方法在maxDeliveryAttempts为null时返回AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS,默认值为10;QueueImpl的checkRedelivery方法会对比deliveryCount与maxDeliveries,当deliveryCount大于0且大于等于maxDeliveries时会执行sendToDeadLetterAddress;sendToDeadLetterAddress方法在bindingList不为空的情况下会执行move操作,move到deadLetterAddress,其AckReason为AckReason.KILLED

doc

© 著作权归作者所有

go4it
粉丝 91
博文 1259
码字总数 1174443
作品 0
深圳
私信 提问
加载中

评论(0)

Apache Artemis 1.5.4 发布,嵌入式消息服务

Apache Artemis 提供了一个非堵塞架构,实现了超高性能的 Java 对象消息服务器。其核心只依赖一个 netty.jar 文件。该项目的目的是为你的 Java 应用提供一个可嵌入的消息服务。 Apache Artem...

王练
2017/04/01
785
0
使用apache-artemis搭建MQTT服务器

apache-artemis 搭建MQTT服务,windows下和Linux下操作方式基本一致 1、下载二进制包: 官网地址 https://activemq.apache.org/components/artemis/download/ linux 下载后缀为 .tar.gz 的包...

科陆李明
2019/04/08
330
0
Apache Artemis 1.0.0 发布,嵌入式消息服务

Apache Artemis 1.0.0 发布,是首个发布版本,现已提供下载:apache-artemis-1.0.0-bin.zip 更多内容,请查看软件主页。 去年,HornetQ 代码库捐献给 Apache ActiveMQ 社区,它现在成为 Acti...

sikkx
2015/06/03
4.6K
13
springcloud服务一直输出debug信息,跪求大佬解惑

用了artemis-http-client-1.2-SNAPSHOT,artemis-http-client-1.2-SNAPSHOT-CL后,启动服务,注册中心显示注册成功 ,服务本身一直输出debug信息...

想好好写代码的伪程序
2019/08/30
77
0
Apache Artemis 1.3.0 发布了

Apache Artemis 1.3.0 发布了!该版本的变动: 实现了 OpenWire 协议(ActiveMQ使用的核心协议) 支持所有 ActiveMQ 5.x 的 LDAP 模块 初步实现了 JDBC 库,支持 PostgreSQL, MySQL, 和 Derb...

daxiaoming
2016/07/09
2.6K
1

没有更多内容

加载失败,请刷新页面

加载更多

 企业信息平台的快速搭建,框架如何选?

Web端开发框架如何选 目前,大部分的企业信息集成系统都在web端运行,而搭建框架的选择对一个企业的发展至关重要,不过其最终目的都是要符合企业发展逻辑,助力企业战略的实施。 而在框架的选...

我想造火箭
30分钟前
32
0
安装mysql 实操截图

前言: CentOS 7 版本将MySQL数据库软件从默认的程序列表中移除,用MariaDB代替了,MariaDB数据库管理系统是MySQL的一个分支,主要由开源社区在维护,采用GPL授权许可。开发这个分支的原因之...

冥焱
34分钟前
57
0
FecMall 多商户分销系统 - 价格公式计算

FecMall Fecbdc 分销价格公式计算 本章详解讲述分销平台的各个价格,以及相应的设置,本章节非常重要,贯穿分销系统的整个流程,请仔细阅读 官网: http://www.fecmall.com/ 业务逻辑设计 系...

FecShop
35分钟前
33
0
Java Web 学习笔记(7)

文件下载 package com.janeroad.servlet;import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.......

JaneRoad
38分钟前
41
0
如何在JavaScript中更改span元素的文本

如果我有跨度,请说: <span id="myspan"> hereismytext </span> 如何使用JavaScript将“ hereismytext”更改为“ newtext”? #1楼 对于现代浏览器,您应该使用: document.getElementByI......

技术盛宴
40分钟前
46
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部