`
wbj0110
  • 浏览: 1551133 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Nutch抓取源码分析之Crawl类

阅读更多

1、初始化一个Configuration实例设置抓取工作配置;

2、设置一些默认抓取工作参数,比如进程数threads、抓取深度depth、抓取网页数量topN;

3、创建抓取工作抓取到的网页文件的存放目录(crawldb、linkdb、segments、indexes、index),用来存放原生网页,以及网页解析出的文本内容及其其它相关数据;

4、在抓取工作及其索引过程中,这通过初始化一些实现了这些操作的类的实例来完成的,例如:InjectorGeneratorFetcherParseSegment、CrawlDb、LinkDb、Indexer、DeleteDuplicates、IndexMerger。

5、最后,就开始执行相关操作了,包括初始化CrawlDb、生成抓取工作列表、抓取网页文件、更新CrawlDb、倒排Links、建立索引、复制索引文件、合并索引文件。

[java] view plaincopy
 
  1. //初始化配置文件  
  2.     Configuration conf = NutchConfiguration.createCrawlConfiguration();  
  3.     JobConf job = new NutchJob(conf);  
  4. // urls存放目录  
  5.     Path rootUrlDir = null;  
  6. Path dir = new Path("crawl-" + getDate());  
  7. //抓取线程数目  
  8. int threads = job.getInt("fetcher.threads.fetch"10);  
  9. //抓取工作遍历深度  
  10.     int depth = 5;  
  11.     long topN = Long.MAX_VALUE;  
  12.     String indexerName = "lucene";  
  13.     String solrUrl = null;  
  14.       
  15.     for (int i = 0; i < args.length; i++) {  
  16.       if ("-dir".equals(args[i])) {  
  17.         dir = new Path(args[i+1]);  
  18.         i++;  
  19.       } else if ("-threads".equals(args[i])) {  
  20.         threads = Integer.parseInt(args[i+1]);  
  21.         i++;  
  22.       } else if ("-depth".equals(args[i])) {  
  23.         depth = Integer.parseInt(args[i+1]);  
  24.         i++;  
  25.       } else if ("-topN".equals(args[i])) {  
  26.           topN = Integer.parseInt(args[i+1]);  
  27.           i++;  
  28.       } else if ("-solr".equals(args[i])) {  
  29.         indexerName = "solr";  
  30.         solrUrl = StringUtils.lowerCase(args[i + 1]);  
  31.         i++;  
  32.       } else if (args[i] != null) {  
  33.         rootUrlDir = new Path(args[i]);  
  34.       }  
  35.     }  
  36. FileSystem fs = FileSystem.get(job);  
  37. //抓取工作的不同操作需要的五个目录  
  38.     Path crawlDb = new Path(dir + "/crawldb");  
  39.     Path linkDb = new Path(dir + "/linkdb");  
  40.     Path segments = new Path(dir + "/segments");  
  41.     Path indexes = new Path(dir + "/indexes");  
  42.     Path index = new Path(dir + "/index");  
  43.     Path tmpDir = job.getLocalPath("crawl"+Path.SEPARATOR+getDate());  
  44.     Injector injector = new Injector(conf);  
  45.     Generator generator = new Generator(conf);  
  46.     Fetcher fetcher = new Fetcher(conf);  
  47.     ParseSegment parseSegment = new ParseSegment(conf);  
  48.     CrawlDb crawlDbTool = new CrawlDb(conf);  
  49.     LinkDb linkDbTool = new LinkDb(conf);  
  50.     // 初始化crawlDb  
  51.     injector.inject(crawlDb, rootUrlDir);  
  52.     int i;  
  53. for (i = 0; i < depth; i++) {             // generate new segment  
  54. //生成抓取工作列表,文件名以时间挫命名  
  55.       Path[] segs = generator.generate(crawlDb, segments, -1, topN, System  
  56.           .currentTimeMillis());  
  57.       if (segs == null) {  
  58.         LOG.info("Stopping at depth=" + i + " - no more URLs to fetch.");  
  59.         break;  
  60.       }  
  61.       //抓取网页内容  
  62.       fetcher.fetch(segs[0], threads, org.apache.nutch.fetcher.Fetcher.isParsing(conf));  // fetch it  
  63.       if (!Fetcher.isParsing(job)) {  
  64. //解析网页内容  
  65.         parseSegment.parse(segs[0]);    // parse it, if needed  
  66.       }  
  67. //更新CrawlDB  
  68.       crawlDbTool.update(crawlDb, segs, truetrue); // update crawldb  
  69. }  
  70.   
  71.     if (i > 0) {  
  72.       linkDbTool.invert(linkDb, segments, truetruefalse); // invert links  
  73.       // index, dedup & merge  
  74.       FileStatus[] fstats = fs.listStatus(segments, HadoopFSUtil.getPassDirectoriesFilter(fs));  
  75.       if (isSolrIndex) {  
  76.         SolrIndexer indexer = new SolrIndexer(conf);  
  77.         indexer.indexSolr(solrUrl, crawlDb, linkDb,   
  78.             Arrays.asList(HadoopFSUtil.getPaths(fstats)));  
  79.       }  
  80.       else {  
  81.         DeleteDuplicates dedup = new DeleteDuplicates(conf);          
  82.         if(indexes != null) {  
  83.           // Delete old indexes  
  84.           if (fs.exists(indexes)) {  
  85.             LOG.info("Deleting old indexes: " + indexes);  
  86.             fs.delete(indexes, true);  
  87.           }  
  88.           // Delete old index  
  89.           if (fs.exists(index)) {  
  90.             LOG.info("Deleting old merged index: " + index);  
  91.             fs.delete(index, true);  
  92.           }  
  93.         }  
  94.         //索引过程  
  95.         Indexer indexer = new Indexer(conf);  
  96.         indexer.index(indexes, crawlDb, linkDb,   
  97.             Arrays.asList(HadoopFSUtil.getPaths(fstats)));  
  98.           
  99.         IndexMerger merger = new IndexMerger(conf);  
  100.         if(indexes != null) {  
  101.           //复制索引文件  
  102.           dedup.dedup(new Path[] { indexes });  
  103.           fstats = fs.listStatus(indexes, HadoopFSUtil.getPassDirectoriesFilter(fs));  
  104.           //将索引目录index 中的索引文件合并后写入到indexes 目录中  
  105.           merger.merge(HadoopFSUtil.getPaths(fstats), index, tmpDir);  
  106.         }  
  107.       }      
  108.     }   

转:http://blog.csdn.net/cdl2008sky/article/details/7219825

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics