confluent+mysql实现实时数据交换

原创
2018/03/01 21:15
阅读数 1.6W

2014 年的时候,Kafka 的三个主要开发人员从 LinkedIn 出来创业,开了一家叫作 Confluent 的公司。和其他大数据公司类似,Confluent 的产品叫作 Confluent Platform。这个产品的核心是 Kafka,分为三个版本:Confluent Open Source、Confluent Enterprise 和 Confluent Cloud。

这里就不过多说confluent的背景,详细的情况可以查看官方网站https://www.confluent.io,这里主要介绍,如利用confluent平台实时的捕获mysql中的数据。

 安装jdbc-mysql-driver

wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.39.tar.gz
tar xzvf mysql-connector-java-5.1.39.tar.gz
sed -i '$a export CLASSPATH=/root/mysql-connector-java-5.1.39/mysql-connector-java-5.1.39-bin.jar:$CLASSPATH' /etc/profile
source /etc/profile

安装confluent

下载confluent的tar包解压安装。

cd /usr/local
# tar zxvf confluent.tar.gz

confluent平台各组件的默认端口号

Component Default Port
Zookeeper  2181
Apache Kafka brokers (plain text) 9092
Schema Registry REST API 8081
REST Proxy 8082
Kafka Connect REST API 8083
Confluent Control Center 9021

confluent的mysql数据源配置

创建一个confluent从mysql加载数据的配置文件quickstart-mysql.properties

name=mysql-whitelist-timestamp-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.user=root
connection.password=root
connection.url=jdbc:mysql://192.168.248.128:3306/foodsafe?characterEncoding=utf8&useSSL=true

#数据表白名单
#table.whitelist=t1

mode=timestamp+incrementing
timestamp.column.name=modified
incrementing.column.name=id

#topic的前缀,confulent平台会为每张表创建一个topic,topic的名称为前缀+表名
topic.prefix=mysql-test-


自定义查询模式:

如果使用上面的配置来启动服务,则confluent平台将会监测拉取所有表的数据,有时候可能并不需要这样做,confulent平台提供了自定义查询模式。配置参考如下:

#User defined connector instance name
name=mysql-whitelist-timestamp-source
#The class implementing the connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
#Maximum number of tasks to run for this connector instance
tasks.max=10

connection.url=jdbc:mysql://192.168.248.128:3306/foodsafe?characterEncoding=utf8&useSSL=true
connection.user=root
connection.password=root
query=SELECT f.`name`,p.price,f.create_time from foods f join price p on (f.id = p.food_id)
mode=timestamp
timestamp.column.name=timestamp

topic.prefix=mysql-joined-data


query模式下使用where查询语句容易造成kafka拼接sql错误,最好采用join

1.启动zookeeper

因为zookeeper是一个长期的服务,最好在后台运行,同时需要有写权限到/var/lib在这一步以及之后的步骤,如果没有权限请查看安装confulent的用户是否具有/var/lib的写权限

# cd /usr/local/confulent-3.2.2
# ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties &
# 以守护进程方式启动
# sudo confluent-3.2.2/bin/zookeeper-server-start -daemon /etc/kafka/zookeeper.properties

停止zookeeper

$ ./bin/zookeeper-server-stop

2.启动kafka

# cd /usr/local/confluent-3.2.2
# ./bin/kafka-server-start ./etc/kafka/server.properties &

停止kafka服务

./bin/kafka-server-stop

3.启动Schema Registry

# cd /usr/local/confluent-3.2.2
# ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties &

停止schema-registry

# ./bin/schema-registry-stop

4.启动监听mysql数据的producer

# cd /usr/local/confluent-3.2.2
# ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-jdbc/quickstart-mysql.properties &

5.启动消费数据的consumer

# cd /usr/local/confluent-3.2.2
#./bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic mysql-test-t1 --from-beginning

测试sql

DROP TABLE IF EXISTS `t1`;
CREATE TABLE `t1` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(200) DEFAULT NULL,
  `createtime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `modified` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=9 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of t1
-- ----------------------------
INSERT INTO `t1` VALUES ('1', 'aa', '2017-07-10 08:03:51', '2017-07-10 23:03:30');
INSERT INTO `t1` VALUES ('3', 'bb', '2017-07-10 08:03:45', '2017-07-10 23:03:34');
INSERT INTO `t1` VALUES ('4', '年内', '2017-07-10 08:05:51', '2017-07-10 23:05:45');
INSERT INTO `t1` VALUES ('5', '年内', '2017-07-10 08:44:28', '2017-07-10 23:15:45');
INSERT INTO `t1` VALUES ('6', '公共', '2017-07-18 06:05:11', '2017-07-18 21:04:58');
INSERT INTO `t1` VALUES ('7', '哈哈', '2017-07-18 19:05:04', '2017-07-18 07:32:13');
INSERT INTO `t1` VALUES ('8', '公共经济', '2017-07-27 20:33:10', '2017-07-18 07:34:43');

数据插入语句

INSERT INTO `t1` (name,createtime,modified)VALUES ('公共经济2', '2017-07-27 20:33:10', '2017-07-18 07:34:43');

插入新数据后将会在consumer端实时输出我们插入的数据

{"id":7,"name":{"string":"哈哈"},"createtime":1500429904000,"modified":1500388333000}
{"id":8,"name":{"string":"公共经济"},"createtime":1501212790000,"modified":1500388483000}
{"id":9,"name":{"string":"公共经济1"},"createtime":1501212790000,"modified":1500388483000}
{"id":10,"name":{"string":"公共经济2"},"createtime":1501212790000,"modified":1500388483000}

关于confluent的使用国内目前使用似乎很少,相关的中文文档也极少。本文是去年7月份我在做实时数据交换技术调研是根据官方文档实践的记录。

 

展开阅读全文
加载中
点击加入讨论🔥(3) 发布并加入讨论🔥
打赏
3 评论
26 收藏
0
分享
返回顶部
顶部