文档章节

我封装的kafka consumer

芙蓉镇
 芙蓉镇
发布于 2015/07/30 17:22
字数 264
阅读 204
收藏 3

之前项目一直使用原生的kafka consumer api,很难做到真正的批量,提交offset也不是很方便,重要的是每次都有配置一大堆辅助的类。

为了解决这些我封装了一个比较容易使用的consumer和produer库,完全基于原生的kafka高级api,实现了批量处理批量提交offset,并且对于新配置一个topic consumer也非常方便。使用这个库开发的项目已经在实际的线上环境运行几个月了,表现良好,每日负责处理大量的feed和用户行为流数据。

配置方式:

<!--初始化BatchConsumer,会自动启动一个线程-->
    <bean id="kafkaBatchConsumer" class="com.myz.base.kafka.KafkaBatchConsumer" init-method="start">
        <property name="zkConnect" value="${kafka.queue.zookeeper_connect}"/>
        <property name="topic" value="${topicName}"/>
        <property name="groupId" value="$group_id}"/>
        <property name="consumerId" value="${consumer_id}"/>
        <property name="autoCommit" value="false"/>
        <property name="consumerTimeout" value="10"/>
        <property name="messageConsumer" ref="messageConsumer"/>
    </bean>

    <!--实现一个具体的消息处理类,里面可以加入自己的处理逻辑-->
    <bean id="messageConsumer" class="xxxx.queue.FeedMessageConsumer">
    </bean>

代码仓库:https://github.com/frozen007/kafka-effective



© 著作权归作者所有

共有 人打赏支持
芙蓉镇
粉丝 2
博文 1
码字总数 264
作品 0
石景山
高级程序员
私信 提问
加载中

评论(1)

omeweb
omeweb
干的不错
记Structured Streaming 2.3.1的OOM排查过程

记Structured Streaming 2.3.1的OOM排查过程 缘起 最近在使用Structured Streaming开发一套自助配置SQL的来生成流式作业的平台,在测试的过程中发现有些作业长时间运行后会有Executor端的OOM...

纳兰清风
2018/09/26
0
0
python kafka kerberos 验证 消费 生产

[toc] 安装 pykafkagithub 注意kafka版本只支持 kafka 1.1, 1.0,0.11, 0.10, 0.9,0.8 (201902) 该作者在https://github.com/dpkp/kafka-python/pull/1152 这个推送增加了kerberos支持 验证......

stys35
02/28
0
0
Kafka Consumer多线程实例

Kafka 0.9版本开始推出了Java版本的consumer,优化了coordinator的设计以及摆脱了对zookeeper的依赖。社区最近也在探讨正式用这套consumer API替换Scala版本的consumer的计划。鉴于目前这方面...

matrix_google
2018/04/22
0
0
kafkaoffsetmonitor监控topic的logsize和offset数据没有变化

问题描述: 使用kafkaoffsetmonitor监控线上的kafka的集群信息。监控平台搭建之后,发现offset和logsize的值几乎都是平行的,没有任何的数据变化,并且在kafkaoffsetmonitor的监控web界面上发...

liuhuang9496
2017/04/06
0
0
kafka消费者如何动态订阅或者退订(Java)?

kafka.0.10消费者想在程序运行中,动态订阅或者退订topic,要怎么实现? 直接操作subscribe方法的话,报错 KafkaConsumer is not safe for multi-threaded access 请问如何操作呢?...

林旺
2017/06/16
474
0

没有更多内容

加载失败,请刷新页面

加载更多

C++ vector和list的区别

1.vector数据结构 vector和数组类似,拥有一段连续的内存空间,并且起始地址不变。 因此能高效的进行随机存取,时间复杂度为o(1); 但因为内存空间是连续的,所以在进行插入和删除操作时,会造...

shzwork
今天
3
0
Spring之invokeBeanFactoryPostProcessors详解

Spring的refresh的invokeBeanFactoryPostProcessors,就是调用所有注册的、原始的BeanFactoryPostProcessor。 相关源码 public static void invokeBeanFactoryPostProcessors(Configu......

cregu
昨天
4
0
ibmcom/db2express-c_docker官方使用文档

(DEPRECIATED) Please check DB2 Developer-C Edition for the replacement. What is IBM DB2 Express-C ? ``IBM DB2 Express-C``` is the no-charge community edition of DB2 server, a si......

BG2KNT
昨天
3
0
Ubuntu 18.04.2 LTS nvidia-docker2 : 依赖: docker-ce (= 5:18.09.0~3-0~ubuntu-bionic)

平台:Ubuntu 18.04.2 LTS nvidia-docker2 版本:2.0.3 错误描述:在安装nvidia-docker2的时候报dpkg依赖错误 nvidia-docker2 : 依赖: docker-ce (= 5:18.09.0~3-0~ubuntu-bionic) 先看一下依......

Pulsar-V
昨天
4
0
学习笔记1-goland结构体(struct)

写在前面:若有侵权,请发邮件by.su@qq.com告知。 转载者告知:如果本文被转载,但凡涉及到侵权相关事宜,转载者需负责。请知悉! 本文永久更新地址:https://my.oschina.net/bysu/blog/3036...

不最醉不龟归
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部