Spark Java RDD基本操作
博客专区 > Yulong_ 的博客 > 博客详情
Spark Java RDD基本操作
Yulong_ 发表于4个月前
Spark Java RDD基本操作
  • 发表于 4个月前
  • 阅读 9
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 新注册用户 域名抢购1元起>>>   

1、项目创建

关于Java:选用1.7或者1.8.为了通用性,本章内容使用1.7进行编写。
 

关于Scala:工程不需要增加scala nature,即不需Add Scala Nature。若增加在java代码中调用scala library会有异常。

关于Spark版本:使用1.6.3进行编写。

maven 依赖

<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.3</version>
</dependency>
</dependencies>

2、API测试

2.1、初始化

Spark 编程的第一步是需要创建一个JavaSparkContext对象,用来告诉 Spark 如何访问集群。在创建 JavaSparkContext之前,你需要构建一个 SparkConf对象, SparkConf 对象包含了一些你应用程序的信息。

SparkConf conf = new SparkConf().setAppName("JavaApiLearn").setMaster("local");
		@SuppressWarnings("resource")
		JavaSparkContext jsc = new JavaSparkContext(conf)

其中

setAppName方法指定了spark程序的名称。

setMaster("local")指定了spark程序运行的方式,分为如下几种。
local Run Spark locally with one worker thread (i.e. no parallelism at all).
local[K] Run Spark locally with K worker threads (ideally, set this to the number of cores on your machine).
local[*] Run Spark locally with as many worker threads as logical cores on your machine.
spark://HOST:PORT Connect to the given Spark standalone cluster master. The port must be whichever one your master is configured to use, which is 7077 by default.
mesos://HOST:PORT Connect to the given Mesos cluster. The port must be whichever one your is configured to use, which is 5050 by default. Or, for a Mesos cluster using ZooKeeper, use mesos://zk://.... To submit with --deploy-mode cluster, the HOST:PORT should be configured to connect to the MesosClusterDispatcher.
yarn Connect to a YARN cluster in client or cluster mode depending on the value of --deploy-mode. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable.
yarn-client Equivalent to yarn with --deploy-mode client, which is preferred to `yarn-client`
yarn-cluster Equivalent to yarn with --deploy-mode cluster, which is preferred to `yarn-cluster`

除了在eclipse、Intellij中运行local模式的任务,也可以打成jar包,使用spark-submit来进行任务提交。

 

2.2、Collection转化RDD

通过List来进行转化RDD

// List to RDD
		List<String> list = new ArrayList<String>();
		list.add("11,22,33,44,55");
		list.add("aa,bb,cc,dd,ee");
		JavaRDD<String> jrl = jsc.parallelize(list);

2.3、外部数据集转化RDD

2.3.1、从文件读取转化RDD

JavaRDD<String> jrf = jsc.textFile("data/README.md");
        //JavaRDD<String> jrfFromHDFS = jsc.textFile("hdfs:///data/README.md"); //from hdfs to rdd
        //JavaRDD<String> jrfFromLocal = jsc.textFile("file:///data/README.md"); //from localfile to rdd

 

2.4、PairRDD

测试代码:

List<String> list = new ArrayList<String>();
		list.add("11 22,33,44,55");
		list.add("aa bb,cc,dd,ee");
		list.add("aa bb,cc,dd,ee");
		JavaRDD<String> jRDD = jsc.parallelize(list,1);
		
		JavaPairRDD<String, String> jPRDD = jRDD.mapToPair(new PairFunction<String, String, String>() {
			public Tuple2<String, String> call(String s) throws Exception {
				return new Tuple2<String, String>(s.split("\\s+")[0], s.substring(s.indexOf(" ")+1));
			}
		});
		
		PrintUtilPro.printList(jPRDD.collect());

输出:

11,22,33,44,55
aa,bb,cc,dd,ee
aa,bb,cc,dd,ee

备注:输出中第一个逗号是tuple中key和value分隔符。

2.5、RDD操作

