zipkin源码 4.reporter

原创
2016/11/24 21:21
阅读数 143

首先看怎么构建一个Reporter:

@Bean
public Reporter<Span> reporter(){
    Reporter<Span> reporter = AsyncReporter.builder(
            URLConnectionSender.create("http://localhost:9411/api/v1/spans"))
            .build();
    return reporter;
}

上面代码创建了一个http类型reporter,返回一个BoundedAsyncReporter类型的reporter

下面是URLConnectionSender.create方法,主要设置编码类型、连接读取超时设置、是否压缩以及最大字节数设置。

public abstract class URLConnectionSender implements Sender {
  /** Creates a sender that posts {@link Encoding#THRIFT} messages. */
  public static URLConnectionSender create(String endpoint) {
    return builder().endpoint(endpoint).build();
  }

  public static Builder builder() {
    return new AutoValue_URLConnectionSender.Builder()
        .encoding(Encoding.THRIFT)
        .connectTimeout(10 * 1000)
        .readTimeout(60 * 1000)
        .compressionEnabled(true)
        .messageMaxBytes(5 * 1024 * 1024);
  }

接下来分析下AsyncReporter。

首先看下AsyncReporter的创建者build的build方法:

public AsyncReporter<Span> build() {
  switch (sender.encoding()) {
    case JSON:
      return build(Encoder.JSON);
    case THRIFT:
      return build(Encoder.THRIFT);
    default:
      throw new UnsupportedOperationException(sender.encoding().name());
  }
}

//接下来会执行build(Encoder.THRIFT)方法:
public <S> AsyncReporter<S> build(Encoder<S> encoder) {
  checkNotNull(encoder, "encoder");
  
  //检查encode是否一样,不一样ao抛出异常
  checkArgument(encoder.encoding() == sender.encoding(),
      "Encoder.encoding() %s != Sender.encoding() %s",
      encoder.encoding(), sender.encoding());
  
  //创建有限制的AsyncReporter,设置了上传消息的最大字节数、消息的超时时间
  //以及初始化ByteBoundedQueue并设置最大长度以及最大字节数(默认jvm总内存的1%)
  final BoundedAsyncReporter<S> result = new BoundedAsyncReporter<>(this, encoder);

  if (messageTimeoutNanos > 0) { // Start a thread that flushes the queue in a loop.
    final BufferNextMessage consumer =
        new BufferNextMessage(sender, messageMaxBytes, messageTimeoutNanos);
    
    //启动一个线程处理队列中需要上报的消息    
    final Thread flushThread = new Thread(() -> {
      try {
        while (!result.closed.get()) {
          //上报消息
          result.flush(consumer);
        }
      } finally {
        for (byte[] next : consumer.drain()) result.pending.offer(next);
        result.close.countDown();
      }
    }, "AsyncReporter(" + sender + ")");
    flushThread.setDaemon(true);
    flushThread.start();
  }
  return result;
}
}

下面分析result.flush(consumer)方法:

void flush(BufferNextMessage bundler) {
  if (closed.get()) throw new IllegalStateException("closed");
  //该方法阻塞直到有数据转移到consumer中,即bundler
  //bundler.remainingNanos()获取如果cout为0,即队列没有数据的时候需要阻塞多久直到有数据
  //下面主要看下ByteBoundedQueue的doDrain(Consumer consumer)方法 -》
  pending.drainTo(bundler, bundler.remainingNanos());

  // record after flushing reduces the amount of gauge events vs on doing this on report
  metrics.updateQueuedSpans(pending.count);
  metrics.updateQueuedBytes(pending.sizeInBytes);

  if (!bundler.isReady()) return; // try to fill up the bundle

  // Signal that we are about to send a message of a known size in bytes
  metrics.incrementMessages();
  metrics.incrementMessageBytes(bundler.sizeInBytes());
  //获取bundler中的数据集合
  List<byte[]> nextMessage = bundler.drain();

  // In failure case, we increment messages and spans dropped.
  Callback failureCallback = sendSpansCallback(nextMessage.size());
  try {
    //对spans编码并发送到collect
    sender.sendSpans(nextMessage, failureCallback);
  } catch (RuntimeException e) {
    failureCallback.onError(e);
    // Raise in case the sender was closed out-of-band.
    if (e instanceof IllegalStateException) throw e;
  }
}

//ByteBoundedQueue的doDrain
int doDrain(Consumer consumer) {
    int drainedCount = 0;
    int drainedSizeInBytes = 0;
    while (drainedCount < count) {
      //读取需要处理的数据
      byte[] next = elements[readPos];

      if (next == null) break;
      //判断consumer能否接受数据,能的话存入consumer的list中
      if (consumer.accept(next)) {
        drainedCount++;
        drainedSizeInBytes += next.length;

        elements[readPos] = null;
        if (++readPos == elements.length) readPos = 0; // circle back to the front of the array
      } else {
        break;
      }
    }
    //更新count跟sizeInBytes
    count -= drainedCount;
    sizeInBytes -= drainedSizeInBytes;
    return drainedCount;
  }

接下来分析BoundedAsyncReporter的report如果上报span相关数据到queue中:

@Override
public void report(S span) {
  checkNotNull(span, "span");
  metrics.incrementSpans(1);
  //对span进行编码,这里使用thrift
  byte[] next = encoder.encode(span);
  //计算span的字节数
  int messageSizeOfNextSpan = sender.messageSizeInBytes(Collections.singletonList(next));
  metrics.incrementSpanBytes(next.length);
  //判断reporter是否关闭,以及span的字节数超过最大限制,没有的话则进行pending.offer操作
  if (closed.get() ||
      // don't enqueue something larger than we can drain
      messageSizeOfNextSpan > messageMaxBytes ||
      !pending.offer(next)) {
    metrics.incrementSpansDropped(1);
  }
}
//ByteBoundedQueue的offer方法
boolean offer(byte[] next) {
    lock.lock();
    try {
      //queue的长度已经满了
      if (count == elements.length) return false;
      //queue的字节长度加上将要offer的字节数超过了最大的字节数
      if (sizeInBytes + next.length > maxBytes) return false;
      //保存span数据
      elements[writePos++] = next;
      
      if (writePos == elements.length) writePos = 0; // circle back to the front of the array

      count++;
      sizeInBytes += next.length;
      //唤醒消费线程
      available.signal(); // alert any drainers
      return true;
    } finally {
      lock.unlock();
    }
  }
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
在线直播报名
返回顶部
顶部