文档章节

通过自定义SparkSQL外部数据源实现SparkSQL读取HBase

小水熊
 小水熊
发布于 2015/11/16 17:19
字数 897
阅读 3307
收藏 4

包: sparksql.hbase

HBaseRelation.scala

package sparksql.hbase

import java.io.Serializable
import org.apache.spark.sql._
import org.apache.spark.sql.sources.TableScan
import org.apache.hadoop.hbase.client.{Result}
import org.apache.spark.sql._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.LongType
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.sources.BaseRelation
import sparksql.hbase.hbase._
 
 
object Resolver extends  Serializable { 
  def resolve (hbaseField: HBaseSchemaField, result: Result ): Any = {
    val cfColArray = hbaseField.fieldName.split(":",-1)
    val cfName = cfColArray(0)
    val colName =  cfColArray(1)
    var fieldRs: Any = null
    //resolve row key otherwise resolve column
    if(cfName=="" && colName=="key") {
      fieldRs = resolveRowKey(result, hbaseField.fieldType)
    } else {
      fieldRs =  resolveColumn(result, cfName, colName,hbaseField.fieldType)
    }
    fieldRs
  }
 
  def resolveRowKey (result: Result, resultType: String): Any = {
     val rowkey = resultType match {
      case "string" =>
        result.getRow.map(_.toChar).mkString
      case "int" =>
        result  .getRow.map(_.toChar).mkString.toInt
      case "long" =>
        result.getRow.map(_.toChar).mkString.toLong
    }
    rowkey
  }
 
  def resolveColumn (result: Result, columnFamily: String, columnName: String, resultType: String): Any = {
    val column = result.containsColumn(columnFamily.getBytes, columnName.getBytes) match{
        case true =>{
            resultType match {
              case "string" =>
                Bytes.toString(result.getValue(columnFamily.getBytes,columnName.getBytes))
                //result.getValue(columnFamily.getBytes,columnName.getBytes).map(_.toChar).mkString
              case "int" =>
                Bytes.toInt(result.getValue(columnFamily.getBytes,columnName.getBytes))
              case "long" =>
                Bytes.toLong(result.getValue(columnFamily.getBytes,columnName.getBytes))
              case "float" =>
                Bytes.toFloat(result.getValue(columnFamily.getBytes,columnName.getBytes))
              case "double" =>
                Bytes.toDouble(result.getValue(columnFamily.getBytes,columnName.getBytes))
             }
        }
        case _ => {
            resultType match {
              case "string" =>
                ""
              case "int" =>
                0
              case "long" =>
                0
             }
        }
    }
    column
  }
}
 
/**
   val hbaseDDL = s"""
      |CREATE TEMPORARY TABLE hbase_people
      |USING com.shengli.spark.hbase
      |OPTIONS (
      |  sparksql_table_schema   '(row_key string, name string, age int, job string)',
      |   hbase_table_name     'people',
      | hbase_table_schema '(:key , profile:name , profile:age , career:job )'
      |)""".stripMargin
 */
