文档章节

solr自己设置hadler

泡海椒
 泡海椒
发布于 2017/08/30 13:34
字数 912
阅读 22
收藏 0

package com.solr.handler;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpException;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.auth.AuthScope;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.JavaBinCodec;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrEventListener;
import org.apache.solr.core.SolrInfoMBean;
import org.apache.solr.handler.RequestHandlerBase;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *  解决多个solr server 共用一个索引目录时,当其中一个server commit后其他solr server需要reopen才能查询到数据
 *       <requestHandler name="/indexversion" class="com.paic.solr.handler.SharedIndexHandler" >
                <str name="notifyurl">http://localhost:7001/solr/indexversion;http://hcd-it-218:8983/solr/indexversion</str>
                <str name="way">update?commit=true</str>
         </requestHandler>  
 *
 *
 */
public class SharedIndexHandler extends RequestHandlerBase implements SolrCoreAware {
    private static final Logger LOG = LoggerFactory.getLogger(SharedIndexHandler.class.getName());
    private SolrCore core;
    private static HttpClient client;
    private HttpClient myHttpClient;
    private volatile Long indexVersion = 0L;

    private static synchronized HttpClient createHttpClient(String connTimeout, String readTimeout) {
        if (connTimeout == null && readTimeout == null && client != null)
            return client;
        MultiThreadedHttpConnectionManager mgr = new MultiThreadedHttpConnectionManager();
        // Keeping a very high number so that if you have a large number of cores
        // no requests are kept waiting for an idle connection.
        mgr.getParams().setDefaultMaxConnectionsPerHost(10000);
        mgr.getParams().setMaxTotalConnections(10000);
        mgr.getParams().setSoTimeout(readTimeout == null ? 20000 : Integer.parseInt(readTimeout)); //20 secs
        mgr.getParams().setConnectionTimeout(connTimeout == null ? 5000 : Integer.parseInt(connTimeout)); //5 secs
        HttpClient httpClient = new HttpClient(mgr);
        if (client == null && connTimeout == null && readTimeout == null)
            client = httpClient;
        return httpClient;
    }

