文档章节

如何使用Spark大规模并行构建索引

九劫散仙
 九劫散仙
发布于 2016/02/01 13:45
字数 1248
阅读 777
收藏 7
使用Spark构建索引非常简单,因为spark提供了更高级的抽象rdd分布式弹性数据集,相比以前的使用Hadoop的MapReduce来构建大规模索引,Spark具有更灵活的api操作,性能更高,语法更简洁等一系列优点。 

先看下,整体的拓扑图: 





然后,再来看下,使用scala写的spark程序: 

Java代码   收藏代码
  1. package com.easy.build.index  
  2.   
  3. import java.util  
  4.   
  5. import org.apache.solr.client.solrj.beans.Field  
  6. import org.apache.solr.client.solrj.impl.HttpSolrClient  
  7. import org.apache.spark.rdd.RDD  
  8. import org.apache.spark.{SparkConf, SparkContext}  
  9.   
  10. import scala.annotation.meta.field  
  11. /** 
  12.   * Created by qindongliang on 2016/1/21. 
  13.   */  
  14.   
  15. //注册model,时间类型可以为字符串,只要后台索引配置为Long即可,注解映射形式如下  
  16. case class Record(  
  17.                    @(Field@field )("rowkey")     rowkey:String,  
  18.                    @(Field@field )("title")  title:String,  
  19.                    @(Field@field)("content") content:String,  
  20.                    @(Field@field)("isdel") isdel:String,  
  21.                    @(Field@field)("t1") t1:String,  
  22.                    @(Field@field)("t2")t2:String,  
  23.                    @(Field@field)("t3")t3:String,  
  24.                    @(Field@field)("dtime") dtime:String  
  25.   
  26.   
  27.                  )  
  28.   
  29. /*** 
  30.   * Spark构建索引==>Solr 
  31.   */  
  32. object SparkIndex {  
  33.   
  34.   //solr客户端  
  35.   val client=new  HttpSolrClient("http://192.168.1.188:8984/solr/monitor");  
  36.   //批提交的条数  
  37.   val batchCount=10000;  
  38.   
  39.   def main2(args: Array[String]) {  
  40.   
  41.     val d1=new Record("row1","title","content","1","01","57","58","3");  
  42.     val d2=new Record("row2","title","content","1","01","57","58","45");  
  43.     val d3=new Record("row3","title","content","1","01","57","58",null);  
  44.     client.addBean(d1);  
  45.     client.addBean(d2)  
  46.     client.addBean(d3)  
  47.     client.commit();  
  48.     println("提交成功!")  
  49.   
  50.   
  51.   }  
  52.   
  53.   
  54.   /*** 
  55.     * 迭代分区数据(一个迭代器集合),然后进行处理 
  56.     * @param lines 处理每个分区的数据 
  57.     */  
  58.   def  indexPartition(lines:scala.Iterator[String] ): Unit ={  
  59.           //初始化集合,分区迭代开始前,可以初始化一些内容,如数据库连接等  
  60.           val datas = new util.ArrayList[Record]()  
  61.           //迭代处理每条数据,符合条件会提交数据  
  62.           lines.foreach(line=>indexLineToModel(line,datas))  
  63.           //操作分区结束后,可以关闭一些资源,或者做一些操作,最后一次提交数据  
  64.           commitSolr(datas,true);  
  65.   }  
  66.   
  67.   /*** 
  68.     *  提交索引数据到solr中 
  69.     * 
  70.     * @param datas 索引数据 
  71.     * @param isEnd 是否为最后一次提交 
  72.     */  
  73.   def commitSolr(datas:util.ArrayList[Record],isEnd:Boolean): Unit ={  
  74.           //仅仅最后一次提交和集合长度等于批处理的数量时才提交  
  75.           if ((datas.size()>0&&isEnd)||datas.size()==batchCount) {  
  76.             client.addBeans(datas);  
  77.             client.commit(); //提交数据  
  78.             datas.clear();//清空集合,便于重用  
  79.           }  
  80.   }  
  81.   
  82.   
  83.   /*** 
  84.     * 得到分区的数据具体每一行,并映射 
  85.     * 到Model,进行后续索引处理 
  86.     * 
  87.     * @param line 每行具体数据 
  88.     * @param datas 添加数据的集合,用于批量提交索引 
  89.     */  
  90.   def indexLineToModel(line:String,datas:util.ArrayList[Record]): Unit ={  
  91.     //数组数据清洗转换  
  92.     val fields=line.split("\1",-1).map(field =>etl_field(field))  
  93.     //将清洗完后的数组映射成Tuple类型  
  94.     val tuple=buildTuble(fields)  
  95.     //将Tuple转换成Bean类型  
  96.     val recoder=Record.tupled(tuple)  
  97.     //将实体类添加至集合,方便批处理提交  
  98.     datas.add(recoder);  
  99.     //提交索引到solr  
  100.     commitSolr(datas,false);  
  101.   }  
  102.   
  103.   
  104.   /*** 
  105.     * 将数组映射成Tuple集合,方便与Bean绑定 
  106.     * @param array field集合数组 
  107.     * @return tuple集合 
  108.     */  
  109.   def buildTuble(array: Array[String]):(String, String, String, String, String, String, String, String)={  
  110.      array match {  
  111.        case Array(s1, s2, s3, s4, s5, s6, s7, s8) => (s1, s2, s3, s4, s5, s6, s7,s8)  
  112.      }  
  113.   }  
  114.   
  115.   
  116.   /*** 
  117.     *  对field进行加工处理 
  118.     * 空值替换为null,这样索引里面就不会索引这个字段 
  119.     * ,正常值就还是原样返回 
  120.     * 
  121.     * @param field 用来走特定规则的数据 
  122.     * @return 映射完的数据 
  123.     */  
  124.   def etl_field(field:String):String={  
  125.     field match {  
  126.       case "" => null  
  127.       case _ => field  
  128.     }  
  129.   }  
  130.   
  131.   /*** 
  132.     * 根据条件清空某一类索引数据 
  133.     * @param query 删除的查询条件 
  134.     */  
  135.   def deleteSolrByQuery(query:String): Unit ={  
  136.     client.deleteByQuery(query);  
  137.     client.commit()  
  138.     println("删除成功!")  
  139.   }  
  140.   
  141.   
  142.   def main(args: Array[String]) {  
  143.     //根据条件删除一些数据  
  144.     deleteSolrByQuery("t1:03")  
  145.     //远程提交时,需要提交打包后的jar  
  146.     val jarPath = "target\\spark-build-index-1.0-SNAPSHOT.jar";  
  147.     //远程提交时,伪装成相关的hadoop用户,否则,可能没有权限访问hdfs系统  
  148.     System.setProperty("user.name""webmaster");  
  149.     //初始化SparkConf  
  150.     val conf = new SparkConf().setMaster("spark://192.168.1.187:7077").setAppName("build index ");  
  151.     //上传运行时依赖的jar包  
  152.     val seq = Seq(jarPath) :+ "D:\\tmp\\lib\\noggit-0.6.jar" :+ "D:\\tmp\\lib\\httpclient-4.3.1.jar" :+ "D:\\tmp\\lib\\httpcore-4.3.jar" :+ "D:\\tmp\\lib\\solr-solrj-5.1.0.jar" :+ "D:\\tmp\\lib\\httpmime-4.3.1.jar"  
  153.     conf.setJars(seq)  
  154.     //初始化SparkContext上下文  
  155.     val sc = new SparkContext(conf);  
  156.     //此目录下所有的数据,将会被构建索引,格式一定是约定好的  
  157.     val rdd = sc.textFile("hdfs://192.168.1.187:9000/user/monitor/gs/");  
  158.     //通过rdd构建索引  
  159.     indexRDD(rdd);  
  160.     //关闭索引资源  
  161.     client.close();  
  162.     //关闭SparkContext上下文  
  163.     sc.stop();  
  164.   
  165.   
  166.   }  
  167.   
  168.   
  169.   /*** 
  170.     * 处理rdd数据,构建索引 
  171.     * @param rdd 
  172.     */  
  173.   def indexRDD(rdd:RDD[String]): Unit ={  
  174.     //遍历分区,构建索引  
  175.     rdd.foreachPartition(line=>indexPartition(line));  
  176.   }  
  177.   
  178.   
  179.   
  180. }  


ok,至此,我们的建索引程序就写完了,本例子中用的是远程提交模式,实际上它也可以支持spark on yarn (cluster 或者 client )  模式,不过此时需要注意的是,不需要显式指定setMaster的值,而由提交任务时,通过--master来指定运行模式,另外,依赖的相关jar包,也需要通过--jars参数来提交到集群里面,否则的话,运行时会报异常,最后看下本例子里面的solr是单机模式的,所以使用spark建索引提速并没有达到最大值,真正能发挥最大威力的是,多台search集群正如我画的架构图里面,每台机器是一个shard,这就是solrcloud的模式,或者在elasticsearch里面的集群shard,这样以来,才能真正达到高效批量的索引构建 

© 著作权归作者所有

共有 人打赏支持
九劫散仙
粉丝 264
博文 175
码字总数 189625
作品 0
海淀
加载中

评论(1)

一库大神
一库大神
老大,有没批量读取hbase的栗子!
【DataMagic】如何在万亿级别规模的数据量上使用Spark

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文首发在云+社区,未经许可,不得转载。 作者:张国鹏 | 腾讯 运营开发工程师 一、前言 Spark作为大数据计算引擎,凭借其快速、...

04/18
0
0
如何在万亿级别规模的数据量上使用Spark?

  【IT168 技术】   一、前言   Spark作为大数据计算引擎,凭借其快速、稳定、简易等特点,快速的占领了大数据计算的领域。本文主要为作者在搭建使用计算平台的过程中,对于Spark的理解...

