首页 > 代码库 > solr源码解读(转)

solr源码解读(转)

solr源码解读(转)原文地址:http://blog.csdn.net/duck_genuine/article/details/6962624

配置

solr 对一个搜索请求的的流程

在solrconfig.xml会配置一个handler。配置了前置处理组件preParams,还有后置处理组件filterResult,当然还有默认的组件

 

[html] view plaincopy
 
  1. <requestHandler name="standard" class="solr.SearchHandler" default="true">  
  2.   
  3.      <arr name="first-components">  
  4.         <str>preParams</str>  
  5.      </arr>  
  6.         <lst name="defaults">  
  7.           <str name="echoParams">explicit</str>  
  8.           <int name="rows">10</int>  
  9.           <int name="start">0</int>  
  10.          <str name="q">*:*</str>  
  11.         </lst>      
  12.   
  13.      <arr name="last-components">  
  14.         <str>filterResult</str>  
  15.      </arr>       
  16.   
  17.    </requestHandler>  



 

http请求控制器

当一个查询请求过来的时候,先到类SolrDispatchFilter,由这个分发器寻找对应的handler来处理。
 

[java] view plaincopy
 
  1. String qt = solrReq.getParams().get( CommonParams.QT );  
  2. handler = core.getRequestHandler( qt );  



 

---------------------------------------------------------------------------------------------------

 

[java] view plaincopy
 
  1. this.execute( req, handler, solrReq, solrRsp );  
  2. HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);  



 

-----------------------------------------------------------------------------------------------

从上面的代码里看出是由solrCore留下的接口来处理请求。从代码框架上,从此刻开始进入solr的核心代码。

 

 

[java] view plaincopy
 
  1. protected void execute( HttpServletRequest req, SolrRequestHandler handler, SolrQueryRequest sreq, SolrQueryResponse rsp) {  
  2.   sreq.getContext().put( "webapp", req.getContextPath() );  
  3.   sreq.getCore().execute( handler, sreq, rsp );  
  4. }  



 

 

看一下solrCore代码execute的方法 的主要代码

 

[java] view plaincopy
 
  1. public void execute(SolrRequestHandler handler, SolrQueryRequest req, SolrQueryResponse rsp) {  
  2. 。。。。。  
  3.     handler.handleRequest(req,rsp);  
  4.     setResponseHeaderValues(handler,req,rsp);  
  5.  。。。。。。。  
  6.   }  



 

主要实现对请求的处理,并将请求结果的状态信息写到响应的头部

 

SolrRequestHandler 处理器

 

再看一下对请求的处理。。先看定义该请求处理器的接口,可以更好理解。只有两个方法,一个是初始化信息,主要是配置时的默认参数,另一个就是处理请求的接口。

 

[java] view plaincopy
 
  1. public interface SolrRequestHandler extends SolrInfoMBean {  
  2.   public void init(NamedList args);  
  3.   public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp);  
  4. }  



 

先看一下实现该接口的类RequestHandlerBase

 

[java] view plaincopy
 
  1. public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {  
  2.     numRequests++;  
  3.     try {  
  4.       SolrPluginUtils.setDefaults(req,defaults,appends,invariants);  
  5.       rsp.setHttpCaching(httpCaching);  
  6.       handleRequestBody( req, rsp );  
  7.       // count timeouts  
  8.       NamedList header = rsp.getResponseHeader();  
  9.       if(header != null) {  
  10.         Object partialResults = header.get("partialResults");  
  11.         boolean timedOut = partialResults == null ? false : (Boolean)partialResults;  
  12.         if( timedOut ) {  
  13.           numTimeouts++;  
  14.           rsp.setHttpCaching(false);  
  15.         }  
  16.       }  
  17.     } catch (Exception e) {  
  18.       SolrException.log(SolrCore.log,e);  
  19.       if (e instanceof ParseException) {  
  20.         e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);  
  21.       }  
  22.       rsp.setException(e);  
  23.       numErrors++;  
  24.     }  
  25.     totalTime += rsp.getEndTime() - req.getStartTime();  
  26.   }  



 

主要记录该请求处理的状态与处理时间记录。真正的实现方法交由各个子类      handleRequestBody( req, rsp );

现在看一下SearchHandler对于搜索处理的实现方法

 

首先是将solrconfig.xml上配置的各个处理组件按一定顺序组装起来,先是first-Component,默认的component,last-component.这些处理组件会按照它们的顺序来执行,以下是searchHandler的实现主体。方法handleRequestBody

 

 

[java] view plaincopy
 
  1. @Override  
  2. public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception, ParseException, InstantiationException, IllegalAccessException  
  3. {  
  4.   // int sleep = req.getParams().getInt("sleep",0);  
  5.   // if (sleep > 0) {log.error("SLEEPING for " + sleep);  Thread.sleep(sleep);}  
  6.   ResponseBuilder rb = new ResponseBuilder();  
  7.   rb.req = req;  
  8.   rb.rsp = rsp;  
  9.   rb.components = components;  
  10.   rb.setDebug(req.getParams().getBool(CommonParams.DEBUG_QUERY, false));  
  11.   
  12.   final RTimer timer = rb.isDebug() ? new RTimer() : null;  
  13.   
  14.   if (timer == null) {  
  15.     // non-debugging prepare phase  
  16.     for( SearchComponent c : components ) {  
  17.       c.prepare(rb);  
  18.     }  
  19.   } else {  
  20.     // debugging prepare phase  
  21.     RTimer subt = timer.sub( "prepare" );  
  22.     for( SearchComponent c : components ) {  
  23.       rb.setTimer( subt.sub( c.getName() ) );  
  24.       c.prepare(rb);  
  25.       rb.getTimer().stop();  
  26.     }  
  27.     subt.stop()<span style="color:#FF0000;">;</span>  
  28.   }  
  29.    //单机版  
  30.   if (rb.shards == null) {  
  31.     // a normal non-distributed request  
  32.   
  33.     // The semantics of debugging vs not debugging are different enough that  
  34.     // it makes sense to have two control loops  
  35.     if(!rb.isDebug()) {  
  36.       // Process  
  37.       for( SearchComponent c : components ) {  
  38.         c.process(rb);  
  39.       }  
  40.     }  
  41.     else {  
  42.       // Process  
  43.       RTimer subt = timer.sub( "process" );  
  44.       for( SearchComponent c : components ) {  
  45.         rb.setTimer( subt.sub( c.getName() ) );  
  46.         c.process(rb);  
  47.         rb.getTimer().stop();  
  48.       }  
  49.       subt.stop();  
  50.       timer.stop();  
  51.   
  52.       // add the timing info  
  53.       if( rb.getDebugInfo() == null ) {  
  54.         rb.setDebugInfo( new SimpleOrderedMap<Object>() );  
  55.       }  
  56.       rb.getDebugInfo().add( "timing", timer.asNamedList() );  
  57.     }  
  58.   
  59.   } else {//分布式请求  
  60.     // a distributed request  
  61.   
  62.     HttpCommComponent comm = new HttpCommComponent();  
  63.   
  64.     if (rb.outgoing == null) {  
  65.       rb.outgoing = new LinkedList<ShardRequest>();  
  66.     }  
  67.     rb.finished = new ArrayList<ShardRequest>();  
  68.   
  69.     //起始状态为0,结束状态为整数的最大值  
  70.     int nextStage = 0;  
  71.     do {  
  72.       rb.stage = nextStage;  
  73.       nextStage = ResponseBuilder.STAGE_DONE;  
  74.   
  75.       // call all components  
  76.       for( SearchComponent c : components ) {  
  77.         //得到所有组件运行后返回的下一个状态,并取最小值  
  78.         nextStage = Math.min(nextStage, c.distributedProcess(rb));  
  79.       }  
  80.   
  81.   
  82.       // 如果有需要向子机发送请求  
  83.       while (rb.outgoing.size() > 0) {  
  84.   
  85.         // submit all current request tasks at once  
  86.         while (rb.outgoing.size() > 0) {  
  87.           ShardRequest sreq = rb.outgoing.remove(0);  
  88.           sreq.actualShards = sreq.shards;  
  89.           if (sreq.actualShards==ShardRequest.ALL_SHARDS) {  
  90.             sreq.actualShards = rb.shards;  
  91.           }  
  92.           sreq.responses = new ArrayList<ShardResponse>();  
  93.   
  94.           // 向各个子机发送请求  
  95.           for (String shard : sreq.actualShards) {  
  96.             ModifiableSolrParams params = new ModifiableSolrParams(sreq.params);  
  97.             params.remove(ShardParams.SHARDS);      // not a top-level request  
  98.             params.remove("indent");  
  99.             params.remove(CommonParams.HEADER_ECHO_PARAMS);  
  100.             params.set(ShardParams.IS_SHARD, true);  // a sub (shard) request  
  101.             String shardHandler = req.getParams().get(ShardParams.SHARDS_QT);  
  102.             if (shardHandler == null) {  
  103.               params.remove(CommonParams.QT);  
  104.             } else {  
  105.               params.set(CommonParams.QT, shardHandler);  
  106.             }  
  107.           //提交子请求  
  108.            comm.submit(sreq, shard, params);  
  109.           }  
  110.         }  
  111.   
  112.   
  113.         // now wait for replies, but if anyone puts more requests on  
  114.         // the outgoing queue, send them out immediately (by exiting  
  115.         // this loop)  
  116.         while (rb.outgoing.size() == 0) {  
  117.           ShardResponse srsp = comm.takeCompletedOrError();  
  118.           if (srsp == null) break;  // no more requests to wait for  
  119.   
  120.           // Was there an exception?  If so, abort everything and  
  121.           // rethrow  
  122.           if (srsp.getException() != null) {  
  123.             comm.cancelAll();  
  124.             if (srsp.getException() instanceof SolrException) {  
  125.               throw (SolrException)srsp.getException();  
  126.             } else {  
  127.               throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, srsp.getException());  
  128.             }  
  129.           }  
  130.   
  131.           rb.finished.add(srsp.getShardRequest());  
  132.   
  133.           //每个组件都对于返回的数据处理  
  134.           for(SearchComponent c : components) {  
  135.             c.handleResponses(rb, srsp.getShardRequest());  
  136.           }  
  137.         }  
  138.       }//请求队列结束  
  139.   
  140.       //再对该轮请求进行收尾工作  
  141.       for(SearchComponent c : components) {  
  142.           c.finishStage(rb);  
  143.        }  
  144.   
  145.       //如果状态未到结束,则继续循环  
  146.     } while (nextStage != Integer.MAX_VALUE);  
  147.   }  
  148. }  

 

 

首先运行的是各个组件的方法prepare

 

[java] view plaincopy
 
  1. for( SearchComponent c : components ) {  
  2.   c.prepare(rb);  
  3. }  



 

再则如果不是分布式搜索,则比较简单的运行

 

[java] view plaincopy
 
  1. for( SearchComponent c : components ) {  
  2.         c.process(rb);  
  3.       }  



 

就结束!

如果是分布式搜索,过程会比较复杂些,对于每个组件处理都会返回一个状态,对于以下几个方法循环执行,直到状态结束 。  

在类ResponseBuilder定义了几个状态。

  

[java] view plaincopy
 
  1. public static int STAGE_START           = 0;  
  2. public static int STAGE_PARSE_QUERY     = 1000;  
  3. public static int STAGE_EXECUTE_QUERY   = 2000;  
  4. public static int STAGE_GET_FIELDS      = 3000;  
  5. public static int STAGE_DONE            = Integer.MAX_VALUE;  

 

 

 

从STAGE_START---->STAGE_PARSE_QUERY------>STAGE_EXECUTE_QUERY--------------->STAGE_GET_FIELDS------------>STAGE_DONE

从这些状态名称可以猜得出整个对应的过程。

每个组件先调用方法distributeProcess,并返回下一个状态

 

[java] view plaincopy
 
  1. for( SearchComponent c : components ) {  
  2.      // the next stage is the minimum of what all components report  
  3.      nextStage = Math.min(nextStage, c.distributedProcess(rb));  
  4.    }  

 

而方法handleResponse主要处理返回来的数据

 

     

[java] view plaincopy
 
  1. for(SearchComponent c : components) {  
  2.         c.handleResponses(rb, srsp.getShardRequest());  
  3.       }  


然后交由finishStage方法来对每一个状态的过程作结束动作。

 

------------------------------

 

[java] view plaincopy
 
  1. for(SearchComponent c : components) {  
  2.           c.finishStage(rb);  
  3.        }  



 

-----------------------------

