文档章节

Flume学习-自定义Source

Endless2010
 Endless2010
发布于 2017/08/28 21:37
字数 437
阅读 34
收藏 0

自定义Source

FLume Source 有PollableSource和EventDrivenSource,启动Source时会判断

输入图片说明

EventDrivenSourceRunner

EventDrivenSourceRunner启动后调用source的start()就完了,自定义Event类型Source时,实现 EventDrivenSource接口即可

输入图片说明

PollableSourceRunner

Pollable类型的Source启动后会起一个新的线程,一直调用Source的process()方法. 这个方法可以返回Status.READY或Status.BACKOFF,如果返回READY,这次调用就结束,如果返回BACKOFF,则表示遇到了问题或者没有数据等异常情形,这时PollingRunner就会sleep一段时间

输入图片说明

测试程序

定义了2个source,一个Event类型,一个Pollable类型,都是每过一段时间打印helloworld

agent.sources = mySource1 mySource2
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.mySource1.type = com.endless.flume.MyPollableSource
agent.sources.mySource1.message = pollable
agent.sources.mySource1.channels = memoryChannel

agent.sources.mySource2.type = com.endless.flume.MyEventSource
agent.sources.mySource2.message = event
agent.sources.mySource2.period = 3
agent.sources.mySource2.channels = memoryChannel

agent.channels.memoryChannel.type = memory 

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel
public class MyEventSource extends AbstractSource implements EventDrivenSource,
		Configurable {
	private static final Logger logger = LoggerFactory.getLogger(MyEventSource.class);

	private String message = "helloworld";
	private int period = 1;
	private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");   
    
	@Override
	public void configure(Context context) {
    	//读取配置文件中lines配置的值
    	logger.info("MyEventSource reading context..");
    	message=context.getString("message");
    	period=context.getInteger("period");
	}

	@Override
	public synchronized void start() {
		logger.info("MyEventSource start");			
		while(true){
			HashMap<String, String> headers = new HashMap<String, String>();  
			headers.put("time", formatter.format(new Date()));
		    getChannelProcessor().processEvent(
		        EventBuilder.withBody(message, Charset.forName("UTF-8"), headers)); 			
		    try {
				TimeUnit.SECONDS.sleep(period);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

	}

	@Override
	public synchronized void stop() {
		logger.info("MyEventSource stop");
	}
}

public class MyPollableSource extends AbstractSource implements Configurable, PollableSource {  
	private static final Logger logger = LoggerFactory.getLogger(MyPollableSource.class);
	private String message = "helloworld";
	private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	private boolean status=true;
    
@Override  
    public Status process() throws EventDeliveryException {
    	status=!status;//测试用,模拟每秒1次消息
    	if(status)
    		return Status.BACKOFF;
        HashMap<String, String> headers = new HashMap<String, String>();  
        headers.put("time", formatter.format(new Date()));        	
        getChannelProcessor().processEvent(
        		EventBuilder.withBody(message, Charset.forName("UTF-8"), headers)); 
        return Status.READY;  
    }  
  
    @Override  
    public void configure(Context context) {  
    	//读取配置文件中lines配置的值
    	logger.info("MyPollableSource reading context..");
    	message=context.getString("message");
    }
    
    @Override  
    public long getBackOffSleepIncrement() {  
        return 1000;  //每次返回BACKOFF sleep 1秒
    }  
  
    @Override  
    public long getMaxBackOffSleepInterval() {  
        return 10000;  //返回BACKOFF 最多sleep 10 秒
    }  
}  

输入图片说明

© 著作权归作者所有

共有 人打赏支持
Endless2010
粉丝 1
博文 36
码字总数 23027
作品 0
南京
程序员
阿里大数据工程师教你怎样理解Flume

lume是干什么的? 收集日志的 flume如何搜集日志? 我们把flume比作情报人员 (1)搜集信息 (2)获取记忆信息 (3)传递报告间谍信息 flume是怎么完成上面三件事情的,三个组件: source: ...

JAVA丶学习
04/14
0
0
Flume - Kafka日志平台整合

1. Flume介绍 Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行...

mantoudev
03/21
0
0
flume 1.7 源码导入eclipse windows

安装maven,设置MAVEN_HOME等配置 下载flume源码 eclipse-oxygen,设置eclipse 使用外部maven,并配置settings.xml 遇到问题: 如果顺利,已将所需jar都下载下来了。 导入后遇到如下问题 fl...

柯里昂
2017/10/31
0
0
Flume NG 学习笔记(一)简介

一、简介 Flume是一个分布式、可靠、高可用的海量日志聚合系统,支持在系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据的简单处理,并写到各种数据接收方的能力。 Flume在0...

jackwxh
06/29
0
0
Flume NG 简介及配置实战

Flume 作为 cloudera 开发的实时日志收集系统,受到了业界的认可与广泛应用。Flume 初始的发行版本目前被统称为 Flume OG(original generation),属于 cloudera。但随着 FLume 功能的扩展,...

大数据之路
2014/07/08
0
9

没有更多内容

加载失败,请刷新页面

加载更多

下一页

git +STS使用问题解决一

1. 2.点以一个pull就是更新代码 3.synchronize workSpace 同步代码,同SVN一致

森火
4分钟前
0
0
powerBi odbc 连接impala 实现自助分析

配置Impala以使用ODBC 可以将第三方产品设计为使用ODBC与Impala集成。为获得最佳体验,请确保支持您打算使用的任何第三方产品。验证支持包括检查Impala,ODBC,操作系统和第三方产品的版本是...

hblt-j
9分钟前
0
0
Purism FAQ

<font size="37" color="#006248" face="幼圆"> <p align="center"> Purism FAQ </p> </font> 原文:https://puri.sm/faq/ 原作者:Purism Team 翻译者:冰焰火灵X 1079092922@qq.com 文章许......

ICE冰焰火灵X
24分钟前
0
0
nginx+webdav

1、配置Nginx以支持WebDav: Webdav是nginx一个组件,默认编译nginx时是没有安装这个组件的。 如果跟应用公用一个nginx,需要重新编译安装nginx,重新安装前需要备份好原来的nginx.conf。 1....

yaukie
30分钟前
0
0
spring 事件

ContextRefreshedEvent Event raised when an {@code ApplicationContext} gets initialized or refreshed. ContextClosedEvent Event raised when an {@code ApplicationContext} gets clos......

Canaan_
41分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部