文档章节

No output fields defined for component:xxx::defaul

Beaver_
 Beaver_
发布于 2015/04/26 10:08
字数 955
阅读 436
收藏 1

        学习jstorm过程中,碰到一问题:


 ERROR com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent  - Failed Sync Process

java.lang.IllegalArgumentException: No output fields defined for component:stream wordNormalizer_componentId:default

at backtype.storm.task.GeneralTopologyContext.getComponentOutputFields(GeneralTopologyContext.java:114)

at backtype.storm.task.TopologyContext.getThisOutputFields(TopologyContext.java:157)

at com.alibaba.jstorm.cluster.Common.outbound_components(Common.java:600)

at com.alibaba.jstorm.task.Task.makeSendTargets(Task.java:133)

at com.alibaba.jstorm.task.Task.echoToSystemBolt(Task.java:162)

at com.alibaba.jstorm.task.Task.execute(Task.java:244)

at com.alibaba.jstorm.task.Task.mk_task(Task.java:289)

at com.alibaba.jstorm.daemon.worker.Worker.createTasks(Worker.java:123)

at com.alibaba.jstorm.daemon.worker.Worker.execute(Worker.java:218)

at com.alibaba.jstorm.daemon.worker.Worker.mk_worker(Worker.java:258)

at com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent.launchWorker(SyncProcessEvent.java:402)

at com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent.startNewWorkers(SyncProcessEvent.java:828)

at com.alibaba.jstorm.daemon.supervisor.SyncProcessEvent.run(SyncProcessEvent.java:157)

at com.alibaba.jstorm.event.EventManagerImpExecute.run(EventManagerImpExecute.java:38)

at java.lang.Thread.run(Thread.java:745)


    最后折腾了一下,改变了一下数据流分组就不报这个错误了。

情况如下:

1. spout产生的数据流带了streamId,

即:

collector.emit(Chapter2CommonConstant.wordProducer_streamId, new Values(random));
	
	declarer.declareStream(Chapter2CommonConstant.wordProducer_streamId, new Fields(Chapter2CommonConstant.wordProducer_fields));

   2.bolt产生的数据流也定义了入spout的数据流定义。

3. 拓扑数据流流向配置:

topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
				.fieldsGrouping(Chapter2CommonConstant.wordProducer_componentId, new
						Fields(Chapter2CommonConstant.wordProducer_fields));

		topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
				.fieldsGrouping(Chapter2CommonConstant.wordNormalizer_componentId, new
						Fields(Chapter2CommonConstant.wordNormalizer_fields));


这种情况下会报本文给出的异常。

解决办法:

3中拓扑数据流流向配置改为:就不会出现异常。

topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
				.shuffleGrouping(Chapter2CommonConstant.wordProducer_componentId, Chapter2CommonConstant.wordProducer_streamId);

		topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
				.shuffleGrouping(Chapter2CommonConstant.wordNormalizer_componentId, Chapter2CommonConstant.wordNormalizer_streamId);

完整代码:

package com.doctor.ebook.getting_started_with_storm;

/**
 * @author doctor
 *
 * @time 2015年4月25日 下午10:59:20
 */
public final class Chapter2CommonConstant {

	public static final String wordProducer_componentId = "wordProducer_componentId";
	public static final String wordProducer_streamId = "wordProducer_streamId";
	public static final String wordProducer_fields = "wordProducer_randomString";

	public static final String wordNormalizer_componentId = "wordNormalizer_componentId";
	public static final String wordNormalizer_streamId = "wordNormalizer_streamId";
	public static final String wordNormalizer_fields = "wordNormalizer_fields";

	public static final String wordCounter_componentId = "wordCounter_componentId";
}
package com.doctor.ebook.getting_started_with_storm;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang.RandomStringUtils;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import com.doctor.common.ContextBaseRichSpout;

/**
 * @author doctor
 *
 * @time 2015年4月25日 下午10:39:21
 */
public class WordProducerSpout extends ContextBaseRichSpout {
	private static final long serialVersionUID = -930888930597360858L;
	private String content = "A spout emits a list of defined fields. This architecture allows you to have" +
			"different kinds of bolts reading the same spout stream, which can then" +
			"define fields for other bolts to consume and so on";

	/**
	 * open is the first method called in any spout.
	 * 
	 * The parameters it receives are the TopologyContext, which contains all our topology data; the conf object, which is created
	 * in the topology definition; and the SpoutOutputCollector, which enables us to emit the data that will be processed by the
	 * bolts.
	 */
	@Override
	public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
		super.open(conf, context, collector);
	}

	/**
	 * from this method, we’ll emit values to be processed by the bolts.
	 */
	@Override
	public void nextTuple() {
		String random = RandomStringUtils.random(6, content);
		try {
			TimeUnit.SECONDS.sleep(1);
			collector.emit(Chapter2CommonConstant.wordProducer_streamId, new Values(random));
			log.info("WordProducerSpout:" + random);
		} catch (InterruptedException e) {
			log.error("TimeUnit.SECONDS.sleep.error", e);
		}

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream(Chapter2CommonConstant.wordProducer_streamId, new Fields(Chapter2CommonConstant.wordProducer_fields));

	}

}
package com.doctor.ebook.getting_started_with_storm;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import com.doctor.common.ContextBaseRichBolt;

/**
 * @author doctor
 *
 * @time 2015年4月25日 下午11:14:27
 */
public class WordNormalizerBolt extends ContextBaseRichBolt {
	private static final long serialVersionUID = -1244951787400604294L;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {
		super.prepare(stormConf, context, collector);
	}

	/**
	 * The bolt will receive the line from the words file and process it to Normalize this line
	 * 
	 * The normalize will be put the words in lower case
	 */
	@Override
	public void execute(Tuple input) {
		if (Chapter2CommonConstant.wordProducer_componentId.equals(input.getSourceComponent()) &&
				Chapter2CommonConstant.wordProducer_streamId.equals(input.getSourceStreamId())) {
			String field = input.getStringByField(Chapter2CommonConstant.wordProducer_fields);
			log.info("WordNormalizer.execute:" + field);
			field = field.toLowerCase();
			collector.emit(Chapter2CommonConstant.wordNormalizer_streamId, new Values(field));
		}

	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		declarer.declareStream(Chapter2CommonConstant.wordNormalizer_streamId, new Fields(Chapter2CommonConstant.wordNormalizer_fields));

	}

}
package com.doctor.ebook.getting_started_with_storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

import com.doctor.common.ContextBaseRichBolt;

/**
 * @author doctor
 *
 * @time 2015年4月25日 下午11:35:05
 */
public class WordCounterBolt extends ContextBaseRichBolt {
	private static final long serialVersionUID = 8157872805076023917L;
	private Map<String, Integer> counters;

	@Override
	public void prepare(@SuppressWarnings("rawtypes") Map stormConf, TopologyContext context, OutputCollector collector) {
		super.prepare(stormConf, context, collector);
		counters = new HashMap<>();
	}

	@Override
	public void execute(Tuple input) {
		if (Chapter2CommonConstant.wordNormalizer_componentId.equals(input.getSourceComponent()) &&
				Chapter2CommonConstant.wordNormalizer_streamId.equals(input.getSourceStreamId())) {

			String field = input.getStringByField(Chapter2CommonConstant.wordNormalizer_fields);
			if (counters.containsKey(field)) {
				Integer num = counters.get(field);
				counters.put(field, num + 1);
				log.info("WordCounterBolt.execute:" + field + ":" + num + 1);
			} else {
				counters.put(field, 1);
				log.info("WordCounterBolt.execute:" + field + ":" + 1);

			}
		}

	}

	@Override
	public void cleanup() {
		counters.clear();
		super.cleanup();
	}

	@Override
	public void declareOutputFields(OutputFieldsDeclarer declarer) {
		// TODO Auto-generated method stub

	}

}
package com.doctor.ebook.getting_started_with_storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

/**
 * @author doctor
 *
 * @time 2015年4月25日 下午10:34:14
 */
public class Chapter2TopologyMain {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		TopologyBuilder topologyBuilder = new TopologyBuilder();
		topologyBuilder.setSpout(Chapter2CommonConstant.wordProducer_componentId, new WordProducerSpout(), 1);

		// topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
		// .fieldsGrouping(Chapter2CommonConstant.wordProducer_componentId, new
		// Fields(Chapter2CommonConstant.wordProducer_fields));
		//
		// topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
		// .fieldsGrouping(Chapter2CommonConstant.wordNormalizer_componentId, new
		// Fields(Chapter2CommonConstant.wordNormalizer_fields));

		topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
				.shuffleGrouping(Chapter2CommonConstant.wordProducer_componentId, Chapter2CommonConstant.wordProducer_streamId);

		topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
				.shuffleGrouping(Chapter2CommonConstant.wordNormalizer_componentId, Chapter2CommonConstant.wordNormalizer_streamId);

		LocalCluster localCluster = new LocalCluster();

		Config conf = new Config();
		conf.setDebug(true);
		conf.setNumWorkers(1);
		localCluster.submitTopology("Chapter2TopologyMain", conf, topologyBuilder.createTopology());

	}

}

注释掉的数据流配置会出现本文给出的异常。

这个由于对jstorm源码没研究,也不知道是怎么回事。

附:fieldsGrouping用参数最多的那个,包含最全信息,也不报错。难道是这个原因

// 3.
		topologyBuilder.setBolt(Chapter2CommonConstant.wordNormalizer_componentId, new WordNormalizerBolt(), 1)
				.fieldsGrouping(Chapter2CommonConstant.wordProducer_componentId, Chapter2CommonConstant.wordProducer_streamId, new Fields(Chapter2CommonConstant.wordProducer_fields));

		topologyBuilder.setBolt(Chapter2CommonConstant.wordCounter_componentId, new WordCounterBolt(), 1)
				.fieldsGrouping(Chapter2CommonConstant.wordNormalizer_componentId, Chapter2CommonConstant.wordNormalizer_streamId, new Fields(Chapter2CommonConstant.wordNormalizer_fields));


© 著作权归作者所有

Beaver_
粉丝 24
博文 136
码字总数 36878
作品 0
杨浦
程序员
私信 提问
Orange的扩展插件Widgets开发(六)-OWWidget

Orange的扩展插件Widgets开发(六) -OWWidget The is the main component for implementing a widget in the Orange Canvas workflow. It both defines the widget input/output capabilit......

openthings
2016/01/02
94
0
Google的JSON类库 Gson开发者指南

由于 site.google.com 被墙,本人特意翻墙出去扒了分 User Guide 回来,不过是英文的。 Gson User Guide Contents 1 Authors: Inderjeet Singh, Joel Leitch 1.1 Overview 1.2 Goals for Gs......

红薯
2009/12/31
11.7K
13
Spring注解注解解析

我们在使用spring的时候经常会用到这些注解,那么这些注解到底有什么区别呢。我们先来看代码 同样分三层来看: Action 层: package com.ulewo.ioc;import org.springframework.beans.facto...

Rickxue
2016/01/22
173
0
How to Invoke a Mule Flow From Java

In this article, we will learn how to invoke Java components from a Mule flow, along with calling a Mule flow from Java component while following the standards of centralizing c......

Vishnu Ramakrishnan
2017/12/18
0
0
聊聊storm的JoinBolt

序 本文主要研究一下storm的JoinBolt 实例 JoinBolt storm-2.0.0/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java JoinBolt继承了BaseWindowedBolt,定义了Selector selectorType......

go4it
2018/10/26
20
0

没有更多内容

加载失败,请刷新页面

加载更多

OpenStack 简介和几种安装方式总结

OpenStack :是一个由NASA和Rackspace合作研发并发起的,以Apache许可证授权的自由软件和开放源代码项目。项目目标是提供实施简单、可大规模扩展、丰富、标准统一的云计算管理平台。OpenSta...

小海bug
昨天
6
0
DDD(五)

1、引言 之前学习了解了DDD中实体这一概念,那么接下来需要了解的就是值对象、唯一标识。值对象,值就是数字1、2、3,字符串“1”,“2”,“3”,值时对象的特征,对象是一个事物的具体描述...

MrYuZixian
昨天
6
0
数据库中间件MyCat

什么是MyCat? 查看官网的介绍是这样说的 一个彻底开源的,面向企业应用开发的大数据库集群 支持事务、ACID、可以替代MySQL的加强版数据库 一个可以视为MySQL集群的企业级数据库,用来替代昂贵...

沉浮_
昨天
6
0
解决Mac下VSCode打开zsh乱码

1.乱码问题 iTerm2终端使用Zsh,并且配置Zsh主题,该主题主题需要安装字体来支持箭头效果,在iTerm2中设置这个字体,但是VSCode里这个箭头还是显示乱码。 iTerm2展示如下: VSCode展示如下: 2...

HelloDeveloper
昨天
7
0
常用物流快递单号查询接口种类及对接方法

目前快递查询接口有两种方式可以对接,一是和顺丰、圆通、中通、天天、韵达、德邦这些快递公司一一对接接口,二是和快递鸟这样第三方集成接口一次性对接多家常用快递。第一种耗费时间长,但是...

程序的小猿
昨天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部