了解这个流程有助于扩展solr。比如有个业务是要我对搜索的自然结果排序进行干预,而这个干预只针对前几页结果,所以我不得不做个组件来对其中结果进行处理。

所以我想可以添加一个组件放在最后-------------》

1)如果是分布式搜索:

       这个组件可以在重写finsihStage做处理。算是对最终结果的排序处理即可。

2)如果只是单机:

      这个组件可以在重写process做处理

 

 

组件

现在看一下其中一个主要的组件QueryComponent

prepare

对于QueryComponent主要解析用户传送的语法解析参数defType,以及过滤查询fq,返回字段集fl.排序字段Sort

 

单机处理

process

 

   分布式搜索过程中的某一步,这里应该是主机要合并文档,取出对应的文档的过程,

主机发出指定的solr主键ids来取文档集,首先取出对应的lucene的内部id集。如果某些文档已不在则弃掉。

 

 

[java] view plaincopy
 
  1. String ids = params.get(ShardParams.IDS);  
  2.     if (ids != null) {//将传过来的ids,放进结果集中,并在后面取出对应的结果文档  
  3.      SchemaField idField = req.getSchema().getUniqueKeyField();  
  4.       List<String> idArr = StrUtils.splitSmart(ids, ",", true);  
  5.       int[] luceneIds = new int[idArr.size()];  
  6.       int docs = 0;  
  7.       for (int i=0; i<idArr.size(); i++) {  
  8.       //solr主键id对应的文档lucene内部的id  
  9.        int id = req.getSearcher().getFirstMatch(  
  10.                 new Term(idField.getName(), idField.getType().toInternal(idArr.get(i))));  
  11.         if (id >= 0)  
  12.           luceneIds[docs++] = id;  
  13.       }  
  14.        
  15.       DocListAndSet res = new DocListAndSet();  
  16.   
  17.       //这里并没有传入scores[]  
  18.   
  19.   res.docList = new DocSlice(0, docs, luceneIds, null, docs, 0);  
  20. //需要另一种doc集合处理。  
  21.  if (rb.isNeedDocSet()) {  
  22.  List<Query> queries = new ArrayList<Query>();  
  23.   queries.add(rb.getQuery());  
  24. List<Query> filters = rb.getFilters();   
  25. if (filters != null)  
  26.  queries.addAll(filters);  
  27.   res.docSet = searcher.getDocSet(queries);  
  28.  }   
  29. rb.setResults(res);  
  30.  rsp.add("response",rb.getResults().docList);  
  31.  return;   
  32. }  

 

