文档章节

Kafka创建Topic(API方式)

huxihx
 huxihx
发布于 2017/05/19 17:02
字数 429
阅读 175
收藏 0

    Kafka官方提供了两个脚本来管理topic,包括topic的增删改查。其中kafka-topics.sh负责topic的创建与删除;kafka-configs.sh脚本负责topic的修改和查询,但很多用户都更加倾向于使用程序API的方式对topic进行操作。

    上一篇文章中提到了如何使用客户端协议(client protocol)来创建topic,本文则使用服务器端的Java API对topic进行增删改查。开始之前,需要明确的是,下面的代码需要引入kafka-core的依赖,以kafka 0.10.2版本为例:

Maven版本

<dependency>

    <groupId>org.apache.kafka</groupId>

    <artifactId>kafka_2.10</artifactId>

    <version>0.10.2.0</version>

</dependency>

Gradle版本

compile group: 'org.apache.kafka', name: 'kafka_2.10', version: '0.10.2.0'

创建topic

ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, 
    JaasUtils.isZkSecurityEnabled());

// 创建一个单分区单副本名为t1的topic
AdminUtils.createTopic(zkUtils, "t1", 1, 1, 
    new Properties(), RackAwareMode.Enforced$.MODULE$);

zkUtils.close();

删除topic

ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, 
    JaasUtils.isZkSecurityEnabled());

// 删除topic 't1'
AdminUtils.deleteTopic(zkUtils, "t1");

zkUtils.close();

    比较遗憾地是,不管是创建topic还是删除topic,目前Kafka实现的方式都是后台异步操作的,而且没有提供任何回调机制或返回任何结果给用户,所以用户除了捕获异常以及查询topic状态之外似乎并没有特别好的办法可以检测操作是否成功。

查询topic

ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());

// 获取topic 'test'的topic属性属性
Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "test");

// 查询topic-level属性
Iterator it = props.entrySet().iterator();
while(it.hasNext()){
    Map.Entry entry=(Map.Entry)it.next();
    Object key = entry.getKey();
    Object value = entry.getValue();
    System.out.println(key + " = " + value);
}

zkUtils.close();

修改topic

ZkUtils zkUtils = ZkUtils.apply("localhost:2181", 30000, 30000, JaasUtils.isZkSecurityEnabled());

Properties props = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic(), "test");

// 增加topic级别属性
props.put("min.cleanable.dirty.ratio", "0.3");

// 删除topic级别属性
props.remove("max.message.bytes");

// 修改topic 'test'的属性
AdminUtils.changeTopicConfig(zkUtils, "test", props);

zkUtils.close();

 

© 著作权归作者所有

huxihx
粉丝 1
博文 18
码字总数 30674
作品 0
东城
程序员
私信 提问
spark读取kafka数据流

spark读取kafka数据流提供了两种方式createDstream和createDirectStream。 两者区别如下: 1、KafkaUtils.createDstream 构造函数为KafkaUtils.createDstream(ssc, [zk], [consumer group i......

恶魔苏醒ing
2017/06/07
0
0
Spark Streaming整合kafka实战

kafka作为一个实时的分布式消息队列,实时的生产和消费消息,这里我们可以利用SparkStreaming实时计算框架实时地读取kafka中的数据然后进行计算。在spark1.3版本后,kafkaUtils里面提供了两个...

hblt-j
2018/11/15
282
0
【Kafka】Kafka的代码实现及与Flume的集成

版权声明:本文为博主原创文章,转载请注明出处。 https://blog.csdn.net/gongxifacai_believe/article/details/86547209 1、Kafka API Kafka分别提供了基于Java和Scala的API,由于Kafka不仅...

魏晓蕾
01/19
0
0
8.输入DStream之Kafka数据源实战(基于Receiver的方式)

基于Receiver的方式 这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Str...

weixin_32265569
2017/11/16
0
0
Apache Flink 漫谈系列(14-1) - DataStream Connectors之Kafka【编写中]

聊什么 为了满足本系列读者的需求,在完成《Apache Flink 漫谈系列(14) - DataStream Connectors》之前,我先介绍一下Kafka在Apache Flink中的使用。所以本篇以一个简单的示例,向大家介绍在...

金竹
01/14
0
0

没有更多内容

加载失败,请刷新页面

加载更多

移动开发中的 Web:WebView、WebKit、JSCore、Web 优化、热修复、跨平台、Native、Hybrid……

移动开发领域近年来已经逐渐告别了野蛮生长的时期,进入了相对成熟的时代。而一直以来 Native 和 Web 的争论从未停止,通过开发者孜孜不倦的努力,Web 的效率和 Native 的体验也一直在寻求着...

编辑部的故事
19分钟前
11
0
MySQL8.0.17 - Multi-Valued Indexes 简述

本文主要简单介绍下8.0.17新引入的功能multi-valued index, 顾名思义,索引上对于同一个Primary key, 可以建立多个二级索引项,实际上已经对array类型的基础功能做了支持 (感觉官方未来一定...

阿里云官方博客
今天
10
0
make4.1降级 make-3.81、2错误

在编译 make-3.82 的时候出现如下错误提示 glob/glob.c:xxx: undefined reference to `__alloca'` 修改 /glob/glob.c // #if !defined __alloca && !defined __GNU_LIBRARY__ # ifdef __GNUC......

Domineering
今天
16
0
Rainbond集群的安装和运维的原理

本文将解读Rainbond集群的安装和运维的原理,使用户基本了解Rainbond的安装机制和运维重点,便于用户搭建大型Rainbond集群。 1.Rainbond集群节点概述 1.1 节点分类 属性 类型 说明 manage 管...

好雨云帮
今天
10
0
好程序员大数据学习路线分享UDF函数

1.为什么需要UDF? 1)、因为内部函数没法满足需求。 2)、hive它本身就是一个灵活框架,允许用自定义模块功能,如可以自定义UDF、serde、输入输出等。 2.UDF是什么? UDF:user difine fun...

好程序员官方
今天
13
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部