常用算子补充

原创
2017/01/09 10:37
阅读数 70

MapPartition

  • MapPartions()一次处理一个分区的所有数据,而map()算子一次处理分区中的一条数据。
  • 所以MapPartitions处理数据的速度比map快。
  • 如果RDD分区的数据很庞大,而mapPartiton很容易造成内存溢出。
  • 如果RDD分区的数据相对较小,为提高数据考虑,可以用MapPartition处理。

MapPartitionWithIndex

这个方法两个参数,一个是分区索引,一个是分区所有数据的集合 返回值是Iterator<T>的集合

ScalaDemo

package yxy

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
  * Created by yxy on 1/9/17.
  */
object addTransfomationdemo {
  def main(args: Array[String]): Unit = {
	mapPartitionsDemo()
  }

def mapPartitionWithIndexDemo(): Unit ={
val conf=new SparkConf().setAppName("Demo").setMaster("local")
val sc=new SparkContext(conf)
val dataRDD=sc.parallelize(Array("zhang","zhao","you"),3)//3是分区的 意思
val result2=dataRDD.mapPartitionsWithIndex((index,datas)=>{
  //两个参数,一个是分区索引,一个是分区所有数据的集合
  //返回值是Iterator<T>的集合
  val array=new ArrayBuffer[String]()
  while(datas.hasNext){
    val info=(index+1)+"fenqu de shuju you:"+datas.next()
    array+=info
  }
  array.toIterator
})
  .foreach(println(_))
}

def mapPartitionsDemo(): Unit ={
val conf=new SparkConf().setAppName("Demo").setMaster("local")
val sc=new SparkContext(conf)
val dataRDD=sc.parallelize(Array(1,2,3,4,5))
val results=dataRDD.mapPartitions(m=>{
 /* val lists=List[Int]()
  var i=0
  while (m.hasNext){
   i+=m.next()
  }
  lists.::(i).iterator*/

  val array=new ArrayBuffer[Int]()
  while (m.hasNext){
array+=m.next()
  }
  array.toIterator
  //注意MapPartitions()算子的返回值
})
  .collect()

for(i<-results){
  println(i)
}
  }
}

【注意这里的返回值类型!】

JAVA 实现

/**
 * [@Title](https://my.oschina.net/w2e): Demo01.java
 * [@Author](https://my.oschina.net/arthor):youxiangyang
 * [@Date](https://my.oschina.net/u/2504391):上午9:20:51
 */
package sparkcore;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;

/**
 * [@author](https://my.oschina.net/arthor) AURA
 *
 */
public class Demo01 {
/**
 * 
 */
public static void main(String[] args) {
	mapPartitionDemo();
	//mapPartitionWithIndexDemo();
	
}
private static void mapPartitionWithIndexDemo() {
	SparkConf conf=new SparkConf().setAppName("Demo").setMaster("local");
	JavaSparkContext sContext=new JavaSparkContext(conf);
	List<String> asList = Arrays.asList("zhang","wang","li");
	JavaRDD<String> parallelize = sContext.parallelize(asList,2);
	JavaRDD<String> mapPartitionsWithIndex = parallelize.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

		/**
		 * 
		 */
		private static final long serialVersionUID = 1L;

		public Iterator<String> call(Integer arg0, Iterator<String> datas)
				throws Exception {
			ArrayList<String> arrayList = new ArrayList<String>();
			while (datas.hasNext()) {
				String string = (String) datas.next();
				arrayList.add(string);
			}
			return arrayList.iterator();
		}
	},true);
	for (String string : mapPartitionsWithIndex.collect()) {
		System.out.println(string);
	}
	sContext.close();
}
private static void mapPartitionDemo() {
	SparkConf conf=new SparkConf().setAppName("par").setMaster("local");
	JavaSparkContext sContext=new JavaSparkContext(conf);
	List<Integer> list = Arrays.asList(1,2,3,4,5);
	JavaRDD<Integer> parallelize = sContext.parallelize(list);
	JavaRDD<Integer> mapPartitions = parallelize.mapPartitions(new FlatMapFunction<Iterator<Integer>,Integer>() {

		/**
		 * 
		 */
		private static final long serialVersionUID = 1L;

		public Iterable<Integer> call(Iterator<Integer> dates) throws Exception {
			// TODO Auto-generated method stub
			ArrayList<Integer> list=new ArrayList<Integer>();
			while (dates.hasNext()) {
				list.add(dates.next());
				//mapPartition()一次处理一个分区的所有数据
				//map()处理的额是分区中的一条数据
			}
			return list;
		}
	});
	for (Integer integer :mapPartitions.collect()) {
		System.out.println(integer);
	}
	sContext.close();
}
}
展开阅读全文
打赏
0
0 收藏
分享

作者的其它热门文章

加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部