solr源码阅读(记录一次调试熟悉solr整体调用流程)

原创
2019/07/24 23:56
阅读数 491

本文基于历史版本 Solr 5.3。因开发需要,把一次源码阅读(调试)的经历记录一下:这次跟的是一次普通 select 请求的处理流程。根据查询参数的不同,代码会走很多分支,这里只跟了其中一条,但大体也能看明白 Solr 的整体代码结构。

第一步,web项目先看什么,当然是web.xml

第二步:看一下SolrRequestFilter,前面主要做了一些url、权限的验证忽略跳过了,直接看核心

HttpSolrCall call = getHttpSolrCall((HttpServletRequest) request, (HttpServletResponse) response, retry);

对request、response还有核心 SolrCore进行了一次包装聚合

Action result = call.call();

/**
 * Handles one request passing through the filter.
 * <p>
 * Non-HTTP requests are dropped immediately. Otherwise the request is
 * authenticated, checked against the configured exclude patterns (a match
 * hands the request straight to the rest of the chain), and finally wrapped
 * in an {@code HttpSolrCall} whose result decides what happens next.
 *
 * @param request  the incoming request; replaced by the wrapped request when
 *                 authentication supplies one
 * @param response the response being produced
 * @param chain    the remaining filter chain
 * @param retry    passed through to {@code getHttpSolrCall}; {@code true} when
 *                 this method re-enters itself after a {@code RETRY} result
 * @throws IOException      propagated from the call or the chain
 * @throws ServletException propagated from the chain or the dispatcher
 */
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain, boolean retry) throws IOException, ServletException {
    if (!(request instanceof HttpServletRequest)) {
      return;
    }

    AtomicReference<ServletRequest> wrappedRequest = new AtomicReference<>();
    boolean authenticated = authenticateRequest(request, response, wrappedRequest);
    if (!authenticated) {
      // the response and status code have already been sent
      return;
    }
    ServletRequest replacement = wrappedRequest.get();
    if (replacement != null) {
      request = replacement;
    }
    if (cores.getAuthenticationPlugin() != null) {
      HttpServletRequest authenticatedRequest = (HttpServletRequest) request;
      log.debug("User principal: {}", authenticatedRequest.getUserPrincipal());
    }

    // No need to even create the HttpSolrCall object if this path is excluded.
    if (excludePatterns != null) {
      HttpServletRequest httpRequest = (HttpServletRequest) request;
      String servletPath = httpRequest.getServletPath(); // e.g. /db/select
      String pathInfo = httpRequest.getPathInfo();
      // In embedded mode, servlet path is empty - include all post-context path here for testing
      String requestPath = (pathInfo == null) ? servletPath : servletPath + pathInfo;
      for (Pattern excluded : excludePatterns) {
        if (excluded.matcher(requestPath).lookingAt()) {
          chain.doFilter(request, response);
          return;
        }
      }
    }

    // Key step: bundle request, response and the retry flag into an HttpSolrCall.
    HttpSolrCall call = getHttpSolrCall((HttpServletRequest) request, (HttpServletResponse) response, retry);
    try {
      // Key step: the call does the real work and reports how to continue.
      Action result = call.call();
      switch (result) {
        case PASSTHROUGH:
          chain.doFilter(request, response);
          break;
        case RETRY:
          doFilter(request, response, chain, true);
          break;
        case FORWARD:
          request.getRequestDispatcher(call.getPath()).forward(request, response);
          break;
      }
    } finally {
      call.destroy();
    }
  }

第三步:HttpSolrCall.call() 做了什么?

/**
 * This method processes the request.
 * <p>
 * Steps visible in this method:
 * <ol>
 *   <li>Reset the MDC logging context and call {@code setNode(cores)}.</li>
 *   <li>Answer 503 when {@code cores} is null, or 500 when the dispatch filter
 *       carries an abort error message.</li>
 *   <li>Run {@code init()}, then, when an authorization plugin is configured and
 *       {@code shouldAuthorize()} is true, authorize the request; any status
 *       other than ACCEPTED/OK is sent back as an error.</li>
 *   <li>Dispatch on {@code action}: admin request, remote query, or local
 *       processing (execute the handler and write the response). Any other
 *       action is returned to the caller unchanged.</li>
 * </ol>
 * Any {@code Throwable} is reported through {@code sendError}; if an
 * {@code Error} is found anywhere in the cause chain it is rethrown.
 * The MDC logging context is always cleared on exit.
 *
 * @return {@code RETURN} when this method has handled (or rejected) the
 *         request, otherwise the unhandled {@code action}
 * @throws IOException declared by the signature; presumably raised while writing
 *         the error responses sent before the try block — TODO confirm
 */
public Action call() throws IOException {
  MDCLoggingContext.reset();
  MDCLoggingContext.setNode(cores);

  if (cores == null) {
    sendError(503, "Server is shutting down or failed to initialize");
    return RETURN;
  }

  if (solrDispatchFilter.abortErrorMessage != null) {
    sendError(500, solrDispatchFilter.abortErrorMessage);
    return RETURN;
  }

  try {
    init();  // initialization; per the article author the part to watch is the handler lookup: handler = cores.getRequestHandler(path)
    /* Authorize the request if
     1. Authorization is enabled, and
     2. The requested resource is not a known static file
      */
    if (cores.getAuthorizationPlugin() != null && shouldAuthorize()) {
      AuthorizationContext context = getAuthCtx();
      log.info(context.toString());
      AuthorizationResponse authResponse = cores.getAuthorizationPlugin().authorize(context);
      if (authResponse.statusCode == AuthorizationResponse.PROMPT.statusCode) {
        // copy the headers stored on the request under the AuthenticationPlugin class name onto the response
        Map<String, String> headers = (Map) getReq().getAttribute(AuthenticationPlugin.class.getName());
        if (headers != null) {
          for (Map.Entry<String, String> e : headers.entrySet()) response.setHeader(e.getKey(), e.getValue());
        }
        // NOTE(review): this writes the raw Authorization header value to the debug log
        log.debug("USER_REQUIRED "+req.getHeader("Authorization")+" "+ req.getUserPrincipal());
      }
      // anything other than ACCEPTED / OK is reported to the client with the plugin's status code
      if (!(authResponse.statusCode == HttpStatus.SC_ACCEPTED) && !(authResponse.statusCode == HttpStatus.SC_OK)) {
        sendError(authResponse.statusCode,
            "Unauthorized request, Response code: " + authResponse.statusCode);
        return RETURN;
      }
    }

    HttpServletResponse resp = response;
    switch (action) {
      case ADMIN:
        handleAdminRequest();
        return RETURN;
      case REMOTEQUERY:
        remoteQuery(coreUrl + path, resp);
        return RETURN;
      case PROCESS:
        final Method reqMethod = Method.getMethod(req.getMethod());
        HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
        // unless we have been explicitly told not to, do cache validation
        // if we fail cache validation, execute the query
        if (config.getHttpCachingConfig().isNever304() ||
            !HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
          SolrQueryResponse solrRsp = new SolrQueryResponse();
            /* even for HEAD requests, we need to execute the handler to
             * ensure we don't get an error (and to make sure the correct
             * QueryResponseWriter is selected and we get the correct
             * Content-Type)
             */
          // 1) run the request
          SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp));
          execute(solrRsp);
          HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
          // copy the HTTP headers collected on the Solr response onto the servlet response
          Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
          while (headers.hasNext()) {
            Map.Entry<String, String> entry = headers.next();
            resp.addHeader(entry.getKey(), entry.getValue());
          }
          // 2) pick the response writer for this request and write the result back
          QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq);
          if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
          writeResponse(solrRsp, responseWriter, reqMethod);
        }
        return RETURN;
      default: return action;
    }
  } catch (Throwable ex) {
    sendError(ex);
    // walk the the entire cause chain to search for an Error
    Throwable t = ex;
    while (t != null) {
      if (t instanceof Error) {
        if (t != ex) {
          SolrDispatchFilter.log.error("An Error was wrapped in another exception - please report complete stacktrace on SOLR-6161", ex);
        }
        throw (Error) t;
      }
      t = t.getCause();
    }
    return RETURN;
  } finally {
    MDCLoggingContext.clear();
  }

}

