文档章节

pyspark访问hive数据实战

aibati2008
 aibati2008
发布于 2017/03/09 12:05
字数 1672
阅读 1679
收藏 0

之前我们部门在数据分析这边每天的日报都是直接使用hive脚本进行调用,随着APP用户行为和日志数据量的逐渐累积,跑每天的脚本运行需要花的时间越来越长,虽然进行了sql优化,但是上spark已经提上日程。

直接进行spark开发需要去学习scala,为了降低数据分析师的学习成本,决定前期先试用sparkSQL,能够让计算引擎无缝从MR切换到spark,现在主要使用pyspark访问hive数据。

以下是安装配置过程中的详细步骤:

1.安装spark

需要先安装JDK和scala,这不必多说,由于现有hadoop集群版本是采用的2.6.3,所以spark版本是下载的稳定版本spark-1.4.0-bin-hadoop2.6.tgz

我是先在一台机器上完成了Spark的部署,Master和Slave都在一台机器上。注意要配置免秘钥ssh登陆。

1.1 环境变量配置

export JAVA_HOME=/usr/jdk1.8.0_73
export HADOOP_HOME=/usr/hadoop
export HADOOP_CONF_DIR=/usr/hadoop/etc/hadoop
export SCALA_HOME=/usr/local/scala-2.11.7
export SPARK_HOME=/home/hadoop/spark_folder/spark-1.4.0-bin-hadoop2.6
export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8099
 
export SPARK_WORKER_CORES=3     //每个Worker使用的CPU核数
export SPARK_WORKER_INSTANCES=1   //每个Slave中启动几个Worker实例
export SPARK_WORKER_MEMORY=10G    //每个Worker使用多大的内存
export SPARK_WORKER_WEBUI_PORT=8081 //Worker的WebUI端口号
export SPARK_EXECUTOR_CORES=1       //每个Executor使用使用的核数
export SPARK_EXECUTOR_MEMORY=1G     //每个Executor使用的内存

export HIVE_HOME=/home/hadoop/hive
export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar:$SPARK_CLASSPATH
export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$HADOOP_HOME/lib/native

1.2 配置slaves

cp slaves.template slaves
vi slaves 添加以下内容:localhost

1.3 启动master和slave

cd $SPARK_HOME/sbin/
./start-master.sh

启动日志位于 $SPARK_HOME/logs/目录,访问 http://localhost:8099,即可看到Spark的WebUI界面

执行 ./bin/spark-shell,打开Scala到Spark的连接窗口   

 

2.SparkSQL与Hive的整合

1. 拷贝$HIVE_HOME/conf/hive-site.xml和hive-log4j.properties到 $SPARK_HOME/conf/
2. 在$SPARK_HOME/conf/目录中,修改spark-env.sh,添加
export HIVE_HOME=/home/hadoop/hive
export SPARK_CLASSPATH=$HIVE_HOME/lib/mysql-connector-java-5.1.31-bin.jar:$SPARK_CLASSPATH
3. 另外也可以设置一下Spark的log4j配置文件,使得屏幕中不打印额外的INFO信息(如果不想受干扰可设置为更高):
log4j.rootCategory=WARN, console

4.进入$SPARK_HOME/bin,执行 ./spark-sql –master spark://127.0.0.1:7077 进入spark-sql CLI:

[hadoop@hadoop spark]$ bin/spark-sql --help  
Usage: ./bin/spark-sql [options] [cli option]  
CLI options:  
 -d,--define <keykey=value>          Variable subsitution to apply to hive  
                                  commands. e.g. -d A=B or --define A=B  
    --database <databasename>     Specify the database to use  
 -e <quoted-query-string>         SQL from command line  
 -f <filename>                    SQL from files  
 -h <hostname>                    connecting to Hive Server on remote host  
    --hiveconf <propertyproperty=value>   Use value for given property  
    --hivevar <keykey=value>         Variable subsitution to apply to hive  
                                  commands. e.g. --hivevar A=B  
 -i <filename>                    Initialization SQL file  
 -p <port>                        connecting to Hive Server on port number  
 -S,--silent                      Silent mode in interactive shell  
 -v,--verbose                     Verbose mode (echo executed SQL to the  
                                  console)  

 需要注意的是CLI不是使用JDBC连接,所以不能连接到ThriftServer;但可以配置conf/hive-site.xml连接到hive的metastore,然后对hive数据进行查询。下面我们接着说如何在python中连接hive数据表查询。

 

3.配置pyspark和示例代码

3.1 配置pyspark

打开/etc/profile:

        #PythonPath 将Spark中的pySpark模块增加的Python环境中

         export PYTHONPATH=/opt/spark-hadoop/python

        source /etc/profile  

执行./bin/pyspark ,打开Python到Spark的连接窗口,确认没有报错。

打开命令行窗口,输入python,Python版本为2.7.6,如图所示,注意Spark暂时不支持Python3。输入import pyspark不报错,证明开发前工作已经完成。

3.2 启动ThriftServer

启动ThriftServer,使之运行在spark集群中:

sbin/start-thriftserver.sh --master spark://localhost:7077 --executor-memory 5g  

ThriftServer可以连接多个JDBC/ODBC客户端,并相互之间可以共享数据。

 

3.3 请求示例

查看spark官方文档说明,spark1.4和2.0对于sparksql调用hive数据的API变化并不大。都是用sparkContext 。

