首页 > 代码库 > 企业搜索引擎开发之连接器connector(二十九)

企业搜索引擎开发之连接器connector(二十九)

在哪里调用监控器管理对象snapshotRepositoryMonitorManager的start方法及stop方法,然后又在哪里调用CheckpointAndChangeQueue对象的resume方法获取List<CheckpointAndChange> guaranteedChanges集合

下面跟踪到DiffingConnectorTraversalManager类的相关方法,在该类实现的方法中,调用了监控器管理对象snapshotRepositoryMonitorManager的相关方法实现对其操作

private final DocumentSnapshotRepositoryMonitorManager      snapshotRepositoryMonitorManager;  private final TraversalContextManager traversalContextManager;  /**   * Boolean to mark TraversalManager as invalid.   * It‘s possible for Connector Manager to keep a reference to   * an outdated TraversalManager (after a new one has been given   * previous TraversalManagers are invalid to use).   */  private boolean isActive = true;  /**   * Creates a {@link DiffingConnectorTraversalManager}.   *   * @param snapshotRepositoryMonitorManager the   *        {@link DocumentSnapshotRepositoryMonitorManager}   *        for use accessing a {@link ChangeSource}   * @param traversalContextManager {@link TraversalContextManager}   *        that holds the current {@link TraversalContext}   */  public DiffingConnectorTraversalManager(      DocumentSnapshotRepositoryMonitorManager snapshotRepositoryMonitorManager,      TraversalContextManager traversalContextManager) {    this.snapshotRepositoryMonitorManager = snapshotRepositoryMonitorManager;    this.traversalContextManager = traversalContextManager;  }

resumeTraversal方法启动监视器管理对象snapshotRepositoryMonitorManager,并返回DocumentList集合

/* @Override */  public synchronized DocumentList resumeTraversal(String checkpoint)      throws RepositoryException {    /* Exhaustive list of method‘s use:     resumeTraversal(null) from startTraversal:       monitors get started from null     resumeTraversal(null) from Connector Manager sometime after startTraversal:       monitors already started from previous resumeTraversal call     resumeTraversal(cp) from Connector Manager without a startTraversal:       means there was a shutdown or turn off       monitors get started from cp; should use state     resumeTraversal(cp) from Connector Manager sometime after some uses:       is most common case; roll    */    if (isActive()) {        //启动snapshotRepositoryMonitorManager      if (!snapshotRepositoryMonitorManager.isRunning()) {        snapshotRepositoryMonitorManager.start(checkpoint);      }      return newDocumentList(checkpoint);    } else {      throw new RepositoryException(          "Inactive FileTraversalManager referanced.");    }  }

进一步调用newDocumentList方法返回DocumentList集合

private DocumentList newDocumentList(String checkpoint)      throws RepositoryException {    //获取队列 CheckpointAndChangeQueue(队列 CheckpointAndChangeQueue只由snapshotRepositoryMonitorManager引用)    CheckpointAndChangeQueue checkpointAndChangeQueue =        snapshotRepositoryMonitorManager.getCheckpointAndChangeQueue();    try {      DiffingConnectorDocumentList documentList = new DiffingConnectorDocumentList(          checkpointAndChangeQueue,          CheckpointAndChangeQueue.initializeCheckpointStringIfNull(              checkpoint));      //Map<String, MonitorCheckpoint>      Map<String, MonitorCheckpoint> guaranteesMade =          checkpointAndChangeQueue.getMonitorRestartPoints();            snapshotRepositoryMonitorManager.acceptGuarantees(guaranteesMade);      return new ConfirmActiveDocumentList(documentList);    } catch (IOException e) {      throw new RepositoryException("Failure when making DocumentList.", e);    }  }

DiffingConnectorDocumentList documentList对象的构造函数里面封装了CheckpointAndChangeQueue checkpointAndChangeQueue队列集合

DiffingConnectorDocumentList 类完整实现如下:

/** * An implementation of {@link DocumentList} for the {@link DiffingConnector}. * * @since 2.8 */public class DiffingConnectorDocumentList implements DocumentList {  private final Iterator<CheckpointAndChange> checkpointAndChangeIterator;  private String checkpoint;  /**   * Creates a document list that returns a batch of documents from the provided   * {@link CheckpointAndChangeQueue}.   *   * @param queue a CheckpointAndChangeQueue containing document changes   * @param checkpoint point into the change queue after which to start   *        returning documents   * @throws IOException if persisting fails   */  public DiffingConnectorDocumentList(CheckpointAndChangeQueue queue,      String checkpoint) throws IOException {      //CheckpointAndChangeQueue queued的resume方法获取List<CheckpointAndChange>      //本DocumentList批次数据已经加载于内存    List<CheckpointAndChange> guaranteedChanges = queue.resume(checkpoint);    checkpointAndChangeIterator = guaranteedChanges.iterator();    this.checkpoint = checkpoint;  }    /**   * 调用方获取该状态并持久化,迭代完毕即为最后的checkpoint   */  /* @Override */  public String checkpoint() {    return checkpoint;  }  /* @Override */  public Document nextDocument() throws RepositoryException {    if (checkpointAndChangeIterator.hasNext()) {      CheckpointAndChange checkpointAndChange =        checkpointAndChangeIterator.next();      //更新checkpoint      checkpoint = checkpointAndChange.getCheckpoint().toString();      return checkpointAndChange.getChange().getDocumentHandle().getDocument();    } else {      return null;    }  }}

在其构造方法中调用参数CheckpointAndChangeQueue queue的resume方法获取List<CheckpointAndChange> guaranteedChanges,在其nextDocument()方法中通过迭代获取CheckpointAndChange checkpointAndChange对象,同时更新checkpoint状态标识

最后获取与监视器关联的MonitorCheckpoint对象映射

//Map<String, MonitorCheckpoint>      Map<String, MonitorCheckpoint> guaranteesMade =          checkpointAndChangeQueue.getMonitorRestartPoints();

然后调用监控器管理对象snapshotRepositoryMonitorManager的acceptGuarantees方法,相应的监视器对象接收并确认MonitorCheckpoint对象

 /**   * 监视器管理对象收到CheckpointAndChangeQueue对象反馈,分发给对应的监视器处理MonitorCheckpoint   */  /* @Override */  public void acceptGuarantees(Map<String, MonitorCheckpoint> guarantees) {    for (Map.Entry<String, MonitorCheckpoint> entry : guarantees.entrySet()) {      String monitorName = entry.getKey();      MonitorCheckpoint checkpoint = entry.getValue();      DocumentSnapshotRepositoryMonitor monitor = fileSystemMonitorsByName.get(monitorName);      if (monitor != null) {        // Signal is asynch.  Let monitor figure out how to use.          //回调        monitor.acceptGuarantee(checkpoint);      }    }  }

与仓库对象相对应的具体监视器接收确认

/**   * 监视器收到反馈 [MonitorCheckpoint接收确认]   * @param cp   */  // Public for DocumentSnapshotRepositoryMonitorTest  @VisibleForTesting  public void acceptGuarantee(MonitorCheckpoint cp) {    snapshotStore.acceptGuarantee(cp);    guaranteeCheckpoint = cp;  }

仓库对应的存储对象处于处理链的末端

/**   * 反馈MonitorCheckpoint处理   * @param cp   */  void acceptGuarantee(MonitorCheckpoint cp) {    long readSnapshotNumber = cp.getSnapshotNumber();    if (readSnapshotNumber < 0) {      throw new IllegalArgumentException("Received invalid snapshot in: " + cp);    }    if (oldestSnapshotToKeep > readSnapshotNumber) {      LOG.warning("Received an older snapshot than " + oldestSnapshotToKeep + ": " + cp);    } else {      oldestSnapshotToKeep = readSnapshotNumber;    }  }

---------------------------------------------------------------------------

本系列企业搜索引擎开发之连接器connector系本人原创

转载请注明出处 博客园 刺猬的温驯

本人邮箱: chenying998179@163#com (#改为.)

本文链接 http://www.cnblogs.com/chenying99/p/3789650.html