文档章节

APDPlat拓展搜索之集成ElasticSearch

杨尚川
 杨尚川
发布于 2014/02/01 22:00
字数 1579
阅读 292
收藏 1

APDPlat充分利用Compass的OSEMORM integration特性,提供了简单易用功能强大内置搜索特性。

 

APDPlat的内置搜索,在设计简洁优雅的同时,还具备了强大的实时搜索能力,用户只需用注解的方式在模型中指定需要搜索哪些字段(还可在模型之间进行关联搜索)就获得了搜索能力,而不用编写任何代码。平台自动处理索引维护、查询解析、结果高亮等支撑功能。

 

然而APDPlat的内置搜索只能在单机上面使用,不支持分布式,只能用于中小规模的场景。为了支持大规模的分布式搜索和实时分析,APDPlat选用Compass的进化版ElasticSearch Compass和ElasticSearch的关系)。

 

ElasticSearch提供了Java Client API,但是由于该API依赖于Lucene的org.apache.lucene.util包中的几个类,以致于无法和APDPlat集成,原因是APDPlat中Compass依赖的Lucene的版本和ElasticSearch依赖的版本冲突。

 

从这里可以得知,ElasticSearch的Java Client API如果完全移除对Lucene的依赖,仅仅作为用户和ElasticSearch集群之间通信的接口,使用起来就会更方便。

 

因此,APDPlat只能采用ElasticSearch的RESTful API

 

接下来我们看一个APDPlat和ElasticSearch集成的例子:

 

APDPlat提供了可扩展的日志处理接口,用户可编写自己的插件并在配置文件中指定启用哪些插件,日志处理接口如下:

 

/**
 * 日志处理接口:
 * 可将日志存入独立日志数据库(非业务数据库)
 * 可将日志传递到activemq\rabbitmq\zeromq等消息队列
 * 可将日志传递到kafka\flume\chukwa\scribe等日志聚合系统
 * 可将日志传递到elasticsearch\solr等搜索服务器
 * @author 杨尚川
 */
public interface LogHandler {
    public <T extends Model> void handle(List<T> list);
}

 

将日志传递到ElasticSearch搜索服务器的实现使用了几个配置信息,这些配置信息默认存放在config.properties中,如下所示:

 

#elasticsearch服务器配置
elasticsearch.host=localhost
elasticsearch.port=9200
elasticsearch.log.index.name=apdplat_for_log

 

因为LogHandler接口中定义的参数List<T> list为泛型,只知道T是Model的子类,而不知道具体是哪一个类,所以我们使用反射的机制来获取具体对象类型:

 

String simpleName = model.getClass().getSimpleName();
LOG.debug((j++)+"、simpleName: 【"+simpleName+"】");
json.append("{\"index\":{\"_index\":\"")
  .append(INDEX_NAME)
  .append("\",\"_type\":\"")
  .append(simpleName)
  .append("\"}}")
  .append("\n");
json.append("{");

 

同时,我们利用反射的方式获取对象的字段以及相应的值,并正确处理类型问题:

 

Field[] fields = model.getClass().getDeclaredFields();
int len = fields.length;
for(int i = 0; i < len; i++){
	Field field = fields[i];
	String name = field.getName();
	field.setAccessible(true);
	Object value = field.get(model);
	//小心空指针异常,LogHandler线程会悄无声息地退出!
	if(value == null){
		LOG.debug("忽略空字段:"+name);
		continue;
	}
	if(i>0){
		json.append(",");
	}
	String valueClass=value.getClass().getSimpleName();
	LOG.debug("name: "+name+"   type: "+valueClass);
	if("Timestamp".equals(valueClass) || "Date".equals(valueClass)){
		//提交给ES的日期时间值要为"2014-01-31T13:53:54"这样的形式
		value=DateTypeConverter.toDefaultDateTime((Date)value).replace(" ", "T");
	}
	String prefix = "\"";
	String suffix = "\"";
	//提交给ES的数字和布尔值不要加双引号
	if("Float".equals(valueClass)
			|| "Double".equals(valueClass) 
			|| "Long".equals(valueClass) 
			|| "Integer".equals(valueClass)
			|| "Short".equals(valueClass)
			|| "Boolean".equals(valueClass)){
		prefix="";
		suffix="";
	}
	json.append("\"")
			.append(name)
			.append("\":")
			.append(prefix)
			.append(value)
			.append(suffix);
}
json.append("}\n");

 

构造完要提交的JSON数据之后,向服务器发送HTTP PUT请求:

 

HttpURLConnection conn = (HttpURLConnection) URL.openConnection();
conn.setRequestMethod("PUT");
conn.setDoOutput(true);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),"utf-8"));    
writer.write(json.toString());
writer.flush();
StringBuilder result = new StringBuilder();
try (BufferedReader reader = new BufferedReader (new InputStreamReader (conn.getInputStream()))) {
	String line = reader.readLine();
	while(line != null){
		result.append(line);
		line = reader.readLine();
	}
}

 

服务器会以JSON数据格式返回处理结果,我们使用Jackson解析返回的JSON字符串:

 

JsonNode node = MAPPER.readTree(resultStr);
for(JsonNode item : node.get("items")){
	JsonNode createJsonNode = item.get("create");
	JsonNode okJsonNode = createJsonNode.get("ok");
	if(okJsonNode != null){
		boolean r = okJsonNode.getBooleanValue();
		if(r){
			success++;
		}
	}else{
		JsonNode errorJsonNode = createJsonNode.get("error");
		if(errorJsonNode != null){
			String errorMessage = errorJsonNode.getTextValue();
			LOG.error("索引失败:"+errorMessage);
		}
	}
}

 

下面是ElasticSearchLogHandler完整的实现:

 

/**
 * 
 * 日志处理实现:
 * 将日志保存到ElasticSearch中
 * 进行高性能实时搜索和分析
 * 支持大规模分布式搜索
 * 
 * @author 杨尚川
 */
@Service
public class ElasticSearchLogHandler implements LogHandler{
    private static final APDPlatLogger LOG = new APDPlatLogger(ElasticSearchLogHandler.class);
    
    private static final String INDEX_NAME = PropertyHolder.getProperty("elasticsearch.log.index.name");
    private static final String HOST = PropertyHolder.getProperty("elasticsearch.host");
    private static final String PORT = PropertyHolder.getProperty("elasticsearch.port");
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static URL URL;
    
    private int success;
   
    public ElasticSearchLogHandler(){
        LOG.info("elasticsearch.log.index.name: "+INDEX_NAME);
        LOG.info("elasticsearch.host: "+HOST);
        LOG.info("elasticsearch.port: "+PORT);
        try {
            URL = new URL("http://"+HOST+":"+PORT+"/_bulk");
        } catch (MalformedURLException ex) {
            LOG.error("构造URL失败",ex);
        }
    }
    
