MapReduce源码之FileOutputFormat

原创
2017/08/11 00:15
阅读数 279

FileOutputFormat是所有基于文件的OutputFormat的基类。这个基类提供了checkOutputSpecs和getOutputCommitter的通用性实现。FileOutputFormat的子类需要覆写getRecordWriter方法来返回LineRecordWriter

/** A base class for {@link OutputFormat}s that read from {@link FileSystem}s.*/

 

TextOutputFormat.getRecordWriter方法

public RecordWriter<K, V> 
       getRecordWriter(TaskAttemptContext job
                       ) throws IOException, InterruptedException {
  Configuration conf = job.getConfiguration();
  boolean isCompressed = getCompressOutput(job);
  String keyValueSeparator= conf.get(SEPERATOR, "\t");
  CompressionCodec codec = null;
  String extension = "";
  if (isCompressed) {
    Class<? extends CompressionCodec> codecClass = 
      getOutputCompressorClass(job, GzipCodec.class);
    codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    extension = codec.getDefaultExtension();
  }
  Path file = getDefaultWorkFile(job, extension);
  FileSystem fs = file.getFileSystem(conf);
  if (!isCompressed) {
    FSDataOutputStream fileOut = fs.create(file, false);
    return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
  } else {
    FSDataOutputStream fileOut = fs.create(file, false);
    return new LineRecordWriter<K, V>(new DataOutputStream
                                      (codec.createOutputStream(fileOut)),
                                      keyValueSeparator);
  }
}

TextOutputFormat内部类LineRecordWriter的write方法将结果写入文件。

protected static class LineRecordWriter<K, V>
  extends RecordWriter<K, V> {
  private static final String utf8 = "UTF-8";
  private static final byte[] newline;
  static {
    try {
      newline = "\n".getBytes(utf8);
    } catch (UnsupportedEncodingException uee) {
      throw new IllegalArgumentException("can't find " + utf8 + " encoding");
    }
  }

  protected DataOutputStream out;
  private final byte[] keyValueSeparator;

  public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
    this.out = out;
    try {
      this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
    } catch (UnsupportedEncodingException uee) {
      throw new IllegalArgumentException("can't find " + utf8 + " encoding");
    }
  }

  public LineRecordWriter(DataOutputStream out) {
    this(out, "\t");
  }

  /**
   * Write the object to the byte stream, handling Text as a special
   * case.
   * @param o the object to print
   * @throws IOException if the write throws, we pass it on
   */
  private void writeObject(Object o) throws IOException {
    if (o instanceof Text) {
      Text to = (Text) o;
      out.write(to.getBytes(), 0, to.getLength());
    } else {
      out.write(o.toString().getBytes(utf8));
    }
  }

  public synchronized void write(K key, V value)
    throws IOException {

    boolean nullKey = key == null || key instanceof NullWritable;
    boolean nullValue = value == null || value instanceof NullWritable;
    if (nullKey && nullValue) {
      return;
    }
    if (!nullKey) {
      writeObject(key);
    }
    if (!(nullKey || nullValue)) {
      out.write(keyValueSeparator);
    }
    if (!nullValue) {
      writeObject(value);
    }
    out.write(newline);
  }

  public synchronized 
  void close(TaskAttemptContext context) throws IOException {
    out.close();
  }
}

 

 

SequenceFileOutputFormat

package org.apache.hadoop.mapreduce.lib.output;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;

/** An {@link OutputFormat} that writes {@link SequenceFile}s. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileOutputFormat <K,V> extends FileOutputFormat<K, V> {

  protected SequenceFile.Writer getSequenceWriter(TaskAttemptContext context,
      Class<?> keyClass, Class<?> valueClass) 
      throws IOException {
    Configuration conf = context.getConfiguration();
       
    CompressionCodec codec = null;
    CompressionType compressionType = CompressionType.NONE;
    if (getCompressOutput(context)) {
      // find the kind of compression to do
      compressionType = getOutputCompressionType(context);
      // find the right codec
      Class<?> codecClass = getOutputCompressorClass(context, 
                                                     DefaultCodec.class);
      codec = (CompressionCodec) 
        ReflectionUtils.newInstance(codecClass, conf);
    }
    // get the path of the temporary output file 
    Path file = getDefaultWorkFile(context, "");
    FileSystem fs = file.getFileSystem(conf);
    return SequenceFile.createWriter(fs, conf, file,
             keyClass,
             valueClass,
             compressionType,
             codec,
             context);
  }
  
  public RecordWriter<K, V> 
         getRecordWriter(TaskAttemptContext context
                         ) throws IOException, InterruptedException {
    final SequenceFile.Writer out = getSequenceWriter(context,
      context.getOutputKeyClass(), context.getOutputValueClass());

    return new RecordWriter<K, V>() {

        public void write(K key, V value)
          throws IOException {

          out.append(key, value);
        }

        public void close(TaskAttemptContext context) throws IOException { 
          out.close();
        }
      };
  }

  /**
   * Get the {@link CompressionType} for the output {@link SequenceFile}.
   * @param job the {@link Job}
   * @return the {@link CompressionType} for the output {@link SequenceFile}, 
   *         defaulting to {@link CompressionType#RECORD}
   */
  public static CompressionType getOutputCompressionType(JobContext job) {
    String val = job.getConfiguration().get(FileOutputFormat.COMPRESS_TYPE, 
                                            CompressionType.RECORD.toString());
    return CompressionType.valueOf(val);
  }
  
  /**
   * Set the {@link CompressionType} for the output {@link SequenceFile}.
   * @param job the {@link Job} to modify
   * @param style the {@link CompressionType} for the output
   *              {@link SequenceFile} 
   */
  public static void setOutputCompressionType(Job job, 
                                                CompressionType style) {
    setCompressOutput(job, true);
    job.getConfiguration().set(FileOutputFormat.COMPRESS_TYPE, 
                               style.toString());
  }

}
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部