Nutch Crawler工作流程及文件格式详细分析
Crawler和Searcher两部分被尽是分开,其主要目的是为了使两个部分可以布地配置在硬件平台上,例如Crawler和Searcher分别被放置在两个主机上,这样可以极大的提高灵活性和性能。
一、总体介绍
1、先注入种子urls到crawldb
2、循环:
[ul][li]generate 从crawldb中生成一个url的子集用于抓取[/li][li]fetch 抓取上一小的url生成一个个segment[/li][li]parse 分析已抓取segment的内容[/li][li]update 把已抓取的数据更新到原先的crawldb[/li][/ul]
3、从已抓取的segments中分析出link地图
4、索引segment文本及inlink锚文本
二、相关的数据结构
Crawl DB
● CrawlDb 是一个包含如下结构数据的文件:
<URL, CrawlDatum>
● CrawlDatum:
<status, date, interval, failures, linkCount,...>
● Status:
{db_unfetched, db_fetched, db_gone,linked,
fetch_success, fetch_fail, fetch_gone}
爬虫Crawler:
Crawler的工作流程包括了整个nutch的所有步骤--injector,generator,fetcher,parseSegment,updateCrawleDB,Invert links, Index ,DeleteDuplicates,IndexMerger
Crawler涉及的数据文件格式和含义,和以上的各个步骤相关的文件分别被存放在物理设备上的以下几个文件夹里,crawldb,segments,indexes,linkdb,index五个文件夹里。
那么各个步骤和流程是怎么,各个文件夹里又是放着什么呢?观察Crawler类可以知道它的流程。
1、Injector injector =new Injector(conf); //Crawl.java line 104
Usage: Injector<crawldb><url_dir>
首先是建立起始url集,每个url都经过URLNormalizers、filter和scoreFilter三个过程并标记状态。首先经过normalizerplugin,把url进行标准化,比如basic nomalizer的作用有把大写的url标准化为小写,把空格去除等等。然后再经过的plugin是filter,可以根据你写的正则表达式把想要的url留下来。经过两个步骤后,然后就是把这个url进行状态标记,每个url都对应着一个CrawlDatum,这个类对应着每个url在所有生命周期内的一切状态。细节上还有这个url处理的时间和初始时的分值。
同时,在这个步骤里,会在文件系统里生成如下文件夹crawlDB\current\part-00000, 这个文件夹里还有.data.crc, .index.crc, data, index四个文件
● MapReduce1: 把输入的文件转换成DB格式
In:包含urls的文本文件
Map(line) →<url, CrawlDatum>;status=db_unfetched
Reduce() isidentity;
Output:临时的输出文件夹
● MapReduce2: 合并到现有的DB
Input:第一步的输出和已存在的DB文件
Map() isidentity.
Reduce:合并CrawlDatum成一个实体(entry)
Out:一个新的DB
2、Generator generator = newGenerator(conf);
//Generates a subset of a crawl db tofetch
Usage: Generator <crawldb><segments_dir> [-force] [-topN N][-numFetchers numFetchers] [-adddays numDays] [-noFilter]
在这个步骤里,Generator一共做了四件事情,
1、给前面injector完成的输出结果里按分值选出前topN个url,作为一个fetch的子集。
2、根据第一步的结果检查是否已经选取出一些url,CrawlDatum的实体集。
3、再次转化,此次要以url的host来分组,并以url的hash来排序。
4、根据以上的步骤的结果来更新crawldb(injector产生)。
● MapReduce1: 根据要求选取一些要抓取的url
In: Crawl DB文件
Map() → ifdate≥now, invert to <CrawlDatum,url>
Partition以随机的hash值来分组
Reduce:
compare() 以CrawlDatum.linkCount的降序排列
output onlytop-N most-linked entries
● MapReduce2: 为下一步抓取准备
Map() isinvert; Partition() by host, Reduce() is identity.
Out:包含<url,CrawlDatum> 要并行抓取的文件
3、Fetcher fetcher = newFetcher(conf);
//The fetcher. Most of the work is done byplugins
Usage: Fetcher<segment> [-threads n][-noParsing]
这个步骤里,Fetcher所做的事情主要就是抓取了,同时也完成一些其它的工作。首先,这是一个多线程的步骤,默认以10个线程去抓取。根据抓取回来后的结果状态来进行不同的标记,存储,再处理等等行为。输入是上一步骤Generator产生的segment文件夹,这个步骤里,考虑到先前已经按照ip或host来patition了,所以在此就不再把input文件进行分割了。程序继承了SequenceFileInputFormat重写了inputFormat来达到这点。这个类的各种形为都是插件来具体完成的,它只是一个骨架一样为各种插件提供一个平台。它先根据url来取出具体的protocol,得到protocolOutput,进而得到状态status及内容content。然后,根据抓取的状态status来继续再处理。再处理时,首先会将这次抓取的内容content、状态status及它的状态标记进行存储。这个存储的过程中,还会记下抓取的时间,再把segment存过metadata,同时在分析parsing前经过scoreFilter,再用parseUtil(一系列的parse插件)进行分析,分析后再经过一次score插件的处理。经过这一系列处理后,最后进行输出(url,fetcherOutput)。
之前讲到根据抓取回来的各种状态,进行再处理,这些状态一共包括12种,比如当抓取成功时,会像上刚讲的那样先存储结果,再判断是否是链接跳转,跳转的次数等等处理。
● MapReduce:抓取
In:<url,CrawlDatum>, 以host分区,以hash值排序
Map(url,CrawlDatum) → <url,FetcherOutput>
多线程的,同步的map实现
调用已有的协议protocol插件
FetcherOutput: <CrawlDatum,Content>
Reduce isidentity
Out: 两个文件:<url,CrawlDatum>,<url,Content>
4、ParseSegment parseSegment= new ParseSegment(conf);
//Parse content in a segment
Usage: ParseSegmentsegment
对于这个步骤的逻辑比较简单,只是对抓取后上一步骤存储在segment里的content进行分析parse。同样,这个步骤的具体工作也是由插件来完成的。
MapReduce: 分析内容
In:<url, Content> 抓取来的内容
Map(url,Content) → <url, Parse>
调用分析插件parserplugins
Reduce isidentity.
Parse:<ParseText, ParseData>
Out:分割成三个文件: <url,ParseText>,<url,ParseData>和<url,CrawlDatum> 为了outlinks.
5、CrawlDb crawlDbTool = newCrawlDb(conf);
//takes the output of the fetcher and updates the crawldbaccordingly.
Usage: CrawlDb<crawldb> (-dir<segments> |<seg1><seg2> ...) [-force] [-normalize][-filter] [-noAdditions]
这个类主要是根据fetcher的输出去更新crawldb。map和reduce分别做了两方面的事情,在map里是对url的nomalizer,和filte,在reduce里是对新抓取进来的页面(CrawlDatum)和原先已经存在的进行合并。
MapReduce:合并抓取的和分析后的输出到crawldb里
In:<url,CrawlDatum>现有的db加上抓取后的和分析后的输出
Map() isidentity
Reduce()合并所有实体(entry)成一个,以抓取后的状态覆盖原先的db状态信息,统计出分析后的链接数
Out: 新的crawldb
6.LinkDb linkDbTool = newLinkDb(conf);
//Maintains an inverted link map, listing incoming links foreach url.
Usage: LinkDb<linkdb> (-dir<segmentsDir> |<seg1><seg2> ...) [-force] [-noNormalize][-noFilter]
这个类的作用是管理新转化进来的链接映射,并列出每个url的外部链接(incominglinks)。先是对每一个url取出它的outLinks,作map操作把这个url作为每个outLinks的incominglink,在reduce里把根据每个key来把一个url的所有incominglink都加到inlinks里。这样就把每个url的外部链接统计出来了。然后一步是对这些新加进来的链接进行合并。
● MapReduce: 统计每个链接的外部链接
In:<url,ParseData>, 包含所有链接的分析后的结果
Map(srcUrl,ParseData> → <destUrl,Inlinks>
为每个链出链接收集一个入链。
Inlinks:<srcUrl, anchorText>*
Reduce()加上外部入链数量
Out:<url, Inlinks>, 一个相关完整的链接地图
7.Indexer indexer = newIndexer(conf);
//Create indexes for segments
Usage:<index><crawldb><linkdb><segment> ...
这个类的任务是另一方面的工作了,它是基于hadoop和lucene的分布式索引。它就是为前面爬虫抓取回来的数据进行索引好让用户可以搜索到这些数据。这里的输入就比较多了,有segments下的fetch_dir,parseData和parseText,还有crawldb下的current_dir和linkdb下的current_dir。在这个类里,map也不做,在reduce时处理。当然要把这些数据体组合成一个lucene的document让它索引了。在reduce里组装好后收集时是<url,doc>,最后在输出的OutputFormat类里进行真正的索引。
● MapReduce: 生成lucene的索引文件
In: 外个文件,values 以 <Class, Object>包装
<url, ParseData> from parse, 有title,metadata, 等等信息.
<url, ParseText> from parse, 文本text
<url, Inlinks> from invert,锚文本anchors
<url, CrawlDatum> fromfetch,用于抓取
Map() isidentity
Reduce()生成Lucene Document
调用index插件
Out:建立Lucene 索引; 最后存储到文件系统上
8.DeleteDuplicates dedup = new DeleteDuplicates(conf);
//这个类的作用就是它的名字了。
Usage: DeleteDuplicates<indexes> ...
这个类的作用就是这它的名字所写的意思--去重。前面索引后(当然不是一次时的情况)会有重复,所以要去重。为什么呢,在一次索引时是不重复的,可是多次抓取后就会有重复了。就是这个原因才要去重。当然去重的规则有两种一个是以时间为标准,一种是以内容的md5值为标准。
9.IndexMerger merger= new IndexMerger(conf);
IndexMerger [-workingdir<workingdir>] outputIndexindexesDir...
这个类就比较简单了,把所有的小索引合并成一个索引。在这一步没有用到map-reduce。
在这九大步骤中generator,fetcher,parseSegment,crawlDbTool会根据抓取的层数循环运行,当抓取的层数大于1时会运行linkInvert,index,dedup,和merge。