`
wbj0110
  • 浏览: 1549275 次
  • 性别: Icon_minigender_1
  • 来自: 上海
文章分类
社区版块
存档分类
最新评论

Hadoop实例:单轮MapReduce的矩阵乘法

阅读更多

 

最近开始在看@王斌_ICTIR老师的《大数据:互联网大规模数据挖掘与分布式处理》,下面是对第二章提到的的单轮计算矩阵乘法进行的学习实现过程。

 

 

    矩阵的乘法只有在第一个矩阵的列数(column)和第二个矩阵的行数(row)相同时才有定义。一般单指矩阵乘积时,指的便是一般矩阵乘积。若Ai×r矩阵,Br×j矩阵,则他们的乘积AB(有时记做A · B)会是一个i×j矩阵。其乘积矩阵的元素如下面式子得出:

 

 

 

Hadoop实例:单轮MapReduce的矩阵乘法

 

   

 

    书中提到的对矩阵乘法的MapReduce实现方法是:

 

    Map函数:对于矩阵M的每个元素M[i,j],产生一系列的键值对(i,k)->(M,j, M[i,j]),其中k=1,2…,直到矩阵N的列数。同样,对于矩阵N的每个元素N[j,k],产生一系列的键值对(ik)->(N,j,N[j,k]),其中i=1,2…,直到矩阵M的行数。

 

    Reduce函数:根据MR的原理,相同键i,k的数据会发送个同一个 reduce。如果M2*2矩阵,N2×3矩阵,reduce函数需要处理的数据为:

 

1,1->[(M,1, M[1,1])(M,2, M[1,2])(N,1, N[1,1])(N,2, N[2,1])]

 

1,2->[(M,1, M[1,1])(M,2, M[1,2])(N,1, N[1,2])(N,2, N[2,2])]

 

1,3->[(M,1, M[1,1])(M,2, M[1,2])(N,1, N[1,3])(N,2, N[2,3])],

 

2,1->[(M,1, M[2,1])(M,2, M[2,2])(N,1, N[1,1])(N,2, N[2,1])]

 

2,2->[(M,1, M[2,1])(M,2, M[2,2])(N,1, N[1,2])(N,2, N[2,2])]

 

2,3->[(M,1, M[2,1])(M,2, M[2,2])(N,1, N[1,3])(N,2, N[2,3])]

 

 

 

    这样只要将所有(M,j, M[i,j])(N,j, N[j,k])分别按照j值排序并放在不同的两个列表里面。将这个列表的第j个元素M[i,j]N[j,k]相乘,然后将这些积相加,最后积的和与键(i,k)组对作为reduce函数的输出。对于上面的例子reduce的输出就是:

 

1,1->M[1,1]* N[1,1]+ M[1,2]* N[2,1]

 

1,2->M[1,1]* N[1,2]+ M[1,2]* N[2,2]

 

1,3->M[1,1]* N[1,3]+ M[1,2]* N[2,3]

 

2,1->M[2,1]* N[2,1]+ M[2,2]* N[2,1]

 

2,2->M[2,1]* N[1,2]+ M[2,2]* N[2,2]

 

2,3->M[2,1]* N[1,3]+ M[2,2]* N[2,3]

 

 

 

    下面是MapReduce的实现步骤:

 

    (1).构造矩阵M300*150;矩阵N150*500。两矩阵的值放在一个M.data文件中,每行的格式为:文件标识#行坐标#列坐标#坐标值。

 

 

 

Hadoop实例:单轮MapReduce的矩阵乘法

 

   

 

    (2).基于上面的方法编写Map函数和Reduce函数。代码详见:

 

        https://github.com/intergret/snippet/blob/master/MartrixMultiplication.java

 

Hadoop实例:单轮MapReduce的矩阵乘法

 

 

 

 

 

    

 

    (3).将运行的结果文件copy到本地,并使用check.py对结果中元素[10,95]的正确性进行验证。

 

Hadoop实例:单轮MapReduce的矩阵乘法

 

1. [代码]MapReduce    

 

001 import java.io.IOException;
002 import org.apache.hadoop.conf.Configuration;
003 import org.apache.hadoop.fs.Path;
004 import org.apache.hadoop.io.Text;
005 import org.apache.hadoop.mapreduce.Job;
006 import org.apache.hadoop.mapreduce.Mapper;
007 import org.apache.hadoop.mapreduce.Reducer;
008 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
009 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
010 import org.apache.hadoop.util.GenericOptionsParser;
011  
012  
013 public class MartrixMultiplication{
014  
015   public static class MartrixMapper extends Mapper<Object, Text, Text, Text>{
016      
017     private Text map_key = new Text();
018     private Text map_value = new Text();
019      
020     int rNumber = 300;
021     int cNumber = 500;
022     String fileTarget;
023     String i, j, k, ij, jk;
024      
025        
026     public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
027        
028         String eachterm[] = value.toString().split("#");
029          
030         fileTarget = eachterm[0];
031          
032         if(fileTarget.equals("M")){
033             i = eachterm[1];
034             j = eachterm[2];
035             ij = eachterm[3];
036              
037             for(int c = 1; c<=cNumber; c++){
038                 map_key.set(i + "#" + String.valueOf(c));
039                 map_value.set("M" + "#" + j + "#" + ij);
040                 context.write(map_key, map_value);
041             }
042              
043         }else if(fileTarget.equals("N")){
044             j = eachterm[1];
045             k = eachterm[2];
046             jk = eachterm[3];
047              
048             for(int r = 1; r<=rNumber; r++){
049                 map_key.set(String.valueOf(r) + "#" +k);
050             map_value.set("N" + "#" + j + "#" + jk);
051             context.write(map_key, map_value);
052           }
053              
054         }
055     }
056   }
057    
058    
059   public static class MartrixReducer extends Reducer<Text,Text,Text,Text> {
060      
061     private Text reduce_value = new Text();
062      
063     int jNumber = 150;
064      
065       int M_ij[] = new int[jNumber+1];
066     int N_jk[] = new int[jNumber+1];
067      
068     int j, ij, jk;
069      
070     String fileTarget;
071     int jsum = 0;
072      
073     public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
074        
075       jsum = 0;
076        
077       for (Text val : values) {
078           String eachterm[] = val.toString().split("#");
079            
080           fileTarget = eachterm[0];
081           j = Integer.parseInt(eachterm[1]);
082            
083           if(fileTarget.equals("M")){
084               ij = Integer.parseInt(eachterm[2]);
085               M_ij[j] = ij;
086           }else if(fileTarget.equals("N")){
087               jk = Integer.parseInt(eachterm[2]);
088               N_jk[j] = jk;
089         }
090            
091       }
092        
093        
094       for(int d = 1; d<=jNumber; d++){
095           jsum +=  M_ij[d] * N_jk[d];
096       }
097        
098       reduce_value.set(String.valueOf(jsum));
099         context.write(key, reduce_value);
100        
101     }
102   }
103    
104  
105   public static void main(String[] args) throws Exception {
106        
107         Configuration conf = new Configuration();
108         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
109         if (otherArgs.length != 2) {
110           System.err.println("Usage: MartrixMultiplication <in> <out>");
111           System.exit(2);
112         }
113          
114         Job job = new Job(conf, "martrixmultiplication");
115         job.setJarByClass(MartrixMultiplication.class);
116         job.setMapperClass(MartrixMapper.class);
117         job.setReducerClass(MartrixReducer.class);
118          
119         job.setOutputKeyClass(Text.class);
120         job.setOutputValueClass(Text.class);
121          
122         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
123         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
124          
125         System.exit(job.waitForCompletion(true) ? 0 : 1);
126                      
127   }
128    
129 }
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics