文档章节

如何使用Spark大规模并行构建索引

九劫散仙
 九劫散仙
发布于 2016/02/01 13:45
字数 1248
阅读 815
收藏 7
使用Spark构建索引非常简单,因为spark提供了更高级的抽象rdd分布式弹性数据集,相比以前的使用Hadoop的MapReduce来构建大规模索引,Spark具有更灵活的api操作,性能更高,语法更简洁等一系列优点。 

先看下,整体的拓扑图: 





然后,再来看下,使用scala写的spark程序: 

Java代码   收藏代码
  1. package com.easy.build.index  
  2.   
  3. import java.util  
  4.   
  5. import org.apache.solr.client.solrj.beans.Field  
  6. import org.apache.solr.client.solrj.impl.HttpSolrClient  
  7. import org.apache.spark.rdd.RDD  
  8. import org.apache.spark.{SparkConf, SparkContext}  
  9.   
  10. import scala.annotation.meta.field  
  11. /** 
  12.   * Created by qindongliang on 2016/1/21. 
  13.   */  
  14.   
  15. //注册model,时间类型可以为字符串,只要后台索引配置为Long即可,注解映射形式如下  
  16. case class Record(  
  17.                    @(Field@field )("rowkey")     rowkey:String,  
  18.                    @(Field@field )("title")  title:String,  
  19.                    @(Field@field)("content") content:String,  
  20.                    @(Field@field)("isdel") isdel:String,  
  21.                    @(Field@field)("t1") t1:String,  
  22.                    @(Field@field)("t2")t2:String,  
  23.                    @(Field@field)("t3")t3:String,  
  24.                    @(Field@field)("dtime") dtime:String  
  25.   
  26.   
  27.                  )  
  28.   
  29. /*** 
  30.   * Spark构建索引==>Solr 
  31.   */  
  32. object SparkIndex {  
  33.   
  34.   //solr客户端  
  35.   val client=new  HttpSolrClient("http://192.168.1.188:8984/solr/monitor");  
  36.   //批提交的条数  
  37.   val batchCount=10000;  
  38.   
  39.   def main2(args: Array[String]) {  
  40.   
  41.     val d1=new Record("row1","title","content","1","01","57","58","3");  
  42.     val d2=new Record("row2","title","content","1","01","57","58","45");  
  43.     val d3=new Record("row3","title","content","1","01","57","58",null);  
  44.     client.addBean(d1);  
  45.     client.addBean(d2)  
  46.     client.addBean(d3)  
  47.     client.commit();  
  48.     println("提交成功!")  
  49.   
  50.   
  51.   }  
  52.   
  53.   
  54.   /*** 
  55.     * 迭代分区数据(一个迭代器集合),然后进行处理 
  56.     * @param lines 处理每个分区的数据 
  57.     */  
  58.   def  indexPartition(lines:scala.Iterator[String] ): Unit ={  
  59.           //初始化集合,分区迭代开始前,可以初始化一些内容,如数据库连接等  
  60.           val datas = new util.ArrayList[Record]()  
  61.           //迭代处理每条数据,符合条件会提交数据  
  62.           lines.foreach(line=>indexLineToModel(line,datas))  
  63.           //操作分区结束后,可以关闭一些资源,或者做一些操作,最后一次提交数据  
  64.           commitSolr(datas,true);  
  65.   }  
  66.   
  67.   /*** 
  68.     *  提交索引数据到solr中 
  69.     * 
  70.     * @param datas 索引数据 
  71.     * @param isEnd 是否为最后一次提交 
  72.     */  
  73.   def commitSolr(datas:util.ArrayList[Record],isEnd:Boolean): Unit ={  
  74.           //仅仅最后一次提交和集合长度等于批处理的数量时才提交  
  75.           if ((datas.size()>0&&isEnd)||datas.size()==batchCount) {  
  76.             client.addBeans(datas);  
  77.             client.commit(); //提交数据  
  78.             datas.clear();//清空集合,便于重用  
  79.           }  
  80.   }  
  81.   
  82.   
  83.   /*** 
  84.     * 得到分区的数据具体每一行,并映射 
  85.     * 到Model,进行后续索引处理 
  86.     * 
  87.     * @param line 每行具体数据 
  88.     * @param datas 添加数据的集合,用于批量提交索引 
  89.     */  
  90.   def indexLineToModel(line:String,datas:util.ArrayList[Record]): Unit ={  
  91.     //数组数据清洗转换  
  92.     val fields=line.split("\1",-1).map(field =>etl_field(field))  
  93.     //将清洗完后的数组映射成Tuple类型  
  94.     val tuple=buildTuble(fields)  
  95.     //将Tuple转换成Bean类型  
  96.     val recoder=Record.tupled(tuple)  
  97.     //将实体类添加至集合,方便批处理提交  
  98.     datas.add(recoder);  
  99.     //提交索引到solr  
  100.     commitSolr(datas,false);  
  101.   }  
  102.   
  103.   
  104.   /*** 
  105.     * 将数组映射成Tuple集合,方便与Bean绑定 
  106.     * @param array field集合数组 
  107.     * @return tuple集合 
  108.     */  
  109.   def buildTuble(array: Array[String]):(String, String, String, String, String, String, String, String)={  
  110.      array match {  
  111.        case Array(s1, s2, s3, s4, s5, s6, s7, s8) => (s1, s2, s3, s4, s5, s6, s7,s8)  
  112.      }  
  113.   }  
  114.   
  115.   
  116.   /*** 
  117.     *  对field进行加工处理 
  118.     * 空值替换为null,这样索引里面就不会索引这个字段 
  119.     * ,正常值就还是原样返回 
  120.     * 
  121.     * @param field 用来走特定规则的数据 
  122.     * @return 映射完的数据 
  123.     */  
  124.   def etl_field(field:String):String={  
  125.     field match {  
  126.       case "" => null  
  127.       case _ => field  
  128.     }  
  129.   }  
  130.   
  131.   /*** 
  132.     * 根据条件清空某一类索引数据 
  133.     * @param query 删除的查询条件 
  134.     */  
  135.   def deleteSolrByQuery(query:String): Unit ={  
  136.     client.deleteByQuery(query);  
  137.     client.commit()  
  138.     println("删除成功!")  
  139.   }  
  140.   
  141.   
  142.   def main(args: Array[String]) {  
  143.     //根据条件删除一些数据  
  144.     deleteSolrByQuery("t1:03")  
  145.     //远程提交时,需要提交打包后的jar  
  146.     val jarPath = "target\\spark-build-index-1.0-SNAPSHOT.jar";  
  147.     //远程提交时,伪装成相关的hadoop用户,否则,可能没有权限访问hdfs系统  
  148.     System.setProperty("user.name""webmaster");  
  149.     //初始化SparkConf  
  150.     val conf = new SparkConf().setMaster("spark://192.168.1.187:7077").setAppName("build index ");  
  151.     //上传运行时依赖的jar包  
  152.     val seq = Seq(jarPath) :+ "D:\\tmp\\lib\\noggit-0.6.jar" :+ "D:\\tmp\\lib\\httpclient-4.3.1.jar" :+ "D:\\tmp\\lib\\httpcore-4.3.jar" :+ "D:\\tmp\\lib\\solr-solrj-5.1.0.jar" :+ "D:\\tmp\\lib\\httpmime-4.3.1.jar"  
  153.     conf.setJars(seq)  
  154.     //初始化SparkContext上下文  
  155.     val sc = new SparkContext(conf);  
  156.     //此目录下所有的数据,将会被构建索引,格式一定是约定好的  
  157.     val rdd = sc.textFile("hdfs://192.168.1.187:9000/user/monitor/gs/");  
  158.     //通过rdd构建索引  
  159.     indexRDD(rdd);  
  160.     //关闭索引资源  
  161.     client.close();  
  162.     //关闭SparkContext上下文  
  163.     sc.stop();  
  164.   
  165.   
  166.   }  
  167.   
  168.   
  169.   /*** 
  170.     * 处理rdd数据,构建索引 
  171.     * @param rdd 
  172.     */  
  173.   def indexRDD(rdd:RDD[String]): Unit ={  
  174.     //遍历分区,构建索引  
  175.     rdd.foreachPartition(line=>indexPartition(line));  
  176.   }  
  177.   
  178.   
  179.   
  180. }  


