文档章节

spark介绍

弘_轩
 弘_轩
发布于 2017/05/18 18:54
字数 934
阅读 85
收藏 2

以SparkContext为程序运行的总入口,在SparkContext的初始化过程中,Spark会分别创建DAGScheduler作业调度和TaskScheduler任务调度两级调度模块。作业调度模块为每个Spark作业计算具有依赖关系的多个调度阶段(通常根据shuffle来划分),然后为每个阶段构建出一组具体的任务(通常会考虑数据的本地性等),然后以TaskSets(任务组)的形式提交给任务调度模块来具体执行。而任务调度模块则负责具体启动任务、监控和汇报任务运行情况。 

 

Application:  用户编写的应用应用程序。    

Driver: Application中运行main函数并创建的SparkContext, 创建SparkContext的目的是和集群的ClusterManager通讯,进行资源的申请、任务的分配和监控等。所以,可以用SparkContext代表Driver         

Worker:集群中可以运行Application代码的节点。     
    
Executor: 某个Application在Worker上面的一个进程,该进程负责执行某些Task,并负责把数据存在内存或者磁盘上。每个Application都各自有一批属于自己的Executor。        

Task:被送到Executor执行的工作单元,和Hadoop MapReduce中的MapTask和ReduceTask一样,是运行Application的基本单位。多个Task组成一个Stage,而Task的调度和管理由TaskScheduler负责。

Job:包含多个Task组成的并行计算,往往由Spark Action触发产生。一个Application可以产生多个Job。  

Stage:每个Job的Task被拆分成很多组Task, 作为一个TaskSet,命名为Stage。Stage的调度和划分由DAGScheduler负责。Stage又分为Shuffle Map Stage和Result Stage两种。Stage的边界就在发生Shuffle的地方。

**RDD:**Spark的基本数据操作抽象,可以通过一系列算子进行操作。RDD是Spark最核心的东西,可以被分区、被序列化、不可变、有容错机制,并且能并行操作的数据集合。存储级别可以是内存,也可以是磁盘。

DAGScheduler:根据Job构建基于Stage的DAG(有向无环任务图),并提交Stage给TaskScheduler

TaskScheduler:将Stage提交给Worker(集群)运行,每个Executor运行什么在此分配。

 

val spark = SparkSession.builder
      .appName(f"log clean:traffic_${beginInput}_$endInput")
      .config("spark.executor.memory", "2g")
      .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

val trafficLog = config.getString("logPath.trafficLog").format(inputDate)

def lineExplain(line: String) = {
        try {
          val lineArr = line.stripLineEnd.split("\\?t\\?=")
          val ngCookieList = lineArr(0).split("\\|\\|")
          val ngCookieLen = ngCookieList.length

          if (lineArr.length >= 29 && List(2, 4, 9).contains(ngCookieLen)){
            //修复url后面加/
            val requestUrl = lfb.repairUrl(lineArr(1).replace("\"", ""))
            val domain = (new java.net.URL(requestUrl)).getHost
            val seoUrl: String = Try(lineArr(29).split(" ")(0)) match {
              case Success(seoUrl) => seoUrl
              case Failure(ex) => ""
            }
            //url解析
            val tids: Tids = seoUrl match {
              case _ if seoUrl.length > 5 => classify(seoUrl)
              case _ => classify(requestUrl)
            }
            if (tids.siteIds.nonEmpty) {
              val iploc = lineArr(2)
              val randomStr = lineArr(3).split("\\?")(1)
              val referUrl = lineArr(7)
              val referDomain = Try((new java.net.URL(referUrl)).getHost).toOption.getOrElse("0")
              
              }
              val uniqKey = (randomStr, suv, svn, ssid)
              val lineObj = TrafficLine(ip, iploc, area.country, area.province, area.city,
                tids.siteIds, tids.cnlIds, tids.colIds, tids.gids,addTime, inputDate
              )
              Some((uniqKey, lineObj))

            } else None

          } else None


        } catch {
          case e => {
            logger.error(line)
            logger.error(e)
            None
          }
        }
      }

spark.sparkContext.textFile(trafficLog).repartition(40).flatMap(lineExplain).
        reduceByKey((v1, v2) => v1, numPartitions=20).map(_._2).toDF.repartition(4).
        write.partitionBy("dt").mode(SaveMode.Append).parquet(f"$saveLogPath/")

spark.sql("msck repair table traffic_log")
spark.stop()


#伪代码
import scala.collection.mutable.{Map, Set}
spark.sparkContext.textFile(trafficLog).map(line => (url, Set(userId))).reduceByKey(_ ++ _).
                   map(lst => (user, len(userIdSet))).collectAsMap()



spark.sparkContext.textFile(trafficLog).map(line => (url, userId)).distinct().map(lst => (url, 1)).
                 reduceByKey(_ + _).collectAsMap()
                   

 

1、repartition 和 coalesce

2、宽依赖和窄依赖

窄依赖指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区。图中,map/filter和union属于第一类,对输入进行协同划分(cogroup)的join属于第二类。

宽依赖指子RDD的分区依赖于父RDD的多个或所有分区,这是因为shuffle类操作,如图中的groupByKey和未经协同划分的join。

3、RDD 的 Transformation与Action,DAG

 

 

© 著作权归作者所有

弘_轩
粉丝 5
博文 16
码字总数 10368
作品 0
福州
高级程序员
私信 提问

暂无文章

spring cloud

一、从面试题入手 1.1、什么事微服务 1.2、微服务之间如何独立通讯的 1.3、springCloud和Dubbo有哪些区别 1.通信机制:DUbbo基于RPC远程过程调用;微服务cloud基于http restFUL API 1.4、spr...

榴莲黑芝麻糊
7分钟前
0
0
Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
6
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
8
0
详解箭头函数和普通函数的区别以及箭头函数的注意事项、不适用场景

箭头函数是ES6的API,相信很多人都知道,因为其语法上相对于普通函数更简洁,深受大家的喜爱。就是这种我们日常开发中一直在使用的API,大部分同学却对它的了解程度还是不够深... 普通函数和...

OBKoro1
昨天
7
0
轻量级 HTTP(s) 代理 TinyProxy

CentOS 下安装 TinyProxy yum install -y tinyproxy 启动、停止、重启 # 启动service tinyproxy start# 停止service tinyproxy stop# 重启service tinyproxy restart 相关配置 默认...

Anoyi
昨天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部