
Enterprise Search Engine Development: The Connector (Part 19)

The connector sends data to the receiving server over HTTP in push mode. The data is in xmlfeed format (XML), and the interface that sends it is named Pusher.

The Pusher interface defines the methods related to sending data:

public interface Pusher {

  /**
   * Status indicating the readiness of the Pusher.
   */
  public static enum PusherStatus {
    OK, LOW_MEMORY, LOCAL_FEED_BACKLOG, GSA_FEED_BACKLOG, DISABLED;
  }

  /**
   * Takes an spi Document and pushes it along, presumably to the GSA Feed.
   *
   * @param document A Document
   * @return PusherStatus. If OK, Pusher may accept more documents.
   * @throws RepositoryException if transient error accessing the Repository
   * @throws RepositoryDocumentException if fatal error accessing the Document
   * @throws FeedException if a transient Feed error occurs in the Pusher
   * @throws PushException if a transient error occurs in the Pusher
   */
  public PusherStatus take(Document document)
      throws PushException, FeedException, RepositoryException;

  /**
   * Finishes processing a document feed.  If the caller anticipates no
   * further calls to {@link #take(Document)} will be
   * made, this method should be called, so that the Pusher may send a cached,
   * accumulated Feed to the feed processor.
   *
   * @throws RepositoryException if transient error accessing the Repository
   * @throws RepositoryDocumentException if fatal error accessing the Document
   * @throws FeedException if a transient Feed error occurs in the Pusher
   * @throws PushException if a transient error occurs in the Pusher
   */
  public void flush()
      throws PushException, FeedException, RepositoryException;

  /**
   * Cancels a feed.  Discard any accumulated feed data.
   */
  public void cancel();

  /**
   * Gets the current pusher status.
   *
   * @return the current PusherStatus
   * @throws RepositoryException if transient error accessing the Repository
   * @throws FeedException if a transient Feed error occurs in the Pusher
   * @throws PushException if a transient error occurs in the Pusher
   */
  public PusherStatus getPusherStatus()
      throws PushException, FeedException, RepositoryException;
}
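The interface alone does not show how a caller is expected to drive it. Going only by the Javadoc above (keep calling take() while it returns OK; call flush() once no further take() calls are expected), a caller loop might look like the minimal sketch below. SimplePusher, pushAll() and demo() are hypothetical, documents are plain strings rather than spi Document objects, and checked exceptions are left out; the real caller in Connector Manager may differ.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PusherDriverSketch {

  // Hypothetical, simplified stand-ins for the Connector Manager types.
  enum PusherStatus { OK, LOW_MEMORY, LOCAL_FEED_BACKLOG, GSA_FEED_BACKLOG, DISABLED }

  interface SimplePusher {
    PusherStatus take(String document);
    void flush();
  }

  /** Calls take() while the pusher reports OK, then flush(); returns how many documents were taken. */
  static int pushAll(SimplePusher pusher, Iterator<String> documents) {
    int taken = 0;
    while (documents.hasNext()) {
      PusherStatus status = pusher.take(documents.next());
      taken++;
      if (status != PusherStatus.OK) {
        break; // pusher is not ready: stop and leave the rest for a later batch
      }
    }
    pusher.flush(); // no more take() calls planned, so send the partial feed
    return taken;
  }

  /** Demo pusher that reports a backlog once it holds three documents. */
  static String demo() {
    List<String> sent = new ArrayList<>();
    List<String> buffer = new ArrayList<>();
    SimplePusher pusher = new SimplePusher() {
      @Override
      public PusherStatus take(String doc) {
        buffer.add(doc);
        return buffer.size() < 3 ? PusherStatus.OK : PusherStatus.GSA_FEED_BACKLOG;
      }

      @Override
      public void flush() {
        sent.addAll(buffer);
        buffer.clear();
      }
    };
    int n = pushAll(pusher, List.of("a", "b", "c", "d", "e").iterator());
    return n + " " + sent;
  }

  public static void main(String[] args) {
    System.out.println(demo()); // prints "3 [a, b, c]"
  }
}
```

Stopping on a non-OK status rather than retrying immediately gives the pusher a chance to drain its backlog before more documents arrive.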

Data is sent through the PusherStatus take(Document document) method. To see how the sending works, let's look at the source of take() in the implementation class DocPusher:

/**
   * Takes a Document and sends the feed to the GSA.
   *
   * @param document Document corresponding to the document.
   * @return PusherStatus. If OK, the Pusher may accept more documents.
   * @throws PushException if Pusher problem
   * @throws FeedException if transient Feed problem
   * @throws RepositoryDocumentException if fatal Document problem
   * @throws RepositoryException if transient Repository problem
   */
  @Override
  public PusherStatus take(Document document)
      throws PushException, FeedException, RepositoryException {
    if (feedSender.isShutdown()) {
      return PusherStatus.DISABLED;
    }
    checkSubmissions();

    // Apply any configured Document filters to the document.
    document = documentFilterFactory.newDocumentFilter(document);

    FeedType feedType;
    try {
      feedType = DocUtils.getFeedType(document);
    } catch (RuntimeException e) {
      LOGGER.log(Level.WARNING,
          "Rethrowing RuntimeException as RepositoryDocumentException", e);
      throw new RepositoryDocumentException(e);
    }

    // All feeds in a feed file must be of the same type.
    // If the feed would change type, send the feed off to the GSA
    // and start a new one.
    // TODO: Fix this check to allow ACLs in any type feed.
    if (xmlFeed != null && !feedType.isCompatible(xmlFeed.getFeedType())) {
      if (LOGGER.isLoggable(Level.FINE)) {
        LOGGER.fine("A new feedType, " + feedType + ", requires a new feed for "
            + connectorName + ". Closing feed and sending to GSA.");
      }
      submitFeed();
    }

    if (xmlFeed == null) {
      if (LOGGER.isLoggable(Level.FINE)) {
        LOGGER.fine("Creating new " + feedType + " feed for " + connectorName);
      }
      try {
        startNewFeed(feedType);
      } catch (OutOfMemoryError me) {
        throw new PushException("Unable to allocate feed buffer.  Try reducing"
            + " the maxFeedSize setting, reducing the number of connector"
            + " instances, or adjusting the JVM heap size parameters.", me);
      }
    }

    boolean isThrowing = false;
    int resetPoint = xmlFeed.size();
    int resetCount = xmlFeed.getRecordCount();
    try {
      if (LOGGER.isLoggable(Level.FINER)) {
        LOGGER.log(Level.FINER, "DOCUMENT: Adding document with docid={0} and "
            + "searchurl={1} from connector {2} to feed.", new Object[] {
            DocUtils.getOptionalString(document, SpiConstants.PROPNAME_DOCID),
            DocUtils.getOptionalString(document,
              SpiConstants.PROPNAME_SEARCHURL),
            connectorName});
      }

      // Add this document to the feed.
      xmlFeed.addRecord(document);

      // If the feed is full, send it off to the GSA.
      if (xmlFeed.isFull() || lowMemory()) {
        if (LOGGER.isLoggable(Level.FINE)) {
          LOGGER.fine("Feed for " + connectorName + " has grown to "
              + xmlFeed.size() + " bytes. Closing feed and sending to GSA.");
        }
        submitFeed();
        return getPusherStatus();
      }

      // Indicate that this Pusher may accept more documents.
      return PusherStatus.OK;

    } catch (OutOfMemoryError me) {
      resetFeed(resetPoint, resetCount);
      throw new PushException("Out of memory building feed, retrying.", me);
    } catch (RuntimeException e) {
      resetFeed(resetPoint, resetCount);
      LOGGER.log(Level.WARNING,
          "Rethrowing RuntimeException as RepositoryDocumentException", e);
      throw new RepositoryDocumentException(e);
    } catch (RepositoryDocumentException rde) {
      // Skipping this document, remove it from the feed.
      resetFeed(resetPoint, resetCount);
      throw rde;
    } catch (IOException ioe) {
      LOGGER.log(Level.SEVERE, "IOException while reading: skipping", ioe);
      resetFeed(resetPoint, resetCount);
      Throwable t = ioe.getCause();
      isThrowing = true;
      if (t != null && (t instanceof RepositoryException)) {
        throw (RepositoryException) t;
      } else {
        throw new RepositoryDocumentException("I/O error reading data", ioe);
      }
    }
  }

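In this method, the Document object is first wrapped (for example, with its content Base64-encoded) and appended to xmlFeed as a record. The feed is sent to the server only when certain conditions are met: the feed is full (xmlFeed.isFull()), memory is running low (lowMemory()), or the incoming document's feed type is incompatible with the current feed. In other words, each transmission to the server carries a batch of documents, not a single document. If adding a record fails, resetFeed(resetPoint, resetCount) rolls the feed back to its state before this document.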
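To make the batching concrete, here is a minimal sketch assuming records are plain strings and a feed counts as "full" once its UTF-8 size reaches a hypothetical maxBytes threshold. Plain string equality stands in for FeedType.isCompatible(), and the lowMemory() check and resetFeed() rollback are omitted; BatchingFeedSketch and its methods are illustrative only, not Connector Manager API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical batching feed that mirrors the shape of DocPusher.take(). */
public class BatchingFeedSketch {
  private final int maxBytes;
  private final List<List<String>> submitted = new ArrayList<>();
  private List<String> records; // current feed, or null when no feed is open
  private String feedType;      // type shared by every record in the current feed
  private int size;             // UTF-8 bytes accumulated in the current feed

  BatchingFeedSketch(int maxBytes) {
    this.maxBytes = maxBytes;
  }

  void take(String type, String record) {
    // All records in one feed must have the same type: close the feed on a change.
    if (records != null && !type.equals(feedType)) {
      submit();
    }
    if (records == null) { // start a new feed lazily
      records = new ArrayList<>();
      feedType = type;
      size = 0;
    }
    records.add(record);
    size += record.getBytes(StandardCharsets.UTF_8).length;
    if (size >= maxBytes) {
      submit(); // feed is full: send the whole batch at once
    }
  }

  /** Hands the current feed off (the real code uses a thread pool) and clears it. */
  void submit() {
    if (records == null) {
      return;
    }
    submitted.add(records);
    records = null;
  }

  List<List<String>> submitted() {
    return submitted;
  }

  public static void main(String[] args) {
    BatchingFeedSketch feed = new BatchingFeedSketch(10);
    feed.take("incremental", "aaaa");
    feed.take("incremental", "bbbb");
    feed.take("incremental", "cccc");     // 12 bytes >= 10: first batch is sent
    feed.take("metadata-and-url", "dd");  // opens a new feed
    feed.take("incremental", "ee");       // type change: the "dd" feed is sent
    feed.submit();                        // like flush(): send what is left
    System.out.println(feed.submitted()); // [[aaaa, bbbb, cccc], [dd], [ee]]
  }
}
```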

Once xmlFeed meets one of these conditions, submitFeed() is called to submit it:

/**
     * Takes the accumulated XmlFeed and sends the feed to the GSA.
     * 
     * @throws PushException
     *             if Pusher problem
     * @throws FeedException
     *             if transient Feed problem
     * @throws RepositoryException
     */
    private void submitFeed() throws PushException, FeedException,
            RepositoryException {
        if (xmlFeed == null) {
            return;
        }

        final XmlFeed feed = xmlFeed;
        xmlFeed = null;
        final String logMessage;
        if (feedLog != null) {
            logMessage = feedLog.toString();
            feedLog = null;
        } else {
            logMessage = null;
        }

        try {
            feed.close();
        } catch (IOException ioe) {
            throw new PushException("Error closing feed", ioe);
        }

        try {
            // Send the feed to the GSA in a separate thread.
            FutureTask<String> future = new FutureTask<String>(
                    new Callable<String>() {
                        public String call() throws PushException,
                                FeedException, RepositoryException {
                            try {
                                NDC.push("Feed " + feed.getDataSource());
                                return submitFeed(feed, logMessage);
                            } finally {
                                NDC.remove();
                            }
                        }
                    });
            feedSender.execute(future);
            // Add the future to list of outstanding submissions.
            synchronized (submissions) {
                submissions.add(future);
            }
        } catch (RejectedExecutionException ree) {
            throw new FeedException("Asynchronous feed was rejected. ", ree);
        }
    }

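This method first clears the xmlFeed field (so the next call to take() starts a new feed) and closes the feed. It then wraps the call that actually sends the data in the call() method of a FutureTask, executes that task on the feedSender thread pool, and finally adds the future handle to the LinkedList<FutureTask<String>> submissions list.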
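This is a standard java.util.concurrent pattern: a FutureTask wrapping a Callable is itself a Runnable, so it can be handed to Executor.execute(), while the caller keeps the handle to inspect the outcome later. The sketch below assumes a single-thread executor and simulates the HTTP send with a string. The source of checkSubmissions() is not shown in this article, so collectFinished() is only a hypothetical stand-in for it.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

public class AsyncSubmitSketch {
  private final ExecutorService feedSender = Executors.newSingleThreadExecutor();
  private final LinkedList<FutureTask<String>> submissions = new LinkedList<>();

  /** Wraps a (simulated) send in a FutureTask, runs it on the executor, and records the handle. */
  void submit(String feed) {
    FutureTask<String> future = new FutureTask<>(() -> "Success: " + feed);
    feedSender.execute(future);
    synchronized (submissions) {
      submissions.add(future);
    }
  }

  /**
   * Hypothetical stand-in for checkSubmissions(): removes finished tasks and
   * returns their responses. With wait == true it blocks until every task is done.
   * A failure inside call() resurfaces here through ExecutionException.
   */
  List<String> collectFinished(boolean wait) {
    List<String> responses = new LinkedList<>();
    synchronized (submissions) {
      Iterator<FutureTask<String>> it = submissions.iterator();
      while (it.hasNext()) {
        FutureTask<String> future = it.next();
        if (!wait && !future.isDone()) {
          continue; // still sending: check again next time
        }
        try {
          responses.add(future.get());
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted waiting for feed", e);
        } catch (ExecutionException e) {
          throw new IllegalStateException("Feed submission failed", e.getCause());
        }
        it.remove();
      }
    }
    return responses;
  }

  void shutdown() {
    feedSender.shutdown();
  }
}
```

Running the send on a separate thread lets take() return quickly and keep building the next feed while the previous one is still in flight.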

Below is the method that calls the feedConnection object to send the data:

/**
     * Takes the supplied XmlFeed and sends that feed to the GSA.
     * 
     * @param feed
     *            an XmlFeed
     * @param logMessage
     *            a Feed Log message
     * @return response String from GSA
     * @throws PushException
     *             if Pusher problem
     * @throws FeedException
     *             if transient Feed problem
     * @throws RepositoryException
     */
    private String submitFeed(XmlFeed feed, String logMessage)
            throws PushException, FeedException, RepositoryException {
        
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine("Submitting " + feed.getFeedType() + " feed for "
                    + feed.getDataSource() + " to the GSA. "
                    + feed.getRecordCount() + " records totaling "
                    + feed.size() + " bytes.");
        }

        // Write the generated feedLog message to the feed logger.
        if (logMessage != null && FEED_LOGGER.isLoggable(FEED_LOG_LEVEL)) {
            FEED_LOGGER.log(FEED_LOG_LEVEL, logMessage);
        }
        // Write the Feed to the TeedFeedFile, if one was specified.
        String teedFeedFilename = Context.getInstance().getTeedFeedFile();
        if (teedFeedFilename != null) {
            boolean isThrowing = false;
            OutputStream os = null;
            try {
                os = new FileOutputStream(teedFeedFilename, true);
                feed.writeTo(os);
            } catch (IOException e) {
                isThrowing = true;
                throw new FeedException("Cannot write to file: "
                        + teedFeedFilename, e);
            } finally {
                if (os != null) {
                    try {
                        os.close();
                    } catch (IOException e) {
                        if (!isThrowing) {
                            throw new FeedException("Cannot write to file: "
                                    + teedFeedFilename, e);
                        }
                    }
                }
            }
        }

        String gsaResponse = feedConnection.sendData(feed);
        if (!gsaResponse.equals(GsaFeedConnection.SUCCESS_RESPONSE)) {
            String eMessage = gsaResponse;
            if (GsaFeedConnection.UNAUTHORIZED_RESPONSE.equals(gsaResponse)) {
                eMessage += ": Client is not authorized to send feeds. Make "
                        + "sure the GSA is configured to trust feeds from your host.";
            }
            if (GsaFeedConnection.INTERNAL_ERROR_RESPONSE.equals(gsaResponse)) {
                eMessage += ": Check GSA status or feed format.";
            }
            throw new PushException(eMessage);
        }
        return gsaResponse;
    }

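If the teedFeedFilename property is configured, the xmlfeed is first appended to that file; only then is the feedConnection instance called to send the xmlfeed data. If the GSA's response is not SUCCESS_RESPONSE, a PushException carrying an explanatory message is thrown.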
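The isThrowing flag in the tee-file code exists so that a failure in close() does not mask an earlier write failure. On Java 7 and later, try-with-resources gives the same guarantee: a close failure after a failed write is attached as a suppressed exception, and a close failure on its own is still thrown. A minimal sketch of the tee step written that way, using UncheckedIOException where the original throws FeedException (TeeFeedSketch and teeFeed() are hypothetical names):

```java
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

public class TeeFeedSketch {
  /** Appends the feed bytes to teedFeedFilename, if one is configured. */
  static void teeFeed(String teedFeedFilename, byte[] feedBytes) {
    if (teedFeedFilename == null) {
      return; // tee file not configured: nothing to do
    }
    // 'true' opens the file in append mode, as in the original code.
    try (OutputStream os = new FileOutputStream(teedFeedFilename, true)) {
      os.write(feedBytes);
    } catch (IOException e) {
      // Covers failures from the constructor, write() and the implicit close().
      throw new UncheckedIOException("Cannot write to file: " + teedFeedFilename, e);
    }
  }
}
```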

feedConnection sends the data over HTTP. Here is its sendData method:

/* @Override */
    public String sendData(FeedData feedData) throws FeedException {
        try {
            String response = sendFeedData((XmlFeed) feedData);
            gotFeedError = !response.equalsIgnoreCase(SUCCESS_RESPONSE);
            return response;
        } catch (FeedException fe) {
            gotFeedError = true;
            throw fe;
        }
    }

It in turn calls sendFeedData(XmlFeed feed) to do the actual sending:

private String sendFeedData(XmlFeed feed) throws FeedException {
        String feedType = feed.getFeedType().toLegacyString();
        String dataSource = feed.getDataSource();
        OutputStream outputStream;
        HttpURLConnection uc;
        StringBuilder buf = new StringBuilder();
        byte[] prefix;
        byte[] suffix;
        try {
            // Build prefix.
            controlHeader(buf, "datasource", ServletUtil.MIMETYPE_TEXT_PLAIN);
            buf.append(dataSource).append(CRLF);
            controlHeader(buf, "feedtype", ServletUtil.MIMETYPE_TEXT_PLAIN);
            buf.append(feedType).append(CRLF);
            controlHeader(buf, "data", ServletUtil.MIMETYPE_XML);
            prefix = buf.toString().getBytes("UTF-8");

            // Build suffix.
            buf.setLength(0);
            buf.append(CRLF).append("--").append(BOUNDARY).append("--")
                    .append(CRLF);
            suffix = buf.toString().getBytes("UTF-8");

            LOGGER.finest("Opening feed connection to " + feedUrl);
            
            synchronized (this) {
                uc = (HttpURLConnection) feedUrl.openConnection();
            }
            if (uc instanceof HttpsURLConnection && !validateCertificate) {
                SslUtil.setTrustingHttpsOptions((HttpsURLConnection) uc);
            }
          
            uc.setDoInput(true);
            uc.setDoOutput(true);
            uc.setFixedLengthStreamingMode(prefix.length + feed.size()
                    + suffix.length);            
            uc.setRequestProperty("Content-Type",
                    "multipart/form-data; boundary=" + BOUNDARY);
            
            outputStream = uc.getOutputStream();
        } catch (IOException ioe) {
            throw new FeedException(feedUrl.toString(), ioe);
        } catch (GeneralSecurityException e) {
            throw new FeedException(feedUrl.toString(), e);
        }

        boolean isThrowing = false;
        buf.setLength(0);
        try {
            LOGGER.finest("Writing feed data to feed connection.");
            // If there is an exception during this read/write, we do our
            // best to close the url connection and read the result.
            try {
                outputStream.write(prefix);
                feed.writeTo(outputStream);
                outputStream.write(suffix);
                outputStream.flush();
            } catch (IOException e) {
                LOGGER.log(Level.SEVERE,
                        "IOException while posting: will retry later", e);
                isThrowing = true;
                throw new FeedException(e);
            } catch (RuntimeException e) {
                isThrowing = true;
                throw e;
            } catch (Error e) {
                isThrowing = true;
                throw e;
            } finally {
                try {
                    outputStream.close();
                } catch (IOException e) {
                    LOGGER.log(
                            Level.SEVERE,
                            "IOException while closing after post: will retry later",
                            e);
                    if (!isThrowing) {
                        isThrowing = true;
                        throw new FeedException(e);
                    }
                }
            }
        } finally {
            BufferedReader br = null;
            try {
                LOGGER.finest("Waiting for response from feed connection.");
                InputStream inputStream = uc.getInputStream();
                br = new BufferedReader(new InputStreamReader(inputStream,
                        "UTF8"));
                String line;
                while ((line = br.readLine()) != null) {
                    buf.append(line);
                }
            } catch (IOException ioe) {
                if (!isThrowing) {
                    throw new FeedException(ioe);
                }
            } finally {
                try {
                    if (br != null) {
                        br.close();
                    }
                } catch (IOException e) {
                    LOGGER.log(Level.SEVERE,
                            "IOException while closing after post: continuing",
                            e);
                }
                if (uc != null) {
                    uc.disconnect();
                }
                if (LOGGER.isLoggable(Level.FINEST)) {
                    LOGGER.finest("Received response from feed connection: "
                            + buf.toString());
                }
            }
        }
        return buf.toString();
    }

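Here Content-Type is set to multipart/form-data, the same encoding a browser uses when an HTML form uploads a file (enctype="multipart/form-data"). The request body consists of a prefix (the datasource and feedtype fields plus the header of the data part), the binary xmlfeed data, and a closing boundary suffix; its total length is passed to setFixedLengthStreamingMode(). After writing the body to the output stream, the method reads the server's response from the input stream. I described the exact format of the sent data in Part 18 of this series (Enterprise Search Engine Development: The Connector (Part 18)), so I won't repeat it here.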
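The prefix/data/suffix layout can be reproduced without a network connection. The sketch below assumes controlHeader() writes a boundary line, a Content-Disposition header naming the form field, and a Content-Type header, followed by a blank line. That helper's source is not shown in this article, so its exact output, the BOUNDARY value and the text/xml MIME type are all assumptions. The byte length of the result corresponds to what sendFeedData() passes to setFixedLengthStreamingMode().

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class MultipartFeedSketch {
  static final String CRLF = "\r\n";
  static final String BOUNDARY = "----feedBoundary"; // hypothetical boundary value

  // Hypothetical equivalent of controlHeader(): opens one form-data part.
  static void controlHeader(StringBuilder buf, String name, String mimeType) {
    buf.append("--").append(BOUNDARY).append(CRLF)
        .append("Content-Disposition: form-data; name=\"").append(name).append('"').append(CRLF)
        .append("Content-Type: ").append(mimeType).append(CRLF)
        .append(CRLF); // blank line separates part headers from part content
  }

  /** Builds prefix + XML + suffix, the same three pieces sendFeedData() writes. */
  static byte[] buildBody(String dataSource, String feedType, byte[] xml) {
    StringBuilder buf = new StringBuilder();
    controlHeader(buf, "datasource", "text/plain");
    buf.append(dataSource).append(CRLF);
    controlHeader(buf, "feedtype", "text/plain");
    buf.append(feedType).append(CRLF);
    controlHeader(buf, "data", "text/xml");
    byte[] prefix = buf.toString().getBytes(StandardCharsets.UTF_8);
    byte[] suffix = (CRLF + "--" + BOUNDARY + "--" + CRLF).getBytes(StandardCharsets.UTF_8);

    ByteArrayOutputStream out =
        new ByteArrayOutputStream(prefix.length + xml.length + suffix.length);
    out.write(prefix, 0, prefix.length);
    out.write(xml, 0, xml.length);
    out.write(suffix, 0, suffix.length);
    return out.toByteArray();
  }
}
```

Because the length is known before anything is sent, fixed-length streaming lets HttpURLConnection send a Content-Length header and stream the body instead of buffering it all in memory.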

---------------------------------------------------------------------------

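This series, Enterprise Search Engine Development: The Connector, is my original work.

Please credit the source when reposting: cnblogs, 刺猬的温驯

My email: chenying998179@163#com (replace # with .)

Link to this article: http://www.cnblogs.com/chenying99/p/3775504.html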