Preparation: create the Hive partitioned table
create table login_origin(
created_date string,
event_id string,
dev_mac string,
domain string,
session_id string,
dev_name string,
country string,
language string,
ip string
) PARTITIONED BY (day string, dev_platform string)
stored as parquet;
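The same table can also be created from Spark itself. A minimal sketch, assuming a Hive-enabled SparkSession (the application name is illustrative):

// Assumes Spark was built with Hive support; enableHiveSupport() is required so spark.sql talks to the Hive metastore.
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.builder()
  .appName("login-origin-setup")   // illustrative name
  .enableHiveSupport()
  .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS collectlog")
spark.sql(
  """CREATE TABLE IF NOT EXISTS collectlog.login_origin (
    |  created_date string,
    |  event_id string,
    |  dev_mac string,
    |  domain string,
    |  session_id string,
    |  dev_name string,
    |  country string,
    |  language string,
    |  ip string
    |) PARTITIONED BY (day string, dev_platform string)
    |STORED AS PARQUET""".stripMargin)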
Method 1: write the data directly to the table's storage location
1. Write the data to the table's location on HDFS; in this example the location is hdfs://c1:8020/user/hive/warehouse/collectlog.db/login_origin
// df is a DataFrame; outputPath is the table location shown above
// requires: import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).format("parquet")
  .partitionBy("day", "dev_platform")
  .save(outputPath)
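For reference, the df and outputPath used above might be prepared as follows; the raw log path, tab-separated layout, and column order are assumptions for illustration:

// Illustrative only: the source path and separator are assumptions.
import org.apache.spark.sql.DataFrame

val outputPath = "hdfs://c1:8020/user/hive/warehouse/collectlog.db/login_origin"

// Column names must match the Hive table, and the partition columns
// (day, dev_platform) must be present in the DataFrame.
val df: DataFrame = spark.read
  .option("sep", "\t")
  .csv("hdfs://c1:8020/logs/login/raw")   // hypothetical source path
  .toDF("created_date", "event_id", "dev_mac", "domain", "session_id",
        "dev_name", "country", "language", "ip", "day", "dev_platform")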
- Find the partitions just written and add them to Hive. Writing files directly to the table location does not register the new partition directories with the Hive metastore, so each (day, dev_platform) combination has to be added explicitly.
// requires: import org.apache.spark.sql.Row and import spark.implicits._ (for the $ column syntax)
// Collect the distinct partition values and register each one with the Hive metastore.
// The partition columns are strings, so the values must be quoted in the DDL.
val hivePartitions: Array[Row] = df.select($"day", $"dev_platform").distinct().collect()
hivePartitions.foreach { case Row(day: String, dev_platform: String) =>
  spark.sql(s"ALTER TABLE collectlog.login_origin ADD IF NOT EXISTS PARTITION (day='$day', dev_platform='$dev_platform')")
}
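Once the partitions are added, a quick check that Hive can now see them and query the data:

// Verify that the partitions are registered and the rows are visible through Hive.
spark.sql("SHOW PARTITIONS collectlog.login_origin").show(false)
spark.sql("SELECT day, dev_platform, count(*) FROM collectlog.login_origin GROUP BY day, dev_platform").show()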
Method 2: dynamic partitioning
spark.sql(" set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("insert overwrite table collectlog.domain_login partition(day) select * from domain_login_tmp")
