bboss kafka组件使用介绍
博客专区 > yin_bp 的博客 > 博客详情
bboss kafka组件使用介绍
yin_bp 发表于6个月前
bboss kafka组件使用介绍
  • 发表于 6个月前
  • 阅读 5
  • 收藏 1
  • 点赞 0
  • 评论 0

【腾讯云】如何购买服务器最划算?>>>   

bboss kafka组件使用介绍
本文使用的实例对应的gradle源码工程git访问地址:
http://git.oschina.net/bboss/bestpractice
testkafka子工程地址
http://git.oschina.net/bboss/bestpractice/tree/master/testkafka

1.导入bboss kafka组件
maven坐标

<dependency>
    <groupId>com.bbossgroups.plugins</groupId>
    <artifactId>bboss-plugin-kafka</artifactId>
    <version>5.0.3.1</version>
</dependency>

gradle坐标

compile 'com.bbossgroups.plugins:bboss-plugin-kafka:5.0.3.1'

2.使用kafka producer,发送消息
2.1 kafka producer配置
编写kafka.xml配置文件,放到classpath跟路径下面

相关配置说明:
bootstrap.servers kafka服务器地址配置
value.serializer kafka消息序列化插件配置
key.serializer kafka消息key序列化插件配置
f:sendDatatoKafka="true" 是否启动消息发送功能,false 禁用,true 启用

2.2 发送kafka消息

发送kafka消息相关组件:
org.frameworkset.plugin.kafka.KafkaUtil
org.frameworkset.plugin.kafka.KafkaProductor

KafkaUtil组件加载配置文件并获取KafkaProductor ,通过KafkaProductor 发送kafka消息

3.接收和处理kafka消息
3.1 kafka consumer配置
新建kafkaconsumer.xml文件,放到classpath根路径下面

配置说明:
storeService 配置消息处理组件
zookeeper.connect 配置管理kafka服务器和消息的zookeeper集群地址
f:topic="blackcat" 消费的kafka topic
f:partitions="4" topic对应的分区数,决定并行处理消息的工作线程

3.2 接收和处理消息
接收和处理消息相关组件:
org.frameworkset.plugin.kafka.KafkaConsumer
org.frameworkset.plugin.kafka.StoreService

编写消息处理组件,处理组件需要实现接口
org.frameworkset.plugin.kafka.StoreService
public void store(MessageAndMetadata<byte[], byte[]> message)  throws Exception ;
public void closeService();

StoreServiceTest实现:

3.3 加载kafka consumer配置并启动消息接收线程

BaseApplicationContext context = DefaultApplicationContext.getApplicationContext("kafkaconfumer.xml");
		KafkaConsumer consumer = context.getTBeanObject("kafkaconsumer", KafkaConsumer.class);
		Thread t = new Thread(consumer);
		t.start();

 

标签: bboss Kafka
共有 人打赏支持
粉丝 2
博文 2
码字总数 458
×
yin_bp
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: