文档章节

Giraph源码分析(八)—— 统计每个SuperStep中参与计算的顶点数目

数澜科技
 数澜科技
发布于 08/20 11:03
字数 1112
阅读 6
收藏 0

作者|白松

目的:科研中,需要分析在每次迭代过程中参与计算的顶点数目,来进一步优化系统。比如,在SSSP的compute()方法最后一行,都会把当前顶点voteToHalt,即变为InActive状态。所以每次迭代完成后,所有顶点都是InActive状态。在大同步后,收到消息的顶点会被激活,变为Active状态,然后调用顶点的compute()方法。本文的目的就是统计每次迭代过程中,参与计算的顶点数目。下面附上SSSP的compute()方法:

@Override
  public void compute(Iterable messages) {
    if (getSuperstep() == 0) {
      setValue(new DoubleWritable(Double.MAX_VALUE));
    }
    double minDist = isSource() ? 0d : Double.MAX_VALUE;
    for (DoubleWritable message : messages) {
      minDist = Math.min(minDist, message.get());
    }
    if (minDist < getValue().get()) {
      setValue(new DoubleWritable(minDist));
      for (Edge edge : getEdges()) {
        double distance = minDist + edge.getValue().get();
        sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
      }
    }
	//把顶点置为InActive状态
    voteToHalt();
  }

附:giraph中算法的终止条件是:没有活跃顶点且worker间没有消息传递。

hama-0.6.0中算法的终止条件只是:判断是否有活跃顶点。不是真正的pregel思想,半成品。

修改过程如下:

  1. org.apache.giraph.partition. PartitionStats 类

添加变量和方法,用来统计每个Partition在每个超步中参与计算的顶点数目。添加的变量和方法如下:

/** computed vertices in this partition */
private long computedVertexCount=0;
 
/**
* Increment the computed vertex count by one.
*/
public void incrComputedVertexCount() {
    ++ computedVertexCount;
}
 
/**
 * @return the computedVertexCount
 */
public long getComputedVertexCount() {
	return computedVertexCount;
}

修改readFields()和write()方法,每个方法追加最后一句。当每个Partition计算完成后,会把自己的computedVertexCount发送给Master,Mater再读取汇总。

@Override
public void readFields(DataInput input) throws IOException {
    partitionId = input.readInt();
    vertexCount = input.readLong();
    finishedVertexCount = input.readLong();
    edgeCount = input.readLong();
    messagesSentCount = input.readLong();
    //添加下条语句
    computedVertexCount=input.readLong();
}
 
@Override
public void write(DataOutput output) throws IOException {
    output.writeInt(partitionId);
    output.writeLong(vertexCount);
    output.writeLong(finishedVertexCount);
    output.writeLong(edgeCount);
    output.writeLong(messagesSentCount);
    //添加下条语句
    output.writeLong(computedVertexCount);
}
  1. org.apache.giraph.graph. GlobalStats 类

    添加变量和方法,用来统计每个超步中参与计算的顶点总数目,包含每个Worker上的所有Partitions。

 /** computed vertices in this partition 
  *  Add by BaiSong 
  */
  private long computedVertexCount=0;
	 /**
	 * @return the computedVertexCount
	 */
	public long getComputedVertexCount() {
		return computedVertexCount;
	}

修改addPartitionStats(PartitionStats partitionStats)方法,增加统计computedVertexCount功能。

/**
  * Add the stats of a partition to the global stats.
  *
  * @param partitionStats Partition stats to be added.
  */
  public void addPartitionStats(PartitionStats partitionStats) {
    this.vertexCount += partitionStats.getVertexCount();
    this.finishedVertexCount += partitionStats.getFinishedVertexCount();
    this.edgeCount += partitionStats.getEdgeCount();
    //Add by BaiSong,添加下条语句
    this.computedVertexCount+=partitionStats.getComputedVertexCount();
 }

当然为了Debug方便,也可以修改该类的toString()方法(可选),修改后的如下:

public String toString() {
		return "(vtx=" + vertexCount + ", computedVertexCount="
				+ computedVertexCount + ",finVtx=" + finishedVertexCount
				+ ",edges=" + edgeCount + ",msgCount=" + messageCount
				+ ",haltComputation=" + haltComputation + ")";
	}
  1. org.apache.giraph.graph. ComputeCallable<I,V,E,M>

添加统计功能。在computePartition()方法中,添加下面一句。

if (!vertex.isHalted()) {
        context.progress();
        TimerContext computeOneTimerContext = computeOneTimer.time();
        try {
            vertex.compute(messages);
	    //添加下面一句,当顶点调用完compute()方法后,就把该Partition的computedVertexCount加1
            partitionStats.incrComputedVertexCount();
        } finally {
           computeOneTimerContext.stop();
        }
……
  1. 添加Counters统计,和我的博客Giraph源码分析(七)—— 添加消息统计功能 类似,此处不再详述。添加的类为:org.apache.giraph.counters.GiraphComputedVertex,下面附上该类的源码:
package org.apache.giraph.counters;
 
import java.util.Iterator;
import java.util.Map;
 
import org.apache.hadoop.mapreduce.Mapper.Context;
import com.google.common.collect.Maps;
 
/**
 * Hadoop Counters in group "Giraph Messages" for counting every superstep
 * message count.
 */
 
public class GiraphComputedVertex extends HadoopCountersBase {
	/** Counter group name for the giraph Messages */
	public static final String GROUP_NAME = "Giraph Computed Vertex";
 
	/** Singleton instance for everyone to use */
	private static GiraphComputedVertex INSTANCE;
 
	/** superstep time in msec */
	private final Map superstepVertexCount;
 
	private GiraphComputedVertex(Context context) {
		super(context, GROUP_NAME);
		superstepVertexCount = Maps.newHashMap();
	}
 
	/**
	 * Instantiate with Hadoop Context.
	 * 
	 * @param context
	 *            Hadoop Context to use.
	 */
	public static void init(Context context) {
		INSTANCE = new GiraphComputedVertex(context);
	}
 
	/**
	 * Get singleton instance.
	 * 
	 * @return singleton GiraphTimers instance.
	 */
	public static GiraphComputedVertex getInstance() {
		return INSTANCE;
	}
 
	/**
	 * Get counter for superstep messages
	 * 
	 * @param superstep
	 * @return
	 */
	public GiraphHadoopCounter getSuperstepVertexCount(long superstep) {
		GiraphHadoopCounter counter = superstepVertexCount.get(superstep);
		if (counter == null) {
			String counterPrefix = "Superstep: " + superstep+" ";
			counter = getCounter(counterPrefix);
			superstepVertexCount.put(superstep, counter);
		}
		return counter;
	}
 
	@Override
	public Iterator iterator() {
		return superstepVertexCount.values().iterator();
	}
}
  1. 实验结果,运行程序后。会在终端输出每次迭代参与计算的顶点总数目。 测试SSSP(SimpleShortestPathsVertex类),输入图中共有9个顶点和12条边。输出结果如下:

上图测试中,共有6次迭代。红色框中,显示出了每次迭代过冲参与计算的顶点数目,依次是:9,4,4,3,4,0

解释:在第0个超步,每个顶点都是活跃的,所有共有9个顶点参与计算。在第5个超步,共有0个顶点参与计算,那么就不会向外发送消息,加上每个顶点都是不活跃的,所以算法迭代终止。

【阅读更多文章请访问数澜社区

© 著作权归作者所有

数澜科技
粉丝 0
博文 30
码字总数 57666
作品 0
杭州
私信 提问
Giraph源码分析(一)— 启动ZooKeeper服务

作者 | 白松 【注:本文为原创,引用转载需与数澜联系。】 Giraph介绍: Apache Giraph is an iterative graph processing system built for high scalability. For example, it is current......

数懒
07/19
0
0
直击Hadoop Summit 2011:迎接海量数据挑战

海量数据正在不断生成,对于急需改变自己传统IT架构的企业而言,面对海量数据,如何分析并有效利用其价值,同时优化企业业务已成为现代企业转型过程中不可避免的问题。 作为海量数据处理的一...

疯狂的流浪
2011/07/01
3.9K
6
Giraph作业控制及其容错的问题

@run6.13 你好,想跟你请教个问题:我现在做一个类似Giraph的作业控制中心,我下载了它的源码但是不知从哪里看起,我做的作业控制中心也是能够建立在工作节点上,然后能够通过zookeeper进行领...

刘佳男
2013/07/05
223
1
【电子书】Hadoop实战手册 (样章第一章)

Hadoop实战手册 [美] Jonathan R. Owens,Jon Lentz,Brian Femiano 著; 傅杰,赵磊,卢学裕 译 内容简介   这是一本Hadoop实用手册,主要针对实际问题给出相应的解决方案。《Hadoop实战手...

dwf07223
2018/06/28
0
0
开源图谱数据库 Neo4j 获 2000万 美元投资

开源图谱数据库 Neo4j 的商业版发行公司Neo Technology近日获得2000万美元新一轮融资,累计融资金额达到4500万美元(Neo Technology目前员工数量约80人) 图谱数据库是图谱分析(又称图论分析...

oschina
2015/01/16
4.5K
8

没有更多内容

加载失败,请刷新页面

加载更多

二叉查找树的第 K 个结点

private TreeNode ret;private int cnt = 0;public TreeNode KthNode(TreeNode pRoot, int k) { inOrder(pRoot, k); return ret;}private void inOrder(TreeNode root......

Garphy
54分钟前
4
0
windo8 weblogic

需要的软件包 现在安装jdk 则先进入你电脑自带jdk \bin目录下 然后java -jar 执行你的jar包就可以了 欢迎界面直接点击下一步,跳到更新界面,直接选择跳过 然后选择安装目录(注意:目录不要有...

恩多
今天
8
0
Activiti 批注

Activiti添加批注(comment)信息 在每次提交任务的时候需要描述一些批注信息,例如:请假流程提交的时候要描述信息为什么请假,如果领导驳回可以批注驳回原因等  1、添加批注 // 由于流程...

奔跑的android
今天
4
0
centos7命令行和图形界面的相互切换

最近安装了centos7,发现在命令行和图形界面的相互切换命令上,与centos以往版本有很大不同。 1,centos7默认安装后,跟其他版本一样,启动默认进入图形界面; 2,在图形化桌面,右击鼠标,选...

无名氏的程序员
今天
6
0
快速失败 (fail-fast) 和安全失败 (fail-safe) 的区别是什么

一:快速失败(fail—fast) 在用迭代器遍历一个集合对象时,如果遍历过程中对集合对象的内容进行了修改(增加、删除、修改),则会抛出Concurrent Modification Exception。 原理:迭代器在...

Bb进阶
今天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部