网络大数据
05/30
0
0
Spark2.0-RDD分区原理分析

Spark分区原理分析 介绍 分区是指如何把RDD分布在spark集群的各个节点的操作。以及一个RDD能够分多少个分区。 一个分区是大型分布式数据集的逻辑块。 那么思考一下:分区数如何映射到spark的...

xiaomin0322
06/06
0
0
如何在万亿级别规模的数据量上使用Spark

一、前言 Spark作为大数据计算引擎,凭借其快速、稳定、简易等特点,快速的占领了大数据计算的领域。本文主要为作者在搭建使用计算平台的过程中,对于Spark的理解,希望能给读者一些学习的思...

风火数据
05/14
0
0
【转】Spark,一种快速数据分析替代方案

Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,...

mj4738
2012/05/24
0
0

没有更多内容

加载失败,请刷新页面

加载更多

你为什么在Redis里读到了本应过期的数据

一个事故的故事 晚上睡的正香突然被电话吵醒,对面是开发焦急的声音:我们的程序在访问redis的时候读到了本应过期的key导致整个业务逻辑出了问题,需要马上解决。 看到这里你可能会想:这是不...

IT--小哥
今天
2
0
祝大家节日快乐,阖家幸福! centos GnuTLS 漏洞

yum update -y gnutls 修复了GnuTLS 漏洞。更新到最新 gnutls.x86_64 0:2.12.23-22.el6 版本

yizhichao
昨天
5
0
Scrapy 1.5.0之选择器

构造选择器 Scrapy选择器是通过文本(Text)或 TextResponse 对象构造的 Selector 类的实例。 它根据输入类型自动选择最佳的解析规则(XML vs HTML): >>> from scrapy.selector import Sele...

Eappo_Geng
昨天
4
0
Windows下Git多账号配置,同一电脑多个ssh-key的管理

Windows下Git多账号配置,同一电脑多个ssh-key的管理   这一篇文章是对上一篇文章《Git-TortoiseGit完整配置流程》的拓展,所以需要对上一篇文章有所了解,当然直接往下看也可以,其中也有...

morpheusWB
昨天
5
0
中秋快乐!!!

HiBlock
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部