文档章节

聊聊artemis的SessionConsumerFlowCreditMessage

go4it
 go4it
发布于 01/16 23:14
字数 725
阅读 120
收藏 0

本文主要研究一下artemis的SessionConsumerFlowCreditMessage

SessionConsumerFlowCreditMessage

activemq-artemis-2.11.0/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionConsumerFlowCreditMessage.java

public class SessionConsumerFlowCreditMessage extends PacketImpl {

   private long consumerID;
   private int credits;

   public SessionConsumerFlowCreditMessage(final long consumerID, final int credits) {
      super(SESS_FLOWTOKEN);
      this.consumerID = consumerID;
      this.credits = credits;
   }

   public SessionConsumerFlowCreditMessage() {
      super(SESS_FLOWTOKEN);
   }

   // Public --------------------------------------------------------

   public long getConsumerID() {
      return consumerID;
   }

   public int getCredits() {
      return credits;
   }

   @Override
   public void encodeRest(final ActiveMQBuffer buffer) {
      buffer.writeLong(consumerID);
      buffer.writeInt(credits);
   }

   @Override
   public void decodeRest(final ActiveMQBuffer buffer) {
      consumerID = buffer.readLong();
      credits = buffer.readInt();
   }

   @Override
   public String toString() {
      return getParentString() + ", consumerID=" + consumerID + ", credits=" + credits + "]";
   }

   @Override
   public int hashCode() {
      final int prime = 31;
      int result = super.hashCode();
      result = prime * result + (int) (consumerID ^ (consumerID >>> 32));
      result = prime * result + credits;
      return result;
   }

   @Override
   public boolean equals(Object obj) {
      if (this == obj)
         return true;
      if (!super.equals(obj))
         return false;
      if (!(obj instanceof SessionConsumerFlowCreditMessage))
         return false;
      SessionConsumerFlowCreditMessage other = (SessionConsumerFlowCreditMessage) obj;
      if (consumerID != other.consumerID)
         return false;
      if (credits != other.credits)
         return false;
      return true;
   }
}
  • SessionConsumerFlowCreditMessage继承了PacketImpl,其type为SESS_FLOWTOKEN

ServerSessionPacketHandler

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java

public class ServerSessionPacketHandler implements ChannelHandler {

   //......

   private volatile AtomicInteger availableCredits = new AtomicInteger(0);

   //......

   private void onMessagePacket(final Packet packet) {
      if (logger.isTraceEnabled()) {
         logger.trace("ServerSessionPacketHandler::handlePacket," + packet);
      }
      final byte type = packet.getType();
      switch (type) {
         case SESS_SEND: {
            onSessionSend(packet);
            break;
         }
         case SESS_ACKNOWLEDGE: {
            onSessionAcknowledge(packet);
            break;
         }
         case SESS_PRODUCER_REQUEST_CREDITS: {
            onSessionRequestProducerCredits(packet);
            break;
         }
         case SESS_FLOWTOKEN: {
            onSessionConsumerFlowCredit(packet);
            break;
         }
         default:
            // separating a method for everything else as JIT was faster this way
            slowPacketHandler(packet);
            break;
      }
   }

   private void onSessionConsumerFlowCredit(Packet packet) {
      this.storageManager.setContext(session.getSessionContext());
      try {
         Packet response = null;
         boolean requiresResponse = false;
         try {
            SessionConsumerFlowCreditMessage message = (SessionConsumerFlowCreditMessage) packet;
            session.receiveConsumerCredits(message.getConsumerID(), message.getCredits());
         } catch (ActiveMQIOErrorException e) {
            response = onActiveMQIOErrorExceptionWhileHandlePacket(packet, e, requiresResponse, response, this.session);
         } catch (ActiveMQXAException e) {
            response = onActiveMQXAExceptionWhileHandlePacket(packet, e, requiresResponse, response);
         } catch (ActiveMQQueueMaxConsumerLimitReached e) {
            response = onActiveMQQueueMaxConsumerLimitReachedWhileHandlePacket(packet, e, requiresResponse, response);
         } catch (ActiveMQException e) {
            response = onActiveMQExceptionWhileHandlePacket(packet, e, requiresResponse, response);
         } catch (Throwable t) {
            response = onCatchThrowableWhileHandlePacket(packet, t, requiresResponse, response, this.session);
         }
         sendResponse(packet, response, false, false);
      } finally {
         this.storageManager.clearContext();
      }
   }

   //......
}
  • onMessagePacket方法在type为SESS_FLOWTOKEN时执行onSessionConsumerFlowCredit方法;该方法执行的是session.receiveConsumerCredits以及sendResponse方法

ServerSessionImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java

public class ServerSessionImpl implements ServerSession, FailureListener {

   //......

   public void receiveConsumerCredits(final long consumerID, final int credits) throws Exception {
      ServerConsumer consumer = locateConsumer(consumerID);

      if (consumer == null) {
         logger.debug("There is no consumer with id " + consumerID);

         return;
      }

      consumer.receiveCredits(credits);
   }

   //......
}
  • receiveConsumerCredits方法执行的是consumer.receiveCredits方法

ServerConsumerImpl

activemq-artemis-2.11.0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java

public class ServerConsumerImpl implements ServerConsumer, ReadyListener {

   //......

