文档章节

Flume连接oracle实时推送数据到kafka

四叶草666
 四叶草666
发布于 2017/07/20 09:18
字数 647
阅读 33
收藏 0

版本号:

RedHat6.5   JDK1.8    flume-1.6.0   kafka_2.11-0.8.2.1

flume安装

RedHat6.5安装单机flume1.6:RedHat6.5安装单机flume1.6

kafka安装

RedHat6.5安装kafka集群 : RedHat6.5安装kafka集群

1、下载flume-ng-sql-source-1.4.3.jar

CSDN下载地址:http://download.csdn.net/detail/chongxin1/9892184

flume-ng-sql-source-1.4.3.jar是flume用于连接数据库的重要支撑jar包。

2、把flume-ng-sql-source-1.4.3.jar放到flume的lib目录下

 

3、把oracle(此处用的是oracle库)的驱动包放到flume的lib目录下

oracle的jdbc驱动包,放在oracle安装目录下,路径为:D:\app\product\11.2.0\dbhome_1\jdbc\lib

如图:

把ojdbc5.jar放到flume的lib目录下,如图:

4、新建flume-sql.conf

在conf目录新建flume-sql.conf :

 
  1. touch /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf
  2. sudo gedit /usr/local/flume/apache-flume-1.6.0-bin/conf/flume-sql.conf

flume-sql.conf输入以下内容:

 

 
  1. agentOne.channels = channelOne
  2. agentOne.sources = sourceOne
  3. agentOne.sinks = sinkOne
  4. ###########sql source#################
  5. # For each one of the sources, the type is defined
  6. agentOne.sources.sourceOne.type = org.keedio.flume.source.SQLSource
  7. agentOne.sources.sourceOne.hibernate.connection.url = jdbc:oracle:thin:@192.168.168.100:1521/orcl
  8. # Hibernate Database connection properties
  9. agentOne.sources.sourceOne.hibernate.connection.user = flume
  10. agentOne.sources.sourceOne.hibernate.connection.password = 1234
  11. agentOne.sources.sourceOne.hibernate.connection.autocommit = true
  12. agentOne.sources.sourceOne.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
  13. agentOne.sources.sourceOne.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
  14. agentOne.sources.sourceOne.run.query.delay=10000
  15. agentOne.sources.sourceOne.status.file.path = /tmp
  16. agentOne.sources.sourceOne.status.file.name = sqlSource.status
  17. # Custom query
  18. agentOne.sources.sourceOne.start.from = 0
  19. agentOne.sources.sourceOne.custom.query = select sysdate from dual
  20. agentOne.sources.sourceOne.batch.size = 1000
  21. agentOne.sources.sourceOne.max.rows = 1000
  22. agentOne.sources.sourceOne.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
  23. agentOne.sources.sourceOne.hibernate.c3p0.min_size=1
  24. agentOne.sources.sourceOne.hibernate.c3p0.max_size=10
  25. ##############################
  26. agentOne.channels.channelOne.type = memory
  27. agentOne.channels.channelOne.capacity = 10000
  28. agentOne.channels.channelOne.transactionCapacity = 10000
  29. agentOne.channels.channelOne.byteCapacityBufferPercentage = 20
  30. agentOne.channels.channelOne.byteCapacity = 800000
  31.  
  32. agentOne.sinks.sinkOne.type = org.apache.flume.sink.kafka.KafkaSink
  33. agentOne.sinks.sinkOne.topic = test
  34. agentOne.sinks.sinkOne.brokerList = 192.168.168.200:9092
  35. agentOne.sinks.sinkOne.requiredAcks = 1
  36. agentOne.sinks.sinkOne.batchSize = 20
  37. agentOne.sinks.sinkOne.channel = channelOne
  38.  
  39. agentOne.sinks.sinkOne.channel = channelOne
  40. agentOne.sources.sourceOne.channels=channelOne

5、flume-ng启动flume-sql.conf和测试

 

 
  1. cd /usr/local/flume/apache-flume-1.6.0-bin
  2. bin/flume-ng agent --conf conf --conf-file conf/flume-sql.conf --name agentOne -Dflume.root.logger=INFO,console

运行成功日志如下:

 
  1. 2017-07-08 00:12:55,393 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SINK, name: sinkOne: Successfully registered new MBean.
  2. 2017-07-08 00:12:55,394 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SINK, name: sinkOne started
  3. 2017-07-08 00:12:55,463 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Fetching metadata from broker id:0,host:localhost,port:9092 with correlation id 0 for 1 topic(s) Set(test)
  4. 2017-07-08 00:12:55,528 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to localhost:9092 for producing
  5. 2017-07-08 00:12:55,551 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Disconnecting from localhost:9092
  6. 2017-07-08 00:12:55,582 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - kafka.utils.Logging$class.info(Logging.scala:68)] Connected to slave2:9092 for producing

启动kafka的消费者,监听topic主题:

 
  1. kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

运行成功日志如下:

 
  1. [root@master kafka_2.11-0.9.0.0]# kafka-console-consumer.sh --zookeeper localhost:2181 --topic test  
  2. "2017-07-08 00:28:53.0"
  3. "2017-07-08 00:29:03.0"
  4. "2017-07-08 00:29:13.0"
  5. "2017-07-08 00:29:23.0"
  6. "2017-07-08 00:29:33.0"
  7. "2017-07-08 00:29:43.0"
  8. "2017-07-08 00:29:53.0"
  9. "2017-07-08 00:30:03.0"

 

6、常见报错解决办法

 
  1.  2017-06-27 16:26:01,293 (C3P0PooledConnectionPoolManager[identityToken->1hgey889o1sjxqn51anc3fr|29938ba5]-AdminTaskTimer) [WARN - com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector.run(ThreadPoolAsynchronousRunner.java:759)] com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@2d6227f3 -- APPARENT DEADLOCK!!! Complete Status:​

连接超时,造成死锁,仔细检查jdbc:oracle:thin:@192.168.168.100:1521/orcl,用户名/密码是否正确;

如果正确,还是连接不上,请检查oralce数据库是否开启了防火墙,如果是,添加入站规则或直接关闭防火墙。

© 著作权归作者所有

共有 人打赏支持
四叶草666
粉丝 0
博文 51
码字总数 50778
作品 0
深圳
程序员
私信 提问
Flume日志收集分层架构应用实践

Flume作为一个日志收集工具,非常轻量级,基于一个个Flume Agent,能够构建一个很复杂很强大的日志收集系统,它的灵活性和优势,主要体现在如下几点: 1)模块化设计:在其Flume Agent内部可...

workming
06/29
0
0
Flume+Kafka双剑合璧玩转大数据平台日志采集

概述 大数据平台每天会产生大量的日志,处理这些日志需要特定的日志系统。 一般而言,这些系统需要具有以下特征: 构建应用系统和分析系统的桥梁,并将它们之间的关联解耦; 支持近实时的在线...

java菜分享
12/11
0
0
Apache Flume 1.7.0 发布,日志服务器

Apache Flume 1.7.0 发布了,Flume 是一个分布式、可靠和高可用的服务,用于收集、聚合以及移动大量日志数据,使用一个简单灵活的架构,就流数据模型。这是一个可靠、容错的服务。 本次更新如...

局长
2016/10/19
2K
3
Apache Flume 1.6.0 发布,日志服务器

Apache Flume 1.6.0 发布,此版本现已提供下载: http://flume.apache.org/download.html 更新内容: ** Bug 修复 [FLUME-1793] - Unit test TestElasticSearchLogStashEventSerializer fail......

oschina
2015/06/03
3.1K
2
Kafka实战-Flume到Kafka

1.概述   前面给大家介绍了整个Kafka项目的开发流程,今天给大家分享Kafka如何获取数据源,即Kafka生产数据。下面是今天要分享的目录: 数据来源 Flume到Kafka 数据源加载 预览   下面开...

smartloli
2015/07/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

vue 组件使用中的一些细节点

细节一 基础例子 运行结果: 以上大家都懂,这边就不多说,回到代码里,有时候我们需要 tbody 里面每一行是一个子组件,那我们代码可以怎么写呢?我们可以这样写,定义一个全局组件,如下: ...

peakedness丶
3分钟前
0
0
vue 之 css module的使用方法

动手之前先配置项目,网上很多文章说需要下载css-loader插件,Vue中的vue-loader已经集成了 CSS Modules,因此删掉也能正常运行 在vue.config.js中添加如下配置 `css: {``loaderOptions: ...

前端小攻略
6分钟前
0
0
Range Sum Query - Immutable(leetcode303)

Given an integer array nums, find the sum of the elements between indices i and j (i ≤ j), inclusive. Example: Given nums = [-2, 0, 3, -5, 2, -1]sumRange(0, 2) -> 1sumRa......

woshixin
17分钟前
0
0
「阿里面试系列」面试加分项,从jvm层面了解线程的启动和停止

线程的启动的实现原理 线程停止的实现原理分析 为什么中断线程会抛出InterruptedException 线程的启动原理 前面我们简单分析过了线程的使用,通过调用线程的start方法来启动线程,线程启动后...

James-
24分钟前
0
0
转换 bytes 为 kb/mb/gb/tb/pb…

智能转换 bytes 为 kb/mb/gb/tb/pb… 用到了 math 模块中的一些函数 #!/usr/bin/env python# -*- coding: utf-8 -*-"""智能转换 bytes 为 kb/mb/gb/tb/pb..."""import mathdef conv...

郭恩洲_OSC博客
31分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部