文档章节

Spark 实现自己的RDD,让代码更优雅

K
 Kadima
发布于 2017/01/15 19:50
字数 799
阅读 80
收藏 0

你是否在最初书写spark的代码时总是使用object 是否在为代码的重复而忧心,接下来的博客中,我会专注于spark代码简洁性。

1,什么事RDD,官网上有很全面的解释,在此不再赘述,不过我们需要从代码层面上理解什么事RDD,如果他是一个类,他又有哪些重要的属性和方法,现在列出以下几点:

    1)partitions():Get the array of partitions of this RDD, taking into account whether the
RDD is checkpointed or not. Partition是一个特质,分布在每一个excutor上的分区,都会有一个Partition实现类去做唯一标识。

    2)iterator():Internal method to this RDD; will read from cache if applicable, or otherwise compute it. This should not be called by users directly, but is available for implementors of custom subclasses of RDD. 这是一个RDD的迭代器,传入的参数是Partition和TaskContext,这样就可以在每一个Partition上执行相应的逻辑了。

    3)dependencies():Get the list of dependencies of this RDD,在1.6中,Dependency共有如下几个继承类,后续博文会详解它,感兴趣的读者可以直接阅读源码进一步了解

            

    4)partitioner():此函数返回一个Option[Partitioner],如果RDD不是key-value pair RDD类型的数据,那么为None,我们和以自己实现这个抽象类。当时看到这里,我就在想为什么不能实现一个特质,而要用

抽象类,个人理解这是属于面向对象的东西了,类是实体的抽象爱,而接口则定义一些行为。

    5)preferredLocations():Optionally overridden by subclasses to specify placement preferences.

 

下面我们自己实现一个和Mysql交互的RDD,只涉及到上面说的部分函数,当然在生产环境中不建议这样做,除非你自己想把自己的mysql搞挂,此处只是演示,对于像Hbase之类的分布式数据库,逻辑类似。

package com.hypers.rdd

import java.sql.{Connection, ResultSet}

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}

import scala.reflect.ClassTag

//TODO 去重
class HFAJdbcRDD[T: ClassTag]
    (sc: SparkContext,
     connection: () => Connection, //method
     sql: String,
     numPartittions: Int,
     mapRow: (ResultSet) => T
) extends RDD[T](sc, Nil) with Logging {

    /**
      * 若是这个Rdd是有父RDD 那么 compute一般会调用到iterator方法 将taskContext传递出去
      * @param thePart
      * @param context
      * @return
      */
    @DeveloperApi
    override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new Iterator[T] {

        val part = thePart.asInstanceOf[HFAJdbcPartition]
        val conn = connection()
        //如果直接执行sql会使数据重复,因此此处使用分页
        val stmt = conn.prepareStatement(String.format("%s limit %s,1",sql,thePart.index.toString), ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
        logInfo("Get sql data size is " + stmt.getFetchSize)
        val rs: ResultSet = stmt.executeQuery()

        override def hasNext: Boolean = {
            if(rs.next()){
                true
            }else{
                conn.close()
                false
            }
        }

        override def next(): T = {
            mapRow(rs)
        }
    }


    /**
      * 将一些信息传递到compute方法 例如sql limit 的参数
      * @return
      */
    override protected def getPartitions: Array[Partition] = {
        (0 until numPartittions).map { inx =>
            new HFAJdbcPartition(inx)
        }.toArray
    }
}

private class HFAJdbcPartition(inx: Int) extends Partition {
    override def index: Int = inx
}

 

package com.hypers.rdd.execute

import java.sql.{DriverManager, ResultSet}

import com.hypers.commons.spark.BaseJob
import com.hypers.rdd.HFAJdbcRDD

//BaseJob里面做了sc的初始化,在此不做演示,您也可以自己new出sparkContext
object HFAJdbcTest extends BaseJob {

    def main(args: Array[String]) {
        HFAJdbcTest(args)
    }

    override def apply(args: Array[String]): Unit = {

        val jdbcRdd = new HFAJdbcRDD[Tuple2[Int, String]](sc,
            getConnection,
            "select id,name from user where id<10",
            3,
            reseultHandler
        )

        logger.info("count is " + jdbcRdd.count())
        logger.info("count keys " + jdbcRdd.keys.collect().toList)

    }

    def getConnection() = {
        Class.forName("com.mysql.jdbc.Driver").newInstance()
        DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
    }

    def reseultHandler(rs: ResultSet): Tuple2[Int, String] = {
        rs.getInt("id") -> rs.getString("name")

    }
}

 

© 著作权归作者所有

上一篇: RabbitMq之HelloWorld
下一篇: spark
K
粉丝 5
博文 20
码字总数 8451
作品 0
崇明
私信 提问
Spark成为大数据高手进阶步骤

什么是Spark Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapRedu...

MoksMo
2015/11/05
2.1K
1
Spark2.1.0之基础知识

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80303035 在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文...

泰山不老生
2018/05/24
0
0
大数据入门与实战-Spark上手

1 Spark简介 1.1 引言 行业正在广泛使用Hadoop来分析他们的数据集。原因是Hadoop框架基于简单的编程模型(MapReduce),它使计算解决方案具有可扩展性,灵活性,容错性和成本效益。在这里,主...

致Great
03/12
0
0
浅谈 Spark 的多语言支持

作者:郑锴,花名铁杰,阿里巴巴高级技术专家,Apache Hadoop PMC,Apache Kerby 创立者。深耕分布式系统开发和开源大数据多年,先后专注在安全,存储和计算领域。之前在 Intel,目前转战阿里...

开源大数据EMR
04/23
0
0
Apache Flink和Apache Spark有什么异同?它们的发展前景分别怎样?

============================= object WordCount { def main(args: Array[String]) {val env = new SparkContext("local","wordCount")val data = List("hi","how are you","hi")val dataSe......

justlpf
2018/05/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OpenStack 简介和几种安装方式总结

OpenStack :是一个由NASA和Rackspace合作研发并发起的,以Apache许可证授权的自由软件和开放源代码项目。项目目标是提供实施简单、可大规模扩展、丰富、标准统一的云计算管理平台。OpenSta...

小海bug
昨天
6
0
DDD(五)

1、引言 之前学习了解了DDD中实体这一概念,那么接下来需要了解的就是值对象、唯一标识。值对象,值就是数字1、2、3,字符串“1”,“2”,“3”,值时对象的特征,对象是一个事物的具体描述...

MrYuZixian
昨天
6
0
数据库中间件MyCat

什么是MyCat? 查看官网的介绍是这样说的 一个彻底开源的,面向企业应用开发的大数据库集群 支持事务、ACID、可以替代MySQL的加强版数据库 一个可以视为MySQL集群的企业级数据库,用来替代昂贵...

沉浮_
昨天
6
0
解决Mac下VSCode打开zsh乱码

1.乱码问题 iTerm2终端使用Zsh,并且配置Zsh主题,该主题主题需要安装字体来支持箭头效果,在iTerm2中设置这个字体,但是VSCode里这个箭头还是显示乱码。 iTerm2展示如下: VSCode展示如下: 2...

HelloDeveloper
昨天
7
0
常用物流快递单号查询接口种类及对接方法

目前快递查询接口有两种方式可以对接,一是和顺丰、圆通、中通、天天、韵达、德邦这些快递公司一一对接接口,二是和快递鸟这样第三方集成接口一次性对接多家常用快递。第一种耗费时间长,但是...

程序的小猿
昨天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部