0


PageRank算法的MapReduce实现(输入有向图,迭代收敛)

输入为网页有向图的邻接表:
![在这里插入图片描述](https://img-blog.csdnimg.cn/fa08d337ffb344098a30f2ebbb6b1333.png

通过统计输入文件的行数,即可得之网页总数为4

每个网页的初值为1/N,即0.25

第一行输入经过map处理后,得到如下结果:

B 0.0833
C 0.0833
D 0.0833

同理,第二三四行经过map处理后,得到:

A 0.125
D 0.125

C 0.25

B 0.125
C 0.125

系统会自动对map的输出进行shuflle处理,即对key进行排序,将相同key的value合并成一个列表。

A 0.125
B 0.0833 0.125
C 0.0833 0.25 0.125
D 0.0833 0.125 0.125

此时出现一个疑问:

为什么要进行这一步,而不是直接将相同key的value进行加和呢?

是为了MapReduce编程的可扩展性,在已知PageRank任务的前提下,我们知道要对相同key的value进行加和,如果是求最大值的任务呢?

所以把对value列表的操作交给reduce,我们要怎么操作这些列表,只要对reduce进行编写即可。

为解决网页间的终止点问题和陷阱问题,需要在reduce中进行如下处理(网页没有出链或者出链只有自己,pr值迭代后只增不减)

假设:上网者通过出链访问其他网页的概率为a,通过地址栏随机访问页面的概率为(1-a)

所以,在reduce过程,某网页pr变换为:

a *(接收其他网页发送来的pr值) + (1-a) * 1/N

经过reduce处理后,网页的pr值为
A = 0.8 * 0.125 + 0.2 * 0.25 = 0.15
B = 0.8 * (0.0833 + 0.125) + 0.2 * 0.25 = 0.216
C = 0.8 * (0.0833 + 0.25) + 0.2 * 0.25 = 0.416
D = 0.8 * (0.0833 + 0.125 + 0.125) + 0.2 * 0.25 = 0.216

此时一轮迭代结束,将reduce的结果输出
在这里插入图片描述

那么何时停止迭代呢?

要么到达最大迭代次数,要么pr值的变化已经收敛(pr值的曲线图趋于水平)

如何判断pr值收敛:

设置一个参数epi,若 max | Pij - P ij-1| < epi ,则说明pr值的变化已经收敛。

完整的程序如下:(支持eclipse Run on Hadoop,不支持yarn -jar运行,因为yarn -jar运行时,只能访问类中static变量的初始值,若在程序运行时对static变量的值进行更改,则map/reduce中得到的变量值还是旧值)

packagetest02;importjava.io.BufferedReader;importjava.io.IOException;importjava.io.InputStreamReader;importjava.util.HashMap;importjava.util.StringTokenizer;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.FSDataInputStream;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;publicclassPageRank_02{privatestaticintN=1;privatestaticfloat a =0.8f;privatestaticint maxIteration =40;privatestaticfloat epi =0.000001f;privatestaticHashMap<String,Float> map;privatestaticHashMap<String,Float> old_map;publicstaticvoidmain(String[] args)throwsException{Configuration conf =newConfiguration();String[] otherArgs =(newGenericOptionsParser(conf, args)).getRemainingArgs();if(otherArgs.length <2){System.err.println("Usage: <in> <out> [max_iteration] [epi]");System.err.println(" max_iteration -- Integer, default 40");System.err.println("     epi       -- Float,   default 0.000001f");System.exit(2);}String input ="";if(otherArgs.length >0)
            input = otherArgs[0];String output ="";if(otherArgs.length >1)
            output = otherArgs[1];if(otherArgs.length >2)setMaxIteration(Integer.parseInt(otherArgs[2]));if(otherArgs.length >3)setEpi(Float.parseFloat(otherArgs[3]));// 统计input文件行数,即网页个数FileSystem fs =FileSystem.get(conf);FSDataInputStream in = fs.open(newPath(input));BufferedReader d =newBufferedReader(newInputStreamReader(in));int count =0;String line;while((line = d.readLine())!=null){
            count +=1;}System.err.println("Numbers of pages: "+ count);setN(count);
        d.close();
        in.close();for(int i =0; i <getMaxIteration(); i++){
            map =newHashMap<String,Float>();Job job =Job.getInstance(conf,"page rank");
            job.setJarByClass(PageRank_02.class);
            job.setMapperClass(PageRankMapper.class);
            job.setReducerClass(PageRankReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job,newPath(input));String new_output = output +(i +1);// 把下一次的输出设置成一个新地址。// 输出路径存在,则删除Path new_output_path =newPath(new_output);if(fs.exists(new_output_path)){
                fs.delete(new_output_path,true);}FileOutputFormat.setOutputPath(job, new_output_path);
            job.waitForCompletion(true);float max_delta =-1.0f;if(i >0){for(String key : map.keySet()){
                    max_delta =Math.max(max_delta,Math.abs(map.get(key)- old_map.get(key)));}}System.err.println("iteration: "+ i +" , MaxIteration: "+getMaxIteration());System.err.println("N: "+getN());System.err.println("a: "+getA());System.err.println("max_delta: "+ max_delta);System.err.println("epi: "+getEpi());if(max_delta < epi && i >0)break;

            old_map = map;}System.exit(0);}/* map过程 */publicstaticclassPageRankMapperextendsMapper<Object,Text,Text,Text>{privateString id;privatefloat pr;privateint count;privatefloat average_pr;publicvoidmap(Object key,Text value,Context context)throwsIOException,InterruptedException{StringTokenizer str =newStringTokenizer(value.toString());// 对value进行解析
            id = str.nextToken();// id为解析的第一个词,代表当前网页if(old_map!=null&& old_map.containsKey(id)){
                pr = old_map.get(id);}else{
                pr =1.0f/N;}
            
            count = str.countTokens();// count为剩余词的个数,代表当前网页的出链网页个数
            average_pr = pr / count;// 求出当前网页对出链网页的贡献值while(str.hasMoreTokens()){String linkid = str.nextToken();
                context.write(newText(linkid),newText(average_pr +""));// 输出的是<出链网页,获得的贡献值>}}}/* reduce过程 */publicstaticclassPageRankReduceextendsReducer<Text,Text,Text,Text>{publicvoidreduce(Text key,Iterable<Text> values,Context context)throwsIOException,InterruptedException{float pr =0;for(Text val : values){
                pr +=Float.parseFloat(val.toString());}
            pr =getA()* pr +(1-getA())*(1.0f/getN());// 加入跳转因子
            map.put(key.toString(), pr);
            context.write(key,newText(pr +""));}}publicstaticfloatgetEpi(){return epi;}publicstaticvoidsetEpi(float epi){PageRank_02.epi = epi;}publicstaticfloatgetA(){return a;}publicstaticvoidsetA(float a){PageRank_02.a = a;}publicstaticintgetMaxIteration(){return maxIteration;}publicstaticvoidsetMaxIteration(int maxIteration){PageRank_02.maxIteration = maxIteration;}publicstaticintgetN(){returnN;}publicstaticvoidsetN(int n){PageRank_02.N= n;}}

程序输入参数分别为:输入文件 输出文件 Max_iteration epi

Run Configurations设置如下
在这里插入图片描述按照如图配置运行程序

在iteration: 14时,程序退出循环

pr值变化的最大值:
max_delta = 0.0000846

设置的参数epi:
epi = 0.0001

max_delta < epi
即pr值已收敛
在这里插入图片描述

参考文献
1.MapReduce 之PageRank算法概述、设计思路和源码分析https://blog.csdn.net/u010414589/article/details/51404971


本文转载自: https://blog.csdn.net/As_zyh/article/details/122155426
版权归原作者 As_zyh 所有, 如有侵权,请联系我们删除。

“PageRank算法的MapReduce实现(输入有向图,迭代收敛)”的评论:

还没有评论