ok,至此,我们的建索引程序就写完了,本例子中用的是远程提交模式,实际上它也可以支持spark on yarn (cluster 或者 client )  模式,不过此时需要注意的是,不需要显式指定setMaster的值,而由提交任务时,通过--master来指定运行模式,另外,依赖的相关jar包,也需要通过--jars参数来提交到集群里面,否则的话,运行时会报异常,最后看下本例子里面的solr是单机模式的,所以使用spark建索引提速并没有达到最大值,真正能发挥最大威力的是,多台search集群正如我画的架构图里面,每台机器是一个shard,这就是solrcloud的模式,或者在elasticsearch里面的集群shard,这样以来,才能真正达到高效批量的索引构建 

© 著作权归作者所有

共有 人打赏支持
九劫散仙
粉丝 266
博文 175
码字总数 189625
作品 0
海淀
私信 提问
加载中

评论(1)

一库大神
一库大神
老大,有没批量读取hbase的栗子!
突破DBMS局限性,阿里借力Spark提升查询性能

我们知道SQL Server是一款技术上和商业上都很成功的产品,这一次微软选择拥抱Spark大数据生态,着实令人有些惊讶。国内的几款产品也丝毫不落后,阿里云的DRDS、腾讯云TDSQL也都各自推出了与S...

