文档章节

如何使用Spark大规模并行构建索引

九劫散仙
 九劫散仙
发布于 2016/02/01 13:45
字数 1248
阅读 748
收藏 7
点赞 3
评论 1
使用Spark构建索引非常简单,因为spark提供了更高级的抽象rdd分布式弹性数据集,相比以前的使用Hadoop的MapReduce来构建大规模索引,Spark具有更灵活的api操作,性能更高,语法更简洁等一系列优点。 

先看下,整体的拓扑图: 





然后,再来看下,使用scala写的spark程序: 

Java代码   收藏代码
  1. package com.easy.build.index  
  2.   
  3. import java.util  
  4.   
  5. import org.apache.solr.client.solrj.beans.Field  
  6. import org.apache.solr.client.solrj.impl.HttpSolrClient  
  7. import org.apache.spark.rdd.RDD  
  8. import org.apache.spark.{SparkConf, SparkContext}  
  9.   
  10. import scala.annotation.meta.field  
  11. /** 
  12.   * Created by qindongliang on 2016/1/21. 
  13.   */  
  14.   
  15. //注册model,时间类型可以为字符串,只要后台索引配置为Long即可,注解映射形式如下  
  16. case class Record(  
  17.                    @(Field@field )("rowkey")     rowkey:String,  
  18.                    @(Field@field )("title")  title:String,  
  19.                    @(Field@field)("content") content:String,  
  20.                    @(Field@field)("isdel") isdel:String,  
  21.                    @(Field@field)("t1") t1:String,  
  22.                    @(Field@field)("t2")t2:String,  
  23.                    @(Field@field)("t3")t3:String,  
  24.                    @(Field@field)("dtime") dtime:String  
  25.   
  26.   
  27.                  )  
  28.   
  29. /*** 
  30.   * Spark构建索引==>Solr 
  31.   */  
  32. object SparkIndex {  
  33.   
  34.   //solr客户端  
  35.   val client=new  HttpSolrClient("http://192.168.1.188:8984/solr/monitor");  
  36.   //批提交的条数  
  37.   val batchCount=10000;  
  38.   
  39.   def main2(args: Array[String]) {  
  40.   
  41.     val d1=new Record("row1","title","content","1","01","57","58","3");  
  42.     val d2=new Record("row2","title","content","1","01","57","58","45");  
  43.     val d3=new Record("row3","title","content","1","01","57","58",null);  
  44.     client.addBean(d1);  
  45.     client.addBean(d2)  
  46.     client.addBean(d3)  
  47.     client.commit();  
  48.     println("提交成功!")  
  49.   
  50.   
  51.   }  
  52.   
  53.   
  54.   /*** 
  55.     * 迭代分区数据(一个迭代器集合),然后进行处理 
  56.     * @param lines 处理每个分区的数据 
  57.     */  
  58.   def  indexPartition(lines:scala.Iterator[String] ): Unit ={  
  59.           //初始化集合,分区迭代开始前,可以初始化一些内容,如数据库连接等  
  60.           val datas = new util.ArrayList[Record]()  
  61.           //迭代处理每条数据,符合条件会提交数据  
  62.           lines.foreach(line=>indexLineToModel(line,datas))  
  63.           //操作分区结束后,可以关闭一些资源,或者做一些操作,最后一次提交数据  
  64.           commitSolr(datas,true);  
  65.   }  
  66.   
  67.   /*** 
  68.     *  提交索引数据到solr中 
  69.     * 
  70.     * @param datas 索引数据 
  71.     * @param isEnd 是否为最后一次提交 
  72.     */  
  73.   def commitSolr(datas:util.ArrayList[Record],isEnd:Boolean): Unit ={  
  74.           //仅仅最后一次提交和集合长度等于批处理的数量时才提交  
  75.           if ((datas.size()>0&&isEnd)||datas.size()==batchCount) {  
  76.             client.addBeans(datas);  
  77.             client.commit(); //提交数据  
  78.             datas.clear();//清空集合,便于重用  
  79.           }  
  80.   }  
  81.   
  82.   
  83.   /*** 
  84.     * 得到分区的数据具体每一行,并映射 
  85.     * 到Model,进行后续索引处理 
  86.     * 
  87.     * @param line 每行具体数据 
  88.     * @param datas 添加数据的集合,用于批量提交索引 
  89.     */  
  90.   def indexLineToModel(line:String,datas:util.ArrayList[Record]): Unit ={  
  91.     //数组数据清洗转换  
  92.     val fields=line.split("\1",-1).map(field =>etl_field(field))  
  93.     //将清洗完后的数组映射成Tuple类型  
  94.     val tuple=buildTuble(fields)  
  95.     //将Tuple转换成Bean类型  
  96.     val recoder=Record.tupled(tuple)  
  97.     //将实体类添加至集合,方便批处理提交  
  98.     datas.add(recoder);  
  99.     //提交索引到solr  
  100.     commitSolr(datas,false);  
  101.   }  
  102.   
  103.   
  104.   /*** 
  105.     * 将数组映射成Tuple集合,方便与Bean绑定 
  106.     * @param array field集合数组 
  107.     * @return tuple集合 
  108.     */  
  109.   def buildTuble(array: Array[String]):(String, String, String, String, String, String, String, String)={  
  110.      array match {  
  111.        case Array(s1, s2, s3, s4, s5, s6, s7, s8) => (s1, s2, s3, s4, s5, s6, s7,s8)  
  112.      }  
  113.   }  
  114.   
  115.   
  116.   /*** 
  117.     *  对field进行加工处理 
  118.     * 空值替换为null,这样索引里面就不会索引这个字段 
  119.     * ,正常值就还是原样返回 
  120.     * 
  121.     * @param field 用来走特定规则的数据 
  122.     * @return 映射完的数据 
  123.     */  
  124.   def etl_field(field:String):String={  
  125.     field match {  
  126.       case "" => null  
  127.       case _ => field  
  128.     }  
  129.   }  
  130.   
  131.   /*** 
  132.     * 根据条件清空某一类索引数据 
  133.     * @param query 删除的查询条件 
  134.     */  
  135.   def deleteSolrByQuery(query:String): Unit ={  
  136.     client.deleteByQuery(query);  
  137.     client.commit()  
  138.     println("删除成功!")  
  139.   }  
  140.   
  141.   
  142.   def main(args: Array[String]) {  
  143.     //根据条件删除一些数据  
  144.     deleteSolrByQuery("t1:03")  
  145.     //远程提交时,需要提交打包后的jar  
  146.     val jarPath = "target\\spark-build-index-1.0-SNAPSHOT.jar";  
  147.     //远程提交时,伪装成相关的hadoop用户,否则,可能没有权限访问hdfs系统  
  148.     System.setProperty("user.name""webmaster");  
  149.     //初始化SparkConf  
  150.     val conf = new SparkConf().setMaster("spark://192.168.1.187:7077").setAppName("build index ");  
  151.     //上传运行时依赖的jar包  
  152.     val seq = Seq(jarPath) :+ "D:\\tmp\\lib\\noggit-0.6.jar" :+ "D:\\tmp\\lib\\httpclient-4.3.1.jar" :+ "D:\\tmp\\lib\\httpcore-4.3.jar" :+ "D:\\tmp\\lib\\solr-solrj-5.1.0.jar" :+ "D:\\tmp\\lib\\httpmime-4.3.1.jar"  
  153.     conf.setJars(seq)  
  154.     //初始化SparkContext上下文  
  155.     val sc = new SparkContext(conf);  
  156.     //此目录下所有的数据,将会被构建索引,格式一定是约定好的  
  157.     val rdd = sc.textFile("hdfs://192.168.1.187:9000/user/monitor/gs/");  
  158.     //通过rdd构建索引  
  159.     indexRDD(rdd);  
  160.     //关闭索引资源  
  161.     client.close();  
  162.     //关闭SparkContext上下文  
  163.     sc.stop();  
  164.   
  165.   
  166.   }  
  167.   
  168.   
  169.   /*** 
  170.     * 处理rdd数据,构建索引 
  171.     * @param rdd 
  172.     */  
  173.   def indexRDD(rdd:RDD[String]): Unit ={  
  174.     //遍历分区,构建索引  
  175.     rdd.foreachPartition(line=>indexPartition(line));  
  176.   }  
  177.   
  178.   
  179.   
  180. }  


