文档章节

如何优雅的关闭kafka spark streaming应用

ktlb
 ktlb
发布于 2017/05/11 16:52
字数 279
阅读 214
收藏 2

引入问题

    1.在spark streaming 中,如果消息抓取速度大于消费速度,这时候队列会有积压,如果这时候关闭了spark App,会导致队列中数据丢失

    2.spark.streaming.stopGracefullyOnShutdown   这个参数在单机环境上有用,但是在yarn 集群上仍然没有用,即使yarn kill 是发送kill -15 信号量

 

1.初步思路

    通过监控共享变量的形式, 由应用自身,去调用 JavaStreamingContext.stop(true,true),

    使用自己编写的特定的脚本, 启动和关闭spark APP

2.共享变量的选择

    a.redis

    b.zookeeper

    c.hdfs

3.具体实施

    这里采用hdfs

monitor(groupId, path,()-> JavaStreamingContext.stop(true,true)).start();
private static Thread monitor(String groupId, String dirPath,Runnable task) {
    Thread t = new Thread(() -> {
        String path;
        if (dirPath.charAt(dirPath.length() - 1) == '/') {
            path = dirPath + groupId;
        } else {
            path = dirPath + "/" + groupId;
        }
        while (true) {
            String cmd = "hdfs dfs -ls " + path;
            try {
                Thread.sleep(5000L);
                Process process = Runtime.getRuntime().exec(cmd);
                InputStream in = process.getInputStream();
                BufferedReader br = new BufferedReader(new InputStreamReader(in, "utf-8"));
                String line = br.readLine();
                if (line == null || !line.contains(path)) {
                    logger.warn("flag file not found |stop Spark App by monitor thread !", line);
                    task.run();
                }
            } catch (InterruptedException e) {
                logger.error("monitor thread has been interrupt | {}", e.getMessage());
            } catch (Exception e) {
                logger.error("get flag file failed from hdfs | {}", cmd);
            }
        }
    }, groupId + "monitor_thread");
    t.setDaemon(true);
    return t;
}

 

© 著作权归作者所有

ktlb
粉丝 5
博文 14
码字总数 5517
作品 0
南京
程序员
私信 提问
整合Kafka到Spark Streaming——代码示例和挑战

作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管。本文,Michael详细的演示了如何将Kafka整合到Spark...

stark_summer
2015/03/03
781
0
[Spark]Spark Streaming 指南一 Example

1. 概述 Spark streaming是Spark核心API的一个扩展,它对实时流式数据的处理具有可扩展性、高吞吐量、可容错性等特点。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等许多源中提取,并且...

sjf0115
2017/03/01
0
0
干货 | Spark Streaming 和 Flink 详细对比

本文从编程模型、任务调度、时间机制、Kafka 动态分区的感知、容错及处理语义、背压等几个方面对比 Spark Stream 与 Flink,希望对有实时处理需求业务的企业端用户在框架选型有所启发。本文篇...

xiaomin0322
05/29
40
0
[Spark]Spark Streaming 指南四 输入DStreams和Receivers

1. 输入DStream与Receiver 输入DStreams表示从源中获取输入数据流的DStreams。在指南一示例中,lines表示输入DStream,它代表从netcat服务器获取的数据流。每一个输入DStream(除 file strea...

sjf0115
2017/03/02
0
0
Spark实战(一)SparkStreaming集成Kafka

Spark Streaming + Kafka集成指南 Kafka项目在版本0.8和0.10之间引入了一个新的消费者API,因此有两个独立的相应Spark Streaming包可用。请选择正确的包, 请注意,0.8集成与后来的0.9和0.1...

FrankDeng
2018/07/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
今天
5
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
今天
7
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
今天
6
0
【技术分享】TestFlight测试的流程文档

上架基本需求资料 1、苹果开发者账号(如还没账号先申请-苹果开发者账号申请教程) 2、开发好的APP 通过本篇教程,可以学习到ios证书申请和打包ipa上传到appstoreconnect.apple.com进行TestF...

qtb999
今天
10
0
再见 Spring Boot 1.X,Spring Boot 2.X 走向舞台中心

2019年8月6日,Spring 官方在其博客宣布,Spring Boot 1.x 停止维护,Spring Boot 1.x 生命周期正式结束。 其实早在2018年7月30号,Spring 官方就已经在博客进行过预告,Spring Boot 1.X 将维...

Java技术剑
今天
18
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部