文档章节

Flume学习-自定义Source

Endless2010
 Endless2010
发布于 2017/08/28 21:37
字数 437
阅读 41
收藏 0

自定义Source

FLume Source 有PollableSource和EventDrivenSource,启动Source时会判断

输入图片说明

EventDrivenSourceRunner

EventDrivenSourceRunner启动后调用source的start()就完了,自定义Event类型Source时,实现 EventDrivenSource接口即可

输入图片说明

PollableSourceRunner

Pollable类型的Source启动后会起一个新的线程,一直调用Source的process()方法. 这个方法可以返回Status.READY或Status.BACKOFF,如果返回READY,这次调用就结束,如果返回BACKOFF,则表示遇到了问题或者没有数据等异常情形,这时PollingRunner就会sleep一段时间

输入图片说明

测试程序

定义了2个source,一个Event类型,一个Pollable类型,都是每过一段时间打印helloworld

agent.sources = mySource1 mySource2
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.mySource1.type = com.endless.flume.MyPollableSource
agent.sources.mySource1.message = pollable
agent.sources.mySource1.channels = memoryChannel

agent.sources.mySource2.type = com.endless.flume.MyEventSource
agent.sources.mySource2.message = event
agent.sources.mySource2.period = 3
agent.sources.mySource2.channels = memoryChannel

agent.channels.memoryChannel.type = memory 

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel
public class MyEventSource extends AbstractSource implements EventDrivenSource,
		Configurable {
	private static final Logger logger = LoggerFactory.getLogger(MyEventSource.class);

	private String message = "helloworld";
	private int period = 1;
	private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");   
    
	@Override
	public void configure(Context context) {
    	//读取配置文件中lines配置的值
    	logger.info("MyEventSource reading context..");
    	message=context.getString("message");
    	period=context.getInteger("period");
	}

	@Override
	public synchronized void start() {
		logger.info("MyEventSource start");			
		while(true){
			HashMap<String, String> headers = new HashMap<String, String>();  
			headers.put("time", formatter.format(new Date()));
		    getChannelProcessor().processEvent(
		        EventBuilder.withBody(message, Charset.forName("UTF-8"), headers)); 			
		    try {
				TimeUnit.SECONDS.sleep(period);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

	}

	@Override
	public synchronized void stop() {
		logger.info("MyEventSource stop");
	}
}

public class MyPollableSource extends AbstractSource implements Configurable, PollableSource {  
	private static final Logger logger = LoggerFactory.getLogger(MyPollableSource.class);
	private String message = "helloworld";
	private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	private boolean status=true;
    
@Override  
    public Status process() throws EventDeliveryException {
    	status=!status;//测试用,模拟每秒1次消息
    	if(status)
    		return Status.BACKOFF;
        HashMap<String, String> headers = new HashMap<String, String>();  
        headers.put("time", formatter.format(new Date()));        	
        getChannelProcessor().processEvent(
        		EventBuilder.withBody(message, Charset.forName("UTF-8"), headers)); 
        return Status.READY;  
    }  
  
    @Override  
    public void configure(Context context) {  
    	//读取配置文件中lines配置的值
    	logger.info("MyPollableSource reading context..");
    	message=context.getString("message");
    }
    
    @Override  
    public long getBackOffSleepIncrement() {  
        return 1000;  //每次返回BACKOFF sleep 1秒
    }  
  
    @Override  
    public long getMaxBackOffSleepInterval() {  
        return 10000;  //返回BACKOFF 最多sleep 10 秒
    }  
}  

输入图片说明

© 著作权归作者所有

共有 人打赏支持
Endless2010
粉丝 1
博文 36
码字总数 23027
作品 0
南京
程序员
私信 提问
Flume学习系列(一)----总体介绍

前言: 本文是flume学习系列的开篇,主要介绍了flume的各种组件及相关配置。但是本篇文章并不打算从环境搭建开始,因为比较简单而且网上资料也很详尽: So,研读了一下官方文档,特此把Flume...

小北觅
08/20
0
0
Apache Flume 1.6.0 发布,日志服务器

Apache Flume 1.6.0 发布,此版本现已提供下载: http://flume.apache.org/download.html 更新内容: ** Bug 修复 [FLUME-1793] - Unit test TestElasticSearchLogStashEventSerializer fail......

oschina
2015/06/03
3.1K
2
Apache Flume 1.7.0 发布,日志服务器

Apache Flume 1.7.0 发布了,Flume 是一个分布式、可靠和高可用的服务,用于收集、聚合以及移动大量日志数据,使用一个简单灵活的架构,就流数据模型。这是一个可靠、容错的服务。 本次更新如...

局长
2016/10/19
2K
3
Apache Flume 1.5.0 发布,日志服务器

Apache Flume 1.5.0 发布,Flume 是一个分布式、可靠和高可用的服务,用于收集、聚合以及移动大量日志数据,使用一个简单灵活的架构,就流数据模型。这是一个可靠、容错的服务。 改进内容包括...

oschina
2014/05/22
2.8K
4
Apache Flume 1.3.0 发布,分布式日志服务器

Flume 是一个分布式、可靠和高可用的服务,用于收集、聚合以及移动大量日志数据,使用一个简单灵活的架构,就流数据模型。这是一个可靠、容错的服务。 Flume 1.3.0 版本包含如下新特性: [F...

oschina
2012/12/05
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

我的Linux系统九阴真经

在今天,互联网的迅猛发展,科技技术也日新月异,各种编程技术也如雨后春笋一样,冒出尖来了。各种创业公司也百花齐放百家争鸣,特别是针对服务行业,新型互联网服务行业,共享经济等概念的公...

linux-tao
今天
17
0
MySQL: Starting MySQL….. ERROR! The server quit without updating PID file

前段时间打包了一个数据库镜像,但是启动容器之后发现报错 ··· ··· MySQL: Starting MySQL….. ERROR! The server quit without updating PID file 查了网络上的解决方案比较全,遂转帖...

blackfoxya
今天
4
0
C4C销售订单行项目价格维护方法

需求很简单,能够创建销售订单,在行项目里添加产品,带出价格来,同时把总价显示在销售订单抬头区域。 如下图所示: 下面是具体配置。 Business Configuration里,点击Sales Order的配置: ...

JerryWang_SAP
今天
14
0
deepin中配置robot framework环境

本文永久更新地址:https://my.oschina.net/bysu/blog/2989005 【若要到岸,请摇船:开源中国 不最醉不龟归】 1.在终端中输入pip,回车,如果提示没有该命令,则先安转pip sudo apt-get inst...

不最醉不龟归
今天
16
0
OSChina 周日乱弹 —— 钱不还,我就当你人不在了

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @莱布妮子 :分享Bigleaf的单曲《小鹿》 《小鹿》- Bigleaf 手机党少年们想听歌,请使劲儿戳(这里) 周日在家做什么? 做手工呀, @poorfis...

小小编辑
今天
318
6

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部