Spark Core学习日志(三)

原创
2017/01/09 08:24
阅读数 156

2017年1月8日 16:48:57

【小结】 1.常用的Transformation和action使用方法 2.内存管理: a)RDD内存持久化:cache()和persist() b)持久化策略:MEMORY_ONLY,MEMORY_AND_DISK,MEMORY_ONLY_SER,MEMORY_ADN_DSK_SER,DISK_ONLY,MEMORY_ONLY_2.. c)如何选择:优先MEMORY_ONLY,次之是进行序列化,不推荐DISK相关策略。 3.共享变量: a)Broadcast(广播变量) b)Accumulator(累加器) 4.容错机制:分布式数据集的容错性有两种方式: 数据检查点和记录数据的更新。 Spark选择记录更新的方式。 RDD是一个有向无环图(DAG), 每一个RDD都会记住创建该数据集需要哪些操作, 跟踪记 录RDD的继承关系, 这个关系在Spark中称为LineAge(血统)。 由于创建RDD的操作是相对粗粒度的变换, 比如map、 filter、 join等, 即单一的操作应用于 许多数据元素, 而不需要存储真正的数据, 比通过网络复制数据更高效。 当一个RDD的某个分区丢失时, RDD有足够的信息记录其如何通过其他RDD进行计算的, 只需要通过其他RDD重新计算RDD 血统的依赖关系分为两种: 宽依赖、 窄依赖。 根据父RDD分区是对应一个还是多个子RDD分区来判断。  1.窄依赖, 一个父RDD分区对应一个子RDD分区;  2.宽依赖, 一个父RDD分区对应多个子RDD分区;即可。 对于窄依赖, 只需通过重新计算丢失的那一块数据来恢复, 容错成本 对于宽依赖, 当容错重算分区时, 因为父分区数据只有一部分是需要重算子分区的, 其余数 据重算就造成了冗余计算。 所以, 不同的应用有时候也需要在适当的时机设置数据检查点。 由于RDD的只读特性使得 它比常用的共享内存更容易做检查点, 具体可以使用doCheckPoint方法。 检查点( 本质是通过将RDD写入Disk做检查点) 是为了通过lineage做容错的辅助, lineage 过长会造成容错成本过高, 这样就不如在中间阶段做检查点容错, 如果之后有节点出现问题 而丢失分区, 从做检查点的RDD开始重做Lineage, 就会减少开销 5.RDD算子补充

今天下午遇到了一个相对奇怪的问题。

java.lang.NumberFormatException: empty String

这个异常的原因是我在让String类型转Double类型的时候,字符串是空的。 也就是这部分:

map(m=>{
        val price=m(1).trim
        val pr=price.toDouble
        val name=m(0).trim
        (name,pr)
      })

其实,问题的产生是因为之前的不合理判断造成的: val result1=datas.map(_.split("\t")) .filter(m=>m.length==6 && m(1)!=null && m(0)!=null) 在java中,判断一个字符串是否为空,可以用str==null或者str.isEmpty()....但是有时候,我们也会写这样的:str==” “.这里需要区分一下:

  1. 1、null表示这个字符串不指向任何的东西,如果这时候你调用它的方法,那么就会出现空指针异常。
  2. 2、""表示它指向一个长度为0的字符串,这时候调用它的方法是安全的。
  3. 3.、null不是对象,""是对象,所以null没有分配空间。

然而在这个程序中,我们在过滤的时候,如果是让其!=null。这样的结果是有一点问题的。这几天的程序中,老师给的案例中,都是按上面这样的方式进行操作的。。。额,早上想的时候,想起了一个问题 ,当那个数据是“ ”的时候,他是有内容的,被Null条件筛选不掉,然而转Double的时候,肯定会发生错误啊,但是这个“ ”作为字符串进行处理的时候。。。会发生的问题,我没遇到。

【程序】

数据表结构:农产品、价格、数据采集时间、农产品市场、省、城市

package yxy

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by yxy on 1/8/17.
  */
//某种农产品的价格均值计算公式:
/*P AVG = (P M1 +P M2 +...+P Mn -max(P)-min(P))/(N-2)
其中,P 表示价格,Mn 表示 market,即农产品市场。P M1 表示 M1 农产品市场的该产品价格,max(P)表示价格最大值,min(P)价格最小值。*/
object work02 {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("Market").setMaster("local")
    val sc=new SparkContext(conf)
    val datas=sc.textFile("hdfs://hadoop01:9000/data/pm/market")
    val result1=datas.map(_.split("\t"))
      .filter(m=>m.length==6 && m(1)!=" " && m(0)!=" ")
     ** // .filter(m=>m.length==6 && m(1)!=null && m(0)!=null)   错误**
      .map(m=>{
        val price=m(1).trim
        val pr=price.toDouble
        val name=m(0).trim
        (name,pr)
      })
      .groupByKey()
      .map(m=>{
        var avgs=0.0
        var sump=m._2.sum
        val lens=m._2.size
        val maxp=m._2.max
        val minp=m._2.min

        if(lens>2){

          avgs=(sump-maxp-minp)/(lens-2)
        }else{
          avgs=sump/lens
        }
        (m._1,avgs)
      })
     // .foreach(m=>println(m._1+":"+m._2))
  result1.foreach(m=>{
    println(m._1+" : "+m._2)
  })

  }
}

之前写得一些代码:

要处理的需求:统计每个省份的农产品市场总数。数据表结构:农产品、价格、数据采集时间、农产品市场、省、城市

package SparkCore

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by tg on 1/4/17.
  */
object ProductDemo {
  def main(args: Array[String]): Unit = {
    val conf=new SparkConf().setAppName("ProductDemo").setMaster("local")
    val sc=new SparkContext(conf)

    val lines=sc.textFile("hdfs://tgmaster:9000/in/product")

    /**
      * 统计每个省份的农产品市场总数
      */
    val result=lines.map(_.split("\t")) //分隔每行数据
      .filter(m=>m.length==6 && m(3)!=null && m(4)!=null)  //过滤,保证数据完整性
      .map(m=>{    //分别取出省份和农产品市场,形成 省份:市场 这种格式的数据。
        val province=m(4).trim
        val market=m(3).trim
        province+":"+market
      })
      .distinct()  //对 省份:市场 这种格式的数据进行去重
      .map(m=>{
        val info=m.split(":")
        val prov=info(0).trim
        val market=info(1).trim
        (prov,market)   //取出去重之后的省份与市场,形成键值对(省份,市场)
      })
      .groupByKey() //针对key值(省份)进行分组
      .map(m=>{  //对分组之后的数据进行操作,(String,Iterable<...>集合),集合大小即为市场的数量
        val prov=m._1
        val marketCount=m._2.size
        (prov,marketCount) //形成键值对(省份,市场总数)
      })

    result.foreach(item=>{
      println(item._1+"省农产品市场总数:"+item._2)
    })
  }
}
展开阅读全文
打赏
0
0 收藏
分享

作者的其它热门文章

加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部