文档章节

Flume学习-自定义Source

Endless2010
 Endless2010
发布于 2017/08/28 21:37
字数 437
阅读 39
收藏 0

自定义Source

FLume Source 有PollableSource和EventDrivenSource,启动Source时会判断

输入图片说明

EventDrivenSourceRunner

EventDrivenSourceRunner启动后调用source的start()就完了,自定义Event类型Source时,实现 EventDrivenSource接口即可

输入图片说明

PollableSourceRunner

Pollable类型的Source启动后会起一个新的线程,一直调用Source的process()方法. 这个方法可以返回Status.READY或Status.BACKOFF,如果返回READY,这次调用就结束,如果返回BACKOFF,则表示遇到了问题或者没有数据等异常情形,这时PollingRunner就会sleep一段时间

输入图片说明

测试程序

定义了2个source,一个Event类型,一个Pollable类型,都是每过一段时间打印helloworld

agent.sources = mySource1 mySource2
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.mySource1.type = com.endless.flume.MyPollableSource
agent.sources.mySource1.message = pollable
agent.sources.mySource1.channels = memoryChannel

agent.sources.mySource2.type = com.endless.flume.MyEventSource
agent.sources.mySource2.message = event
agent.sources.mySource2.period = 3
agent.sources.mySource2.channels = memoryChannel

agent.channels.memoryChannel.type = memory 

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel
public class MyEventSource extends AbstractSource implements EventDrivenSource,
		Configurable {
	private static final Logger logger = LoggerFactory.getLogger(MyEventSource.class);

	private String message = "helloworld";
	private int period = 1;
	private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");   
    
	@Override
	public void configure(Context context) {
    	//读取配置文件中lines配置的值
    	logger.info("MyEventSource reading context..");
    	message=context.getString("message");
    	period=context.getInteger("period");
	}

	@Override
	public synchronized void start() {
		logger.info("MyEventSource start");			
		while(true){
			HashMap<String, String> headers = new HashMap<String, String>();  
			headers.put("time", formatter.format(new Date()));
		    getChannelProcessor().processEvent(
		        EventBuilder.withBody(message, Charset.forName("UTF-8"), headers)); 			
		    try {
				TimeUnit.SECONDS.sleep(period);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

	}

	@Override
	public synchronized void stop() {
		logger.info("MyEventSource stop");
	}
}

public class MyPollableSource extends AbstractSource implements Configurable, PollableSource {  
	private static final Logger logger = LoggerFactory.getLogger(MyPollableSource.class);
	private String message = "helloworld";
	private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	private boolean status=true;
    
@Override  
    public Status process() throws EventDeliveryException {
    	status=!status;//测试用,模拟每秒1次消息
    	if(status)
    		return Status.BACKOFF;
        HashMap<String, String> headers = new HashMap<String, String>();  
        headers.put("time", formatter.format(new Date()));        	
        getChannelProcessor().processEvent(
        		EventBuilder.withBody(message, Charset.forName("UTF-8"), headers)); 
        return Status.READY;  
    }  
  
    @Override  
    public void configure(Context context) {  
    	//读取配置文件中lines配置的值
    	logger.info("MyPollableSource reading context..");
    	message=context.getString("message");
    }
    
    @Override  
    public long getBackOffSleepIncrement() {  
        return 1000;  //每次返回BACKOFF sleep 1秒
    }  
  
    @Override  
    public long getMaxBackOffSleepInterval() {  
        return 10000;  //返回BACKOFF 最多sleep 10 秒
    }  
}  

输入图片说明

© 著作权归作者所有

共有 人打赏支持
Endless2010
粉丝 1
博文 36
码字总数 23027
作品 0
南京
程序员
Flume学习系列(一)----总体介绍

前言: 本文是flume学习系列的开篇,主要介绍了flume的各种组件及相关配置。但是本篇文章并不打算从环境搭建开始,因为比较简单而且网上资料也很详尽: So,研读了一下官方文档,特此把Flume...

小北觅
08/20
0
0
阿里大数据工程师教你怎样理解Flume

lume是干什么的? 收集日志的 flume如何搜集日志? 我们把flume比作情报人员 (1)搜集信息 (2)获取记忆信息 (3)传递报告间谍信息 flume是怎么完成上面三件事情的,三个组件: source: ...

JAVA丶学习
04/14
0
0
Flume学习系列(四)---- Interceptors(拦截器)

前言:flume通过使用Interceptors(拦截器)实现修改和过滤事件的功能。举个栗子,一个网站每天产生海量数据,但是可能会有很多数据是不完整的(缺少重要字段),或冗余的,如果不对这些数据...

小北觅
08/21
0
0
02. Spark Streaming实时流处理学习——分布式日志收集框架Flume

2. 分布式日志收集框架Flume 2.1 业务现状分析 如上图,大量的系统和各种服务的日志数据持续生成。用户有了很好的商业创意想要充分利用这些系统日志信息。比如用户行为分析,轨迹跟踪等等。 ...

牦牛sheriff
09/02
0
0
flume 1.7 源码导入eclipse windows

安装maven,设置MAVEN_HOME等配置 下载flume源码 eclipse-oxygen,设置eclipse 使用外部maven,并配置settings.xml 遇到问题: 如果顺利,已将所需jar都下载下来了。 导入后遇到如下问题 fl...

柯里昂
2017/10/31
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Android JNI开发系列(十三) JNI异常处理

JNI 异常处理 JNI异常与JAVA处理异常的区别 JAVA 有异常处理机制,而JNI没有 如果JAVA中异常没有捕获,后面的代码不会执行,JNI会执行 JAVA编译时的异常,是在方法显示的声明了某一个异常,编...

蔡小鹏
33分钟前
1
0
简单介绍Java 的JAR包、EAR包、WAR包区别

WAR包 WAR(Web Archive file)网络应用程序文件,是与平台无关的文件格式,它允许将许多文件组合成一个压缩文件。War专用于Web方面。大部分的JAVA WEB工程,都是打成WAR包进行发布的。 War是...

Linux就该这么学
57分钟前
1
0
Qt那些事0.0.7

在帮助文档(Overview - QML and C++ Integration)中随缘遇到一张图,是关于C++对象与QML整合介绍的,值得标记下来,虽然大部分功能也有所涉猎,但是还是留个记号,万一哪天我失忆了还想写Q...

Ev4n
今天
0
0
快速幂运算

题:求一个数 data 的 n 次幂,要求时间复杂度为log(n) 1:递归算法: /** * x^3=(x^2)*x;x^7=(x^3)^2 * x * * 递归算法 * @param data 底数 * @param n 次...

偶尔诗文
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部