文档章节

spark使用zipWithIndex和zipWithUniqueId为rdd中每条数据添加索引数据

闵开慧
 闵开慧
发布于 2017/09/12 11:09
字数 416
阅读 23
收藏 0

spark的rdd中数据需要添加自增主键,然后将数据存入数据库,使用map来添加有的情况是可以的,有的情况是不可以的,所以需要使用以下两种中的其中一种来进行添加。

zipWithIndex

def zipWithIndex(): RDD[(T, Long)]

该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。

  1. scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
  2. rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21
  3.  
  4. scala> rdd2.zipWithIndex().collect
  5. res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))
  6.  

zipWithUniqueId

def zipWithUniqueId(): RDD[(T, Long)]

该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:

每个分区中第一个元素的唯一ID值为:该分区索引号,

每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)

看下面的例子:

  1. scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
  2. rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21
  3. //rdd1有两个分区,
  4.  
  5. scala> rdd1.zipWithUniqueId().collect
  6. res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
  7. //总分区数为2
  8. //第一个分区第一个元素ID为0,第二个分区第一个元素ID为1
  9. //第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4
  10. //第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5
  11.  

© 著作权归作者所有

共有 人打赏支持
闵开慧
粉丝 334
博文 607
码字总数 266601
作品 0
青浦
高级程序员
Spark基本工作原理与RDD及wordcount程序实例和原理深度剖析

RDD以及其特点 1、RDD是Spark提供的核心抽象,全称为Resillient Distributed Dataset,即弹性分布式数据集。 2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每...

qq1137623160
05/10
0
0
Spark 大规模机器学习官方文档 - 中文翻译

Spark官方文档 - 中文翻译 转载请注明出处:http://www.cnblogs.com/BYRans/ 1 概述(Overview) 2 引入Spark(Linking with Spark) 3 初始化Spark(Initializing Spark) 4 弹性分布式数据集(RD...

李伟铭k
07/09
0
0
Spark实战(一)SparkStreaming集成Kafka

Spark Streaming + Kafka集成指南 Kafka项目在版本0.8和0.10之间引入了一个新的消费者API,因此有两个独立的相应Spark Streaming包可用。请选择正确的包, 请注意,0.8集成与后来的0.9和0.1...

FrankDeng
07/15
0
0
Spark 入门(Python、Scala 版)

本文中,我们将首先讨论如何在本地机器上利用Spark进行简单分析。然后,将在入门级水平探索Spark,了解Spark是什么以及它如何工作(希望可以激发更多探索)。最后两节将开始通过命令行与Spa...

大数据之路
2015/05/07
0
0
Spark 的Core深入(二)

Spark 的 Core 深入(二) 标签(空格分隔): Spark的部分 一、日志清洗的优化: 1.1 日志清洗有脏数据问题 rdd.partitions.length rdd.cacherdd.count 一个分区默认一个task 分区去处理默认...

flyfish225
05/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Confluence 6 使用 Fail2Ban 来限制登录尝试

什么是 Fail2Ban? 我们需要在我们网站中防止密码的暴利破解。Fail2Ban 是一个 Python 的应用来查看日志文件,使用的是正则表达式,同时还可以与Shorewall (或者 iptables)直接工作来来启用...

honeymose
3分钟前
0
0
日期和时间API - 读《Java 8实战》

日期与时间 LocalDate 创建一个LocalDate对象并读取其值 // 根据年月日创建日期LocalDate date1 = LocalDate.of(2014, 3, 18);// 读取System.out.println(date1.getYear()); // 2014Sys...

yysue
3分钟前
0
0
8月15日任务

8月15日任务 Memcached命令行 • telnet 127.0.0.1 11211 • set key2 0 30 2 ab STORED get key2 VALUE key2 0 2 ab END 实例: [root@localhost 02]# telnet 127.0.0.1 11211-bash: te......

寰宇01
16分钟前
0
0
LNMP架构(Nginx访问日志、Nginx日志切割、静态文件不记录日志和过期时间)

Nginx访问日志 1.打开配置文件,搜索log_format vim /usr/local/nginx/conf/nginx.conf 2.访问日志常用变量含义 $remote_addr : 客户端IP(公网IP) $http_x_forwarded_for : 代理服务器的IP ...

蛋黄_Yolks
16分钟前
0
0
lombok 不用再写pojo的getset

java实体类不写get/set方法 1、下载地址https://projectlombok.org/download Myeclipse、eclipse安装lombok Lombok是一种Java实用工具,可以帮助开发人员消除Java的冗长,具体看lombok的官网...

木之下
23分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部