`
wbj0110
  • 浏览: 1536692 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

分别使用Hadoop MapReduce、hive统计手机流量

阅读更多

问题导读
1.hive实现统计的查询语句是什么?
2.生产环境中为什么建议使用外部表?
3.hadoop mapreduce创建类DataWritable的作用是什么?
4.为什么创建类DataWritable?
5.如何实现统计手机流量?
6.对比hive与mapreduce统计手机流量的区别?







1.使用Hive进行手机流量统计

很多公司在使用hive对数据进行处理。
hive是hadoop家族成员,是一种解析like sql语句的框架。它封装了常用MapReduce任务,让你像执行sql一样操作存储在HDFS的表。
hive的表分为两种,内表和外表。
Hive 创建内部表时,会将数据移动到数据仓库指向的路径;若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。
在删除表的时候,内部表的元数据和数据会被一起删除, 而外部表只删除元数据,不删除数据。这样外部表相对来说更加安全些,数据组织也更加灵活,方便共享源数据。
Hive的内外表,还有一个Partition的分区的知识点,用于避免全表扫描,快速检索。后期的文章会提到。
接下来开始正式开始《Hive统计手机流量》


原始数据:

1363157985066   13726230503     00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com                24      27      2481    24681   200
1363157995052   13826544101     5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4                    4       0       264     0       200
1363157991076   13926435656     20-10-7A-28-CC-0A:CMCC  120.196.100.99                  2       4       132     1512    200
1363154400022   13926251106     5C-0E-8B-8B-B1-50:CMCC  120.197.40.4                    4       0       240     0       200
1363157993044   18211575961     94-71-AC-CD-E6-18:CMCC-EASY     120.196.100.99  iface.qiyi.com  瑙.?缃..        15   2      1527    2106    200
1363157995074   84138413        5C-0E-8B-8C-E8-20:7DaysInn      120.197.40.4    122.72.52.12            20      16      4116    1432    200
1363157993055   13560439658     C4-17-FE-BA-DE-D9:CMCC  120.196.100.99                  18      15      1116    954     200
1363157995033   15920133257     5C-0E-8B-C7-BA-20:CMCC  120.197.40.4    sug.so.360.cn   淇℃.瀹..        20      20   156    2936    200


操作步骤:

  1. #配置好Hive之后,使用hive命令启动hive框架。hive启动属于懒加载模式,会比较慢
  2. hive;
  3. #使用show databases命令查看当前数据库信息
  4. hive> show databases;
  5. OK
  6. default
  7. hive 
  8. Time taken: 3.389 seconds
  9. #使用 use hive命令,使用指定的数据库 hive数据库是我之前创建的
  10. use hive;
  11. #创建表,这里是创建内表。内表加载hdfs上的数据,会将被加载文件中的内容剪切走。
  12. #外表没有这个问题,所以在实际的生产环境中,建议使用外表。
  13. create table ll(reportTime string,msisdn string,apmac string,acmac string,host string,siteType string,upPackNum bigint,downPackNum bigint,upPayLoad bigint,downPayLoad bigint,httpStatus string)row format delimited fields terminated by '\t';
  14. #加载数据,这里是从hdfs加载数据,也可用linux下加载数据 需要local关键字
  15. load data inpath'/HTTP_20130313143750.dat' into table ll;
  16. #数据加载完毕之后,hdfs的
  17. #执行hive 的like sql语句,对数据进行统计
  18. select msisdn,sum(uppacknum),sum(downpacknum),sum(uppayload),sum(downpayload) from ll group by msisdn;
复制代码

执行结果如下:

  1. hive> select msisdn,sum(uppacknum),sum(downpacknum),sum(uppayload),sum(downpayload) from ll group by msisdn;
  2. Total MapReduce jobs = 1
  3. Launching Job 1 out of 1
  4. Number of reduce tasks not specified. Estimated from input data size: 1
  5. In order to change the average load for a reducer (in bytes):
  6.   set hive.exec.reducers.bytes.per.reducer=<number>
  7. In order to limit the maximum number of reducers:
  8.   set hive.exec.reducers.max=<number>
  9. In order to set a constant number of reducers:
  10.   set mapred.reduce.tasks=<number>
  11. Starting Job = job_201307160252_0006, Tracking URL = http://hadoop0:50030/jobdetails.jsp?jobid=job_201307160252_0006
  12. Kill Command = /usr/local/hadoop/libexec/../bin/hadoop job  -Dmapred.job.tracker=hadoop0:9001 -kill job_201307160252_0006
  13. Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
  14. 2013-07-17 19:51:42,599 Stage-1 map = 0%,  reduce = 0%
  15. 2013-07-17 19:52:40,474 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 48.5 sec
  16. 2013-07-17 19:52:41,690 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 48.5 sec
  17. 2013-07-17 19:52:42,693 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 48.5 sec
  18. 2013-07-17 19:52:43,698 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 48.5 sec
  19. 2013-07-17 19:52:44,702 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 48.5 sec
  20. 2013-07-17 19:52:45,707 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 48.5 sec
  21. 2013-07-17 19:52:46,712 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 48.5 sec
  22. 2013-07-17 19:52:47,715 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 48.5 sec
  23. 2013-07-17 19:52:48,721 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 48.5 sec
  24. 2013-07-17 19:52:49,758 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 48.5 sec
  25. 2013-07-17 19:52:50,763 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 48.5 sec
  26. 2013-07-17 19:52:51,772 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 50.0 sec
  27. 2013-07-17 19:52:52,775 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 50.0 sec
  28. 2013-07-17 19:52:53,779 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 50.0 sec
  29. MapReduce Total cumulative CPU time: 50 seconds 0 msec
  30. Ended Job = job_201307160252_0006
  31. MapReduce Jobs Launched: 
  32. Job 0: Map: 1  Reduce: 1   Cumulative CPU: 50.0 sec   HDFS Read: 2787075 HDFS Write: 16518 SUCCESS
  33. Total MapReduce CPU Time Spent: 50 seconds 0 msec
  34. OK
  35. 13402169727        171        108        11286        130230
  36. 13415807477        2067        1683        169668        1994181
  37. 13416127574        1501        1094        161963        802756
  38. 13416171820        113        99        10630        32120
  39. 13417106524        160        128        18688        13088
  40. 13418002498        240        256        22136        86896
  41. 13418090588        456        351        98934        67470
  42. 13418117364        264        152        29436        49966
  43. 13418173218        37680        48348        2261286        73159722
  44. 13418666750        22432        26482        1395648        39735552
  45. 13420637670        20        20        1480        1480
  46. ......
  47. Time taken: 75.24 seconds
复制代码







2.Hadoop MapReduce手机流量统计

自定义一个writable

  1. package cn.maoxiangyi.hadoop.wordcount;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import org.apache.hadoop.io.Writable;
  6. public class DataWritable implements Writable {
  7.         
  8.         private int upPackNum;
  9.         private int downPackNum;
  10.         private int upPayLoad;
  11.         private int downPayLoad;
  12.         
  13.         public DataWritable() {
  14.                 super();
  15.         }
  16.         public DataWritable(int upPackNum, int downPackNum, int upPayLoad,
  17.                         int downPayLoad) {
  18.                 super();
  19.                 this.upPackNum = upPackNum;
  20.                 this.downPackNum = downPackNum;
  21.                 this.upPayLoad = upPayLoad;
  22.                 this.downPayLoad = downPayLoad;
  23.         }
  24.         @Override
  25.         public void write(DataOutput out) throws IOException {
  26.                 out.writeInt(upPackNum);
  27.                 out.writeInt(downPackNum);
  28.                 out.writeInt(upPayLoad);
  29.                 out.writeInt(downPayLoad);
  30.         }
  31.         @Override
  32.         public void readFields(DataInput in) throws IOException {
  33.                 upPackNum = in.readInt();
  34.                 downPackNum = in.readInt();
  35.                 upPayLoad = in.readInt();
  36.                 downPayLoad =in.readInt();
  37.         }
  38.         public int getUpPackNum() {
  39.                 return upPackNum;
  40.         }
  41.         public void setUpPackNum(int upPackNum) {
  42.                 this.upPackNum = upPackNum;
  43.         }
  44.         public int getDownPackNum() {
  45.                 return downPackNum;
  46.         }
  47.         public void setDownPackNum(int downPackNum) {
  48.                 this.downPackNum = downPackNum;
  49.         }
  50.         public int getUpPayLoad() {
  51.                 return upPayLoad;
  52.         }
  53.         public void setUpPayLoad(int upPayLoad) {
  54.                 this.upPayLoad = upPayLoad;
  55.         }
  56.         public int getDownPayLoad() {
  57.                 return downPayLoad;
  58.         }
  59.         public void setDownPayLoad(int downPayLoad) {
  60.                 this.downPayLoad = downPayLoad;
  61.         }
  62.         @Override
  63.         public String toString() {
  64.                 return "        " + upPackNum + "        "
  65.                                 + downPackNum + "        " + upPayLoad + "        "
  66.                                 + downPayLoad;
  67.         }
  68.         
  69.         
  70. }
复制代码

MapReduc函数

  1. package cn.maoxiangyi.hadoop.wordcount;
  2. import java.io.IOException;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.io.IntWritable;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. public class DataTotalMapReduce {
  14.         public static void main(String[] args) throws Exception {
  15.                 Configuration configuration = new Configuration();
  16.                 Job job = new Job(configuration);
  17.                 job.setJarByClass(DataTotalMapReduce.class);
  18.                 job.setMapperClass(DataTotalMapper.class);
  19.                 job.setReducerClass(DataTotalReducer.class);
  20.                 job.setOutputKeyClass(Text.class);
  21.                 job.setOutputValueClass(DataWritable.class);
  22.                 job.setCombinerClass(DataTotalReducer.class);
  23.                 Path inputDir = new Path("hdfs://hadoop0:9000/HTTP_20130313143750.dat");
  24.                 FileInputFormat.addInputPath(job, inputDir);
  25.                 Path outputDir = new Path("hdfs://hadoop0:9000/dataTotal");
  26.                 FileOutputFormat.setOutputPath(job, outputDir);
  27.                 job.waitForCompletion(true);
  28.         }
  29. }
  30. /**
  31. * 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82
  32. * i02.c.aliimg.com 24 27 2481 24681 200 1363157995052 13826544101
  33. * 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200 1363157991076 13926435656
  34. * 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
  35. */
  36. class DataTotalMapper extends Mapper<LongWritable, Text, Text, DataWritable> {
  37.         @Override
  38.         protected void map(LongWritable key, Text value, Context context)
  39.                         throws IOException, InterruptedException {
  40.                 String lineStr = value.toString();
  41.                 String[] strArr = lineStr.split("\t");
  42.                 String phpone = strArr[1];
  43.                 String upPackNum = strArr[6];
  44.                 String downPackNum = strArr[7];
  45.                 String upPayLoad = strArr[8];
  46.                 String downPayLoad = strArr[9];
  47.                 context.write(
  48.                                 new Text(phpone),
  49.                                 new DataWritable(Integer.parseInt(upPackNum), Integer
  50.                                                 .parseInt(downPackNum), Integer.parseInt(upPayLoad),
  51.                                                 Integer.parseInt(downPayLoad)));
  52.         }
  53. }
  54. class DataTotalReducer extends Reducer<Text, DataWritable, Text, DataWritable> {
  55.         @Override
  56.         protected void reduce(Text k2, Iterable<DataWritable> v2, Context context)
  57.                         throws IOException, InterruptedException {
  58.                 int upPackNumSum = 0;
  59.                 int downPackNumSum = 0;
  60.                 int upPayLoadSum = 0;
  61.                 int downPayLoadSum = 0;
  62.                 for (DataWritable dataWritable : v2) {
  63.                         upPackNumSum += dataWritable.getUpPackNum();
  64.                         downPackNumSum += dataWritable.getDownPackNum();
  65.                         upPayLoadSum += dataWritable.getUpPayLoad();
  66.                         downPayLoadSum += dataWritable.getDownPayLoad();
  67.                 }
  68.                 context.write(k2, new DataWritable(upPackNumSum, downPackNumSum, upPayLoadSum, downPayLoadSum));
  69.         }
  70. }
复制代码


结果节选

  1. 13402169727             171     108     11286   130230
  2. 13415807477             2067    1683    169668  1994181
  3. 13416127574             1501    1094    161963  802756
  4. 13416171820             113     99      10630   32120
  5. 13417106524             160     128     18688   13088
  6. 13418002498             240     256     22136   86896
  7. 13418090588             456     351     98934   67470
  8. 13418117364             264     152     29436   49966
  9. 13418173218             37680   48348   2261286 73159722
  10. 13418666750             22432   26482   1395648 39735552
  11. 13420637670             20      20      1480    1480
  12. 13422149173             40      32      4000    3704
  13. 13422311151             465     535     33050   661790
  14. 13424077835             84      72      15612   9948
  15. 13424084200             765     690     60930   765675
  16. 13428887537             43892   44830   2925330 65047620
  17. 13430219372             454     352     33792   192876
  18. 13430234524             27852   39056   1767220 52076614
  19. 13430237899             1293    1165    166346  808613
  20. 13430258776             4681    4783    350511  6609423
  21. 13430266620             10544   9377    11600817        5728002
  22. 13432023893             40      0       2400    0
复制代码

 

 

http://www.aboutyun.com/forum.php?highlight=hive&mod=viewthread&tid=7455

 
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics