Apache Spark 2.2.0 中文文档 - SparkR (R on Spark) | ApacheCN

原创
2017/09/26 12:43
阅读数 70

SparkR (R on Spark)

概述

SparkR 是一个 R package, 它提供了一个轻量级的前端以从 R 中使用 Apache Spark. 在 Spark 2.2.0 中, SparkR 提供了一个分布式的 data frame, 它实现了像 selection, filtering, aggregation etc 一系列所支持的操作.(dplyr 与 R data frames 相似) ), 除了可用于海量数据上之外. SparkR 还支持使用 MLlib 来进行分布式的 machine learning(机器学习).

SparkDataFrame

SparkDataFrame 是一个分布式的, 将数据映射到有名称的 colums(列)的集合. 在概念上 相当于关系数据库中的 table 表或 R 中的 data frame,但在该引擎下有更多的优化. SparkDataFrames 可以从各种来源构造,例如: 结构化的数据文件,Hive 中的表,外部数据库或现有的本地 R data frames.

All of the examples on this page use sample data included in R or the Spark distribution and can be run using the ./bin/sparkR shell.

启动: SparkSession

SparkR 的入口点是 SparkSession, 它会连接您的 R 程序到 Spark 集群中. 您可以使用 sparkR.session 来创建 SparkSession, 并传递诸如应用程序名称, 依赖的任何 spark 软件包等选项, 等等. 此外,还可以通过 SparkSession 来与 SparkDataFrames 一起工作。 如果您正在使用 sparkR shell,那么 SparkSession 应该已经被创建了,你不需要再调用 sparkR.session.

sparkR.session()

从 RStudio 来启动

您可以从 RStudio 中来启动 SparkR. 您可以从 RStudio, R shell, Rscript 或者 R IDEs 中连接你的 R 程序到 Spark 集群中去. 要开始, 确保已经在环境变量中设置好 SPARK_HOME (您可以检测下 Sys.getenv), 加载 SparkR package, 并且像下面一样调用 sparkR.session. 它将检测 Spark 的安装, 并且, 如果没有发现, 它将自动的下载并且缓存起来. 当然,您也可以手动的运行 install.spark.

为了调用 sparkR.session, 您也可以指定某些 Spark driver 的属性. 通常哪些 应用程序属性 和 运行时环境 不能以编程的方式来设置, 这是因为 driver 的 JVM 进程早就已经启动了, 在这种情况下 SparkR 会帮你做好准备. 要设置它们, 可以像在 sparkConfig 参数中的其它属性一样传递它们到 sparkR.session() 中去.

if (nchar(Sys.getenv("SPARK_HOME")) < 1) {
  Sys.setenv(SPARK_HOME = "/home/spark")
}
library(SparkR, lib.loc = c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib")))
sparkR.session(master = "local[*]", sparkConfig = list(spark.driver.memory = "2g"))

下面的 Spark driver 属性可以 从 RStudio 的 sparkR.session 的 sparkConfig 中进行设置:

Property Name<(属性名称) Property group(属性分组) spark-submit equivalent
spark.master Application Properties --master
spark.yarn.keytab Application Properties --keytab
spark.yarn.principal Application Properties --principal
spark.driver.memory Application Properties --driver-memory
spark.driver.extraClassPath Runtime Environment --driver-class-path
spark.driver.extraJavaOptions Runtime Environment --driver-java-options
spark.driver.extraLibraryPath Runtime Environment --driver-library-path

创建 SparkDataFrames

有了一个 SparkSession 之后, 可以从一个本地的 R data frame, Hive 表, 或者其它的 data sources 中来创建 SparkDataFrame 应用程序.

从本地的 data frames 来创建 SparkDataFrames

要创建一个 data frame 最简单的方式是去转换一个本地的 R data frame 成为一个 SparkDataFrame. 我们明确的使用 as.DataFrame 或 createDataFrame 并且经过本地的 R data frame 中以创建一个 SparkDataFrame. 例如, 下面的例子基于 R 中已有的 faithful 来创建一个 SparkDataFrame.

df <- as.DataFrame(faithful)

# 展示第一个 SparkDataFrame 的内容
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74

从 Data Sources(数据源)创建 SparkDataFrame

SparkR 支持通过 SparkDataFrame 接口对各种 data sources(数据源)进行操作. 本节介绍使用数据源加载和保存数据的常见方法. 您可以查看 Spark Sql 编程指南的 specific options 部分以了解更多可用于内置的 data sources(数据源)内容.

从数据源创建 SparkDataFrames 常见的方法是 read.df. 此方法将加载文件的路径和数据源的类型,并且将自动使用当前活动的 SparkSession. SparkR 天生就支持读取 JSON, CSV 和 Parquet 文件, 并且通过可靠来源的软件包 第三方项目, 您可以找到 Avro 等流行文件格式的 data source connectors(数据源连接器). 可以用 spark-submit 或 sparkR 命令指定 --packages 来添加这些包, 或者在交互式 R shell 或从 RStudio 中使用sparkPackages 参数初始化 SparkSession.

sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")

We can see how to use data sources using an example JSON input file. Note that the file that is used here is not a typical JSON file. Each line in the file must contain a separate, self-contained valid JSON object. For more information, please see JSON Lines text format, also called newline-delimited JSON. As a consequence, a regular multi-line JSON file will most often fail.

我们可以看看如何使用 JSON input file 的例子来使用数据源. 注意, 这里使用的文件是 not 一个经典的 JSON 文件. 文件中的每行都必须包含一个单独的,独立的有效的JSON对象

people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR 自动从 JSON 文件推断出 schema(模式)
printSchema(people)
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# 同样, 使用  read.json 读取多个文件
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))

该 data sources API 原生支持 CSV 格式的 input files(输入文件). 要了解更多信息请参阅 SparkR read.df API 文档.

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

该 data sources API 也可用于将 SparkDataFrames 存储为多个 file formats(文件格式). 例如, 我们可以使用 write.df 把先前的示例的 SparkDataFrame 存储为一个 Parquet 文件.

write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")

从 Hive tables 来创建 SparkDataFrame

您也可以从 Hive tables(表)来创建 SparkDataFrames. 为此,我们需要创建一个具有 Hive 支持的 SparkSession,它可以访问 Hive MetaStore 中的 tables(表). 请注意, Spark 应该使用 Hive support 来构建,更多细节可以在 SQL 编程指南 中查阅.

sparkR.session()

sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")

# results is now a SparkDataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

SparkDataFrame 操作

SparkDataFrames 支持一些用于结构化数据处理的 functions(函数). 这里我们包括一些基本的例子,一个完整的列表可以在 API 文档中找到:

Selecting rows(行), columns(列)

# Create the SparkDataFrame
df <- as.DataFrame(faithful)

# 获取关于 SparkDataFrame 基础信息
df
## SparkDataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column name as strings
head(select(df, "eruptions"))

# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48

Grouping, Aggregation(分组, 聚合)

SparkR data frames 支持一些常见的, 用于在 grouping(分组)数据后进行 aggregate(聚合)的函数. 例如, 我们可以在 faithful dataset 中计算 waiting 时间的直方图, 如下所示.

# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      70     4
##2      67     1
##3      69     2

# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##   waiting count
##1      78    15
##2      83    14
##3      81    13

Operating on Columns(列上的操作)

SparkR 还提供了一些可以直接应用于列进行数据处理和 aggregatation(聚合)的函数. 下面的例子展示了使用基本的算术函数.

# Convert waiting time from hours to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

应用 User-Defined Function(UDF 用户自定义函数)

在 SparkR 中, 我们支持几种 User-Defined Functions:

Run a given function on a large dataset using dapply or dapplyCollect

dapply

应用一个 function(函数)到 SparkDataFrame 的每个 partition(分区). 应用于 SparkDataFrame 每个 partition(分区)的 function(函数)应该只有一个参数, 它中的 data.frame 对应传递的每个分区. 函数的输出应该是一个 data.frame. Schema 指定生成的 SparkDataFrame row format. 它必须匹配返回值的 data types.

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300

dapplyCollect

像 dapply 那样, 应用一个函数到 SparkDataFrame 的每个分区并且手机返回结果. 函数的输出应该是一个 data.frame. 但是, 不需要传递 Schema. 注意, 如果运行在所有分区上的函数的输出不能 pulled(拉)到 driver 的内存中过去, 则 dapplyCollect 会失败.

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame and return a R's data.frame
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440

Run a given function on a large dataset grouping by input column(s) and using gapply or gapplyCollect(在一个大的 dataset 上通过 input colums(输入列)来进行 grouping(分组)并且使用 gapply or gapplyCollect 来运行一个指定的函数)

gapply

应用给一个函数到 SparkDataFrame 的每个 group. 该函数被应用到 SparkDataFrame 的每个 group, 并且应该只有两个参数: grouping key 和 R data.frame 对应的 key. 该 groups 从 SparkDataFrame 的 columns(列)中选择. 函数的输出应该是 data.frame. Schema 指定生成的 SparkDataFrame row format. 它必须在 Spark data types 数据类型 的基础上表示 R 函数的输出 schema(模式). 用户可以设置返回的 data.frame列名.

# Determine six waiting times with the largest eruption time in minutes.
schema <- structType(structField("waiting", "double"), structField("max_eruption", "double"))
result <- gapply(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
    },
    schema)
head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900

gapplyCollect

像 gapply 那样, 将函数应用于 SparkDataFrame 的每个分区,并将结果收集回 R data.frame. 函数的输出应该是一个 data.frame. 但是,不需要传递 schema(模式). 请注意,如果在所有分区上运行的 UDF 的输出无法 pull(拉)到 driver 的内存, 那么 gapplyCollect 可能会失败.

# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

##    waiting   max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900

使用 spark.lapply 分发运行一个本地的 R 函数

spark.lapply

类似于本地 R 中的 lapplyspark.lapply 在元素列表中运行一个函数,并使用 Spark 分发计算. 以类似于 doParallel 或 lapply 的方式应用于列表的元素. 所有计算的结果应该放在一台机器上. 如果不是这样, 他们可以像 df < - createDataFrame(list) 这样做, 然后使用 dapply.

# Perform distributed training of multiple models with spark.lapply. Here, we pass
# a read-only list of arguments which specifies family the generalized linear model should be.
families <- c("gaussian", "poisson")
train <- function(family) {
  model <- glm(Sepal.Length ~ Sepal.Width + Species, iris, family = family)
  summary(model)
}
# Return a list of model's summaries
model.summaries <- spark.lapply(families, train)

# Print the summary of each model
print(model.summaries)

SparkR 中运行 SQL 查询

A SparkDataFrame can also be registered as a temporary view in Spark SQL and that allows you to run SQL queries over its data. The sqlfunction enables applications to run SQL queries programmatically and returns the result as a SparkDataFrame.

# Load a JSON file
people <- read.df("./examples/src/main/resources/people.json", "json")

# Register this SparkDataFrame as a temporary view.
createOrReplaceTempView(people, "people")

# SQL statements can be run by using the sql method
teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
head(teenagers)
##    name
##1 Justin

机器学习

算法

SparkR 现支持下列机器学习算法:

分类

回归

聚类

协同过滤

频繁模式挖掘

统计

SparkR 底层实现使用 MLlib 来训练模型. 有关示例代码,请参阅MLlib用户指南的相应章节. 用户可以调用summary输出拟合模型的摘要, 利用模型对数据进行预测, 并且使用 write.ml/read.ml 来 保存/加载拟合的模型 . SparkR 支持对模型拟合使用部分R的公式运算符, 包括 ‘~’, ‘.’, ‘:’, ‘+’, 和 ‘-‘.

模型持久化

下面的例子展示了SparkR如何 保存/加载 机器学习模型.

training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)

# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)

unlink(modelPath)

Find full example code at "examples/src/main/r/ml/ml.R" in the Spark repo.

R和Spark之间的数据类型映射

R Spark
byte byte
integer integer
float float
double double
numeric double
character string
string string
binary binary
raw binary
logical boolean
POSIXct timestamp
POSIXlt timestamp
Date date
array array
list array
env map

Structured Streaming

SparkR 支持 Structured Streaming API (测试阶段). Structured Streaming 是一个 构建于SparkSQL引擎之上的易拓展、可容错的流式处理引擎. 更多信息请参考 R API Structured Streaming Programming Guide

R 函数名冲突

当在R中加载或引入(attach)一个新package时, 可能会发生函数名冲突,一个函数掩盖了另一个函数

下列函数是被SparkR所掩盖的:

被掩盖函数 如何获取
cov in package:stats
stats::cov(x, y = NULL, use = "everything",
           method = c("pearson", "kendall", "spearman"))
filter in package:stats
stats::filter(x, filter, method = c("convolution", "recursive"),
              sides = 2, circular = FALSE, init)
sample in package:base base::sample(x, size, replace = FALSE, prob = NULL)

由于SparkR的一部分是在dplyr软件包上建模的,因此SparkR中的某些函数与dplyr中同名. 根据两个包的加载顺序, 后加载的包会掩盖先加载的包的部分函数. 在这种情况下, 可以在函数名前指定包名前缀, 例如: SparkR::cume_dist(x) or dplyr::cume_dist(x).

你可以在 R 中使用search()检查搜索路径

迁移指南

SparkR 1.5.x 升级至 1.6.x

  • 在Spark 1.6.0 之前, 写入模式默认值为 append. 在 Spark 1.6.0 改为 error 匹配 Scala API.
  • SparkSQL 将R 中的 NA 转换为 null,反之亦然.

SparkR 1.6.x 升级至 2.0

  • table 方法已经移除并替换为 tableToDF.
  • 类 DataFrame 已改名为 SparkDataFrame 避免名称冲突.
  • Spark的 SQLContext 和 HiveContext 已经过时并替换为 SparkSession. 相应的摒弃 sparkR.init()而通过调用 sparkR.session() 来实例化SparkSession. 一旦实例化完成, 当前的SparkSession即可用于SparkDataFrame 操作(注释:spark2.0开始所有的driver实例通过sparkSession来进行构建).
  • sparkR.session 不支持 sparkExecutorEnv 参数.要为executors设置环境,请使用前缀”spark.executorEnv.VAR_NAME”设置Spark配置属性,例如”spark.executorEnv.PATH”, -sqlContext 不再需要下列函数: createDataFrameas.DataFrameread.jsonjsonFileread.parquetparquetFileread.textsqltablestableNamescacheTableuncacheTableclearCachedropTempTableread.dfloadDFcreateExternalTable.
  • registerTempTable 方法已经过期并且替换为createOrReplaceTempView.
  • dropTempTable 方法已经过期并且替换为 dropTempView.
  • sc SparkContext 参数不再需要下列函数: setJobGroupclearJobGroupcancelJobGroup

升级至 SparkR 2.1.0

  • join 不再执行笛卡尔积计算, 使用 crossJoin 来进行笛卡尔积计算.

升级至 SparkR 2.2.0

  • createDataFrame 和 as.DataFrame 添加numPartitions参数. 数据分割时, 分区位置计算已经与scala计算相一致.
  • 方法 createExternalTable 已经过期并且替换为createTable. 可以调用这两种方法来创建外部或托管表. 已经添加额外的 catalog 方法.
  • 默认情况下,derby.log现在已保存到tempdir()目录中. 当实例化SparkSession且选项enableHiveSupport 为TRUE,会创建derby.log .
  • 更正spark.lda 错误设置优化器的bug.
  • 更新模型概况输出 coefficients as matrix. 更新的模型概况包括 spark.logitspark.kmeansspark.glmspark.gaussianMixture 的模型概况已经添加对数概度(log-likelihood) loglik.

 

我们一直在努力

 

apachecn/spark-doc-zh

   云计算之嫣然伊笑

 

原文地址: http://spark.apachecn.org/docs/cn/2.2.0/sparkr.html
网页地址: http://spark.apachecn.org/
github: https://github.com/apachecn/spark-doc-zh(觉得不错麻烦给个 Star,谢谢!~)

展开阅读全文
打赏
1
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
1
分享
返回顶部
顶部