文档章节

Apache Beam 的Spark Streaming 示例

Ryan-瑞恩
 Ryan-瑞恩
发布于 2017/04/01 10:27
字数 350
阅读 463
收藏 1

应网友要求,需要使用Beam,底层使用Spark 接受 Kafka 流数据。

我看了下源码的实现,这里只是给出Demo,没有做测试,具体执行,自行做测试使用,如果有什么问题,可以留言交流。

package com.ryan.beam.spark.stream;

import org.apache.beam.runners.direct.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.spark.api.java.StorageLevels;
import org.joda.time.Instant;

/**
 * <pre>
 * User:        Ryan
 * Date:        2017/3/6
 * Email:       liuwei412552703@163.com
 * Version      V1.0
 * Discription:
 */
public class BeamKafkaWordCount {


    public static void main(String[] args) {
        SparkPipelineOptions sparkPipelineOptions = PipelineOptionsFactory.as(SparkPipelineOptions.class);
        sparkPipelineOptions.setSparkMaster("local[4]");
        sparkPipelineOptions.setJobName("kafkaWordCount");
        sparkPipelineOptions.setAppName("kafkaWordCount");
        sparkPipelineOptions.setStorageLevel(StorageLevels.MEMORY_AND_DISK.toString());
        sparkPipelineOptions.setEnableSparkMetricSinks(true);
        sparkPipelineOptions.setBatchIntervalMillis(10L);
        sparkPipelineOptions.setMaxRecordsPerBatch(500L);


        Pipeline pipeline = Pipeline.create(sparkPipelineOptions);


        /**
         * Beam Spark Kafka 数据
         * 接受Kafka 数据
         *
         */
        ImmutableMap<String, Object> immutableMap = ImmutableMap.<String, Object>of("receive.buffer.bytes", 1024 * 1024);

        pipeline.apply(KafkaIO.read().withBootstrapServers("192.168.1.102:9092,192.168.1.102:9092")
                .withTopics(ImmutableList.of("topic_a", "topic_b"))
                // above two are required configuration. returns PCollection<KafkaRecord<byte[], byte[]>

                // rest of the settings are optional :

                // set a Coder for Key and Value (note the change to return type)
                .withKeyCoder(BigEndianLongCoder.of()) // PCollection<KafkaRecord<Long, byte[]>
                .withValueCoder(StringUtf8Coder.of())  // PCollection<KafkaRecord<Long, String>

                // you can further customize KafkaConsumer used to read the records by adding more
                // settings for ConsumerConfig. e.g :
                .updateConsumerProperties(immutableMap)

                // custom function for calculating record timestamp (default is processing time)
                .withTimestampFn(new SerializableFunction<KV<Long, String>, Instant>() {
                    @Override
                    public Instant apply(KV<Long, String> input) {

                        return null;
                    }
                })

                // custom function for watermark (default is record timestamp)
                .withWatermarkFn(new SerializableFunction<KV<Long, String>, Instant>() {
                    @Override
                    public Instant apply(KV<Long, String> input) {
                        return null;
                    }
                })

                // finally, if you don't need Kafka metadata, you can drop it
                // PCollection<KV<Long, String>>
                .withoutMetadata())
                // PCollection<String>
                .apply(Values.<String>create());


    }
}

 

Beam 是为了统一大数据处理的 组件,目前还处于孵化期,不建议生产使用!

 

 

© 著作权归作者所有

Ryan-瑞恩

Ryan-瑞恩

粉丝 153
博文 245
码字总数 189501
作品 0
西安
后端工程师
私信 提问
Apache Beam 0.5.0 发布,大数据批处理和流处理标准

Apache Beam 0.5.0 发布了,Apache Beam 是 Google 在2016年2月份贡献给 Apache 基金会的项目,主要目标是统一批处理和流处理的编程范式,为无限,乱序,web-scale的数据集处理提供简单灵活,...

王练
2017/02/10
1K
0
Apache Beam

@Ryan-瑞恩 你好,想跟你请教个问题: 可以在多写一点关于Apache Beam的博文吗,还有Apache beam可以基于Storm 或者 Spark Streaming吗

zhoujianqian
2017/03/22
86
1
Apache Beam WordCount编程实战及源码解读

概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可...

xiaomin0322
05/31
24
0
Apache Beam 2.9.0 发布,大数据批处理和流处理标准

Apache Beam 2.9.0 发布了。Apache Beam 是 Google 在2016年2月份贡献给 Apache 基金会的项目,主要目标是统一批处理和流处理的编程范式,为无限、乱序、web-scale 的数据集处理提供简单灵活...

局长
2018/12/16
727
0
玩转KafkaIO与Flink

随着大数据 2.0 时代悄然到来,大数据从简单的批处理扩展到了实时处理、流处理、交互式查询和机器学习应用。近年来涌现出诸多大数据应用组件,如 HBase、Hive、Kafka、Spark、Flink 等。开发...

微笑向暖wx
2018/09/28
224
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
今天
2.2K
15
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
38
0
计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
40
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
61
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
21
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部