Storm [Practice Series: How to Write a Crawler] - ParserBolt
Background: if you are not yet familiar with crawlers or with the web front end, please read up on them first (Google is your friend).
Prerequisite: you should have read the earlier posts in this series, in particular Storm [Practice Series: How to Write a Crawler] - Fetcher.
Topic of this post: how ParserBolt performs the parsing, how it receives data from the upstream component, and how it emits the results downstream.
Series plan: the whole crawler series will be published in three stages:
1. Code implementation.
2. A detailed walkthrough of the code.
3. A review of the overall design, with a summary.
If, while you are reading, only stage 1 is available, please bear with me: as soon as work at the company is less busy, I will post one article per day.
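Before diving into the code, note the tuple contract that ParserBolt relies on: its execute() method reads a binary "content" field, a string "url" field and a "metadata" map from each incoming tuple. As a minimal sketch (the class name is hypothetical and the field names are inferred from ParserBolt itself, not copied from the actual FetcherBolt code, which may declare more fields), the upstream output declaration would have to look roughly like this:

    // Hypothetical sketch of the upstream output declaration. The field
    // names are inferred from the getXxxByField() calls in ParserBolt's
    // execute(); the real FetcherBolt may declare additional fields.
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;

    public class FetcherOutputContract {
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // "url"      -> String: the fetched URL
            // "content"  -> byte[]: raw bytes of the fetched page
            // "metadata" -> HashMap<String, String[]>: e.g. protocol headers
            declarer.declare(new Fields("url", "content", "metadata"));
        }
    }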
package com.digitalpebble.storm.crawler.bolt.parser;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.tika.Tika;
import org.apache.tika.metadata.Metadata;
import org.apache.tika.parser.ParseContext;
import org.apache.tika.sax.BodyContentHandler;
import org.apache.tika.sax.Link;
import org.apache.tika.sax.LinkContentHandler;
import org.apache.tika.sax.TeeContentHandler;
import org.slf4j.LoggerFactory;
import org.xml.sax.ContentHandler;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import com.codahale.metrics.Timer;
import com.digitalpebble.storm.crawler.StormConfiguration;
import com.digitalpebble.storm.crawler.filtering.URLFilters;
import com.digitalpebble.storm.crawler.util.Configuration;
import com.digitalpebble.storm.crawler.util.HistogramMetric;
import com.digitalpebble.storm.crawler.util.MeterMetric;
import com.digitalpebble.storm.crawler.util.TimerMetric;
import com.digitalpebble.storm.crawler.util.URLUtil;

/**
 * Uses Tika to parse the output of a fetch and extract text + metadata.
 **/
@SuppressWarnings("serial")
public class ParserBolt extends BaseRichBolt {

    private Configuration config;
    private Tika tika;
    private URLFilters filters = null;
    private OutputCollector collector;

    private static final org.slf4j.Logger LOG = LoggerFactory
            .getLogger(ParserBolt.class);

    private MeterMetric eventMeters;
    private HistogramMetric eventHistograms;
    private TimerMetric eventTimers;

    private boolean ignoreOutsideHost = false;
    // read from the config but not yet applied below
    private boolean ignoreOutsideDomain = false;

    public void prepare(Map conf, TopologyContext context,
            OutputCollector collector) {
        config = StormConfiguration.create();

        // load the URL filters from urlfilters.json (or whatever file the
        // urlfilters.config.file property points at)
        String urlconfigfile = config.get("urlfilters.config.file",
                "urlfilters.json");
        if (urlconfigfile != null) {
            try {
                filters = new URLFilters(urlconfigfile);
            } catch (IOException e) {
                LOG.error("Exception caught while loading the URLFilters", e);
            }
        }

        ignoreOutsideHost = config.getBoolean(
                "parser.ignore.outlinks.outside.host", false);
        ignoreOutsideDomain = config.getBoolean(
                "parser.ignore.outlinks.outside.domain", false);

        // instantiate Tika
        long start = System.currentTimeMillis();
        tika = new Tika();
        long end = System.currentTimeMillis();
        LOG.debug("Tika loaded in " + (end - start) + " msec");

        this.collector = collector;

        this.eventMeters = context.registerMetric("parser-meter",
                new MeterMetric(), 5);
        this.eventTimers = context.registerMetric("parser-timer",
                new TimerMetric(), 5);
        this.eventHistograms = context.registerMetric("parser-histograms",
                new HistogramMetric(), 5);
    }

    public void execute(Tuple tuple) {
        eventMeters.scope("tuple_in").mark();

        // fields produced by the upstream fetcher
        byte[] content = tuple.getBinaryByField("content");
        eventHistograms.scope("content_bytes").update(content.length);

        String url = tuple.getStringByField("url");
        HashMap<String, String[]> metadata = (HashMap<String, String[]>) tuple
                .getValueByField("metadata");

        // TODO check status etc...

        Timer.Context timer = eventTimers.scope("parsing").time();

        // rely on mime-type provided by server or guess?
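        // The raw page bytes are wrapped in a stream and handed to Tika,
        // which detects the mime type, picks a parser and emits SAX events.
        // The TeeContentHandler below duplicates those events so that the
        // outlinks (LinkContentHandler) and the body text
        // (BodyContentHandler) are both collected in a single pass.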
        ByteArrayInputStream bais = new ByteArrayInputStream(content);
        Metadata md = new Metadata();
        String text = null;

        LinkContentHandler linkHandler = new LinkContentHandler();
        ContentHandler textHandler = new BodyContentHandler();
        TeeContentHandler teeHandler = new TeeContentHandler(linkHandler,
                textHandler);
        ParseContext parseContext = new ParseContext();

        // parse
        try {
            tika.getParser().parse(bais, teeHandler, md, parseContext);
            text = textHandler.toString();
        } catch (Exception e) {
            LOG.error("Exception while parsing " + url, e);
            eventMeters.scope(
                    "error_content_parsing_" + e.getClass().getSimpleName())
                    .mark();
            collector.fail(tuple);
            eventMeters.scope("tuple_fail").mark();
            return;
        } finally {
            try {
                bais.close();
            } catch (IOException e) {
                LOG.error("Exception while closing stream", e);
            }
        }

        // Timer.Context.stop() returns nanoseconds
        long duration = timer.stop() / 1000000;
        LOG.info("Parsed " + url + " in " + duration + " msec");

        // get the outlinks and convert them to strings (for now)
        String fromHost;
        URL url_;
        try {
            url_ = new URL(url);
            fromHost = url_.getHost().toLowerCase();
        } catch (MalformedURLException e1) {
            // we would have known by now as previous
            // components check whether the URL is valid
            LOG.error("MalformedURLException on " + url);
            eventMeters.scope(
                    "error_outlinks_parsing_" + e1.getClass().getSimpleName())
                    .mark();
            collector.fail(tuple);
            eventMeters.scope("tuple_fail").mark();
            return;
        }

        List<Link> links = linkHandler.getLinks();
        Set<String> slinks = new HashSet<String>(links.size());
        for (Link l : links) {
            if (StringUtils.isBlank(l.getUri()))
                continue;
            String urlOL = null;
            // resolve the (possibly relative) outlink against the page URL
            try {
                URL tmpURL = URLUtil.resolveURL(url_, l.getUri());
                urlOL = tmpURL.toExternalForm();
            } catch (MalformedURLException e) {
                LOG.debug("MalformedURLException on " + l.getUri());
                eventMeters.scope(
                        "error_out_link_parsing_"
                                + e.getClass().getSimpleName()).mark();
                continue;
            }

            // filter the urls
            if (filters != null) {
                urlOL = filters.filter(urlOL);
                if (urlOL == null) {
                    eventMeters.scope("outlink_filtered").mark();
                    continue;
                }
            }

            // optionally drop outlinks pointing outside the source host
            if (urlOL != null && ignoreOutsideHost) {
                String toHost;
                try {
                    toHost = new URL(urlOL).getHost().toLowerCase();
                } catch (MalformedURLException e) {
                    toHost = null;
                }
                if (toHost == null || !toHost.equals(fromHost)) {
                    urlOL = null; // skip it
                    eventMeters.scope("outlink_outsideHost").mark();
                    continue;
                }
            }

            if (urlOL != null) {
                slinks.add(urlOL);
                eventMeters.scope("outlink_kept").mark();
            }
        }

        // add parse md to metadata
        for (String k : md.names()) {
            // TODO handle multiple values
            String[] values = md.getValues(k);
            metadata.put("parse." + k, values);
        }

        collector.emit(tuple, new Values(url, content, metadata, text.trim(),
                slinks));
        collector.ack(tuple);
        eventMeters.scope("tuple_success").mark();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // output of this module is the list of fields to index,
        // with at least the URL and the text content
        declarer.declare(new Fields("url", "content", "metadata", "text",
                "outlinks"));
    }
}
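To make concrete how ParserBolt gets its data from the Fetcher, here is a hedged wiring sketch. The component names ("spout", "fetch", "parse") and the parallelism hints are assumptions, UrlSpout is a placeholder for whatever spout feeds URLs into the pipeline, and FetcherBolt stands for the bolt from the previous post; the real topology in this series may be wired differently:

    // Hypothetical topology wiring: the fetcher's output flows into
    // ParserBolt via a shuffle grouping. UrlSpout and FetcherBolt are
    // assumed to be available on the classpath; they are not defined here.
    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;

    public class CrawlerTopologySketch {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();

            builder.setSpout("spout", new UrlSpout());

            // download the pages...
            builder.setBolt("fetch", new FetcherBolt(), 4)
                    .shuffleGrouping("spout");

            // ...and parse them: ParserBolt receives the (url, content,
            // metadata) tuples emitted by the fetcher
            builder.setBolt("parse", new ParserBolt(), 2)
                    .shuffleGrouping("fetch");

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("crawler", new Config(),
                    builder.createTopology());
        }
    }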