文档章节

kafka的单机部署版本

l
 lry77
发布于 2016/08/16 20:13
字数 716
阅读 288
收藏 7

本部署使用的版本为kafka_2.8.0-0.8.0。 
参考了http://blog.csdn.net/itleochen/article/details/17451455这篇博文; 
并根据官网介绍http://kafka.apache.org/documentation.html#quickstart完成。 
废话少说,直接上步骤 
1.下载kafka_2.8.0-0.8.0.tar.gz 
https://archive.apache.org/dist/kafka/0.8.0/kafka_2.8.0-0.8.0.tar.gz 
2.解压缩 
tar -vxf kafka_2.8.0-0.8.0.tar.gz 
3.修改配置文件 
修改conf/server.properties 
host.name=192.168.110.129(修改为主机ip,不然服务器返回给客户端的是主机的hostname,客户端并不一定能够识别) 
修改conf/zookeeper.properties 属性文件 
dataDir=/usr/local/tmp/zookeeper   (zookeeper临时数据文件) 
4.启动zookeeper和kafka 
cd bin 
启动zookeeper 
./zookeeper-server-start.sh ../config/zookeeper.properties & (&推出命令行,服务守护执行) 
启动kafka 
./kafka-server-start.sh ../config/server.properties & 
5.验证是否成功 
*创建主题 
./kafka-create-topic.sh --partition 1 --replica 1 --zookeeper localhost:2181 --topic test 
检查是否创建主题成功 
./kafka-list-topic.sh --zookeeper localhost:2181 
*启动produce 
./bin/kafka-console-producer.sh --broker-list 192.168.110.129:9092  --topic test 
*启动consumer 
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test 
6.关闭kafka和zookeeper 
./kafka-server-stop.sh ../config/server.properties 
./zookeeper-server-stop.sh 
心得总结: 
1.produce启动的时候参数使用的是kafka的端口而consumer启动的时候使用的是zookeeper的端口; 
2.必须先创建topic才能使用; 
3.topic本质是以文件的形式储存在zookeeper上的。

 

消费者

package com.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;

public class KafkaConsumer
{

	private final ConsumerConnector consumer;


	private KafkaConsumer()
	{
		Properties props = new Properties();
		// zookeeper 配置
		props.put( "zookeeper.connect", "192.168.110.129:2181" );

		// group 代表一个消费组
		props.put( "group.id", "jd-group" );

		// zk连接超时
		props.put( "zookeeper.session.timeout.ms", "4000" );
		props.put( "zookeeper.sync.time.ms", "200" );
		props.put( "auto.commit.interval.ms", "1000" );
		props.put( "auto.offset.reset", "smallest" );
		// 序列化类
		props.put( "serializer.class", "kafka.serializer.StringEncoder" );

		ConsumerConfig config = new ConsumerConfig( props );

		consumer = kafka.consumer.Consumer.createJavaConsumerConnector( config );
	}


	void consume()
	{
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put( KafkaProducer.TOPIC, new Integer( 1 ) );

		StringDecoder keyDecoder = new StringDecoder( new VerifiableProperties() );
		StringDecoder valueDecoder = new StringDecoder( new VerifiableProperties() );

		Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams( topicCountMap, keyDecoder, valueDecoder );
		KafkaStream<String, String> stream = consumerMap.get( KafkaProducer.TOPIC ).get( 0 );
		ConsumerIterator<String, String> it = stream.iterator();
		while (it.hasNext())
			System.out.println( it.next().message() );
	}


	public static void main(String[] args)
	{
		new KafkaConsumer().consume();
	}
}

生产者

package com.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * Hello world!
 *
 */
public class KafkaProducer
{
	private final Producer<String, String> producer;

	public final static String TOPIC = "TEST-TOPIC";


	private KafkaProducer()
	{
		Properties props = new Properties();
		// 此处配置的是kafka的端口
		props.put( "metadata.broker.list", "192.168.110.129:9092" );

		// 配置value的序列化类
		props.put( "serializer.class", "kafka.serializer.StringEncoder" );
		// 配置key的序列化类
		props.put( "key.serializer.class", "kafka.serializer.StringEncoder" );

		// request.required.acks
		// 0, which means that the producer never waits for an acknowledgement from the broker (the same
		// behavior as 0.7). This option provides the lowest latency but the weakest durability guarantees
		// (some data will be lost when a server fails).
		// 1, which means that the producer gets an acknowledgement after the leader replica has received the
		// data. This option provides better durability as the client waits until the server acknowledges the
		// request as successful (only messages that were written to the now-dead leader but not yet
		// replicated will be lost).
		// -1, which means that the producer gets an acknowledgement after all in-sync replicas have received
		// the data. This option provides the best durability, we guarantee that no messages will be lost as
		// long as at least one in sync replica remains.
		props.put( "request.required.acks", "-1" );

		producer = new Producer<String, String>( new ProducerConfig( props ) );
	}


	void produce()
	{
		int messageNo = 1000;
		final int COUNT = 2000;

		while (messageNo < COUNT)
		{
			String key = String.valueOf( messageNo );
			String data = "hello kafka message " + key;
			producer.send( new KeyedMessage<String, String>( TOPIC, key, data ) );
			// System.out.println( data );
			messageNo++;
		}
	}


	public static void main(String[] args)
	{
		new KafkaProducer().produce();
	}
}

 

© 著作权归作者所有

l
粉丝 1
博文 127
码字总数 80545
作品 0
南京
私信 提问
集群四部曲(四):完美的Kafka集群搭建

之前写过一篇关于Kafka消息的发布-订阅,只不过是基于一台服务器,不够全面,下面我要说下Kafka集群环境的搭建和消息的发布-订阅,希望大家喜欢。下面的集群搭建是基于单机部署的环境,所以大...

海岸线的曙光
2018/03/20
0
0
Kafka分布式环境搭建 (二)赞

这篇文章将介绍如何搭建kafka环境,我们会从单机版开始,然后逐渐往分布式扩展。单机版的搭建官网上就有,比较容易实现,这里我就简单介绍下即可,而分布式的搭建官网却没有描述,我们最终的...

老先生二号
2017/08/07
0
0
RocketMQ与Kafka对比

RocketMQ与Kafka对比(18项差异) 淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用Mysql作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步...

莫问viva
2015/05/08
0
0
RocketMQ与Kafka对比(18项差异)

转自:https://github.com/alibaba/RocketMQ/wiki/rmqvskafka 淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用Mysql作为消息存储媒介,可完全水平扩容,为了进一步降低成本,...

洋哥6
2016/02/29
106
0
Apache RocketMQ QuickStart

RocketMQ作为一款分布式的消息中间件(阿里的说法是不遵循任何规范的,所以不能完全用JMS的那一套东西来看它),经历了Metaq1.x、Metaq2.x的发展和淘宝双十一的洗礼,在功能和性能上远超Act...

程序员诗人
2017/09/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

火焰图(flame graph)是性能分析利器

Perf命令 Perf 命令(performance的简写)是 Linux 系统原生提供的性能分析工具,返回 CPU 正在执行的函数名以及调用栈(stack)。 通常,它的执行频率是 99Hz(每秒99次),如果99次都返回同一个...

呼呼南风
5分钟前
0
0
 好程序员大数据知识点精讲 大数据之Linux

好程序员大数据知识点精讲 大数据之Linux -Linux是什么? Linux是一套作业系统,不是应用程序Linux的基本思想有两点:第一,一切都是文件;第二,每个软件都有确定的用途。 Shell——命令行解...

好程序员IT
10分钟前
0
0
mysql 多行结合

select a1.email as email ,a1.bg ,IFNULL(a1.bg, a2.bg) from ( select * from test01 where sdate = '2019-09-11' ) a1 LEFT join (select * from test01 where sdate = '2019-09-10') a2 ......

昏鸦
11分钟前
0
0
Netflix Eureka 续约 & 更新注册表信息

Eureka Client 要定期的向 Eureka Server 发送心跳请求以保持续约的状态。 也需要定期的从 Eureka Server 获取服务注册表数据,并将服务注册表数据缓存在客户端实例内。 Eureka Client 续约 ...

BryceLoski
15分钟前
11
0
IT兄弟连 Java语法教程 Java开发环境 JVM、JRE、JDK

要想开发Java程序,就需要知道什么是JVM、JRE以及JDK。JVM是运行Java程序的核心,JRE是支持Java程序运行的环境,而JDK是Java开发的核心,下面我们分别具体介绍它们以及它们之间的关系。 1.J...

老码农的一亩三分地
23分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部