本文共 11726 字,大约阅读时间需要 39 分钟。
package cjtriangle;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Step 1 of the MapReduce triangle-enumeration pipeline.
 *
 * <p>Input: one undirected edge per line, written as {@code u,v}. Output: lines of the form
 * {@code a,b<TAB>x}. A value of {@code -1} records that a-b is an edge of the graph. Any other
 * value {@code x} records that x is adjacent to both a and b, so a, b, x form a triangle exactly
 * when a-b is also an edge. The next step performs that check.
 */
public class CJTriangle_Step1 extends Configured implements Tool {

  /** Input path used when fewer than two command-line arguments are given. */
  private static final String DEFAULT_INPUT = "/media/chenjie/0009418200012FF3/ubuntu/trangel.txt";

  /** Output path used when fewer than two command-line arguments are given. */
  private static final String DEFAULT_OUTPUT = "/media/chenjie/0009418200012FF3/ubuntu/trangel1";

  /** Value written for a vertex pair that is an actual edge of the graph. */
  private static final String EDGE_MARKER = "-1";

  /**
   * Emits every edge in both directions, so that each vertex's reducer receives its complete
   * adjacency list.
   */
  public static class CJTriangleMapper extends Mapper<LongWritable, Text, Text, Text> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] tokens = value.toString().split(",");
      if (tokens.length < 2) {
        return; // blank or malformed line
      }
      // Trim so that "1, 2" yields "2" rather than " 2"; the padded form would later fail
      // numeric parsing.
      String u = tokens[0].trim();
      String v = tokens[1].trim();
      // Skip empty endpoints and self-loops. A self-loop u,u would put u into its own adjacency
      // list and later produce bogus "triangles" such as u,u,x.
      if (u.isEmpty() || v.isEmpty() || u.equals(v)) {
        return;
      }
      context.write(new Text(u), new Text(v));
      context.write(new Text(v), new Text(u));
    }
  }

  /**
   * Receives one vertex and all of its neighbours. For every neighbour n it writes the edge
   * marker {@code vertex,n -> -1}. For every pair of neighbours (i, j) it writes the candidate
   * {@code j,i -> vertex}, meaning "vertex closes a triangle if i-j is an edge".
   */
  public static class CJTrianglerReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      System.out.println("---------------------------------CJTrianglerReducer----------------------------------");
      System.out.println("key =" + key);
      System.out.println("values:");
      // Hadoop reuses the same Text instance while iterating, so copy each value out as a
      // String before buffering the adjacency list.
      List<String> list = new ArrayList<String>();
      Iterator<Text> iterator = values.iterator();
      while (iterator.hasNext()) {
        list.add(iterator.next().toString());
      }
      String vertex = key.toString();
      for (int i = 0; i < list.size(); i++) {
        System.out.println("value:" + list.get(i));
        context.write(new Text(vertex + "," + list.get(i)), new Text(EDGE_MARKER));
        // Each unordered pair of neighbours is emitted once (j > i).
        for (int j = i + 1; j < list.size(); j++) {
          context.write(new Text(list.get(j) + "," + list.get(i)), new Text(vertex));
        }
      }
      System.out.println("---------------------------CJTrianglerReducer END-----------------------------------------");
    }
  }

  /**
   * Entry point.
   *
   * @param args {@code <input path> <output path>}; the built-in default paths are used only
   *     when fewer than two arguments are supplied
   */
  public static void main(String[] args) throws Exception {
    if (args == null || args.length < 2) {
      args = new String[] {DEFAULT_INPUT, DEFAULT_OUTPUT};
    }
    int jobStatus = submitJob(args);
    System.exit(jobStatus);
  }

  /** Runs this tool through {@link ToolRunner} and returns the job's exit status. */
  public static int submitJob(String[] args) throws Exception {
    return ToolRunner.run(new CJTriangle_Step1(), args);
  }

  /**
   * Configures and runs the job. An existing output directory is deleted first.
   *
   * @param args {@code <input path> <output path>}
   * @return 0 on success, 1 if the job failed, 2 on a usage error
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args == null || args.length < 2) {
      System.err.println("Usage: CJTriangle_Step1 <input path> <output path>");
      return 2;
    }
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "trangel1");
    // Without this the mapper/reducer classes are not shipped to cluster nodes.
    job.setJarByClass(CJTriangle_Step1.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setMapperClass(CJTriangleMapper.class);
    job.setReducerClass(CJTrianglerReducer.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    Path outPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outPath);
    // Resolve the file system from the path itself, so that a fully qualified path on a
    // non-default file system is deleted from the right place.
    FileSystem fs = outPath.getFileSystem(conf);
    if (fs.exists(outPath)) {
      fs.delete(outPath, true);
    }
    return job.waitForCompletion(true) ? 0 : 1;
  }
}
package cjtriangle;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import edu.umd.cloud9.io.pair.PairOfLongs;

/**
 * Step 2 of the MapReduce triangle-enumeration pipeline.
 *
 * <p>Input: lines of the form {@code a,b<TAB>x}. Here {@code x == -1} marks that a-b is an edge,
 * and any other {@code x} is a vertex adjacent to both a and b. All values for one (a, b) key
 * meet in a single reduce call. If the edge marker is among them, every other value closes a
 * triangle.
 *
 * <p>Each triangle is written as its three vertex ids in ascending order ({@code min,mid,max}).
 * The same triangle can be written more than once; a later de-duplication step is needed.
 */
public class CJTriangle_Step2 extends Configured implements Tool {

  /**
   * Default input used when fewer than two command-line arguments are given. This is the whole
   * step-1 output directory, so every part file is read, not only {@code part-r-00000}.
   */
  private static final String DEFAULT_INPUT = "/media/chenjie/0009418200012FF3/ubuntu/trangel1";

  /** Output path used when fewer than two command-line arguments are given. */
  private static final String DEFAULT_OUTPUT = "/media/chenjie/0009418200012FF3/ubuntu/trangel2";

  /** Value that marks a vertex pair as an actual edge of the graph. */
  private static final long EDGE_MARKER = -1L;

  /** Parses {@code a,b<TAB>x} into the key (a, b) and the value x. */
  public static class CJTriangleMapper
      extends Mapper<LongWritable, Text, PairOfLongs, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String[] tokens = value.toString().split("\t");
      if (tokens.length < 2) {
        return; // blank or malformed line
      }
      String[] vertices = tokens[0].split(",");
      if (vertices.length < 2) {
        return; // key is not of the form "a,b"
      }
      long left = Long.parseLong(vertices[0].trim());
      long right = Long.parseLong(vertices[1].trim());
      long third = Long.parseLong(tokens[1].trim());
      context.write(new PairOfLongs(left, right), new LongWritable(third));
    }
  }

  /**
   * For one vertex pair, emits a triangle for every candidate third vertex, but only if the
   * pair itself is an edge (the edge marker is among the values).
   */
  public static class CJTrianglerReducer
      extends Reducer<PairOfLongs, LongWritable, NullWritable, Text> {
    @Override
    protected void reduce(PairOfLongs key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      System.out.println("---------------------------------CJTrianglerReducer----------------------------------");
      System.out.println("key:" + key);
      // Copy the primitive values out: Hadoop reuses the LongWritable instance while iterating.
      List<Long> list = new ArrayList<Long>();
      Iterator<LongWritable> iterator = values.iterator();
      while (iterator.hasNext()) {
        list.add(iterator.next().get());
      }
      boolean isEdge = false;
      for (int i = 0; i < list.size(); i++) {
        System.out.println("value:" + list.get(i));
        if (list.get(i) == EDGE_MARKER) {
          isEdge = true;
        }
      }
      if (isEdge) {
        for (long third : list) {
          if (third != EDGE_MARKER) {
            String tri = buildTriangle(key.getLeftElement(), key.getRightElement(), third);
            System.out.println("生成三角形:" + tri);
            context.write(NullWritable.get(), new Text(tri));
          }
        }
      }
      System.out.println("---------------------------CJTrianglerReducer END-----------------------------------------");
    }

    /**
     * Formats three vertex ids as {@code min,mid,max}. The canonical order makes every copy of
     * the same triangle produce an identical line.
     */
    private String buildTriangle(long e1, long e2, long e3) {
      List<Long> vertices = new ArrayList<Long>(3);
      vertices.add(e1);
      vertices.add(e2);
      vertices.add(e3);
      Collections.sort(vertices);
      return vertices.get(0) + "," + vertices.get(1) + "," + vertices.get(2);
    }
  }

  /**
   * Entry point.
   *
   * @param args {@code <input path> <output path>}; the built-in default paths are used only
   *     when fewer than two arguments are supplied
   */
  public static void main(String[] args) throws Exception {
    if (args == null || args.length < 2) {
      args = new String[] {DEFAULT_INPUT, DEFAULT_OUTPUT};
    }
    int jobStatus = submitJob(args);
    System.exit(jobStatus);
  }

  /** Runs this tool through {@link ToolRunner} and returns the job's exit status. */
  public static int submitJob(String[] args) throws Exception {
    return ToolRunner.run(new CJTriangle_Step2(), args);
  }

  /**
   * Configures and runs the job. An existing output directory is deleted first.
   *
   * @param args {@code <input path> <output path>}
   * @return 0 on success, 1 if the job failed, 2 on a usage error
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args == null || args.length < 2) {
      System.err.println("Usage: CJTriangle_Step2 <input path> <output path>");
      return 2;
    }
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "trangel2");
    // Without this the mapper/reducer classes are not shipped to cluster nodes.
    job.setJarByClass(CJTriangle_Step2.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setMapOutputKeyClass(PairOfLongs.class);
    job.setMapOutputValueClass(LongWritable.class);
    // The reducer writes NullWritable keys; declare that instead of Text.
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    job.setMapperClass(CJTriangleMapper.class);
    job.setReducerClass(CJTrianglerReducer.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    Path outPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outPath);
    // Resolve the file system from the path itself, so that a fully qualified path on a
    // non-default file system is deleted from the right place.
    FileSystem fs = outPath.getFileSystem(conf);
    if (fs.exists(outPath)) {
      fs.delete(outPath, true);
    }
    return job.waitForCompletion(true) ? 0 : 1;
  }
}
package cjtriangle;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * Step 3 of the MapReduce triangle-enumeration pipeline: removes duplicate lines.
 *
 * <p>Each input line (for example a triangle {@code 1,2,3}) becomes a map-output key. The
 * shuffle groups identical lines, and the reducer writes each distinct line exactly once.
 */
public class CJTriangle_Step3 extends Configured implements Tool {

  /**
   * Default input used when fewer than two command-line arguments are given. This is the whole
   * step-2 output directory, so every part file is read, not only {@code part-r-00000}.
   */
  private static final String DEFAULT_INPUT = "/media/chenjie/0009418200012FF3/ubuntu/trangel2";

  /** Output path used when fewer than two command-line arguments are given. */
  private static final String DEFAULT_OUTPUT = "/media/chenjie/0009418200012FF3/ubuntu/trangel3";

  /** Uses the whole input line as the key, so identical lines meet in one reduce call. */
  public static class CJTriangleMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(new Text(value.toString()), new LongWritable(1));
    }
  }

  /** Writes each distinct line once. The grouped values are only logged. */
  public static class CJTrianglerReducer
      extends Reducer<Text, LongWritable, NullWritable, Text> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      System.out.println("---------------------------------CJTrianglerReducer----------------------------------");
      System.out.println("key:" + key);
      Iterator<LongWritable> iterator = values.iterator();
      while (iterator.hasNext()) {
        LongWritable value = iterator.next();
        System.out.println("value:" + value.toString());
      }
      context.write(NullWritable.get(), new Text(key.toString()));
      System.out.println("---------------------------CJTrianglerReducer END-----------------------------------------");
    }
  }

  /**
   * Entry point.
   *
   * @param args {@code <input path> <output path>}; the built-in default paths are used only
   *     when fewer than two arguments are supplied
   */
  public static void main(String[] args) throws Exception {
    if (args == null || args.length < 2) {
      args = new String[] {DEFAULT_INPUT, DEFAULT_OUTPUT};
    }
    int jobStatus = submitJob(args);
    System.exit(jobStatus);
  }

  /** Runs this tool through {@link ToolRunner} and returns the job's exit status. */
  public static int submitJob(String[] args) throws Exception {
    return ToolRunner.run(new CJTriangle_Step3(), args);
  }

  /**
   * Configures and runs the job. An existing output directory is deleted first.
   *
   * @param args {@code <input path> <output path>}
   * @return 0 on success, 1 if the job failed, 2 on a usage error
   */
  @Override
  public int run(String[] args) throws Exception {
    if (args == null || args.length < 2) {
      System.err.println("Usage: CJTriangle_Step3 <input path> <output path>");
      return 2;
    }
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "trangel3");
    // Without this the mapper/reducer classes are not shipped to cluster nodes.
    job.setJarByClass(CJTriangle_Step3.class);
    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
    // The reducer writes NullWritable keys; declare that instead of Text.
    job.setOutputKeyClass(NullWritable.class);
    job.setOutputValueClass(Text.class);
    job.setMapperClass(CJTriangleMapper.class);
    job.setReducerClass(CJTrianglerReducer.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    Path outPath = new Path(args[1]);
    FileOutputFormat.setOutputPath(job, outPath);
    // Resolve the file system from the path itself, so that a fully qualified path on a
    // non-default file system is deleted from the right place.
    FileSystem fs = outPath.getFileSystem(conf);
    if (fs.exists(outPath)) {
      fs.delete(outPath, true);
    }
    return job.waitForCompletion(true) ? 0 : 1;
  }
}