spark使用zipWithIndex和zipWithUniqueId为rdd中每条数据添加索引数据
spark使用zipWithIndex和zipWithUniqueId为rdd中每条数据添加索引数据
闵开慧 发表于1个月前
spark使用zipWithIndex和zipWithUniqueId为rdd中每条数据添加索引数据
  • 发表于 1个月前
  • 阅读 7
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 十分钟定制你的第一个小程序>>>   

摘要: spark使用zipWithIndex和zipWithUniqueId为rdd中每条数据添加索引数据

spark的rdd中数据需要添加自增主键,然后将数据存入数据库,使用map来添加有的情况是可以的,有的情况是不可以的,所以需要使用以下两种中的其中一种来进行添加。

zipWithIndex

def zipWithIndex(): RDD[(T, Long)]

该函数将RDD中的元素和这个元素在RDD中的ID(索引号)组合成键/值对。

  1. scala> var rdd2 = sc.makeRDD(Seq("A","B","R","D","F"),2)
  2. rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[34] at makeRDD at :21
  3.  
  4. scala> rdd2.zipWithIndex().collect
  5. res27: Array[(String, Long)] = Array((A,0), (B,1), (R,2), (D,3), (F,4))
  6.  

zipWithUniqueId

def zipWithUniqueId(): RDD[(T, Long)]

该函数将RDD中元素和一个唯一ID组合成键/值对,该唯一ID生成算法如下:

每个分区中第一个元素的唯一ID值为:该分区索引号,

每个分区中第N个元素的唯一ID值为:(前一个元素的唯一ID值) + (该RDD总的分区数)

看下面的例子:

  1. scala> var rdd1 = sc.makeRDD(Seq("A","B","C","D","E","F"),2)
  2. rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[44] at makeRDD at :21
  3. //rdd1有两个分区,
  4.  
  5. scala> rdd1.zipWithUniqueId().collect
  6. res32: Array[(String, Long)] = Array((A,0), (B,2), (C,4), (D,1), (E,3), (F,5))
  7. //总分区数为2
  8. //第一个分区第一个元素ID为0,第二个分区第一个元素ID为1
  9. //第一个分区第二个元素ID为0+2=2,第一个分区第三个元素ID为2+2=4
  10. //第二个分区第二个元素ID为1+2=3,第二个分区第三个元素ID为3+2=5
  11.  
共有 人打赏支持
粉丝 308
博文 586
码字总数 260518
×
闵开慧
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: