文档章节

聊聊artemis的ExpiryScanner

go4it
 go4it
发布于 01/22 21:03
字数 1027
阅读 174
收藏 0

本文主要研究一下artemis的ExpiryScanner

startExpiryScanner

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

public class PostOfficeImpl implements PostOffice, NotificationListener, BindingsFactory {

   //......

   private ExpiryReaper expiryReaperRunnable;

   //......

   public synchronized void startExpiryScanner() {
      if (expiryReaperPeriod > 0) {
         if (expiryReaperRunnable != null)
            expiryReaperRunnable.stop();
         expiryReaperRunnable = new ExpiryReaper(server.getScheduledPool(), server.getExecutorFactory().getExecutor(), expiryReaperPeriod, TimeUnit.MILLISECONDS, false);

         expiryReaperRunnable.start();
      }
   }

   public synchronized void stop() throws Exception {
      started = false;

      managementService.removeNotificationListener(this);

      if (expiryReaperRunnable != null)
         expiryReaperRunnable.stop();

      if (addressQueueReaperRunnable != null)
         addressQueueReaperRunnable.stop();

      addressManager.clear();

      queueInfos.clear();
   }

   //......
}
  • PostOfficeImpl的startExpiryScanner方法会执行expiryReaperRunnable.start();其stop方法会执行expiryReaperRunnable.stop()

ExpiryReaper

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java

   private final class ExpiryReaper extends ActiveMQScheduledComponent {

      ExpiryReaper(ScheduledExecutorService scheduledExecutorService,
                   Executor executor,
                   long checkPeriod,
                   TimeUnit timeUnit,
                   boolean onDemand) {
         super(scheduledExecutorService, executor, checkPeriod, timeUnit, onDemand);
      }

      @Override
      public void run() {
         // The reaper thread should be finished case the PostOffice is gone
         // This is to avoid leaks on PostOffice between stops and starts
         for (Queue queue : getLocalQueues()) {
            try {
               queue.expireReferences();
            } catch (Exception e) {
               ActiveMQServerLogger.LOGGER.errorExpiringMessages(e);
            }
         }
      }
   }
  • ExpiryReaper继承了ActiveMQScheduledComponent,其run方法会遍历localQueue挨个执行queue.expireReferences()

expireReferences

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   private final ExpiryScanner expiryScanner = new ExpiryScanner();

   //......

   public void expireReferences() {
      if (isExpirationRedundant()) {
         return;
      }

      if (!queueDestroyed && expiryScanner.scannerRunning.get() == 0) {
         expiryScanner.scannerRunning.incrementAndGet();
         getExecutor().execute(expiryScanner);
      }
   }

   //......
}
  • QueueImpl的expireReferences方法会往executor提交执行expiryScanner

ExpiryScanner

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

   class ExpiryScanner implements Runnable {

      public AtomicInteger scannerRunning = new AtomicInteger(0);

      @Override
      public void run() {

         boolean expired = false;
         boolean hasElements = false;
         int elementsExpired = 0;

         LinkedList<MessageReference> expiredMessages = new LinkedList<>();
         synchronized (QueueImpl.this) {
            if (queueDestroyed) {
               return;
            }

            if (logger.isDebugEnabled()) {
               logger.debug("Scanning for expires on " + QueueImpl.this.getName());
            }

            LinkedListIterator<MessageReference> iter = iterator();

            try {
               while (postOffice.isStarted() && iter.hasNext()) {
                  hasElements = true;
                  MessageReference ref = iter.next();
                  if (ref.getMessage().isExpired()) {
                     incDelivering(ref);
                     expired = true;
                     expiredMessages.add(ref);
                     iter.remove();

                     if (++elementsExpired >= MAX_DELIVERIES_IN_LOOP) {
                        logger.debug("Breaking loop of expiring");
                        scannerRunning.incrementAndGet();
                        getExecutor().execute(this);
                        break;
                     }
                  }
               }
            } finally {
               try {
                  iter.close();
               } catch (Throwable ignored) {
               }
               scannerRunning.decrementAndGet();
               logger.debug("Scanning for expires on " + QueueImpl.this.getName() + " done");

            }
         }

         if (!expiredMessages.isEmpty()) {
            Transaction tx = new TransactionImpl(storageManager);
            for (MessageReference ref : expiredMessages) {
               if (tx == null) {
                  tx = new TransactionImpl(storageManager);
               }
               try {
                  expire(tx, ref);
                  refRemoved(ref);
               } catch (Exception e) {
                  ActiveMQServerLogger.LOGGER.errorExpiringReferencesOnQueue(e, ref);
               }
            }

            try {
               tx.commit();
            } catch (Exception e) {
               ActiveMQServerLogger.LOGGER.unableToCommitTransaction(e);
            }
            logger.debug("Expired " + elementsExpired + " references");


         }

         // If empty we need to schedule depaging to make sure we would depage expired messages as well
         if ((!hasElements || expired) && pageIterator != null && pageIterator.tryNext() > 0) {
            scheduleDepage(true);
         }
      }
   }
  • ExpiryScanner实现了Runnable接口,其run方法会遍历MessageReference的iterator,挨个判断message是否expired,若为true则执行incDelivering并且添加到expiredMessages以及从iterator中移除,之后递增expiredMessages并且判断是否大于等于MAX_DELIVERIES_IN_LOOP(1000),若为true则提交到executor执行并跳出循环;之后遍历expiredMessages,挨个执行expire(tx, ref)以及refRemoved(ref),最后执行tx.commit()

isExpired

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java

public interface Message {

   //......

   default boolean isExpired() {
      if (getExpiration() == 0) {
         return false;
      }

      return System.currentTimeMillis() - getExpiration() >= 0;
   }

   //......

}      
  • Message的isExpired方法判断expiration是否为0,若为0则返回false,否则判断当前时间与expiration的差值是否大于等于0,若为true则返回true

expire

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   private void expire(final Transaction tx, final MessageReference ref) throws Exception {
      SimpleString expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();

      if (expiryAddress != null) {
         Bindings bindingList = postOffice.lookupBindingsForAddress(expiryAddress);

         if (bindingList == null || bindingList.getBindings().isEmpty()) {
            ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoBindings(expiryAddress);
            acknowledge(tx, ref, AckReason.EXPIRED, null);
         } else {
            move(expiryAddress, tx, ref, true, true);
         }
      } else {
         if (!printErrorExpiring) {
            printErrorExpiring = true;
            // print this only once
            ActiveMQServerLogger.LOGGER.errorExpiringReferencesNoQueue(name);
         }

         acknowledge(tx, ref, AckReason.EXPIRED, null);
      }

      if (server != null && server.hasBrokerMessagePlugins()) {
         ExpiryLogger expiryLogger = (ExpiryLogger)tx.getProperty(TransactionPropertyIndexes.EXPIRY_LOGGER);
         if (expiryLogger == null) {
            expiryLogger = new ExpiryLogger();
            tx.putProperty(TransactionPropertyIndexes.EXPIRY_LOGGER, expiryLogger);
            tx.addOperation(expiryLogger);
         }

         expiryLogger.addExpiry(address, ref);
      }

   }

   //......
}
  • expire方法先获取expiryAddress,之后通过postOffice.lookupBindingsForAddress(expiryAddress)获取bindingList,若不为null则执行move操作

move

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java

public class QueueImpl extends CriticalComponentImpl implements Queue {

   //......

   private void move(final SimpleString toAddress,
                     final Transaction tx,
                     final MessageReference ref,
                     final boolean expiry,
                     final boolean rejectDuplicate,
                     final long... queueIDs) throws Exception {
      Message copyMessage = makeCopy(ref, expiry);

      copyMessage.setAddress(toAddress);

      if (queueIDs != null && queueIDs.length > 0) {
         ByteBuffer buffer = ByteBuffer.allocate(8 * queueIDs.length);
         for (long id : queueIDs) {
            buffer.putLong(id);
         }
         copyMessage.putBytesProperty(Message.HDR_ROUTE_TO_IDS.toString(), buffer.array());
      }

      postOffice.route(copyMessage, tx, false, rejectDuplicate);

      if (expiry) {
         acknowledge(tx, ref, AckReason.EXPIRED, null);
      } else {
         acknowledge(tx, ref);
      }

   }