case class HBaseRelation(@transient val hbaseProps: Map[String,String])(@transient val sqlContext: SQLContext) extends BaseRelation with Serializable with TableScan{
 
  val hbaseTableName =  hbaseProps.getOrElse("hbase_table_name", sys.error("not valid schema"))
  val hbaseTableSchema =  hbaseProps.getOrElse("hbase_table_schema", sys.error("not valid schema"))
  val registerTableSchema = hbaseProps.getOrElse("sparksql_table_schema", sys.error("not valid schema"))
  val rowRange = hbaseProps.getOrElse("row_range", "->")
  //get star row and end row
  val range = rowRange.split("->",-1)
  val startRowKey = range(0).trim
  val endRowKey = range(1).trim
 
  val tempHBaseFields = extractHBaseSchema(hbaseTableSchema) //do not use this, a temp field
  val registerTableFields = extractRegisterSchema(registerTableSchema)
  val tempFieldRelation = tableSchemaFieldMapping(tempHBaseFields,registerTableFields)
 
  val hbaseTableFields = feedTypes(tempFieldRelation)
  val fieldsRelations =  tableSchemaFieldMapping(hbaseTableFields,registerTableFields)
  val queryColumns =  getQueryTargetCloumns(hbaseTableFields)
 
  def  feedTypes( mapping: Map[HBaseSchemaField, RegisteredSchemaField]) :  Array[HBaseSchemaField] = {
         val hbaseFields = mapping.map{
           case (k,v) =>
               val field = k.copy(fieldType=v.fieldType)
               field
        }
        hbaseFields.toArray
  }
 
  def isRowKey(field: HBaseSchemaField) : Boolean = {
    val cfColArray = field.fieldName.split(":",-1)
    val cfName = cfColArray(0)
    val colName =  cfColArray(1)
    if(cfName=="" && colName=="key") true else false
  }
 
  //eg: f1:col1  f1:col2  f1:col3  f2:col1
  def getQueryTargetCloumns(hbaseTableFields: Array[HBaseSchemaField]): String = {
    var str = ArrayBuffer[String]()
    hbaseTableFields.foreach{ field=>
         if(!isRowKey(field)) {
           str +=  field.fieldName
         }
    }
    str.mkString(" ")
  }
  lazy val schema = {
    val fields = hbaseTableFields.map{ field=>
        val name  = fieldsRelations.getOrElse(field, sys.error("table schema is not match the definition.")).fieldName
        val relatedType =  field.fieldType match  {
          case "string" =>
            SchemaType(StringType,nullable = false)
          case "int" =>
            SchemaType(IntegerType,nullable = false)
          case "long" =>
            SchemaType(LongType,nullable = false)
        }
        StructField(name,relatedType.dataType,relatedType.nullable)
    }
    StructType(fields)
  }
 
  def tableSchemaFieldMapping( externalHBaseTable: Array[HBaseSchemaField],  registerTable : Array[RegisteredSchemaField]): Map[HBaseSchemaField, RegisteredSchemaField] = {
       if(externalHBaseTable.length != registerTable.length) sys.error("columns size not match in definition!")
       val rs = externalHBaseTable.zip(registerTable)
       rs.toMap
  }
 
    /**
     * spark sql schema will be register
     *   registerTableSchema   '(rowkey string, value string, column_a string)'
      */
  def extractRegisterSchema(registerTableSchema: String) : Array[RegisteredSchemaField] = {
         val fieldsStr = registerTableSchema.trim.drop(1).dropRight(1)
         val fieldsArray = fieldsStr.split(",").map(_.trim)
         fieldsArray.map{ fildString =>
           val splitedField = fildString.split("\\s+", -1)
           RegisteredSchemaField(splitedField(0), splitedField(1))
         }
   }
 
  //externalTableSchema '(:key , f1:col1 )'
  def extractHBaseSchema(externalTableSchema: String) : Array[HBaseSchemaField] = {
        val fieldsStr = externalTableSchema.trim.drop(1).dropRight(1)
        val fieldsArray = fieldsStr.split(",").map(_.trim)
        fieldsArray.map(fildString => HBaseSchemaField(fildString,""))
  }
 
 
 
  // By making this a lazy val we keep the RDD around, amortizing the cost of locating splits.
  lazy val buildScan = {
 
    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set("hbase.zookeeper.quorum", "zookeeper-name")
    hbaseConf.set(TableInputFormat.INPUT_TABLE, hbaseTableName)
    hbaseConf.set(TableInputFormat.SCAN_COLUMNS, queryColumns);
    hbaseConf.set(TableInputFormat.SCAN_ROW_START, startRowKey);
    hbaseConf.set(TableInputFormat.SCAN_ROW_STOP, endRowKey);
 
    val hbaseRdd = sqlContext.sparkContext.newAPIHadoopRDD(
      hbaseConf,
      classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
      classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
      classOf[org.apache.hadoop.hbase.client.Result]
    )
 
 
    val rs = hbaseRdd.map(tuple => tuple._2).map(result => {
      var values = new ArrayBuffer[Any]()
      hbaseTableFields.foreach{field=>
        values += Resolver.resolve(field,result)
      }
      Row.fromSeq(values.toSeq)
    })
    rs
  }
 
  private case class SchemaType(dataType: DataType, nullable: Boolean)
}


DefaultSource.scala

package sparksql.hbase

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.RelationProvider
 
 
class DefaultSource extends RelationProvider {
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]) = {
    HBaseRelation(parameters)(sqlContext)
  }
}


package.scala

package sparksql.hbase

import org.apache.spark.sql.SQLContext
import scala.collection.immutable.HashMap 
 
 
package object hbase {
 
  abstract class SchemaField extends Serializable
 
   case class RegisteredSchemaField(fieldName: String, fieldType: String)  extends  SchemaField  with Serializable
 
   case class HBaseSchemaField(fieldName: String, fieldType: String)  extends  SchemaField  with Serializable
 
   case class Parameter(name: String)
 
 
  protected  val SPARK_SQL_TABLE_SCHEMA = Parameter("sparksql_table_schema")
  protected  val HBASE_TABLE_NAME = Parameter("hbase_table_name")
  protected  val HBASE_TABLE_SCHEMA = Parameter("hbase_table_schema")
  protected  val ROW_RANGE = Parameter("row_range")
  /**
   * Adds a method, `hbaseTable`, to SQLContext that allows reading data stored in hbase table.
   */
  implicit class HBaseContext(sqlContext: SQLContext) {
    def hbaseTable(sparksqlTableSchema: String, hbaseTableName: String, hbaseTableSchema: String, rowRange: String = "->") = {
      var params = new HashMap[String, String]
      params += ( SPARK_SQL_TABLE_SCHEMA.name -> sparksqlTableSchema)
      params += ( HBASE_TABLE_NAME.name -> hbaseTableName)
      params += ( HBASE_TABLE_SCHEMA.name -> hbaseTableSchema)
      //get star row and end row
      params += ( ROW_RANGE.name -> rowRange)
      sqlContext.baseRelationToDataFrame(HBaseRelation(params)(sqlContext));
    }
  }
 
}