call方法里面重要的有:

1.执行本次请求

SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp));
execute(solrRsp);
HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);

2.对搜索结果进行封装,并通过response进行回写

QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq);
if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
writeResponse(solrRsp, responseWriter, reqMethod);

第五步:先观察一下 上面的 execute,最终调用 SolrCore 核心类的 execute方法

solrReq.getCore().execute(handler, solrReq, rsp);//最终调用 SolrCore 核心类的 execute方法
/**
 * Runs {@code handler} against the given request/response pair, decorating
 * the response before and after and logging the request.
 * <p>
 * After the handler has run, the request is logged at INFO on the request
 * log; when a non-negative slow-query threshold is configured and the elapsed
 * request time reaches it, the request is logged again at WARN with a
 * {@code "slow: "} prefix.
 *
 * @param handler the handler to invoke; must not be null
 * @param req     the request being served
 * @param rsp     the response to populate
 * @throws SolrException with {@code BAD_REQUEST} when {@code handler} is null
 */
public void execute(SolrRequestHandler handler, SolrQueryRequest req, SolrQueryResponse rsp) {
  if (null == handler) {
    final String message = "Null Request Handler '" + req.getParams().get(CommonParams.QT) + "'";
    if (log.isWarnEnabled()) {
      log.warn(logid + message + ":" + req);
    }
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, message);
  }

  // Pre-populate the response; paired with postDecorateResponse below.
  preDecorateResponse(req, rsp);

  // Log the request at debug now, in case something goes wrong and it cannot be logged later.
  if (requestLog.isDebugEnabled() && rsp.getToLog().size() > 0) {
    requestLog.debug(rsp.getToLogAsString(logid));
  }

  // TODO (carried over): rejecting isShard requests for handlers that are not
  // SearchHandler instances was disabled because it did not work correctly and
  // caused problems with the example server and distrib (for example /spell).

  // Per the article, for a select request the handler passed in here is a SearchHandler.
  handler.handleRequest(req, rsp);
  postDecorateResponse(handler, req, rsp);

  if (!(rsp.getToLog().size() > 0)) {
    return;
  }

  if (requestLog.isInfoEnabled()) {
    requestLog.info(rsp.getToLogAsString(logid));
  }

  if (log.isWarnEnabled() && slowQueryThresholdMillis >= 0) {
    final long elapsed = (long) (req.getRequestTimer().getTime());
    if (elapsed >= slowQueryThresholdMillis) {
      log.warn("slow: " + rsp.getToLogAsString(logid));
    }
  }
}

第六步:继续深挖。handler.handleRequest(req,rsp) 先执行 RequestHandlerBase 的 handleRequest,其内部主要调用 SearchHandler 的 handleRequestBody 方法(之所以是 SearchHandler,原因见第三步 init() 中 handler 的初始化)。这个方法比较长,大部分逻辑都在这里面,但主线就是依次调用各个 Component 的 prepare 方法(搜索预处理)和 process 方法(实际处理);本次调试走到的是 QueryComponent。

/**
 * Drives the configured search components for one request.
 * <p>
 * Phases visible in this method:
 * <ol>
 *   <li>Build a {@code ResponseBuilder} over the request, response and component list,
 *       and configure debugging from the request parameters.</li>
 *   <li>Prepare: call {@code prepare(rb)} on every component (timed per component when
 *       debugging).</li>
 *   <li>Non-distributed ({@code !rb.isDistrib}): apply the {@code timeAllowed} limit when
 *       positive, then call {@code process(rb)} on every component. An
 *       {@code ExitingReaderException} is logged and turned into a response with
 *       {@code numFound} set to 0 and a {@code partialResults} header.</li>
 *   <li>Distributed: loop over stages; in each stage every component's
 *       {@code distributedProcess} is called, queued shard requests are submitted,
 *       responses are handed to {@code handleResponses}, and {@code finishStage} runs.
 *       The loop ends when the next stage equals {@code Integer.MAX_VALUE}.</li>
 *   <li>For a non-distributed request with a short-circuited URL, add shards.info to the
 *       response when requested.</li>
 * </ol>
 *
 * @param req the request being handled
 * @param rsp the response to populate
 * @throws Exception anything raised by the components or the shard handling
 */
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception
{
  List<SearchComponent> components  = getComponents();
  ResponseBuilder rb = new ResponseBuilder(req, rsp, components);
  if (rb.requestInfo != null) {
    rb.requestInfo.setResponseBuilder(rb);
  }

  boolean dbg = req.getParams().getBool(CommonParams.DEBUG_QUERY, false);
  rb.setDebug(dbg);
  if (dbg == false){//if it's true, we are doing everything anyway.
    SolrPluginUtils.getDebugInterests(req.getParams().getParams(CommonParams.DEBUG), rb);
  }

  // a timer is only used when debugging; null selects the untimed loops below
  final RTimer timer = rb.isDebug() ? req.getRequestTimer() : null;

  final ShardHandler shardHandler1 = getAndPrepShardHandler(req, rb); // creates a ShardHandler object only if it's needed
  
  if (timer == null) {
    // non-debugging prepare phase
    for( SearchComponent c : components ) {
      c.prepare(rb); // key call: each component's prepare step
    }
  } else {
    // debugging prepare phase
    RTimer subt = timer.sub( "prepare" );
    for( SearchComponent c : components ) {
      rb.setTimer( subt.sub( c.getName() ) );
      c.prepare(rb); // key call: each component's prepare step (timed)
      rb.getTimer().stop();
    }
    subt.stop();
  }

  if (!rb.isDistrib) {
    // a normal non-distributed request

    long timeAllowed = req.getParams().getLong(CommonParams.TIME_ALLOWED, -1L);
    if (timeAllowed > 0L) {
      SolrQueryTimeoutImpl.set(timeAllowed);
    }
    try {
      // The semantics of debugging vs not debugging are different enough that
      // it makes sense to have two control loops
      if(!rb.isDebug()) {
        // Process
        for( SearchComponent c : components ) {
          c.process(rb); // key call: each component's process step
        }
      }
      else {
        // Process
        RTimer subt = timer.sub( "process" );
        for( SearchComponent c : components ) {
          rb.setTimer( subt.sub( c.getName() ) );
          c.process(rb); // key call: each component's process step (timed)
          rb.getTimer().stop();
        }
        subt.stop();

        // add the timing info
        if (rb.isDebugTimings()) {
          rb.addDebugInfo("timing", timer.asNamedList() );
        }
      }
    } catch (ExitableDirectoryReader.ExitingReaderException ex) {
      // the reader bailed out: log it, report zero hits and flag the response as partial
      log.warn( "Query: " + req.getParamString() + "; " + ex.getMessage());
      SolrDocumentList r = (SolrDocumentList) rb.rsp.getValues().get("response");
      if(r == null)
        r = new SolrDocumentList();
      r.setNumFound(0);
      rb.rsp.add("response", r);
      if(rb.isDebug()) {
        NamedList debug = new NamedList();
        debug.add("explain", new NamedList());
        rb.rsp.add("debug", debug);
      }
      rb.rsp.getResponseHeader().add("partialResults", Boolean.TRUE);
    } finally {
      SolrQueryTimeoutImpl.reset();
    }
  } else {
    // a distributed request

    if (rb.outgoing == null) {
      rb.outgoing = new LinkedList<>();
    }
    rb.finished = new ArrayList<>();

    int nextStage = 0;
    do {
      rb.stage = nextStage;
      nextStage = ResponseBuilder.STAGE_DONE;

      // call all components
      for( SearchComponent c : components ) {
        // the next stage is the minimum of what all components report
        nextStage = Math.min(nextStage, c.distributedProcess(rb));
      }


      // check the outgoing queue and send requests
      while (rb.outgoing.size() > 0) {

        // submit all current request tasks at once
        while (rb.outgoing.size() > 0) {
          ShardRequest sreq = rb.outgoing.remove(0);
          sreq.actualShards = sreq.shards;
          if (sreq.actualShards==ShardRequest.ALL_SHARDS) {
            sreq.actualShards = rb.shards;
          }
          sreq.responses = new ArrayList<>(sreq.actualShards.length); // presume we'll get a response from each shard we send to

          // TODO: map from shard to address[]
          for (String shard : sreq.actualShards) {
            // per-shard copy of the request parameters, adjusted for a sub-request
            ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);
            params.remove(ShardParams.SHARDS);      // not a top-level request
            params.set(CommonParams.DISTRIB, "false");               // not a top-level request
            params.remove("indent");
            params.remove(CommonParams.HEADER_ECHO_PARAMS);
            params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request
            params.set(ShardParams.SHARDS_PURPOSE, sreq.purpose);
            params.set(ShardParams.SHARD_URL, shard); // so the shard knows what was asked
            if (rb.requestInfo != null) {
              // we could try and detect when this is needed, but it could be tricky
              params.set("NOW", Long.toString(rb.requestInfo.getNOW().getTime()));
            }
            String shardQt = params.get(ShardParams.SHARDS_QT);
            if (shardQt != null) {
              params.set(CommonParams.QT, shardQt);
            } else {
              // for distributed queries that don't include shards.qt, use the original path
              // as the default but operators need to update their luceneMatchVersion to enable
              // this behavior since it did not work this way prior to 5.1
              if (req.getCore().getSolrConfig().luceneMatchVersion.onOrAfter(Version.LUCENE_5_1_0)) {
                String reqPath = (String) req.getContext().get(PATH);
                if (!"/select".equals(reqPath)) {
                  params.set(CommonParams.QT, reqPath);
                } // else if path is /select, then the qt gets passed thru if set
              } else {
                // this is the pre-5.1 behavior, which translates to sending the shard request to /select
                params.remove(CommonParams.QT);
              }
            }
            shardHandler1.submit(sreq, shard, params);
          }
        }


        // now wait for replies, but if anyone puts more requests on
        // the outgoing queue, send them out immediately (by exiting
        // this loop)
        boolean tolerant = rb.req.getParams().getBool(ShardParams.SHARDS_TOLERANT, false);
        while (rb.outgoing.size() == 0) {
          ShardResponse srsp = tolerant ? 
              shardHandler1.takeCompletedIncludingErrors():
              shardHandler1.takeCompletedOrError();
          if (srsp == null) break;  // no more requests to wait for

          // Was there an exception?  
          if (srsp.getException() != null) {
            // If things are not tolerant, abort everything and rethrow
            if(!tolerant) {
              shardHandler1.cancelAll();
              if (srsp.getException() instanceof SolrException) {
                throw (SolrException)srsp.getException();
              } else {
                throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());
              }
            } else {
              // tolerant mode: keep going, but flag the response as partial (only once)
              if(rsp.getResponseHeader().get("partialResults") == null) {
                rsp.getResponseHeader().add("partialResults", Boolean.TRUE);
              }
            }
          }

          rb.finished.add(srsp.getShardRequest());

          // let the components see the responses to the request
          for(SearchComponent c : components) {
            c.handleResponses(rb, srsp.getShardRequest());
          }
        }
      }

      for(SearchComponent c : components) {
        c.finishStage(rb);
      }

      // we are done when the next stage is MAX_VALUE
    } while (nextStage != Integer.MAX_VALUE);
  }
  
  // SOLR-5550: still provide shards.info if requested even for a short circuited distrib request
  if(!rb.isDistrib && req.getParams().getBool(ShardParams.SHARDS_INFO, false) && rb.shortCircuitedURL != null) {  
    NamedList<Object> shardInfo = new SimpleOrderedMap<Object>();
    SimpleOrderedMap<Object> nl = new SimpleOrderedMap<Object>();        
    if (rsp.getException() != null) {
      // report the underlying cause (root cause for SolrServerException, else the direct cause if any)
      Throwable cause = rsp.getException();
      if (cause instanceof SolrServerException) {
        cause = ((SolrServerException)cause).getRootCause();
      } else {
        if (cause.getCause() != null) {
          cause = cause.getCause();
        }          
      }
      nl.add("error", cause.toString() );
      StringWriter trace = new StringWriter();
      cause.printStackTrace(new PrintWriter(trace));
      nl.add("trace", trace.toString() );
    }
    else {
      nl.add("numFound", rb.getResults().docList.matches());
      nl.add("maxScore", rb.getResults().docList.maxScore());
    }
    nl.add("shardAddress", rb.shortCircuitedURL);
    nl.add("time", req.getRequestTimer().getTime()); // elapsed time of this request so far
    
    // name the entry after the URL with any "scheme://" prefix removed
    int pos = rb.shortCircuitedURL.indexOf("://");        
    String shardInfoName = pos != -1 ? rb.shortCircuitedURL.substring(pos+3) : rb.shortCircuitedURL;
    shardInfo.add(shardInfoName, nl);   
    rsp.getValues().add(ShardParams.SHARDS_INFO,shardInfo);            
  }
}