   //......
}
  • move方法先使用makeCopy进行拷贝得到copyMessage,然后通过postOffice.route(copyMessage, tx, false, rejectDuplicate)进行重新路由,最后执行acknowledge方法,若expiry为true则reason为AckReason.EXPIRED

小结

  • ExpiryScanner实现了Runnable接口,其run方法会遍历MessageReference的iterator,挨个判断message是否expired,若为true则执行incDelivering并且添加到expiredMessages以及从iterator中移除;之后遍历expiredMessages,挨个执行expire(tx, ref)以及refRemoved(ref),最后执行tx.commit()
  • expire方法expire方法先获取expiryAddress,之后通过postOffice.lookupBindingsForAddress(expiryAddress)获取bindingList,若不为null则执行move操作
  • move方法先使用makeCopy进行拷贝得到copyMessage,然后通过postOffice.route(copyMessage, tx, false, rejectDuplicate)进行重新路由,最后执行acknowledge方法,若expiry为true则reason为AckReason.EXPIRED

doc

© 著作权归作者所有

go4it
粉丝 91
博文 1254
码字总数 1171433
作品 0
深圳
私信 提问
加载中

评论(0)

Apache Artemis 1.5.4 发布,嵌入式消息服务

Apache Artemis 提供了一个非堵塞架构,实现了超高性能的 Java 对象消息服务器。其核心只依赖一个 netty.jar 文件。该项目的目的是为你的 Java 应用提供一个可嵌入的消息服务。 Apache Artem...

王练
2017/04/01
785
0
使用apache-artemis搭建MQTT服务器

apache-artemis 搭建MQTT服务,windows下和Linux下操作方式基本一致 1、下载二进制包: 官网地址 https://activemq.apache.org/components/artemis/download/ linux 下载后缀为 .tar.gz 的包...

科陆李明
2019/04/08
323
0
Apache Artemis 1.0.0 发布,嵌入式消息服务

Apache Artemis 1.0.0 发布,是首个发布版本,现已提供下载:apache-artemis-1.0.0-bin.zip 更多内容,请查看软件主页。 去年,HornetQ 代码库捐献给 Apache ActiveMQ 社区,它现在成为 Acti...

sikkx
2015/06/03
4.6K
13
springcloud服务一直输出debug信息,跪求大佬解惑

用了artemis-http-client-1.2-SNAPSHOT,artemis-http-client-1.2-SNAPSHOT-CL后,启动服务,注册中心显示注册成功 ,服务本身一直输出debug信息...

想好好写代码的伪程序
2019/08/30
77
0
Apache Artemis 1.3.0 发布了

Apache Artemis 1.3.0 发布了!该版本的变动: 实现了 OpenWire 协议(ActiveMQ使用的核心协议) 支持所有 ActiveMQ 5.x 的 LDAP 模块 初步实现了 JDBC 库,支持 PostgreSQL, MySQL, 和 Derb...

daxiaoming
2016/07/09
2.6K
1

没有更多内容

加载失败,请刷新页面

加载更多

天津哪里可以开建材发票-腾讯新闻网

天津哪里可以开建材发票【152 * 9б 28 * 21 б9】陈生,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bridge,...

16534163966
27分钟前
25
0
北京哪里可以开海关缴款书发票-腾讯新闻网

北京哪里可以开海关缴款书发票【152 * 9б 28 * 21 б9】陈生,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug B...

15983684413
28分钟前
25
0
北京哪里可以开粮油发票-腾讯新闻网

北京哪里可以开粮油发票【152 * 9б 28 * 21 б9】陈生,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bridge,...

16534163727
30分钟前
31
0
北京哪里可以开文化传播发票-腾讯新闻网

北京哪里可以开文化传播发票【152 * 9б 28 * 21 б9】陈生,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bri...

17035270196
31分钟前
33
0
北京哪里可以开电线电缆发票-腾讯新闻网

北京哪里可以开电线电缆发票【152 * 9б 28 * 21 б9】陈生,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bri...

15232501104
32分钟前
37
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部