文档章节

IBatchSpout API

岩之有理
 岩之有理
发布于 2014/12/10 17:47
字数 384
阅读 59
收藏 0
点赞 0
评论 0

IBatchSpout是storm trident推出的一种可以批量发射的Spout。非事务性,基本的spout

1:Map getComponentConfiguration();定义配置,可以用backtype.storm.Config。

2:void open(Map conf, TopologyContext context); Spout的初始化方法 ,参数conf即是getComponentConfiguration定义的配置

3:Fields getOutputFields(); 声明输出的fields

4:void emitBatch(long batchId, TridentCollector collector); 批量发射tuple,本次的批次号为batchId

5:void ack(long batchId);批次号为batchId的数据处理成功

6:  void close();

一个例子

package storm.projectA;
import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.IBatchSpout;
import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
public class MySpout implements IBatchSpout{
 /**
  * 
  */
 private static final long serialVersionUID = 1L;
 private long maxBatchSize;//每批次最大的数量
 private BufferedReader br;//源文件流
 HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>();//保存发送过的所有数据,以便于重复发送
 /**
  * @param conf 配置
  * @param context 
  */
 @Override
 public void open(Map conf, TopologyContext context) {
  String filePath = (String)conf.get("filePath");
  maxBatchSize = (Long)conf.get("maxBatchSize");
  try {
   br = new BufferedReader(new FileReader(filePath));
  } catch (FileNotFoundException e) {
   e.printStackTrace();
  }
 }
 /*** spout的发送方法
  * @param batchId 批次id
  * @param collector 批量发射器
  */
 @Override
 public void emitBatch(long batchId, TridentCollector collector) {
  List<List<Object>> batch = batches.get(batchId);
  if (batch == null) {
   batch = new ArrayList<List<Object>>();
   for (int i = 0; i < maxBatchSize; i++) {
    try {
     String line = br.readLine();
     if(line == null){
      break;
     }
     batch.add(new Values(line));
    } catch (IOException e) {
     e.printStackTrace();
    }
   }
  }
  for(List<Object> list : batch){
            collector.emit(list);
        }
 }
 @Override
 public void ack(long batchId) {
  batches.remove(batchId);
 }
 /**
  * close 方法
  */
 @Override
 public void close() {
  if(br!=null){
   try {
    br.close();
   } catch (IOException e) {
    e.printStackTrace();
   }
  }
  
 }
 @Override
 public Map getComponentConfiguration() {
  Config conf = new Config();
  //最大并行度 本地模式设置为1
  conf.setMaxTaskParallelism(1);
  conf.put("filePath", "D:\\aaa.txt");
  conf.put("maxBatchSize", 2);
  return conf;
 }
 /**
  * 输出的fileds
  */
 @Override
 public Fields getOutputFields() {
  return new Fields("sentence");
 }
}

© 著作权归作者所有

共有 人打赏支持
岩之有理
粉丝 7
博文 18
码字总数 6134
作品 0
徐汇
高级程序员
Postfix & Dovecot 管理工具--Posty

Posty 是一款基于 Web 的 Postfix & Dovecot 管理工具,功能模块包括:API、CLI 和 WebUI。 API 方法: Domains: GET - http://API-URL/api/v1/domains - get all domains GET - http://API-......

红薯 ⋅ 2014/07/17 ⋅ 0

如何用Google APIs和Google的应用系统进行集成(2)----Google APIs的所有的RESTFul服务一览

上篇文章,我提到了,Google APIs暴露了86种不同种类和版本的API。我们可以通过在浏览器里面输入https://www.googleapis.com/discovery/v1/apis这个URL地址,其将会把所有Google API支持的不...

chancein007 ⋅ 2014/06/02 ⋅ 0

Google 突然宣布将关闭一批 API!

让Android走向封闭我们还可以理解,但是似乎其它Google产品也在走向封闭,这就让我们很不解了。最近Google宣布将关闭一批API,被称为“春季大扫除”。 让Android走向封闭我们还可以理解,但是...

老枪 ⋅ 2011/06/01 ⋅ 8

Android适配全面总结(二)

上一篇文章讲了 屏幕适配 http://www.jianshu.com/p/7aa34434ad4d 这一篇文章讲一下 版本适配。 *在我们的开发中,会对不同安卓版本做适配,比如我之前做过的项目中最低兼容到4.4,最高兼容是...

阿韦爱Android ⋅ 2017/11/15 ⋅ 0

20 个对开发者非常有用的 Google APIs

Google 网站有各种各样的功能,多数功能都对外提供了 API 供访问,我们可以利用这些 API 来丰富自己的网站功能,比如 Google 地图 API 可为你的网站增加了地图功能,翻译 API 可增加翻译功能...

红薯 ⋅ 2011/08/02 ⋅ 8

Mozilla 发布 WebAPI 规范

Mozilla 刚刚发布了 WebAPI 项目. "WebAPI 由 Mozilla 努力为解决不同的浏览器中的差异而推出的一个统一一致的API,该API跟操作系统无关,目前正在进行规范草案以及参考实现的实施阶段,该项...

红薯 ⋅ 2011/08/23 ⋅ 3

Android 版本

Android 5.0.1开发电视。 Android 4.4W.2是Android官网发布的可穿戴设备的API,不支持EditText组件。 Android 5.1.1 API22 Android 5.0.1(L preview) API21 Android 4.4W.2 API20 Android 4.......

加速器 ⋅ 2016/04/15 ⋅ 0

Kubernetes API分类汇总

1. 资源对象 1.1. Namespace 1.2. Endpoints 1.3. Pod Pod操作: 1.4. ReplicationController 1.5. Node 1.6. Service 1.7. ResourceQuota 1.8. Secret 1.9. ServiceAccount 1.10. Persisten......

huwh_ ⋅ 2017/09/10 ⋅ 0

各类免费的API分享

今天在找API的时候,看到的超多免费的各类API ,http://jammk.iteye.com/blog/2331084,我大概看了下,无限次,但是需要注册并认证。 各类免费的API接口分享: 手机号码归属地API:https://w...

夜晚晚 ⋅ 2016/10/24 ⋅ 0

阿里云API精选手册,业界第一本

阿里云API精选手册(免费下载https://yq.aliyun.com/download/2350),为拓展API经济而生,内容出自阿里云自有产品与云生态精选产品场景化 APIs精选功能汇集、场景介绍、使用指导等等。100+ ...

仙游 ⋅ 2017/12/20 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Qt中的坑--QTreeWidget添加item 不能显示出来

QTreeWidget* pTree = ui.TreeCheckList; QTreeWidgetItem* item = new QTreeWidgetItem(pTree) ;item->setText ( 0, "test" );pTree->addTopLevelItem (item ); 原因是因为创建一个......

k91191 ⋅ 34分钟前 ⋅ 0

使用Guava的RateLimiter做限流

场景: 1. 在日常生活中,我们肯定收到过不少不少这样的短信,“京东最新优惠卷…”,“天猫送您…”。这种类型的短信是属于推广性质的短信。这种短信一般群发量会到千万级别。然而,要完成这...

wind2012 ⋅ 34分钟前 ⋅ 0

QSlider重新enterEvent

#ifndef DIALOG_H#define DIALOG_H#include <QDialog>namespace Ui {class Dialog;}class Dialog : public QDialog{ Q_OBJECTpublic: explicit Dialog(QW......

xxdd ⋅ 34分钟前 ⋅ 0

生产环境redis备份与恢复

生产环境redis备份与恢复 Tyrant0532 0人评论 1563人阅读 2018-02-01 20:34:10 redis是一个开源(BSD许可),内存存储的数据结构服务器,可用作数据库,高速缓存和消息队列代理。生产中我们主...

rootliu ⋅ 37分钟前 ⋅ 0

nginx中出现403forbidden错误

nginx “403 Forbidden” 错误 出现这个错误一般是因为以下原因: 网站禁止特定的用户访问所有内容,例:网站屏蔽某个ip访问。 访问禁止目录浏览的目录,例:设置autoindex off后访问目录。 ...

河图再现 ⋅ 37分钟前 ⋅ 0

上海云栖:金融政企行业的CDN最佳实践

摘要: 在刚刚结束的上海云栖大会飞天技术汇分论坛上,阿里云视频云产品架构师罗小飞进行了《阿里云CDN——面向金融政企的CDN最佳实践》主题分享,为上海的嘉宾介绍CDN的解决方案与技术服务体...

猫耳m ⋅ 42分钟前 ⋅ 0

docker 基本操作

docker介绍 Docker项目提供了构建在Linux内核功能之上,协同在一起的的高级工具。其目标是帮助开发和运维人员更容易地跨系统跨主机交付应用程序和他们的依赖。Docker通过Docker容器,一个安全...

haoyuehong ⋅ 43分钟前 ⋅ 0

上海云栖:金融政企行业的CDN最佳实践

摘要: 在刚刚结束的上海云栖大会飞天技术汇分论坛上,阿里云视频云产品架构师罗小飞进行了《阿里云CDN——面向金融政企的CDN最佳实践》主题分享,为上海的嘉宾介绍CDN的解决方案与技术服务体...

阿里云云栖社区 ⋅ 46分钟前 ⋅ 0

安装与配置hadoop

一、CentOS7安装 java8,参考centos7.0 安装java1.8,tomcat 二、安装hadoop 版本V3.03 1、下载并解压hadoop # mkdir /usr/local/app# mkdir /usr/local/app/hadoop# cd /usr/local/app/had......

iturtle ⋅ 47分钟前 ⋅ 0

Idea设置Serializable自动生成

File --> Settings --> Editor --> Inspections ->Serialization issues,在该项下找到“Serializable class without 'serialVersionUID' ”并勾选...

Gmupload ⋅ 50分钟前 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部