    /**
     * 批量索引
     * 批量提交
     * 
     * @param <T> 泛型参数
     * @param list 批量模型
     */
    public <T extends Model> void index(List<T> list){
        success = 0;
        StringBuilder json = new StringBuilder();
        int j = 1;
        //构造批量索引请求
        for(T model : list){
            try{
                String simpleName = model.getClass().getSimpleName();
                LOG.debug((j++)+"、simpleName: 【"+simpleName+"】");
                json.append("{\"index\":{\"_index\":\"")
                        .append(INDEX_NAME)
                        .append("\",\"_type\":\"")
                        .append(simpleName)
                        .append("\"}}")
                        .append("\n");
                json.append("{");
                Field[] fields = model.getClass().getDeclaredFields();
                int len = fields.length;
                for(int i = 0; i < len; i++){
                    Field field = fields[i];
                    String name = field.getName();
                    field.setAccessible(true);
                    Object value = field.get(model);
                    //小心空指针异常,LogHandler线程会悄无声息地退出!
                    if(value == null){
                        LOG.debug("忽略空字段:"+name);
                        continue;
                    }
                    if(i>0){
                        json.append(",");
                    }
                    String valueClass=value.getClass().getSimpleName();
                    LOG.debug("name: "+name+"   type: "+valueClass);
                    if("Timestamp".equals(valueClass) || "Date".equals(valueClass)){
                        //提交给ES的日期时间值要为"2014-01-31T13:53:54"这样的形式
                        value=DateTypeConverter.toDefaultDateTime((Date)value).replace(" ", "T");
                    }
                    String prefix = "\"";
                    String suffix = "\"";
                    //提交给ES的数字和布尔值不要加双引号
                    if("Float".equals(valueClass)
                            || "Double".equals(valueClass) 
                            || "Long".equals(valueClass) 
                            || "Integer".equals(valueClass)
                            || "Short".equals(valueClass)
                            || "Boolean".equals(valueClass)){
                        prefix="";
                        suffix="";
                    }
                    json.append("\"")
                            .append(name)
                            .append("\":")
                            .append(prefix)
                            .append(value)
                            .append(suffix);
                }
                json.append("}\n");
            }catch(SecurityException | IllegalArgumentException | IllegalAccessException e){
                LOG.error("构造索引请求失败【"+model.getMetaData()+"】\n"+model, e);
            }
        }
        //批量提交索引
        try{
            LOG.debug("提交JSON数据:\n"+json.toString());
            HttpURLConnection conn = (HttpURLConnection) URL.openConnection();
            conn.setRequestMethod("PUT");
            conn.setDoOutput(true);
            BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(conn.getOutputStream(),"utf-8"));    
            writer.write(json.toString());
            writer.flush();
            StringBuilder result = new StringBuilder();
            try (BufferedReader reader = new BufferedReader (new InputStreamReader (conn.getInputStream()))) {
                String line = reader.readLine();
                while(line != null){
                    result.append(line);
                    line = reader.readLine();
                }
            }
            String resultStr = result.toString();
            LOG.debug(resultStr);          
            //使用Jackson解析返回的JSON
            JsonNode node = MAPPER.readTree(resultStr);
            for(JsonNode item : node.get("items")){
                JsonNode createJsonNode = item.get("create");
                JsonNode okJsonNode = createJsonNode.get("ok");
                if(okJsonNode != null){
                    boolean r = okJsonNode.getBooleanValue();
                    if(r){
                        success++;
                    }
                }else{
                    JsonNode errorJsonNode = createJsonNode.get("error");
                    if(errorJsonNode != null){
                        String errorMessage = errorJsonNode.getTextValue();
                        LOG.error("索引失败:"+errorMessage);
                    }
                }
            }
        }catch(IOException e){
            LOG.error("批量提交索引失败", e);
        }
    }
    
    @Override
    public <T extends Model> void handle(List<T> list) {
        LOG.info("开始将 "+list.size()+" 个日志对象索引到ElasticSearch服务器");
        long start = System.currentTimeMillis();
        index(list);
        long cost = System.currentTimeMillis() - start;
        
        if(success != list.size()){
            LOG.info("索引失败: "+(list.size()-success)+" 个");            
        }
        if(success > 0){
            LOG.info("索引成功: "+success+" 个");
        }
        LOG.info("耗时:"+ConvertUtils.getTimeDes(cost));
    }
}

 

最后我们在配置文件config.local.properties中指定log.handlers的值为ElasticSearchLogHandler类的Spring bean name elasticSearchLogHandler,因为ElasticSearchLogHandler类加了Spring的@Service注解:

 

log.handlers=elasticSearchLogHandler

 

 

APDPlat托管在Github

© 著作权归作者所有

杨尚川