关于Transformation和Actions的操作解释可以参照培训手册。

Transformations

The following table lists some of the common transformations supported by Spark. Refer to the RDD API doc (ScalaJavaPythonR) and pair RDD functions doc (ScalaJava) for details.

Transformation Meaning
map(func)

Return a new distributed dataset formed by passing each element of the source through a function func.

将原来RDD的每个数据项,使用map中用户自定义的函数func进行映射,转变为一个新的元素,并返回一个新的RDD。

filter(func)

Return a new dataset formed by selecting those elements of the source on which funcreturns true.

使用函数func对原RDD中数据项进行过滤,将符合func中条件的数据项组成新的RDD返回。

flatMap(func)

Similar to map, but each input item can be mapped to 0 or more output items (so funcshould return a Seq rather than a single item).

类似于map,但是输入数据项可以被映射到0个或多个输出数据集合中,所以函数func的返回值是一个数据项集合而不是一个单一的数据项。

mapPartitions(func)

Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.

类似于map,但是该操作是在每个分区上分别执行,所以当操作一个类型为T的RDD时func的格式必须是Iterator<T> => Iterator<U>。即mapPartitions需要获取到每个分区的迭代器,在函数中通过这个分区的迭代器对整个分区的元素进行操作。

mapPartitionsWithIndex(func)

Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

类似于mapPartitions,但是需要提供给func一个整型值,这个整型值是分区的索引,所以当处理T类型的RDD时,func的格式必须为(Int, Iterator<T>) => Iterator<U>

sample(withReplacementfractionseed)

Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.

对数据采样。用户可以设定是否有放回(withReplacement)、采样的百分比(fraction)、随机种子(seed)。

union(otherDataset)

Return a new dataset that contains the union of the elements in the source dataset and the argument.

返回原数据集和参数指定的数据集合并后的数据集。使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合并的RDD元素数据类型相同。该操作不进行去重操作,返回的结果会保存所有元素。如果想去重,可以使用distinct()。

intersection(otherDataset)

Return a new RDD that contains the intersection of elements in the source dataset and the argument.

返回两个数据集的交集。

subtract(otherDataset)

Return an RDD with the elements from `this` that are not in `other`.

返回this RDD中但不在other RDD中的元素

distinct([numTasks]))

Return a new dataset that contains the distinct elements of the source dataset.

将RDD中的元素进行去重操作。

groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 
Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. 
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

操作(K,V)格式的数据集,返回 (K, Iterable)格式的数据集。
注意,如果分组是为了按key进行聚合操作(例如,计算sum、average),此时使用reduceByKeyaggregateByKey计算效率会更高。
注意,默认情况下,并行情况取决于父RDD的分区数,但可以通过参数numTasks来设置任务数。

reduceByKey(func, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

使用给定的func,将(K,V)对格式的数据集中key相同的值进行聚集,其中func的格式必须为(V,V) => V。可选参数numTasks可以指定reduce任务的数目。

aggregateByKey(zeroValue)(seqOpcombOp, [numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.

对(K,V)格式的数据按key进行聚合操作,聚合时使用给定的合并函数和一个初试值,返回一个(K,U)对格式数据。需要指定的三个参数:zeroValue为在每个分区中,对key值第一次读取V类型的值时,使用的U类型的初始变量;seqOp用于在每个分区中,相同的key中V类型的值合并到zeroValue创建的U类型的变量中。combOp是对重新分区后两个分区中传入的U类型数据的合并函数。

sortByKey([ascending], [numTasks])

When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.

(K,V)格式的数据集,其中K已实现了Ordered,经过sortByKey操作返回排序后的数据集。指定布尔值参数ascending来指定升序或降序排列。

join(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin,rightOuterJoin, and fullOuterJoin.

用于操作两个键值对格式的数据集,操作两个数据集(K,V)和(K,W)返回(K, (V, W))格式的数据集。通过leftOuterJoinrightOuterJoinfullOuterJoin完成外连接操作。

cogroup(otherDataset, [numTasks])

When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.

用于操作两个键值对格式数据集(K,V)和(K,W),返回数据集格式为 (K,(Iterable, Iterable)) 。这个操作也称为groupWith。对在两个RDD中的Key-Value类型的元素,每个RDD相同Key的元素分别聚合为一个集合,并且返回两个RDD中对应Key的元素集合的迭代器。

cartesian(otherDataset)

When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).

对类型为T和U的两个数据集进行操作,返回包含两个数据集所有元素对的(T,U)格式的数据集。即对两个RDD内的所有元素进行笛卡尔积操作。

pipe(command[envVars])

Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.

以管道(pipe)方式将 RDD的各个分区(partition)使用 shell命令处理(比如一个 Perl或 bash脚本)。 RDD的元素会被写入进程的标准输入(stdin),将进程返回的一个字符串型 RDD(RDD of strings),以一行文本的形式写入进程的标准输出(stdout)中。

coalesce(numPartitions)

Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

把RDD的分区数降低到通过参数numPartitions指定的值。在得到的更大一些数据集上执行操作,会更加高效。

repartition(numPartitions)

Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.

随机地对RDD的数据重新洗牌(Reshuffle),从而创建更多或更少的分区,以平衡数据。总是对网络上的所有数据进行洗牌(shuffles)。

 

repartitionAndSortWithinPartitions(partitioner)

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

根据给定的分区器对RDD进行重新分区,在每个结果分区中,按照key值对记录排序。这在每个分区中比先调用repartition再排序效率更高,因为它可以将排序过程在shuffle操作的机器上进行。

Actions

The following table lists some of the common actions supported by Spark. Refer to the RDD API doc (ScalaJavaPythonR)

and pair RDD functions doc (ScalaJava) for details.

Action Meaning
reduce(func)

Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.

使用函数func聚集数据集中的元素,这个函数func输入为两个元素,返回为一个元素。这个函数应该符合结合律和交换了,这样才能保证数据集中各个元素计算的正确性。

collect()

Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.

在驱动程序中,以数组的形式返回数据集的所有元素。通常用于filter或其它产生了大量小数据集的情况。

count()

Return the number of elements in the dataset.

返回数据集中元素的个数。

first()

Return the first element of the dataset (similar to take(1)).

返回数据集中的第一个元素(类似于take(1))。

take(n)

Return an array with the first n elements of the dataset.

返回数据集中的前n个元素。

takeSample(withReplacement,num, [seed])

Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.

对一个数据集随机抽样,返回一个包含num个随机抽样元素的数组,参数withReplacement指定是否有放回抽样,参数seed指定生成随机数的种子。

takeOrdered(n[ordering])

Return the first n elements of the RDD using either their natural order or a custom comparator.

返回RDD按自然顺序或自定义顺序排序后的前n个元素。

saveAsTextFile(path)

Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.

将数据集中的元素以文本文件(或文本文件集合)的形式保存到指定的本地文件系统、HDFS或其它Hadoop支持的文件系统中。Spark将在每个元素上调用toString方法,将数据元素转换为文本文件中的一行记录。

saveAsSequenceFile(path
(Java and Scala)

Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).

将数据集中的元素以Hadoop Sequence文件的形式保存到指定的本地文件系统、HDFS或其它Hadoop支持的文件系统中。该操作只支持对实现了Hadoop的Writable接口的键值对RDD进行操作。在Scala中,还支持隐式转换为Writable的类型(Spark包括了基本类型的转换,例如Int、Double、String等等)。

saveAsObjectFile(path
(Java and Scala)

Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().

将数据集中的元素以简单的Java序列化的格式写入指定的路径。这些保存该数据的文件,可以使用SparkContext.objectFile()进行加载。

countByKey()

Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.

仅支持对(K,V)格式的键值对类型的RDD进行操作。返回(K,Int)格式的Hashmap,(K,Int)为每个key值对应的记录数目。

foreach(func)

Run a function func on each element of the dataset. This is usually done for side effects such as updating anAccumulator or interacting with external storage systems. 
Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

对数据集中每个元素使用函数func进行处理。该操作通常用于更新一个累加器(Accumulator)或与外部数据源进行交互。注意:在foreach()之外修改累加器变量可能引起不确定的后果。详细介绍请阅读Understanding closures部分。

 

Value数据类型的Transformation算子

类型 算子
输入分区与输出分区一对一型 map、flatMap、mapPartitions
输入分区与输出分区多对一型 union、cartesian、intersection
输入分区与输出分区多对多型 groupBy、groupByKey
输出分区为输入分区子集型 filter、distinct、subtract、sample、takeSample
Cache型 cache、persist

 

Key-Value数据类型的Transfromation算子

类型 算子
输入分区与输出分区一对一 mapValues
对单个RDD

combineByKey、reduceByKey、partitionBy、aggregateByKey、SortByKey

两个RDD聚集 Cogroup
连接 join、leftOutJoin、rightOutJoin

 

Action算子

类型 算子
无输出 foreach
HDFS saveAsTextFile、saveAsObjectFile
Scala集合和数据类型 collect、collectAsMap、reduceByKeyLocally、lookup、count、top、reduce、fold、aggregate

2.6、传递函数

2.6.1、Function< T,R>

/*
 * Function<T,R>
 * 接收一个输入值并返回一个输出值,用于类似map()和filter()的操作中
 * R call(T)
 */
//过滤RDD数据集中包含result的表项,新建RDD数据集resultLines
JavaRDD<String> resultLines=lines.filter(
        new Function<String, Boolean>() {
            public Boolean call(String v1)throws Exception {
                return v1.contains("result");
            }
        }
);

2.6.2、Function< T1,T2,R>

/*
 * Function<T1,T2,R>
 * 接收两个输入值并返回一个输出值,用于类似aggregate()和fold()等操作中
 * R call(T1,T2)
 */


List<String> strLine=new ArrayList<String>();
strLine.add("hello world");
strLine.add("This is Spark");
strLine.add("This is JavaRDD")
JavaRDD<String> input=sc.parallelize(strLine);


//以下代码的功能是wordcount,其中的reduceByKey操作的Function2函数定义了遇到相同的key时,value是如何reduce的————直接将两者的value相加。
//将文本行的单词过滤出来,并将所有的单词保存在RDD数据集words中。切分为单词,扁平化处理。见FlatMapFunction< T,R>
JavaRDD<String> words=input.flatMap(
        new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        }
);
//转化为键值对
JavaPairRDD<String,Integer> counts=words.mapToPair(
        new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2(s, 1);
            }
        }
);
//对每个词语进行计数
JavaPairRDD <String,Integer> results=counts.reduceByKey(
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }
) ;

2.6.3、FlatMapFunction<T,R>

/*
 * FlatMapFunction<T,R>
 * 接收一个输入值并返回任意个输出,用于类似flatMap()这样的操作中
 * Iterable <R> call(T)
 */
List<String> strLine=new ArrayList<String>();
strLine.add("hello world");
strLine.add("This is Spark");
strLine.add("This is JavaRDD")
JavaRDD<String> input=sc.parallelize(strLine);
//将文本行的单词过滤出来,并将所有的单词保存在RDD数据集words中。
JavaRDD<String> words=input.flatMap(
        new FlatMapFunction<String, String>() {
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        }
);

2.6.4、PairFunction<T, K, V>

/*
 * PairFunction<T,K,R>
 * 接收一个输入值并返回一个Tuple,用于类似mapToPair()这样的操作中,将一个元素变为一个键值对(PairRDD)
 * Tuple2<K, V> call(T)
 */
//转化为键值对
JavaPairRDD<String,Integer> counts=words.mapToPair(
        new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2(s, 1);
            }
        }
);

 

共有 人打赏支持
粉丝 9
博文 79
码字总数 169741
×
Yulong_
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: