文档章节

windows环境下flink入门demo实例

凯京技术团队
 凯京技术团队
发布于 01/29 18:08
字数 1732
阅读 109
收藏 1

前言碎语

为了应对凯京科技集团的飞速发展,凯京科技研发中心2019定下了数据中台的目标。数据处理我们选择了批处理+流处理结合的大数据应用软件新秀Apache Flink,前几天阿里又发出好信息称将开源Blink(Flink早期分支迁出迭代优化),所以今天来近距离感受下Flink。博主之前没接触过大数据相关的东西,所以不细究其设计概念了。目标就是跑一个最简单的流处理的例子,后面慢慢深入后在和大家分享具体的组件概念以及api设计。

Apache Flink是什么?

Apache Flink 是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态计算。可部署在各种集群环境,对各种大小的数据规模进行快速计算。上面是非常官方的描述,说白了我们为什么选择Flink,是因为他在社区口碑非常不错。在国内的话有阿里这种大数据大流量的公司一直在输出,当然像腾讯、华为、饿了么、滴滴等也都有使用Apache Flink。

进入正题

本篇博文涉及到的软件工具以及下载地址:

Apache Flink :https://flink.apache.org/downloads.html

Netcat:https://eternallybored.org/misc/netcat/

Netcat是一个有“瑞士军刀”美誉的网络工具,这里用来绑定端口等待Apache Flink的连接

第一步:启动Flink

从上面的地址下载Flink后是一个压缩包,解压后的目录结构如下:

/conf/flink-conf.yaml里有一些Flink的基本配置信息,如,jobmanager、taskmanager的端口和jvm内存(默认1024M)大小,web控制台的端口(默认8081)等。我们可以不该任何配置,然后进入到bin下,执行start-cluster.bat。这里要注意不是并不是flink.bat。flink.bat是用来提交job的。还有要确保相关的端口没有被占用

运行成功后会有两个java黑窗口(一个TaskManager、一个JobManager),如果只有一个java黑窗口,很可能是你的TaskManager因为端口占用没有启动起来,成功后访问:http://localhost:8081.就会看到如下的web管理控制台了:

如果启动失败的话,上面箭头所指向的地方应该是0.

第二步:job任务编写

1.首先需要新建一个maven工程,然后导入Flink的接口依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.7.1</version>
</dependency>

2.编写具体的job,官方提供了一个单词统计的demo

package com.kl;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class SocketWindowWordCount {
   public static void main(String[] args) throws Exception {
      // the host and the port to connect to
      final String hostname;
      final int port;
      try {
         final ParameterTool params = ParameterTool.fromArgs(args);
         hostname = params.has("hostname") ? params.get("hostname") : "localhost";
         port = params.has("port") ? params.getInt("port"):9000;
      } catch (Exception e) {
         System.err.println("No port specified. Please run 'SocketWindowWordCount " +
            "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
            "and port is the address of the text server");
         System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
            "type the input text into the command line");
         return;
      }
      // get the execution environment
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      // get input data by connecting to the socket
      DataStream<String> text = env.socketTextStream(hostname, port, "\n");
      // parse the data, group it, window it, and aggregate the counts
      DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
               public void flatMap(String value, Collector<WordWithCount> out) {
                  for (String word : value.split("\\s")) {
                     out.collect(new WordWithCount(word, 1L));
                  } }})
            .keyBy("word")
            .timeWindow(Time.seconds(5))
            .reduce(new ReduceFunction<WordWithCount>() {
               public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                  return new WordWithCount(a.word, a.count + b.count);
               }});
      // print the results with a single thread, rather than in parallel
      windowCounts.print().setParallelism(1);
      env.execute("Socket Window WordCount");
   }
   /**
    * Data type for words with count.
    */
   public static class WordWithCount {
      public String word;
      public long count;
      public WordWithCount() {}
      public WordWithCount(String word, long count) {
         this.word = word;
         this.count = count;
      }
      @Override
      public String toString() {
         return word + " : " + count;
      }
   }
}

上面demo实现了从启动参数中获取ip和端口,然后连接从输入流接收文本信息,然后统计文本里单词出现的次数。因为要打成可运行的jar,所以,还需要引入maven的jar打包插件,如下:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>1.2.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <mainClass>com.kl.SocketWindowWordCount</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

mainClass标签中就是你的main方法所在类全类名。然后mvn install就可以打出一个可运行的jar包了。

第三步:Netcat监听端口,等待连接

从上面贴的地址下载Netcat后,是一个压缩包,有些安全软件可能会报病毒,请忽略就好了。然后解压文件目录如下:

进入到这个目录,然后执行: nc64.exe -l -p 9000。相当于打开了9000端口,并监听了入站信息。最后实现的效果就是从这个窗口中输入的数据,回车后会发送Apache Flink中我们提交的job中处理输出,所以这里的9000端口,要和我们等下启动job的启动参数端口一致。

第四步:提交job运行

运行job有两种方式:可以通过Flink.bat运行,也可以通过web控制台运行。

命令行运行:

flink run E:\flinkWorkingspce\flinkdemo\target\finlk-demo-1.0-SNAPSHOT.jar --port 9000

web控制台运行:

如上图,点击Add New后选择你的jar包然后上传,上传成功就会在列表里列出来。然后选中你上传的jar。就会出现如下图的输入框,可以输入你的启动参数,然后点击submit提交就可以了

第五步:验证效果

提交后如果没有问题,job的详情页面如下:

这个时候我们从Netcat的监听的黑窗口中敲入一些长文本,就会在Flink的job里统计输出出来如:

文末结语

Flink的Windows环境入门实例还算顺利,这只是第一步,后面Apache Flink的生产落地肯定还会有更多的问题和挑战。我们会把落地过程中的问题拿到osc分享、来和大家一起交流,欢迎大家关注凯京科技。

作者简介:

陈凯玲,2016年5月加入凯京科技。曾任职高级研发和项目经理,现任凯京科技研发中心架构&运维部负责人。pmp项目管理认证拥有者,阿里云认证最有价值专家MVP。热爱开源,先后开源过多个热门项目。热爱分享技术点滴,独立博客KL博客(http://www.kailing.pub)博主。

欢迎加入凯京开源技术QQ群:613025121,和我们一起交流互联网应用的技术架构落地实践

© 著作权归作者所有

共有 人打赏支持
凯京技术团队
粉丝 27
博文 10
码字总数 18554
作品 0
崇明
架构师
私信 提问
《从0到1学习Flink》—— Mac 上搭建 Flink 1.6.0 环境并构建运行简单程序入门

准备工作 1、安装查看 Java 的版本号,推荐使用 Java 8。 安装 Flink 2、在 Mac OS X 上安装 Flink 是非常方便的。推荐通过 homebrew 来安装。 3、检查安装: 结果: 4、启动 flink 接着就可...

技术小能手
2018/11/12
0
0
Apache Flink 1.3.1 发布,通用数据处理平台

Apache Flink 1.3.1 发布了,这是 Apache Flink 1.3 系列的首个 bug 修复版本。该版本包含 50 个修复程序和对 Flink 1.3.0 的小改进。下面的列表包括所有修补程序的详细列表。建议用户升级至...

局长
2017/06/24
749
3
[Flink]Flink1.3 Batch指南一 本地运行

Flink可以在单台机器上运行,甚至可以在单个Java虚拟机中运行。 这运行机制可以方便用户在本地测试和调试Flink程序。本节概述了Flink的本地执行机制。 本地环境和执行器(executors)允许你可以...

sjf0115
2017/10/23
0
0
聊聊flink Table的Over Windows

序 本文主要研究一下flink Table的Over Windows 实例 Over Windows类似SQL的over子句,它可以基于event-time、processing-time或者row-count;具体可以通过Over类来构造,其中必须设置order...

go4it
01/27
0
0
《从0到1学习Flink》—— 如何自定义 Data Source ?

前言 在 《从0到1学习Flink》—— Data Source 介绍 文章中,我给大家介绍了 Flink Data Source 以及简短的介绍了一下自定义 Data Source,这篇文章更详细的介绍下,并写一个 demo 出来让大家...

技术小能手
2018/11/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

CDH5之时钟偏差问题

CDH5之时钟偏差问题 一、介绍 主机时钟偏差的问题,是分布式中各个主机之间存在系统时差,或者和ntp服务器的时间不同步造成的。如果集群之中没有配置ntp服务,那么时钟偏差会非常频繁,如下图...

星汉
7分钟前
0
0
ArrayBlockingQueue 与 LinkedBlockingDeque 的内部实现

区别 类似于ArrayList 与 LinkedList 的区别,ArrayBlockingQueue 与 ArrayList 的内部存储结构为数组;而LinkedBlockingDeque 与 LinkedList 的内存存储结构是一个双向链表的存储(所以两者...

noob_fly
8分钟前
0
0
巨杉数据库中标广州银行影像内容管理平台项目

近期,巨杉数据库中标广州银行影像内容管理平台项目,助推广州银行智慧业务升级。 随着银行在智慧化转型的不断发展,影像、音视频甚至用户生物特征信息等非结构化数据在银行数据管理中的比重...

巨杉数据库
10分钟前
0
0
网络社交如何保护个人隐私?做好这4步

在这个全民社交时代,互联网成为了我们生活最主要的娱乐方式,但也让我们的“一举一动”变得有迹可寻。比如,在微博上发布的动态、评论、定位以及第三方应用的授权等操作,都在不经意间将个人...

ThinkSNS官方帐号
11分钟前
0
0
CentOS 编译安装的软件卸载方法

之前在源码包的位置执行的 make install 现在就是反过来 执行make uninstall 本人微信: 本人QQ:

lwkai
13分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部