InputFormat到key-value生成流程,reduce写出数据流程
InputFormat到key-value生成流程,reduce写出数据流程
Zero零_度 发表于2年前
InputFormat到key-value生成流程,reduce写出数据流程
  • 发表于 2年前
  • 阅读 42
  • 收藏 1
  • 点赞 0
  • 评论 0

华为云·免费上云实践>>>   

摘要: InputFormat到key-value生成流程

inputformat读取数据流程

public abstract class InputFormat<K, V> {

  public abstract
    List<InputSplit> getSplits(JobContext context
                               ) throws IOException, InterruptedException;
 
  public abstract
    RecordReader<K,V> createRecordReader(InputSplit split,
                                         TaskAttemptContext context
                                        ) throws IOException,
                                                 InterruptedException;

}

public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {

public class TextInputFormat extends FileInputFormat<LongWritable, Text> {

getSplits方法,获得对输入文件的切分数量,每一个split对应一个map。
创建RecordReader,该RecordReader接收切分好的split,实现nextKeyValue、getCurrentKey、getCurrentValue。

如下所示,每个map类都会继承Mapper类,在Mapper类中,run方法会调用InputFormat中的RecordReader来获得key、value

public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  /**
   * Expert users can override this method for more complete control over the
   * execution of the Mapper.
   * @param context
   * @throws IOException
   */
  public void run(Context context) throws IOException, InterruptedException {
    setup(context);
    try {
      while (context.nextKeyValue()) {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
      }
    } finally {
      cleanup(context);
    }
  }
}

reduce写出数据流程:

reduce读取map输出的中间结果
org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  try {
    while (context.nextKey()) {
      //调用reduce方法,该方法一般由自定义的业务reducer重写
      reduce(context.getCurrentKey(), context.getValues(), context);
      Iterator<VALUEIN> iter = context.getValues().iterator();
      if(iter instanceof ReduceContext.ValueIterator) {
        ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
      }
    }
  } finally {
    cleanup(context);
  }
}

/**
 * 自定义reducer,重写父类reduce方法
 */
public class IssueDataRecordDistinctReducer extends Reducer<Text, LongWritable, NullWritable,Text> {

private NullWritable nullKey = NullWritable.get();

protected void reduce(Text key, Iterable<LongWritable> values, Context context)	throws IOException, InterruptedException {
	context.write(nullKey, new Text("hello"));
}

}

context.write(nullKey, new Text("hello"));
reduce中,每一次context.write操作都会调用自定义OutputFormat中的RecordWrite类中的write方法

 

共有 人打赏支持
粉丝 63
博文 725
码字总数 240499
×
Zero零_度
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: