文档章节

kafka生产者、消费者java示例

飓风2000
 飓风2000
发布于 2018/09/28 18:41
字数 183
阅读 7
收藏 0

1. 生产者

package com.unilife.myKafka;

import java.util.Properties;

import kafka.producer.Producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerExample {
	  public static void main(String[] args) {
	    Properties props = new Properties();
	    props.put("bootstrap.servers", "master:9092");
	    props.put("acks", "all");
	    props.put("retries", 0);
	    props.put("batch.size", 16384);
	    props.put("linger.ms", 1);
	    props.put("buffer.memory", 33554432);
	    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
	    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
	 
	    KafkaProducer producer = new KafkaProducer(props);
	    for(int i = 0; i < 100; i++)
	      producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));
	 
	    producer.close();
	  }
	}

2. 消费者

package com.unilife.myKafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerExample {
	  public static void main(String[] args) {
	    Properties props = new Properties();
	    props.put("bootstrap.servers", "master:9092");
	    props.put("group.id", "test");
	    props.put("enable.auto.commit", "true");
	    props.put("auto.commit.interval.ms", "1000");
	    props.put("session.timeout.ms", "30000");
	    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
	    consumer.subscribe(Arrays.asList("topic1"));
	    while (true) {
	      ConsumerRecords<String, String> records = consumer.poll(10);
	      for (ConsumerRecord<String, String> record : records)
	        System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
	    }
	  }
	}

© 著作权归作者所有

共有 人打赏支持
飓风2000
粉丝 38
博文 337
码字总数 138498
作品 0
浦东
高级程序员
私信 提问
SpringBoot 整合 kafka 实现组订阅模式

SpringBoot 整合 kafka 实现组订阅模式: 工程结构图 消息生产者pom.xml配置 <?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" </project> 消息消......

泉天下
2018/10/18
0
0
说说MQ之RocketMQ(二)

原文出处:Valleylord RocketMQ 的 Java API RocketMQ 是用 Java 语言开发的,因此,其 Java API 相对是比较丰富的,当然也有部分原因是 RocketMQ 本身提供的功能就比较多。RocketMQ API 提供...

Valleylord
2018/10/12
0
0
不使用 Cygwin 的情况下在Windows 运行 Apache Kafka

引言 本教程的目的是提供在 Windows 操作系统上运行 Apache Kafka 的一个手把手的指南。本指南也将提供设置 Java 与 ZooKeeper 的指导。Apache kafka 是一个快速且可伸缩的消息队列系统,具有...

oschina
2016/01/08
3.9K
9
Alpakka Kafka,反应式Kafka客户端

Alpakka Kafka 是一个要用于 Java 和 Scala 语言的开源的流感知和反应式集成数据线项目。它建立在 Akka Stream之上,提供了 DSL 来支持反应式和流式编程,内置回压功能。Akka Streams 是 Re...

羊八井
02/24
0
0
Kafka Producer Consumer

Producer API org.apache.kafka.clients.producer.KafkaProducer 如果想学习Java工程化、高性能及分布式、深入浅出。微服务、Spring,MyBatis,Netty源码分析的朋友可以加我的Java高级交流:...

编程SHA
01/18
0
0

没有更多内容

加载失败,请刷新页面

加载更多

SpringBoot引入第三方jar包或本地jar包的处理方式

在开发过程中有时会用到maven仓库里没有的jar包或者本地的jar包,这时没办法通过pom直接引入,那么该怎么解决呢 一般有两种方法 - 第一种是将本地jar包安装在本地maven库 - 第二种是将本地j...

独钓渔
今天
2
0
五、MyBatis缓存

一、MyBatis缓存介绍 缓存的使用可以明显的加快访问数据速度,提升程序处理性能,生活和工作中,使用缓存的地方很多。在开发过程中,从前端-->后端-->数据库等都涉及到缓存。MyBatis作为数据...

yangjianzhou
今天
2
0
最近研究如何加速UI界面开发,有点感觉了

最近在开发JFinal学院的JBolt开发平台,后端没啥说的,做各种极简使用的封装,开发者上手直接使用。 JBolt开发平台包含常用的用户、角色、权限、字典、全局配置、缓存、增删改查完整模块、电...

山东-小木
今天
3
0
《月亮与六便士》的读后感作文3000字

《月亮与六便士》的读后感作文3000字: 看完英国作家威廉.萨默塞特.毛姆所著《月亮与六便士》(李继宏译),第一疑问就是全书即没提到“月亮”,也没提到“六便士”。那这书名又与内容有什么...

原创小博客
昨天
2
0
微信网页授权获取用户信息(ThinkPHP5)+ 微信发送客服消息(一)

以thinkphp5为实例,创建控制器 class Kf extends Controller { /** * [protected description]微信公众号appid * @var [type] */ protected $appid = "xxxxxxxxxxxxxxx"; /** * [protected......

半缘修道半缘君丶
昨天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部