文档章节

Confluent kafka rest实战

China_OS
 China_OS
发布于 2017/01/11 15:04
字数 679
阅读 1632
收藏 0

Confluent platform是个什么东西?

    是由LinkedIn开发出Apache Kafka的团队成员,基于这项技术创立了新公司Confluent,Confluent的产品也是围绕着Kafka做的。基本架构如下:

 

可以免费使用的组件:

    Confluent Kafka Brokers (开源)
    Confluent Kafka Connectors(开源)
    Confluent Kafka Clients(开源)
    Confluent Kafka REST Proxy(开源)
    Confluent Schema Registry(开源)

我们的关注:

    本次我们主要使用REST Proxy,当然底层的broker也是使用confluent的kafka组件。

    实验平台:CentOS release 6.7 (Final)

    kafka版本:confluent-kafka-2.11-0.10.1.0-1

    rest proxy版本:confluent-kafka-rest-3.1.1-1

添加Yum仓库:

    本地添加confluent的repo仓库即可

[Confluent.dist]
name=Confluent repository (dist)
baseurl=http://packages.confluent.io/rpm/3.1/6
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=http://packages.confluent.io/rpm/3.1
gpgcheck=1
gpgkey=http://packages.confluent.io/rpm/3.1/archive.key
enabled=1

安装:

yum clean all
yum makecache
yum install confluent-kafka confluent-kafka-rest -y

配置:

    zookeeper:/etc/kafka/zookeeper.properties

dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0

    kafka broker:/etc/kafka/server.properties

broker.id=50
delete.topic.enable=true
listeners=PLAINTEXT://10.205.51.50:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.205.51.50:2181
zookeeper.connection.timeout.ms=6000
confluent.support.metrics.enable=true
confluent.support.customer.id=anonymous

    rest proxy:/etc/kafka-rest/kafka-rest.properties

id=kafka-rest-server
zookeeper.connect=10.205.51.50:2181

   schema registry:/etc/schema-registry/schema-registry.properties

listeners=http://0.0.0.0:8081
kafkastore.connection.url=10.205.51.50:2181
kafkastore.topic=_schemas
debug=false

启动:

    启动zookeeper:

zookeeper-server-start -daemon /etc/kafka/zookeeper.properties

   

    启动kafka broker

kafka-server-start -daemon /etc/kafka/server.properties

   

    启动rest proxy

kafka-rest-start -daemon /etc/kafka-rest/kafka-rest.properties

    启动schema registry

schema-registry-start -daemon /etc/schema-registry/schema-registry.properties

实验过程:

    rest proxy支持avro、json、binary数据格式,本文以avro、json格式为例进行实战。

    查看当前topics:

curl http://10.205.51.50:8082/topics

   

    查看集群的brokers:

curl http://10.205.51.50:8082/brokers

    

    创建topic test2,存放avro格式的数据:

kafka-topics --create --zookeeper 10.205.51.50:2181 --partitions 1 --replication-factor 1 --topic test2

    通过rest接口向test2 push数据:

curl -i -X POST -H "Content-Type: application/vnd.kafka.avro.v1+json" --data '{"value_schema": "{\"type\": \"record\", \"name\": \"User\", \"fields\": [{\"name\": \"username\", \"type\": \"string\"}]}","records": [{"value": {"username": "testUser"}},{"value": {"username": "testUser2"}}]}' http://10.205.51.50:8082/topics/test2

    注册consumer group:

curl -i -X POST -H "Content-Type: application/vnd.kafka.v1+json" --data '{"format": "avro", "auto.offset.reset": "smallest"}' http://10.205.51.50:8082/consumers/my_avro_consumer

    通过rest接口消费数据:

curl -i -X GET -H "Accept: application/vnd.kafka.avro.v1+json" http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a/topics/test2

    删除注册的consumer实例:

curl -i -X DELETE http://10.205.51.50:8082/consumers/my_avro_consumer/instances/rest-consumer-kafka-rest-server-25354850-1a4e-4503-bce2-75b9d9a6fd1a

    创建topic test3,存放json格式的数据:

kafka-topics --create --zookeeper 10.205.51.50:2181 --topic test3 --replication-factor 1 --partitions 1

    通过rest接口向test3 push数据:

curl -i -X POST -H "Content-Type: application/vnd.kafka.json.v1+json" --data '{"records": [{"key": "somekey","value": {"foo": "bar"}},{"value": [ "foo", "bar" ],"partition": 0}]}' http://10.205.51.50:8082/topics/test3

    注册consumer group:

