文档章节

IBatchSpout API

岩之有理
 岩之有理
发布于 2014/12/10 17:47
字数 384
阅读 67
收藏 0

IBatchSpout是storm trident推出的一种可以批量发射的Spout。非事务性,基本的spout

1:Map getComponentConfiguration();定义配置,可以用backtype.storm.Config。

2:void open(Map conf, TopologyContext context); Spout的初始化方法 ,参数conf即是getComponentConfiguration定义的配置

3:Fields getOutputFields(); 声明输出的fields

4:void emitBatch(long batchId, TridentCollector collector); 批量发射tuple,本次的批次号为batchId

5:void ack(long batchId);批次号为batchId的数据处理成功

6:  void close();

一个例子

package storm.projectA;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class MySpout implements IBatchSpout{
 /**
  * 
  */
 private static final long serialVersionUID = 1L;
 private long maxBatchSize;//每批次最大的数量
 private BufferedReader br;//源文件流
 HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();//保存发送过的所有数据,以便于重复发送
 /**
  * @param conf 配置
  * @param context 
  */
 @Override
 public void open(Map conf, TopologyContext context) {
  String filePath = (String)conf.get("filePath");
  maxBatchSize = (Long)conf.get("maxBatchSize");
  try {
   br = new BufferedReader(new FileReader(filePath));
  } catch (FileNotFoundException e) {
   e.printStackTrace();
  }
 }
 /*** spout的发送方法
  * @param batchId 批次id
  * @param collector 批量发射器
  */
 @Override
 public void emitBatch(long batchId, TridentCollector collector) {
  List<List<Object>> batch = batches.get(batchId);
  if (batch == null) {
   batch = new ArrayList<List<Object>>();
   for (int i = 0; i < maxBatchSize; i++) {
    try {
     String line = br.readLine();
     if(line == null){
      break;
     }
     batch.add(new Values(line));
    } catch (IOException e) {
     e.printStackTrace();
    }
   }
  }
  for(List<Object> list : batch){
            collector.emit(list);
        }
 }
 @Override
 public void ack(long batchId) {
  batches.remove(batchId);
 }
 /**
  * close 方法
  */
 @Override
 public void close() {
  if(br!=null){
   try {
    br.close();
   } catch (IOException e) {
    e.printStackTrace();
   }
  }
  
 }
 @Override
 public Map getComponentConfiguration() {
  Config conf = new Config();
  //最大并行度 本地模式设置为1
  conf.setMaxTaskParallelism(1);
  conf.put("filePath", "D:\\aaa.txt");
  conf.put("maxBatchSize", 2);
  return conf;
 }
 /**
  * 输出的fileds
  */
 @Override
 public Fields getOutputFields() {
  return new Fields("sentence");
 }
}

© 著作权归作者所有

岩之有理
粉丝 7
博文 18
码字总数 6134
作品 0
徐汇
高级程序员
私信 提问
聊聊storm trident的coordinator

序 本文主要研究一下storm trident的coordinator 实例 代码示例 这里使用的spout为FixedBatchSpout,它是IBatchSpout类型 拓扑图 MasterBatchCoordinator storm-1.2.2/storm-core/src/jvm/or...

go4it
2018/11/10
23
0
聊聊storm TridentTopology的构建

序 本文主要研究一下storm TridentTopology的构建 实例 后面的分析为了简单起见,很多是依据这个实例来 TridentTopology.newStream storm-core-1.2.2-sources.jar!/org/apache/storm/trident...

go4it
2018/11/09
24
0
如何用Google APIs和Google的应用系统进行集成(2)----Google APIs的所有的RESTFul服务一览

上篇文章,我提到了,Google APIs暴露了86种不同种类和版本的API。我们可以通过在浏览器里面输入https://www.googleapis.com/discovery/v1/apis这个URL地址,其将会把所有Google API支持的不...

chancein007
2014/06/02
0
0
springboot整合redis使用Lettuceke客户端超时问题

问题:项目启动后,redis可正常运行几分钟,之后就连接不上报超时。重启工程后又可用一段时间。请教各位大牛 1. 包版本: springboot2.1.2.RELEASE spring-boot-starter-data-redis2.1.2.RE...

Chance_
2019/06/19
4.7K
6
各类免费的API分享

今天在找API的时候,看到的超多免费的各类API ,http://jammk.iteye.com/blog/2331084,我大概看了下,无限次,但是需要注册并认证。 各类免费的API接口分享: 手机号码归属地API:https://w...

夜晚晚
2016/10/24
2K
0

没有更多内容

加载失败,请刷新页面

加载更多

Vue造轮子-Tabs测试(下)

1. 如果g-tabs里面不是g-tabs-head,g-tabs-body期望会报错。 // 目前没有报错,所以先改 // tabs.vue if(this.$children.length===0){ // 这个$children是看子组件,不是子元...

ories
17分钟前
20
0
解决与二进制文件的Git冲突

我一直在Windows上使用Git(msysgit)来跟踪我一直在做的一些设计工作的变化。 今天我一直在使用不同的PC(使用远程repo brian ),我现在正尝试将今天完成的编辑合并到我的笔记本电脑上的常...

javail
17分钟前
39
0
忽略本地更改时会拉动Git?

有没有办法做一个git pull来忽略任何本地文件的更改,而又不浪费目录,也不必执行git clone ? #1楼 如果您使用的是Linux: git fetchfor file in `git diff origin/master..HEAD --name-o...

技术盛宴
32分钟前
48
0
Linux云主机安全加固

背景 最近在登录自己的云主机的时候,遇到了自己的机器被恶意的登录了几百次,如 Last failed login: Tue Feb 10 23:32:08 EST 2019 from xxx There were 166 failed login attempts since ...

项昂之
57分钟前
73
0
Java – Try with Resources

1. Overview Support for try-with-resources – introduced in Java 7 – allows us to declare resources to be used in a try block with the assurance that the resources will be clo......

Ciet
今天
53
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部