文档章节

如何使用Spark的local模式远程读取Hadoop集群数据

九劫散仙
 九劫散仙
发布于 2017/03/31 11:55
字数 818
阅读 57
收藏 1

我们在windows开发机上使用spark的local模式读取远程hadoop集群中的hdfs上的数据,这样的目的是方便快速调试,而不用每写一行代码或者一个方法,一个类文件都需要打包成jar上传到linux上,再扔到正式的集群上进行测试,像功能性验证直接使用local模式来快速调测是非常方便的,当然功能测试之后,我们还需要打包成jar仍到集群上进行其他的验证比如jar包的依赖问题,这个在local模式是没法测的,还有集群运行的调优参数,这些都可以在正式仍到集群时验证。

一个样例代码如下:

  def main(args: Array[String]): Unit = {

    //指定local模式
    val conf = new SparkConf().setMaster("local[2]").setAppName("read kp data to kafka")
    val sc= new SparkContext(conf)
    //支持通配符路径,支持压缩文件读取
    val rrd=sc.textFile("hdfs://192.168.10.4:8020/data/log/{20170227,20170228}/tomcat-log*")
    //提到到集群模式时,去掉uri地址,如果有双namenode,可以自动容灾
     //val rrd=sc.textFile("/data/log/{20170227,20170228}/tomcat-log*")
    //统计数量
    println(rrd.count())
    //停止spark
    sc.stop()

  }

如何在spark中遍历数据时获取文件路径:

    val path:String="hdfs://192.168.10.4:8020/data/userlog/{20170226}/kp*"
    
    val text= sc.newAPIHadoopFile[LongWritable,Text,TextInputFormat](path)

    val linesWithFileNames = text.asInstanceOf[NewHadoopRDD[LongWritable, Text]]
      .mapPartitionsWithInputSplit((inputSplit, iterator) => {
        val file = inputSplit.asInstanceOf[FileSplit]
        iterator.map(tup => (file.getPath, tup._2)) // 返回的K=全路径 V=每一行的值
      }
      )

    linesWithFileNames.foreach(println)

如果遍历压缩文件时想要获取文件名,就使用newAPIHadoopFile,此外在本地调试下通过之后,提交到集群运行的时候,一定要把uri去掉,本地加上是想让它远程读取方便调试使用,如果正式运行去掉uri在双namenode的时候可以自动兼容,不去反而成一个隐患了。

最后我们可以通过spark on yarn模式提交任务,一个例子如下:


jars=`echo /home/search/x_spark_job/libs/*jar | sed 's/ /,/g'`

bin/spark-submit  --class  KSearch   --master yarn  --jars  $jars    /home/search/x_spark_job/kp-1.0.0.jar 


这里用spark提交有另外一个优势,就是假如我开发的不是YARN应用,就是代码里没有使用SparkContext,而是一个普通的应用,就是读取mysql一个表的数据,写入另外一个mysql,这里跟MR没有关系,但是我依然可以用spark-sumbit提交,这时候是不会提交到YARN上的,但是程序会按普通程序运行,程序依赖的jar包,直接使用--jars传入就行,这一点非常方便,尤其是应用有多个依赖时,比如依赖es,hadoop,hbase,redis,fastjson,我打完包后的程序是瘦身的只有主体jar非常小,依赖的jar我可以不打到主体jar里面,在外部用的时候传入,方便共用并灵活性大大提高。

最后,spark的wholeTextFiles对gz压缩的支持不太友好,不能直接访问,相关问题,请参考:

http://stackoverflow.com/questions/24402737/how-to-read-gz-files-in-spark-using-wholetextfiles

http://stackoverflow.com/questions/36604145/read-whole-text-files-from-a-compression-in-spark

http://stackoverflow.com/questions/24402737/how-to-read-gz-files-in-spark-using-wholetextfiles?rq=1

有什么问题可以扫码关注微信公众号:我是攻城师(woshigcs),在后台留言咨询。 技术债不能欠,健康债更不能欠, 求道之路,与君同行。

输入图片说明

© 著作权归作者所有

共有 人打赏支持
九劫散仙
粉丝 266
博文 175
码字总数 189625
作品 0
海淀
私信 提问
spark单机模式 和 集群模式 安装

1.spark单机模式安装 实现步骤: 1)安装和配置好JDK 2)上传和解压Spark安装包 3)进入Spark安装目录下的conf目录 复制conf spark-env.sh.template 文件为 spark-env.sh 在其中修改,增加如...

仟昭
03/01
0
0
Spark安装启动 and 在程序中调用spark服务

1.软件准备 我的系统环境为ubuntu 13.10 1.scala-2.9.3.tgz http://www.scala-lang.org/files/archive/scala-2.9.3.tgz 由于spark是依赖scala2.9.3开发的,所以只能安装2.9.3,我试的时候一开...

zachary124
2014/01/25
0
9
配置hadoop+pyspark环境

配置hadoop+pyspark环境 1、部署hadoop环境 配置hadoop伪分布式环境,所有服务都运行在同一个节点上。 1.1、安装JDK 安装jdk使用的是二进制免编译包,下载页面 下载jdk 解压文件,配置环境变...

巴利奇
2018/10/30
0
0
大数据技术学习,大数据处理为何选择Spark,而不是Hadoop

大数据处理为何选择Spark,而不是Hadoop。 一.基础知识 1.Spark Spark是一个用来实现快速而通用的集群计算的平台。 在速度方面,Spark扩展了广泛使用的MapReduce计算模型,而且高效地支持更多...

董黎明
2018/10/20
0
0
Apache Zeppelin 中 Spark解释器

概述 Apache Spark是一种快速和通用的集群计算系统。它提供Java,Scala,Python和R中的高级API,以及支持一般执行图的优化引擎。Zeppelin支持Apache Spark,Spark解释器组由5个解释器组成。 ...

hblt-j
2018/11/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Map的遍历方式

import java.util.*; public class a_21 { public static void main(String[] args) { Map<Integer,Integer> map= new HashMap<Integer, Integer>(); for(int i=0;i<6;i++) ......

南桥北木
31分钟前
1
0
总结:线程间频繁切换为什么耗费资源?

因为线程切换的时候,CPU需要将此线程的所有执行状态保存起来,如线程编号,执行到的位置等,然后再去执行其它线程。

浮躁的码农
今天
4
0
PHP版本高于5.5时,curl文件上传必须使用CurlFile对象

坑了我一天,之前@的方法各种上传不成功文件。都怀疑服务端有bug了。

叫我哀木涕
今天
1
0
js算法总结

数列求和 等差数列求和 function sum(a0,d,n){//a0->首项,d->公差,n->项数//(首项+末项)*项数/2return (a0+(a0+(n-1)*d))*n/2;} 等比数列求和 function sum(a0,q,n){//a0->首项,q......

祖达
今天
4
0
小白?转型?毕业生?外行学习快速入行大数据开发指南

这篇文章中,本文将针对三种不同的、想要进入数据科学领域的人群,给出自己的经验,帮助他们迅速有效入行。 虽然没有适合每个人的万能解决方案,但这三类建议值得想转行的你一看。 第1类:新...

董黎明
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部