mapreduce实现"浏览该商品的人大多数还浏览了"经典应用

转自:http://blog.csdn.net/u011750989/article/details/12004065

输入:

日期    ...cookie id.        ...商品id..

xx            xx                        xx

输出:

商品id         商品id列表(按优先级排序,用逗号分隔)

xx                   xx

比如:

id1              id3,id0,id4,id2

id2             id0,id5

整个计算过程分为4步

1、提取原始日志日期,cookie id,商品id信息,按天计算,最后输出数据格式

商品id-0 商品id-1

xx           x x

这一步做了次优化,商品id-0一定比商品id-1小,为了减少存储,在最后汇总数据转置下即可

reduce做局部排序及排重

2、基于上次的结果做汇总,按天计算

商品id-0 商品id-1  关联值(关联值即同时访问这两个商品的用户数)

xx             x x                xx

3、汇总最近三个月数据,同时考虑时间衰减,时间越久关联值的贡献越低,最后输出两两商品的关联值(包括转置后)

4、行列转换,生成最后要的推荐结果数据,按关联值排序生成

第一个MR

[java] view plaincopy

  1. import java.io.IOException;
  2. import java.util.ArrayList;
  3. import org.apache.hadoop.conf.Configuration;
  4. import org.apache.hadoop.fs.FileSystem;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.LongWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.io.WritableComparable;
  9. import org.apache.hadoop.io.WritableComparator;
  10. import org.apache.hadoop.mapreduce.Job;
  11. import org.apache.hadoop.mapreduce.Mapper;
  12. import org.apache.hadoop.mapreduce.Partitioner;
  13. import org.apache.hadoop.mapreduce.Reducer;
  14. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  15. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  16. import org.apache.hadoop.util.GenericOptionsParser;
  17. import org.apache.log4j.Logger;
  18. /*
  19. * 输入:原始数据,会有重复
  20. *日期 cookie 楼盘id
  21. *
  22. * 输出:
  23. * 日期 楼盘id1 楼盘id2  //楼盘id1一定小于楼盘id2 ,按日期 cookie进行分组
  24. *
  25. */
  26. public class HouseMergeAndSplit {
  27. public static class Partitioner1 extends Partitioner<TextPair, Text> {
  28. @Override
  29. public int getPartition(TextPair key, Text value, int numParititon) {
  30. return Math.abs((new Text(key.getFirst().toString()+key.getSecond().toString())).hashCode() * 127) % numParititon;
  31. }
  32. }
  33. public static class Comp1 extends WritableComparator {
  34. public Comp1() {
  35. super(TextPair.class, true);
  36. }
  37. @SuppressWarnings("unchecked")
  38. public int compare(WritableComparable a, WritableComparable b) {
  39. TextPair t1 = (TextPair) a;
  40. TextPair t2 = (TextPair) b;
  41. int comp= t1.getFirst().compareTo(t2.getFirst());
  42. if (comp!=0)
  43. return comp;
  44. return t1.getSecond().compareTo(t2.getSecond());
  45. }
  46. }
  47. public static class TokenizerMapper
  48. extends Mapper<LongWritable, Text, TextPair, Text>{
  49. Text val=new Text("test");
  50. public void map(LongWritable key, Text value, Context context
  51. ) throws IOException, InterruptedException {
  52. String s[]=value.toString().split("\001");
  53. TextPair tp=new TextPair(s[0],s[1],s[4]+s[3]); //thedate cookie city+houseid
  54. context.write(tp, val);
  55. }
  56. }
  57. public static class IntSumReducer
  58. extends Reducer<TextPair,Text,Text,Text> {
  59. private static String comparedColumn[] = new String[3];
  60. ArrayList<String> houselist= new ArrayList<String>();
  61. private static Text keyv = new Text();
  62. private static Text valuev = new Text();
  63. static Logger logger = Logger.getLogger(HouseMergeAndSplit.class.getName());
  64. public void reduce(TextPair key, Iterable<Text> values,
  65. Context context
  66. ) throws IOException, InterruptedException {
  67. houselist.clear();
  68. String thedate=key.getFirst().toString();
  69. String cookie=key.getSecond().toString();
  70. for (int i=0;i<3;i++)
  71. comparedColumn[i]="";
  72. //first+second为分组键,每次不同重新调用reduce函数
  73. for (Text val:values)
  74. {
  75. if (thedate.equals(comparedColumn[0]) && cookie.equals(comparedColumn[1])&&  !key.getThree().toString().equals(comparedColumn[2]))
  76. {
  77. // context.write(new Text(key.getFirst()+" "+key.getSecond().toString()), new Text(key.getThree().toString()+" first"+ " "+comparedColumn[0]+" "+comparedColumn[1]+" "+comparedColumn[2]));
  78. houselist.add(key.getThree().toString());
  79. comparedColumn[0]=key.getFirst().toString();
  80. comparedColumn[1]=key.getSecond().toString();
  81. comparedColumn[2]=key.getThree().toString();
  82. }
  83. if (!thedate.equals(comparedColumn[0])||!cookie.equals(comparedColumn[1]))
  84. {
  85. //  context.write(new Text(key.getFirst()+" "+key.getSecond().toString()), new Text(key.getThree().toString()+" second"+ " "+comparedColumn[0]+" "+comparedColumn[1]+" "+comparedColumn[2]));
  86. houselist.add(key.getThree().toString());
  87. comparedColumn[0]=key.getFirst().toString();
  88. comparedColumn[1]=key.getSecond().toString();
  89. comparedColumn[2]=key.getThree().toString();
  90. }
  91. }
  92. keyv.set(comparedColumn[0]); //日期
  93. //valuev.set(houselist.toString());
  94. //logger.info(houselist.toString());
  95. //context.write(keyv,valuev);
  96. for (int i=0;i<houselist.size()-1;i++)
  97. {
  98. for (int j=i+1;j<houselist.size();j++)
  99. {    valuev.set(houselist.get(i)+"  "+houselist.get(j)); //关联的楼盘
  100. context.write(keyv,valuev);
  101. }
  102. }
  103. }
  104. }
  105. public static void main(String[] args) throws Exception {
  106. Configuration conf = new Configuration();
  107. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  108. if (otherArgs.length != 2) {
  109. System.err.println("Usage: wordcount <in> <out>");
  110. System.exit(2);
  111. }
  112. FileSystem fstm = FileSystem.get(conf);
  113. Path outDir = new Path(otherArgs[1]);
  114. fstm.delete(outDir, true);
  115. conf.set("mapred.textoutputformat.separator", "\t"); //reduce输出时key value中间的分隔符
  116. Job job = new Job(conf, "HouseMergeAndSplit");
  117. job.setNumReduceTasks(4);
  118. job.setJarByClass(HouseMergeAndSplit.class);
  119. job.setMapperClass(TokenizerMapper.class);
  120. job.setMapOutputKeyClass(TextPair.class);
  121. job.setMapOutputValueClass(Text.class);
  122. // 设置partition
  123. job.setPartitionerClass(Partitioner1.class);
  124. // 在分区之后按照指定的条件分组
  125. job.setGroupingComparatorClass(Comp1.class);
  126. // 设置reduce
  127. // 设置reduce的输出
  128. job.setReducerClass(IntSumReducer.class);
  129. job.setOutputKeyClass(Text.class);
  130. job.setOutputValueClass(Text.class);
  131. //job.setNumReduceTasks(18);
  132. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  133. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  134. System.exit(job.waitForCompletion(true) ? 0 : 1);
  135. }
  136. }

TextPair

[java] view plaincopy

  1. import java.io.DataInput;
  2. import java.io.DataOutput;
  3. import java.io.IOException;
  4. import org.apache.hadoop.io.Text;
  5. import org.apache.hadoop.io.WritableComparable;
  6. public class TextPair implements WritableComparable<TextPair> {
  7. private Text first;
  8. private Text second;
  9. private Text three;
  10. public TextPair() {
  11. set(new Text(), new Text(),new Text());
  12. }
  13. public TextPair(String first, String second,String three) {
  14. set(new Text(first), new Text(second),new Text(three));
  15. }
  16. public TextPair(Text first, Text second,Text Three) {
  17. set(first, second,three);
  18. }
  19. public void set(Text first, Text second,Text three) {
  20. this.first = first;
  21. this.second = second;
  22. this.three=three;
  23. }
  24. public Text getFirst() {
  25. return first;
  26. }
  27. public Text getSecond() {
  28. return second;
  29. }
  30. public Text getThree() {
  31. return three;
  32. }
  33. public void write(DataOutput out) throws IOException {
  34. first.write(out);
  35. second.write(out);
  36. three.write(out);
  37. }
  38. public void readFields(DataInput in) throws IOException {
  39. first.readFields(in);
  40. second.readFields(in);
  41. three.readFields(in);
  42. }
  43. public int compareTo(TextPair tp) {
  44. int cmp = first.compareTo(tp.first);
  45. if (cmp != 0) {
  46. return cmp;
  47. }
  48. cmp= second.compareTo(tp.second);
  49. if (cmp != 0) {
  50. return cmp;
  51. }
  52. return three.compareTo(tp.three);
  53. }
  54. }

TextPairSecond

[java] view plaincopy

  1. import java.io.DataInput;
  2. import java.io.DataOutput;
  3. import java.io.IOException;
  4. import org.apache.hadoop.io.FloatWritable;
  5. import org.apache.hadoop.io.Text;
  6. import org.apache.hadoop.io.WritableComparable;
  7. public class TextPairSecond implements WritableComparable<TextPairSecond> {
  8. private Text first;
  9. private FloatWritable second;
  10. public TextPairSecond() {
  11. set(new Text(), new FloatWritable());
  12. }
  13. public TextPairSecond(String first, float second) {
  14. set(new Text(first), new FloatWritable(second));
  15. }
  16. public TextPairSecond(Text first, FloatWritable second) {
  17. set(first, second);
  18. }
  19. public void set(Text first, FloatWritable second) {
  20. this.first = first;
  21. this.second = second;
  22. }
  23. public Text getFirst() {
  24. return first;
  25. }
  26. public FloatWritable getSecond() {
  27. return second;
  28. }
  29. public void write(DataOutput out) throws IOException {
  30. first.write(out);
  31. second.write(out);
  32. }
  33. public void readFields(DataInput in) throws IOException {
  34. first.readFields(in);
  35. second.readFields(in);
  36. }
  37. public int compareTo(TextPairSecond tp) {
  38. int cmp = first.compareTo(tp.first);
  39. if (cmp != 0) {
  40. return cmp;
  41. }
  42. return second.compareTo(tp.second);
  43. }
  44. }

第二个MR

