文档章节

Zipkin-1.19.0学习系列16:关于数据上报

强子大叔的码田
 强子大叔的码田
发布于 2017/01/13 17:29
字数 480
阅读 208
收藏 1

之前,我们大致弄明白了数据采集的原理,现在来解决数据上报的问题。

----------------------------------------------------------------------------------

先上图

这里,我们主要讲解HTTP协议。首先生成一个默认的builder

 public static Builder builder() {
      return new AutoValue_HttpSpanCollector_Config.Builder()
          .connectTimeout(10 * 1000)
          .readTimeout(60 * 1000)
          .compressionEnabled(false)
          .flushInterval(1);
    }

后台会启动一个刷新线程,只有1个线程

Flusher(Flushable flushable, int flushInterval, final String threadPoolName) {
      this.flushable = flushable;
      this.scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        @Override
        public Thread newThread(final Runnable r) {
          return new Thread(r, threadPoolName);
        }
      });
      this.scheduler.scheduleWithFixedDelay(this, 0, flushInterval, SECONDS);
    }

    @Override
    public void run() {
      try {
        flushable.flush();
      } catch (IOException ignored) {
      }
    }

上报的url是

Step completed: "thread=http-nio-8080-exec-1", com.github.kristofa.brave.http.HttpSpanCollector.<init>(), line=92 bci=12
92        this.url = baseUrl + (baseUrl.endsWith("/") ? "" : "/") + "api/v1/spans";

===

拿到span后,抛给HttpSpanCollector.

/**
   * Queues the span for collection, or drops it if the queue is full.
   *
   * @param span Span, should not be <code>null</code>.
   */
  @Override
  public void collect(Span span) {
    metrics.incrementAcceptedSpans(1);
    if (!pending.offer(span)) {
      metrics.incrementDroppedSpans(1);
    }
  }

pending.offer

这里用了offer, offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false。

就会存在丢数据的可能性。

===再看我们的那个定时任务。

/**
   * Calling this will flush any pending spans to the transport on the current thread.
   */
  @Override
  public void flush() {
    if (pending.isEmpty()) return;
    List<Span> drained = new ArrayList<Span>(pending.size());
    pending.drainTo(drained);
    if (drained.isEmpty()) return;

    int spanCount = drained.size();
    try {
      reportSpans(drained);
    } catch (IOException e) {
      metrics.incrementDroppedSpans(spanCount);
    } catch (RuntimeException e) {
      metrics.incrementDroppedSpans(spanCount);
    }
  }

这里比较简单,最后看

 @Override
  protected void reportSpans(List<Span> drained) throws IOException {
    byte[] encoded = codec.writeSpans(drained);
    sendSpans(encoded);
  }

就是先把所有的span构造成json字符串,然后调用sendSpan发送

@Override
  protected void sendSpans(byte[] json) throws IOException {
    // intentionally not closing the connection, so as to use keep-alives
    HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();
    connection.setConnectTimeout(config.connectTimeout());
    connection.setReadTimeout(config.readTimeout());
    connection.setRequestMethod("POST");
    connection.addRequestProperty("Content-Type", "application/json");
    if (config.compressionEnabled()) {
      connection.addRequestProperty("Content-Encoding", "gzip");
      ByteArrayOutputStream gzipped = new ByteArrayOutputStream();
      try (GZIPOutputStream compressor = new GZIPOutputStream(gzipped)) {
        compressor.write(json);
      }
      json = gzipped.toByteArray();
    }
    connection.setDoOutput(true);
    connection.setFixedLengthStreamingMode(json.length);
    connection.getOutputStream().write(json);

    try (InputStream in = connection.getInputStream()) {
      while (in.read() != -1) ; // skip
    } catch (IOException e) {
      try (InputStream err = connection.getErrorStream()) {
        if (err != null) { // possible, if the connection was dropped
          while (err.read() != -1) ; // skip
        }
      }
      throw e;
    }
  }

这样就发送成功了,

PS:

99        HttpURLConnection connection = (HttpURLConnection) new URL(url).openConnection();

HttpSpanCollector[1] print url
 url = "http://1.2.3.4:9411/api/v1/spans"
HttpSpanCollector[1] 

 

© 著作权归作者所有

强子大叔的码田

强子大叔的码田

粉丝 916
博文 1444
码字总数 1227067
作品 9
南京
架构师
私信 提问
强子哥哥/MyEye

#MyEye 公司内部接入了64个产品,每天写入HBase数据量(100G/天).水平扩展就可以支持日TB级数据量。 上线12个月非常稳定! #官方QQ群: 120734278 技术选型如下:(其中MyThrift请参考本人的另外...

强子哥哥
2016/12/24
0
0
SOFATracer 数据上报机制和源码分析|剖析

SOFA Scalable Open Financial Architecture 是蚂蚁金服自主研发的金融级分布式中间件,包含了构建金融级云原生架构所需的各个组件,是在金融场景里锤炼出来的最佳实践。 SOFATracer 是一个用...

s潘潘
01/10
0
0
ZipKin原理学习--ZipKin入门介绍

ZipKin入门介绍 Zipkin是一款开源的分布式实时数据追踪系统(Distributed Tracking System),基于 Google Dapper的论文设计而来,由 Twitter 公司开发贡献。其主要功能是聚集来自各个异构系...

qq924862077
2018/05/12
0
0
ZipKin原理学习(分布式实时数据追踪系统)+window部署zipkin+k8s部署zipkin

版权声明:本文为博主原创文章,转载请注明来源。开发合作联系luanpenguestc@sina.com https://blog.csdn.net/luanpeng825485697/article/details/85772954 ZipKin入门介绍 Zipkin是一款开源...

数据架构师
01/08
0
0
应用透明链路追踪工具 - Molten

Molten 是应用透明链路追踪工具。 Molten 追踪php核心调用库运行时信息并且按照zipkin/optracing格式输出信息。 Molten 提供多种sapi, 多种采样类型, 上报追踪状态, 模块控制和多种数据落地 ...

匿名
01/10
3.6K
2

没有更多内容

加载失败,请刷新页面

加载更多

vue中eventBus的使用

使用场景: 1、兄弟组件的通信,父子组件的通信 2、不同路由的通信 针对兄弟组件的通信,父子组件的通信 新建bus.js文件 import Vue from 'vue' var bus = new Vue() export default bus 在需...

tianyawhl
17分钟前
4
0
C# DBHelper

using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Data;using System.Data.SqlClient;using System.Windows.Forms;namesp......

vga
20分钟前
3
0
Spring Boot中@ConditionalOnProperty使用详解

在Spring Boot的自动配置中经常看到@ConditionalOnProperty注解的使用,本篇文章带大家来了解一下该注解的功能。 Spring Boot中的使用 在Spring Boot的源码中,比如涉及到Http编码的自动配置...

程序新视界
21分钟前
6
0
centos7下安装mysql(完整配置)

https://blog.csdn.net/baidu_32872293/article/details/80557668

为何不可1995
23分钟前
3
0
如何从零到一设计一个MQ消息队列

消息队列整体设计思路 主要是设计一个整体的消息被消费的数据流。 这里会涉及到:消息生产Producer、Broker(消息服务端)、消息消费者Consumer。 1.Producer(消息生产者):发送消息到Broker。...

一只会编程的狼
30分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部