使用示例:

package test
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkContext


object SparkSqlHbaseTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark sql hbase test")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    var hbasetable = sqlContext.read.format("sparksql.hbase").options(Map(
    "sparksql_table_schema" -> "(key string, title string, url string)",
    "hbase_table_name" -> "crawled",
    "hbase_table_schema" -> "(:key , data:title , basic:url)"
    )).load()

    hbasetable.printSchema()

    hbasetable.registerTempTable("crawled")

    var records = sqlContext.sql("SELECT * from crawled limit 10").collect
  }

}


© 著作权归作者所有

小水熊

小水熊

粉丝 68
博文 61
码字总数 41633
作品 1
静安
架构师
私信 提问
加载中

评论(1)

L
Le-Mon
代码中对hbase和sparksql中table映射是有问题的
方法是tableSchemaFieldMapping 方法中直接调用ArrayToMap,导致了之前的tempFieldRelation顺序改变,然后和Hbase结合时导致了field映射错误
总结:Hive,Hive on Spark和SparkSQL区别

Hive on Mapreduce Hive的原理大家可以参考这篇大数据时代的技术hive:hive介绍,实际的一些操作可以看这篇笔记:新手的Hive指南,至于还有兴趣看Hive优化方法可以看看我总结的这篇Hive性能优...

hblt-j
01/15
409
0
巨杉Tech | Hbase迁移至SequoiaDB 实战

背景 在传统银行 IT 架构中,联机交易与统计分析系统往往采用不同的技术与物理设备,通过定期执行的 ETL 将联机交易数据向分析系统中迁移。而作为数据服务资源池,同一份数据可能被不同类型的...

巨杉数据库
09/18
18
0
从 Hive 迁移到 Spark SQL 在有赞的实践

有赞数据平台从2017年上半年开始,逐步使用 SparkSQL 替代 Hive 执行离线任务,目前 SparkSQL 每天的运行作业数量5000个,占离线作业数目的55%,消耗的 cpu 资源占集群总资源的50%左右。本文...

Hive
03/20
0
0
HBase实战之HBase BulkLoad批量写入数据

1.概述 在进行数据传输时,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,导入数据的过程...

HBase技术社区
2018/10/02
0
0
HBase BulkLoad批量写入数据实战

1.概述 在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使用Sqoop工具批量导数到HBase集群、使用MapReduce批量导入等。这些方式,在导入数据的过...

哥不是小萝莉
2018/08/19
0
0

没有更多内容

加载失败,请刷新页面

加载更多

500行代码,教你用python写个微信飞机大战

这几天在重温微信小游戏的飞机大战,玩着玩着就在思考人生了,这飞机大战怎么就可以做的那么好,操作简单,简单上手。 帮助蹲厕族、YP族、饭圈女孩在无聊之余可以有一样东西让他们振作起来!...

上海小胖
今天
8
0
关于AsyncTask的onPostExcute方法是否会在Activity重建过程中调用的问题

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/XG1057415595/article/details/86774575 假设下面一种情况...

shzwork
今天
7
0
object 类中有哪些方法?

getClass(): 获取运行时类的对象 equals():判断其他对象是否与此对象相等 hashcode():返回该对象的哈希码值 toString():返回该对象的字符串表示 clone(): 创建并返此对象的一个副本 wait...

happywe
今天
6
0
Docker容器实战(七) - 容器中进程视野下的文件系统

前两文中,讲了Linux容器最基础的两种技术 Namespace 作用是“隔离”,它让应用进程只能看到该Namespace内的“世界” Cgroups 作用是“限制”,它给这个“世界”围上了一圈看不见的墙 这么一...

JavaEdge
今天
8
0
文件访问和共享的方法介绍

在上一篇文章中,你了解到文件有三个不同的权限集。拥有该文件的用户有一个集合,拥有该文件的组的成员有一个集合,然后最终一个集合适用于其他所有人。在长列表(ls -l)中这些权限使用符号...

老孟的Linux私房菜
今天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部