第七步:来看一下 QueryComponent 的prepare 和 process

prepare 最重要的 就是根据defType(url传递或者solrconfig设置,默认是lucene),获取对应的Parser和Query

/**
 * Prepares the {@code ResponseBuilder} for the query: does nothing when the
 * {@code COMPONENT_NAME} request parameter is false; otherwise sets the return
 * fields and score flag, parses the main query with the parser selected by
 * {@code defType}, and records the optional rank query, sort spec, cursor mark
 * and filter queries on {@code rb}. Finishes by preparing grouping (when the
 * group parameter is true) or validating 'rows', and validating 'start'.
 *
 * @param rb the response builder carrying the request, response and query state
 * @throws IOException declared by the signature
 * @throws SolrException with {@code BAD_REQUEST} on a query syntax error, when
 *         {@code rq} does not parse to a {@code RankQuery}, or when 'rows' /
 *         'start' is negative
 */
@Override
public void prepare(ResponseBuilder rb) throws IOException
{

  SolrQueryRequest req = rb.req;
  SolrParams params = req.getParams();
  if (!params.getBool(COMPONENT_NAME, true)) {
    return;
  }
  SolrQueryResponse rsp = rb.rsp;

  // Set field flags    
  ReturnFields returnFields = new SolrReturnFields( req ); // return-field spec built from the request (article author's note: "null" when no fields are specified — TODO confirm what that refers to)
  rsp.setReturnFields( returnFields );
  int flags = 0;
  if (returnFields.wantsScore()) {
    flags |= SolrIndexSearcher.GET_SCORES;
  }
  rb.setFieldFlags( flags );

  // parser type: taken from the request parameters, falling back to QParserPlugin.DEFAULT_QTYPE
  String defType = params.get(QueryParsing.DEFTYPE, QParserPlugin.DEFAULT_QTYPE);

  // get it from the response builder to give a different component a chance
  // to set it.
  String queryString = rb.getQueryString();
  if (queryString == null) {
    // this is the normal way it's set.
    queryString = params.get( CommonParams.Q );
    rb.setQueryString(queryString);
  }

  try {
    QParser parser = QParser.getParser(rb.getQueryString(), defType, req);
    Query q = parser.getQuery();
    if (q == null) {
      // normalize a null query to a query that matches nothing
      q = new MatchNoDocsQuery();
    }

    rb.setQuery( q );

    // optional rank query: must parse to a RankQuery, which may also contribute a merge strategy
    String rankQueryString = rb.req.getParams().get(CommonParams.RQ);
    if(rankQueryString != null) {
      QParser rqparser = QParser.getParser(rankQueryString, defType, req);
      Query rq = rqparser.getQuery();
      if(rq instanceof RankQuery) {
        RankQuery rankQuery = (RankQuery)rq;
        rb.setRankQuery(rankQuery);
        MergeStrategy mergeStrategy = rankQuery.getMergeStrategy();
        if(mergeStrategy != null) {
          rb.addMergeStrategy(mergeStrategy);
          if(mergeStrategy.handlesMergeFields()) {
            rb.mergeFieldHandler = mergeStrategy;
          }
        }
      } else {
        throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,"rq parameter must be a RankQuery");
      }
    }

    rb.setSortSpec( parser.getSort(true) );
    rb.setQparser(parser);

    // optional cursor mark, built from the schema and the sort spec set just above
    final String cursorStr = rb.req.getParams().get(CursorMarkParams.CURSOR_MARK_PARAM);
    if (null != cursorStr) {
      final CursorMark cursorMark = new CursorMark(rb.req.getSchema(),
                                                   rb.getSortSpec());
      cursorMark.parseSerializedTotem(cursorStr);
      rb.setCursorMark(cursorMark);
    }

    // filter queries: each non-blank fq is parsed and appended to the filter list
    String[] fqs = req.getParams().getParams(CommonParams.FQ);
    if (fqs!=null && fqs.length!=0) {
      List<Query> filters = rb.getFilters();
      // if filters already exists, make a copy instead of modifying the original
      filters = filters == null ? new ArrayList<Query>(fqs.length) : new ArrayList<>(filters);
      for (String fq : fqs) {
        if (fq != null && fq.trim().length()!=0) {
          QParser fqp = QParser.getParser(fq, null, req);
          filters.add(fqp.getQuery());
        }
      }
      // only set the filters if they are not empty otherwise
      // fq=&someotherParam= will trigger all docs filter for every request 
      // if filter cache is disabled
      if (!filters.isEmpty()) {
        rb.setFilters( filters );
      }
    }
  } catch (SyntaxError e) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
  }

  if (params.getBool(GroupParams.GROUP, false)) {
    prepareGrouping(rb);
  } else {
    //Validate only in case of non-grouping search.
    if(rb.getSortSpec().getCount() < 0) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'rows' parameter cannot be negative");
    }
  }

  //Input validation.
  if (rb.getQueryCommand().getOffset() < 0) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "'start' parameter cannot be negative");
  }
}

里面获取Parser单独看了一下

QParserPlugin qplug = req.getCore().getQueryPlugin(parserName);
QParser parser =  qplug.createParser(qstr, localParams, req.getParams(), req);//如果要修改评分算法,这里可能会用到(马后炮了)

下面是 QueryComponent 的 process 方法,也是(感觉上)最长的一个方法。这里东西太多,可以自己细看,也可以直接跟最关键的一处调用:SolrIndexSearcher 类的 search 方法,从这里开始真正查找数据。

