How to control the size of the Nutch producer queue
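If topN is set to 10 million, those 10 million entries are not all loaded into the QueueFeeder (in memory) at once. Instead, the QueueFeeder is refilled incrementally by iterating over the input on the file system (HDFS).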
By default the queues hold at most threadcount * 50 entries.
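The arithmetic behind that default can be sketched as follows. This is a minimal illustration, not Nutch code: it assumes the thread count comes from a property named `fetcher.threads.fetch` (default 10) and the factor 50 from `fetcher.queue.depth.multiplier`, which should be checked against the nutch-default.xml of the version in use. `QueueCapacitySketch`, `getInt`, and `feederCapacity` are hypothetical names, and `java.util.Properties` stands in for the Hadoop configuration object.

```java
import java.util.Properties;

public class QueueCapacitySketch {
    // Assumed property names; verify them against your nutch-default.xml.
    static final String THREADS_KEY = "fetcher.threads.fetch";
    static final String MULTIPLIER_KEY = "fetcher.queue.depth.multiplier";

    // Reads an int property, falling back to a default when it is not set.
    static int getInt(Properties conf, String key, int defaultValue) {
        String raw = conf.getProperty(key);
        return raw == null ? defaultValue : Integer.parseInt(raw.trim());
    }

    // Capacity handed to the feeder: thread count times per-thread depth.
    static int feederCapacity(Properties conf) {
        int threadCount = getInt(conf, THREADS_KEY, 10);        // assumed default
        int maxFeedPerThread = getInt(conf, MULTIPLIER_KEY, 50); // the "* 50" above
        return threadCount * maxFeedPerThread;
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        System.out.println(feederCapacity(conf)); // 10 * 50 = 500

        conf.setProperty(THREADS_KEY, "20");
        conf.setProperty(MULTIPLIER_KEY, "100");
        System.out.println(feederCapacity(conf)); // 20 * 100 = 2000
    }
}
```

Under these assumptions, raising either the thread count or the multiplier enlarges the in-memory buffer, while topN only determines how much input is eventually streamed through it.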
The QueueFeeder class below reads input from the file system and fills the queues.
/**
 * This class feeds the queues with input items, and re-fills them as
 * items are consumed by FetcherThread-s.
 */
private static class QueueFeeder extends Thread {
  private final Context context;
  private final FetchItemQueues queues;
  private final int size;
  private Iterator<FetchEntry> currentIter; // FetchEntry implements org.apache.hadoop.io.Writable
  boolean hasMore;
  private long timelimit = -1;

  public QueueFeeder(Context context, FetchItemQueues queues, int size)
      throws IOException, InterruptedException {
    this.context = context;
    this.queues = queues;
    this.size = size;
    this.setDaemon(true);
    this.setName("QueueFeeder");
    hasMore = context.nextKey();
    if (hasMore) {
      currentIter = context.getValues().iterator();
    }
    // the value of the time limit is either -1 or the time where it should finish
    timelimit = context.getConfiguration().getLong("fetcher.timelimit", -1);
  }

  @Override
  public void run() {
    int cnt = 0;
    int timelimitcount = 0;
    try {
      while (hasMore) {
        if (System.currentTimeMillis() >= timelimit && timelimit != -1) {
          // enough .. let's simply
          // read all the entries from the input without processing them
          while (currentIter.hasNext()) {
            currentIter.next();
            timelimitcount++;
          }
          hasMore = context.nextKey();
          if (hasMore) {
            currentIter = context.getValues().iterator();
          }
          continue;
        }
        int feed = size - queues.getTotalSize();
        if (feed <= 0) {
          // queues are full - spin-wait until they have some free space
          try {
            Thread.sleep(1000);
          } catch (final Exception e) {};
          continue;
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("-feeding " + feed + " input urls ...");
        }
        while (feed > 0 && currentIter.hasNext()) {
          FetchEntry entry = currentIter.next();
          final String url = TableUtil.unreverseUrl(entry.getKey());
          queues.addFetchItem(url, entry.getWebPage());
          feed--;
          cnt++;
        }
        if (currentIter.hasNext()) {
          continue; // finish items in current list before reading next key
        }
        hasMore = context.nextKey();
        if (hasMore) {
          currentIter = context.getValues().iterator();
        }
      }
    } catch (Exception e) {
      LOG.error("QueueFeeder error reading input, record " + cnt, e);
      return;
    }
    LOG.info("QueueFeeder finished: total " + cnt + " records. Hit by time limit :" + timelimitcount);
    context.getCounter("FetcherStatus","HitByTimeLimit-QueueFeeder").increment(timelimitcount);
  }
}
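The core of the listing is the `feed = size - queues.getTotalSize()` step: only the free slots are filled, so memory use is bounded by `size` no matter how large the input is. The sketch below isolates that idea in a single-threaded simulation. It is not Nutch code: `BoundedFeederSketch` and `refill` are hypothetical names, a plain `ArrayDeque` stands in for `FetchItemQueues`, an integer range stands in for the HDFS input, and the drain rate of 30 items per round is an arbitrary assumption.

```java
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.stream.IntStream;

public class BoundedFeederSketch {
    // Tops the queue up to `capacity` from `source`; returns how many items were added.
    static int refill(Queue<Integer> queue, Iterator<Integer> source, int capacity) {
        int feed = capacity - queue.size(); // free slots, like size - queues.getTotalSize()
        int added = 0;
        while (feed > 0 && source.hasNext()) {
            queue.add(source.next());
            feed--;
            added++;
        }
        return added;
    }

    public static void main(String[] args) {
        // Stand-in for a large topN input that is read lazily, never all at once.
        Iterator<Integer> source = IntStream.range(0, 1000).iterator();
        Queue<Integer> queue = new ArrayDeque<>();
        int capacity = 100;
        int peak = 0;
        int consumed = 0;

        while (source.hasNext() || !queue.isEmpty()) {
            refill(queue, source, capacity);
            peak = Math.max(peak, queue.size());
            // Pretend the fetcher threads drain up to 30 items per round.
            for (int i = 0; i < 30 && !queue.isEmpty(); i++) {
                queue.poll();
                consumed++;
            }
        }
        System.out.println("consumed=" + consumed + " peak=" + peak);
        // prints "consumed=1000 peak=100"
    }
}
```

All 1000 items pass through the queue, yet it never holds more than 100 at a time, which is the same reason a topN of 10 million does not need 10 million entries in memory.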