文档章节

Apache Beam WordCount编程实战及源码解读

xiaomin0322
 xiaomin0322
发布于 05/31 13:34
字数 1477
阅读 34
收藏 1

概述:Apache Beam WordCount编程实战及源码解读,并通过intellij IDEA和terminal两种方式调试运行WordCount程序,Apache Beam对大数据的批处理和流处理,提供一套先进的统一的编程模型,并可以运行大数据处理引擎上。完整项目Github源码

负责公司大数据处理相关架构,但是具有多样性,极大的增加了开发成本,急需统一编程处理,Apache Beam,一处编程,处处运行,故将折腾成果分享出来。

1.Apache Beam编程实战–前言,Apache Beam的特点与关键概念。
Apache Beam 于2017年1月10日成为Apache新的顶级项目。

1.1.Apache Beam 特点:
统一:对于批处理和流媒体用例使用单个编程模型。
方便:支持多个pipelines环境运行,包括:Apache Apex, Apache Flink, Apache Spark, 和 Google Cloud Dataflow。
可扩展:编写和分享新的SDKs,IO连接器和transformation库 
部分翻译摘自官网:Apacher Beam 官网
1.2.Apache Beam关键概念:
1.2.1.Apache Beam SDKs
主要是开发API,为批处理和流处理提供统一的编程模型。目前(2017)支持JAVA语言,而Python正在紧张开发中。

1.2.2. Apache Beam Pipeline Runners(Beam的执行器/执行者们),支持Apache Apex,Apache Flink,Apache Spark,Google Cloud Dataflow多个大数据计算框架。可谓是一处Apache Beam编程,多计算框架运行。
1.2.3. 他们的对如下的支持情况详见


2.Apache Beam编程实战–Apache Beam源码解读
基于maven,intellij IDEA,pom.xm查看 完整项目Github源码 。直接通过IDEA的项目导入功能即可导入完整项目,等待MAVEN下载依赖包,然后按照如下解读步骤即可顺利运行。

2.1.源码解析-Apache Beam 数据流处理原理解析:
关键步骤:

创建Pipeline
将转换应用于Pipeline
读取输入文件
应用ParDo转换
应用SDK提供的转换(例如:Count)
写出输出
运行Pipeline


2.2.源码解析,完整项目Github源码,附WordCount,pom.xml等
/**
 * MIT.
 * Author: wangxiaolei(王小雷).
 * Date:17-2-20.
 * Project:ApacheBeamWordCount.
 */


import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.Aggregator;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;


public class WordCount {

    /**
     *1.a.通过Dofn编程Pipeline使得代码很简洁。b.对输入的文本做单词划分,输出。
     */
    static class ExtractWordsFn extends DoFn<String, String> {
        private final Aggregator<Long, Long> emptyLines =
                createAggregator("emptyLines", Sum.ofLongs());

        @ProcessElement
        public void processElement(ProcessContext c) {
            if (c.element().trim().isEmpty()) {
                emptyLines.addValue(1L);
            }

            // 将文本行划分为单词
            String[] words = c.element().split("[^a-zA-Z']+");
            // 输出PCollection中的单词
            for (String word : words) {
                if (!word.isEmpty()) {
                    c.output(word);
                }
            }
        }
    }

    /**
     *2.格式化输入的文本数据,将转换单词为并计数的打印字符串。
     */
    public static class FormatAsTextFn extends SimpleFunction<KV<String, Long>, String> {
        @Override
        public String apply(KV<String, Long> input) {
            return input.getKey() + ": " + input.getValue();
        }
    }
    /**
     *3.单词计数,PTransform(PCollection Transform)将PCollection的文本行转换成格式化的可计数单词。
     */
    public static class CountWords extends PTransform<PCollection<String>,
            PCollection<KV<String, Long>>> {
        @Override
        public PCollection<KV<String, Long>> expand(PCollection<String> lines) {

            // 将文本行转换成单个单词
            PCollection<String> words = lines.apply(
                    ParDo.of(new ExtractWordsFn()));

            // 计算每个单词次数
            PCollection<KV<String, Long>> wordCounts =
                    words.apply(Count.<String>perElement());

            return wordCounts;
        }
    }

    /**
     *4.可以自定义一些选项(Options),比如文件输入输出路径
     */
    public interface WordCountOptions extends PipelineOptions {

        /**
         * 文件输入选项,可以通过命令行传入路径参数,路径默认为gs://apache-beam-samples/shakespeare/kinglear.txt
         */
        @Description("Path of the file to read from")
        @Default.String("gs://apache-beam-samples/shakespeare/kinglear.txt")
        String getInputFile();
        void setInputFile(String value);

        /**
         * 设置结果文件输出路径,在intellij IDEA的运行设置选项中或者在命令行中指定输出文件路径,如./pom.xml
         */
        @Description("Path of the file to write to")
        @Required
        String getOutput();
        void setOutput(String value);
    }
    /**
     * 5.运行程序
     */
    public static void main(String[] args) {
        WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(WordCountOptions.class);
        Pipeline p = Pipeline.create(options);

        p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
                .apply(new CountWords())
                .apply(MapElements.via(new FormatAsTextFn()))
                .apply("WriteCounts", TextIO.Write.to(options.getOutput()));

        p.run().waitUntilFinish();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
3.支持Spark,Flink,Apex等大数据数据框架来运行该WordCount程序。完整项目Github源码(推荐,注意pom.xml模块加载是否成功,在工具中开发大数据程序,利于调试,开发体验较好)
3.1.intellij IDEA(社区版)中Spark大数据框架运行Pipeline计算程序
Spark运行

设置VM options

-DPspark-runner
1
设置Programe arguments

--inputFile=pom.xml --output=counts
1


3.2.intellij IDEA(社区版)中Apex,Flink等支持的大数据框架均可运行WordCount的Pipeline计算程序,完整项目Github源码
Apex运行

设置VM options

-DPapex-runner
1
设置Programe arguments

--inputFile=pom.xml --output=counts
1
Flink运行等等

设置VM options

-DPflink-runner
1
设置Programe arguments

--inputFile=pom.xml --output=counts
1
4.终端运行(Terminal)(不推荐,第一次下载过程很慢,开发体验较差)
4.1.以下命令是下载官方示例源码,第一次运行下载较慢,如果失败了就多运行几次,(推荐下载,完整项目Github源码)直接用上述解读在intellij IDEA中运行。
mvn archetype:generate       -DarchetypeRepository=https://repository.apache.org/content/groups/snapshots       -DarchetypeGroupId=org.apache.beam       -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples       -DarchetypeVersion=LATEST       -DgroupId=org.example       -DartifactId=word-count-beam       -Dversion="0.1"       -Dpackage=org.apache.beam.examples       -DinteractiveMode=false
1
2


4.2.打包并运行
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount      -Dexec.args="--runner=SparkRunner --inputFile=pom.xml --output=counts" -Pspark-runner
1


4.3.成功运行结果
4.3.1.显示运行成功


4.3.2.WordCount输出计算结果

--------------------- 
 

本文转载自:https://blog.csdn.net/dream_an/article/details/56277784 

上一篇: 全链路监控
xiaomin0322
粉丝 146
博文 3936
码字总数 199458
作品 0
上海
架构师
私信 提问
玩转KafkaIO与Flink

随着大数据 2.0 时代悄然到来,大数据从简单的批处理扩展到了实时处理、流处理、交互式查询和机器学习应用。近年来涌现出诸多大数据应用组件,如 HBase、Hive、Kafka、Spark、Flink 等。开发...

微笑向暖wx
2018/09/28
243
0
Apache Beam的架构概览

 Apache Beam是一个开源的数据处理编程库,由Google贡献给Apache的项目,前不久刚刚成为Apache TLP项目。它提供了一个高级的、统一的编程模型,允许我们通过构建Pipeline的方式实现批量、流...

xiaomin0322
05/31
45
0
Apache Beam 0.5.0 发布,大数据批处理和流处理标准

Apache Beam 0.5.0 发布了,Apache Beam 是 Google 在2016年2月份贡献给 Apache 基金会的项目,主要目标是统一批处理和流处理的编程范式,为无限,乱序,web-scale的数据集处理提供简单灵活,...

王练
2017/02/10
1K
0
Apache Beam 0.6.0,大数据批处理和流处理标准

Apache Beam 0.6.0 发布了,该版本为 Python 编程语言引入了一个新的 SDK。 此外,该版本为 Apache HBase 在 Java SDK 中添加了一个新的 IO 链接器,以及一些常见的错误修复和改进。 最后,还...

局长
2017/03/18
950
0
Apache 基金会宣布 Apache Beam 成为顶级项目

1月10日,Apache 软件基金会宣布,Apache Beam 已经成功地从孵化毕业,成为基金会的一个新的顶级项目。 Apache Beam 是 Google 在2016年2月份贡献给 Apache 基金会孵化的项目。项目的名称表明...

王练
2017/01/12
3.5K
1

没有更多内容

加载失败,请刷新页面

加载更多

vue 2打包注意点

使用npm run build打包之后往往直接本地运行,路径类似这样:http://127.0.0.1:5500/xa/dist/index.html 或者http://127.0.0.1:5500/dist/index.html。然后页面打开是空白的,打开控制台查看...

牧云橙
20分钟前
4
0
归并排序

1.原理图 2.代码 public static void merge(int []a,int left,int mid,int right){ int []tmp=new int[a.length];//辅助数组 int p1=left,p2=mid+1,k=left;//p1、p2是检测......

wen123
24分钟前
4
0
css实现透明的两种方法

一、opacity:0~1 值越高,透明度越低: div{opacity:0.5 } 选择器匹配到的节点们,包括节点们的孩子节点,都会实现%50透明,另 0.5 可直接写成 .5 二、rgba(0~255,0~255,0~255,0~1) r...

Bing309
27分钟前
4
0
Tomcat 配置访问路径

此处只是部署完成后idea打开的默认路径,并非项目部署路径, 此处才是项目实际部署路径,可以有多个项目部署路径,idea可以配置默认打开一个

Aeroever
30分钟前
4
0
将ApiBoot Logging采集的日志上报到Admin

通过ApiBoot Logging可以将每一条请求的详细信息获取到,在分布式部署方式中,一个请求可能会经过多个服务,如果是每个服务都独立保存请求日志信息,我们没有办法做到统一的控制,而且还会存...

恒宇少年
30分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部