/**
 * Actually run the query.
 * <p>
 * Paths visible in this method, in order:
 * <ul>
 *   <li>Return at once when the {@code COMPONENT_NAME} request parameter is false.</li>
 *   <li>When the shard purpose includes {@code PURPOSE_GET_TERM_STATS}, return local
 *       stats through the stats cache and stop; when it includes
 *       {@code PURPOSE_SET_TERM_STATS}, first receive the global stats.</li>
 *   <li>When the {@code ids} parameter is present, look up each id directly and
 *       return those documents as the response.</li>
 *   <li>Reject the combination of a cursor mark and a positive {@code timeAllowed}.</li>
 *   <li>When a grouping spec exists, run one of three grouping paths (distributed
 *       first phase, distributed second phase, or regular grouping) and return.</li>
 *   <li>Otherwise run {@code searcher.search(result, cmd)}, add the doc list to the
 *       response, emit the next cursor mark for non-shard requests, handle sort
 *       field values and prefetch.</li>
 * </ul>
 *
 * @param rb the response builder carrying the request, response and query state
 * @throws IOException declared by the signature
 * @throws SolrException with {@code BAD_REQUEST} for the cursor/timeAllowed conflict
 *         or a syntax error raised while grouping
 */
@Override
public void process(ResponseBuilder rb) throws IOException
{
  LOG.debug("process: {}", rb.req.getParams());

  SolrQueryRequest req = rb.req;
  SolrParams params = req.getParams();
  if (!params.getBool(COMPONENT_NAME, true)) {
    return;
  }
  SolrIndexSearcher searcher = req.getSearcher();

  StatsCache statsCache = req.getCore().getStatsCache();
  
  // purpose is a bit mask; defaults to PURPOSE_GET_TOP_IDS when the parameter is absent
  int purpose = params.getInt(ShardParams.SHARDS_PURPOSE, ShardRequest.PURPOSE_GET_TOP_IDS);
  if ((purpose & ShardRequest.PURPOSE_GET_TERM_STATS) != 0) {
    statsCache.returnLocalStats(rb, searcher);
    return;
  }
  // check if we need to update the local copy of global dfs
  if ((purpose & ShardRequest.PURPOSE_SET_TERM_STATS) != 0) {
    // retrieve from request and update local cache
    statsCache.receiveGlobalStats(req);
  }

  SolrQueryResponse rsp = rb.rsp;
  IndexSchema schema = searcher.getSchema();

  // Optional: This could also be implemented by the top-level searcher sending
  // a filter that lists the ids... that would be transparent to
  // the request handler, but would be more expensive (and would preserve score
  // too if desired).
  String ids = params.get(ShardParams.IDS);
  if (ids != null) {
    // explicit id list: resolve each unique key to an internal doc id, skipping ids with no match
    SchemaField idField = schema.getUniqueKeyField();
    List<String> idArr = StrUtils.splitSmart(ids, ",", true);
    int[] luceneIds = new int[idArr.size()];
    int docs = 0;
    for (int i=0; i<idArr.size(); i++) {
      int id = searcher.getFirstMatch(
              new Term(idField.getName(), idField.getType().toInternal(idArr.get(i))));
      if (id >= 0)
        luceneIds[docs++] = id;
    }

    DocListAndSet res = new DocListAndSet();
    res.docList = new DocSlice(0, docs, luceneIds, null, docs, 0);
    if (rb.isNeedDocSet()) {
      // TODO: create a cache for this!
      List<Query> queries = new ArrayList<>();
      queries.add(rb.getQuery());
      List<Query> filters = rb.getFilters();
      if (filters != null) queries.addAll(filters);
      res.docSet = searcher.getDocSet(queries);
    }
    rb.setResults(res);

    ResultContext ctx = new ResultContext();
    ctx.docs = rb.getResults().docList;
    ctx.query = null; // anything?
    rsp.add("response", ctx);
    return;
  }

  // -1 as flag if not set.
  long timeAllowed = params.getLong(CommonParams.TIME_ALLOWED, -1L);
  if (null != rb.getCursorMark() && 0 < timeAllowed) {
    // fundamentally incompatible
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Can not search using both " +
                            CursorMarkParams.CURSOR_MARK_PARAM + " and " + CommonParams.TIME_ALLOWED);
  }

  SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();
  cmd.setTimeAllowed(timeAllowed);

  req.getContext().put(SolrIndexSearcher.STATS_SOURCE, statsCache.get(req));
  
  SolrIndexSearcher.QueryResult result = new SolrIndexSearcher.QueryResult();

  //
  // grouping / field collapsing
  //
  GroupingSpecification groupingSpec = rb.getGroupingSpec();
  if (groupingSpec != null) {
    try {
      boolean needScores = (cmd.getFlags() & SolrIndexSearcher.GET_SCORES) != 0;
      if (params.getBool(GroupParams.GROUP_DISTRIBUTED_FIRST, false)) {
        // distributed grouping, first phase: one search-groups command per grouping field
        CommandHandler.Builder topsGroupsActionBuilder = new CommandHandler.Builder()
            .setQueryCommand(cmd)
            .setNeedDocSet(false) // Order matters here
            .setIncludeHitCount(true)
            .setSearcher(searcher);

        for (String field : groupingSpec.getFields()) {
          topsGroupsActionBuilder.addCommandField(new SearchGroupsFieldCommand.Builder()
              .setField(schema.getField(field))
              .setGroupSort(groupingSpec.getGroupSort())
              .setTopNGroups(cmd.getOffset() + cmd.getLen())
              .setIncludeGroupCount(groupingSpec.isIncludeGroupCount())
              .build()
          );
        }

        CommandHandler commandHandler = topsGroupsActionBuilder.build();
        commandHandler.execute();
        SearchGroupsResultTransformer serializer = new SearchGroupsResultTransformer(searcher);
        rsp.add("firstPhase", commandHandler.processResult(result, serializer));
        rsp.add("totalHitCount", commandHandler.getTotalHitCount());
        rb.setResult(result);
        return;
      } else if (params.getBool(GroupParams.GROUP_DISTRIBUTED_SECOND, false)) {
        // distributed grouping, second phase: top-groups commands built from the groups passed in as parameters
        CommandHandler.Builder secondPhaseBuilder = new CommandHandler.Builder()
            .setQueryCommand(cmd)
            .setTruncateGroups(groupingSpec.isTruncateGroups() && groupingSpec.getFields().length > 0)
            .setSearcher(searcher);

        for (String field : groupingSpec.getFields()) {
          SchemaField schemaField = schema.getField(field);
          String[] topGroupsParam = params.getParams(GroupParams.GROUP_DISTRIBUTED_TOPGROUPS_PREFIX + field);
          if (topGroupsParam == null) {
            topGroupsParam = new String[0];
          }

          List<SearchGroup<BytesRef>> topGroups = new ArrayList<>(topGroupsParam.length);
          for (String topGroup : topGroupsParam) {
            SearchGroup<BytesRef> searchGroup = new SearchGroup<>();
            // the GROUP_NULL_VALUE marker leaves groupValue unset
            if (!topGroup.equals(TopGroupsShardRequestFactory.GROUP_NULL_VALUE)) {
              searchGroup.groupValue = new BytesRef(schemaField.getType().readableToIndexed(topGroup));
            }
            topGroups.add(searchGroup);
          }

          secondPhaseBuilder.addCommandField(
              new TopGroupsFieldCommand.Builder()
                  .setField(schemaField)
                  .setGroupSort(groupingSpec.getGroupSort())
                  .setSortWithinGroup(groupingSpec.getSortWithinGroup())
                  .setFirstPhaseGroups(topGroups)
                  .setMaxDocPerGroup(groupingSpec.getGroupOffset() + groupingSpec.getGroupLimit())
                  .setNeedScores(needScores)
                  .setNeedMaxScore(needScores)
                  .build()
          );
        }

        for (String query : groupingSpec.getQueries()) {
          secondPhaseBuilder.addCommandField(new QueryCommand.Builder()
              .setDocsToCollect(groupingSpec.getOffset() + groupingSpec.getLimit())
              .setSort(groupingSpec.getGroupSort())
              .setQuery(query, rb.req)
              .setDocSet(searcher)
              .build()
          );
        }

        CommandHandler commandHandler = secondPhaseBuilder.build();
        commandHandler.execute();
        TopGroupsResultTransformer serializer = new TopGroupsResultTransformer(rb);
        rsp.add("secondPhase", commandHandler.processResult(result, serializer));
        rb.setResult(result);
        return;
      }

      // regular (non-phased) grouping
      int maxDocsPercentageToCache = params.getInt(GroupParams.GROUP_CACHE_PERCENTAGE, 0);
      // caching of the second pass is enabled only for a percentage within 1..100
      boolean cacheSecondPassSearch = maxDocsPercentageToCache >= 1 && maxDocsPercentageToCache <= 100;
      Grouping.TotalCount defaultTotalCount = groupingSpec.isIncludeGroupCount() ?
          Grouping.TotalCount.grouped : Grouping.TotalCount.ungrouped;
      int limitDefault = cmd.getLen(); // this is normally from "rows"
      Grouping grouping =
          new Grouping(searcher, result, cmd, cacheSecondPassSearch, maxDocsPercentageToCache, groupingSpec.isMain());
      grouping.setSort(groupingSpec.getGroupSort())
          .setGroupSort(groupingSpec.getSortWithinGroup())
          .setDefaultFormat(groupingSpec.getResponseFormat())
          .setLimitDefault(limitDefault)
          .setDefaultTotalCount(defaultTotalCount)
          .setDocsPerGroupDefault(groupingSpec.getGroupLimit())
          .setGroupOffsetDefault(groupingSpec.getGroupOffset())
          .setGetGroupedDocSet(groupingSpec.isTruncateGroups());

      if (groupingSpec.getFields() != null) {
        for (String field : groupingSpec.getFields()) {
          grouping.addFieldCommand(field, rb.req);
        }
      }

      if (groupingSpec.getFunctions() != null) {
        for (String groupByStr : groupingSpec.getFunctions()) {
          grouping.addFunctionCommand(groupByStr, rb.req);
        }
      }

      if (groupingSpec.getQueries() != null) {
        for (String groupByStr : groupingSpec.getQueries()) {
          grouping.addQueryCommand(groupByStr, rb.req);
        }
      }

      if (rb.doHighlights || rb.isDebug() || params.getBool(MoreLikeThisParams.MLT, false)) {
        // we need a single list of the returned docs
        cmd.setFlags(SolrIndexSearcher.GET_DOCLIST);
      }

      grouping.execute();
      if (grouping.isSignalCacheWarning()) {
        rsp.add(
            "cacheWarning",
            String.format(Locale.ROOT, "Cache limit of %d percent relative to maxdoc has exceeded. Please increase cache size or disable caching.", maxDocsPercentageToCache)
        );
      }
      rb.setResult(result);

      // either a single main result list, or the grouped results structure
      if (grouping.mainResult != null) {
        ResultContext ctx = new ResultContext();
        ctx.docs = grouping.mainResult;
        ctx.query = null; // TODO? add the query?
        rsp.add("response", ctx);
        rsp.getToLog().add("hits", grouping.mainResult.matches());
      } else if (!grouping.getCommands().isEmpty()) { // Can never be empty since grouping.execute() checks for this.
        rsp.add("grouped", result.groupedResults);
        rsp.getToLog().add("hits", grouping.getCommands().get(0).getMatches());
      }
      return;
    } catch (SyntaxError e) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
    }
  }

  // normal search result
  searcher.search(result, cmd); // key call: this is where the index is actually searched
  rb.setResult(result);

  ResultContext ctx = new ResultContext();
  ctx.docs = rb.getResults().docList;
  ctx.query = rb.getQuery();
  rsp.add("response", ctx);
  rsp.getToLog().add("hits", rb.getResults().docList.matches());

  // the next cursor mark is only written for requests that are not shard sub-requests
  if ( ! rb.req.getParams().getBool(ShardParams.IS_SHARD,false) ) {
    if (null != rb.getNextCursorMark()) {
      rb.rsp.add(CursorMarkParams.CURSOR_MARK_NEXT,
                 rb.getNextCursorMark().getSerializedTotem());
    }
  }

  // a merge strategy registered as mergeFieldHandler replaces the default sort-value handling
  if(rb.mergeFieldHandler != null) {
    rb.mergeFieldHandler.handleMergeFields(rb, searcher);
  } else {
    doFieldSortValues(rb, searcher);
  }

  doPrefetch(rb);
}

第八步:SolrIndexSearcher 的 search 方法内部来回跳转,最后进入 getDocListC:先查缓存,缓存里没有数据时再继续调用 getDocListNC。

/**
 * getDocList version that uses+populates query and filter caches.
 * In the event of a timeout, the cache is not populated.
 *
 * <p>Outline of what the method does:
 * <ol>
 *   <li>Attaches a fresh {@code DocListAndSet} to {@code qr}.</li>
 *   <li>When the query result cache is usable, looks the query up there and
 *       returns early if the cached list can satisfy the requested window.</li>
 *   <li>Otherwise produces the list either by sorting a DocSet obtained via
 *       the filter cache, or through {@code getDocListAndSetNC} /
 *       {@code getDocListNC}.</li>
 *   <li>Cuts the requested window out of the produced superset and stores the
 *       superset in the query result cache when a cache key was kept.</li>
 * </ol>
 *
 * @param qr  result holder; receives the DocListAndSet built here
 * @param cmd the query command (query, filters, sort, offset, length, flags)
 * @throws IOException declared for the underlying search calls
 */
private void getDocListC(QueryResult qr, QueryCommand cmd) throws IOException {
  DocListAndSet out = new DocListAndSet();
  qr.setDocListAndSet(out);
  QueryResultKey key=null;
  int maxDocRequested = cmd.getOffset() + cmd.getLen();
  // check for overflow, and check for # docs in index
  if (maxDocRequested < 0 || maxDocRequested > maxDoc()) maxDocRequested = maxDoc();
  int supersetMaxDoc= maxDocRequested;
  DocList superset = null;

  int flags = cmd.getFlags();
  Query q = cmd.getQuery();
  if (q instanceof ExtendedQuery) {
    ExtendedQuery eq = (ExtendedQuery)q;
    // getCache() == false: neither read nor write the query result cache,
    // and do not consult the filter cache either.
    if (!eq.getCache()) {
      flags |= (NO_CHECK_QCACHE | NO_SET_QCACHE | NO_CHECK_FILTERCACHE);
    }
  }


  // we can try and look up the complete query in the cache.
  // we can't do that if filter!=null though (we don't want to
  // do hashCode() and equals() for a big DocSet).
  if (queryResultCache != null && cmd.getFilter()==null
      && (flags & (NO_CHECK_QCACHE|NO_SET_QCACHE)) != ((NO_CHECK_QCACHE|NO_SET_QCACHE)))
  {// the query result cache may be used (for reading and/or writing)
      // all of the current flags can be reused during warming,
      // so set all of them on the cache key.
      key = new QueryResultKey(q, cmd.getFilterList(), cmd.getSort(), flags);// build the cache key
      if ((flags & NO_CHECK_QCACHE)==0) {
        superset = queryResultCache.get(key);// try the cache first

        if (superset != null) {
          // check that the cache entry has scores recorded if we need them
          if ((flags & GET_SCORES)==0 || superset.hasScores()) {
            // NOTE: subset() returns null if the DocList has fewer docs than
            // requested
            out.docList = superset.subset(cmd.getOffset(),cmd.getLen());
          }
        }
        if (out.docList != null) {
          // found the docList in the cache... now check if we need the docset too.
          // OPT: possible future optimization - if the doclist contains all the matches,
          // use it to make the docset instead of rerunning the query.
          if (out.docSet==null && ((flags & GET_DOCSET)!=0) ) {
            if (cmd.getFilterList()==null) {
              out.docSet = getDocSet(cmd.getQuery());
            } else {
              // main query plus every filter query, resolved as one list
              List<Query> newList = new ArrayList<>(cmd.getFilterList().size()+1);
              newList.add(cmd.getQuery());
              newList.addAll(cmd.getFilterList());
              out.docSet = getDocSet(newList);
            }
          }
          // cache hit satisfied the request: nothing left to compute
          return;
        }
      }

    // If we are going to generate the result, bump up to the
    // next resultWindowSize for better caching.

    if ((flags & NO_SET_QCACHE) == 0) {
      // handle 0 special case as well as avoid idiv in the common case.
      if (maxDocRequested < queryResultWindowSize) {
        supersetMaxDoc=queryResultWindowSize;
      } else {
        // round up to the next multiple of queryResultWindowSize
        supersetMaxDoc = ((maxDocRequested -1)/queryResultWindowSize + 1)*queryResultWindowSize;
        // the rounding overflowed: fall back to the exact requested size
        if (supersetMaxDoc < 0) supersetMaxDoc=maxDocRequested;
      }
    } else {
      key = null;  // we won't be caching the result
    }
  }
  // read back by getDocListNC via cmd.getSupersetMaxDoc()
  cmd.setSupersetMaxDoc(supersetMaxDoc);


  // OK, so now we need to generate an answer.
  // One way to do that would be to check if we have an unordered list
  // of results for the base query.  If so, we can apply the filters and then
  // sort by the resulting set.  This can only be used if:
  // - the sort doesn't contain score
  // - we don't want score returned.

  // check if we should try and use the filter cache
  boolean useFilterCache=false;
  if ((flags & (GET_SCORES|NO_CHECK_FILTERCACHE))==0 && useFilterForSortedQuery && cmd.getSort() != null && filterCache != null) {
    useFilterCache=true;
    SortField[] sfields = cmd.getSort().getSort();
    for (SortField sf : sfields) {
      // any sort on score rules out the filter-cache path
      if (sf.getType() == SortField.Type.SCORE) {
        useFilterCache=false;
        break;
      }
    }
  }

  if (useFilterCache) {
    // now actually use the filter cache.
    // for large filters that match few documents, this may be
    // slower than simply re-executing the query.
    if (out.docSet == null) {
      out.docSet = getDocSet(cmd.getQuery(),cmd.getFilter());
      DocSet bigFilt = getDocSet(cmd.getFilterList());
      if (bigFilt != null) out.docSet = out.docSet.intersection(bigFilt);
    }
    // todo: there could be a sortDocSet that could take a list of
    // the filters instead of anding them first...
    // perhaps there should be a multi-docset-iterator
    sortDocSet(qr, cmd);
  } else {
    // do it the normal way...
    if ((flags & GET_DOCSET)!=0) {
      // this currently conflates returning the docset for the base query vs
      // the base query and all filters.
      DocSet qDocSet = getDocListAndSetNC(qr,cmd);
      // cache the docSet matching the query w/o filtering
      if (qDocSet!=null && filterCache!=null && !qr.isPartialResults()) filterCache.put(cmd.getQuery(),qDocSet);
    } else {
      getDocListNC(qr,cmd);
    }
    assert null != out.docList : "docList is null";
  }

  if (null == cmd.getCursorMark()) {
    // Kludge...
    // we can't use DocSlice.subset, even though it should be an identity op
    // because it gets confused by situations where there are lots of matches, but
    // less docs in the slice then were requested, (due to the cursor)
    // so we have to short circuit the call.
    // None of which is really a problem since we can't use caching with
    // cursors anyway, but it still looks weird to have to special case this
    // behavior based on this condition - hence the long explanation.
    superset = out.docList;
    out.docList = superset.subset(cmd.getOffset(),cmd.getLen());
  } else {
    // sanity check our cursor assumptions
    assert null == superset : "cursor: superset isn't null";
    assert 0 == cmd.getOffset() : "cursor: command offset mismatch";
    assert 0 == out.docList.offset() : "cursor: docList offset mismatch";
    assert cmd.getLen() >= supersetMaxDoc : "cursor: superset len mismatch: " +
      cmd.getLen() + " vs " + supersetMaxDoc;
  }

  // lastly, put the superset in the cache if the size is less than or equal
  // to queryResultMaxDocsCached
  // NOTE(review): superset is still null on the cursor branch above, so this
  // relies on key being null whenever a cursor mark is set - confirm against callers.
  if (key != null && superset.size() <= queryResultMaxDocsCached && !qr.isPartialResults()) {
    queryResultCache.put(key, superset);
  }
}
/**
 * Runs the query without consulting the query result cache and stores the
 * resulting {@code DocSlice} in {@code qr}.
 *
 * <p>When the (clamped) number of requested documents is zero or less, only
 * the hit count (and, if scores are requested, the maximum score) is
 * collected. Otherwise a top-docs collector gathers the first
 * {@code cmd.getSupersetMaxDoc()} documents.
 *
 * @param qr  result holder that receives the doc list and next cursor mark
 * @param cmd the query command describing query, filters, flags and window size
 * @throws IOException declared for the underlying search calls
 */
private void getDocListNC(QueryResult qr,QueryCommand cmd) throws IOException {
  final int len = cmd.getSupersetMaxDoc();
  // clamp the window to [.., maxDoc()]; a negative value is treated as "everything"
  final int lastDocRequested = (len < 0 || len > maxDoc()) ? maxDoc() : len;

  int nDocsReturned;
  int totalHits;
  float maxScore;
  int[] ids;
  float[] scores;

  final boolean needScores = (cmd.getFlags() & GET_SCORES) != 0;

  Query query = QueryUtils.makeQueryable(cmd.getQuery());

  final ProcessedFilter pf = getProcessedFilter(cmd.getFilter(), cmd.getFilterList());
  if (pf.filter != null) {
    query = new FilteredQuery(query, pf.filter);
  }

  if (lastDocRequested > 0) {
    // regular case: collect the top documents
    final TopDocsCollector topCollector = buildTopDocsCollector(len, cmd);
    Collector collector = topCollector;
    buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);

    totalHits = topCollector.getTotalHits();
    final TopDocs topDocs = topCollector.topDocs(0, len);
    populateNextCursorMarkFromTopDocs(qr, cmd, topDocs);

    maxScore = totalHits>0 ? topDocs.getMaxScore() : 0.0f;
    nDocsReturned = topDocs.scoreDocs.length;
    ids = new int[nDocsReturned];
    // scores are only materialised when the caller asked for them
    scores = (cmd.getFlags()&GET_SCORES)!=0 ? new float[nDocsReturned] : null;

    int pos = 0;
    for (ScoreDoc hit : topDocs.scoreDocs) {
      ids[pos] = hit.doc;
      if (scores != null) {
        scores[pos] = hit.score;
      }
      pos++;
    }
  } else {
    // zero case: no documents are returned, only count hits (and track the top score)
    final float[] bestScore = new float[] { Float.NEGATIVE_INFINITY };
    final int[] hitCount = new int[1];

    Collector collector;

    if (needScores) {
      // scores requested: count hits and remember the highest score seen
      collector = new SimpleCollector() {
        Scorer scorer;

        @Override
        public boolean needsScores() {
          return true;
        }

        @Override
        public void setScorer(Scorer scorer) {
          this.scorer = scorer;
        }

        @Override
        public void collect(int doc) throws IOException {
          numHitsIncrement();
          float current = scorer.score();
          if (current > bestScore[0]) {
            bestScore[0] = current;
          }
        }

        private void numHitsIncrement() {
          hitCount[0]++;
        }
      };
    } else {
      // scores not requested: a plain hit counter is enough
      collector = new SimpleCollector () {
        @Override
        public boolean needsScores() {
          return false;
        }

        @Override
        public void collect(int doc) {
          hitCount[0]++;
        }
      };
    }

    buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter);

    nDocsReturned = 0;
    ids = new int[nDocsReturned];
    scores = new float[nDocsReturned];
    totalHits = hitCount[0];
    maxScore = totalHits>0 ? bestScore[0] : 0.0f;
    // no docs on this page, so cursor doesn't change
    qr.setNextCursorMark(cmd.getCursorMark());
  }

  // never expose more docs than requested, and never a negative length
  final int sliceLen = Math.max(0, Math.min(lastDocRequested, nDocsReturned));
  qr.setDocList(new DocSlice(0,sliceLen,ids,scores,totalHits,maxScore));
}

getDocListNC 中的 buildAndRunCollectorChain(qr, query, collector, cmd, pf.postFilter) 最终会调用 IndexSearcher 的 search 方法

第九步:看IndexSearcher

/**
 * Lower-level search API: runs {@code weight} against each of the supplied
 * leaves, one after another on the calling thread.
 *
 * <p>
 * {@link LeafCollector#collect(int)} is called for every document. <br>
 *
 * <p>
 * NOTE: only the leaves passed in are searched. To cover every leaf of this
 * searcher, pass {@link #leafContexts}.
 *
 * @param leaves
 *          the searchers leaves to execute the searches on
 * @param weight
 *          to match documents
 * @param collector
 *          to receive hits
 * @throws BooleanQuery.TooManyClauses If a query would exceed
 *         {@link BooleanQuery#getMaxClauseCount()} clauses.
 */
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector)
    throws IOException {

  // TODO: should we make this
  // threaded...?  the Collector could be sync'd?
  // always use single thread:
  for (LeafReaderContext leaf : leaves) { // one pass per sub-reader
    final LeafCollector perLeafCollector;
    try {
      perLeafCollector = collector.getLeafCollector(leaf);
    } catch (CollectionTerminatedException e) {
      // the collector is not interested in any doc of this leaf; move on
      continue;
    }

    // scorer that iterates the documents of this leaf matching the weight
    final BulkScorer bulkScorer = weight.bulkScorer(leaf);
    if (bulkScorer == null) {
      continue;
    }

    try {
      // feed every matching live document to the leaf collector
      bulkScorer.score(perLeafCollector, leaf.reader().getLiveDocs());
    } catch (CollectionTerminatedException e) {
      // collection for this leaf ended early; carry on with the next leaf
    }
  }
}

第十步:计算分值的调用链比较复杂,中间跳转了很多层,没有细读,最后跳到了

CustomScoreQuery类的
/**
 * Computes the custom score of the current document: the value-source scorers
 * are first moved to the current doc if they are behind it, their scores are
 * gathered into {@code vScores}, and the provider combines them with the
 * sub-query score. The combined value is multiplied by {@code qWeight}.
 */
@Override
public float score() throws IOException {
  // lazily advance to current doc.
  final int currentDoc = docID();
  if (currentDoc > valSrcDocID) {
    for (Scorer lagging : valSrcScorers) {
      lagging.advance(currentDoc);
    }
    valSrcDocID = currentDoc;
  }

  // TODO: this thing technically takes any Query, so what about when subs don't match?
  int slot = 0;
  for (Scorer source : valSrcScorers) {
    vScores[slot] = source.score();
    slot++;
  }

  final int subDoc = subQueryScorer.docID();
  final float subScore = subQueryScorer.score();
  return qWeight * provider.customScore(subDoc, subScore, vScores);
}

接下来就是一层层往回返了,最后通过第四步的回写返回接口数据。

总结主要流程:
SolrDispatchFilter->HttpSolrCall->SolrCore->RequestHandlerBase->SearchHandler->QueryComponent->QParser->SolrIndexSearcher->IndexSearcher->Weight->BulkScorer->TopScoreDocCollector->CustomScoreQuery

其实做了两件事:1.获取匹配的doc  2.计算匹配文档的score

附栈的堆叠图

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部