文档章节

Flume学习-自定义Source

Endless2010
 Endless2010
发布于 2017/08/28 21:37
字数 437
阅读 31
收藏 0
点赞 0
评论 0

自定义Source

FLume Source 有PollableSource和EventDrivenSource,启动Source时会判断

输入图片说明

EventDrivenSourceRunner

EventDrivenSourceRunner启动后调用source的start()就完了,自定义Event类型Source时,实现 EventDrivenSource接口即可

输入图片说明

PollableSourceRunner

Pollable类型的Source启动后会起一个新的线程,一直调用Source的process()方法. 这个方法可以返回Status.READY或Status.BACKOFF,如果返回READY,这次调用就结束,如果返回BACKOFF,则表示遇到了问题或者没有数据等异常情形,这时PollingRunner就会sleep一段时间

输入图片说明

测试程序

定义了2个source,一个Event类型,一个Pollable类型,都是每过一段时间打印helloworld

agent.sources = mySource1 mySource2
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.mySource1.type = com.endless.flume.MyPollableSource
agent.sources.mySource1.message = pollable
agent.sources.mySource1.channels = memoryChannel

agent.sources.mySource2.type = com.endless.flume.MyEventSource
agent.sources.mySource2.message = event
agent.sources.mySource2.period = 3
agent.sources.mySource2.channels = memoryChannel

agent.channels.memoryChannel.type = memory 

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel
public class MyEventSource extends AbstractSource implements EventDrivenSource,
		Configurable {
	private static final Logger logger = LoggerFactory.getLogger(MyEventSource.class);

	private String message = "helloworld";
	private int period = 1;
	private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");   
    
	@Override
	public void configure(Context context) {
    	//读取配置文件中lines配置的值
    	logger.info("MyEventSource reading context..");
    	message=context.getString("message");
    	period=context.getInteger("period");
	}

	@Override
	public synchronized void start() {
		logger.info("MyEventSource start");			
		while(true){
			HashMap<String, String> headers = new HashMap<String, String>();  
			headers.put("time", formatter.format(new Date()));
		    getChannelProcessor().processEvent(
		        EventBuilder.withBody(message, Charset.forName("UTF-8"), headers)); 			
		    try {
				TimeUnit.SECONDS.sleep(period);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

	}

	@Override
	public synchronized void stop() {
		logger.info("MyEventSource stop");
	}
}

public class MyPollableSource extends AbstractSource implements Configurable, PollableSource {  
	private static final Logger logger = LoggerFactory.getLogger(MyPollableSource.class);
	private String message = "helloworld";
	private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	private boolean status=true;
    
@Override  
    public Status process() throws EventDeliveryException {
    	status=!status;//测试用,模拟每秒1次消息
    	if(status)
    		return Status.BACKOFF;
        HashMap<String, String> headers = new HashMap<String, String>();  
        headers.put("time", formatter.format(new Date()));        	
        getChannelProcessor().processEvent(
        		EventBuilder.withBody(message, Charset.forName("UTF-8"), headers)); 
        return Status.READY;  
    }  
  
    @Override  
    public void configure(Context context) {  
    	//读取配置文件中lines配置的值
    	logger.info("MyPollableSource reading context..");
    	message=context.getString("message");
    }
    
    @Override  
    public long getBackOffSleepIncrement() {  
        return 1000;  //每次返回BACKOFF sleep 1秒
    }  
  
    @Override  
    public long getMaxBackOffSleepInterval() {  
        return 10000;  //返回BACKOFF 最多sleep 10 秒
    }  
}  

输入图片说明

© 著作权归作者所有

共有 人打赏支持
Endless2010
粉丝 1
博文 25
码字总数 23027
作品 0
南京
程序员
阿里大数据工程师教你怎样理解Flume

lume是干什么的? 收集日志的 flume如何搜集日志? 我们把flume比作情报人员 (1)搜集信息 (2)获取记忆信息 (3)传递报告间谍信息 flume是怎么完成上面三件事情的,三个组件: source: ...

JAVA丶学习 ⋅ 04/14 ⋅ 0

flume 总结--flume入门介绍

flume介绍 flume被设计为一个灵活的分布式系统,可以很容易的扩展,而且是高度可定制化的,一个配置正确的Flume Agent和由互相连接的Agent创建的Agent管道,保证不会丢失数据,提供持久的cha...

u013362353 ⋅ 05/28 ⋅ 0

其他消息中间件及场景应用(下3)

版权声明:欢迎转载,但是看在我辛勤劳动的份上,请注明来源:http://blog.csdn.net/yinwenjie(未经允许严禁用于商业用途!) https://blog.csdn.net/yinwenjie/article/details/51516329 目...

yunlielai ⋅ 04/15 ⋅ 0

【Strom篇】Flume+Kafaka+Strom整合完成信息记录

第一部分:流程分析 由flume收集客户端发送的信息,经过kafka集群消费者收集,然后给storm集群数据处理(数据清洗),最后再发给kafka集群收集。 第二部分:集群搭建 flume+kafka 一、配合f...

chenshi_2753 ⋅ 05/06 ⋅ 0

Flume---大数据协作框架

flume是什么 Apache Flume是一个分布式的、可靠的、易用的系统,可以有效地将来自很多不同源系统的大量日志数据收集、汇总或者转移到一个数据中心存储。 Apache Flume的作用不仅限于日志汇总...

简心 ⋅ 05/06 ⋅ 0

Kafka实战-Flume到Kafka

1.概述   前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据。下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览   下面开...

smartloli ⋅ 2015/07/02 ⋅ 0

Flume pull方式和push方式整合

Pull方式 Flume Agent 编写 启动Flume Push方式 Flume Agent的编写 启动flume ==注意在本地和服务器上切换的时候需要修改flume的sink的hostname== 本地测试总结 启动SparkStreaming作业 启动...

Meet相识_bfa5 ⋅ 前天 ⋅ 0

解决Flume采集数据时在HDFS上产生大量小文件的问题

问题:flume指定HDFS类型的Sink时,采集数据至HDFS指定目录,会产生大量小文件。 问题重现: 1、创建flume配置文件flume-env.sh,: flume配置文件如下(根据自身需要修改): 因为flume可以...

舒运 ⋅ 06/10 ⋅ 0

kafka来读取flume的数据

一、查看kafka topic ./kafka-topics.sh --list --zookeeper bigdata-test-3:2181, bigdata-test-2:2181, bigdata-test-1:2181, bigdata-test-4:2181, bigdata-test-5:2181 ./kafka-topics.s......

weixin_41876523 ⋅ 05/24 ⋅ 0

flume_kafkaChannel_kafkaSink

agent.sources = source 抽取类型为目录 agent.sources.source.type = spooldir 抽取的文件目录 agent.sources.source.spoolDir = /root/tmp/flume/data 添加一个存储绝对路径文件名的头 ag...

tanj123 ⋅ 04/17 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

C++内存映射文件居然是这样?!

内存映射文件大家都时不时听过,但它到底是个什么?赶紧来看看吧 内存映射文件到底是干嘛的呢?让我们先来思考下面几个问题: 如果您想读的内容大于系统分配的内存块怎么办?如果您想搜索的字...

柳猫 ⋅ 32分钟前 ⋅ 0

MySQL 数据库设计总结

规则1:一般情况可以选择MyISAM存储引擎,如果需要事务支持必须使用InnoDB存储引擎。 注意:MyISAM存储引擎 B-tree索引有一个很大的限制:参与一个索引的所有字段的长度之和不能超过1000字节...

OSC_cnhwTY ⋅ 今天 ⋅ 0

多线程(四)

线程池和Exector框架 什么是线程池? 降低资源的消耗 提高响应速度,任务:T1创建线程时间,T2任务执行时间,T3线程销毁时间,线程池没有或者减少T1和T3 提高线程的可管理性。 线程池要做些什...

这很耳东先生 ⋅ 今天 ⋅ 0

使用SpringMVC的@Validated注解验证

1、SpringMVC验证@Validated的使用 第一步:编写国际化消息资源文件 编写国际化消息资源ValidatedMessage.properties文件主要是用来显示错误的消息定制 [java] view plain copy edit.userna...

瑟青豆 ⋅ 今天 ⋅ 0

19.压缩工具gzip bzip2 xz

6月22日任务 6.1 压缩打包介绍 6.2 gzip压缩工具 6.3 bzip2压缩工具 6.4 xz压缩工具 6.1 压缩打包介绍: linux中常见的一些压缩文件 .zip .gz .bz2 .xz .tar .gz .tar .bz2 .tar.xz 建立一些文...

王鑫linux ⋅ 今天 ⋅ 0

6. Shell 函数 和 定向输出

Shell 常用函数 简洁:目前没怎么在Shell 脚本中使用过函数,哈哈,不过,以后可能会用。就像java8的函数式编程,以后获取会用吧,行吧,那咱们简单的看一下具体的使用 Shell函数格式 linux ...

AHUSKY ⋅ 今天 ⋅ 0

单片机软件定时器

之前写了一个软件定时器,发现不够优化,和友好,现在重写了 soft_timer.h #ifndef _SOFT_TIMER_H_#define _SOFT_TIMER_H_#include "sys.h"typedef void (*timer_callback_function)(vo...

猎人嘻嘻哈哈的 ⋅ 今天 ⋅ 0

好的资料搜说引擎

鸠摩搜书 简介:鸠摩搜书是一个电子书搜索引擎。它汇集了多个网盘和电子书平台的资源,真所谓大而全。而且它还支持筛选txt,pdf,mobi,epub、azw3格式文件。还显示来自不同网站的资源。对了,...

乔三爷 ⋅ 今天 ⋅ 0

Debian下安装PostgreSQL的表分区插件pg_pathman

先安装基础的编译环境 apt-get install build-essential libssl1.0-dev libkrb5-dev 将pg的bin目录加入环境变量,主要是要使用 pg_config export PATH=$PATH:/usr/lib/postgresql/10/bin 进......

玛雅牛 ⋅ 今天 ⋅ 0

inno安装

#define MyAppName "HoldChipEngin" #define MyAppVersion "1.0" #define MyAppPublisher "Hold Chip, Inc." #define MyAppURL "http://www.holdchip.com/" #define MyAppExeName "HoldChipE......

backtrackx ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部