Flume实时采集MySQL增量数据

2019/12/11 11:19
阅读数 359

01-简介

相关项目计划实时增量采集MySQL数据库存储的数据到Kafka中,供消费者消费。在网上了解了一种方案,使用Flume第三方Source插件“flume-ng-sql-source”,可以实时增量采集MySQL数据

flume-ng-sql-source的GitHub地址:https://github.com/keedio/flume-ng-sql-source

 

02-环境信息

1.基础环境信息

  • 集群环境
    • CDH 5.8.3
    • CDH提供的Flume
    • CDH提供的Kafka
  • 数据源
    • MySQL 5.6
  • Source插件
    • flume-ng-sql-source 1.4.3
  • MySQL JDBC
    • mysql-connector-java-5.1.35.jar

2.插件版本选择

 

 

 根据官网提供的说明,Flume1.7以及更早的版本,使用1.4.3版本的插件

03-测试搭建

1.插件编译打包

使用Maven进行编译打包,java默认版本1.7

2.主机创建相关目录并上传jar包

 

 

 按照CDH集群Flume配置的主目录以及插件目录,在agent运行的节点创建如下两个目录:

  • /var/lib/flume-ng/plugins.d/sql-source/lib
    • 存储flume-ng-sql-source的jar
  • /var/lib/flume-ng/plugins.d/sql-source/libext
    • 存储MySQL JDBC的jar

由于需要通过状态文件来时间增量采集,配置文件中会指定状态文件名以及存放的目录。提前创建好目录以及状态文件。

注意:由于Flume配置的执行用户是flume,所以状态目录以及文件要给flume用户赋权,或者将属主属组改成flume:flume;Flume会在该文件更新增量

 

 

 

 3.配置文件

 1 tier1.sources = sqlSource
 2 tier1.channels = sqlChannel
 3 tier1.sinks = kafkaSink
 4 #Source配置
 5 # For each one of the sources, the type is defined
 6 tier1.sources.sqlSource.type = org.keedio.flume.source.SQLSource
 7 tier1.sources.sqlSource.hibernate.connection.url = jdbc:mysql://132.35.231.131:3306/zhaoxj_db
 8 # Hibernate Database connection properties
 9 tier1.sources.sqlSource.hibernate.connection.user = zhaoxj
10 tier1.sources.sqlSource.hibernate.connection.password = *********
11 tier1.sources.sqlSource.hibernate.connection.autocommit = true
12 tier1.sources.sqlSource.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
13 tier1.sources.sqlSource.hibernate.connection.driver_class = com.mysql.jdbc.Driver
14 #agent.sources.sqlSource.table = employee1
15 # Columns to import to kafka (default * import entire row)
16 #agent.sources.sqlSource.columns.to.select = *
17 # Query delay, each configured milisecond the query will be sent
18 tier1.sources.sqlSource.run.query.delay = 10000
19 #Source查询状态配置
20 tier1.sources.sqlSource.status.file.path = /var/log/flume
21 tier1.sources.sqlSource.status.file.name = sqlSource.status
22 # Source查询配置
23 #增量起始值
24 tier1.sources.sqlSource.start.from = 0
25 #这里引号需要用转移字符\转义,$@$会被替换为:1.当状态文件中增量标识不存在,也就是第一次查询的时候,替换为start.from指定的值;2.当增量标识有值,则替换为状态文件里存储的值
26 tier1.sources.sqlSource.custom.query = select * from test_line_feed where t_id > \'$@$\'
27 tier1.sources.sqlSource.batch.size = 1000
28 #每次查询1000条数据,查询的语句会自动拼接 “limit 1000”
29 tier1.sources.sqlSource.max.rows = 1000
30 #字段分隔符
31 tier1.sources.sqlSource.delimiter.entry = |
32 #连接池的配置
33 tier1.sources.sqlSource.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
34 tier1.sources.sqlSource.hibernate.c3p0.min_size = 1
35 tier1.sources.sqlSource.hibernate.c3p0.max_size = 10
36 
37 #channel配置
38 # The channel can be defined as follows.
39 tier1.sources.sqlSource.channels = sqlChannel
40 
41 tier1.channels.sqlChannel.type = memory
42 tier1.channels.sqlChannel.capacity = 500
43 
44 #sink配置
45 #使用的flume自带的kafka sink
46 tier1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
47 tier1.sinks.kafkaSink.flumeBatchSize = 640
48 tier1.sinks.kafkaSink.kafka.bootstrap.servers = 132.35.231.160:9092,132.35.231.161:9092,132.35.231.162:9092
49 tier1.sinks.kafkaSink.kafka.topic = zhaoxj_test
50 
51 #关联配置
52 tier1.sources.sqlSource.channels = sqlChannel
53 tier1.sinks.kafkaSink.channel = sqlChannel

 注意:无法指定字段为判断增量字段,代码中默认查询的第一个字段为增量字段。解决方案:可以在查询的时候,将想要作为判断增量依据的字段放在第一个位置。

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部