    @Override
    public void inform(SolrCore core2) {
        this.core = core2;
        String urls = (String) initArgs.get(NOTIFYURL);
        if (urls == null || urls.trim().length() == 0)
            return;
        List<String> notifyurl = new ArrayList<String>();
        String[] urlsArray = urls.split(";");
        String hostip = null;
        try {
            hostip = java.net.InetAddress.getLocalHost().getHostAddress();
        } catch (java.net.UnknownHostException e) {
            LOG.error(e.getMessage(), e);
        }
        for (int i = 0; i < urlsArray.length; i++) {
            if (urlsArray[i].indexOf(hostip) > -1)
                continue;
            notifyurl.add(urlsArray[i]);
        }
        String way = (String) initArgs.get(NOTIFYWAY);

        String connTimeout = (String) initArgs.get(HTTP_CONN_TIMEOUT);
        String readTimeout = (String) initArgs.get(HTTP_READ_TIMEOUT);
        String httpBasicAuthUser = (String) initArgs.get(HTTP_BASIC_AUTH_USER);
        String httpBasicAuthPassword = (String) initArgs.get(HTTP_BASIC_AUTH_PASSWORD);
        myHttpClient = createHttpClient(connTimeout, readTimeout);
        if (httpBasicAuthUser != null && httpBasicAuthPassword != null) {
            myHttpClient.getState().setCredentials(AuthScope.ANY,
                    new UsernamePasswordCredentials(httpBasicAuthUser, httpBasicAuthPassword));
        }
        core.registerNewSearcherListener(getEventListener(notifyurl, way, myHttpClient));
        Map<String, SolrInfoMBean> reg = core.getInfoRegistry();
        SolrInfoMBean m = reg.get("searcher");
        try {
            indexVersion = (Long) m.getStatistics().get("indexVersion");
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    @Override
    public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
        rsp.setHttpCaching(false);
        final SolrParams solrParams = req.getParams();
        String command = solrParams.get(COMMAND);
        if (command == null) {
            rsp.add(STATUS, OK_STATUS);
            rsp.add("message", "No command");
            return;
        } else if (command.equals(CMD_INDEX_VERSION)) {
            rsp.add(CMD_INDEX_VERSION, indexVersion);
        } else if (command.equals(CMD_SHARED_DATAIMPORT_STATUS)) {
            String urls = (String) initArgs.get(NOTIFYURL);
            String[] urlsArray = urls.split(";");
            //检查dataimport是否完成
            //http://localhost:7001/solr/indexversion
            //http://localhost:8983/solr/dataimport?command=status
            String dataimportStatus = DATA_IMPORT_IDLE_STAUTS;
            for (String url : urlsArray) {
                if(url==null||url.length()==0) continue;
                String queryUrl = url.substring(0, url.lastIndexOf("/") + 1) + "dataimport";
                dataimportStatus = queryDataimportStatus(queryUrl);
                if (!dataimportStatus.equals(DATA_IMPORT_IDLE_STAUTS))
                    break;
            }
            rsp.add("dataimporstatus", dataimportStatus);
        }
    }

    private String queryDataimportStatus(String queryUrl) {
        GetMethod getMethod = new GetMethod(queryUrl);
        getMethod.setQueryString("command=status&wt=javabin");
        String dataimportStatus = DATA_IMPORT_IDLE_STAUTS;
        try {
            int requestStatus = myHttpClient.executeMethod(getMethod);
            if (requestStatus == HttpStatus.SC_OK) {
                NamedList nl = (NamedList) new JavaBinCodec().unmarshal(getMethod.getResponseBodyAsStream());
                dataimportStatus = (String) nl.get("status");
                System.out.println(dataimportStatus);
            }
        } catch (HttpException e) {
            LOG.error("查询dataimport状态错误。", e);
        } catch (IOException e) {
            LOG.error("查询dataimport状态错误。", e);
        } finally {
            getMethod.releaseConnection();
        }
        return dataimportStatus;
    }

    @Override
    public String getDescription() {
        return "SharedIndexHandler used by multiple Solr servers and a shared index. to notify each other";
    }

    @Override
    public String getSourceId() {
        return "$Id: SharedIndexHandler.java 2012-09-15 $";
    }

    @Override
    public String getSource() {
        return "$URL: http://svn.apache.org/repos/asf/lucene/dev/tags/lucene_solr_3_6_0/com/paic/solr/handler/handler/SharedIndexHandler.java $";
    }

    @Override
    public String getVersion() {
        return "$Revision: 001 $";
    }

    /**
     *  在post commit执行时先判断远程服务器的版本和本地是否一致,
     *  不一致需要触发commit请求
     * @param notifyurls
     * @return
     */
    private SolrEventListener getEventListener(final List<String> notifyurls, final String way,
            final HttpClient httpclient) {
        return new SolrEventListener() {
            @Override
            public void init(NamedList args) {

            }

            @Override
            public void postCommit() {

            }

            @Override
            public void newSearcher(SolrIndexSearcher newSearcher, SolrIndexSearcher currentSearcher) {
                indexVersion = (Long) newSearcher.getStatistics().get("indexVersion");
                Thread t = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (String url : notifyurls) {
                            NamedList response = null;
                            try {
                                response = getLatestVersion(url);
                            } catch (Exception e) {
                                LOG.error(" 获取索引版本信息失败: " + e.getMessage());
                                continue;
                            }
                            long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
                            LOG.info(url + ":" + latestVersion + "   本地:" + indexVersion);
                            if (indexVersion == latestVersion) {
                                continue;
                            }
                            String notifyurl = url.substring(0, url.lastIndexOf("/") + 1) + way;
                            LOG.info("need to notify=" + notifyurl);
                            GetMethod getMethod = new GetMethod(notifyurl);
                            try {
                                int statusCode = httpclient.executeMethod(getMethod);
                                if (statusCode == HttpStatus.SC_OK) {
                                    LOG.info("请求:" + notifyurl + "成功.");
                                }
                            } catch (HttpException e) {
                                LOG.error("触发请求" + notifyurl + "失败");
                            } catch (IOException e) {
                                LOG.error("触发请求" + notifyurl + "失败");
                            } finally {
                                getMethod.releaseConnection();
                            }
                        }
                    }
                });
                t.start();

            }

            @SuppressWarnings("unchecked")
            NamedList getLatestVersion(String notifyUrl) throws IOException {
                PostMethod post = new PostMethod(notifyUrl);
                post.addParameter(COMMAND, CMD_INDEX_VERSION);
                post.addParameter("wt", "javabin");
                return getNamedListResponse(post);
            }

            private NamedList getNamedListResponse(PostMethod method) throws IOException {
                try {
                    int status = httpclient.executeMethod(method);
                    if (status != HttpStatus.SC_OK) {
                        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
                                "Request failed for the url " + method);
                    }
                    return (NamedList) new JavaBinCodec().unmarshal(method.getResponseBodyAsStream());
                } finally {
                    try {
                        method.releaseConnection();
                    } catch (Exception e) {
                    }
                }
            }
        };
    }

    public static final String NOTIFYURL = "notifyurl";
    public static final String NOTIFYWAY = "way";
    public static final String HTTP_CONN_TIMEOUT = "httpConnTimeout";
    public static final String COMMAND = "command";
    public static final String HTTP_READ_TIMEOUT = "httpReadTimeout";
    public static final String HTTP_BASIC_AUTH_USER = "httpBasicAuthUser";
    public static final String HTTP_BASIC_AUTH_PASSWORD = "httpBasicAuthPassword";
    public static final String STATUS = "status";
    public static final String ERR_STATUS = "ERROR";
    public static final String OK_STATUS = "OK";
    public static final String CMD_INDEX_VERSION = "indexversion";
    public static final String CMD_SHARED_DATAIMPORT_STATUS = "dataimportstatus";
    public static final String DATA_IMPORT_IDLE_STAUTS = "idle";

}
 

© 著作权归作者所有

共有 人打赏支持
上一篇: 面试
下一篇: ClassLoader解惑
泡海椒
粉丝 11
博文 279
码字总数 291526
作品 0
成都
程序员
私信 提问
session_set_save_handler函数的简单用法

void sessionsetsave_handler (string open, string close, string read, string write, string destroy, string gc) 这个函数可以定义用户级的session的保存函数(打开、关闭、写入等)。 ...

刘赤龙
2010/06/08
0
1
session_set_save_handler函数的简单用法

void sessionsetsave_handler (string open, string close, string read, string write, string destroy, string gc) 这个函数可以定义用户级的session的保存函数(打开、关闭、写入等)。 ...

刘赤龙
2010/05/26
0
0
solr-集群安装搭建-入门

0,下载安装源码,solr的官网是 :http://lucene.apache.org/solr/ 1,解压出 源码中的安装脚本。假如 安装源码是: solr-5.4.0.tgz,那么执行命令是: tar -zxvf solr-5.4.0.tgz solr-5.4....

岸芷汀兰
2015/12/31
111
0
ubuntu14.04下tomcat7部署solr-4.10.4

看过好多安装的博文,结合了几篇打算自己写一个简单的整合步奏,首先保证安装了jdk和tomcat 1.下载solr-4.10.4,解压到/opt下 2.为 solr 选择一个目录,并使用 SOLR_HOME 环境变量指向这个目录...

开挂奥特曼
2015/05/11
0
0
solr4.9在tomcat7下安装、配置

solr简介 solr是受欢迎的,速度极快的开源企业搜索平台。主要功能包括强大的全文搜索,搜索词高亮显示,分面搜索,近实时的索引,动态聚类,数据库集成,丰富的文件(例如,Word,PDF)处理,...

cloud-coder
2014/08/21
0
1

没有更多内容

加载失败,请刷新页面

加载更多

“敏捷开发”怎么就“敏捷”了

什么是敏捷开发 传统的软件开发过程中,我们往往会针对特定的用户需求,采用“瀑布模型”,从用户的需求开始一步步进行需求分析、软件设计、软件开发、软件测试以及软件交付与维护。 然而,这...

SamYjy
37分钟前
2
0
聊聊我怎么系统学习Linux技能并快速提高的

随着电子信息科技时代的发展,学会使用计算机在我们的生活中成为了必不可少的一项技能。而作为计算机中的三大操作系统之一的Linux更是饱受计算机爱好者们的喜爱。今天我们就来和大家一起聊一...

linuxprobe16
49分钟前
3
0
MySQL专题—— 从认识索引到理解索引【索引优化】

认识索引 认识索引是什么东西非常关键,一个非常恰当的比喻就是书的目录页与书的正文内容之间的关系,为了方便查找书中的内容,通过对内容建立索引形成目录。因此,首先你要明白的一点就是,...

架构师springboot
53分钟前
2
0
Java-怎样构造方法和匿名对象

前言 在编写程序时不安全的初始化会导致程序发生发生重大错误。为了使程序可以被安全地初始化,C++引入了构造器(也可以成为构造方法)的概念,这是一个在创建对象时被自动调用的特殊方法。J...

小刀爱编程
今天
2
0
7、MyBaties 增删改

事务 : 从数据库角度出发,完成业务时需要执行的 SQL 集合,统称一个事务. 1、在 mybatis 中默认是关闭了 JDBC 的自动提交功能 每一个 SqlSession 默认都是不自动提交事务. session.commit()提...

KingFightingAn
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部