文档章节

hadoop-ID分析

cookqq
 cookqq
发布于 2014/01/05 20:41
字数 1073
阅读 465
收藏 1

我们开始来分析Hadoop MapReduce的内部的运行机制。用户向Hadoop提交Job(作业),作业在JobTracker对象的控制下执行。Job被分解成为Task(任务),分发到集群中,在TaskTracker的控制下运行。Task包括MapTask和ReduceTask,是MapReduce的Map操作和Reduce操作执行的地方。这中任务分布的方法比较类似于HDFS中NameNode和DataNode的分工,NameNode对应的是JobTracker,DataNode对应的是TaskTracker。JobTracker,TaskTracker和MapReduce的客户端通过RPC通信,具体可以参考HDFS部分的分析。

我们先来分析一些辅助类,首先是和ID有关的类,ID的继承树如下:


* Licensed to the Apache Software Foundation (ASF) under one

package org.apache.hadoop.mapreduce;

import java.io.DataInput;

/**
 * A general identifier, which internally stores the id
 * as an integer. This is the super class of {@link JobID}, 
 * {@link TaskID} and {@link TaskAttemptID}.
 * 
 * @see JobID
 * @see TaskID
 * @see TaskAttemptID
 */
public abstract class ID implements WritableComparable<ID> {
  protected static final char SEPARATOR = '_';
  protected int id;

  /** constructs an ID object from the given int */
  public ID(int id) {
    this.id = id;
  }

  protected ID() {
  }

  /** returns the int which represents the identifier */
  public int getId() {
    return id;
  }

  @Override
  public String toString() {
    return String.valueOf(id);
  }

  @Override
  public int hashCode() {
    return Integer.valueOf(id).hashCode();
  }

  @Override
  public boolean equals(Object o) {
    if (this == o)
      return true;
    if(o == null)
      return false;
    if (o.getClass() == this.getClass()) {
      ID that = (ID) o;
      return this.id == that.id;
    }
    else
      return false;
  }

  /** Compare IDs by associated numbers */
  public int compareTo(ID that) {
    return this.id - that.id;
  }

  public void readFields(DataInput in) throws IOException {
    this.id = in.readInt();
  }

  public void write(DataOutput out) throws IOException {
    out.writeInt(id);
  }
  
}

* Licensed to the Apache Software Foundation (ASF) under one

package org.apache.hadoop.mapreduce;

import java.io.DataInput;

/**
 * JobID represents the immutable and unique identifier for 
 * the job. JobID consists of two parts. First part 
 * represents the jobtracker identifier, so that jobID to jobtracker map 
 * is defined. For cluster setup this string is the jobtracker 
 * start time, for local setting, it is "local".
 * Second part of the JobID is the job number. <br> 
 * An example JobID is : 
 * <code>job_200707121733_0003</code> , which represents the third job 
 * running at the jobtracker started at <code>200707121733</code>. 
 * <p>
 * Applications should never construct or parse JobID strings, but rather 
 * use appropriate constructors or {@link #forName(String)} method. 
 * 
 * @see TaskID
 * @see TaskAttemptID
 * @see org.apache.hadoop.mapred.JobTracker#getNewJobId()
 * @see org.apache.hadoop.mapred.JobTracker#getStartTime()
 */
public class JobID extends org.apache.hadoop.mapred.ID 
                   implements Comparable<ID> {
  protected static final String JOB = "job";
  private final Text jtIdentifier;
  
  protected static final NumberFormat idFormat = NumberFormat.getInstance();
  static {
    idFormat.setGroupingUsed(false);
    idFormat.setMinimumIntegerDigits(4);
  }
  
  /**
   * Constructs a JobID object 
   * @param jtIdentifier jobTracker identifier
   * @param id job number
   */
  public JobID(String jtIdentifier, int id) {
    super(id);
    this.jtIdentifier = new Text(jtIdentifier);
  }
  
  public JobID() { 
    jtIdentifier = new Text();
  }
  
  public String getJtIdentifier() {
    return jtIdentifier.toString();
  }
  
  @Override
  public boolean equals(Object o) {
    if (!super.equals(o))
      return false;

    JobID that = (JobID)o;
    return this.jtIdentifier.equals(that.jtIdentifier);
  }
  
  /**Compare JobIds by first jtIdentifiers, then by job numbers*/
  @Override
  public int compareTo(ID o) {
    JobID that = (JobID)o;
    int jtComp = this.jtIdentifier.compareTo(that.jtIdentifier);
    if(jtComp == 0) {
      return this.id - that.id;
    }
    else return jtComp;
  }
  
  /**
   * Add the stuff after the "job" prefix to the given builder. This is useful,
   * because the sub-ids use this substring at the start of their string.
   * @param builder the builder to append to
   * @return the builder that was passed in
   */
  public StringBuilder appendTo(StringBuilder builder) {
    builder.append(SEPARATOR);
    builder.append(jtIdentifier);
    builder.append(SEPARATOR);
    builder.append(idFormat.format(id));
    return builder;
  }

  @Override
  public int hashCode() {
    return jtIdentifier.hashCode() + id;
  }

  @Override
  public String toString() {
    return appendTo(new StringBuilder(JOB)).toString();
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    super.readFields(in);
    this.jtIdentifier.readFields(in);
  }

  @Override
  public void write(DataOutput out) throws IOException {
    super.write(out);
    jtIdentifier.write(out);
  }
  
  /** Construct a JobId object from given string 
   * @return constructed JobId object or null if the given String is null
   * @throws IllegalArgumentException if the given string is malformed
   */
  public static JobID forName(String str) throws IllegalArgumentException {
    if(str == null)
      return null;
    try {
      String[] parts = str.split("_");
      if(parts.length == 3) {
        if(parts[0].equals(JOB)) {
          return new org.apache.hadoop.mapred.JobID(parts[1], 
                                                    Integer.parseInt(parts[2]));
        }
      }
    }catch (Exception ex) {//fall below
    }
    throw new IllegalArgumentException("JobId string : " + str 
        + " is not properly formed");
  }
  
}



这张图可以看出现在Hadoop的org.apache.hadoop.mapred向org.apache.hadoop.mapreduce迁移带来的一些问题,其中灰色是标注为@Deprecated的。ID携带一个整型,实现了WritableComparable接口,这表明它可以比较,而且可以被Hadoop的io机制串行化/解串行化(必须实现compareTo/readFields/write方法)。JobID是系统分配给作业的唯一标识符,它的toString结果是job_<jobtrackerID>_<jobNumber>。例子:job_200707121733_0003表明这是jobtracker 200707121733(利用jobtracker的开始时间作为ID)的第3号作业。


作业分成任务执行,任务号TaskID包含了它所属的作业ID,同时也有任务ID,同时还保持了这是否是一个Map任务(成员变量isMap)。任务号的字符串表示为task_<jobtrackerID>_<jobNumber>_[m|r]_<taskNumber>,如task_200707121733_0003_m_000005表示作业200707121733_0003的000005号任务,改任务是一个Map任务。

一个任务有可能有多个执行(错误恢复/消除Stragglers等),所以必须区分任务的多个执行,这是通过类TaskAttemptID来完成,它在任务号的基础上添加了尝试号。一个任务尝试号的例子是attempt_200707121733_0003_m_000005_0,它是任务task_200707121733_0003_m_000005的第0号尝试。

JVMId用于管理任务执行过程中的Java虚拟机,我们后面再讨论。


本文转载自:http://caibinbupt.iteye.com/blog/346766

上一篇: Hadoop杂记
下一篇: hadoop-reduce分析
cookqq

cookqq

粉丝 119
博文 268
码字总数 156096
作品 0
海淀
技术主管
私信 提问
Hadoop的RPC设计分析

  之前鼓捣Hbase的时候,觉得单机和伪分布式模式太low了,就在笔记本上用三个虚拟机搭建了一个“完全分布式”的Hbase环境(心疼破本子一秒钟)。刚好趁这个元旦假期,我就研究了一下Hadoop。...

大数据头条
2018/01/04
0
0
Hadoop源代码分析(*IDs类和*Context类)

我们开始来分析Hadoop MapReduce的内部的运行机制。用户向Hadoop提交Job(作业),作业在JobTracker对象的控制下执行。Job被分解成为Task(任务),分发到集群中,在TaskTracker的控制下运行...

超人学院
2015/05/26
128
0
CentOS SSH无密码登录原理,配置以及常见问题

1.原理简介 为了便于理解,假设需要在hadoop148这台机器上可以通过无密码登录的方式连接到hadoop107上。 首先在 hadoop148上生成一个密 钥对,包括一个公钥和一个私钥,并将公钥复制到hadoo...

双月通天
2016/03/22
160
0
Cloudera Manager中Uber模式下MapReduce任务执行无法加载Native Libraries

问题现象 Cloudera Manager(以下简称CM)安装CDH,在Hive中执行任务,MapReduce任务使用Uber模式运行,报如下错误: 日志分析 查看Job的详细日志为: 问题分析:Mapreduce任务进入Uber模式,但...

Yulong_
2016/10/14
231
1
伪分布模式hadoop运行java源程序

写好源代码之后,首先要编译:javac -classpath /usr/local/hadoop/hadoop-core-1.2.1.jar:/usr/local/hadoop/lib/commons-cli-1.2.jar count.java -d org在org目录下生成三个class文件:cou......

lcj1992
2014/01/12
178
0

没有更多内容

加载失败,请刷新页面

加载更多

toast组件单元测试

先看是否存在 describe('Toast', () => { it('存在.', () => { expect(Toast).to.be.exist }) }); 看属性,我们要测 ToastVue 和 plugin.js describe('Toast', () =>......

ories
34分钟前
57
0
如何将整个MySQL数据库字符集和排序规则转换为UTF-8?

如何将整个MySQL数据库字符集转换为UTF-8并将排序规则转换为UTF-8? #1楼 在命令行外壳上 如果您是命令行外壳程序之一,则可以非常快速地执行此操作。 只需填写“ dbname”:D DB="dbname"(...

javail
今天
80
0
开源矿工系统内部的层

开源矿工系统内部的层 所谓“层”、“界”、“域”、“集合”,这些词其实是在试图表达物质系统的组成结构和运动景象中的规矩,这些不同人发明的词都是来源于对同一个规律的观察、发现、表达...

NTMiner
今天
88
0
如何将文件从一个git repo移到另一个(不是克隆),保留历史记录

我们的Git储存库是作为单个Monster SVN储存库的一部分开始的,其中每个项目都有自己的树,如下所示: project1/branches /tags /trunkproject2/branches /tags ...

技术盛宴
今天
65
0
数据结构之数组-c代码实现

在上一篇文章里讲了数组的具体内容,然后自己使用c语言对数组进行了实现。 其中定义了一个结构体,定义了长度、已使用长度和地址指针。 定义alloc函数来分配内存空间 之后便是插入元素的ins...

无心的梦呓
今天
65
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部