文档章节

Flume学习-自定义Source

Endless2010
 Endless2010
发布于 2017/08/28 21:37
字数 437
阅读 168
收藏 0

自定义Source

FLume Source 有PollableSource和EventDrivenSource,启动Source时会判断

输入图片说明

EventDrivenSourceRunner

EventDrivenSourceRunner启动后调用source的start()就完了,自定义Event类型Source时,实现 EventDrivenSource接口即可

输入图片说明

PollableSourceRunner

Pollable类型的Source启动后会起一个新的线程,一直调用Source的process()方法. 这个方法可以返回Status.READY或Status.BACKOFF,如果返回READY,这次调用就结束,如果返回BACKOFF,则表示遇到了问题或者没有数据等异常情形,这时PollingRunner就会sleep一段时间

输入图片说明

测试程序

定义了2个source,一个Event类型,一个Pollable类型,都是每过一段时间打印helloworld

agent.sources = mySource1 mySource2
agent.channels = memoryChannel
agent.sinks = loggerSink

agent.sources.mySource1.type = com.endless.flume.MyPollableSource
agent.sources.mySource1.message = pollable
agent.sources.mySource1.channels = memoryChannel

agent.sources.mySource2.type = com.endless.flume.MyEventSource
agent.sources.mySource2.message = event
agent.sources.mySource2.period = 3
agent.sources.mySource2.channels = memoryChannel

agent.channels.memoryChannel.type = memory 

agent.sinks.loggerSink.type = logger
agent.sinks.loggerSink.channel = memoryChannel
public class MyEventSource extends AbstractSource implements EventDrivenSource,
		Configurable {
	private static final Logger logger = LoggerFactory.getLogger(MyEventSource.class);

	private String message = "helloworld";
	private int period = 1;
	private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");   
    
	@Override
	public void configure(Context context) {
    	//读取配置文件中lines配置的值
    	logger.info("MyEventSource reading context..");
    	message=context.getString("message");
    	period=context.getInteger("period");
	}

	@Override
	public synchronized void start() {
		logger.info("MyEventSource start");			
		while(true){
			HashMap<String, String> headers = new HashMap<String, String>();  
			headers.put("time", formatter.format(new Date()));
		    getChannelProcessor().processEvent(
		        EventBuilder.withBody(message, Charset.forName("UTF-8"), headers)); 			
		    try {
				TimeUnit.SECONDS.sleep(period);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}

	}

	@Override
	public synchronized void stop() {
		logger.info("MyEventSource stop");
	}
}

public class MyPollableSource extends AbstractSource implements Configurable, PollableSource {  
	private static final Logger logger = LoggerFactory.getLogger(MyPollableSource.class);
	private String message = "helloworld";
	private SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	private boolean status=true;
    
@Override  
    public Status process() throws EventDeliveryException {
    	status=!status;//测试用,模拟每秒1次消息
    	if(status)
    		return Status.BACKOFF;
        HashMap<String, String> headers = new HashMap<String, String>();  
        headers.put("time", formatter.format(new Date()));        	
        getChannelProcessor().processEvent(
        		EventBuilder.withBody(message, Charset.forName("UTF-8"), headers)); 
        return Status.READY;  
    }  
  
    @Override  
    public void configure(Context context) {  
    	//读取配置文件中lines配置的值
    	logger.info("MyPollableSource reading context..");
    	message=context.getString("message");
    }
    
    @Override  
    public long getBackOffSleepIncrement() {  
        return 1000;  //每次返回BACKOFF sleep 1秒
    }  
  
    @Override  
    public long getMaxBackOffSleepInterval() {  
        return 10000;  //返回BACKOFF 最多sleep 10 秒
    }  
}  

输入图片说明

© 著作权归作者所有

Endless2010
粉丝 1
博文 36
码字总数 23027
作品 0
南京
程序员
私信 提问
加载中

评论(0)

大数据技术之_09_Flume学习_Flume概述+Flume快速入门+Flume企业开发案例+Flume监控之Ganglia+Flume高级之自定义MySQLSource+Flume企业真实面...

第1章 Flume概述1.1 Flume定义1.2 Flume组成架构1.2.1 Agent1.2.2 Source1.2.3 Channel1.2.4 Sink1.2.5 Event1.3 Flume拓扑结构1.4 Flume Agent内部原理1.5 Hadoop三大发行版本第2章 Flume快......

osc_6ogjsu3t
2019/03/04
15
0
【Flume】(五)Flume 企业开发实战(自定义 Interceptor、自定义 Source、自定义 Sink)

文章目录 一、复制和多路复用 1)案例需求 使用 Flume-1 监控文件变动,Flume-1 将变动内容传递给 Flume-2,Flume-2 负责存储 到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3,Flume-3 负责...

云祁°
04/08
0
0
Flume学习系列(一)----总体介绍

前言: 本文是flume学习系列的开篇,主要介绍了flume的各种组件及相关配置。但是本篇文章并不打算从环境搭建开始,因为比较简单而且网上资料也很详尽: So,研读了一下官方文档,特此把Flume...

小北觅
2018/08/20
0
0
Flume学习之路 (一)Flume的基础介绍

目录 一、背景 二、Flume的简介 三、Flume NG的介绍 四、Flume的部署类型 五、Flume的安装 正文 回到顶部 一、背景 Hadoop业务的整体开发流程:   从Hadoop的业务开发流程图中可以看出,在...

Tim&Blog
01/22
0
0
flume http source示例讲解

一、介绍 flume自带的Http Source可以通过Http Post接收事件。 场景:对于有些应用程序环境,它可能不能部署Flume SDK及其依赖项,或客户端代码倾向于通过HTTP而不是Flume的PRC发送数据的情况...

osc_ojx9hm4t
2018/06/13
5
0

没有更多内容

加载失败,请刷新页面

加载更多

智慧城市交通的要素:路口监管可视化系统的解决方案

前言 随着信息时代的发展变迁,荧幕里呈现的 智慧城市慢慢出现了在现实生活中,很大程度上便利了日常的管理和维护。在智慧城市的大背景下, 智慧交通监管可视化系统是其重要的组成部分,通过...

osc_b8epmas9
15分钟前
11
0
CPU上下文切换以及相关指标的理解

前言 上下文切换这个词一直不理解,看了无数遍就忘了无数遍,知道看到《操作系统导论》这本书,终于有了略微的理解。这也证明了我的方向是没错的,一直认为做运维还是得理解底层的知识,不理...

osc_n1x6m26g
16分钟前
9
0
记一次 React Native 大版本升级过程——从0.40到0.59

去年把公司几个react native 相关的项目升级了下,已经过去一段时间了,这里系统整理下之前的整个过程。 背景 之前到公司的时候发现公司用的还是0.40的版本,据了解,当时项目做的比较早,导...

osc_j34n26zn
17分钟前
13
0
谈谈压测

背景 随着业务不断发展,用户量不断增加,系统负载越来越高。为了解决系统负载问题,我们是不是直接大量增加机器就可以了? 同时,公司业务开展需要,可能需要开展各种营销活动,目前系统是否...

osc_cudh2wh2
19分钟前
13
0
scipy.sparse的一些整理

一、scipy.sparse中七种稀疏矩阵类型 1、bsr_matrix:分块压缩稀疏行格式 介绍   BSR矩阵中的inptr列表的第i个元素与i+1个元素是储存第i行的数据的列索引以及数据的区间索引,即indices[i...

osc_auwur47t
21分钟前
20
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部