怎么做坑人的网站,镇江一网推网络技术有限公司,建网站 是否 数据库,淘宝返利网站建设一. OutputFormat简介
OutputFormat是MapReduce输出的基类，所有MapReduce输出都实现了OutputFormat接口，它接收ReduceTask产生的数据，然后将结果按照指定格式输出。
在MapReduce中，如果不指定，默认使用的是TextOutpu…
一. OutputFormat简介
OutputFormat是MapReduce输出的基类，所有MapReduce输出都实现了OutputFormat接口，它接收ReduceTask产生的数据，然后将结果按照指定格式输出。
在MapReduce中，如果不指定，默认使用的是TextOutputFormat。但是在一些特定的场景下，默认的TextOutputFormat不一定能满足我们的需求，因此可以自定义OutputFormat来实现个性化需求。
二. 需求
使用MapReduce对输入文件中的单词进行计数：单词hello的计数结果输出到hello.log中，非hello的单词的计数结果输出到non-hello.log。
要实现上面的输出需求，就需要自定义OutputFormat。
自定义OutputFormat的步骤
1. 自定义一个类，继承FileOutputFormat。
2. 自定义一个类，继承RecordWriter，重写方法write()和close()。
代码实现
package mr;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * Output format that splits word counts across two files in the job output
 * directory: {@code hello.log} and {@code non-hello.log}.
 *
 * <p>Both files have fixed names directly under the output directory, so two
 * tasks writing at the same time would overwrite each other's files. The
 * driver in this file configures a single reduce task.
 */
class MultiOuputFormat extends FileOutputFormat<Text, IntWritable> {

    /**
     * Opens both output files and returns a writer that routes records between them.
     *
     * @param job the task attempt whose configuration holds the output directory
     * @return a writer backed by {@code hello.log} and {@code non-hello.log}
     * @throws IOException if the output directory is not configured or a file cannot be created
     */
    @Override
    public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        Configuration configuration = job.getConfiguration();
        String outputPath = configuration.get(FileOutputFormat.OUTDIR);
        if (outputPath == null) {
            throw new IOException("Output directory is not set (" + FileOutputFormat.OUTDIR + ")");
        }

        Path path1 = new Path(outputPath + "/hello.log");
        Path path2 = new Path(outputPath + "/non-hello.log");
        // Resolve the file system from the path so a fully qualified output
        // URI is honoured instead of always using the default file system.
        FileSystem fs = path1.getFileSystem(configuration);

        FSDataOutputStream out1 = createFresh(fs, path1);
        FSDataOutputStream out2;
        try {
            out2 = createFresh(fs, path2);
        } catch (IOException | RuntimeException e) {
            // Do not leak the first stream when the second one cannot be opened.
            try {
                out1.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return new MyRecordWriter(out1, out2);
    }

    /** Deletes whatever currently exists at {@code path} (recursively) and creates a new file there. */
    private static FSDataOutputStream createFresh(FileSystem fs, Path path) throws IOException {
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        return fs.create(path);
    }
}

/**
 * Writes each record as {@code key,value} followed by a newline, sending the
 * word {@code hello} to the first stream and every other word to the second.
 */
class MyRecordWriter extends RecordWriter<Text, IntWritable> {

    private static final String HELLO = "hello";

    private final FSDataOutputStream out1;
    private final FSDataOutputStream out2;

    /**
     * @param out1 destination for the count of the word {@code hello}
     * @param out2 destination for the counts of all other words
     */
    public MyRecordWriter(FSDataOutputStream out1, FSDataOutputStream out2) {
        this.out1 = out1;
        this.out2 = out2;
    }

    /**
     * Writes one {@code key,value} line to the stream selected by the key.
     *
     * @param key the word
     * @param value the word's count
     * @throws IOException if the underlying stream write fails
     */
    @Override
    public void write(Text key, IntWritable value) throws IOException, InterruptedException {
        String word = key.toString();
        // Encode explicitly: the no-arg getBytes() depends on the platform charset.
        byte[] line = (word + "," + value.toString() + "\n").getBytes(StandardCharsets.UTF_8);
        // Exact match: only the word "hello" itself belongs in hello.log,
        // not every word that merely contains it.
        if (HELLO.equals(word)) {
            out1.write(line);
        } else {
            out2.write(line);
        }
    }

    /**
     * Closes both streams; the second is closed even if closing the first fails.
     *
     * @param context the task attempt being finished (unused)
     * @throws IOException if closing a stream fails
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        try {
            IOUtils.close(out1);
        } finally {
            IOUtils.close(out2);
        }
    }
}

/** Word-count job whose results are written through {@link MultiOuputFormat}. */
public class WordCountOutputFormat {

    /** Emits {@code (word, 1)} for every non-empty space-separated token of a line. */
    static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

        private static final IntWritable ONE = new IntWritable(1);

        private final Text word = new Text();

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String token : value.toString().split(" ")) {
                // Blank lines and repeated spaces yield empty tokens; they are not words.
                if (token.isEmpty()) {
                    continue;
                }
                word.set(token);
                context.write(word, ONE);
            }
        }
    }

    /** Sums the counts emitted for each word. */
    static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    /**
     * Configures and runs the job, exiting with 0 on success and 1 on failure.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} the output directory
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: WordCountOutputFormat <input path> <output path>");
            System.exit(2);
        }

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountOutputFormat.class);
        job.setJobName("WordCount");

        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Mapper
        job.setMapperClass(WordCountOutputFormat.WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reducer
        job.setReducerClass(WordCountOutputFormat.WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // A single reducer, because MultiOuputFormat writes to fixed file names.
        job.setNumReduceTasks(1);
        job.setOutputFormatClass(MultiOuputFormat.class);

        boolean waitFor = job.waitForCompletion(true);
        System.exit(waitFor ? 0 : 1);
    }
}
运行结果
[root@hadoop1 ~]# yarn jar learn-1.0-SNAPSHOT.jar mr.WordCountOutputFormat /test/a.txt /output
# 查看输入文件
[root@hadoop1 ~]# hdfs dfs -text /test/a.txt
hello world
name hello
world
# 查看结果文件
[root@hadoop1 ~]# hdfs dfs -ls /output
Found 3 items
-rw-r--r-- 3 root supergroup 0 2024-10-29 21:52 /output/_SUCCESS
-rw-r--r-- 3 root supergroup 8 2024-10-29 21:52 /output/hello.log
-rw-r--r-- 3 root supergroup 15 2024-10-29 21:52 /output/non-hello.log
[root@hadoop1 ~]# hdfs dfs -text /output/hello.log
hello,2
[root@hadoop1 ~]# hdfs dfs -text /output/non-hello.log
name,1
world,2