文档章节

因为有它,Spark集群的交互操作变得更简单

openfea
 openfea
发布于 2017/06/15 21:04
字数 1252
阅读 8
收藏 0

Spark 2.X开发的一个动机是让它可以触及更广泛的受众,特别是缺乏编程技能但可能非常熟悉SQL的数据分析师或业务分析师。因此,Spark 2.X现在比以往更易使用。

在以前的Spark 1.x版本中,主要使用RDD(弹性分布式数据集),所有的操作都是基于RDD的转化,而在Spark 2.x中,主要基于DataFrame操作,所有的操作都是基于dataframe进行操作。

在本文中,我将重点介绍使用fea spk包如何进行spark的dataframe操作,为以后进行fea大数据分析做一下铺垫。使用这种方式的优势在于,可以利用spark集群的分布式原理,对大规模的数据进行分析和处理,步骤如下:

1、 创建spk连接

在spark 2.X的操作里面,使用SparkSession为Spark集群提供了唯一的入口点。val spk= SparkSession.builder.  master("local")  .appName("spark session example")  .getOrCreate()

而使用fea spk包,需要创建的spk连接如下

spk = @udf df0@sys by spk.open_spark

2. fea spk dataframe

fea spk操作有2种dataframe,一种是pandas的dataframe,可以直接在fea里面运行dump查看。

另外一种是spark的dataframe,它能够进行各种各样的spark算子操作,比如group,agg等。

spark dataframe需要转换为pandas的dataframe才能运行dump命令查看,转换的原语如下:

pd= @udf df by spk.to_DF  #spark dataframe df转换为pandas dataframe pd

dump pd   

#可以直接使用dump命令查看

sdf= @udf spk,pd by spk.to_SDF 

#将pandas dataframe pd转换为spark dataframe sdf,以便进行spark的各种操作。

3. 使用spk连接读取数据

fea spk包支持各种各样的数据源。如,hive,mongodb,text,avro , json, csv , parquet,mysql,oracle等数据源,下面列举几个比较常见的数据源来进行演示。

  • csv数据源

a.csv文件格式如下:

id,hash

1,ssss

2,333

3,5567

使用如下命令,连接读取数据

df= @udf spk by spk.load_csv with (header,/data/a.csv)

pd= @udf df by spk.to_DF

dump pd

  • Mysql数据源

Mysql中student_infos表数据如下:

使用如下命令,连接读取数据

df1= @udf spk by spk.load_mysql with (student_infos)

pd= @udf df1 by spk.to_DF

dump pd

4. 使用spk包 来进行groupby,agg操作

d.csv数据如下

df2= @udf spk by spk.load_csv with (header,/data/d.csv)

df3= @udf df2 by spk.group with (name) 

#对df2表的name字段进行group操作

df4= @udf df3 by spk.agg with (salary:avg,consumer:sum)

#对group之后的df3表的salary字段求均值,consumer字段进行求和操作

pd= @udf df4 by spk.to_DF

dump pd

5 使用spk包来进行join操作

b.csv数据如下

c.csv数据如下

df5= @udf spk by spk.load_csv with (header,/data/b.csv)

df6= @udf spk by spk.load_csv with (header,/data/c.csv)

df7= @udf df5,df6 by spk.join with (name:name1,inner)

#按照df5表的name字段,df6表的name1字段进行join内连接

pd= @udf df7 by spk.to_DF

dump pd

6. 使用spk包给表的一列或者多列重命名

对于上面的df7表,把name命名为name1,age命名为age1

df8=@udf df7 by spk.rename with (name:name1,age:age1)

pd=@udf df8 by spk.to_DF

dump pd

7 使用spk包对表按照某种条件进行过滤

以上面的df6表为例,统计income字段大于3000

df9= @udf df6 by spk.filter with (income>3000)
pd=@udf df9 by spk.to_DF

8. 使用spk包将表注册成能够使用SQL语句的表

以上面的df7表为例进行说明,将表注册为employee表

a= @udf df7 by spk.df_table with (employee)

使用SQL语句查询注册的表,返回DF

df10= @udf spk by spk.df_sql with (select * from employee where income>2000)
pd=@udf df10 by spk.to_DF

dump pd

9 将表保存为parquet文件格式

以df10为例,保存目录为hdfs的目录/user/root/employee.parquet

b=@udf df10 by spk.save_parquet with (employee.parquet)

 此外spk还有很多原语,暂时列举一部分,下面进行spk包机器学习的演示。

使用spk包进行机器学习,真正实现了分布式机器学习的思想,替代了原始的单机版本的机器学习,大大提高了机器学习的速度和吞吐量。目前spk包支持的机器学习还是比较完善的,包括逻辑回归,决策树,随机森林,贝叶斯,神经网络,Kmeans等算法。

10.使用随机森林进行分类

m1表的内容如下:

前面4个是特征,后面label为标签,有3种情况,0,1,2,下面使用随机森林算法进行模型的训练。注意,使用spk包进行机器学习,要求表的字段为double类型,所以要先进行转换。

m1= @udf m1 by spk.ML_double

md1= @udf m1 by spk.ML_rf with (maxDepth=5, numTrees=10)

md1是训练出的随机森林模型

下面进行预测,预测的表为m2,数据具有4个特征,不包括标签列,表格式如下

r1= @udf m2 by spk.ML_predict with (md1@public)
pd=@udf r1 by spk.to_DF

dump pd

prediction这列就是预测的结果

下面对模型进行打分

s1= @udf m1 by spk.ML_score with (md1@public)

dump s1

© 著作权归作者所有

openfea
粉丝 18
博文 86
码字总数 95615
作品 1
杭州
其他
私信 提问
大数据入门与实战-Spark上手

1 Spark简介 1.1 引言 行业正在广泛使用Hadoop来分析他们的数据集。原因是Hadoop框架基于简单的编程模型(MapReduce),它使计算解决方案具有可扩展性,灵活性,容错性和成本效益。在这里,主...

致Great
03/12
0
0
Spark与Hadoop的比较(特别说一下 Spark 和 MapReduce比较)

Hadoop和Spark方面要记住的最重要一点就是,它们并不是非此即彼的关系,因为它们不是相互排斥,也不是说一方是另一方的简易替代者。两者彼此兼容,这使得这对组合成为一种功能极其强大的解决...

小海bug
2018/06/21
160
0
什么是 Apache Spark?大数据分析平台如是说

自从 Apache Spark 2009 年在 U.C. Berkeley 的 AMPLab 默默诞生以来,它已经成为这个世界上最重要的分布式大数据框架之一。Spark 可以用多种方式部署,它为 Java、Scala、Python,和 R 编程...

oschina
2017/11/22
708
0
Spark 入门(Python、Scala 版)

本文中,我们将首先讨论如何在本地机器上利用Spark进行简单分析。然后,将在入门级水平探索Spark,了解Spark是什么以及它如何工作(希望可以激发更多探索)。最后两节将开始通过命令行与Spa...

大数据之路
2015/05/07
7.5K
0
Spark 数据分析导论-笔记

Spark Core Spark Core 实现了Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。 Spark Core 中还包含了 对弹性分布式数据集(resilient distributed dataset,简...

Java搬砖工程师
2018/12/26
28
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
今天
1K
12
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
22
0
计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
15
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
25
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
11
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部