网站建设信息公开和解读回应,建筑网站排行榜,传奇网站制作网,成都酒店设计十大公司排名目录
一.引言
二.源码浅析
1.RDD.saveAsTextFile
2.TextOutputFormat
3.FileOutputFormat
三.源码修改
1.修改文件生成逻辑 - getRecordWriter
2.允许目录存在 - checkoutputSpecs
3.全部代码 - TextOutputFormatV2
四.追加存储代码实战
五.总结 一.引言
Output d…目录
一.引言
二.源码浅析
1.RDD.saveAsTextFile
2.TextOutputFormat
3.FileOutputFormat
三.源码修改
1.修改文件生成逻辑 - getRecordWriter
2.允许目录存在 - checkoutputSpecs
3.全部代码 - TextOutputFormatV2
四.追加存储代码实战
五.总结 一.引言
Output directory file XXX already exists 目标目录已经存在这个报错写 Spark 的同学都不会陌生它不允许我们在同一个目录持续增加文件存储。在使用 Flink 文件流场景中我们有向 HDFS 目录追加文件的需求所以下面我们尝试继承 FileOutputFormat 实现自定义文件追加。 二.源码浅析
自定义实现之前我们需要明确一下追加文件的两个主要问题
- 允许目录存在
即避免 Output directory file XXX already exists 的报错
- 避免 File 重复
由于是追加文件这里我们要避免文件名相同导致追加失败 1.RDD.saveAsTextFile
这个是我们日常使用的文本落地 API 这里将原始的 RDD[String] 转化为 (NullWritable.get(), text) 的 pairRDD 并调用后续的 saveAsHadoopFile 并传入 TextOutputFormat
- NullWritable
在Hadoop1.中是 Comparable因此编译器无法找到隐式为其排序并将使用默认的 null。然而它是一个 Comparable[NullWritable] 在 Hadoop2. 中编译器将调用隐式 Ordering.ordered” 方法来创建为 NullWritable 排序。这就是为什么编译器会生成不同的匿名Hadoop1.和Hadoop2.中“saveAsTextFile”的类。因此在这里我们提供了一个显式排序“null”以确保编译器生成 saveAsTextFile 的字节码相同。
这里翻译自官方 API简言之我们后续自定义 OutputFormat 时需要将 RDD[String] 转换为 PairRDD 并将 key 置为 NullWritable否则这里无法调用 saveAsHadoopFile 2.TextOutputFormat
前者继承了后者我们先看下前者复写了哪些函数 非常简洁大部分方法都使用父类的实现这里 getRecordWriter 的具体实现对应我们上面提到的第二个问题即避免文件重复因为其负责根据 name 轮训生成落地的 Path 地址我们修改这个函数即可避免追加时文件重复。 3.FileOutputFormat
TextOutputFormat Extends FileOutputFormat上面 TextOutputFormat 只实现了 Path 相关的工作所以需要继续到父类 FileOutputFormat 寻找抛出异常的语句 在文件中搜索 already exists 即可定位到当前函数是不是很熟悉因此针对第一个问题避免文件的报错就要修改这里了最简单的我们把这三行注释掉即可。 三.源码修改
经过上面的源码三部曲我们如何修改 TextOutputFormat 思路也很清晰了修改文件生成逻辑、取消抛出异常即可下面看一下代码实现
1.修改文件生成逻辑 - getRecordWriter 这里我们在源码中增加了 updateFileName 函数该函数由用户自己定义输出文件名常规的我们可以按照 part-00000、part-00001 的顺序继续存储下去当然如果为了区分追加文件的添加时间与类型我们也可以给其打上时间戳和自定义标记都不想用的话也可以直接用 UUID 代替
import java.util.UUIDUUID.randomUUID().toString下面看一下 updateFileName 的实现
这里初始化变量 fileName 为 更新时对其加锁并基于当前 name 进行判断如果当前 fileName 为 则代表是第一次保存因此默认使用 name 的 part-00000后续再多次存储是我们就可以获取 fileName 的后缀进行累加输出了这里使用 DecimalFormat 实现了自动补 0 的操作。 static String fileName ;DecimalFormat decimalFormat new DecimalFormat(00000);public void updateFileName(String name) {synchronized (fileName) {if (fileName.equals()) {fileName name;} else {fileName part- decimalFormat.format(Integer.parseInt(fileName.split(-)[1]) 1);}}} 2.允许目录存在 - checkoutputSpecs 把 throw Exception 的异常去掉就好了这里保留了 Println 提示目录已经存在并开始追加。 3.全部代码 - TextOutputFormatV2
换个 TextOutputFormatV2 实现我们追加文件存储的目的。
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.text.DecimalFormat;import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.*;/*** An {link OutputFormat} that writes plain text files.*/
InterfaceAudience.Public
InterfaceStability.Stable
public class TextOutputFormatV2K, V extends FileOutputFormatK, V {static String fileName ;DecimalFormat decimalFormat new DecimalFormat(00000);public void updateFileName(String name) {synchronized (fileName) {if (fileName.equals()) {fileName name;} else {fileName part- decimalFormat.format(Integer.parseInt(fileName.split(-)[1]) 1);}}}protected static class LineRecordWriterK, Vimplements RecordWriterK, V {private static final String utf8 UTF-8;private static final byte[] newline;static {try {newline \n.getBytes(utf8);} catch (UnsupportedEncodingException uee) {throw new IllegalArgumentException(cant find utf8 encoding);}}protected DataOutputStream out;private final byte[] keyValueSeparator;public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {this.out out;try {this.keyValueSeparator keyValueSeparator.getBytes(utf8);} catch (UnsupportedEncodingException uee) {throw new IllegalArgumentException(cant find utf8 encoding);}}public LineRecordWriter(DataOutputStream out) {this(out, \t);}/*** Write the object to the byte stream, handling Text as a special* case.* param o the object to print* throws IOException if the write throws, we pass it on*/private void writeObject(Object o) throws IOException {if (o instanceof Text) {Text to (Text) o;out.write(to.getBytes(), 0, to.getLength());} else {out.write(o.toString().getBytes(utf8));}}public synchronized void write(K key, V value)throws IOException {boolean nullKey key null || key instanceof NullWritable;boolean nullValue value null || value instanceof NullWritable;if (nullKey nullValue) {return;}if (!nullKey) {writeObject(key);}if (!(nullKey || nullValue)) {out.write(keyValueSeparator);}if (!nullValue) {writeObject(value);}out.write(newline);}public synchronized void close(Reporter reporter) throws IOException {out.close();}}Overridepublic void checkOutputSpecs(FileSystem ignored, JobConf job)throws FileAlreadyExistsException,InvalidJobConfException, IOException {// Ensure that the output directory is set and not already therePath outDir getOutputPath(job);if (outDir null job.getNumReduceTasks() ! 0) {throw new InvalidJobConfException(Output directory not set in JobConf.);}if (outDir ! null) {FileSystem fs outDir.getFileSystem(job);// normalize the output directoryoutDir fs.makeQualified(outDir);setOutputPath(job, outDir);// get delegation token for the outDirs file systemTokenCache.obtainTokensForNamenodes(job.getCredentials(),new Path[] {outDir}, job);// check its existenceif (fs.exists(outDir)) {System.out.println(Output directory outDir already exists, Start Append!);}}}public RecordWriterK, V getRecordWriter(FileSystem ignored,JobConf job,String name,Progressable progress)throws IOException {boolean isCompressed getCompressOutput(job);String keyValueSeparator job.get(mapreduce.output.textoutputformat.separator,\t);updateFileName(name);if (!isCompressed) {Path file FileOutputFormat.getTaskOutputPath(job, fileName);FileSystem fs file.getFileSystem(job);FSDataOutputStream fileOut fs.create(file, progress);return new com.CommonTool.TextOutputFormatV2.LineRecordWriterK, V(fileOut, keyValueSeparator);} else {Class? extends CompressionCodec codecClass getOutputCompressorClass(job, GzipCodec.class);// create the named codecCompressionCodec codec ReflectionUtils.newInstance(codecClass, job);// build the filename including the extensionPath file FileOutputFormat.getTaskOutputPath(job,fileName codec.getDefaultExtension());FileSystem fs file.getFileSystem(job);FSDataOutputStream fileOut fs.create(file, progress);return new com.CommonTool.TextOutputFormatV2.LineRecordWriterK, V(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);}}
} 四.追加存储代码实战 def main(args: Array[String]): Unit {val (argsList, argsMap) ArgsParseUtil.parseArgs(args)val conf (new SparkConf).setAppName(AppendFileToHdfs).setMaster(local[*])val spark SparkSession.builder.config(conf).getOrCreate()val sc spark.sparkContextval output argsMap.getOrElse(output, ./append_output)val data sc.parallelize(0 to 1000).mapPartitions { iter val text new Text()iter.map { x text.set(x.toString)(NullWritable.get(), text)}}(0 until 10).foreach(epoch {// 存储一次无法继续存储// data.saveAsTextFile(output)data.saveAsHadoopFile(output, classOf[NullWritable], classOf[Text], classOf[TextOutputFormatV2[NullWritable, String]])})}
首先转化为 (NullWritable.get(), text) 的 pairRDD随后调用 saveAsHadoopFile 方法并传入我们自定义的 TextOutputFormatV2 即可由于我们 for 循环了 10 次所以打印了 10 个相关日志。 下面再看下生成的文件 每个文件存 250 个数字每次存储 4 个 part10 次 40 个且保持递增顺序。 五.总结
想要结合源码进行修改时结合自己的需求带着问题去找对应的函数再复写就 OK 了由于这里逻辑比较简单所以我们也没踩太多坑。上面采用的是单次 Job 连续存储所以 Format 里 FileName 能够达到累加的情况如果是多个 Job 重复启动则每次获取的都是 part-00000这时如果还想要保持文件名递增的话可以使用 FileSystem.listStatus 遍历文件夹获取 modifyTime 最新的文件并取其 name 即可拿到最新的文件名 part此时将参数传入 TextFormat 并修改 update 逻辑即可实现多 Job 重复启动且文件名递增的需求了。当然了使用 UUID Date 是最省事滴。