文档章节

APDPlat拓展搜索之集成Solr

杨尚川
 杨尚川
发布于 2014/02/01 23:39
字数 1817
阅读 662
收藏 17

APDPlat充分利用Compass的OSEMORM integration特性,提供了简单易用功能强大内置搜索特性。

 

APDPlat的内置搜索,在设计简洁优雅的同时,还具备了强大的实时搜索能力,用户只需用注解的方式在模型中指定需要搜索哪些字段(还可在模型之间进行关联搜索)就获得了搜索能力,而不用编写任何代码。平台自动处理索引维护、查询解析、结果高亮等支撑功能。

 

然而APDPlat的内置搜索只能在单机上面使用,不支持分布式,只能用于中小规模的场景。为了支持大规模的分布式搜索和实时分析,APDPlat除了可以选择Compass的进化版ElasticSearch外(APDPlat拓展搜索之集成ElasticSearch),还可以有另外一个选择,那就是Solr

 

Solr提供了Java Client API(SolrJ),我们可以使用SolrJ来和Solr服务器进行交互。首先我们在pom.xml中引入SolrJ依赖:

 

<dependency>
	<groupId>org.apache.solr</groupId>
	<artifactId>solr-solrj</artifactId>
	<version>${solrj.version}</version>
</dependency>

 

接下来我们看一个APDPlat和Solr集成的例子:

 

APDPlat提供了可扩展的日志处理接口,用户可编写自己的插件并在配置文件中指定启用哪些插件,日志处理接口如下:

 

/**
 * 日志处理接口:
 * 可将日志存入独立日志数据库(非业务数据库)
 * 可将日志传递到activemq\rabbitmq\zeromq等消息队列
 * 可将日志传递到kafka\flume\chukwa\scribe等日志聚合系统
 * 可将日志传递到elasticsearch\solr等搜索服务器
 * @author 杨尚川
 */
public interface LogHandler {
    public <T extends Model> void handle(List<T> list);
}

 

要想让Solr搜索服务器索引日志数据,我们首先要构造一个HttpSolrServer的实例,然后用待索引的日志对象列表构造一个SolrInputDocument列表,其次就可以调用HttpSolrServer的add和commit方法把SolrInputDocument列表提交给Solr服务器建立索引,最后解析返回结果,判断操作是否成功。

 

构造HttpSolrServer实例需要指定几个配置信息,这些配置信息默认存放在config.properties中,可以在config.local.properties中对其进行覆盖,如下所示:

 

#Solr服务器配置
solr.host=192.168.0.100
solr.port=8983
solr.core=apdplat_for_log
solr.max.retries=1
solr.connection.timeout=5
solr.allow.compression =9200
solr.socket.read.timeout=3000
solr.max.connections.per.host=100
solr.max.total.connections=300

 

当我们在配置Solr服务器的时候,要把core如这里的apdplat_for_log配置为schemaless,否则需要一一指定待索引的日志对象的字段就太麻烦了,因为我们把apdplat_for_log这个core配置为schemaless,所以我们提交的各种各样未知类型的对象都可以索引到同一个core中。我们在建立索引的时候加一个type字段,其值为对象的类名称,这样搜索的时候就可以区分不同的对象。

 

 我们看看如何构造HttpSolrServer:

 

    private static final String host = PropertyHolder.getProperty("solr.host");
    private static final String port = PropertyHolder.getProperty("solr.port");
    private static final String core = PropertyHolder.getProperty("solr.core");
    private static final int maxRetries = PropertyHolder.getIntProperty("solr.max.retries");
    private static final int connectionTimeout = PropertyHolder.getIntProperty("solr.connection.timeout");
    private static final boolean allowCompression = PropertyHolder.getBooleanProperty("solr.allow.compression");
    private static final int socketReadTimeout = PropertyHolder.getIntProperty("solr.socket.read.timeout");
    private static final int maxConnectionsPerHost = PropertyHolder.getIntProperty("solr.max.connections.per.host");
    private static final int maxTotalConnections = PropertyHolder.getIntProperty("solr.max.total.connections");   
    private static SolrServer solrServer;
   
    public SolrLogHandler(){
        LOG.info("solr.host: "+host);
        LOG.info("solr.port: "+port);
        LOG.info("solr.core: "+core);
        LOG.info("solr.max.retries: "+maxRetries);
        LOG.info("solr.connection.timeout: "+connectionTimeout);
        LOG.info("solr.allow.compression: "+allowCompression);
        LOG.info("solr.socket.read.timeout: "+socketReadTimeout);
        LOG.info("solr.max.connections.per.host: "+maxConnectionsPerHost);
        LOG.info("solr.max.total.connections: "+maxTotalConnections);
        
        String url = "http://"+host+":"+port+"/solr/"+core+"/";
        LOG.info("初始化Solr服务器连接:"+url);
        HttpSolrServer httpSolrServer = new HttpSolrServer(url);
        httpSolrServer.setMaxRetries(maxRetries);
        httpSolrServer.setConnectionTimeout(connectionTimeout);
        httpSolrServer.setAllowCompression(allowCompression);
        httpSolrServer.setSoTimeout(socketReadTimeout);
        httpSolrServer.setDefaultMaxConnectionsPerHost(maxConnectionsPerHost);
        httpSolrServer.setMaxTotalConnections(maxTotalConnections);
        
        solrServer = httpSolrServer;
    }

 

 

值得注意的是这里的url

 

String url = "http://"+host+":"+port+"/solr/"+core+"/";

 

 

接下来要把日志对象列表转换为SolrInputDocument列表:

 

    public <T extends Model> List<SolrInputDocument> getSolrInputDocuments(List<T> list){        
        int j = 1;
        //构造批量索引请求
        List<SolrInputDocument> docs = new ArrayList<>(list.size());
        LOG.info("开始构造Solr文档");
        for(T model : list){
            try{
                String simpleName = model.getClass().getSimpleName();
                LOG.debug((j++)+"、simpleName: 【"+simpleName+"】");          
                SolrInputDocument doc = new SolrInputDocument();                
                Field[] fields = model.getClass().getDeclaredFields();
                int len = fields.length;
                for(int i = 0; i < len; i++){
                    Field field = fields[i];
                    String name = field.getName();
                    field.setAccessible(true);
                    Object value = field.get(model);
                    //小心空指针异常,LogHandler线程会悄无声息地推出!
                    if(value == null){
                        LOG.debug("忽略空字段:"+name);
                        continue;
                    }
                    LOG.debug("name: "+name+"   value: "+value);
                    doc.addField(name, value);
                }
                //日志类型(类名称)
                doc.addField("type", simpleName);
                //增加主键
                UUID uuid = UUID.randomUUID();
                doc.addField("id", uuid.toString());
                docs.add(doc);
            }catch(IllegalAccessException | IllegalArgumentException | SecurityException e){
                LOG.error("构造索引请求失败【"+model.getMetaData()+"】\n"+model, e);
            }
        }
        LOG.info("Solr文档构造完毕");
        return docs;
    }

 

 

这里,我们用UUID生成了一个随机主键,增加了一个type字段,其值为类名称,使用反射的方式取得日志对象字段名称字段值

 

文档列表准备完毕之后,就可以提交索引请求了:

 

solrServer.add(docs);
UpdateResponse updateResponse = solrServer.commit();

 

 

然后处理返回结果,判断索引操作是否成功:

 

int status = updateResponse.getStatus();
if(status==0){
	LOG.info("成功为Core: "+core+" 提交 "+docs.size()+" 个文档");
}else{
	LOG.info("索引提交失败,status:"+status);
}

 

 

下面是SolrLogHandler完整的实现:

 

/**
 * 
 * 日志处理插件:
 * 将日志保存到Solr中
 * 进行高性能实时搜索和分析
 * 支持大规模分布式搜索
 * 
 * @author 杨尚川
 */
@Service
public class SolrLogHandler implements LogHandler{
    private static final APDPlatLogger LOG = new APDPlatLogger(SolrLogHandler.class);
    
    private static final String host = PropertyHolder.getProperty("solr.host");
    private static final String port = PropertyHolder.getProperty("solr.port");
    private static final String core = PropertyHolder.getProperty("solr.core");
    private static final int maxRetries = PropertyHolder.getIntProperty("solr.max.retries");
    private static final int connectionTimeout = PropertyHolder.getIntProperty("solr.connection.timeout");
    private static final boolean allowCompression = PropertyHolder.getBooleanProperty("solr.allow.compression");
    private static final int socketReadTimeout = PropertyHolder.getIntProperty("solr.socket.read.timeout");
    private static final int maxConnectionsPerHost = PropertyHolder.getIntProperty("solr.max.connections.per.host");
    private static final int maxTotalConnections = PropertyHolder.getIntProperty("solr.max.total.connections");   
    private static SolrServer solrServer;
   
    public SolrLogHandler(){
        LOG.info("solr.host: "+host);
        LOG.info("solr.port: "+port);
        LOG.info("solr.core: "+core);
        LOG.info("solr.max.retries: "+maxRetries);
        LOG.info("solr.connection.timeout: "+connectionTimeout);
        LOG.info("solr.allow.compression: "+allowCompression);
        LOG.info("solr.socket.read.timeout: "+socketReadTimeout);
        LOG.info("solr.max.connections.per.host: "+maxConnectionsPerHost);
        LOG.info("solr.max.total.connections: "+maxTotalConnections);
        
        String url = "http://"+host+":"+port+"/solr/"+core+"/";
        LOG.info("初始化Solr服务器连接:"+url);
        HttpSolrServer httpSolrServer = new HttpSolrServer(url);
        httpSolrServer.setMaxRetries(maxRetries);
        httpSolrServer.setConnectionTimeout(connectionTimeout);
        httpSolrServer.setAllowCompression(allowCompression);
        httpSolrServer.setSoTimeout(socketReadTimeout);
        httpSolrServer.setDefaultMaxConnectionsPerHost(maxConnectionsPerHost);
        httpSolrServer.setMaxTotalConnections(maxTotalConnections);
        
        solrServer = httpSolrServer;
    }
    
    @Override
    public <T extends Model> void handle(List<T> list) {
        LOG.info("开始将 "+list.size()+" 个日志对象索引到Solr服务器");
        long start = System.currentTimeMillis();
        index(list);
        long cost = System.currentTimeMillis() - start;
        LOG.info("耗时:"+ConvertUtils.getTimeDes(cost));
    } 
    /**
     * 批量索引
     * 批量提交
     * 
     * @param <T> 泛型参数
     * @param list 批量模型
     */
    public <T extends Model> void index(List<T> list){
        List<SolrInputDocument> docs = getSolrInputDocuments(list);
        //批量提交索引
        try{
            LOG.info("开始批量提交索引文档");
            solrServer.add(docs);
            UpdateResponse updateResponse = solrServer.commit();     
            int status = updateResponse.getStatus();
            if(status==0){
                LOG.info("成功为Core: "+core+" 提交 "+docs.size()+" 个文档");
            }else{
                LOG.info("索引提交失败,status:"+status);
            }
            LOG.info("ResponseHeader:\n"+updateResponse.getResponseHeader().toString());
            LOG.info("Response:\n"+updateResponse.getResponse().toString());
            //加速内存释放
            docs.clear();
        }catch(IOException | SolrServerException e){
            LOG.error("批量提交索引失败", e);
        }
    }
    /**
     * 把对象列表转换为SOLR文档列表
     * @param <T> 对象类型
     * @param list 对象列表
     * @return SOLR文档列表
     */
    public <T extends Model> List<SolrInputDocument> getSolrInputDocuments(List<T> list){        
        int j = 1;
        //构造批量索引请求
        List<SolrInputDocument> docs = new ArrayList<>(list.size());
        LOG.info("开始构造Solr文档");
        for(T model : list){
            try{
                String simpleName = model.getClass().getSimpleName();
                LOG.debug((j++)+"、simpleName: 【"+simpleName+"】");          
                SolrInputDocument doc = new SolrInputDocument();                
                Field[] fields = model.getClass().getDeclaredFields();
                int len = fields.length;
                for(int i = 0; i < len; i++){
                    Field field = fields[i];
                    String name = field.getName();
                    field.setAccessible(true);
                    Object value = field.get(model);
                    //小心空指针异常,LogHandler线程会悄无声息地推出!
                    if(value == null){
                        LOG.debug("忽略空字段:"+name);
                        continue;
                    }
                    LOG.debug("name: "+name+"   value: "+value);
                    doc.addField(name, value);
                }
                //日志类型(类名称)
                doc.addField("type", simpleName);
                //增加主键
                UUID uuid = UUID.randomUUID();
                doc.addField("id", uuid.toString());
                docs.add(doc);
            }catch(IllegalAccessException | IllegalArgumentException | SecurityException e){
                LOG.error("构造索引请求失败【"+model.getMetaData()+"】\n"+model, e);
            }
        }
        LOG.info("Solr文档构造完毕");
        return docs;
    }
}

 

 

最后我们在配置文件config.local.properties中指定log.handlers的值为SolrLogHandler类的Spring bean name solrSearchLogHandler,因为SolrLogHandler类加了Spring的@Service注解:

 

log.handlers=solrLogHandler

 

 

 APDPlat托管在Github

© 著作权归作者所有

杨尚川

杨尚川

粉丝 1103
博文 220
码字总数 1624053
作品 12
东城
架构师
私信 提问
APDPlat拓展搜索之集成ElasticSearch

APDPlat充分利用Compass的OSEM和ORM integration特性,提供了简单易用且功能强大的内置搜索特性。 APDPlat的内置搜索,在设计简洁优雅的同时,还具备了强大的实时搜索能力,用户只需用注解的...

杨尚川
2014/02/01
292
2
APDPlat如何自动建库建表并初始化数据?

APDPlat共支持10种数据库:DB2、DERBY、H2、HSQL、INFORMIX、MYSQL、ORACLE、POSTGRESQL、SQLSERVER、SYBASE。 数据库的默认配置信息在文件APDPlatCore/src/main/resources/org/apdplat/db.p...

杨尚川
2014/02/08
360
0
基于word分词提供的文本相似度算法来实现通用的网页相似度检测

实现代码:基于word分词提供的文本相似度算法来实现通用的网页相似度检测 运行结果: 检查的博文数:128 1、检查博文:192本软件著作用词分析(五)用词最复杂99级,相似度分值:Simple=0.96...

杨尚川
2015/05/28
1K
0
APDPlat的系统启动和关闭流程剖析

APDPlat接管了Spring的启动关闭权,为各种运行其上的开源框架和类库的无缝集成提供了支持。 当然,大家都知道,一个JAVA EE Web应用的入口点是web.xml,APDPlat当然也不例外,我们看看APDPl...

杨尚川
2014/02/03
414
0
应用级产品开发平台 - APDPlat

APDPlat快速体验 APDPlat入门指南 APDPlat专题文章 APDPlat是Application Product Development Platform(应用级产品开发平台)的缩写。 APDPlat提供了应用容器、多模块架构、代码生成、安装...

杨尚川
2012/10/30
6.7K
0

没有更多内容

加载失败,请刷新页面

加载更多

最简单的获取相机拍照的图片

  import android.content.Intent;import android.graphics.Bitmap;import android.os.Bundle;import android.os.Environment;import android.provider.MediaStore;import andr......

MrLins
51分钟前
4
0
说好不哭!数据可视化深度干货,前端开发下一个涨薪点在这里~

随着互联网在各行各业的影响不断深入,数据规模越来越大,各企业也越来越重视数据的价值。作为一家专业的数据智能公司,个推从消息推送服务起家,经过多年的持续耕耘,积累沉淀了海量数据,在...

个推
53分钟前
7
0
第三方支付-返回与回调注意事项

不管是支付宝,微信,还是其它第三方支付,第四方支付,支付机构服务商只要涉及到钱的交易都要进行如下校验,全部成功了才视为成功订单 1.http请求是否成功 2.校验商户号 3.校验订单号及状态...

Shingfi
56分钟前
4
0
简述Java内存分配和回收策略以及Minor GC 和 Major GC(Full GC)

内存分配: 1. 栈区:栈可分为Java虚拟机和本地方法栈 2. 堆区:堆被所有线程共享,在虚拟机启动时创建,是唯一的目的是存放对象实例,是gc的主要区域。通常可分为两个区块年轻代和年老代。更...

DustinChan
今天
6
0
Excel插入批注:可在批注插入文字、形状、图片

1.批注一直显示:审阅选项卡-------->勾选显示批注选项: 2.插入批注快捷键:Shift+F2 组合键 3.在批注中插入图片:鼠标右键点击批注框的小圆点【重点不可以在批注文本框内点击】----->调出批...

东方墨天
今天
6
1

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部