
Enterprise Search Engine Development: The Connector (Part 26)

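The connector uses a monitor object, DocumentSnapshotRepositoryMonitor, to iterate over the repository object introduced in the previous post, SnapshotRepository (for databases the implementation is DBSnapshotRepository), and fetch its data.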

The DocumentSnapshotRepositoryMonitor constructor initializes the member fields. All of them are objects involved in fetching and processing data:

  /** This connector instance's current traversal schedule. */
  private volatile TraversalSchedule traversalSchedule;

  /** Directory that contains snapshots. */
  private final SnapshotStore snapshotStore;

  /** The root of the repository to monitor */
  private final SnapshotRepository<? extends DocumentSnapshot> query;

  /** Reader for the current snapshot. */
  private SnapshotReader snapshotReader;

  /** Callback to invoke when a change is detected. */
  private final Callback callback;

  /** Current record from the snapshot. */
  private DocumentSnapshot current;

  /** The snapshot we are currently writing */
  private OrderedSnapshotWriter snapshotWriter;

  private final String name;

  private final DocumentSnapshotFactory documentSnapshotFactory;

  private final DocumentSink documentSink;

  /* Contains a checkpoint confirmation from CM. */
  private MonitorCheckpoint guaranteeCheckpoint;

  /* The monitor should exit voluntarily if set to false */
  private volatile boolean isRunning = true;

  /**
   * Creates a DocumentSnapshotRepositoryMonitor that monitors the
   * Repository rooted at {@code root}.
   *
   * @param name the name of this monitor (a hash of the start path)
   * @param query query for files
   * @param snapshotStore where snapshots are stored
   * @param callback client callback
   * @param documentSink destination for filtered out file info
   * @param initialCp checkpoint when system initiated, could be {@code null}
   * @param documentSnapshotFactory for un-serializing
   *        {@link DocumentSnapshot} objects.
   */
  public DocumentSnapshotRepositoryMonitor(String name,
      SnapshotRepository<? extends DocumentSnapshot> query,
      SnapshotStore snapshotStore, Callback callback,
      DocumentSink documentSink, MonitorCheckpoint initialCp,
      DocumentSnapshotFactory documentSnapshotFactory) {
    this.name = name;
    this.query = query;
    this.snapshotStore = snapshotStore;
    this.callback = callback;
    this.documentSnapshotFactory = documentSnapshotFactory;
    this.documentSink = documentSink;
    guaranteeCheckpoint = initialCp;
  }

The class also implements the Runnable interface; the data-processing logic lives in the overridden run() method:

  @Override
  public void run() {
    // Call NDC.push() via reflection, if possible.
    invoke(ndcPush, "Monitor " + name);
    try {
      while (true) {
        tryToRunForever();
        // TODO: Remove items from this monitor that are in queues.
        // Watch out for race conditions. The queues are potentially
        // giving docs to CM as bad things happen in monitor.
        // This TODO would be mitigated by a reconciliation with GSA.
        performExceptionRecovery();
      }
    } catch (InterruptedException ie) {
      LOG.info("Repository Monitor " + name + " received stop signal. " + this);
    } finally {
      // Call NDC.remove() via reflection, if possible.
      invoke(ndcRemove);
    }
  }
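The structure of run() is a common pattern for a long-lived worker thread: the inner loop returns only after a recoverable error, the outer loop performs recovery and starts again, and the thread stops only when an InterruptedException escapes. Below is a minimal sketch of that control flow, assuming a hypothetical MonitorLoop class and Inner interface that are not part of the connector code (logging and the NDC calls are left out):

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the control flow of the monitor's run() method. */
class MonitorLoop implements Runnable {
  /** One inner run: returns normally after a recoverable error, throws to stop. */
  interface Inner {
    void runUntilError() throws InterruptedException;
  }

  private final Inner inner;
  final AtomicInteger recoveries = new AtomicInteger();
  volatile boolean stoppedCleanly = false;

  MonitorLoop(Inner inner) {
    this.inner = inner;
  }

  @Override
  public void run() {
    try {
      while (true) {
        inner.runUntilError();        // plays the role of tryToRunForever()
        recoveries.incrementAndGet(); // plays the role of performExceptionRecovery()
      }
    } catch (InterruptedException ie) {
      stoppedCleanly = true;          // the original logs "received stop signal"
    }
  }
}
```

Interrupting the thread that runs such a loop makes a blocking call like Thread.sleep throw InterruptedException, which leaves the loop the same way a stop signal does in the monitor.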

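run() in turn calls tryToRunForever(), which runs a pass whenever the traversal schedule allows it and otherwise pauses for 15 minutes: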

  private void tryToRunForever() throws InterruptedException {
    try {
      while (true) {
        if (traversalSchedule == null || traversalSchedule.shouldRun()) {
          // Start traversal
          doOnePass();
        } else {
          LOG.finest("Currently out of traversal window. "
              + "Sleeping for 15 minutes.");
          // TODO(nashi): Calculate when it should wake up while
          // handling TraversalScheduleAware events properly.
          // Outside the traversal window: sleep.
          callback.passPausing(15*60*1000);
        }
      }
    } catch (SnapshotWriterException e) {
      String msg = "Failed to write to snapshot file: " + snapshotWriter.getPath();
      LOG.log(Level.SEVERE, msg, e);
    } catch (SnapshotReaderException e) {
      String msg = "Failed to read snapshot file: " + snapshotReader.getPath();
      LOG.log(Level.SEVERE, msg, e);
    } catch (SnapshotStoreException e) {
      String msg = "Problem with snapshot store.";
      LOG.log(Level.SEVERE, msg, e);
    } catch (SnapshotRepositoryRuntimeException e) {
      String msg = "Failed reading repository.";
      LOG.log(Level.SEVERE, msg, e);
    }
  }
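The schedule check can be isolated in a small sketch. Here a hypothetical ScheduledLoop uses a BooleanSupplier as a stand-in for traversalSchedule.shouldRun() (null means no schedule, so every iteration runs a pass), and it runs a fixed number of iterations so that it can be tested; the real method loops until an exception escapes:

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of the schedule check in tryToRunForever(). */
class ScheduledLoop {
  private final BooleanSupplier shouldRun; // stand-in for traversalSchedule; null = no schedule
  private final long pauseMillis;          // the original pauses 15 * 60 * 1000 ms
  int passes = 0;
  int pauses = 0;

  ScheduledLoop(BooleanSupplier shouldRun, long pauseMillis) {
    this.shouldRun = shouldRun;
    this.pauseMillis = pauseMillis;
  }

  /** Runs a fixed number of iterations instead of looping forever. */
  void iterate(int iterations) throws InterruptedException {
    for (int i = 0; i < iterations; i++) {
      if (shouldRun == null || shouldRun.getAsBoolean()) {
        passes++;                  // doOnePass() would run here
      } else {
        pauses++;
        Thread.sleep(pauseMillis); // callback.passPausing(...) in the original
      }
    }
  }
}
```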

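doOnePass() reads data from the SnapshotRepository, persists a snapshot of each document to a new snapshot file, and decides for each document whether it was added, deleted, or updated. The resulting changes are passed through the Callback interface and end up in the blocking queue of the ChangeQueue object: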

  /**
   * Creates a fresh snapshot reader and writer for this pass.
   * Makes one pass through the repository, notifying {@code visitor} of any
   * changes.
   *
   * @throws InterruptedException
   */
  private void doOnePass() throws SnapshotStoreException,
      InterruptedException {
    callback.passBegin();
    try {
      // Snapshot reader:
      // Open the most recent snapshot and read the first record.
      this.snapshotReader = snapshotStore.openMostRecentSnapshot();
      current = snapshotReader.read();

      // Snapshot writer:
      // Create a snapshot writer for this pass.
      this.snapshotWriter =
          new OrderedSnapshotWriter(snapshotStore.openNewSnapshotWriter());

      // Fetch documents from the repository.
      for (DocumentSnapshot ss : query) {
        // Check whether the monitor has been asked to stop.
        if (!isRunning) {
          LOG.log(Level.INFO, "Exiting the monitor thread " + name
              + " " + this);
          throw new InterruptedException();
        }
        if (Thread.currentThread().isInterrupted()) {
          throw new InterruptedException();
        }
        processDeletes(ss);
        safelyProcessDocumentSnapshot(ss);
      }
      // Take care of any trailing paths in the snapshot.
      processDeletes(null);
    } finally {
      try {
        snapshotStore.close(snapshotReader, snapshotWriter);
      } catch (IOException e) {
        LOG.log(Level.WARNING, "Failed closing snapshot reader and writer.", e);
        // Try to proceed anyway.  Weird they are not closing.
      }
    }
    if (current != null) {
      throw new IllegalStateException(
          "Should not finish pass until entire read snapshot is consumed.");
    }
    // Pass finished; the callback may sleep before the next pass.
    callback.passComplete(getCheckpoint(-1));
    snapshotStore.deleteOldSnapshots();
    if (!callback.hasEnqueuedAtLeastOneChangeThisPass()) {
      // No monitor checkpoints from this pass went to queue because
      // there were no changes, so we can delete the snapshot we just wrote.
      new java.io.File(snapshotWriter.getPath()).delete();
      // TODO: Check return value; log trouble.
    }
    snapshotWriter = null;
    snapshotReader = null;
  }
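Taken together, doOnePass(), processDeletes() and processDocument() perform a sorted merge of two ordered streams: the previous snapshot and the current repository listing. The sketch below reproduces that merge under simplifying assumptions: documents are a hypothetical Doc class holding an id and a content checksum, both lists are sorted by id (the role COMPARATOR plays in the original), and "changed" simply means the checksums differ, which is only a guess at what getUpdate() does:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of the merge performed by doOnePass(). */
class SnapshotDiff {
  /** Simplified snapshot record: a document id plus a content checksum (assumption). */
  static final class Doc {
    final String id;
    final String checksum;

    Doc(String id, String checksum) {
      this.id = id;
      this.checksum = checksum;
    }
  }

  /** Equivalent of snapshotReader.read(): the next record, or null at the end. */
  private static Doc read(Iterator<Doc> reader) {
    return reader.hasNext() ? reader.next() : null;
  }

  /**
   * Diffs the previous snapshot against the current repository listing.
   * Both lists must be sorted by id. Every repository document is copied
   * to newSnapshot, like snapshotWriter.write() in the original.
   */
  static List<String> diff(List<Doc> previous, List<Doc> repository, List<Doc> newSnapshot) {
    List<String> events = new ArrayList<>();
    Iterator<Doc> reader = previous.iterator();
    Doc current = read(reader);
    for (Doc doc : repository) {
      // processDeletes(doc): old entries that sort before doc are gone.
      while (current != null && doc.id.compareTo(current.id) > 0) {
        events.add("DELETED " + current.id);
        current = read(reader);
      }
      newSnapshot.add(doc);
      if (current != null && doc.id.compareTo(current.id) == 0) {
        // processPossibleChange(doc): same document, compare contents.
        if (!doc.checksum.equals(current.checksum)) {
          events.add("CHANGED " + doc.id);
        }
        current = read(reader);
      } else {
        events.add("NEW " + doc.id); // not present in the previous snapshot
      }
    }
    // processDeletes(null): whatever is left in the old snapshot was deleted.
    while (current != null) {
      events.add("DELETED " + current.id);
      current = read(reader);
    }
    return events;
  }
}
```

For a previous snapshot a, b, d and a repository listing b (modified), c, d, this yields DELETED a, CHANGED b, NEW c. Because both inputs are sorted, a pass is linear in their combined size and only one old snapshot record is held at a time, which is presumably why the monitor writes its snapshots through an OrderedSnapshotWriter.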

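The processDeletes method handles deletions. The old snapshot and the repository are both iterated in COMPARATOR order, so any snapshot entry that sorts before the current repository document no longer exists in the repository: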

  /**
   * Process snapshot entries as deletes until {@code current} catches up with
   * {@code documentSnapshot}. Or, if {@code documentSnapshot} is {@code null},
   * process all remaining snapshot entries as deletes.
   *
   * @param documentSnapshot where to stop
   * @throws SnapshotReaderException
   * @throws InterruptedException
   */
  private void processDeletes(DocumentSnapshot documentSnapshot)
      throws SnapshotReaderException, InterruptedException {
    // While documentSnapshot sorts after current (or is null), current is no
    // longer in the repository: report it as deleted, then read the next
    // record from the snapshot.
    while (current != null
        && (documentSnapshot == null
            || COMPARATOR.compare(documentSnapshot, current) > 0)) {
      callback.deletedDocument(
          new DeleteDocumentHandle(current.getDocumentId()), getCheckpoint());
      current = snapshotReader.read();
    }
  }

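Next, let's trace safelyProcessDocumentSnapshot. It wraps processDocument and, if a RepositoryException occurs, records the document in documentSink instead of aborting the pass: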

  private void safelyProcessDocumentSnapshot(DocumentSnapshot snapshot)
      throws InterruptedException, SnapshotReaderException,
      SnapshotWriterException {
    try {
      processDocument(snapshot);
    } catch (RepositoryException re) {
      //TODO Log the exception or its message? in document sink perhaps.
      // Record the snapshot that failed in the document sink.
      documentSink.add(snapshot.getDocumentId(), FilterReason.IO_EXCEPTION);
    }
  }
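The point of this wrapper is failure isolation: a RepositoryException on one document is recorded and the pass continues, while InterruptedException and the snapshot reader/writer exceptions still propagate. A minimal sketch of the same idea, assuming a hypothetical RepoException in place of RepositoryException and a map in place of documentSink:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the failure isolation in safelyProcessDocumentSnapshot(). */
class SafeProcessing {
  /** Stand-in for RepositoryException. */
  static class RepoException extends Exception {
    RepoException(String message) {
      super(message);
    }
  }

  interface DocProcessor {
    void process(String docId) throws RepoException;
  }

  /** Plays the role of documentSink: ids that were filtered out, with a reason. */
  final Map<String, String> sink = new LinkedHashMap<>();

  void processAll(List<String> docIds, DocProcessor processor) {
    for (String id : docIds) {
      try {
        processor.process(id);
      } catch (RepoException e) {
        // One unreadable document must not abort the whole pass.
        sink.put(id, "IO_EXCEPTION");
      }
    }
  }
}
```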

It calls processDocument, which contains the logic for both updated and newly added documents:

  /**
   * Processes a document found in the document repository.
   *
   * @param documentSnapshot
   * @throws RepositoryException
   * @throws InterruptedException
   * @throws SnapshotReaderException
   * @throws SnapshotWriterException
   */
  private void processDocument(DocumentSnapshot documentSnapshot)
      throws InterruptedException, RepositoryException, SnapshotReaderException,
          SnapshotWriterException {
    // At this point 'current' >= 'file', or possibly current == null if
    // we've processed the previous snapshot entirely.
    if (current != null
        && COMPARATOR.compare(documentSnapshot, current) == 0) {
      // Same document as in the previous snapshot: handle a possible change
      // and advance 'current'.
      processPossibleChange(documentSnapshot);
    } else {
      // This file didn't exist during the previous scan.
      DocumentHandle documentHandle = documentSnapshot.getUpdate(null);
      snapshotWriter.write(documentSnapshot);
      // Null if filtered due to mime-type.
      if (documentHandle != null) {
        callback.newDocument(documentHandle, getCheckpoint(-1));
      }
    }
  }

Handling updates:

  /**
   * Processes a document found in the document repository that also appeared
   * in the previous scan. Determines whether the document has changed,
   * propagates changes to the client and writes the snapshot record.
   *
   * @param documentSnapshot
   * @throws RepositoryException
   * @throws InterruptedException
   * @throws SnapshotWriterException
   * @throws SnapshotReaderException
   */
  private void processPossibleChange(DocumentSnapshot documentSnapshot)
      throws RepositoryException, InterruptedException, SnapshotWriterException,
             SnapshotReaderException {
    // Presumably compares hash values with the previous snapshot record.
    DocumentHandle documentHandle = documentSnapshot.getUpdate(current);
    // Write the record to the snapshot file.
    snapshotWriter.write(documentSnapshot);
    if (documentHandle == null) {
      // No change: nothing is sent.
    } else {
      // Normal change - send the gsa an update.
      callback.changedDocument(documentHandle, getCheckpoint());
    }
    current = snapshotReader.read();
  }
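The contract getUpdate() follows here is: return null when nothing changed, otherwise a handle to send. If it is indeed hash based, as guessed above, it could look like the sketch below. ChecksumSnapshot is hypothetical, uses SHA-256 from java.security.MessageDigest, and returns the document id as its "handle"; it is not the connector's actual DocumentSnapshot implementation (which, per the comment in processDocument, can also return null for documents filtered by MIME type):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical snapshot whose getUpdate() compares content checksums. */
class ChecksumSnapshot {
  final String documentId;
  final String checksum;

  ChecksumSnapshot(String documentId, String content) {
    this.documentId = documentId;
    this.checksum = sha256Hex(content);
  }

  /**
   * Returns null if the content is unchanged, otherwise a "handle"
   * (here simply the id). previous == null means the document is new.
   */
  String getUpdate(ChecksumSnapshot previous) {
    if (previous != null && previous.checksum.equals(checksum)) {
      return null;       // unchanged: nothing goes to the queue
    }
    return documentId;   // new or changed: something to send
  }

  static String sha256Hex(String s) {
    try {
      MessageDigest md = MessageDigest.getInstance("SHA-256");
      byte[] digest = md.digest(s.getBytes(StandardCharsets.UTF_8));
      StringBuilder hex = new StringBuilder();
      for (byte b : digest) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 should always be available", e);
    }
  }
}
```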

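Every document found in this pass, whether new, changed, or unchanged, is first written to the newest snapshot file; deleted documents are not written, so they drop out of the next snapshot.

Changes are submitted through the methods of the callback member, which ultimately place them in the ChangeQueue.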

The Callback interface defines the methods for handling these events:

  /**
   * Callback interface.
   * The client provides an implementation of this interface to receive
   * notification of changes to the repository.
   */
  public static interface Callback {
    public void passBegin() throws InterruptedException;

    public void newDocument(DocumentHandle documentHandle,
        MonitorCheckpoint mcp) throws InterruptedException;

    public void deletedDocument(DocumentHandle documentHandle,
        MonitorCheckpoint mcp) throws InterruptedException;

    public void changedDocument(DocumentHandle documentHandle,
        MonitorCheckpoint mcp) throws InterruptedException;

    public void passComplete(MonitorCheckpoint mcp) throws InterruptedException;

    public boolean hasEnqueuedAtLeastOneChangeThisPass();

    public void passPausing(int sleepms) throws InterruptedException;
  }

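ChangeQueue defines an inner class, also named Callback, that implements this interface. Its methods wrap each submitted change in a Change object and put it into ChangeQueue's blocking-queue member, pendingChanges: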

  /**
   * Callback implementation: adds Change elements to the blocking queue
   * pendingChanges.
   * Adds {@link Change Changes} to this queue.
   */
  private class Callback implements DocumentSnapshotRepositoryMonitor.Callback {
    private int changeCount = 0;

    public void passBegin() {
      changeCount = 0;
      activityLogger.scanBeginAt(new Timestamp(System.currentTimeMillis()));
    }

    /* @Override */
    public void changedDocument(DocumentHandle dh, MonitorCheckpoint mcp)
        throws InterruptedException {
      ++changeCount;
      pendingChanges.put(new Change(Change.FactoryType.CLIENT, dh, mcp));
      activityLogger.gotChangedDocument(dh.getDocumentId());
    }

    /* @Override */
    public void deletedDocument(DocumentHandle dh, MonitorCheckpoint mcp)
        throws InterruptedException {
      ++changeCount;
      pendingChanges.put(new Change(Change.FactoryType.INTERNAL, dh, mcp));
      activityLogger.gotDeletedDocument(dh.getDocumentId());
    }

    /* @Override */
    public void newDocument(DocumentHandle dh, MonitorCheckpoint mcp)
        throws InterruptedException {
      ++changeCount;
      pendingChanges.put(new Change(Change.FactoryType.CLIENT, dh, mcp));
      activityLogger.gotNewDocument(dh.getDocumentId());
    }

    /* @Override */
    public void passComplete(MonitorCheckpoint mcp) throws InterruptedException {
      activityLogger.scanEndAt(new Timestamp(System.currentTimeMillis()));
      if (introduceDelayAfterEveryScan || changeCount == 0) {
        Thread.sleep(sleepInterval);
      }
    }

    public boolean hasEnqueuedAtLeastOneChangeThisPass() {
      return changeCount > 0;
    }

    /* @Override */
    public void passPausing(int sleepms) throws InterruptedException {
      Thread.sleep(sleepms);
    }
  }
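This Callback is the producer half of a producer/consumer hand-off: the monitor thread calls put(), and whoever consumes pendingChanges takes the changes out. The excerpt does not show how pendingChanges is created; if it is a bounded BlockingQueue, put() blocks while the queue is full and thereby throttles the monitor. The sketch below assumes a bounded LinkedBlockingQueue and a hypothetical, simplified Change class (kind plus document id, without the FactoryType and checkpoint of the original):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of the monitor-to-ChangeQueue hand-off. */
class ChangeHandOff {
  /** Simplified Change: only the kind and the document id (assumption). */
  static final class Change {
    final String kind;
    final String docId;

    Change(String kind, String docId) {
      this.kind = kind;
      this.docId = docId;
    }
  }

  /**
   * A producer thread puts one change per document into a bounded queue while
   * the calling thread consumes them; put() blocks whenever the queue is full.
   */
  static List<String> run(List<String> docIds, int capacity) throws InterruptedException {
    BlockingQueue<Change> pendingChanges = new LinkedBlockingQueue<>(capacity);
    Thread monitor = new Thread(() -> {
      try {
        for (String id : docIds) {
          pendingChanges.put(new Change("NEW", id)); // like Callback.newDocument()
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    monitor.start();
    List<String> received = new ArrayList<>();
    for (int i = 0; i < docIds.size(); i++) {
      Change c = pendingChanges.take();              // consumer side
      received.add(c.kind + " " + c.docId);
    }
    monitor.join();
    return received;
  }
}
```

With a single producer the queue preserves FIFO order, so changes reach the consumer in the order the monitor discovered them.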

---------------------------------------------------------------------------

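This series, Enterprise Search Engine Development: The Connector, is my original work.

When reposting, please credit the source: cnblogs (博客园), 刺猬的温驯

Email: chenying998179@163#com (replace # with .)

Original link: http://www.cnblogs.com/chenying99/p/3789505.html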