技术小能手
2018/11/05
0
0
Cloudera Developer之Spark 及 Hadoop 开发员培训(CCA-175)

学习如何将数据导入到 Apache Hadoop 机群并使用 Spark、Hive、Flume、Sqoop、Impala 及其他 Hadoop 生态系统工具对数据进行各种操作和处理分析。 培训详情地址:https://www.huodongjia.co...

活动家
2017/07/28
31
0
如何在万亿级别规模的数据量上使用Spark?

  【IT168 技术】   一、前言   Spark作为大数据计算引擎,凭借其快速、稳定、简易等特点,快速的占领了大数据计算的领域。本文主要为作者在搭建使用计算平台的过程中,对于Spark的理解...

网络大数据
2018/05/30
0
0
如何在万亿级别规模的数据量上使用Spark

一、前言 Spark作为大数据计算引擎,凭借其快速、稳定、简易等特点,快速的占领了大数据计算的领域。本文主要为作者在搭建使用计算平台的过程中,对于Spark的理解,希望能给读者一些学习的思...

风火数据
2018/05/14
0
0
Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈与熟练的掌握Scala语言【大数据Spark

Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈 大数据的概念与应用,正随着智能手机、平板电脑的快速流行而日渐普及,大数据中图的并行化处理一直是一个非常热门的话题。图计算正在被广泛地...

Spark亚太研究院
2014/08/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

vue 对对象的属性进行修改时,不能渲染页面 vue.$set()

我在vue里的方法里给一个对象添加某个属性时,我console.log出来的是已经更改的object ,但是页面始终没有变化 原因如下: **受现代 JavaScript 的限制 (而且 Object.observe 也已经被废弃),...

Js_Mei
44分钟前
0
0
开始看《Java学习笔记》

虽然书买了很久,但一直没看。这其中也写过一些Java程序,但都是基于IDE的帮助和对C#的理解来写的,感觉不踏实。 林信良的书写得蛮好的,能够帮助打好基础,看得出作者是比较用心的。 第1章概...

max佩恩
昨天
12
0
Redux 三大原则

1.单一数据源 在传统的MVC架构中,我们可以根据需要创建无数个Model,而Model之间可以互相监听、触发事件甚至循环或嵌套触发事件,这些在Redux中都是不被允许的。 因为在Redux的思想里,一个...

wenxingjun
昨天
8
0
跟我学Spring Cloud(Finchley版)-12-微服务容错三板斧

至此,我们已实现服务发现、负载均衡,同时,使用Feign也实现了良好的远程调用——我们的代码是可读、可维护的。理论上,我们现在已经能构建一个不错的分布式应用了,但微服务之间是通过网络...

周立_ITMuch
昨天
4
0
XML

学习目标  能够说出XML的作用  能够编写XML文档声明  能够编写符合语法的XML  能够通过DTD约束编写XML文档  能够通过Schema约束编写XML文档  能够通过Dom4j解析XML文档 第1章 xm...

stars永恒
昨天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部