文档章节

监控kafka Streaming 队列中的Active Batch

ktlb
 ktlb
发布于 2017/05/09 16:19
字数 72
阅读 59
收藏 0
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000L));
new Thread(() -> {
    while (true) {
        try {
            Thread.sleep(3000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Seq<BatchUIData> batches = jssc.ssc().progressListener().waitingBatches();
        scala.collection.immutable.List<BatchUIData> list = batches.toList();
        scala.collection.Iterator<BatchUIData> it = list.iterator();
        System.out.println("active batch size---->" + list.size());
        while (it.hasNext()) {
            BatchUIData data = it.next();
            Long eventCount = data.numRecords();
            String batchTime = UIUtils.formatBatchTime(data.batchTime().milliseconds(), property.getInteger("spark.kafka.duration"), true, null);
            System.out.println("batchTime---->" + batchTime);
            System.out.println("eventCount---->" + eventCount);
        }
    }
}).start();

© 著作权归作者所有

ktlb
粉丝 5
博文 14
码字总数 5517
作品 0
南京
程序员
私信 提问
Structured streaming+kafka集成样例

关于structured streaming, spark社区已经有很多文章介绍,个人认为其中最大的特点是将流视作没有边界的大表,从而能够使用sql来操作这张表,其中包括使用sql join(截止Spark2.1.1,目前只支...

biggeng
2017/07/05
0
0
sparkStreaming基本概念

概述 Spark Streaming 是 Spark Core API 的扩展, 它支持弹性的, 高吞吐的, 容错的实时数据流的处理. 数据可以通过多种数据源获取, 例如 Kafka, Flume, Kinesis 以及 TCP sockets, 也可以通过...

freeli
2018/11/20
248
0
Spark踩坑记——Spark Streaming+Kafka

转载自:https://www.cnblogs.com/xlturing/p/6246538.html 前言 在WeTest舆情项目中,需要对每天千万级的游戏评论信息进行词频统计,在生产者一端,我们将数据按照每天的拉取时间存入了Kaf...

weixin_37589896
2017/11/29
0
0
SparkStreaming VS Structed Streaming

导言 Spark在2.*版本后加入StructedStreaming模块,与流处理引擎Sparkstreaming一样,用于处理流数据。但二者又有许多不同之处。 Sparkstreaming首次引入在0.*版本,其核心思想是利用spark批...

WestC
2018/07/09
0
0
Spark Streaming的优化之路——从Receiver到Direct模式

     作者:个推数据研发工程师 学长 1 业务背景 随着大数据的快速发展,业务场景越来越复杂,离线式的批处理框架MapReduce已经不能满足业务,大量的场景需要实时的数据处理结果来进行分...

个推
06/16
14
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring Cloud 笔记之Spring cloud config client

观察者模式它的数据的变化是被动的。 观察者模式在java中的实现: package com.hxq.springcloud.springcloudconfigclient;import org.springframework.context.ApplicationListener;i...

xiaoxiao_go
今天
4
0
CentOS7.6中安装使用fcitx框架

内容目录 一、为什么要使用fcitx?二、安装fcitx框架三、安装搜狗输入法 一、为什么要使用fcitx? Gnome3桌面自带的输入法框架为ibus,而在使用ibus时会时不时出现卡顿无法输入的现象。 搜狗和...

技术训练营
今天
4
0
《Designing.Data-Intensive.Applications》笔记 四

第九章 一致性与共识 分布式系统最重要的的抽象之一是共识(consensus):让所有的节点对某件事达成一致。 最终一致性(eventual consistency)只提供较弱的保证,需要探索更高的一致性保证(stro...

丰田破产标志
今天
7
0
docker 使用mysql

1, 进入容器 比如 myslq1 里面进行操作 docker exec -it mysql1 /bin/bash 2. 退出 容器 交互: exit 3. mysql 启动在容器里面,并且 可以本地连接mysql docker run --name mysql1 --env MY...

之渊
今天
7
0
python数据结构

1、字符串及其方法(案例来自Python-100-Days) def main(): str1 = 'hello, world!' # 通过len函数计算字符串的长度 print(len(str1)) # 13 # 获得字符串首字母大写的...

huijue
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部