文档章节

聊聊artemis ClientConsumer的handleRegularMessage

go4it
 go4it
发布于 01/17 23:28
字数 766
阅读 28
收藏 0

本文主要研究一下artemis ClientConsumer的handleRegularMessage

handleRegularMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {

   //......

   private final PriorityLinkedList<ClientMessageInternal> buffer = new PriorityLinkedListImpl<>(ClientConsumerImpl.NUM_PRIORITIES);

   private final Runner runner = new Runner();

   private volatile MessageHandler handler;

   //......

   private void handleRegularMessage(ClientMessageInternal message) {
      if (message.getAddress() == null) {
         message.setAddress(queueInfo.getAddress());
      }

      message.onReceipt(this);

      if (!ackIndividually && message.getPriority() != 4 && !message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
         // We have messages of different priorities so we need to ack them individually since the order
         // of them in the ServerConsumerImpl delivery list might not be the same as the order they are
         // consumed in, which means that acking all up to won't work
         ackIndividually = true;
      }

      // Add it to the buffer
      buffer.addTail(message, message.getPriority());

      if (handler != null) {
         // Execute using executor
         if (!stopped) {
            queueExecutor();
         }
      } else {
         notify();
      }
   }

   private void queueExecutor() {
      if (logger.isTraceEnabled()) {
         logger.trace(this + "::Adding Runner on Executor for delivery");
      }

      sessionExecutor.execute(runner);
   }

   //......
}
  • ClientConsumerImpl的handleRegularMessage方法先执行buffer.addTail(message, message.getPriority()),之后对于handler不为null的会执行queueExecutor(),否则执行notify();queueExecutor方法是通过sessionExecutor执行runner

Runner

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientConsumerImpl.java

public final class ClientConsumerImpl implements ClientConsumerInternal {

   //......

   private class Runner implements Runnable {

      @Override
      public void run() {
         try {
            callOnMessage();
         } catch (Exception e) {
            ActiveMQClientLogger.LOGGER.onMessageError(e);

            lastException = e;
         }
      }
   }

   private void callOnMessage() throws Exception {
      if (closing || stopped) {
         return;
      }

      session.workDone();

      // We pull the message from the buffer from inside the Runnable so we can ensure priority
      // ordering. If we just added a Runnable with the message to the executor immediately as we get it
      // we could not do that

      ClientMessageInternal message;

      // Must store handler in local variable since might get set to null
      // otherwise while this is executing and give NPE when calling onMessage
      MessageHandler theHandler = handler;

      if (theHandler != null) {
         if (rateLimiter != null) {
            rateLimiter.limit();
         }

         failedOver = false;

         synchronized (this) {
            message = buffer.poll();
         }

         if (message != null) {
            if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) {
               //Ignore, this could be a relic from a previous receiveImmediate();
               return;
            }

            boolean expired = message.isExpired();

            flowControlBeforeConsumption(message);

            if (!expired) {
               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Calling handler.onMessage");
               }
               final ClassLoader originalLoader = AccessController.doPrivileged(new PrivilegedAction<ClassLoader>() {
                  @Override
                  public ClassLoader run() {
                     ClassLoader originalLoader = Thread.currentThread().getContextClassLoader();

                     Thread.currentThread().setContextClassLoader(contextClassLoader);

                     return originalLoader;
                  }
               });

               onMessageThread = Thread.currentThread();
               try {
                  theHandler.onMessage(message);
               } finally {
                  try {
                     AccessController.doPrivileged(new PrivilegedAction<Object>() {
                        @Override
                        public Object run() {
                           Thread.currentThread().setContextClassLoader(originalLoader);
                           return null;
                        }
                     });
                  } catch (Exception e) {
                     ActiveMQClientLogger.LOGGER.failedPerformPostActionsOnMessage(e);
                  }

                  onMessageThread = null;
               }

               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::Handler.onMessage done");
               }

               if (message.isLargeMessage()) {
                  message.discardBody();
               }
            } else {
               session.expire(this, message);
            }

            // If slow consumer, we need to send 1 credit to make sure we get another message
            if (clientWindowSize == 0) {
               startSlowConsumer();
            }
         }
      }
   }

   private void flowControlBeforeConsumption(final ClientMessageInternal message) throws ActiveMQException {
      // Chunk messages will execute the flow control while receiving the chunks
      if (message.getFlowControlSize() != 0) {
         // on large messages we should discount 1 on the first packets as we need continuity until the last packet
         flowControl(message.getFlowControlSize(), !message.isLargeMessage());
      }
   }

   public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws ActiveMQException {
      if (clientWindowSize >= 0) {
         creditsToSend += messageBytes;

         if (creditsToSend >= clientWindowSize) {
            if (clientWindowSize == 0 && discountSlowConsumer) {
               if (logger.isTraceEnabled()) {
                  logger.trace(this + "::FlowControl::Sending " + creditsToSend + " -1, for slow consumer");
               }

               // sending the credits - 1 initially send to fire the slow consumer, or the slow consumer would be
               // always buffering one after received the first message
               final int credits = creditsToSend - 1;

               creditsToSend = 0;

               if (credits > 0) {
                  sendCredits(credits);
               }
            } else {
               if (logger.isDebugEnabled()) {
                  logger.debug("Sending " + messageBytes + " from flow-control");
               }

               final int credits = creditsToSend;

               creditsToSend = 0;

               if (credits > 0) {
                  sendCredits(credits);
               }
            }
         }
      }
   }


   //......
}   
  • Runner实现了Runnable接口,其run方法执行callOnMessage();该方法对于rateLimiter不为null会执行rateLimiter.limit();之后执行buffer.poll()获取ClientMessageInternal,若不为null,则执行flowControlBeforeConsumption(message),对于非expired的会执行theHandler.onMessage(message)方法;对于clientWindowSize为0的则执行startSlowConsumer();flowControlBeforeConsumption方法会执行flowControl方法,该方法会计算credits,然后执行sendCredits(credits)