ok,至此,我们的建索引程序就写完了,本例子中用的是远程提交模式,实际上它也可以支持spark on yarn (cluster 或者 client )  模式,不过此时需要注意的是,不需要显式指定setMaster的值,而由提交任务时,通过--master来指定运行模式,另外,依赖的相关jar包,也需要通过--jars参数来提交到集群里面,否则的话,运行时会报异常,最后看下本例子里面的solr是单机模式的,所以使用spark建索引提速并没有达到最大值,真正能发挥最大威力的是,多台search集群正如我画的架构图里面,每台机器是一个shard,这就是solrcloud的模式,或者在elasticsearch里面的集群shard,这样以来,才能真正达到高效批量的索引构建 

© 著作权归作者所有

共有 人打赏支持
九劫散仙
粉丝 261
博文 175
码字总数 189625
作品 0
海淀
加载中

评论(1)

一库大神
一库大神
老大,有没批量读取hbase的栗子!
【DataMagic】如何在万亿级别规模的数据量上使用Spark

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文首发在云+社区,未经许可,不得转载。 作者:张国鹏 | 腾讯 运营开发工程师 一、前言 Spark作为大数据计算引擎,凭借其快速、...

04/18
0
0
如何在万亿级别规模的数据量上使用Spark?

  【IT168 技术】   一、前言   Spark作为大数据计算引擎,凭借其快速、稳定、简易等特点,快速的占领了大数据计算的领域。本文主要为作者在搭建使用计算平台的过程中,对于Spark的理解...

网络大数据
05/30
0
0
Spark2.0-RDD分区原理分析

Spark分区原理分析 介绍 分区是指如何把RDD分布在spark集群的各个节点的操作。以及一个RDD能够分多少个分区。 一个分区是大型分布式数据集的逻辑块。 那么思考一下:分区数如何映射到spark的...

xiaomin0322
06/06
0
0
如何在万亿级别规模的数据量上使用Spark

一、前言 Spark作为大数据计算引擎,凭借其快速、稳定、简易等特点,快速的占领了大数据计算的领域。本文主要为作者在搭建使用计算平台的过程中,对于Spark的理解,希望能给读者一些学习的思...

风火数据
05/14
0
0
【转】Spark,一种快速数据分析替代方案

Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,...

mj4738
2012/05/24
0
0
Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈与熟练的掌握Scala语言【大数据Spark

Spark GraphX宝刀出鞘,图文并茂研习图计算秘笈 大数据的概念与应用,正随着智能手机、平板电脑的快速流行而日渐普及,大数据中图的并行化处理一直是一个非常热门的话题。图计算正在被广泛地...

Spark亚太研究院
2014/08/29
0
0
Spark 入门(Python、Scala 版)

本文中,我们将首先讨论如何在本地机器上利用Spark进行简单分析。然后,将在入门级水平探索Spark,了解Spark是什么以及它如何工作(希望可以激发更多探索)。最后两节将开始通过命令行与Spa...

大数据之路
2015/05/07
0
0
Spark Streaming 框架 - StreamingPro

概述 Spark 是一个可扩展的可编程框架,用于数据集的大规模分布式处理, 称为弹性分布式数据集(Resilient Distributed Datasets,RDD)。 Spark Streaming 是 Spark API 核心的扩展,它支持...

匿名
04/29
0
0
整合Kafka到Spark Streaming——代码示例和挑战

作者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管。本文,Michael详细的演示了如何将Kafka整合到Spark...

stark_summer
2015/03/03
0
0
Spark与Hadoop计算模型的比较分析

Spark与Hadoop计算模型的比较分析 最近很多人都在讨论Spark这个貌似通用的分布式计算模型,国内很多机器学习相关工作者都在研究和使用它。   Spark是一个通用的并行计算框架,由UCBerkele...

jmppok
2015/03/25
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

代码生成利器:IDEA 强大的 Live Templates

代码生成利器:IDEA 强大的 Live Templates

qwfys
24分钟前
1
0
spring boot使用通用mapper(tk.mapper) ,id自增和回显等问题

最近项目使用到tk.mapper设置id自增,数据库是mysql。在使用通用mapper主键生成过程中有一些问题,在总结一下。 1、UUID生成方式-字符串主键 在主键上增加注解 @Id @GeneratedValue...

北岩
27分钟前
1
0
告警系统邮件引擎、运行告警系统

告警系统邮件引擎 cd mail vim mail.py #!/usr/bin/env python#-*- coding: UTF-8 -*-import os,sysreload(sys)sys.setdefaultencoding('utf8')import getoptimport smtplibfr......

Zhouliang6
30分钟前
0
0
日常运维--rsync同步工具

rsync命令是一个远程数据同步工具,可通过LAN/WAN快速同步多台主机间的文件。rsync使用所谓的“rsync算法”来使本地和远程两个主机之间的文件达到同步,这个算法只传送两个文件的不同部分,而...

chencheng-linux
34分钟前
1
0
Java工具类—随机数

Java中常用的生成随机数有Math.random()方法及java.util.Random类.但他们生成的随机数都是伪随机的. Math.radom()方法 在jdk1.8的Math类中可以看到,Math.random()方法实际上就是调用Random类...

PrivateO2
47分钟前
1
0
关于java内存模型、并发编程的好文

Java并发编程:volatile关键字解析    volatile这个关键字可能很多朋友都听说过,或许也都用过。在Java 5之前,它是一个备受争议的关键字,因为在程序中使用它往往会导致出人意料的结果。在...

DannyCoder
昨天
0
0
dubbo @Reference retries 重试次数 一个坑

在代码一中设置 成retries=0,也就是调用超时不用重试,结果DEBUG的时候总是重试,不是0吗,0就不用重试啊。为什么还是调用了多次呢? 结果在网上看到 这篇文章才明白 https://www.cnblogs....

奋斗的小牛
昨天
1
0
数据结构与算法3

要抓紧喽~~~~~~~放羊的孩纸回来喽 LowArray类和LowArrayApp类 程序将一个普通的Java数组封装在LowArray类中。类中的数组隐藏了起来,它是私有的,所以只有类自己的方法才能访问他。 LowArray...

沉迷于编程的小菜菜
昨天
0
0
spring boot应用测试框架介绍

一、spring boot应用测试存在的问题 官方提供的测试框架spring-boot-test-starter,虽然提供了很多功能(junit、spring test、assertj、hamcrest、mockito、jsonassert、jsonpath),但是在数...

yangjianzhou
昨天
1
0
rsync工具介绍/rsync通过ssh同步

rsync工具介绍 数据备份是必不可少,在Linux系统下数据备份的工具很多,其中重点介绍就是rsync工具,rsync不仅可以远程同步数据,还可以本地同步数据,且不会覆盖以前的数据在已经存在的数据...

Hi_Yolks
昨天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部