Confluent Kafka REST in Practice

Original
2017/01/11 15:04

What is the Confluent Platform?

    The team members at LinkedIn who developed Apache Kafka founded a new company, Confluent, around this technology, and Confluent's products are likewise built around Kafka. The basic architecture is shown below:

[Figure: Confluent Platform architecture]

Components that are free to use:

    Confluent Kafka Brokers (open source)
    Confluent Kafka Connectors (open source)
    Confluent Kafka Clients (open source)
    Confluent Kafka REST Proxy (open source)
    Confluent Schema Registry (open source)

Our focus:

    This walkthrough mainly uses the REST Proxy; the underlying brokers are also Confluent's Kafka components.

    Test platform: CentOS release 6.7 (Final)

    Kafka version: confluent-kafka-2.11-0.10.1.0-1

    REST Proxy version: confluent-kafka-rest-3.1.1-1

Add the Yum repository:

    Add Confluent's repository definition locally (for example, as /etc/yum.repos.d/confluent.repo):

[Confluent.dist]
name=Confluent repository (dist)
baseurl=http://packages.confluent.io/rpm/3.1/6
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=http://packages.confluent.io/rpm/3.1
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1

Installation:

yum clean all
yum makecache
yum install confluent-kafka confluent-kafka-rest -y
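
    A quick sanity check that the packages landed (the exact package list may vary by version):

rpm -qa | grep confluent
# expect entries such as confluent-kafka-2.11 and confluent-kafka-rest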

Configuration:

    ZooKeeper: /etc/kafka/zookeeper.properties

dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0

    Kafka broker: /etc/kafka/server.properties

broker.id=50
delete.topic.enable=true
listeners=PLAINTEXT://10.205.51.50:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.205.51.50:2181
zookeeper.connection.timeout.ms=6000
confluent.support.metrics.enable=true
confluent.support.customer.id=anonymous

    REST Proxy: /etc/kafka-rest/kafka-rest.properties

id=kafka-rest-server
zookeeper.connect=10.205.51.50:2181
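
    One note for the Avro examples below: the REST Proxy must be able to reach the Schema Registry. Its schema.registry.url setting defaults to http://localhost:8081, which happens to work in this single-host setup; on separate hosts you would set it explicitly, e.g.:

schema.registry.url=http://10.205.51.50:8081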

    Schema Registry: /etc/schema-registry/schema-registry.properties

listeners=http://0.0.0.0:8081
kafkastore.connection.url=10.205.51.50:2181
kafkastore.topic=_schemas
debug=false

Startup:

    Start ZooKeeper:

zookeeper-server-start -daemon /etc/kafka/zookeeper.properties

    Start the Kafka broker:

kafka-server-start -daemon /etc/kafka/server.properties

    Start the REST Proxy:

kafka-rest-start -daemon /etc/kafka-rest/kafka-rest.properties

    Start the Schema Registry:

schema-registry-start -daemon /etc/schema-registry/schema-registry.properties
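
    Before producing anything, it is worth confirming the Schema Registry answers; on a fresh install, GET /subjects should return an empty JSON array:

curl http://10.205.51.50:8081/subjects
# []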

Walkthrough:

    The REST Proxy supports the avro, json, and binary data formats; this article works through the avro and json formats.

    List the current topics:

curl http://10.205.51.50:8082/topics
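
    The response is a JSON array of topic names; on this fresh cluster, with the Schema Registry already started, it looks roughly like:

["_schemas"]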


    List the cluster's brokers:

curl http://10.205.51.50:8082/brokers
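
    Since the only broker was configured with broker.id=50 above, the response should be:

{"brokers":[50]}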


    Create topic test2 to hold Avro-formatted data:

kafka-topics --create --zookeeper 10.205.51.50:2181 --partitions 1 --replication-factor 1 --topic test2
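
    Optionally verify the topic before producing:

kafka-topics --describe --zookeeper 10.205.51.50:2181 --topic test2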

    Push data to test2 through the REST interface:

curl -i -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json" --data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"username\", \"type\": \"string\"}]}","records": [{"value": {"username": "testUser"}},{"value": {"username": "testUser2"}}]}' http://10.205.51.50:8082/topics/test2
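
    A successful produce returns per-record offsets plus the ID under which the proxy registered the value schema, roughly:

{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null},{"partition":0,"offset":1,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":1}

    Subsequent produces can send "value_schema_id" with that ID instead of repeating the full schema.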

    Register a consumer instance in the group my_avro_consumer:

curl -i -X POST -H "Content-Type: application/vnd.kafka.v1+json" --data '{"format": "avro", "auto.offset.reset": "smallest"}' http://10.205.51.50:8082/consumers/my_avro_consumer
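
    No instance name is supplied here, so the proxy generates one. The response carries the instance_id and a base_uri that all later calls must use; the long instance ID in the next command came from exactly this response:

{"instance_id":"rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a","base_uri":"http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a"}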

    Consume the data through the REST interface:

curl -i -X GET -H "Accept: application/vnd.kafka.avro.v1+json" http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a/topics/test2
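
    The two Avro records come back decoded as JSON, along with their partition and offset, roughly:

[{"key":null,"value":{"username":"testUser"},"partition":0,"offset":0},{"key":null,"value":{"username":"testUser2"},"partition":0,"offset":1}]

    An empty array just means the fetch returned before any records arrived; retry the GET.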

    Delete the registered consumer instance:

curl -i -X DELETE http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a

    Create topic test3 to hold JSON-formatted data:

kafka-topics --create --zookeeper 10.205.51.50:2181 --topic test3 --replication-factor 1 --partitions 1

    Push data to test3 through the REST interface:

curl -i -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records": [{"key": "somekey","value": {"foo": "bar"}},{"value": [ "foo", "bar" ],"partition": 0}]}' http://10.205.51.50:8082/topics/test3
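
    As with Avro, the response reports where each record landed; with the json format there are no schema IDs:

{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null},{"partition":0,"offset":1,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}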

    Register a consumer instance in the group my_json_consumer:

curl -i -X POST -H "Content-Type: application/vnd.kafka.v1+json" --data '{"name": "test3","format": "json", "auto.offset.reset": "smallest"}' http://10.205.51.50:8082/consumers/my_json_consumer
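
    Because this request names the instance ("name": "test3"), the instance URL is predictable rather than generated:

{"instance_id":"test3","base_uri":"http://10.205.51.50:8082/consumers/my_json_consumer/instances/test3"}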

    Consume the data through the REST interface:

curl -i -X GET -H "Accept: application/vnd.kafka.json.v1+json" http://10.205.51.50:8082/consumers/my_json_consumer/instances/test3/topics/test3
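
    The response mirrors what was produced, including the keyed record, roughly:

[{"key":"somekey","value":{"foo":"bar"},"partition":0,"offset":0},{"key":null,"value":["foo","bar"],"partition":0,"offset":1}]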

    Delete the consumer instance:

curl -i -X DELETE http://10.205.51.50:8082/consumers/my_json_consumer/instances/test3

    As you can see, the whole process is fairly involved and depends on several cooperating services (ZooKeeper, the broker, the REST Proxy, and the Schema Registry).

 

 
