文档章节

常用算子补充

花生-瓜子
 花生-瓜子
发布于 2017/01/09 10:37
字数 567
阅读 3
收藏 0

MapPartition

  • MapPartions()一次处理一个分区的所有数据,而map()算子一次处理分区中的一条数据。
  • 所以MapPartitions处理数据的速度比map快。
  • 如果RDD分区的数据很庞大,而mapPartiton很容易造成内存溢出。
  • 如果RDD分区的数据相对较小,为提高数据考虑,可以用MapPartition处理。

MapPartitionWithIndex

这个方法两个参数,一个是分区索引,一个是分区所有数据的集合 返回值是Iterator<T>的集合

ScalaDemo

package yxy

import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

/**
  * Created by yxy on 1/9/17.
  */
object addTransfomationdemo {
  def main(args: Array[String]): Unit = {
	mapPartitionsDemo()
  }

def mapPartitionWithIndexDemo(): Unit ={
val conf=new SparkConf().setAppName("Demo").setMaster("local")
val sc=new SparkContext(conf)
val dataRDD=sc.parallelize(Array("zhang","zhao","you"),3)//3是分区的 意思
val result2=dataRDD.mapPartitionsWithIndex((index,datas)=>{
  //两个参数,一个是分区索引,一个是分区所有数据的集合
  //返回值是Iterator<T>的集合
  val array=new ArrayBuffer[String]()
  while(datas.hasNext){
    val info=(index+1)+"fenqu de shuju you:"+datas.next()
    array+=info
  }
  array.toIterator
})
  .foreach(println(_))
}

def mapPartitionsDemo(): Unit ={
val conf=new SparkConf().setAppName("Demo").setMaster("local")
val sc=new SparkContext(conf)
val dataRDD=sc.parallelize(Array(1,2,3,4,5))
val results=dataRDD.mapPartitions(m=>{
 /* val lists=List[Int]()
  var i=0
  while (m.hasNext){
   i+=m.next()
  }
  lists.::(i).iterator*/

  val array=new ArrayBuffer[Int]()
  while (m.hasNext){
array+=m.next()
  }
  array.toIterator
  //注意MapPartitions()算子的返回值
})
  .collect()

for(i<-results){
  println(i)
}
  }
}

【注意这里的返回值类型!】

JAVA 实现

/**
 * [@Title](https://my.oschina.net/w2e): Demo01.java
 * [@Author](https://my.oschina.net/arthor):youxiangyang
 * [@Date](https://my.oschina.net/u/2504391):上午9:20:51
 */
package sparkcore;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;

/**
 * [@author](https://my.oschina.net/arthor) AURA
 *
 */
public class Demo01 {
/**
 * 
 */
public static void main(String[] args) {
	mapPartitionDemo();
	//mapPartitionWithIndexDemo();
	
}
private static void mapPartitionWithIndexDemo() {
	SparkConf conf=new SparkConf().setAppName("Demo").setMaster("local");
	JavaSparkContext sContext=new JavaSparkContext(conf);
	List<String> asList = Arrays.asList("zhang","wang","li");
	JavaRDD<String> parallelize = sContext.parallelize(asList,2);
	JavaRDD<String> mapPartitionsWithIndex = parallelize.mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>() {

		/**
		 * 
		 */
		private static final long serialVersionUID = 1L;

		public Iterator<String> call(Integer arg0, Iterator<String> datas)
				throws Exception {
			ArrayList<String> arrayList = new ArrayList<String>();
			while (datas.hasNext()) {
				String string = (String) datas.next();
				arrayList.add(string);
			}
			return arrayList.iterator();
		}
	},true);
	for (String string : mapPartitionsWithIndex.collect()) {
		System.out.println(string);
	}
	sContext.close();
}
private static void mapPartitionDemo() {
	SparkConf conf=new SparkConf().setAppName("par").setMaster("local");
	JavaSparkContext sContext=new JavaSparkContext(conf);
	List<Integer> list = Arrays.asList(1,2,3,4,5);
	JavaRDD<Integer> parallelize = sContext.parallelize(list);
	JavaRDD<Integer> mapPartitions = parallelize.mapPartitions(new FlatMapFunction<Iterator<Integer>,Integer>() {

		/**
		 * 
		 */
		private static final long serialVersionUID = 1L;

		public Iterable<Integer> call(Iterator<Integer> dates) throws Exception {
			// TODO Auto-generated method stub
			ArrayList<Integer> list=new ArrayList<Integer>();
			while (dates.hasNext()) {
				list.add(dates.next());
				//mapPartition()一次处理一个分区的所有数据
				//map()处理的额是分区中的一条数据
			}
			return list;
		}
	});
	for (Integer integer :mapPartitions.collect()) {
		System.out.println(integer);
	}
	sContext.close();
}
}

© 著作权归作者所有

花生-瓜子
粉丝 0
博文 2
码字总数 2145
作品 0
大同
程序员
私信 提问
Spark常用的算子以及Scala函数总结

Spark与Scala 首先,介绍一下scala语言: Scala 是一种把面向对象和函数式编程理念加入到静态类型语言中的混血儿。 为什么学scala? spark提供了R、Python等语言的接口,为什么还要重新学一门...

流川枫AI
2017/12/24
0
0
图像的空域增强处理—空域滤波(matlab实现)

在这里我描述的是我一个初学者所收集到的一些较实用化的信息,具体原理不做描述; 空域滤波分为:平滑滤波(低通滤波)、锐化滤波(高通滤波); 平滑滤波(低通滤波):过滤掉图像中的高频部...

qwe331822775
2018/03/27
0
0
spark的sparkUI如何解读?

spark的sparkUI如何解读? 以spark2.1.4来做例子 Job - schedule mode 进入之后默认是进入spark job 页面 这个说明有很详细的解释,spark有两种操作算子:转换算子(transformation)和执行算...

王二狗子11
2018/01/07
0
0
FPGA设计——图像处理(Sobel边缘检测)

1. 概述 本设计采用FPGA技术,实现CMOS视频图像的边缘检测(sobel),并通过以太网传输(UDP方式)给PC实时显示。 2. 硬件系统框图 CMOS采用MT9V011(30万像素),FPGA采用ALTERA公司的CYCLONE I...

shugenyin
2017/10/28
0
0
使用Matlab对二值图像进行轮廓提取

转自:http://blog.csdn.net/q1302182594/article/details/50394576 本文主要总结一下在matlab中可用于进行轮廓提取的函数。 1 bwperim 根据参考资料[2]的提示,可以使用bwperim()函数进行轮...

u013066730
2017/02/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Blockstack-2 :Blockstack ID注册

本篇文章主要记录Blockstack ID注册的流程; 在介绍注册流程之前,先简单的介绍一下Blockstack ID; 相对于传统互联网来说,Blockstack ID更像是统一的账号系统;即一个账号即可登录和授权所...

Riverzhou
今天
19
0
面试官问:平时碰到系统CPU飙高和频繁GC,你会怎么排查?

处理过线上问题的同学基本上都会遇到系统突然运行缓慢,CPU 100%,以及Full GC次数过多的问题。当然,这些问题的最终导致的直观现象就是系统运行缓慢,并且有大量的报警。本文主要针对系统运...

Java高级架构师n
今天
33
0
面向对象编程

1、类和对象 类是对象的蓝图和模板,而对象是实例;即对象是具体的实例,类是一个抽象的模板 当我们把一大堆拥有共同特征的对象的静态特征(属性)和动态特征(行为)都抽取出来后,就可以定...

huijue
今天
30
0
redis异常解决 :idea启动本地redis出现 jedis.exceptions.JedisDataException: NOAUTH Authentication required

第一次安装在本地redis服务,试试跑项目,结果却出现nested exception is redis.clients.jedis.exceptions.JedisDataException: NOAUTH Authentication required错误,真是让人头疼 先检查一...

青慕
今天
42
0
Spring 之 IoC 源码分析 (基于注解方式)

一、 IoC 理论 IoC 全称为 Inversion of Control,翻译为 “控制反转”,它还有一个别名为 DI(Dependency Injection),即依赖注入。 二、IoC方式 Spring为IoC提供了2种方式,一种是基于xml...

星爵22
今天
37
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部