文档章节

Storm【压力测试】- 系列1: 进行简单的压力测试

止静
 止静
发布于 2014/10/16 14:51
字数 428
阅读 415
收藏 0

代码比较简单,看图说话

package storm.benchmark;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;
import java.util.Random;

public class ThroughputTest {
    public static class GenSpout extends BaseRichSpout {
        private static final Character[] CHARS = new Character[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'};
        
        SpoutOutputCollector _collector;
        int _size;
        Random _rand;
        String _id;
        String _val;
        
        public GenSpout(int size) {
            _size = size;
        }
        
        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
            _id = randString(5);
            _val = randString(_size);
        }

        @Override
        public void nextTuple() {
            _collector.emit(new Values(_id, _val));
            
        }

        private String randString(int size) {
            StringBuffer buf = new StringBuffer();
            for(int i=0; i<size; i++) {
                buf.append(CHARS[_rand.nextInt(CHARS.length)]);
            }
            return buf.toString();
        }
        
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "item"));
        }        
    }
    
    public static class IdentityBolt extends BaseBasicBolt {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "item"));
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            collector.emit(tuple.getValues());
        }        
    }

    public static class CountBolt extends BaseBasicBolt {
        int _count;
        
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("count"));
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            _count+=1;
            collector.emit(new Values(_count));
        }        
    }
    
    public static class AckBolt extends BaseBasicBolt {
        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
        }       
    }
    
    
    //storm jar storm-benchmark-0.0.1-SNAPSHOT-standalone.jar storm.benchmark.ThroughputTest demo 100 8 8 8 10000
    public static void main(String[] args) throws Exception {
        int size = Integer.parseInt(args[1]);
        int workers = Integer.parseInt(args[2]);
        int spout = Integer.parseInt(args[3]);
        int bolt = Integer.parseInt(args[4]);        
        int maxPending = Integer.parseInt(args[5]);
        
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new GenSpout(size), spout);
//        builder.setBolt("count", new CountBolt(), bolt)
//                .fieldsGrouping("bolt", new Fields("id"));
//        builder.setBolt("bolt", new IdentityBolt(), bolt)
//                .shuffleGrouping("spout");
        builder.setBolt("bolt2", new AckBolt(), bolt)
               .shuffleGrouping("spout");
//        builder.setBolt("count2", new CountBolt(), bolt)
//                .fieldsGrouping("bolt2", new Fields("id"));
        
        Config conf = new Config();
        conf.setNumWorkers(workers);
        //conf.setMaxSpoutPending(maxPending);
        conf.setNumAckers(0);
        conf.setStatsSampleRate(0.0001);
        //topology.executor.receive.buffer.size: 8192 #batched
        //topology.executor.send.buffer.size: 8192 #individual messages
        //topology.transfer.buffer.size: 1024 # batched
        
        //conf.put("topology.executor.send.buffer.size", 1024);
        //conf.put("topology.transfer.buffer.size", 8);
        //conf.put("topology.receiver.buffer.size", 8);
        //conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xdebug -Xrunjdwp:transport=dt_socket,address=1%ID%,server=y,suspend=n");
        
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
}


© 著作权归作者所有

止静
粉丝 122
博文 134
码字总数 125762
作品 0
东城
技术主管
私信 提问
Storm入门 第三章 Storm安装部署步骤

本文以Twitter Storm官方Wiki为基础,详细描述如何快速搭建一个Storm集群,其中,项目实践中遇到的问题及经验总结,在相应章节以“注意事项”的形式给出。 3.1 Storm集群组件 Storm集群中包含...

坏坏一笑
2014/12/03
114
0
Apache Storm 1.2.3 发布,分布式实时计算

Apache Storm 1.2.3 发布了,更新内容如下: 新特性 [STORM-3233] - zookeeper 客户端升级到最新版本 (3.4.13) 改进 [STORM-3077] - Disruptor 升级至 3.3.11 [STORM-3083] - HikariCP 升级至...

xplanet
2019/07/20
870
0
Apache Storm 2.0.0 发布,基于 Java ​​​​​​​的新架构

Apache Storm 2.0.0 发布了,距离它上次更新已过去一年,新版本在性能、新功能和与外部系统的集成方面进行了重大改进,下面是一些主要功能及改进: 用 Java 实现的新架构 在之前的版本中,S...

xplanet
2019/06/03
3.1K
5
Apache Storm 0.9.7 发布,分布式实时计算

Apache Storm 0.9.7 发布了,Apache Storm 的前身是 Twitter Storm 平台,目前已经归于 Apache 基金会管辖。 Apache Storm 是一个免费开源的分布式实时计算系统。简化了流数据的可靠处理,像...

开源中国股侠
2016/09/08
815
1
Kafka实战-Storm Cluster

1.概述   在《Kafka实战-实时日志统计流程》一文中,谈到了Storm的相关问题,在完成实时日志统计时,我们需要用到Storm去消费Kafka Cluster中的数据,所以,这里我单独给大家分享一篇Sto...

smartloli
2015/06/18
0
0

没有更多内容

加载失败,请刷新页面

加载更多

MBTI助你成功,让你更了解你自己

MBTI助你成功,让你更了解你自己 生活总是一个七日接着又一个七日,相信看过第七日的小伙伴,很熟悉这段开场白,人生是一个测试接着又一个测试,上学的时候测试,是为了证明你的智力,可谓从...

蛤蟆丸子
今天
55
0
Android实现App版本自动更新

现在很多的App中都会有一个检查版本的功能。例如斗鱼TV App的设置界面下: 当我们点击检查更新的时候,就会向服务器发起版本检测的请求。一般的处理方式是:服务器返回的App版本与当前手机安...

shzwork
昨天
72
0
npm 发布webpack插件 webpack-html-cdn-plugin

初始化一个项目 npm init 切换到npm源 淘宝 npm config set registry https://registry.npm.taobao.org npm npm config set registry http://registry.npmjs.org 登录 npm login 登录状态......

阿豪boy
昨天
87
0
java基础(16)递归

一.说明 递归:方法内调用自己 public static void run1(){ //递归 run1(); } 二.入门: 三.执行流程: 四.无限循环:经常用 无限递归不要轻易使用,无限递归的终点是:栈内存溢出错误 五.递...

煌sir
昨天
63
0
REST接口设计规范总结

URI格式规范 URI中尽量使用连字符”-“代替下划线”_”的使用 URI中统一使用小写字母 URI中不要包含文件(脚本)的扩展名 URI命名规范 文档(Document)类型的资源用名词(短语)单数命名 集合(Co...

Treize
昨天
69
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部