curl -i -X POST -H "Content-Type: application/vnd.kafka.v1+json" --data '{"name": "test3","format": "json", "auto.offset.reset": "smallest"}' http://10.205.51.50:8082/consumers/my_json_consumer

    通过rest接口消费数据:

curl -i -X GET -H "Accept: application/vnd.kafka.json.v1+json" http://10.205.51.50:8082/consumers/my_json_consumer/instances/test3/topics/test3

    删除consumer实例

curl -i -X DELETE http://10.205.51.50:8082/consumers/my_json_consumer/instances/test3

 

    可以看到整个过程还是比较麻烦的,依赖多个服务。

 

 

© 著作权归作者所有

上一篇: Kafka pixy
下一篇: Hive 1.0 HWI安装
China_OS
粉丝 427
博文 463
码字总数 519985
作品 0
静安
技术主管
私信 提问
加载中

评论(2)

China_OS
China_OS 博主

引用来自“momisabuilder”的评论

你好,我想问下支持oracle吗?

@momisabuilder 后端的数据库?
momisabuilder
momisabuilder
你好,我想问下支持oracle吗?
基于kafka rest实现资源访问服务化(实战)

问题引出 新产品的体系架构包含多个模块,模块集特点是数量多、模块间交互复杂。那么统一接口是一个很好的解决方案,为了实现统一接口打算采用微服务的核心思想,设计了采用restful service...

xnchall
2018/08/01
0
0
Confluent 修改开源许可证,限制云供应商滥用

Confluent CEO Jay Kreps 近日宣布将 Confluent Platform 的部分组件从 Apache 2.0 更改为 Confluent 社区许可证(Confluent Community License),变更的主要原因和之前的 Redis 和 MongoD...

王练
01/10
2.1K
5
kafka connect分布式安装

kafka connect分布式部署 Apache Kafka Schema Registry Kafka Connect Kafka Rest Proxy Kafka Clients 说明:以上服务除Apache kafka由Linkedin始创并开源,其他组件皆由Confluent公司开发...

-九天-
2017/12/10
650
7
DataPipeline & Confluent Kafka Meetup上海站

一、活动介绍 Confluent作为国际数据“流”处理技术领先者,提供实时数据处理解决方案,在市场上拥有大量企业客户,帮助企业轻松访问各类数据。DataPipeline作为国内首家原生支持Kafka解决方...

DataPipeline
2018/09/21
38
0
DataPipeline & Confluent Kafka Meetup上海站

一、活动介绍 Confluent作为国际数据“流”处理技术领先者,提供实时数据处理解决方案,在市场上拥有大量企业客户,帮助企业轻松访问各类数据。DataPipeline作为国内首家原生支持Kafka解决方...

DataPipeline
2018/09/21
264
0

没有更多内容

加载失败,请刷新页面

加载更多

哪些情况下适合使用云服务器?

我们一直在说云服务器价格适中,具备弹性扩展机制,适合部署中小规模的网站或应用。那么云服务器到底适用于哪些情况呢?如果您需要经常原始计算能力,那么使用独立服务器就能满足需求,因为他...

云漫网络Ruan
今天
10
0
Java 中的 String 有没有长度限制

转载: https://juejin.im/post/5d53653f5188257315539f9a String是Java中很重要的一个数据类型,除了基本数据类型以外,String是被使用的最广泛的了,但是,关于String,其实还是有很多东西...

低至一折起
今天
26
0
OpenStack 简介和几种安装方式总结

OpenStack :是一个由NASA和Rackspace合作研发并发起的,以Apache许可证授权的自由软件和开放源代码项目。项目目标是提供实施简单、可大规模扩展、丰富、标准统一的云计算管理平台。OpenSta...

小海bug
昨天
11
0
DDD(五)

1、引言 之前学习了解了DDD中实体这一概念,那么接下来需要了解的就是值对象、唯一标识。值对象,值就是数字1、2、3,字符串“1”,“2”,“3”,值时对象的特征,对象是一个事物的具体描述...

MrYuZixian
昨天
9
0
解决Mac下VSCode打开zsh乱码

1.乱码问题 iTerm2终端使用Zsh,并且配置Zsh主题,该主题主题需要安装字体来支持箭头效果,在iTerm2中设置这个字体,但是VSCode里这个箭头还是显示乱码。 iTerm2展示如下: VSCode展示如下: 2...

HelloDeveloper
昨天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部