from pyspark import SparkConf, SparkContext
from pyspark.sql import HiveContext

conf = (SparkConf()
         .setMaster("spark://127.0.0.1:7077")
         .setAppName("My app")
         .set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
sqlContext = HiveContext(sc)
my_dataframe = sqlContext.sql("Select count(1) from logs.fmnews_dim_where")
my_dataframe.show()

返回结果:

运行以后在webUI界面看到job运行详情。

 

4.性能比较

截取了接近一个月的用户行为数据,数据大小为2G,总共接近1600w条记录。

 

为了测试不同sql需求情况下的结果,我们选取了日常运行的2类sql:

1.统计数据条数:

select count(1) from fmnews_user_log2;

2.统计用户行为:

SELECT device_id, min_time FROM
        (SELECT device_id,min(import_time) min_time FROM fmnews_user_log2
            GROUP BY device_id)a
        WHERE from_unixtime(int(substr(min_time,0,10)),'yyyy-MM-dd') = '2017-03-02';

3. 用户行为分析:

select case when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '06:00' and '07:59' then 1
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '08:00' and '09:59' then 2
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '10:00' and '11:59' then 3
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '12:00' and '13:59' then 4
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '14:00' and '15:59' then 5
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '16:00' and '17:59' then 6
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '18:00' and '19:59' then 7
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '20:00' and '21:59' then 8
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '22:00' and '23:59' then 9
            else 0 end fmnews_time_type, count(distinct device_id) device_count,count(1) click_count
       from fmcm.fmnews_user_log2
     where from_unixtime(int(substr(import_time,0,10)),'yyyy-MM-dd') = '2017-03-02'
    group by case when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '06:00' and '07:59' then 1
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '08:00' and '09:59' then 2
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '10:00' and '11:59' then 3
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '12:00' and '13:59' then 4
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '14:00' and '15:59' then 5
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '16:00' and '17:59' then 6
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '18:00' and '19:59' then 7
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '20:00' and '21:59' then 8
            when from_unixtime(int(substr(fmnews_time,0,10)),'HH:mm') between '22:00' and '23:59' then 9
            else 0 end;

第一条sql的执行结果对比:hive 35.013 seconds

第一条sql的执行结果对比:sparksql 1.218 seconds

 

第二条sql的执行结果对比:hive 78.101 seconds

第二条sql的执行结果对比:sparksql 8.669 seconds


 

第三条sql的执行结果对比:hive 101.228 seconds

第三条sql的执行结果对比:sparksql 14.221 seconds

可以看到,虽然没有官网吹破天的100倍性能提升,但是根据sql的复杂度来看10~30倍的效率还是可以达到的。

不过这里要注意到2个影响因子:

1. 我们数据集并没有采取全量,在数据量达到TB级别两者的差距应该会有所减小。同时sql也没有针对hive做优化。

2. spark暂时是单机(内存足够)并没有搭建集群,hive使用的hadoop集群有4台datanode。

 

 

参考内容:

http://lxw1234.com/archives/2015/06/281.htm

http://spark.apache.org/docs/1.4.0/api/python/pyspark.sql.html

http://blog.csdn.net/book_mmicky/article/details/39152727

http://stackoverflow.com/questions/36051091/query-hive-table-in-pyspark

http://kevin12.iteye.com/blog/2290821

https://my.oschina.net/zhgk/blog/417596

© 著作权归作者所有

aibati2008
粉丝 14
博文 87
码字总数 61726
作品 0
成都
技术主管
私信 提问
PySpark SQL常用语法

许多数据分析师都是用HIVE SQL跑数,这里我建议转向PySpark: PySpark的语法是从左到右串行的,便于阅读、理解和修正;SQL的语法是从内到外嵌套的,不方便维护; PySpark继承Python优美、简洁...

真依然很拉风
2018/05/14
0
0
新书《全栈数据之门》完整目录

封面 全栈数据之门 前言 自强不息,厚德载物 0x1 Linux,自由之光 0x10 Linux,你是我的眼 0x11 Linux 基础,从零开始 01 Linux 之门 02 文件操作 03 权限管理 04 软件安装 05 实战经验 ...

云戒
2017/03/21
0
0
使用pyspark模仿sqoop从oracle导数据到hive的主要功能(自动建表,分区导入,增量,解决数据换行符问题)

  最近公司开始做大数据项目,让我使用sqoop(1.6.4版本)导数据进行数据分析计算,然而当我们将所有的工作流都放到azkaban上时整个流程跑完需要花费13分钟,而其中导数据(增量)就占了4...

ZJ&Y
2018/07/31
0
0
利用PySpark 数据预处理(特征化)实战

前言 之前说要自己维护一个spark deep learning的分支,加快SDL的进度,这次终于提供了一些组件和实践,可以很大简化数据的预处理。 模型 这次实际情况是,我手头已经有个现成的模型,基于T...

祝威廉
2017/10/29
0
0
全栈数据,主要技术点

生命之源 0. 缘起 本文仅仅只是列出一些知识点,拟做为内部技术分享的点,只是对『全栈数据』技术点有一个概要性的了解。 列出的点,99%都是自己的经验,或接触、或了解、或实战的内容。 本文...

云戒
2017/06/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
58分钟前
182
4
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
10
0
计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
6
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
7
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部