文档章节

spark和elasticsearch集成

cjun1990
 cjun1990
发布于 2016/12/10 11:49
字数 494
阅读 245
收藏 1
  1. 在spark程序中引入elasticsearch
  • 引入elasticsearch的依赖,将elasticsearch-hadoop上传到集群中,这里scope范围为provided即可。
<dependencies>
	<dependency>
		<groupId>org.elasticsearch</groupId>
		<artifactId>elasticsearch-hadoop</artifactId>
		<version>2.4.0</version>
		<scope>provided</scope>
		<exclusions>
			<exclusion>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			</exclusion>
			<exclusion>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.10</artifactId>
			</exclusion>
			<exclusion>
			<groupId>org.apache.storm</groupId>
			<artifactId>storm-core</artifactId>
			</exclusion>
			<exclusion>
			<groupId>cascading</groupId>
			<artifactId>cascading-hadoop</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
</dependencies>
<repositories>
	<repository>
	<id>cloudera-repos</id>
	<name>Cloudera Repos</name>
	<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
	</repository>
	<repository>
	<id>Akka repository</id>
	<url>http://repo.akka.io/releases</url>
	</repository>
	<repository>
	<id>jboss</id>
	<url>http://repository.jboss.org/nexus/content/groups/public-jboss</url>
	</repository>
	<repository>
	<id>Sonatype snapshots</id>
	<url>http://oss.sonatype.org/content/repositories/snapshots/</url>
	</repository>
	<repository>
	<id>sonatype-oss</id>
	<url>http://oss.sonatype.org/content/repositories/snapshots</url>
	<snapshots><enabled>true</enabled></snapshots>
	</repository>
</repositories>
  • 在代码中使用elasticsearch
import org.elasticsearch.spark.sql._
def main(args: Array[String]): Unit ={
	val conf = new SparkConf()
	conf.setAppName("Spark Action ElasticSearch")
	conf.set("es.index.auto.create", "true")
	conf.set("es.nodes","192.168.1.11")
	conf.set("es.port","9200")
	val sc: SparkContext = new SparkContext(conf)
	val sqlContext = new HiveContext(sc)
	val df: DataFrame = sqlContext.sql("select * from info limit 50")
	//保存数据到ES
	df.saveToEs("myindex/info")
	从ES中读取数据
	val esdf = sqlContext.read.format("org.elasticsearch.spark.sql").load("myindex/info")
	esdf.count
    sc.stop()
}
  1. 在spark-shell中引入elasticsearch,以cdh为例。
  • 去maven中央仓库下载elasticsearch-hadoop的jar包,将jar包上传到目录:/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/jars/中,在/opt/cloudera/parcels/CDH/lib/spark/conf/classpath.txt(spark的classpath配置文件)文件中最后添加如下内容:
/opt/cloudera/parcels/CDH-5.5.0-1.cdh5.5.0.p0.8/jars/elasticsearch-hadoop-2.4.0.jar
  • 启动spark-shell,命令如下:
spark-shell --master yarn --conf spark.es.nodes=192.168.1.11  spark.es.port=9200  spark.es.index.auto.create=true
  1. 使用jdbc连接elasticsearch查询
  • 引入maven依赖
<dependency>
	<groupId>org.elasticsearch</groupId>
	<artifactId>elasticsearch</artifactId>
	<version>2.4.0</version>
</dependency>  
<dependency>
	<groupId>org.nlpcn</groupId>
	<artifactId>elasticsearch-sql</artifactId>
	<version>2.4.0</version>
</dependency>
<dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>druid</artifactId>
	<version>1.0.15</version>
</dependency>
<dependency>
	<groupId>mysql</groupId>
	<artifactId>mysql-connector-java</artifactId>
	<version>5.1.35</version>
</dependency>
  • 使用代码查询
