文档章节

spark dataframe 全局排名优化

osenlin
 osenlin
发布于 08/08 23:45
字数 354
阅读 148
收藏 0

spark提供给我们的全局排序,默认情况下只有spark-sql提供的窗口函数,但如果窗口是整个表eg:row_number() over(order by a) 会存在严重的数据倾斜,下面我们演示了俩种方式,例2是例1的改进方式

例1:Spark-SQL形式

df = spark.createDataFrame(
[{"b": 2, "c": 4, "a": 3}, {"a": 2, "c": 6, "b": 3}, {"a": 5, "c": 2, "b": 3}, {"a": 1, "c": 3, "b": 3}])
df.show()
df.createOrReplaceTempView("temp")
resDf1=spark.sql("select *,row_number() over(order by a) rank from temp")
resDf1.show()

例2:Spark-command 形式

def f(rows):
	for row in rows:
		t = row[0].asDict()
		#排名首位是0,正常情况下是1,故这边加一
		t["id"] = long(row[1]) + 1
		yield t
# 通过zipWithIndex获取排名,但是首位是0
# ascending=False 降序
indexrdd = df.select("*").rdd.sortBy(ascending=False, numPartitions=3,keyfunc=lambda x: x.a).zipWithIndex().mapPartitions(lambda x: f(x))
print indexrdd.collect()
indexdf = spark.createDataFrame(indexrdd)
indexdf.show()
dp = spark.createDataFrame(df.rdd.sortBy(ascending=True, numPartitions=3, keyfunc=lambda x: x.c).zipWithIndex())
dp.show()

例子2的基本原理 先进行一定的采样sample,然后确定数据边界,最终将数据根据这些边界进行分区,也就是所谓的rangepartition。在计算每个分区的顺序

© 著作权归作者所有

共有 人打赏支持
osenlin
粉丝 31
博文 62
码字总数 82160
作品 0
深圳
架构师
加载中

评论(1)

我还在等你回家
转发一下
【Spark】Spark SQL, DataFrames and Datasets Guide(翻译文,持续更新)

本文主要是翻译Spark官网Spark SQL programming guide 。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷。目录导航在右上角 Spark SQL、DataFrames 和 Datase...

跑呀跑
09/19
0
0
Spark Core组件:RDD、DataFrame和DataSet

1. 介绍 spark生态系统中,Spark Core,包括各种Spark的各种核心组件,它们能够对内存和硬盘进行操作,或者调用CPU进行计算。 spark core定义了RDD、DataFrame和DataSet spark最初只有RDD,D...

wsc449
01/17
0
0
基于spark1.3.1的spark-sql实战-01

sqlContext总的一个过程如下图所示: SQL语句经过SqlParse解析成UnresolvedLogicalPlan; 使用analyzer结合数据数据字典(catalog)进行绑定,生成resolvedLogicalPlan; 使用optimizer对res...

stark_summer
2015/05/19
0
0
Apache Flink和Apache Spark有什么异同?它们的发展前景分别怎样?

============================= object WordCount { def main(args: Array[String]) {val env = new SparkContext("local","wordCount")val data = List("hi","how are you","hi")val dataSe......

justlpf
05/12
0
0
详细解读Spark的数据分析引擎:Spark SQL

欢迎关注大数据和人工智能技术文章发布的微信公众号:清研学堂,在这里你可以学到夜白(作者笔名)精心整理的笔记,让我们每天进步一点点,让优秀成为一种习惯! 一、spark SQL:类似于Hive,...

李金泽
03/04
0
0

没有更多内容

加载失败,请刷新页面

加载更多

js的

<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %> <c:forEach items="${topics}" var="item" varStatus="status"> </c:forEach> 注意 c:forEach E大写 varStatus ......

踏破铁鞋无觅处
23分钟前
0
0
带你走进java集合之ConcurrentHashMap

一、概述 上一篇文章《带你走进java集合之HashMap》分析了HashMap的实现原理,重点分析了HashMap是怎么样的一种数据结构,以及如何去插入,查询,扩容等操作。相信经过上一篇文章的学习,大家...

木木匠
24分钟前
0
0
spring-boot 热加载实现替换

参考资料 1、spring-boot 热加载实现替换

哎小艾
26分钟前
0
0
kotlin使用spring mvc(二)

使用FilterRegistrationBean注册Filter 使用WebFilter配置过滤器的缺点是不可以对过滤器进行排序,但是使用FilterRegistrationBean可以设置Filter执行的顺序 编写过滤器 class CustomFilter...

weidedong
27分钟前
0
0
Qt那些事0.0.5

碰到了中文乱码问题。 虽然是自己做了件令自己都不齿的事情,但是情急之下,暂且如此:将中文硬编码进代码中。 我也想通过tr+qm翻译进行转换,但是难过的是,tr之后,找不到或者不起作用。这...

Ev4n
29分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部