
Enterprise Search Engine Development: The Connector (Part 27)

The ChangeQueue class implements the ChangeSource interface, which declares a method for pulling the next Change object:

```java
/**
 * A source of {@link Change} objects.
 *
 * @since 2.8
 */
public interface ChangeSource {
  /**
   * @return the next change, or {@code null} if there is no change available
   */
  public Change getNextChange();
}
```
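The contract is small: return the next change, or null once nothing is available. As a sketch (assuming a simplified setup in which String stands in for the real Change type, with the hypothetical names SimpleChangeSource and ListChangeSource), a list-backed source that honors it could look like this, which might be handy for feeding a queue in tests:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical simplified interface: String stands in for Change.
interface SimpleChangeSource {
  /** Returns the next change, or null if no change is available. */
  String getNextChange();
}

// Hypothetical list-backed source, e.g. for unit tests.
class ListChangeSource implements SimpleChangeSource {
  private final Deque<String> changes;

  ListChangeSource(List<String> changes) {
    this.changes = new ArrayDeque<>(changes);
  }

  @Override
  public String getNextChange() {
    // pollFirst() returns null when the deque is empty, matching the contract.
    return changes.pollFirst();
  }
}
```

A consumer simply loops until getNextChange() returns null.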

Each ChangeQueue instance initializes a blocking queue, private final BlockingQueue<Change> pendingChanges, as the container that holds Change objects:

```java
/**
 * Initializes the blocking queue pendingChanges.
 * @param size
 * @param sleepInterval
 * @param introduceDelayAfterEachScan
 * @param activityLogger
 */
private ChangeQueue(int size, long sleepInterval,
    boolean introduceDelayAfterEachScan, CrawlActivityLogger activityLogger) {
  pendingChanges = new ArrayBlockingQueue<Change>(size);
  this.sleepInterval = sleepInterval;
  this.activityLogger = activityLogger;
  this.introduceDelayAfterEveryScan = introduceDelayAfterEachScan;
}
```

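The introduceDelayAfterEveryScan field (set from the constructor parameter introduceDelayAfterEachScan) controls whether to pause once an iteration over the data has finished.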

As mentioned in the previous article, the inner class CallBack adds submitted data to the blocking queue BlockingQueue<Change> pendingChanges.

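ChangeQueue's implementation of the ChangeSource method, in turn, takes Change objects out of that blocking queue. If none is available, it waits up to 250 ms and then returns null: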

```java
/**
 * Gets the next available change from the ChangeQueue.  Will wait up to
 * 1/4 second for a change to appear if none is immediately available.
 *
 * @return the next available change, or {@code null} if no changes are
 *         available
 */
public Change getNextChange() {
  try {
    return pendingChanges.poll(250L, TimeUnit.MILLISECONDS);
  } catch (InterruptedException ie) {
    return null;
  }
}
```
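The producer/consumer handoff around pendingChanges can be sketched with a plain ArrayBlockingQueue. This is a simplified, hypothetical stand-in (BoundedChangeBuffer, with String in place of Change). Its add method assumes the producer uses put(), which blocks while the bounded queue is full; the code shown here does not reveal which insertion method CallBack actually uses. One deliberate difference from the original: the catch block restores the thread's interrupt flag instead of silently discarding it.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical simplified ChangeQueue: String stands in for Change.
class BoundedChangeBuffer {
  private final BlockingQueue<String> pendingChanges;

  BoundedChangeBuffer(int size) {
    // Fixed capacity: a fast producer cannot grow the buffer without bound.
    pendingChanges = new ArrayBlockingQueue<>(size);
  }

  /** Producer side (assumed): blocks while the buffer is full. */
  void add(String change) throws InterruptedException {
    pendingChanges.put(change);
  }

  /** Consumer side: waits up to 250 ms for a change, then returns null. */
  String getNextChange() {
    try {
      return pendingChanges.poll(250L, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt(); // keep the interrupt visible to callers
      return null;
    }
  }
}
```

The bounded capacity gives natural backpressure: the monitor thread stalls on put() until the consumer drains entries, while the consumer never blocks for more than a quarter second.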

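ChangeQueue thus serves as a buffer for Change objects. As analyzed in the previous article, Change objects are put into it by the thread of the monitor object DocumentSnapshotRepositoryMonitor once that thread has been started.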

So which object calls ChangeQueue's getNextChange() method to take the Change objects back out?

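Tracing the callers shows that the loadUpFromChangeSource method of the CheckpointAndChangeQueue class calls getNextChange(). It wraps each Change object it obtains in a CheckpointAndChange object and adds that to the member field List<CheckpointAndChange> checkpointAndChangeList.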

First, let's look at the relevant member fields and the constructor:

```java
private final AtomicInteger maximumQueueSize =
    new AtomicInteger(DEFAULT_MAXIMUM_QUEUE_SIZE);
private final List<CheckpointAndChange> checkpointAndChangeList;
private final ChangeSource changeSource;
private final DocumentHandleFactory internalDocumentHandleFactory;
private final DocumentHandleFactory clientDocumentHandleFactory;
private volatile DiffingConnectorCheckpoint lastCheckpoint;
private final File persistDir;  // place to persist enqueued values
private MonitorRestartState monitorPoints = new MonitorRestartState();

public CheckpointAndChangeQueue(ChangeSource changeSource, File persistDir,
    DocumentHandleFactory internalDocumentHandleFactory,
    DocumentHandleFactory clientDocumentHandleFactory) {
  this.changeSource = changeSource;
  this.checkpointAndChangeList
      = Collections.synchronizedList(
          new ArrayList<CheckpointAndChange>(maximumQueueSize.get()));
  this.persistDir = persistDir;
  this.internalDocumentHandleFactory = internalDocumentHandleFactory;
  this.clientDocumentHandleFactory = clientDocumentHandleFactory;
  ensurePersistDirExists();
}
```

The constructor initializes the ChangeSource field changeSource (in practice a ChangeQueue instance) and the list container List<CheckpointAndChange> checkpointAndChangeList.

Now let's return to the loadUpFromChangeSource method:

```java
/**
 * Pulls Changes from the ChangeSource and adds them to checkpointAndChangeList.
 */
private void loadUpFromChangeSource() {
  int max = maximumQueueSize.get();
  if (checkpointAndChangeList.size() < max) {
    lastCheckpoint = lastCheckpoint.nextMajor();
  }
  while (checkpointAndChangeList.size() < max) {
    Change newChange = changeSource.getNextChange();
    if (newChange == null) {
      break;
    }
    lastCheckpoint = lastCheckpoint.next();
    checkpointAndChangeList.add(new CheckpointAndChange(
        lastCheckpoint, newChange));
  }
}
```

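The method takes Change objects from changeSource, wraps each one in a CheckpointAndChange object, and appends it to checkpointAndChangeList. It stops when the list reaches maximumQueueSize or when getNextChange() returns null. If the list has room, lastCheckpoint is first advanced once with nextMajor(), and each added change then receives its own checkpoint from next().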
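A simplified model of the checkpoint numbering may make the loop easier to follow. It assumes, hypothetically, that a checkpoint is a (major, minor) pair where nextMajor() increments major and resets minor to 0, and next() increments minor; the real DiffingConnectorCheckpoint is not shown here and may differ. An Iterator stands in for changeSource, with hasNext() == false playing the role of a null return. The names Checkpoint and CheckpointedList are illustrative only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical simplified checkpoint: a (major, minor) pair.
final class Checkpoint {
  final long major;
  final long minor;

  Checkpoint(long major, long minor) {
    this.major = major;
    this.minor = minor;
  }

  Checkpoint nextMajor() { return new Checkpoint(major + 1, 0); }

  Checkpoint next() { return new Checkpoint(major, minor + 1); }

  @Override
  public String toString() { return major + "," + minor; }
}

class CheckpointedList {
  private final int max;
  private final List<String> entries = new ArrayList<>(); // "major,minor=change"
  private Checkpoint lastCheckpoint = new Checkpoint(0, 0);

  CheckpointedList(int max) { this.max = max; }

  /** Mirrors the structure of loadUpFromChangeSource(). */
  void loadUp(Iterator<String> source) {
    if (entries.size() < max) {
      lastCheckpoint = lastCheckpoint.nextMajor(); // one new major per load-up with room
    }
    while (entries.size() < max && source.hasNext()) {
      lastCheckpoint = lastCheckpoint.next();      // one new minor per change
      entries.add(lastCheckpoint + "=" + source.next());
    }
  }

  List<String> entries() { return entries; }
}
```

For example, with max = 3, loading ["a", "b"] and then ["c", "d"] yields 1,1=a, 1,2=b and 2,1=c, while "d" stays in the source for a later call.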

loadUpFromChangeSource is called from the resume method, and resume in turn is called from the constructor of the DiffingConnectorDocumentList class:

```java
/**
 * Returns the list of CheckpointAndChange objects.
 * Returns an {@link Iterator} for currently available
 * {@link CheckpointAndChange} objects that occur after the passed in
 * checkpoint. The {@link String} form of a {@link DiffingConnectorCheckpoint}
 * passed in is produced by calling
 * {@link DiffingConnectorCheckpoint#toString()}. As a side effect, Objects
 * up to and including the object with the passed in checkpoint are removed
 * from this queue.
 *
 * @param checkpointString null means return all {@link CheckpointAndChange}
 *        objects and a non null value means to return
 *        {@link CheckpointAndChange} objects with checkpoints after the
 *        passed in value.
 * @throws IOException if error occurs while manipulating recovery state
 */
synchronized List<CheckpointAndChange> resume(String checkpointString)
    throws IOException {
  // Remove changes that have already been completed.
  removeCompletedChanges(checkpointString);
  // Pull Changes from the ChangeSource into checkpointAndChangeList.
  loadUpFromChangeSource();
  // Update monitorPoints.
  monitorPoints.updateOnGuaranteed(checkpointAndChangeList);
  try {
    // Persist checkpointAndChangeList to a queue file;
    // each call to resume creates one file.
    writeRecoveryState();
  } finally {
    // TODO: Enhance with mechanism that remembers
    // information about recovery files to avoid re-reading.
    // Remove redundant queue files (those already consumed).
    removeExcessRecoveryState();
  }
  return getList();
}
```
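Per its Javadoc, resume behaves like an acknowledgment step: the checkpoint string names the last change the caller has finished with, everything up to and including it is dropped, the list is topped up, and every unacknowledged entry is returned. A minimal sketch of just that bookkeeping, assuming checkpoints are plain increasing longs and leaving out persistence and monitorPoints (ResumableQueue and its Entry class are hypothetical names):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical model of resume(): checkpoints are increasing longs.
class ResumableQueue {
  static final class Entry {
    final long checkpoint;
    final String change;

    Entry(long checkpoint, String change) {
      this.checkpoint = checkpoint;
      this.change = change;
    }
  }

  private final int max;
  private final Iterator<String> source;
  private final List<Entry> entries = new ArrayList<>();
  private long lastCheckpoint = 0;

  ResumableQueue(int max, Iterator<String> source) {
    this.max = max;
    this.source = source;
  }

  /** null means nothing has been acknowledged yet: keep every entry. */
  synchronized List<String> resume(Long acknowledged) {
    if (acknowledged != null) {
      // Drop entries up to and including the acknowledged checkpoint.
      entries.removeIf(e -> e.checkpoint <= acknowledged);
    }
    // Top the list back up from the source.
    while (entries.size() < max && source.hasNext()) {
      entries.add(new Entry(++lastCheckpoint, source.next()));
    }
    List<String> view = new ArrayList<>();
    for (Entry e : entries) {
      view.add(e.checkpoint + ":" + e.change);
    }
    return view;
  }
}
```

With max = 2 and source ["a", "b", "c"], calling resume(null) twice returns [1:a, 2:b] both times (an unacknowledged batch is handed out again), and resume(1L) then returns [2:b, 3:c].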

After checkpointAndChangeList has been filled, its contents are persisted in JSON format to a queue file:

```java
/**
 * Persists the queue as JSON.
 * @throws IOException
 */
private void writeRecoveryState() throws IOException {
  // TODO(pjo): Move this method into RecoveryFile.
  File recoveryFile = new RecoveryFile(persistDir);
  FileOutputStream outStream = new FileOutputStream(recoveryFile);
  Writer writer = new OutputStreamWriter(outStream, Charsets.UTF_8);
  try {
    try {
      writeJson(writer);
    } catch (JSONException e) {
      throw IOExceptionHelper.newIOException("Failed writing recovery file.", e);
    }
    writer.flush();
    outStream.getFD().sync();
  } finally {
    writer.close();
  }
}
```
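The flush-then-sync sequence is what makes the recovery file durable: flush() only moves buffered characters from the Writer into the FileOutputStream, while getFD().sync() asks the operating system to force its buffers for that file out to the storage device before returning. A self-contained version of the same pattern, with the hypothetical class name DurableWriter, a plain string in place of writeJson, and the JDK's StandardCharsets.UTF_8 swapped in for Guava's Charsets.UTF_8:

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

class DurableWriter {
  /** Writes content as UTF-8 and forces it to the device before returning. */
  static void writeDurably(File file, String content) throws IOException {
    // Resources close in reverse order: writer first, then the stream.
    try (FileOutputStream outStream = new FileOutputStream(file);
         Writer writer = new OutputStreamWriter(outStream, StandardCharsets.UTF_8)) {
      writer.write(content);
      writer.flush();            // push buffered chars into the FileOutputStream
      outStream.getFD().sync();  // ask the OS to flush its buffers to disk
    }
  }
}
```

Skipping sync() would still produce a correct file in normal operation; it only matters if the machine loses power or crashes right after the write, which is exactly the case a recovery file exists for.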

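Each queue file's name contains the time it was created (a wall-clock millisecond timestamp followed by a System.nanoTime() value), so file names can be compared to tell which file is older: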

```java
/**
 * A queue file whose creation time can be compared with that of other queue files.
 * A File that has some of the recovery logic.
 *  Original recovery files' names contained a single nanosecond timestamp,
 *  eg.  recovery.10220010065599398 .  These turned out to be flawed
 *  because nanosecond times can go "back in time" between JVM restarts.
 *  Updated recovery files' names contain a wall clock millis timestamp
 *  followed by an underscore followed by a nanotimestamp, eg.
 *  recovery.702522216012_10220010065599398 .
 */
static class RecoveryFile extends File {
  final static long NO_TIME_AVAIL = -1;
  long milliTimestamp = NO_TIME_AVAIL;
  long nanoTimestamp;

  long parseTime(String s) throws IOException {
    try {
      return Long.parseLong(s);
    } catch (NumberFormatException e) {
      throw new LoggingIoException("Invalid recovery filename: "
          + getAbsolutePath());
    }
  }

  /**
   * Parses the timestamps contained in the file name.
   * @throws IOException
   */
  void parseOutTimes() throws IOException {
    try {
      String basename = getName();
      if (!basename.startsWith(RECOVERY_FILE_PREFIX)) {
        throw new LoggingIoException("Invalid recovery filename: "
            + getAbsolutePath());
      } else {
        String extension = basename.substring(RECOVERY_FILE_PREFIX.length());
        if (!extension.contains("_")) {  // Original name format.
          nanoTimestamp = parseTime(extension);
        } else {  // Updated name format.
          String[] timeParts = extension.split("_");
          if (2 != timeParts.length) {
            throw new LoggingIoException("Invalid recovery filename: "
                + getAbsolutePath());
          }
          milliTimestamp = parseTime(timeParts[0]);
          nanoTimestamp = parseTime(timeParts[1]);
        }
      }
    } catch (IndexOutOfBoundsException e) {
      throw new LoggingIoException("Invalid recovery filename: "
          + getAbsolutePath());
    }
  }

  RecoveryFile(File persistanceDir) throws IOException {
    super(persistanceDir, RECOVERY_FILE_PREFIX + System.currentTimeMillis()
        + "_" + System.nanoTime());
    parseOutTimes();
  }

  /**
   * Creates a RecoveryFile for an existing file from its absolute path.
   * @param absolutePath
   * @throws IOException
   */
  RecoveryFile(String absolutePath) throws IOException {
    super(absolutePath);
    parseOutTimes();
  }

  boolean isOlder(RecoveryFile other) {
    boolean weHaveMillis = milliTimestamp != NO_TIME_AVAIL;
    boolean otherHasMillis = other.milliTimestamp != NO_TIME_AVAIL;
    boolean bothHaveMillis = weHaveMillis && otherHasMillis;
    boolean neitherHasMillis = (!weHaveMillis) && (!otherHasMillis);
    if (bothHaveMillis) {
      if (this.milliTimestamp < other.milliTimestamp) {
        return true;
      } else if (this.milliTimestamp > other.milliTimestamp) {
        return false;
      } else {
        return this.nanoTimestamp < other.nanoTimestamp;
      }
    } else if (neitherHasMillis) {
      return this.nanoTimestamp < other.nanoTimestamp;
    } else if (weHaveMillis) {  // and other doesn't; we are newer.
      return false;
    } else {  // other has millis; other is newer.
      return true;
    }
  }

  /** A delete method that logs failures. */
  public void logOnFailDelete() {
    boolean deleted = super.delete();
    if (!deleted) {
      LOG.severe("Failed to delete: " + getAbsolutePath());
    }
  }

  // TODO(pjo): Move more recovery logic into this class.
}
```
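The ordering rule in isOlder() can be isolated as a pure function on file names. Wall-clock milliseconds order files across JVM restarts (where nanoTime values are not comparable, as the Javadoc notes), nanoseconds break ties within the same millisecond, and an old-format name (nanoseconds only) always counts as older than a new-format one. This sketch assumes RECOVERY_FILE_PREFIX is "recovery.", as the Javadoc examples suggest; the class RecoveryNames is hypothetical.

```java
// Hypothetical pure-function version of RecoveryFile's name parsing and isOlder().
class RecoveryNames {
  static final String PREFIX = "recovery."; // assumed value of RECOVERY_FILE_PREFIX
  static final long NO_TIME_AVAIL = -1;

  /** Returns {millis, nanos}; millis is NO_TIME_AVAIL for the original format. */
  static long[] parse(String name) {
    if (!name.startsWith(PREFIX)) {
      throw new IllegalArgumentException("Invalid recovery filename: " + name);
    }
    String ext = name.substring(PREFIX.length());
    if (!ext.contains("_")) {
      return new long[] {NO_TIME_AVAIL, Long.parseLong(ext)}; // nanos only
    }
    String[] parts = ext.split("_");
    if (parts.length != 2) {
      throw new IllegalArgumentException("Invalid recovery filename: " + name);
    }
    return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
  }

  /** Same decision table as RecoveryFile.isOlder(). */
  static boolean isOlder(String a, String b) {
    long[] x = parse(a), y = parse(b);
    boolean xMillis = x[0] != NO_TIME_AVAIL, yMillis = y[0] != NO_TIME_AVAIL;
    if (xMillis && yMillis) {
      // Millis decide; nanos only break a tie within the same millisecond.
      return x[0] != y[0] ? x[0] < y[0] : x[1] < y[1];
    } else if (!xMillis && !yMillis) {
      return x[1] < y[1];
    }
    // Exactly one has millis: the original-format (nanos-only) name is older.
    return !xMillis;
  }
}
```

For instance, recovery.100_900 counts as older than recovery.200_1 even though its nanosecond part is larger, because the millisecond part is compared first.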

Next, let's look at what the start method does:

```java
/**
 * Initialize to start processing from after the passed in checkpoint
 * or from the beginning if the passed in checkpoint is null.  Part of
 * making DocumentSnapshotRepositoryMonitorManager go from "cold" to "warm".
 */
public synchronized void start(String checkpointString) throws IOException {
  LOG.info("Starting CheckpointAndChangeQueue from " + checkpointString);
  // Create the queue directory.
  ensurePersistDirExists();
  checkpointAndChangeList.clear();
  lastCheckpoint = constructLastCheckpoint(checkpointString);
  if (null == checkpointString) {
    // Delete all queue files.
    removeAllRecoveryState();
  } else {
    RecoveryFile current = removeExcessRecoveryState();
    // Load monitorPoints and the checkpointAndChangeList queue.
    loadUpFromRecoveryState(current);
    //this.monitorPoints.points.entrySet();
  }
}
```

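In essence, when a checkpoint is passed in, start reloads the list of CheckpointAndChange objects from the previously saved queue file into checkpointAndChangeList (along with the MonitorCheckpoint objects held in monitorPoints). When checkpointString is null, it instead deletes all queue files and starts from the beginning.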

```java
/**
 * Loads the queue.
 * @param file
 * @throws IOException
 */
private void loadUpFromRecoveryState(RecoveryFile file) throws IOException {
  // TODO(pjo): Move this method into RecoveryFile.
  new LoadingQueueReader().readJson(file);
}
```

CheckpointAndChangeQueue defines inner classes that load the list of CheckpointAndChange objects from a JSON file into checkpointAndChangeList.

First, the abstract queue reader class AbstractQueueReader:

```java
/**
 * Abstract class for loading the queue from a JSON file.
 * Reads JSON recovery files. Uses the Template Method pattern to
 * delegate what to do with the parsed objects to subclasses.
 *
 * Note: This class uses gson for streaming support.
 */
private abstract class AbstractQueueReader {
  public void readJson(File file) throws IOException {
    readJson(new BufferedReader(new InputStreamReader(
        new FileInputStream(file), Charsets.UTF_8)));
  }

  /**
   * Reads and parses the stream, calling the abstract methods to
   * take whatever action is required. The given stream will be
   * closed automatically.
   *
   * @param reader the stream to parse
   */
  @VisibleForTesting
  void readJson(Reader reader) throws IOException {
    JsonReader jsonReader = new JsonReader(reader);
    try {
      readJson(jsonReader);
    } finally {
      jsonReader.close();
    }
  }

  /**
   * Reads and parses the stream, calling the abstract methods to
   * take whatever action is required.
   */
  private void readJson(JsonReader reader) throws IOException {
    JsonParser parser = new JsonParser();
    reader.beginObject();
    while (reader.hasNext()) {
      String name = reader.nextName();
      if (name.equals(MONITOR_STATE_JSON_TAG)) {
        readMonitorPoints(parser.parse(reader));
      } else if (name.equals(QUEUE_JSON_TAG)) {
        reader.beginArray();
        while (reader.hasNext()) {
          readCheckpointAndChange(parser.parse(reader));
        }
        reader.endArray();
      } else {
        throw new IOException("Read invalid recovery file.");
      }
    }
    reader.endObject();
    reader.setLenient(true);
    String name = reader.nextString();
    if (!name.equals(SENTINAL)) {
      throw new IOException("Read invalid recovery file.");
    }
  }

  protected abstract void readMonitorPoints(JsonElement gson)
      throws IOException;

  protected abstract void readCheckpointAndChange(JsonElement gson)
      throws IOException;
}
```
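After the JSON object, readJson switches the reader to lenient mode and requires one more token, the SENTINAL string; ValidatingQueueReader's Javadoc describes a valid file as "valid JSON with a trailing sentinel". Presumably this lets the connector reject a file that was cut short, for example by a crash in the middle of writeRecoveryState. The idea can be sketched without Gson. The class SentinelFormat, the newline separator, and the sentinel value are all hypothetical, and unlike the real reader this sketch checks only the trailer, not whether the JSON itself parses.

```java
// Hypothetical trailer-sentinel format: payload, newline, sentinel.
class SentinelFormat {
  static final String SENTINEL = "EOF"; // hypothetical; the real SENTINAL value is not shown

  /** Appends the sentinel after the payload; written last, so present only if the write finished. */
  static String seal(String json) {
    return json + "\n" + SENTINEL;
  }

  /** True only if everything after the last newline is exactly the sentinel. */
  static boolean isComplete(String contents) {
    int cut = contents.lastIndexOf('\n');
    return cut >= 0 && contents.substring(cut + 1).equals(SENTINEL);
  }

  /** Returns the payload, or throws if the trailer is missing. */
  static String payload(String contents) {
    if (!isComplete(contents)) {
      throw new IllegalArgumentException("Read invalid recovery file.");
    }
    return contents.substring(0, contents.lastIndexOf('\n'));
  }
}
```

Because the sentinel is the last thing written, any truncation removes or damages it, so its presence is a cheap completeness check.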

Its abstract methods are implemented by subclasses:

```java
/**
 * Checks the validity of a queue file.
 * Verifies that a JSON recovery file is valid JSON with a
 * trailing sentinel.
 */
private class ValidatingQueueReader extends AbstractQueueReader {
  protected void readMonitorPoints(JsonElement gson) throws IOException {
  }

  protected void readCheckpointAndChange(JsonElement gson)
      throws IOException {
  }
}

/** Loads the queue from a JSON recovery file. */
/*
 * TODO(jlacey): Change everything downstream to gson. For now, we
 * reserialize the individual gson objects and deserialize them
 * using org.json.
 */
@VisibleForTesting
class LoadingQueueReader extends AbstractQueueReader {
  /**
   * Loads the MonitorRestartState checkpoints
   * (HashMap<String, MonitorCheckpoint> points).
   */
  protected void readMonitorPoints(JsonElement gson) throws IOException {
    try {
      JSONObject json = gsonToJson(gson);
      monitorPoints = new MonitorRestartState(json);
      //monitorPoints.updateOnGuaranteed(checkpointAndChangeList)
    } catch (JSONException e) {
      throw IOExceptionHelper.newIOException(
          "Failed reading persisted JSON queue.", e);
    }
  }

  /**
   * Loads checkpointAndChangeList.
   */
  protected void readCheckpointAndChange(JsonElement gson)
      throws IOException {
    try {
      JSONObject json = gsonToJson(gson);
      checkpointAndChangeList.add(new CheckpointAndChange(json,
          internalDocumentHandleFactory, clientDocumentHandleFactory));
    } catch (JSONException e) {
      throw IOExceptionHelper.newIOException(
          "Failed reading persisted JSON queue.", e);
    }
  }

  // TODO(jlacey): This could be much more efficient, especially
  // with LOBs, if we directly transformed the objects with a little
  // recursive parser. This code is only used when recovering failed
  // batches, so I don't know if that's worth the effort.
  private JSONObject gsonToJson(JsonElement gson) throws JSONException {
    return new JSONObject(gson.toString());
  }
}
```
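The two subclasses are a textbook Template Method: AbstractQueueReader fixes the parsing skeleton (including the sentinel check), and each subclass only decides what to do with the pieces it is handed. The sketch below shows the same structure without Gson, using a hypothetical line-based format ("monitor ..." and "change ..." lines followed by a sentinel line) in place of JSON; all class names and the format are illustrative assumptions.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical format: "monitor <data>" / "change <data>" lines, then a sentinel line.
abstract class QueueReaderSketch {
  static final String SENTINEL = "SENTINEL"; // hypothetical trailer line
  private static final String MONITOR = "monitor ";
  private static final String CHANGE = "change ";

  /** Template method: the skeleton is fixed, the hooks vary. */
  final void read(Reader in) throws IOException {
    try (BufferedReader r = new BufferedReader(in)) {
      String line;
      while ((line = r.readLine()) != null && !line.equals(SENTINEL)) {
        if (line.startsWith(MONITOR)) {
          readMonitorPoints(line.substring(MONITOR.length()));
        } else if (line.startsWith(CHANGE)) {
          readCheckpointAndChange(line.substring(CHANGE.length()));
        } else {
          throw new IOException("Read invalid recovery file.");
        }
      }
      // As in the original, the sentinel is checked only after the body.
      if (line == null) {
        throw new IOException("Read invalid recovery file.");
      }
    }
  }

  protected abstract void readMonitorPoints(String data) throws IOException;

  protected abstract void readCheckpointAndChange(String data) throws IOException;
}

/** No-op hooks: reading succeeds only if the structure and sentinel are valid. */
class ValidatingReaderSketch extends QueueReaderSketch {
  @Override protected void readMonitorPoints(String data) {}
  @Override protected void readCheckpointAndChange(String data) {}
}

/** Collecting hooks: keeps what was parsed. */
class LoadingReaderSketch extends QueueReaderSketch {
  String monitorPoints;
  final List<String> changes = new ArrayList<>();

  @Override protected void readMonitorPoints(String data) { monitorPoints = data; }
  @Override protected void readCheckpointAndChange(String data) { changes.add(data); }
}
```

Because the loading hooks fire while the body is being parsed and the sentinel is checked only at the end, a truncated file can leave a partially filled loader behind; running the no-op validating reader over the file first avoids that. Whether the connector uses ValidatingQueueReader in exactly this way is not shown in this excerpt.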

---------------------------------------------------------------------------

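This series, Enterprise Search Engine Development: The Connector, is my original work.

When reposting, please credit the source: cnblogs (博客园), 刺猬的温驯

Email: chenying998179@163#com (replace # with .)

Article link: http://www.cnblogs.com/chenying99/p/3789560.html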