首页 > 代码库 > Pipeline & PageProcesser

Pipeline & PageProcesser

Pipeline & PageProcesser

这两部分应该由程序员自己实现:PageProcesser负责如何解析页面,而Pipeline则负责存储结果。推荐使用OOSpider,也就是注解式编程。

Downloader

/**
 * Contract for the component that fetches a web page for the spider.
 */
public interface Downloader {

    /**
     * Downloads the page described by {@code request} and stores it in a Page object.
     *
     * @param request the request to download
     * @param task    the task the request belongs to
     * @return the downloaded page
     */
    Page download(Request request, Task task);

    /**
     * Tells the downloader how many threads the spider uses.
     *
     * @param threadNum number of threads
     */
    void setThread(int threadNum);
}

主要的实现类有3个,我只重点介绍一下HttpClientDownloader,有兴趣的可以自己看看源码

@ThreadSafe
public class HttpClientDownloader extends AbstractDownloader {

private Logger logger = LoggerFactory.getLogger(getClass());

private final Map<String, CloseableHttpClient> httpClients = new HashMap<String, CloseableHttpClient>();

private HttpClientGenerator httpClientGenerator = new HttpClientGenerator();

private CloseableHttpClient getHttpClient(Site site, Proxy proxy) {
if (site == null) {
return httpClientGenerator.getClient(null, proxy);
}
String domain = site.getDomain();
CloseableHttpClient httpClient = httpClients.get(domain);
if (httpClient == null) {
synchronized (this) {
httpClient = httpClients.get(domain);
if (httpClient == null) {
httpClient = httpClientGenerator.getClient(site, proxy);
httpClients.put(domain, httpClient);
}
}
}
return httpClient;
}

@Override
public Page download(Request request, Task task) {
Site site = null;
if (task != null) {
site = task.getSite();
}
Set<Integer> acceptStatCode;
String charset = null;
Map<String, String> headers = null;
if (site != null) {
acceptStatCode = site.getAcceptStatCode();
charset = site.getCharset();
headers = site.getHeaders();
} else {
acceptStatCode = Sets.newHashSet(200);
}
logger.info("downloading page {}", request.getUrl());
CloseableHttpResponse httpResponse = null;
int statusCode=0;
try {
HttpHost proxyHost = null;
Proxy proxy = null; //TODO
if (site.getHttpProxyPool() != null && site.getHttpProxyPool().isEnable()) {
proxy = site.getHttpProxyFromPool();
proxyHost = proxy.getHttpHost();
} else if(site.getHttpProxy()!= null){
proxyHost = site.getHttpProxy();
}

HttpUriRequest httpUriRequest = getHttpUriRequest(request, site, headers, proxyHost);
httpResponse = getHttpClient(site, proxy).execute(httpUriRequest);???
statusCode = httpResponse.getStatusLine().getStatusCode();
request.putExtra(Request.STATUS_CODE, statusCode);
if (statusAccept(acceptStatCode, statusCode)) {
Page page = handleResponse(request, charset, httpResponse, task);
onSuccess(request);
return page;
} else {
logger.warn("code error " + statusCode + "\t" + request.getUrl());
return null;
}
} catch (IOException e) {
logger.warn("download page " + request.getUrl() + " error", e);
if (site.getCycleRetryTimes() > 0) {
return addToCycleRetry(request, site);
}
one rror(request);
return null;
} finally {
request.putExtra(Request.STATUS_CODE, statusCode);
if (site.getHttpProxyPool()!=null && site.getHttpProxyPool().isEnable()) {
site.returnHttpProxyToPool((HttpHost) request.getExtra(Request.PROXY), (Integer) request
.getExtra(Request.STATUS_CODE));
}
try {
if (httpResponse != null) {
//ensure the connection is released back to pool
EntityUtils.consume(httpResponse.getEntity());
}
} catch (IOException e) {
logger.warn("close response fail", e);
}
}
}

@Override
public void setThread(int thread) {
httpClientGenerator.setPoolSize(thread);
}

protected boolean statusAccept(Set<Integer> acceptStatCode, int statusCode) {
return acceptStatCode.contains(statusCode);
}

protected HttpUriRequest getHttpUriRequest(Request request, Site site, Map<String, String> headers,HttpHost proxy) {
RequestBuilder requestBuilder = selectRequestMethod(request).setUri(request.getUrl());
if (headers != null) {
for (Map.Entry<String, String> headerEntry : headers.entrySet()) {
requestBuilder.addHeader(headerEntry.getKey(), headerEntry.getValue());
}
}
RequestConfig.Builder requestConfigBuilder = RequestConfig.custom()
.setConnectionRequestTimeout(site.getTimeOut())
.setSocketTimeout(site.getTimeOut())
.setConnectTimeout(site.getTimeOut())
.setCookieSpec(CookieSpecs.BEST_MATCH);
if (proxy !=null) {
requestConfigBuilder.setProxy(proxy);
request.putExtra(Request.PROXY, proxy);
}
requestBuilder.setConfig(requestConfigBuilder.build());
return requestBuilder.build();
}

protected RequestBuilder selectRequestMethod(Request request) {
String method = request.getMethod();
if (method == null || method.equalsIgnoreCase(HttpConstant.Method.GET)) {
//default get
return RequestBuilder.get();
} else if (method.equalsIgnoreCase(HttpConstant.Method.POST)) {
RequestBuilder requestBuilder = RequestBuilder.post();
NameValuePair[] nameValuePair = (NameValuePair[]) request.getExtra("nameValuePair");
if (nameValuePair != null && nameValuePair.length > 0) {
requestBuilder.addParameters(nameValuePair);
}
return requestBuilder;
} else if (method.equalsIgnoreCase(HttpConstant.Method.HEAD)) {
return RequestBuilder.head();
} else if (method.equalsIgnoreCase(HttpConstant.Method.PUT)) {
return RequestBuilder.put();
} else if (method.equalsIgnoreCase(HttpConstant.Method.DELETE)) {
return RequestBuilder.delete();
} else if (method.equalsIgnoreCase(HttpConstant.Method.TRACE)) {
return RequestBuilder.trace();
}
throw new IllegalArgumentException("Illegal HTTP Method " + method);
}

protected Page handleResponse(Request request, String charset, HttpResponse httpResponse, Task task) throws IOException {
String content = getContent(charset, httpResponse);
Page page = new Page();
page.setRawText(content);
page.setUrl(new PlainText(request.getUrl()));
page.setRequest(request);
page.setStatusCode(httpResponse.getStatusLine().getStatusCode());
return page;
}

protected String getContent(String charset, HttpResponse httpResponse) throws IOException {
if (charset == null) {
byte[] contentBytes = IOUtils.toByteArray(httpResponse.getEntity().getContent());
String htmlCharset = getHtmlCharset(httpResponse, contentBytes);
if (htmlCharset != null) {
return new String(contentBytes, htmlCharset);
} else {
logger.warn("Charset autodetect failed, use {} as charset. Please specify charset in Site.setCharset()", Charset.defaultCharset());
return new String(contentBytes);
}
} else {
return IOUtils.toString(httpResponse.getEntity().getContent(), charset);
}
}

protected String getHtmlCharset(HttpResponse httpResponse, byte[] contentBytes) throws IOException {
String charset;
// charset
// 1、encoding in http header Content-Type
String value = http://www.mamicode.com/httpResponse.getEntity().getContentType().getValue();
charset = UrlUtils.getCharset(value);
if (StringUtils.isNotBlank(charset)) {
logger.debug("Auto get charset: {}", charset);
return charset;
}
// use default charset to decode first time
Charset defaultCharset = Charset.defaultCharset();
String content = new String(contentBytes, defaultCharset.name());
// 2、charset in meta
if (StringUtils.isNotEmpty(content)) {
Document document = Jsoup.parse(content);
Elements links = document.select("meta");
for (Element link : links) {
// 2.1、html4.01 <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
String metaContent = link.attr("content");
String metaCharset = link.attr("charset");
if (metaContent.indexOf("charset") != -1) {
metaContent = metaContent.substring(metaContent.indexOf("charset"), metaContent.length());
charset = metaContent.split("=")[1];
break;
}
// 2.2、html5 <meta charset="UTF-8" />
else if (StringUtils.isNotEmpty(metaCharset)) {
charset = metaCharset;
break;
}
}
}
logger.debug("Auto get charset: {}", charset);
// 3、todo use tools as cpdetector for content decode
return charset;
}
}

