博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop/MapReduce 查找、统计和列出大图中的所有三角形
阅读量:2489 次
发布时间:2019-05-11

本文共 11726 字,大约阅读时间需要 39 分钟。

package cjtriangle;import java.io.IOException;import java.util.ArrayList;import java.util.Iterator;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class CJTriangle_Step1 extends Configured implements Tool  {        public static class CJTriangleMapper extends Mapper
{ @Override protected void map( LongWritable key, Text value, Context context) throws IOException, InterruptedException { String tokens[] = value.toString().split(","); if(tokens == null || tokens.length < 2) return; context.write(new Text(tokens[0]), new Text(tokens[1])); context.write(new Text(tokens[1]), new Text(tokens[0])); } } public static class CJTrianglerReducer extends Reducer
{ @Override protected void reduce( Text key, Iterable
values, Context context) throws IOException, InterruptedException { System.out.println("---------------------------------CJTrianglerReducer----------------------------------"); System.out.println("key =" + key ); System.out.println("values:"); List
list = new ArrayList
(); Iterator
iterator = values.iterator(); while(iterator.hasNext()) { Text value = iterator.next(); list.add(value.toString()); } for(int i = 0; i < list.size(); i ++) { System.out.println("value:" + list.get(i)); context.write(new Text(key + "," + list.get(i)), new Text("" + -1)); for(int j = i+1; j < list.size(); j ++) { context.write(new Text(list.get(j) + "," + list.get(i)), new Text("" + key.toString())); } } System.out.println("---------------------------CJTrianglerReducer END-----------------------------------------"); } } public static void main(String[] args) throws Exception { args = new String[2]; args[0] = "/media/chenjie/0009418200012FF3/ubuntu/trangel.txt"; args[1] = "/media/chenjie/0009418200012FF3/ubuntu/trangel1";; int jobStatus = submitJob(args); System.exit(jobStatus); } public static int submitJob(String[] args) throws Exception { int jobStatus = ToolRunner.run(new CJTriangle_Step1(), args); return jobStatus; } @SuppressWarnings("deprecation") @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); Job job = new Job(conf); job.setJobName("trangel1"); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(CJTriangleMapper.class); job.setReducerClass(CJTrianglerReducer.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); FileSystem fs = FileSystem.get(conf); Path outPath = new Path(args[1]); if(fs.exists(outPath)) { fs.delete(outPath, true); } boolean status = job.waitForCompletion(true); return status ? 0 : 1; } }

package cjtriangle;import java.io.IOException;import java.util.ArrayList;import java.util.Collections;import java.util.Iterator;import java.util.List;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;import edu.umd.cloud9.io.pair.PairOfLongs;public class CJTriangle_Step2 extends Configured implements Tool  {        public static class CJTriangleMapper extends Mapper
{ @Override protected void map( LongWritable key, Text value, Context context) throws IOException, InterruptedException { String tokens[] = value.toString().split("\t"); if(tokens == null || tokens.length < 2) return; context.write(new PairOfLongs(Long.valueOf(tokens[0].split(",")[0]),Long.valueOf(tokens[0].split(",")[1])), new LongWritable(Long.valueOf(tokens[1]))); } } public static class CJTrianglerReducer extends Reducer
{ @Override protected void reduce( PairOfLongs key, Iterable
values, Context context) throws IOException, InterruptedException { System.out.println("---------------------------------CJTrianglerReducer----------------------------------"); System.out.println("key:" + key); List
list = new ArrayList
(); Iterator
iterator = values.iterator(); while(iterator.hasNext()) { LongWritable value = iterator.next(); list.add(value.get()); } boolean flag = false; for(int i = 0; i < list.size(); i ++) { System.out.println("value:" + list.get(i)); if(list.get(i) == -1) { flag = true; } } if(flag) { for(Long value : list) { if(value != -1) { String tri = buildTriangle(key.getLeftElement(),key.getRightElement(),value); System.out.println("生成三角形:" + tri); context.write(NullWritable.get(), new Text(tri)); } } } System.out.println("---------------------------CJTrianglerReducer END-----------------------------------------"); } private String buildTriangle(long e1, long e2, Long e3) { List
list = new ArrayList
(); list.add(e1); list.add(e2); list.add(e3); Collections.sort(list); return list.get(0) + "," + list.get(1) + "," + list.get(2); } } public static void main(String[] args) throws Exception { args = new String[2]; args[0] = "/media/chenjie/0009418200012FF3/ubuntu/trangel1/part-r-00000"; args[1] = "/media/chenjie/0009418200012FF3/ubuntu/trangel2";; int jobStatus = submitJob(args); System.exit(jobStatus); } public static int submitJob(String[] args) throws Exception { int jobStatus = ToolRunner.run(new CJTriangle_Step2(), args); return jobStatus; } @SuppressWarnings("deprecation") @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); Job job = new Job(conf); job.setJobName("trangel1"); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapOutputKeyClass(PairOfLongs.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(CJTriangleMapper.class); job.setReducerClass(CJTrianglerReducer.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); FileSystem fs = FileSystem.get(conf); Path outPath = new Path(args[1]); if(fs.exists(outPath)) { fs.delete(outPath, true); } boolean status = job.waitForCompletion(true); return status ? 0 : 1; } }
package cjtriangle;import java.io.IOException;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.NullWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class CJTriangle_Step3 extends Configured implements Tool  {        public static class CJTriangleMapper extends Mapper
{ @Override protected void map( LongWritable key, Text value, Context context) throws IOException, InterruptedException { context.write(new Text(value.toString()), new LongWritable(1)); } } public static class CJTrianglerReducer extends Reducer
{ @Override protected void reduce( Text key, Iterable
values, Context context) throws IOException, InterruptedException { System.out.println("---------------------------------CJTrianglerReducer----------------------------------"); System.out.println("key:" + key); Iterator
iterator = values.iterator(); while(iterator.hasNext()) { LongWritable value = iterator.next(); System.out.println("value:" + value.toString()); } context.write(NullWritable.get(), new Text(key.toString())); System.out.println("---------------------------CJTrianglerReducer END-----------------------------------------"); } } public static void main(String[] args) throws Exception { args = new String[2]; args[0] = "/media/chenjie/0009418200012FF3/ubuntu/trangel2/part-r-00000"; args[1] = "/media/chenjie/0009418200012FF3/ubuntu/trangel3";; int jobStatus = submitJob(args); System.exit(jobStatus); } public static int submitJob(String[] args) throws Exception { int jobStatus = ToolRunner.run(new CJTriangle_Step3(), args); return jobStatus; } @SuppressWarnings("deprecation") @Override public int run(String[] args) throws Exception { Configuration conf = getConf(); Job job = new Job(conf); job.setJobName("trangel1"); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(CJTriangleMapper.class); job.setReducerClass(CJTrianglerReducer.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); FileSystem fs = FileSystem.get(conf); Path outPath = new Path(args[1]); if(fs.exists(outPath)) { fs.delete(outPath, true); } boolean status = job.waitForCompletion(true); return status ? 0 : 1; } }

你可能感兴趣的文章
h:panelGrid、h:panelGroup标签学习
查看>>
f:facet标签 的用法
查看>>
<h:panelGroup>相当于span元素
查看>>
java中append()的方法
查看>>
必学高级SQL语句
查看>>
经典SQL语句大全
查看>>
log日志记录是什么
查看>>
<rich:modelPanel>标签的使用
查看>>
<h:commandLink>和<h:inputLink>的区别
查看>>
<a4j:keepAlive>的英文介绍
查看>>
关于list对象的转化问题
查看>>
VOPO对象介绍
查看>>
suse创建的虚拟机,修改ip地址
查看>>
linux的挂载的问题,重启后就挂载就没有了
查看>>
docker原始镜像启动容器并创建Apache服务器实现反向代理
查看>>
docker容器秒死的解决办法
查看>>
管理网&业务网的一些笔记
查看>>
openstack报错解决一
查看>>
openstack报错解决二
查看>>
linux source命令
查看>>