文档章节

zipkin源码 4.reporter

markeloff
 markeloff
发布于 2016/11/24 21:21
字数 825
阅读 66
收藏 0

首先看怎么构建一个Reporter:

@Bean
public Reporter<Span> reporter(){
    Reporter<Span> reporter = AsyncReporter.builder(
            URLConnectionSender.create("http://localhost:9411/api/v1/spans"))
            .build();
    return reporter;
}

上面代码创建了一个http类型reporter,返回一个BoundedAsyncReporter类型的reporter

下面是URLConnectionSender.create方法,主要设置编码类型、连接读取超时设置、是否压缩以及最大字节数设置。

public abstract class URLConnectionSender implements Sender {
  /** Creates a sender that posts {@link Encoding#THRIFT} messages. */
  public static URLConnectionSender create(String endpoint) {
    return builder().endpoint(endpoint).build();
  }

  public static Builder builder() {
    return new AutoValue_URLConnectionSender.Builder()
        .encoding(Encoding.THRIFT)
        .connectTimeout(10 * 1000)
        .readTimeout(60 * 1000)
        .compressionEnabled(true)
        .messageMaxBytes(5 * 1024 * 1024);
  }

接下来分析下AsyncReporter。

首先看下AsyncReporter的创建者build的build方法:

public AsyncReporter<Span> build() {
  switch (sender.encoding()) {
    case JSON:
      return build(Encoder.JSON);
    case THRIFT:
      return build(Encoder.THRIFT);
    default:
      throw new UnsupportedOperationException(sender.encoding().name());
  }
}

//接下来会执行build(Encoder.THRIFT)方法:
public <S> AsyncReporter<S> build(Encoder<S> encoder) {
  checkNotNull(encoder, "encoder");
  
  //检查encode是否一样,不一样ao抛出异常
  checkArgument(encoder.encoding() == sender.encoding(),
      "Encoder.encoding() %s != Sender.encoding() %s",
      encoder.encoding(), sender.encoding());
  
  //创建有限制的AsyncReporter,设置了上传消息的最大字节数、消息的超时时间
  //以及初始化ByteBoundedQueue并设置最大长度以及最大字节数(默认jvm总内存的1%)
  final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);

  if (messageTimeoutNanos > 0) { // Start a thread that flushes the queue in a loop.
    final BufferNextMessage consumer =
        new BufferNextMessage(sender, messageMaxBytes, messageTimeoutNanos);
    
    //启动一个线程处理队列中需要上报的消息    
    final Thread flushThread = new Thread(() -> {
      try {
        while (!result.closed.get()) {
          //上报消息
          result.flush(consumer);
        }
      } finally {
        for (byte[] next : consumer.drain()) result.pending.offer(next);
        result.close.countDown();
      }
    }, "AsyncReporter(" + sender + ")");
    flushThread.setDaemon(true);
    flushThread.start();
  }
  return result;
}
}

下面分析result.flush(consumer)方法:

void flush(BufferNextMessage bundler) {
  if (closed.get()) throw new IllegalStateException("closed");
  //该方法阻塞直到有数据转移到consumer中,即bundler
  //bundler.remainingNanos()获取如果cout为0,即队列没有数据的时候需要阻塞多久直到有数据
  //下面主要看下ByteBoundedQueue的doDrain(Consumer consumer)方法 -》
  pending.drainTo(bundler, bundler.remainingNanos());

  // record after flushing reduces the amount of gauge events vs on doing this on report
  metrics.updateQueuedSpans(pending.count);
  metrics.updateQueuedBytes(pending.sizeInBytes);

  if (!bundler.isReady()) return; // try to fill up the bundle

  // Signal that we are about to send a message of a known size in bytes
  metrics.incrementMessages();
  metrics.incrementMessageBytes(bundler.sizeInBytes());
  //获取bundler中的数据集合
  List<byte[]> nextMessage = bundler.drain();

  // In failure case, we increment messages and spans dropped.
  Callback failureCallback = sendSpansCallback(nextMessage.size());
  try {
    //对spans编码并发送到collect
    sender.sendSpans(nextMessage, failureCallback);
  } catch (RuntimeException e) {
    failureCallback.onError(e);
    // Raise in case the sender was closed out-of-band.
    if (e instanceof IllegalStateException) throw e;
  }
}

//ByteBoundedQueue的doDrain
int doDrain(Consumer consumer) {
    int drainedCount = 0;
    int drainedSizeInBytes = 0;
    while (drainedCount < count) {
      //读取需要处理的数据
      byte[] next = elements[readPos];

      if (next == null) break;
      //判断consumer能否接受数据,能的话存入consumer的list中
      if (consumer.accept(next)) {
        drainedCount++;
        drainedSizeInBytes += next.length;

        elements[readPos] = null;
        if (++readPos == elements.length) readPos = 0; // circle back to the front of the array
      } else {
        break;
      }
    }
    //更新count跟sizeInBytes
    count -= drainedCount;
    sizeInBytes -= drainedSizeInBytes;
    return drainedCount;
  }

接下来分析BoundedAsyncReporter的report如果上报span相关数据到queue中:

@Override
public void report(S span) {
  checkNotNull(span, "span");
  metrics.incrementSpans(1);
  //对span进行编码,这里使用thrift
  byte[] next = encoder.encode(span);
  //计算span的字节数
  int messageSizeOfNextSpan = sender.messageSizeInBytes(Collections.singletonList(next));
  metrics.incrementSpanBytes(next.length);
  //判断reporter是否关闭,以及span的字节数超过最大限制,没有的话则进行pending.offer操作
  if (closed.get() ||
      // don't enqueue something larger than we can drain
      messageSizeOfNextSpan > messageMaxBytes ||
      !pending.offer(next)) {
    metrics.incrementSpansDropped(1);
  }
}
//ByteBoundedQueue的offer方法
boolean offer(byte[] next) {
    lock.lock();
    try {
      //queue的长度已经满了
      if (count == elements.length) return false;
      //queue的字节长度加上将要offer的字节数超过了最大的字节数
      if (sizeInBytes + next.length > maxBytes) return false;
      //保存span数据
      elements[writePos++] = next;
      
      if (writePos == elements.length) writePos = 0; // circle back to the front of the array

      count++;
      sizeInBytes += next.length;
      //唤醒消费线程
      available.signal(); // alert any drainers
      return true;
    } finally {
      lock.unlock();
    }
  }

© 著作权归作者所有

markeloff
粉丝 5
博文 18
码字总数 23124
作品 0
南京
高级程序员
私信 提问
zipkin源码目录

client 1.zipkin源码 1.简介 2.zipkin源码 2.zipkin client brave-springmvc 3.zipkin源码 3.zipkin client brave-okhttpclient 4.zipkin源码 4.reporter 5.zipkin源码 5.thrift编码分析 待续......

markeloff
2016/11/24
144
0
[学习微服务-第5天] ServiceComb+Zipkin源码解读

SeviceComb + Zipkin 简介 ServiceComb 是Apache的微服务顶级项目,在微服务框架中,微服务之间通过网络进行通信,我们必须处理所有与网络相关的问题,例如延迟,超时和分区。随着部署的微服...

业界首个Apache微服务顶级项目
02/12
25
0
SpringCloud(Finchley版)7 - Zipkin 服务追踪分析

一, 简介 Spring Cloud Sleuth 集成 zipkin 组件 Spring Cloud Sleuth 主要功能就是在分布式系统中提供追踪解决方案,并且兼容支持了 zipkin,你只需要在pom文件中引入相应的依赖即可. 二, 下...

ge洋
01/11
162
0
Spring Cloud微服务之 sleuth+zipkin日志聚合

1.简介 (1)什么是服务追踪 Sleuth 在微服务架构中,要完成一个功能,通过Rest请求服务API调用服务来完成,整个调用过程可能会聚合多个后台服务器协同完成。在整个链路上,任何一处调用超时...

编程SHA
03/25
54
0
学习微服务的服务链路追踪——Spring Cloud Sleuth+zipkin

spring cloud sleuth提供了服务链路追踪,并兼容了zipkin,Zipkin是一个链路跟踪工具,可以用来监控微服务集群中调用链路的通畅情况。 1.本来想新建一个有关zipkin-server的自定义zipkin服务器...

啊哈关关
2018/08/14
917
0

没有更多内容

加载失败,请刷新页面

加载更多

无回路有向图的拓扑排序

因公司业务需要,在表单中每个字段都会配置自动计算,但自动计算公式中会引用到其他字段中的值。所以希望可以根据计算公式,优先计算引用的公式。所以最终使用了无回路有向图的扩扑排序来实现...

兜兜毛毛
今天
6
0
如何抢占云栖大会C位?史上最强强强攻略来了

点击观看视频: APSARA云栖大会开发者情怀 原文链接 本文为云栖社区原创内容,未经允许不得转载。

阿里云官方博客
今天
6
0
Kubernetes 从懵圈到熟练:集群服务的三个要点和一种实现

作者 | 声东 阿里云售后技术专家<br /> 文章来源:Docker,点击查看原文。 <br />以我的经验来讲,理解 Kubernetes 集群服务的概念,是比较不容易的一件事情。尤其是当我们基于似是而非的理解...

阿里巴巴云原生
今天
9
0
PHP7.3的新特性

2018年12月6日,PHP7.3正式版发布,在PHP7.2基础上进行了大量错误修复和安全优化,性能提升10%! 从目前的更新说明来看,PHP 7.3 并不是一个主打新特性的版本,包含更多的是 bug 修复。PHP 7...

迅睿CMS-PHP开源CMS程序
今天
8
0
Tomcat 应用中并行流带来的类加载问题

本文首发于 vivo互联网技术 微信公众号 链接:https://mp.weixin.qq.com/s/f-X3n9cvDyU5f5NYH6mhxQ 作者:肖铭轩、王道环 随着 Java8 的不断流行,越来越多的开发人员使用并行流(parallel)...

vivo互联网技术
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部