其中包括了添加http proxy这部分官方文档都没有介绍,如果需要那就自行看源码吧- -b
再看其中的这部分

if (statusAccept(acceptStatCode, statusCode)) {
Page page = handleResponse(request, charset, httpResponse, task);
onSuccess(request);
return page;
} else {
logger.warn("code error " + statusCode + "\t" + request.getUrl());
return null;
}

acceptStatCode默认只包含200,如果出现其他状态码,那么将会直接return null。需要注意的是,Java中try块里的return并不会跳过finally:下面的finally块仍然会被执行,响应实体会被消费,连接也会归还到连接池,所以这里并不存在资源泄漏。

finally {
request.putExtra(Request.STATUS_CODE, statusCode);
if (site.getHttpProxyPool()!=null && site.getHttpProxyPool().isEnable()) {
site.returnHttpProxyToPool((HttpHost) request.getExtra(Request.PROXY), (Integer) request
.getExtra(Request.STATUS_CODE));
}
try {
if (httpResponse != null) {
//ensure the connection is released back to pool
EntityUtils.consume(httpResponse.getEntity());
}
} catch (IOException e) {
logger.warn("close response fail", e);
}
}

到此为止,所有的关于WebMagic的主体源码都介绍完毕了,如果你需要使用那么目前的知识已经足够了,如果出现bug还是需要自行更改,还好WebMagic给我们提供了更换组件的接口,使用起来还是很方便的。

Pipeline & PageProcesser