
Enterprise Search Engine Development: The Connector (Part 28)

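Normally each SnapshotRepository object corresponds to exactly one DocumentSnapshotRepositoryMonitor object and one snapshot store (SnapshotStore) object. These associations are set up and managed by the monitor manager class DocumentSnapshotRepositoryMonitorManagerImpl.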

To see which behaviors DocumentSnapshotRepositoryMonitorManagerImpl has to provide, start with the methods declared by the interface it implements, DocumentSnapshotRepositoryMonitorManager:

/**
 * Management interface to {@link DocumentSnapshotRepositoryMonitor} threads.
 *
 * @since 2.8
 */
public interface DocumentSnapshotRepositoryMonitorManager {
  /**
   * Ensures all monitor threads are running.
   *
   * @param checkpoint for the last completed document or null if none have
   *        been completed.
   * @throws RepositoryException
   */
  void start(String checkpoint) throws RepositoryException;
  /**
   * Stops all the configured {@link DocumentSnapshotRepositoryMonitor} threads.
   */
  void stop();
  /**
   * Removes persisted state for {@link DocumentSnapshotRepositoryMonitor}
   * threads. After calling this {@link DocumentSnapshotRepositoryMonitor}
   * threads will no longer be able to resume from where they left off last
   * time.
   */
  void clean();
  /**
   * Returns the number of {@link DocumentSnapshotRepositoryMonitor} threads
   * that are alive. This method is for testing purposes.
   */
  int getThreadCount();
  /**
   * Returns the {@link CheckpointAndChangeQueue} for this
   * {@link DocumentSnapshotRepositoryMonitorManager}
   */
  CheckpointAndChangeQueue getCheckpointAndChangeQueue();
  /** Returns whether we are after a start() call and before a stop(). */
  boolean isRunning();
  /**
   * Receives information specifying what is guaranteed to be delivered to GSA.
   * Every entry in passed in Map is a monitor name and MonitorCheckpoint.
   * The monitor of that name can expect that all documents before and including
   * document related with MonitorCheckpoint will be delivered to GSA.
   * This information is for the convenience and efficiency of the Monitor so
   * that it knows how many changes it has to resend.  It's valid for a monitor
   * to ignore these updates if it feels like it for some good reason.
   * FileConnectorSystemMonitor instances use this information to trim their
   * file system snapshots.
   */
  void acceptGuarantees(Map<String, MonitorCheckpoint> guarantees);
  /**
   * Receives {@link TraversalSchedule} from TraversalManager which is
   * {@link TraversalScheduleAware}.
   */
  void setTraversalSchedule(TraversalSchedule traversalSchedule);
}
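The acceptGuarantees contract is easiest to see with a toy model. The sketch below is hypothetical: SentChangeLog, its integer sequence numbers and its methods are assumptions made for illustration, not connector-manager API, and a real MonitorCheckpoint is not a plain long. It shows a monitor that remembers the changes it has emitted and, when told that everything up to and including a given position is guaranteed, forgets those entries so it only has to resend the rest.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical model of acceptGuarantees(): emitted changes carry increasing
 * sequence numbers (an assumption standing in for MonitorCheckpoint), and a
 * guarantee lets the monitor drop everything up to and including that position.
 */
class SentChangeLog {
  /** A change emitted by the monitor, tagged with its position. */
  static final class SentChange {
    final long sequence;
    final String documentId;

    SentChange(long sequence, String documentId) {
      this.sequence = sequence;
      this.documentId = documentId;
    }
  }

  private final Deque<SentChange> pending = new ArrayDeque<>();
  private long nextSequence = 0;

  /** Records an emitted change and returns the sequence number assigned to it. */
  long record(String documentId) {
    long seq = nextSequence++;
    pending.addLast(new SentChange(seq, documentId));
    return seq;
  }

  /**
   * Drops every change whose sequence is <= guaranteedSequence, because the
   * guarantee covers all documents before and including that position.
   * Ignoring the call entirely would also be allowed by the interface docs.
   */
  void acceptGuarantee(long guaranteedSequence) {
    while (!pending.isEmpty()
        && pending.peekFirst().sequence <= guaranteedSequence) {
      pending.removeFirst();
    }
  }

  /** Number of changes that might still have to be resent. */
  int pendingCount() {
    return pending.size();
  }
}
```

A deque fits because guarantees only ever advance, so trimming always happens at the head.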

Next, let's see how DocumentSnapshotRepositoryMonitorManagerImpl implements the behavior defined by this interface.

First, its fields and how they are initialized:

  private volatile TraversalSchedule traversalSchedule;
  // Monitor threads
  private final List<Thread> threads =
      Collections.synchronizedList(new ArrayList<Thread>());
  // Monitors keyed by monitor name
  private final Map<String, DocumentSnapshotRepositoryMonitor> fileSystemMonitorsByName =
      Collections.synchronizedMap(new HashMap<String, DocumentSnapshotRepositoryMonitor>());
  private boolean isRunning = false;  // Monitor threads start in off state.
  private final List<? extends SnapshotRepository<? extends DocumentSnapshot>>
      repositories;
  private final File snapshotDir;
  private final ChecksumGenerator checksumGenerator;
  // Container (a list) of CheckpointAndChange objects
  private final CheckpointAndChangeQueue checkpointAndChangeQueue;
  // Container (a blocking queue) of Change objects
  private final ChangeQueue changeQueue;
  private final DocumentSnapshotFactory documentSnapshotFactory;
  /**
   * Constructs {@link DocumentSnapshotRepositoryMonitorManagerImpl}
   * for the {@link DiffingConnector}.
   *
   * @param repositories a {@code List} of {@link SnapshotRepository
   *        SnapshotRepositorys}
   * @param documentSnapshotFactory a {@link DocumentSnapshotFactory}
   * @param snapshotDir directory to store {@link SnapshotRepository}
   * @param checksumGenerator a {@link ChecksumGenerator} used to
   *        detect changes in a document's content
   * @param changeQueue a {@link ChangeQueue}
   * @param checkpointAndChangeQueue a
   *        {@link CheckpointAndChangeQueue}
   */
  public DocumentSnapshotRepositoryMonitorManagerImpl(
      List<? extends SnapshotRepository<
          ? extends DocumentSnapshot>> repositories,
      DocumentSnapshotFactory documentSnapshotFactory,
      File snapshotDir, ChecksumGenerator checksumGenerator,
      ChangeQueue changeQueue,
      CheckpointAndChangeQueue checkpointAndChangeQueue) {
    this.repositories = repositories;
    this.documentSnapshotFactory = documentSnapshotFactory;
    this.snapshotDir = snapshotDir;
    this.checksumGenerator = checksumGenerator;
    this.changeQueue = changeQueue;
    this.checkpointAndChangeQueue = checkpointAndChangeQueue;
  }

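Now look at its start method. It does three main things, in order: it calls start on the checkpointAndChangeQueue object, it initializes the SnapshotStore associated with each repository, and finally it starts a monitor thread for each repository.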

  /**
   * Start method.
   * Go from "cold" to "warm" including CheckpointAndChangeQueue.
   */
  public void start(String connectorManagerCheckpoint)
      throws RepositoryException {
    try {
      // Start the change queue (main work: load monitorPoints and the
      // checkpointAndChangeList queue from the JSON-format queue files)
      checkpointAndChangeQueue.start(connectorManagerCheckpoint);
    } catch (IOException e) {
      throw new RepositoryException("Failed starting CheckpointAndChangeQueue.",
          e);
    }
    // Restart point (MonitorCheckpoint) for each monitor name
    Map<String, MonitorCheckpoint> monitorPoints
        = checkpointAndChangeQueue.getMonitorRestartPoints();
    Map<String, SnapshotStore> snapshotStores = null;
    // Build the monitorName -> SnapshotStore map
    try {
      snapshotStores =
          recoverSnapshotStores(connectorManagerCheckpoint, monitorPoints);
    } catch (SnapshotStoreException e) {
      throw new RepositoryException("Snapshot recovery failed.", e);
    } catch (IOException e) {
      throw new RepositoryException("Snapshot recovery failed.", e);
    } catch (InterruptedException e) {
      throw new RepositoryException("Snapshot recovery interrupted.", e);
    }
    // Start the monitor threads
    startMonitorThreads(snapshotStores, monitorPoints);
    isRunning = true;
  }
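The ordering in start() follows a common pattern: run the startup steps in sequence, translate each step's checked exception into one domain exception, and flip the running flag only after every step has succeeded. The sketch below is hypothetical; WarmStart, Step and StartupException are stand-ins for the real class, its three steps and RepositoryException. Unlike the original, it also restores the thread's interrupt status when a step is interrupted, which is a deliberate design choice rather than something the connector code does.

```java
/**
 * Hypothetical sketch of the "cold to warm" ordering used by start().
 * StartupException stands in for RepositoryException; Step stands in for
 * the queue start, snapshot recovery and thread start steps.
 */
class WarmStart {
  static class StartupException extends Exception {
    StartupException(String message, Throwable cause) {
      super(message, cause);
    }
  }

  /** One startup step that may throw any checked exception. */
  interface Step {
    void run() throws Exception;
  }

  private volatile boolean running = false;

  void start(Step startQueue, Step recoverStores, Step startThreads)
      throws StartupException {
    runStep("Failed starting change queue.", startQueue);
    runStep("Snapshot recovery failed.", recoverStores);
    runStep("Failed starting monitor threads.", startThreads);
    running = true; // reached only if no step threw
  }

  boolean isRunning() {
    return running;
  }

  private static void runStep(String message, Step step)
      throws StartupException {
    try {
      step.run();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt visible to callers
      throw new StartupException(message + " (interrupted)", e);
    } catch (Exception e) {
      throw new StartupException(message, e);
    }
  }
}
```

If the recovery step fails, start() throws and isRunning() stays false, matching the way the original method only sets isRunning at the very end.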

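When the SnapshotStore for each repository is initialized, the associated MonitorCheckpoint is taken into account. If there is no global checkpoint, or no MonitorCheckpoint was saved for that monitor, the monitor's snapshot directory is deleted and the monitor starts empty; otherwise SnapshotStore.stitch repairs the snapshot files from that checkpoint.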

  /* For each start path gets its monitor recovery files in state where monitor
   * can be started. */
  /**
   * Builds the monitorName -> SnapshotStore map.
   * @param connectorManagerCheckpoint
   * @param monitorPoints
   * @return
   * @throws IOException
   * @throws SnapshotStoreException
   * @throws InterruptedException
   */
  private Map<String, SnapshotStore> recoverSnapshotStores(
      String connectorManagerCheckpoint, Map<String,
      MonitorCheckpoint> monitorPoints)
      throws IOException, SnapshotStoreException, InterruptedException {
    Map<String, SnapshotStore> snapshotStores =
        new HashMap<String, SnapshotStore>();
    for (SnapshotRepository<? extends DocumentSnapshot> repository
        : repositories) {
      String monitorName = makeMonitorNameFromStartPath(repository.getName());
      File dir = new File(snapshotDir,  monitorName);
      boolean startEmpty = (connectorManagerCheckpoint == null)
          || (!monitorPoints.containsKey(monitorName));
      if (startEmpty) {
        LOG.info("Deleting " + repository.getName()
            + " global checkpoint=" + connectorManagerCheckpoint
            + " monitor checkpoint=" + monitorPoints.get(monitorName));
        // Delete this monitor's snapshot directory
        delete(dir);
      } else {
        // Repair (stitch) this monitor's snapshot directory
        SnapshotStore.stitch(dir, monitorPoints.get(monitorName),
            documentSnapshotFactory);
      }
      SnapshotStore snapshotStore = new SnapshotStore(dir,
          documentSnapshotFactory);
      snapshotStores.put(monitorName, snapshotStore);
    }
    return snapshotStores;
  }
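The only real decision in recoverSnapshotStores() is the startEmpty test. The hypothetical sketch below isolates that rule as a pure function so it can be checked without touching the file system: a monitor is reset when the global checkpoint is null or when no restart point exists under its name, and is stitched otherwise. SnapshotRecoveryPlan and its Action enum are illustrative names, and the file operations (delete, SnapshotStore.stitch) are deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the per-monitor decision in recoverSnapshotStores().
 * Only the decision is modelled; deleting or stitching directories is omitted.
 */
class SnapshotRecoveryPlan {
  enum Action { DELETE_AND_START_EMPTY, STITCH_FROM_CHECKPOINT }

  /** Same condition as startEmpty in the original method. */
  static boolean startEmpty(String globalCheckpoint,
      Map<String, ?> monitorPoints, String monitorName) {
    return globalCheckpoint == null || !monitorPoints.containsKey(monitorName);
  }

  /** Returns the action for each monitor, in the order the names were given. */
  static Map<String, Action> plan(String globalCheckpoint,
      Map<String, ?> monitorPoints, List<String> monitorNames) {
    Map<String, Action> actions = new LinkedHashMap<>();
    for (String name : monitorNames) {
      actions.put(name, startEmpty(globalCheckpoint, monitorPoints, name)
          ? Action.DELETE_AND_START_EMPTY
          : Action.STITCH_FROM_CHECKPOINT);
    }
    return actions;
  }
}
```

Note that a null global checkpoint resets every monitor, even one that still has a saved restart point.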

Next, follow the method that starts the monitor threads:

  /**
   * Starts the monitor threads (MonitorCheckpoint, SnapshotStore and monitor
   * appear to be linked to one another through the monitor name).
   * Creates a {@link DocumentSnapshotRepositoryMonitor} thread for each
   * startPath.
   *
   * @throws RepositoryDocumentException if any of the threads cannot be
   *         started.
   */
  private void startMonitorThreads(Map<String, SnapshotStore> snapshotStores,
      Map<String, MonitorCheckpoint> monitorPoints)
      throws RepositoryDocumentException {
    for (SnapshotRepository<? extends DocumentSnapshot> repository
            : repositories) {
      String monitorName = makeMonitorNameFromStartPath(repository.getName());
      // Look up this monitor's snapshot store (reader/writer) by monitorName
      SnapshotStore snapshotStore = snapshotStores.get(monitorName);
      // Create the monitor thread
      Thread monitorThread = newMonitorThread(repository, snapshotStore,
          monitorPoints.get(monitorName));
      threads.add(monitorThread);
      LOG.info("starting monitor for <" + repository.getName() + ">");
      monitorThread.setName(repository.getName());
      monitorThread.setDaemon(true);
      monitorThread.start();
    }
  }

The monitor objects themselves are created in the following method:

  /**
   * Creates a monitor thread.
   * Creates a {@link DocumentSnapshotRepositoryMonitor} thread for the provided
   * folder.
   *
   * @throws RepositoryDocumentException if {@code startPath} is not readable,
   *         or if there is any problem reading or writing snapshots.
   */
  private Thread newMonitorThread(
      SnapshotRepository<? extends DocumentSnapshot> repository,
      SnapshotStore snapshotStore, MonitorCheckpoint startCp)
      throws RepositoryDocumentException {
    // Note how monitorName is derived: it is the key used everywhere else
    String monitorName = makeMonitorNameFromStartPath(repository.getName());
    // Documents are processed inside the monitor thread
    DocumentSnapshotRepositoryMonitor monitor =
        new DocumentSnapshotRepositoryMonitor(monitorName, repository,
            snapshotStore, changeQueue.newCallback(), DOCUMENT_SINK, startCp,
            documentSnapshotFactory);
    monitor.setTraversalSchedule(traversalSchedule);
    LOG.fine("Adding a new monitor for " + monitorName + ": " + monitor);
    fileSystemMonitorsByName.put(monitorName, monitor);
    return new Thread(monitor);
  }
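Taken together, startMonitorThreads() and newMonitorThread() create one Runnable monitor per repository, register it by name so stop() can find it later, wrap it in a thread, mark the thread as a daemon and start it. The sketch below reproduces that pattern with a hypothetical SimpleMonitor in place of DocumentSnapshotRepositoryMonitor; for brevity it uses the same string as both the map key and the thread name, whereas the original keys the map by monitorName and names the thread after repository.getName().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of startMonitorThreads() + newMonitorThread().
 * SimpleMonitor is an assumption standing in for DocumentSnapshotRepositoryMonitor.
 */
class MonitorLauncher {
  static class SimpleMonitor implements Runnable {
    private volatile boolean running = true;

    @Override
    public void run() {
      while (running && !Thread.currentThread().isInterrupted()) {
        try {
          Thread.sleep(10); // placeholder for one polling pass
        } catch (InterruptedException e) {
          return; // treat interruption as a stop request
        }
      }
    }

    void shutdown() {
      running = false;
    }
  }

  final List<Thread> threads =
      Collections.synchronizedList(new ArrayList<Thread>());
  final Map<String, SimpleMonitor> monitorsByName =
      Collections.synchronizedMap(new HashMap<String, SimpleMonitor>());

  void startAll(List<String> monitorNames) {
    for (String name : monitorNames) {
      SimpleMonitor monitor = new SimpleMonitor();
      monitorsByName.put(name, monitor);    // lookup key for later shutdown
      Thread t = new Thread(monitor, name); // thread named after the monitor
      t.setDaemon(true);                    // must be set before start()
      threads.add(t);
      t.start();
    }
  }
}
```

Making the threads daemons means a monitor that never terminates cannot keep the JVM alive on its own; setDaemon has to be called before start(), as the original code also does.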

The stop method shuts the monitor threads down:

  /**
   * Flags every monitor to stop.
   */
  private void flagAllMonitorsToStop() {
    for (SnapshotRepository<? extends DocumentSnapshot> repository
        : repositories) {
      String monitorName = makeMonitorNameFromStartPath(repository.getName());
      DocumentSnapshotRepositoryMonitor
          monitor = fileSystemMonitorsByName.get(monitorName);
      if (null != monitor) {
        monitor.shutdown();
      }
      else {
        LOG.fine("Unable to stop non existent monitor thread for "
            + monitorName);
      }
    }
  }
  /**
   * Stops the monitor threads.
   */
  /* @Override */
  public synchronized void stop() {
    for (Thread thread : threads) {
      thread.interrupt();
    }
    for (Thread thread : threads) {
      try {
        thread.join(MAX_SHUTDOWN_MS);
        if (thread.isAlive()) {
          LOG.warning("failed to stop background thread: " + thread.getName());
        }
      } catch (InterruptedException e) {
        // Mark this thread as interrupted so it can be dealt with later.
        Thread.currentThread().interrupt();
      }
    }
    threads.clear();
    /* in case thread.interrupt doesn't stop monitors */
    flagAllMonitorsToStop();
    fileSystemMonitorsByName.clear();
    changeQueue.clear();
    this.isRunning = false;
  }
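The first half of stop() is a two-phase shutdown: interrupt every thread first, so they all begin winding down in parallel, then join each one with a timeout and report those still alive. The hypothetical sketch below isolates that pattern; GracefulStopper is an illustrative name and its MAX_SHUTDOWN_MS value is an assumption, not the connector's actual constant. Like the original, it restores the interrupt status if the stopping thread is itself interrupted while waiting.

```java
import java.util.List;

/**
 * Hypothetical sketch of the interrupt-then-bounded-join part of stop().
 * MAX_SHUTDOWN_MS is an illustrative value, not the connector's constant.
 */
class GracefulStopper {
  static final long MAX_SHUTDOWN_MS = 1000;

  /** Interrupts all threads, then waits for each; returns how many are still alive. */
  static int stopAll(List<Thread> threads) {
    // Phase 1: ask every thread to stop before waiting on any of them.
    for (Thread t : threads) {
      t.interrupt();
    }
    // Phase 2: wait a bounded time for each thread to finish.
    int stillAlive = 0;
    for (Thread t : threads) {
      try {
        t.join(MAX_SHUTDOWN_MS);
        if (t.isAlive()) {
          stillAlive++; // the original logs a warning at this point
        }
      } catch (InterruptedException e) {
        // Restore the caller's interrupt status, as the original stop() does.
        Thread.currentThread().interrupt();
      }
    }
    return stillAlive;
  }
}
```

Interrupting all threads before joining any of them keeps the worst-case wait close to one timeout instead of one timeout per thread run back to back.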

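flagAllMonitorsToStop() calls monitor.shutdown() on each monitor, which sets the following flag field on the monitor object; once it is false, the monitor exits on its own: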

  /* The monitor should exit voluntarily if set to false */
  private volatile boolean isRunning = true;
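The volatile keyword is what makes this flag work across threads: the write done in shutdown() by the stopping thread is guaranteed to become visible to the monitor thread the next time it reads the flag. The hypothetical FlaggedWorker below shows the same cooperative-shutdown idea; its loop body is a placeholder, not the monitor's real traversal logic. It is the backstop stop() relies on "in case thread.interrupt doesn't stop monitors", for example when code inside the loop swallows interrupts.

```java
/**
 * Hypothetical sketch of cooperative shutdown with a volatile flag, the idea
 * behind monitor.shutdown(). The loop body is only a placeholder.
 */
class FlaggedWorker implements Runnable {
  /* The worker should exit voluntarily once this is set to false. */
  private volatile boolean isRunning = true;
  private volatile int passes = 0;

  @Override
  public void run() {
    while (isRunning) { // re-read on every pass; volatile makes the write visible
      passes++;         // placeholder for one scan of the repository (single writer)
      Thread.yield();
    }
  }

  void shutdown() {
    isRunning = false;
  }

  int passes() {
    return passes;
  }
}
```

Without volatile, the JIT would be allowed to hoist the read out of the loop, and the worker might never observe the change.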

---------------------------------------------------------------------------

This series, "Enterprise Search Engine Development: The Connector", is my original work.

When reposting, please credit the source: cnblogs (博客园), 刺猬的温驯

Email: chenying998179@163#com (replace # with .)

Article link: http://www.cnblogs.com/chenying99/p/3789613.html