搜索
查看: 7930|: 0

MapReduce TopK统计加排序

[复制链接]

1

主题

0

回帖

15

积分

新手上路

积分
15
发表于 2014-5-21 13:25:05 | 显示全部楼层 |阅读模式
Hadoop技术内幕中指出Top K算法有两步,一是统计词频,二是找出词频最高的前K个词。在网上找了很多MapReduce的Top K案例,这些案例都只有排序功能,所以自己写了个案例。
这个案例分两个步骤,第一个是就是wordCount案例,二就是排序功能。

一,统计词频
  1. package TopK;
  2. import java.io.IOException;
  3. import java.util.StringTokenizer;

  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  13. /**
  14. * 统计词频
  15. * @author zx
  16. * zhangxian1991@qq.com
  17. */
  18. public class WordCount {
  19.    
  20.     /**
  21.      * 读取单词
  22.      * @author zx
  23.      *
  24.      */
  25.     public static class Map extends Mapper<Object,Text,Text,IntWritable>{

  26.         IntWritable count = new IntWritable(1);
  27.         
  28.         @Override
  29.         protected void map(Object key, Text value, Context context)
  30.                 throws IOException, InterruptedException {
  31.             StringTokenizer st = new StringTokenizer(value.toString());
  32.             while(st.hasMoreTokens()){   
  33.                 String word = st.nextToken().replaceAll(""", "").replace("'", "").replace(".", "");
  34.                 context.write(new Text(word), count);
  35.             }
  36.         }
  37.         
  38.     }
  39.    
  40.     /**
  41.      * 统计词频
  42.      * @author zx
  43.      *
  44.      */
  45.     public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable>{

  46.         @SuppressWarnings("unused")
  47.         @Override
  48.         protected void reduce(Text key, Iterable<IntWritable> values,Context context)
  49.                 throws IOException, InterruptedException {
  50.             int count = 0;
  51.             for (IntWritable intWritable : values) {
  52.                 count ++;
  53.             }
  54.             context.write(key,new IntWritable(count));
  55.         }
  56.         
  57.     }
  58.    
  59.     @SuppressWarnings("deprecation")
  60.     public static boolean run(String in,String out) throws IOException, ClassNotFoundException, InterruptedException{
  61.         
  62.         Configuration conf = new Configuration();
  63.         
  64.         Job job = new Job(conf,"WordCount");
  65.         job.setJarByClass(WordCount.class);
  66.         job.setMapperClass(Map.class);
  67.         job.setReducerClass(Reduce.class);
  68.         
  69.         // 设置Map输出类型
  70.         job.setMapOutputKeyClass(Text.class);
  71.         job.setMapOutputValueClass(IntWritable.class);

  72.         // 设置Reduce输出类型
  73.         job.setOutputKeyClass(Text.class);
  74.         job.setOutputValueClass(IntWritable.class);

  75.         // 设置输入和输出目录
  76.         FileInputFormat.addInputPath(job, new Path(in));
  77.         FileOutputFormat.setOutputPath(job, new Path(out));
  78.         
  79.         return job.waitForCompletion(true);
  80.     }
  81.    
  82. }
复制代码


二,排序 并求出频率最高的前K个词
  1. package TopK;

  2. import java.io.IOException;
  3. import java.util.Comparator;
  4. import java.util.Map.Entry;
  5. import java.util.Set;
  6. import java.util.StringTokenizer;
  7. import java.util.TreeMap;
  8. import java.util.regex.Pattern;

  9. import org.apache.hadoop.conf.Configuration;
  10. import org.apache.hadoop.fs.Path;
  11. import org.apache.hadoop.io.IntWritable;
  12. import org.apache.hadoop.io.Text;
  13. import org.apache.hadoop.mapreduce.Job;
  14. import org.apache.hadoop.mapreduce.Mapper;
  15. import org.apache.hadoop.mapreduce.Reducer;
  16. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  18. import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
  19. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

  20. /**
  21. * 以单词出现的频率排序
  22. *
  23. * @author zx
  24. * zhangxian1991@qq.com
  25. */
  26. public class Sort {

  27.     /**
  28.      * 读取单词(词频 word)
  29.      *
  30.      * @author zx
  31.      *
  32.      */
  33.     public static class Map extends Mapper<Object, Text, IntWritable, Text> {

  34.         // 输出key 词频
  35.         IntWritable outKey = new IntWritable();
  36.         Text outValue = new Text();

  37.         @Override
  38.         protected void map(Object key, Text value, Context context)
  39.                 throws IOException, InterruptedException {

  40.             StringTokenizer st = new StringTokenizer(value.toString());
  41.             while (st.hasMoreTokens()) {
  42.                 String element = st.nextToken();
  43.                 if (Pattern.matches("\\d+", element)) {
  44.                     outKey.set(Integer.parseInt(element));
  45.                 } else {
  46.                     outValue.set(element);
  47.                 }
  48.             }

  49.             context.write(outKey, outValue);
  50.         }

  51.     }

  52.     /**
  53.      * 根据词频排序
  54.      *
  55.      * @author zx
  56.      *
  57.      */
  58.     public static class Reduce extends
  59.             Reducer<IntWritable, Text, Text, IntWritable> {
  60.         
  61.         private static MultipleOutputs<Text, IntWritable> mos = null;
  62.         
  63.         //要获得前K个频率最高的词
  64.         private static final int k = 10;
  65.         
  66.         //用TreeMap存储可以利用它的排序功能
  67.         //这里用 MyInt 因为TreeMap是对key排序,且不能唯一,而词频可能相同,要以词频为Key就必需对它封装
  68.         private static TreeMap<MyInt, String> tm = new TreeMap<MyInt, String>(new Comparator<MyInt>(){
  69.             /**
  70.              * 默认是从小到大的顺序排的,现在修改为从大到小
  71.              * @param o1
  72.              * @param o2
  73.              * @return
  74.              */
  75.             @Override
  76.             public int compare(MyInt o1, MyInt o2) {
  77.                 return o2.compareTo(o1);
  78.             }
  79.             
  80.         }) ;
  81.         
  82.         /*
  83.          * 以词频为Key是要用到reduce的排序功能
  84.          */
  85.         @Override
  86.         protected void reduce(IntWritable key, Iterable<Text> values,
  87.                 Context context) throws IOException, InterruptedException {
  88.             for (Text text : values) {
  89.                 context.write(text, key);
  90.                 tm.put(new MyInt(key.get()),text.toString());
  91.                
  92.                 //TreeMap以对内部数据进行了排序,最后一个必定是最小的
  93.                 if(tm.size() > k){
  94.                     tm.remove(tm.lastKey());
  95.                 }
  96.                
  97.             }
  98.         }

  99.         @Override
  100.         protected void cleanup(Context context)
  101.                 throws IOException, InterruptedException {
  102.             String path = context.getConfiguration().get("topKout");
  103.             mos = new MultipleOutputs<Text, IntWritable>(context);
  104.             Set<Entry<MyInt, String>> set = tm.entrySet();
  105.             for (Entry<MyInt, String> entry : set) {
  106.                 mos.write("topKMOS", new Text(entry.getValue()), new IntWritable(entry.getKey().getValue()), path);
  107.             }
  108.             mos.close();
  109.         }

  110.         
  111.         
  112.     }

  113.     @SuppressWarnings("deprecation")
  114.     public static void run(String in, String out,String topKout) throws IOException,
  115.             ClassNotFoundException, InterruptedException {

  116.         Path outPath = new Path(out);

  117.         Configuration conf = new Configuration();
  118.         
  119.         //前K个词要输出到哪个目录
  120.         conf.set("topKout",topKout);
  121.         
  122.         Job job = new Job(conf, "Sort");
  123.         job.setJarByClass(Sort.class);
  124.         job.setMapperClass(Map.class);
  125.         job.setReducerClass(Reduce.class);

  126.         // 设置Map输出类型
  127.         job.setMapOutputKeyClass(IntWritable.class);
  128.         job.setMapOutputValueClass(Text.class);

  129.         // 设置Reduce输出类型
  130.         job.setOutputKeyClass(Text.class);
  131.         job.setOutputValueClass(IntWritable.class);

  132.         //设置MultipleOutputs的输出格式
  133.         //这里利用MultipleOutputs进行对文件输出
  134.         MultipleOutputs.addNamedOutput(job,"topKMOS",TextOutputFormat.class,Text.class,Text.class);
  135.         
  136.         // 设置输入和输出目录
  137.         FileInputFormat.addInputPath(job, new Path(in));
  138.         FileOutputFormat.setOutputPath(job, outPath);
  139.         job.waitForCompletion(true);

  140.     }

  141. }
复制代码

自己封装的Int
  1. package TopK;

  2. public class MyInt implements Comparable<MyInt>{
  3.     private Integer value;

  4.     public MyInt(Integer value){
  5.         this.value = value;
  6.     }
  7.    
  8.     public int getValue() {
  9.         return value;
  10.     }

  11.     public void setValue(int value) {
  12.         this.value = value;
  13.     }

  14.     @Override
  15.     public int compareTo(MyInt o) {
  16.         return value.compareTo(o.getValue());
  17.     }
  18.    
  19.    
  20. }
复制代码

运行入口
  1. package TopK;

  2. import java.io.IOException;

  3. /**
  4. *
  5. * @author zx
  6. *zhangxian1991@qq.com
  7. */
  8. public class TopK {
  9.     public static void main(String args[]) throws ClassNotFoundException, IOException, InterruptedException{
  10.         
  11.         //要统计字数,排序的文字
  12.         String in = "hdfs://localhost:9000/input/MaDing.text";
  13.         
  14.         //统计字数后的结果
  15.         String wordCout = "hdfs://localhost:9000/out/wordCout";
  16.         
  17.         //对统计完后的结果再排序后的内容
  18.         String sort = "hdfs://localhost:9000/out/sort";
  19.         
  20.         //前K条
  21.         String topK = "hdfs://localhost:9000/out/topK";
  22.         
  23.         //如果统计字数的job完成后就开始排序
  24.         if(WordCount.run(in, wordCout)){
  25.             Sort.run(wordCout, sort,topK);
  26.         }
  27.         
  28.     }
  29. }
复制代码




您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

大数据中国微信

QQ   

版权所有: Discuz! © 2001-2013 大数据.

GMT+8, 2025-1-28 03:53 , Processed in 0.099419 second(s), 24 queries .

快速回复 返回顶部 返回列表