文档章节

Spark Core学习日志(三)

花生-瓜子
 花生-瓜子
发布于 2017/01/09 08:24
字数 1578
阅读 6
收藏 0

2017年1月8日 16:48:57

【小结】 1.常用的Transformation和action使用方法 2.内存管理: a)RDD内存持久化:cache()和persist() b)持久化策略:MEMORY_ONLY,MEMORY_AND_DISK,MEMORY_ONLY_SER,MEMORY_ADN_DSK_SER,DISK_ONLY,MEMORY_ONLY_2.. c)如何选择:优先MEMORY_ONLY,次之是进行序列化,不推荐DISK相关策略。 3.共享变量: a)Broadcast(广播变量) b)Accumulator(累加器) 4.容错机制:分布式数据集的容错性有两种方式: 数据检查点和记录数据的更新。 Spark选择记录更新的方式。 RDD是一个有向无环图(DAG), 每一个RDD都会记住创建该数据集需要哪些操作, 跟踪记 录RDD的继承关系, 这个关系在Spark中称为LineAge(血统)。 由于创建RDD的操作是相对粗粒度的变换, 比如map、 filter、 join等, 即单一的操作应用于 许多数据元素, 而不需要存储真正的数据, 比通过网络复制数据更高效。 当一个RDD的某个分区丢失时, RDD有足够的信息记录其如何通过其他RDD进行计算的, 只需要通过其他RDD重新计算RDD 血统的依赖关系分为两种: 宽依赖、 窄依赖。 根据父RDD分区是对应一个还是多个子RDD分区来判断。  1.窄依赖, 一个父RDD分区对应一个子RDD分区;  2.宽依赖, 一个父RDD分区对应多个子RDD分区;即可。 对于窄依赖, 只需通过重新计算丢失的那一块数据来恢复, 容错成本 对于宽依赖, 当容错重算分区时, 因为父分区数据只有一部分是需要重算子分区的, 其余数 据重算就造成了冗余计算。 所以, 不同的应用有时候也需要在适当的时机设置数据检查点。 由于RDD的只读特性使得 它比常用的共享内存更容易做检查点, 具体可以使用doCheckPoint方法。 检查点( 本质是通过将RDD写入Disk做检查点) 是为了通过lineage做容错的辅助, lineage 过长会造成容错成本过高, 这样就不如在中间阶段做检查点容错, 如果之后有节点出现问题 而丢失分区, 从做检查点的RDD开始重做Lineage, 就会减少开销 5.RDD算子补充

今天下午遇到了一个相对奇怪的问题。

java.lang.NumberFormatException: empty String

这个异常的原因是我在让String类型转Double类型的时候,字符串是空的。 也就是这部分:

map(m=>{
        val price=m(1).trim
        val pr=price.toDouble
        val name=m(0).trim
        (name,pr)
      })

其实,问题的产生是因为之前的不合理判断造成的: val result1=datas.map(_.split("\t")) .filter(m=>m.length==6 && m(1)!=null && m(0)!=null) 在java中,判断一个字符串是否为空,可以用str==null或者str.isEmpty()....但是有时候,我们也会写这样的:str==” “.这里需要区分一下:

  1. 1、null表示这个字符串不指向任何的东西,如果这时候你调用它的方法,那么就会出现空指针异常。
  2. 2、""表示它指向一个长度为0的字符串,这时候调用它的方法是安全的。
  3. 3.、null不是对象,""是对象,所以null没有分配空间。

然而在这个程序中,我们在过滤的时候,如果是让其!=null。这样的结果是有一点问题的。这几天的程序中,老师给的案例中,都是按上面这样的方式进行操作的。。。额,早上想的时候,想起了一个问题 ,当那个数据是“ ”的时候,他是有内容的,被Null条件筛选不掉,然而转Double的时候,肯定会发生错误啊,但是这个“ ”作为字符串进行处理的时候。。。会发生的问题,我没遇到。

【程序】

数据表结构:农产品、价格、数据采集时间、农产品市场、省、城市

package yxy

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by yxy on 1/8/17.
  */
//某种农产品的价格均值计算公式:
/*P AVG = (P M1 +P M2 +...+P Mn -max(P)-min(P))/(N-2)
其中,P 表示价格,Mn 表示 market,即农产品市场。P M1 表示 M1 农产品市场的该产品价格,max(P)表示价格最大值,min(P)价格最小值。*/
object work02 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("Market").setMaster("local")
    val sc=new SparkContext(conf)
    val datas=sc.textFile("hdfs://hadoop01:9000/data/pm/market")
    val result1=datas.map(_.split("\t"))
      .filter(m=>m.length==6 && m(1)!=" " && m(0)!=" ")
     ** // .filter(m=>m.length==6 && m(1)!=null && m(0)!=null)   错误**
      .map(m=>{
        val price=m(1).trim
        val pr=price.toDouble
        val name=m(0).trim
        (name,pr)
      })
      .groupByKey()
      .map(m=>{
        var avgs=0.0
        var sump=m._2.sum
        val lens=m._2.size
        val maxp=m._2.max
        val minp=m._2.min

        if(lens>2){

          avgs=(sump-maxp-minp)/(lens-2)
        }else{
          avgs=sump/lens
        }
        (m._1,avgs)
      })
     // .foreach(m=>println(m._1+":"+m._2))
  result1.foreach(m=>{
    println(m._1+" : "+m._2)
  })

  }
}

之前写得一些代码:

要处理的需求:统计每个省份的农产品市场总数。数据表结构:农产品、价格、数据采集时间、农产品市场、省、城市

package SparkCore

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by tg on 1/4/17.
  */
object ProductDemo {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("ProductDemo").setMaster("local")
    val sc=new SparkContext(conf)

    val lines=sc.textFile("hdfs://tgmaster:9000/in/product")

    /**
      * 统计每个省份的农产品市场总数
      */
    val result=lines.map(_.split("\t")) //分隔每行数据
      .filter(m=>m.length==6 && m(3)!=null && m(4)!=null)  //过滤,保证数据完整性
      .map(m=>{    //分别取出省份和农产品市场,形成 省份:市场 这种格式的数据。
        val province=m(4).trim
        val market=m(3).trim
        province+":"+market
      })
      .distinct()  //对 省份:市场 这种格式的数据进行去重
      .map(m=>{
        val info=m.split(":")
        val prov=info(0).trim
        val market=info(1).trim
        (prov,market)   //取出去重之后的省份与市场,形成键值对(省份,市场)
      })
      .groupByKey() //针对key值(省份)进行分组
      .map(m=>{  //对分组之后的数据进行操作,(String,Iterable<...>集合),集合大小即为市场的数量
        val prov=m._1
        val marketCount=m._2.size
        (prov,marketCount) //形成键值对(省份,市场总数)
      })

    result.foreach(item=>{
      println(item._1+"省农产品市场总数:"+item._2)
    })
  }
}

© 著作权归作者所有

上一篇: 常用算子补充
下一篇: 常用算子补充
花生-瓜子
粉丝 0
博文 2
码字总数 2145
作品 0
大同
程序员
私信 提问
Spark 的Core深入(二)

Spark 的 Core 深入(二) 标签(空格分隔): Spark的部分 一、日志清洗的优化: 1.1 日志清洗有脏数据问题 rdd.partitions.length rdd.cacherdd.count 一个分区默认一个task 分区去处理默认...

flyfish225
2018/05/08
0
0
Spark 数据分析导论-笔记

Spark Core Spark Core 实现了Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。 Spark Core 中还包含了 对弹性分布式数据集(resilient distributed dataset,简...

Java搬砖工程师
2018/12/26
28
0
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
5K
0
如何在万亿级别规模的数据量上使用Spark

一、前言 Spark作为大数据计算引擎,凭借其快速、稳定、简易等特点,快速的占领了大数据计算的领域。本文主要为作者在搭建使用计算平台的过程中,对于Spark的理解,希望能给读者一些学习的思...

风火数据
2018/05/14
0
0
Spark笔记整理(三):Spark WC开发与应用部署

[TOC] Spark WordCount开发 创建的是maven工程,使用的依赖如下: spark wc之Java版本 本地执行,输出结果如下: ###spark wc之Java lambda版本 本地执行,输出结果如下: spark wc之scala版...

xpleaf
2018/04/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周四乱弹 —— 当你简历注水但还是找到了工作

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @花间小酌 :#今日歌曲推荐# 分享成龙的单曲《男儿当自强》。 《男儿当自强》- 成龙 手机党少年们想听歌,请使劲儿戳(这里) @hxg2016 :刚在...

小小编辑
今天
2.7K
21
靠写代码赚钱的一些门路

作者 @mezod 译者 @josephchang10 如今,通过自己的代码去赚钱变得越来越简单,不过对很多人来说依然还是很难,因为他们不知道有哪些门路。 今天给大家分享一个精彩的 GitHub 库,这个库整理...

高级农民工
昨天
3
0
用好项目管理工具,人人都可以成为项目经理

现在市面上的项目管理工具越来越多了,但是大多数都是一些协同工具或轻量项目管理工具。如果是多团队、跨部门使用或者企业级的项目管理,从管理思想到工具运用,需要适应企业的业务流程体系,...

cs平台
昨天
12
0
只需一步,在Spring Boot中统一Restful API返回值格式与统一处理异常

统一返回值 在前后端分离大行其道的今天,有一个统一的返回值格式不仅能使我们的接口看起来更漂亮,而且还可以使前端可以统一处理很多东西,避免很多问题的产生。 比较通用的返回值格式如下:...

晓月寒丶
昨天
69
0
区块链应用到供应链上的好处和实际案例

区块链可以解决供应链中的很多问题,例如记录以及追踪产品。那么使用区块链应用到各产品供应链上到底有什么好处?猎头悬赏平台解优人才网小编给大家做个简单的分享: 使用区块链的最突出的优...

猎头悬赏平台
昨天
32
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部