小结

ClientConsumerImpl的handleRegularMessage方法先执行buffer.addTail(message, message.getPriority()),之后对于handler不为null的会执行queueExecutor(),否则执行notify();queueExecutor方法是通过sessionExecutor执行runner;Runner实现了Runnable接口,其run方法执行callOnMessage();该方法对于rateLimiter不为null会执行rateLimiter.limit();之后执行buffer.poll()获取ClientMessageInternal进行处理

doc

© 著作权归作者所有

go4it
粉丝 91
博文 1251
码字总数 1168439
作品 0
深圳
私信 提问
加载中

评论(0)

Apache Artemis 1.5.4 发布,嵌入式消息服务

Apache Artemis 提供了一个非堵塞架构,实现了超高性能的 Java 对象消息服务器。其核心只依赖一个 netty.jar 文件。该项目的目的是为你的 Java 应用提供一个可嵌入的消息服务。 Apache Artem...

王练
2017/04/01
785
0
使用apache-artemis搭建MQTT服务器

apache-artemis 搭建MQTT服务,windows下和Linux下操作方式基本一致 1、下载二进制包: 官网地址 https://activemq.apache.org/components/artemis/download/ linux 下载后缀为 .tar.gz 的包...

科陆李明
2019/04/08
323
0
Apache Artemis 1.0.0 发布,嵌入式消息服务

Apache Artemis 1.0.0 发布,是首个发布版本,现已提供下载:apache-artemis-1.0.0-bin.zip 更多内容,请查看软件主页。 去年,HornetQ 代码库捐献给 Apache ActiveMQ 社区,它现在成为 Acti...

sikkx
2015/06/03
4.6K
13
springcloud服务一直输出debug信息,跪求大佬解惑

用了artemis-http-client-1.2-SNAPSHOT,artemis-http-client-1.2-SNAPSHOT-CL后,启动服务,注册中心显示注册成功 ,服务本身一直输出debug信息...

想好好写代码的伪程序
2019/08/30
77
0
Apache Artemis 1.3.0 发布了

Apache Artemis 1.3.0 发布了!该版本的变动: 实现了 OpenWire 协议(ActiveMQ使用的核心协议) 支持所有 ActiveMQ 5.x 的 LDAP 模块 初步实现了 JDBC 库,支持 PostgreSQL, MySQL, 和 Derb...

daxiaoming
2016/07/09
2.6K
1

没有更多内容

加载失败,请刷新页面

加载更多

host machine and virtual machine communication between the three kinds of connection

1.桥接birdge模式 将虚拟机IP与物理机IP设在一个网段上,此时虚拟机相当于一台网络中与本地物理机公用一个HUB的独立设备。网络中其他机器与虚拟机、本地物理机与虚拟机都可以双向通信。虚拟机...

欣欣向荣666
13分钟前
16
0
Centos7安装gitblit

Gitblit介绍 Gitblit是一款开源工具,使用Java编写,用于管理、查看及服务于Git版本库。 Gitblit两种安装包 Gitblit GO:内部集成了Jetty服务器,不需要再集成其他容器,使用简单方便。(本文...

yhb890430
19分钟前
30
0
Ubuntu 安装 Source Code Pro 字体

1、解压字体 $ tar -zxvf source-code-pro-2.030R-ro-1.050R-it.tar.gz 2、解压字体 $ sudo cp -r source-code-pro-2.030R-ro-1.050R-it/TTF/ /usr/share/fonts/truetype/source-code-pro......

张小渔
21分钟前
53
0
mongo Authentication failed记录

虽然用的管理员账号,但是还是出现了以下的错误: 主要看后面的错误信息: { "ok" : 0.0, "errmsg" : "Authentication failed.", "code" : 18, "codeName" : "AuthenticationFailed" } 在想管......

woshixin
33分钟前
55
0
PHP+jPaginate插件制作无刷新分页实例

jPaginate是一款动感滚动分页插件,它的表现形式是像分页的按钮一样,有意思的是这些按钮却可以左右滚动,可以通过单击或鼠标滑向点两侧的小箭头来控制按钮的左右滚动。 读取第一页数据: <d...

ymkjs1990
37分钟前
71
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部