杨尚川

粉丝 1103
博文 220
码字总数 1624053
作品 12
东城
架构师
私信 提问
加载中

评论(2)

开源中国首席码农
开源中国首席码农
APDPlat中并没有用到ElasticSearch?
开源中国首席码农
开源中国首席码农
请教下楼主Model类具体内容是什么?能否贴出完成的源码?包括日志
APDPlat拓展搜索之集成Solr

APDPlat充分利用Compass的OSEM和ORM integration特性,提供了简单易用且功能强大的内置搜索特性。 APDPlat的内置搜索,在设计简洁优雅的同时,还具备了强大的实时搜索能力,用户只需用注解的...

杨尚川
2014/02/01
662
0
APDPlat 2.6 已可用,支持Java8,支持Sping MVC

APDPlat 2.6 发布了,该版本最值得关注的改进包括对 Java 8 的支持,支持 Spring MVC,全面将 Compass 替换成 ElasticSearch,这是一个巨大的搜索改进,更多改进细节请看这里。 APDPlat 是 ...

杨尚川
2015/04/22
2.7K
3
基于word分词提供的文本相似度算法来实现通用的网页相似度检测

实现代码:基于word分词提供的文本相似度算法来实现通用的网页相似度检测 运行结果: 检查的博文数:128 1、检查博文:192本软件著作用词分析(五)用词最复杂99级,相似度分值:Simple=0.96...

杨尚川
2015/05/28
1K
0
APDPlat如何自动建库建表并初始化数据?

APDPlat共支持10种数据库:DB2、DERBY、H2、HSQL、INFORMIX、MYSQL、ORACLE、POSTGRESQL、SQLSERVER、SYBASE。 数据库的默认配置信息在文件APDPlatCore/src/main/resources/org/apdplat/db.p...

杨尚川
2014/02/08
360
0
APDPlat的系统启动和关闭流程剖析

APDPlat接管了Spring的启动关闭权,为各种运行其上的开源框架和类库的无缝集成提供了支持。 当然,大家都知道,一个JAVA EE Web应用的入口点是web.xml,APDPlat当然也不例外,我们看看APDPl...

杨尚川
2014/02/03
414
0

没有更多内容

加载失败,请刷新页面

加载更多

一套基于SpringBoot+Vue+Shiro 前后端分离 开发的代码生成器

一、前言 最近花了一个月时间完成了一套基于Spring Boot+Vue+Shiro前后端分离的代码生成器,目前项目代码已基本完成 止步传统CRUD,进阶代码优化: 该项目可根据数据库字段动态生成 controll...

郑清
33分钟前
2
0
javascript-十六进制随机颜色

<script> // 编写一个函数,获得一个十六进制的随机颜色的字符串(如#20CD4F) // function randomColor(){ // var r = random(0,255).toString(16); // var g = random(0,255).toString(16......

ACKo
35分钟前
2
0
springBoot +mybatis 出现sql 语句在数据库可以查询到,但是赋值到实体类上就没有的情况?

1.不要老是反复查看自己是否写错了,为啥有的能出来有的出不来? 可以查看配置文件中是否配置全: 如果在application.yml 文件中是如下配置: mybatis: mapper-locations: classpath:mapp...

kuchawyz
48分钟前
2
0
正则表达式

一、RegExp对象 进行验证和查找的API 1、创建对象: (1)用/创建(直接量):var reg=/正则/ig,表达式固定不变时使用 (2)用new创建:var reg=new RegExp(‘正则’,‘ig’),表达式需要...

wytao1995
48分钟前
2
0
实战限流(guava的RateLimiter)

关于限流 常用的限流算法有漏桶算法和令牌桶算法,guava的RateLimiter使用的是令牌桶算法,也就是以固定的频率向桶中放入令牌,例如一秒钟10枚令牌,实际业务在每次响应请求之前都从桶中获取...

程序员欣宸
49分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部