[java] view plaincopy

  1. import java.io.IOException;
  2. import java.text.SimpleDateFormat;
  3. import java.util.ArrayList;
  4. import java.util.Date;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.FileSystem;
  7. import org.apache.hadoop.fs.Path;
  8. import org.apache.hadoop.io.IntWritable;
  9. import org.apache.hadoop.io.LongWritable;
  10. import org.apache.hadoop.io.NullWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.io.WritableComparable;
  13. import org.apache.hadoop.io.WritableComparator;
  14. import org.apache.hadoop.mapred.OutputCollector;
  15. import org.apache.hadoop.mapreduce.Job;
  16. import org.apache.hadoop.mapreduce.Mapper;
  17. import org.apache.hadoop.mapreduce.Partitioner;
  18. import org.apache.hadoop.mapreduce.Reducer;
  19. import org.apache.hadoop.mapreduce.Mapper.Context;
  20. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  21. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  22. import org.apache.hadoop.util.GenericOptionsParser;
  23. import org.apache.log4j.Logger;
  24. /*
  25. *  统计楼盘之间共同出现的次数
  26. * 输入:
  27. * 日期 楼盘1 楼盘2
  28. *
  29. * 输出:
  30. * 日期 楼盘1 楼盘2 共同出现的次数
  31. *
  32. */
  33. public class HouseCount {
  34. public static class TokenizerMapper
  35. extends Mapper<LongWritable, Text, Text, IntWritable>{
  36. IntWritable iw=new IntWritable(1);
  37. public void map(LongWritable key, Text value, Context context
  38. ) throws IOException, InterruptedException {
  39. context.write(value, iw);
  40. }
  41. }
  42. public static class IntSumReducer
  43. extends Reducer<Text,IntWritable,Text,IntWritable> {
  44. IntWritable result=new IntWritable();
  45. public void reduce(Text key, Iterable<IntWritable> values,
  46. Context context
  47. ) throws IOException, InterruptedException {
  48. int sum=0;
  49. for (IntWritable iw:values)
  50. {
  51. sum+=iw.get();
  52. }
  53. result.set(sum);
  54. context.write(key, result) ;
  55. }
  56. }
  57. public static void main(String[] args) throws Exception {
  58. Configuration conf = new Configuration();
  59. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  60. if (otherArgs.length != 2) {
  61. System.err.println("Usage: wordcount <in> <out>");
  62. System.exit(2);
  63. }
  64. FileSystem fstm = FileSystem.get(conf);
  65. Path outDir = new Path(otherArgs[1]);
  66. fstm.delete(outDir, true);
  67. conf.set("mapred.textoutputformat.separator", "\t"); //reduce输出时key value中间的分隔符
  68. Job job = new Job(conf, "HouseCount");
  69. job.setNumReduceTasks(2);
  70. job.setJarByClass(HouseCount.class);
  71. job.setMapperClass(TokenizerMapper.class);
  72. job.setMapOutputKeyClass(Text.class);
  73. job.setMapOutputValueClass(IntWritable.class);
  74. // 设置reduce
  75. // 设置reduce的输出
  76. job.setReducerClass(IntSumReducer.class);
  77. job.setOutputKeyClass(Text.class);
  78. job.setOutputValueClass(IntWritable.class);
  79. //job.setNumReduceTasks(18);
  80. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  81. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  82. System.exit(job.waitForCompletion(true) ? 0 : 1);
  83. }
  84. }

第三个MR

[java] view plaincopy

  1. import java.io.IOException;
  2. import java.text.ParseException;
  3. import java.text.SimpleDateFormat;
  4. import java.util.ArrayList;
  5. import java.util.Calendar;
  6. import java.util.Date;
  7. import org.apache.hadoop.conf.Configuration;
  8. import org.apache.hadoop.fs.FileSystem;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.FloatWritable;
  11. import org.apache.hadoop.io.IntWritable;
  12. import org.apache.hadoop.io.LongWritable;
  13. import org.apache.hadoop.io.NullWritable;
  14. import org.apache.hadoop.io.Text;
  15. import org.apache.hadoop.io.WritableComparable;
  16. import org.apache.hadoop.io.WritableComparator;
  17. import org.apache.hadoop.mapred.OutputCollector;
  18. import org.apache.hadoop.mapreduce.Job;
  19. import org.apache.hadoop.mapreduce.Mapper;
  20. import org.apache.hadoop.mapreduce.Partitioner;
  21. import org.apache.hadoop.mapreduce.Reducer;
  22. import org.apache.hadoop.mapreduce.Mapper.Context;
  23. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  24. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  25. import org.apache.hadoop.util.GenericOptionsParser;
  26. import org.apache.log4j.Logger;
  27. /*
  28. * 汇总近三个月统计楼盘之间共同出现的次数,考虑衰减系数, 并最后a b 转成 b a输出一次
  29. * 输入:
  30. * 日期  楼盘1 楼盘2 共同出现的次数
  31. *
  32. * 输出
  33. * 楼盘1 楼盘2 共同出现的次数(考虑了衰减系数,每天的衰减系数不一样)
  34. *
  35. */
  36. public class HouseCountHz {
  37. public static class HouseCountHzMapper
  38. extends Mapper<LongWritable, Text, Text, FloatWritable>{
  39. Text keyv=new Text();
  40. FloatWritable valuev=new FloatWritable();
  41. public void map(LongWritable key, Text value, Context context
  42. ) throws IOException, InterruptedException {
  43. String[] s=value.toString().split("\t");
  44. keyv.set(s[1]+" "+s[2]);//楼盘1,楼盘2
  45. Calendar date1=Calendar.getInstance();
  46. Calendar d2=Calendar.getInstance();
  47. Date b = null;
  48. SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd");
  49. try {
  50. b=sdf.parse(s[0]);
  51. } catch (ParseException e) {
  52. e.printStackTrace();
  53. }
  54. d2.setTime(b);
  55. long n=date1.getTimeInMillis();
  56. long birth=d2.getTimeInMillis();
  57. long sss=n-birth;
  58. int day=(int)((sss)/(3600*24*1000)); //该条记录的日期与当前日期的日期差
  59. float factor=1/(1+(float)(day-1)/10); //衰减系数
  60. valuev.set(Float.parseFloat(s[3])*factor);
  61. context.write(keyv, valuev);
  62. }
  63. }
  64. public static class HouseCountHzReducer
  65. extends Reducer<Text,FloatWritable,Text,FloatWritable> {
  66. FloatWritable result=new FloatWritable();
  67. Text keyreverse=new Text();
  68. public void reduce(Text key, Iterable<FloatWritable> values,
  69. Context context
  70. ) throws IOException, InterruptedException {
  71. float sum=0;
  72. for (FloatWritable iw:values)
  73. {
  74. sum+=iw.get();
  75. }
  76. result.set(sum);
  77. String[] keys=key.toString().split("\t");
  78. keyreverse.set(keys[1]+"   "+keys[0]);
  79. context.write(key, result) ;
  80. context.write(keyreverse, result)  ;
  81. }
  82. }
  83. public static void main(String[] args) throws Exception {
  84. Configuration conf = new Configuration();
  85. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
  86. if (otherArgs.length != 2) {
  87. System.err.println("Usage: wordcount <in> <out>");
  88. System.exit(2);
  89. }
  90. FileSystem fstm = FileSystem.get(conf);
  91. Path outDir = new Path(otherArgs[1]);
  92. fstm.delete(outDir, true);
  93. conf.set("mapred.textoutputformat.separator", "\t"); //reduce输出时key value中间的分隔符
  94. Job job = new Job(conf, "HouseCountHz");
  95. job.setNumReduceTasks(2);
  96. job.setJarByClass(HouseCountHz.class);
  97. job.setMapperClass(HouseCountHzMapper.class);
  98. job.setMapOutputKeyClass(Text.class);
  99. job.setMapOutputValueClass(FloatWritable.class);
  100. // 设置reduce
  101. // 设置reduce的输出
  102. job.setReducerClass(HouseCountHzReducer.class);
  103. job.setOutputKeyClass(Text.class);
  104. job.setOutputValueClass(FloatWritable.class);
  105. //job.setNumReduceTasks(18);
  106. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
  107. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
  108. System.exit(job.waitForCompletion(true) ? 0 : 1);
  109. }
  110. }

第四个MR

[java] view plaincopy

    1. import java.io.IOException;
    2. import java.util.Iterator;
    3. import org.apache.hadoop.conf.Configuration;
    4. import org.apache.hadoop.fs.FileSystem;
    5. import org.apache.hadoop.fs.Path;
    6. import org.apache.hadoop.io.FloatWritable;
    7. import org.apache.hadoop.io.LongWritable;
    8. import org.apache.hadoop.io.Text;
    9. import org.apache.hadoop.io.WritableComparable;
    10. import org.apache.hadoop.io.WritableComparator;
    11. import org.apache.hadoop.mapreduce.Job;
    12. import org.apache.hadoop.mapreduce.Mapper;
    13. import org.apache.hadoop.mapreduce.Partitioner;
    14. import org.apache.hadoop.mapreduce.Reducer;
    15. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    16. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    17. import org.apache.hadoop.util.GenericOptionsParser;
    18. /*
    19. * 输入数据:
    20. * 楼盘1 楼盘2 共同出现的次数
    21. *
    22. * 输出数据
    23. *  楼盘1 楼盘2,楼盘3,楼盘4 (按次数排序)
    24. */
    25. public class HouseRowToCol {
    26. public static class Partitioner1 extends Partitioner<TextPairSecond, Text> {
    27. @Override
    28. //分区
    29. public int getPartition(TextPairSecond key, Text value, int numParititon) {
    30. return Math.abs((new Text(key.getFirst().toString()+key.getSecond().toString())).hashCode() * 127) % numParititon;
    31. }
    32. }
    33. //分组
    34. public static class Comp1 extends WritableComparator {
    35. public Comp1() {
    36. super(TextPairSecond.class, true);
    37. }
    38. @SuppressWarnings("unchecked")
    39. public int compare(WritableComparable a, WritableComparable b) {
    40. TextPairSecond t1 = (TextPairSecond) a;
    41. TextPairSecond t2 = (TextPairSecond) b;
    42. return t1.getFirst().compareTo(t2.getFirst());
    43. }
    44. }
    45. //排序
    46. public static class KeyComp extends WritableComparator {
    47. public KeyComp() {
    48. super(TextPairSecond.class, true);
    49. }
    50. @SuppressWarnings("unchecked")
    51. public int compare(WritableComparable a, WritableComparable b) {
    52. TextPairSecond t1 = (TextPairSecond) a;
    53. TextPairSecond t2 = (TextPairSecond) b;
    54. int comp= t1.getFirst().compareTo(t2.getFirst());
    55. if (comp!=0)
    56. return comp;
    57. return -t1.getSecond().compareTo(t2.getSecond());
    58. }
    59. }
    60. public static class HouseRowToColMapper
    61. extends Mapper<LongWritable, Text, TextPairSecond, Text>{
    62. Text houseid1=new Text();
    63. Text houseid2=new Text();
    64. FloatWritable weight=new FloatWritable();
    65. public void map(LongWritable key, Text value, Context context
    66. ) throws IOException, InterruptedException {
    67. String s[]=value.toString().split("\t");
    68. weight.set(Float.parseFloat(s[2]));
    69. houseid1.set(s[0]);
    70. houseid2.set(s[1]);
    71. TextPairSecond tp=new TextPairSecond(houseid1,weight);
    72. context.write(tp, houseid2);
    73. }
    74. }
    75. public static class HouseRowToColReducer
    76. extends Reducer<TextPairSecond,Text,Text,Text> {
    77. Text valuev=new Text();
    78. public void reduce(TextPairSecond key, Iterable<Text> values,
    79. Context context
    80. ) throws IOException, InterruptedException {
    81. Text keyv=key.getFirst();
    82. Iterator<Text> it=values.iterator();
    83. StringBuilder sb=new StringBuilder(it.next().toString());
    84. while(it.hasNext())
    85. {
    86. sb.append(","+it.next().toString());
    87. }
    88. valuev.set(sb.toString());
    89. context.write(keyv, valuev);
    90. }
    91. }
    92. public static void main(String[] args) throws Exception {
    93. Configuration conf = new Configuration();
    94. String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    95. if (otherArgs.length != 2) {
    96. System.err.println("Usage: wordcount <in> <out>");
    97. System.exit(2);
    98. }
    99. FileSystem fstm = FileSystem.get(conf);
    100. Path outDir = new Path(otherArgs[1]);
    101. fstm.delete(outDir, true);
    102. conf.set("mapred.textoutputformat.separator", "\t"); //reduce输出时key value中间的分隔符
    103. Job job = new Job(conf, "HouseRowToCol");
    104. job.setNumReduceTasks(4);
    105. job.setJarByClass(HouseRowToCol.class);
    106. job.setMapperClass(HouseRowToColMapper.class);
    107. job.setMapOutputKeyClass(TextPairSecond.class);
    108. job.setMapOutputValueClass(Text.class);
    109. // 设置partition
    110. job.setPartitionerClass(Partitioner1.class);
    111. // 在分区之后按照指定的条件分组
    112. job.setGroupingComparatorClass(Comp1.class);
    113. job.setSortComparatorClass(KeyComp.class);
    114. // 设置reduce
    115. // 设置reduce的输出
    116. job.setReducerClass(HouseRowToColReducer.class);
    117. job.setOutputKeyClass(Text.class);
    118. job.setOutputValueClass(Text.class);
    119. //job.setNumReduceTasks(18);
    120. FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    121. FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    122. System.exit(job.waitForCompletion(true) ? 0 : 1);
    123. }
    124. }
时间: 01-05

mapreduce实现"浏览该商品的人大多数还浏览了"经典应用的相关文章

使用 Redis 缓存来实现用户最近浏览的商品列表

一.如何使用 Redis 来缓存来实现最近浏览的商品列表? 首先我们要确定一个两个点,最近浏览的商品肯定是一个存一个取的操作. 那么就可以确定以下几个问题: 最近浏览的记录肯定是要有失效时间的 这里可以使用缓存(Redis等),缓存可以设置失效时间(最大设置为一个月). 如果使用关系型数据库,还需要定时清楚,就很不符合实际需求. 最近浏览记录肯定是要有个数限制的,不可能记录所有的浏览记录 如果使用Redis来实现的话,Redis 中有 LTRM 来修建,以保证存储的浏览条数. 需要在哪里添加保存

jsp写商品显示信息和cookie浏览记录,网上看到的好东西,记录一下。

这是本次项目结果,商品展示页面和浏览历史记录页面. 1,先从数据库入手:创建商品数据库. CREATE TABLE `items` (   `id` int(11) NOT NULL auto_increment,   `name` varchar(50) default NULL,   `city` varchar(50) default NULL,   `price` int(11) default NULL,   `number` int(11) default NULL,   `pict

2018-08-07 期 MapReduce模拟实现热销商品排行

package cn.sjq.mr.sort; import java.io.IOException; import java.util.Comparator; import java.util.TreeSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoo

python —— 小小商城 (用户注册、登陆、浏览购买商品...)

#!/usr/bin/env pyhton # -*- conding:utf-8 -*- # 文件夹自己建 import os import sys import time from datetime import datetime CUN_NAME = None CARS = {} dong = '冻结账户的文件.txt' def user_login(func): def inner(): if not CUN_NAME: print('*************请先登录*********

猜你喜欢-----推荐系统原理介绍

写在正文之前   最近在做推荐系统,在项目组内做了一个分享.今天有些时间,就将逻辑梳理一遍,将ppt内容用文字沉淀下来,便于接下来对推荐系统的进一步研究.推荐系统确实是极度复杂,要走的路还很长. A First Glance   为什么需要推荐系统--信息过载   随着互联网行业的井喷式发展,获取信息的方式越来越多,人们从主动获取信息逐渐变成了被动接受信息,信息量也在以几何倍数式爆发增长.举一个例子,PC时代用google reader,常常有上千条未读博客更新:如今的微信公众号,也有大量的红点

易买网吐血文档(图片拖不上来,要文档留下联系)

易买网文档 1.项目感想 通过这十几天的时间,我感觉到了,开发项目这东西需要一个全方面的准备工作. 自己的思考一定要全面性,宁可自己多发时间思考好问题,也不要像迷路的小羔羊! 2.实现三级分类 1.实现数据库设计 2.无线级的思想 每个子级只需要知道他的父类ID就行 通过知道他的父类ID可以给他封装给有规律的集合 3.通过servlet传到前台进行给它拆分 3.实现分类多条件查询分页 1.sql语句的编写 SELECT * FROM trade WHERE (leve2=? OR leve3=?

Java遇见HTML——JSP篇之商品浏览记录的实现

一.项目总体介绍 使用Cookie实现商品浏览记录. 要实现这个程序采取的是Model1(Jsp+JavaBean)架构实现,具体步骤: 首先要有个数据库,商品表,操作数据库的一个类DBHelper类 创建实体类(与数据库表一一对应) 创建业务逻辑类(DAO) 创建页面层 二.DBHelper类设计 1 package util; 2 3 import java.sql.Connection; 4 import java.sql.DriverManager; 5 6 public class D

显示商品的浏览记录

package com.cn.cookie; import java.io.IOException; import java.util.LinkedHashMap; import java.util.Map; import javax.servlet.ServletException; import javax.servlet.http.Cookie; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpSer

利用Cookie,实现动态显示用户曾经浏览过的商品

1.在DisPlay的servlet中主要做两件事情 1.1显示在出售的商品 1.2显示顾客曾经浏览的商品 DisPlay的代码如下: package com.baowei.cookie; import java.io.IOException; import java.io.PrintWriter; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import java.util.Set;