public static void query(){
	try {
		Connection connection = getConnection();
		String sql = "select * from bigdata/student where usertype > 5 limit 5";
		PreparedStatement ps = connection.prepareStatement(sql);
		ResultSet rs = ps.executeQuery();
		while(rs.next()){
			System.out.println(rs.getString("_id") +" "+rs.getString("recordtime")
			+"  "+rs.getInt("area")+"  "+rs.getInt("usertype")+"  "+rs.getInt("count"));
		}
		ps.close();
		connection.close();
	} catch (Exception e) {
		e.printStackTrace();
	}
}
/**
 * 获取 ES jdbc连接
 */
public static Connection getConnection() throws Exception{
	String url = "jdbc:elasticsearch://192.168.1.11:9300";
	Properties properties = new Properties();
	properties.put("url", url);
	DruidDataSource dds = (DruidDataSource) ElasticSearchDruidDataSourceFactory.createDataSource(properties);
	Connection connection = dds.getConnection();
	return connection;
}

官网参考资料:Elasticsearch for Apache Hadoop

© 著作权归作者所有

cjun1990
粉丝 35
博文 371
码字总数 183914
作品 0
深圳
程序员
私信 提问
Spark2.x写入Elasticsearch的性能测试

一、Spark集成ElasticSearch的设计动机 ElasticSearch 毫秒级的查询响应时间还是很惊艳的。其优点有: 1. 优秀的全文检索能力 2. 高效的列式存储与查询能力 3. 数据分布式存储(Shard 分片) 相...

openfea
2017/10/27
383
0
Spark2.x与ElasticSearch的完美结合

ElasticSearch(简称ES)是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RestFul web接口。ElasticSearch是用Java开发的,并作为Apache许可条款下的开放源...

openfea
2017/10/19
302
0
SegmentFault D-Day 2015 北京:聊聊大数据

SegmentFault D-Day 介绍 SegmentFault D-Day,是由国内最前沿的开发者社区 SegmentFault 主办的技术沙⻰,于2014年正式启动。2015年上半年D-Day已在北、上、广、深、杭等城市成功举办9场,活...

文洁洁洁
2015/11/17
143
0
spark操作elasticsearch

本次测试为本地单机版的elasticsearch和spark 配置:spark2.2.0,elasticsearch1.7.2(集群)或者elasticsearch6.6.1(单机版),sdk2.11.1 pom依赖: <dependency> </dependency><dependency> <......

Sheav
2018/01/12
577
0
SegmentFault D-Day 2015 北京:聊聊大数据

SegmentFault D-Day 介绍 SegmentFault D-Day,是由国内最前沿的开发者社区 SegmentFault 主办的技术沙⻰,于2014年正式启动。2015年上半年D-Day已在北、上、广、深、杭等城市成功举办9场,活...

文洁洁洁
2015/11/17
4
0

没有更多内容

加载失败,请刷新页面

加载更多

前端技术之:Prisma Demo服务部署过程记录

安装前提条件: 1、已经安装了docker运行环境 2、以下命令执行记录发生在MackBook环境 3、已经安装了PostgreSQL(我使用的是11版本) 4、Node开发运行环境可以正常工作 首先需要通过Node包管...

popgis
今天
5
0
数组和链表

数组 链表 技巧一:掌握链表,想轻松写出正确的链表代码,需要理解指针获引用的含义: 对指针的理解,记住下面的这句话就可以了: 将某个变量赋值给指针,实际上就是将这个变量的地址赋值给指...

code-ortaerc
今天
4
0
栈-链式(c/c++实现)

上次说“栈是在线性表演变而来的,线性表很自由,想往哪里插数据就往哪里插数据,想删哪数据就删哪数据...。但给线性表一些限制呢,就没那么自由了,把线性表的三边封起来就变成了栈,栈只能...

白客C
今天
40
0
Mybatis Plus service

/** * @author beth * @data 2019-10-20 23:34 */@RunWith(SpringRunner.class)@SpringBootTestpublic class ServiceTest { @Autowired private IUserInfoService iUserInfoS......

一个yuanbeth
今天
5
0
php7-internal 7 zval的操作

## 7.7 zval的操作 扩展中经常会用到各种类型的zval,PHP提供了很多宏用于不同类型zval的操作,尽管我们也可以自己操作zval,但这并不是一个好习惯,因为zval有很多其它用途的标识,如果自己...

冻结not
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部