[java] view plaincopy
 
    1. <pre name="code" class="java">  //封装搜索值对象与封装结果值对象   
    2.    SolrIndexSearcher.QueryCommand cmd = rb.getQueryCommand();  
    3.    //设置超时最大值  
    4.     cmd.setTimeAllowed(timeAllowed);  
    5.     SolrIndexSearcher.QueryResult result = new SolrIndexSearcher.QueryResult();  
    6.     //搜索  
    7.     searcher.search(result,cmd);  
    8.     //设置搜索结果  
    9.     rb.setResult( result );  
    10.     rsp.add("response",rb.getResults().docList);  
    11.     rsp.getToLog().add("hits", rb.getResults().docList.matches());  
    12.     //对含有字段排序处理  
    13.     doFieldSortValues(rb, searcher);  
    14.    //非分布查询过程,且搜索结果数小于50,进行缓存  
    15.     doPrefetch(rb);  
    16.   
    17.   
    18. <pre name="code" class="java"><p>目前看到真实获取文档内容的是在</p><p>QueryResponseWriter</p><p>例如xml的输出格式类XMLWriter</p></pre><p></p>  
    19. <pre></pre>  
    20. <pre></pre>  
    21. <br>  
    22. <p></p>  
    23. <h2><a name="t10"></a>分布式处理<br>  
    24. </h2>  
    25. <h3><a name="t11"></a>1)distributedProcess</h3>  
    26. <p></p><pre name="code" class="java">  @Override    
    27.   public int distributedProcess(ResponseBuilder rb) throws IOException {  
    28.     if (rb.stage < ResponseBuilder.STAGE_PARSE_QUERY)  
    29.       return ResponseBuilder.STAGE_PARSE_QUERY;  
    30.     if (rb.stage == ResponseBuilder.STAGE_PARSE_QUERY) {  
    31.       createDistributedIdf(rb);  
    32.       return ResponseBuilder.STAGE_EXECUTE_QUERY;  
    33.     }  
    34.     if (rb.stage < ResponseBuilder.STAGE_EXECUTE_QUERY) return ResponseBuilder.STAGE_EXECUTE_QUERY;  
    35.     if (rb.stage == ResponseBuilder.STAGE_EXECUTE_QUERY) {  
    36. //分布式查询  
    37.      createMainQuery(rb);  
    38.       return ResponseBuilder.STAGE_GET_FIELDS;  
    39.     }  
    40.     if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS) return ResponseBuilder.STAGE_GET_FIELDS;  
    41.     if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {  
    42.    
    43.     //这里就会去对应的主机拿取需要的字段,封装请求字段的参数,放进请求队列里,可以由外部的searchHandler提交该请求,最后结果放在ShardResponse类里。  
    44.      createRetrieveDocs(rb);  
    45.       return ResponseBuilder.STAGE_DONE;  
    46.     }  
    47.     return ResponseBuilder.STAGE_DONE;  
    48.   }</pre><br>  
    49. <br>  
    50. <p></p>  
    51. <p>   <br>  
    52. </p>  
    53. <p><br>  
    54. </p>  
    55. <h3><a name="t12"></a> 2) handleResponses<br>  
    56. </h3>  
    57. <pre name="code" class="java"> public void handleResponses(ResponseBuilder rb, ShardRequest sreq) {    
    58.   
    59.          if ((sreq.purpose & ShardRequest.PURPOSE_GET_TOP_IDS) != 0) {  
    60.   
    61.                       //合并ids   
    62.   
    63.                mergeIds(rb, sreq);  
    64.   
    65.               //合并groupCount     
    66.   
    67.             mergeGroupCounts(rb, sreq);   
    68.   
    69.            }      
    70.   
    71.        if ((sreq.purpose & ShardRequest.PURPOSE_GET_FIELDS) != 0) {  
    72.   
    73.                //获取文档的字段,并将结题组装起来放到最终结果列表对应的位置里      
    74.   
    75.              returnFields(rb, sreq);      
    76.   
    77.             return;    
    78.   
    79.        }  
    80.   
    81.   }</pre><br>  
    82. <br>  
    83. <h3><a name="t13"></a>   3)  finishStage</h3>  
    84. <p><br>  
    85. </p>  
    86. <p> </p><pre name="code" class="java"> @Override  
    87.   public void finishStage(ResponseBuilder rb) {  
    88.    //这里说是==获取文档内容的值,在  
    89.    if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {  
    90.        //有些文档可能已不存在了,则忽略掉  
    91.       for (Iterator<SolrDocument> iter = rb._responseDocs.iterator(); iter.hasNext();) {  
    92.         if (iter.next() == null) {  
    93.           iter.remove();  
    94.           rb._responseDocs.setNumFound(rb._responseDocs.getNumFound()-1);  
    95.         }          
    96.       }  
    97.   
    98.       rb.rsp.add("response", rb._responseDocs);  
    99.     }  
    100.   }  
    101. </pre><br>  
    102. <p></p>  
    103. <p><span style="color:#FF0000"><br>  
    104. </span></p>  
    105. <p><span style="color:#FF0000">同样最后的结果是保存在<br>  
    106. <br>  
    107. ResponseBuilder <br>  
    108. <br>  
    109.      ResponseBuilder <br>  
    110.          NamedList values = new SimpleOrderedMap();<br>  
    111. <br>  
    112. 这个字段里,以键为"response",单机存储的是lucene 的内部id列表<br>  
    113. 如果是分布式,则存储的是SolrDocumentList,不用再去索引拿出对应的存储字段,<br>  
    114. 这个在QueryResponseWriter里有对应的处理</span><br>  
    115. </p>  
    116. <p></p>  
    117. <p><br>  
    118. </p>  
    119. <p><br>  
    120. </p>  
    121. <p><br>  
    122. </p>  
    123. <p><br>  
    124. </p>  
    125. <p><br>  
    126. </p>  
    127. <p><br>  
    128. </p>  
    129. <p><br>  
    130. </p>  
    131. <p><br>  
    132. </p>  
    133. <p></p>  
    134.   
    135. </pre>  

solr源码解读(转)