   public void receiveCredits(final int credits) {
      if (credits == -1) {
         if (logger.isDebugEnabled()) {
            logger.debug(this + ":: FlowControl::Received disable flow control message");
         }
         // No flow control
         availableCredits = null;

         // There may be messages already in the queue
         promptDelivery();
      } else if (credits == 0) {
         // reset, used on slow consumers
         logger.debug(this + ":: FlowControl::Received reset flow control message");
         availableCredits.set(0);
      } else {
         int previous = availableCredits.getAndAdd(credits);

         if (logger.isDebugEnabled()) {
            logger.debug(this + "::FlowControl::Received " +
                            credits +
                            " credits, previous value = " +
                            previous +
                            " currentValue = " +
                            availableCredits.get());
         }

         if (previous <= 0 && previous + credits > 0) {
            if (logger.isTraceEnabled()) {
               logger.trace(this + "::calling promptDelivery from receiving credits");
            }
            promptDelivery();
         }
      }
   }

   public void promptDelivery() {
      // largeMessageDeliverer is always set inside a lock
      // if we don't acquire a lock, we will have NPE eventually
      if (largeMessageDeliverer != null) {
         resumeLargeMessage();
      } else {
         forceDelivery();
      }
   }

   private void forceDelivery() {
      if (browseOnly) {
         messageQueue.getExecutor().execute(browserDeliverer);
      } else {
         messageQueue.deliverAsync();
      }
   }

   //......
}
  • receiveCredits方法在credits为-1时设置availableCredits为null,然后执行promptDelivery方法;在credits为0时设置availableCredits为0;其他情况执行availableCredits.getAndAdd(credits);promptDelivery方法主要是执行resumeLargeMessage或者forceDelivery方法

小结

SessionConsumerFlowCreditMessage继承了PacketImpl,其type为SESS_FLOWTOKEN;ServerSessionPacketHandler的onMessagePacket方法在type为SESS_FLOWTOKEN时执行onSessionConsumerFlowCredit方法;该方法执行的是session.receiveConsumerCredits以及sendResponse方法;receiveConsumerCredits方法在receiveCredits方法在credits为-1时设置availableCredits为null,然后执行promptDelivery方法;在credits为0时设置availableCredits为0;其他情况执行availableCredits.getAndAdd(credits)

doc

© 著作权归作者所有

go4it
粉丝 93
博文 1293
码字总数 1207997
作品 0
深圳
私信 提问
加载中

评论(0)

Apache Artemis 1.5.4 发布,嵌入式消息服务

Apache Artemis 提供了一个非堵塞架构,实现了超高性能的 Java 对象消息服务器。其核心只依赖一个 netty.jar 文件。该项目的目的是为你的 Java 应用提供一个可嵌入的消息服务。 Apache Artem...

王练
2017/04/01
787
0
Apache Artemis 1.0.0 发布,嵌入式消息服务

Apache Artemis 1.0.0 发布,是首个发布版本,现已提供下载:apache-artemis-1.0.0-bin.zip 更多内容,请查看软件主页。 去年,HornetQ 代码库捐献给 Apache ActiveMQ 社区,它现在成为 Acti...

sikkx
2015/06/03
4.6K
13
使用apache-artemis搭建MQTT服务器

apache-artemis 搭建MQTT服务,windows下和Linux下操作方式基本一致 1、下载二进制包: 官网地址 https://activemq.apache.org/components/artemis/download/ linux 下载后缀为 .tar.gz 的包...

科陆李明
2019/04/08
406
0
springcloud服务一直输出debug信息,跪求大佬解惑

用了artemis-http-client-1.2-SNAPSHOT,artemis-http-client-1.2-SNAPSHOT-CL后,启动服务,注册中心显示注册成功 ,服务本身一直输出debug信息...

想好好写代码的伪程序
2019/08/30
96
0
已有12000多人申请成为NASA宇航员 希望执行月球和火星任务

据外媒报道,美国宇航局(NASA)最近开展的公开招募“阿尔忒弥斯一代”(Artemis Generation)宇航员活动已经吸引12000多人报名参加。被选为该计划的人员将参与Artemis飞往月球及其他星球的任...

稿源:
前天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Ubuntu 18.04 的网络配置

netplan简介 目前,ubuntu18.04上使用了netplan 作为网络配置工具;在终端上配置网络参数跟之前的版本有比较大的差别 Netplan工作流程如下图所示;通过读取 /etc/netplan/*.yaml 下的配置文件...

osc_10loka5t
1分钟前
0
0
Docker底层网络经典文章分享

说明 关于 docker 底层网络的原理介绍,网上有很多的博客等资源,下面分享些经典实例文章, 望大家共同进步~ 分享 分享一 理解Docker单机容器网络 分享二 理解Docker跨多主机容器网络 大佬的...

osc_4myehtgl
2分钟前
0
0
VMWare Workstation上安装CentOS 8.1/RHEL 8.1 Linux实战系列

Linux系统运维实战系列 CentOS 8/RHEL 8 Linux系统实战系列原创持续更新中。。。。。。 请关注我的博客: grand.blog.51cto.com 1.下载CentOS8.1镜像: 实验虚拟机软件和CentOS8.1 ISO系统镜像...

osc_bskubcvl
3分钟前
0
0
在 Linux 上安装 Adobe Flash Player

1、访问flash官网,点击下载,选择你的操作系统和flash版本 2、下载后,解压下载的压缩包 tar -zx -f install_flash_player_11_linux.x86_64.tar.gz #解压下载好的压缩包 3、安装火狐浏览...

osc_xs2d5ls9
4分钟前
0
0
在 Linux 上安装 Adobe Flash Player

1、访问flash官网,点击下载,选择你的操作系统和flash版本 2、下载后,解压下载的压缩包 tar -zx -f install_flash_player_11_linux.x86_64.tar.gz #解压下载好的压缩包 3、安装火狐浏览...

osc_3iv3c4fo
6分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部