Kafka开发环境搭建

原创
2012/12/12 20:24
阅读数 2.8W

如果你要利用代码来跑kafka的应用,那你最好先把官网给出的example先在单机环境和分布式环境下跑通,然后再逐步将原有的consumerproducerbroker替换成自己写的代码。所以在阅读这篇文章前你需要具备以下前提:

1.  简单了解kafka功能,理解kafka的分布式原理

2.  能在分布式环境下成功运行—topic test

如果你还没有完成上述两个前提,请先看:

kafka分布式初步      kafka搭建分布式环境

接下来我们就简单介绍下kafka开发环境的搭建。

1.    搭建scala环境,这里有两种方案,第一直接去scala官网下载SDK,解压配置环境变量;第二种办法,你可以不用安装SDK,直接在项目中引用kafka编译后下载下来的scala编译包和编码包(scala-compiler.jarscala-library.jar)。我推荐第二种,因为通过kafka编译下载下来的scala版本和kafka版本都是匹配的(但是有时候可能会跟eclipse的插件需要的环境冲突,所以最好把第一种也安装一下,以防万一),而一般我们用的是java项目来写,所以直接导入相关依赖包就可以了,第一种方案有助于我们看源码和用scala开发。这些jar的路径位于kafka-0.7.2-incubating-src\javatest\lib目录下。

2.    eclipse安装scala开发环境。这只是一个插件,可以在:http://scala-ide.org/中下载安装或者在线安装。装完之后,你就能在eclipse中创建scala的项目了。

我们可以先写一个hello world试一下,这样一来,是不是又多了一种语言写hello world了。

有些人new的时候找不到scala相关的类,那是因为你eclipseperspective不对,切换到scalaperspective下就可以了。注意new的是object,然后输入:

package com.a2.kafka.scala.test

object Hello {

  def main(args: Array[String]): Unit = {
    printf("Hello Scala!!");
  }
}

3.     找到编码需要的依赖包。记住去你linux上经过updatekafka文件夹里找,不要从直接从官网上下载的文件里找。具体路径是:kafka-0.7.2-incubating-src\javatest\lib 这是你用java开发kafka相关程序用到的最基础的包,如果你用到了hadoop,只要去相关的文件夹找一下就可以了。然后把这些包加到项目里即可。

到了这里,基本的开发环境应该是搭建完了,然后我们要开始写点儿简单的代码了。我们还是根据之前《分布式环境搭建》中给出的例子。稍微回忆下:

1.   启动zookeeper server bin/zookeeper-server-start.sh ../config/zookeeper.properties  & (&是为了能退出命令行)

2.   启动kafka server:  bin/kafka-server-start.sh ../config/server.properties  &

3.    Kafka为我们提供了一个console来做连通性测试,下面我们先运行producerbin/kafka-console-producer.sh --zookeeper 192.168.10.11:2181 --topic test 这是相当于开启了一个producer的命令行。

4.   接下来运行consumer,新启一个terminalbin/kafka-console-consumer.sh --zookeeper 192.168.10.11:2181 --topic test --from-beginning

5.    执行完consumer的命令后,你可以在producerterminal中输入信息,马上在consumerterminal中就会出现你输的信息。

这个例子就是在分布式的环境下producer生产数据,然后consumerbroker抓取数据显示在console上。当然注意一点server.properties中的hostname需要换成你的对应地址,具体可以回去看《分布式环境搭建》。现在我们就用代码来模拟producer发送数据的过程:

这里我们建一个java project就可以了,导入依赖包。kafka-0.7.2-incubating-src\javatest\lib目录下的jar.

package com.a2.test.kafka;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;

import kafka.producer.ProducerConfig;

public class Producertest {
	public static void main(String[] args) {
		Properties props = new Properties();
		props.put("zk.connect", "192.168.10.11:2181");
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		ProducerConfig config = new ProducerConfig(props);
		Producer<String, String> producer = new Producer<String, String>(config);
		ProducerData<String, String> data = new ProducerData<String, String>("test", "Hello");
		producer.send(data);
	}
}

这样我们就用代码代替了console去发送信息了,这里我们用了utils中的properties对象直接构建了配置,而不是直接读取,当然你也可以读配置。Data里的两个参数,第一个是指定topic,第二个是发送的内容。

接下来就是运行了,如果你是在windows下,可能会等待很久然后报Unable to connect to zookeeper server within timeout: 6000,这个可能是网卡的原因,你可以直接放到linux上,然后用命令行运行,注意引包。

Java –jar test.jar  Dclasspath=/lib

Consumer的代码以及实现和producer差不多,如果你感兴趣可以去官网找相关的代码,都很简单。当然还有一部分关于producerconsumer的配置,我们下篇再说。



展开阅读全文
打赏
4
26 收藏
分享
加载中
你好,我的jar包里没有kafka.javaapi.producer.ProducerData类,我的是0.8版本的,难道是这个类没有了?我下的是Binary版,不是source版
2014/07/31 11:00
回复
举报
很想问下kafka跟flume是否有可比性?我现在是flume+kafka--》hdfs,两者结合有什么好处?谢谢
2014/06/26 14:31
回复
举报
楼主,可否把那几个javatest/jar包发我下。
xloogson@gmail.com
2014/04/09 18:21
回复
举报
Gaischen博主

引用来自“yonguo”的评论

另外,./sbt update后,下载的依赖包既有scala2.7.7,又有scala2.8.0,

scala-compiler.jar
scala-library.jar

开发的时候上面这两个包应该用哪个版本?

我用的是scala2.7.7版本
2013/06/26 11:10
回复
举报
另外,./sbt update后,下载的依赖包既有scala2.7.7,又有scala2.8.0,

scala-compiler.jar
scala-library.jar

开发的时候上面这两个包应该用哪个版本?
2013/06/26 11:02
回复
举报

引用来自“FrankHui”的评论

引用来自“yang009ww”的评论

引用来自“FrankHui”的评论

引用来自“yang009ww”的评论

根据你的文章已初步成功搭建分布式环境,现在搭建开发环境时,在linux下kafka-0.7.2-incubating-src根目录下没有你所说的javatest,所以找不到开发所依赖的jar包

肯定有的 这个在./sbt的时候都是下载好的 估计就是./sbt update的时候没有完全

我又重新执行了./sbt update ./sbt package,都提示成功了![info] Total session time: 2 s, completed Mar 7, 2013 10:55:16 AM
[success] Build completed successfully.
但是还是没有出现javates目录,该怎么办呢?

留个邮箱 我发给你

kafka-0.7.2-incubating-src\javatest\lib

确实没有上面的路径,是不是新的安装包不一样,看到下面路径:

[kafka@kafka1 compile]$ pwd
/home/kafka/kafka-0.7.2-incubating-src/core/lib_managed/scala_2.8.0/compile
[kafka@kafka1 compile]$ ll
total 2108
-rw-rw-r-- 1 kafka kafka 53244 Dec 7 2009 jopt-simple-3.2.jar
-rw-rw-r-- 1 kafka kafka 391834 Aug 30 2007 log4j-1.2.15.jar
-rw-rw-r-- 1 kafka kafka 995968 Oct 4 2011 snappy-java-1.0.4.1.jar
-rw-rw-r-- 1 kafka kafka 62913 Apr 13 2011 zkclient-0.1.jar
-rw-rw-r-- 1 kafka kafka 604182 Nov 29 2011 zookeeper-3.3.4.jar
2013/06/26 10:58
回复
举报

引用来自“FrankHui”的评论

引用来自“yang009ww”的评论

引用来自“FrankHui”的评论

引用来自“yang009ww”的评论

根据你的文章已初步成功搭建分布式环境,现在搭建开发环境时,在linux下kafka-0.7.2-incubating-src根目录下没有你所说的javatest,所以找不到开发所依赖的jar包

肯定有的 这个在./sbt的时候都是下载好的 估计就是./sbt update的时候没有完全

我又重新执行了./sbt update ./sbt package,都提示成功了![info] Total session time: 2 s, completed Mar 7, 2013 10:55:16 AM
[success] Build completed successfully.
但是还是没有出现javates目录,该怎么办呢?

留个邮箱 我发给你

yang009ww@163.com,非常感谢!
2013/03/07 11:01
回复
举报
Gaischen博主

引用来自“yang009ww”的评论

引用来自“FrankHui”的评论

引用来自“yang009ww”的评论

根据你的文章已初步成功搭建分布式环境,现在搭建开发环境时,在linux下kafka-0.7.2-incubating-src根目录下没有你所说的javatest,所以找不到开发所依赖的jar包

肯定有的 这个在./sbt的时候都是下载好的 估计就是./sbt update的时候没有完全

我又重新执行了./sbt update ./sbt package,都提示成功了![info] Total session time: 2 s, completed Mar 7, 2013 10:55:16 AM
[success] Build completed successfully.
但是还是没有出现javates目录,该怎么办呢?

留个邮箱 我发给你
2013/03/07 11:00
回复
举报

引用来自“FrankHui”的评论

引用来自“yang009ww”的评论

根据你的文章已初步成功搭建分布式环境,现在搭建开发环境时,在linux下kafka-0.7.2-incubating-src根目录下没有你所说的javatest,所以找不到开发所依赖的jar包

肯定有的 这个在./sbt的时候都是下载好的 估计就是./sbt update的时候没有完全

我又重新执行了./sbt update ./sbt package,都提示成功了![info] Total session time: 2 s, completed Mar 7, 2013 10:55:16 AM
[success] Build completed successfully.
但是还是没有出现javates目录,该怎么办呢?
2013/03/07 10:57
回复
举报
Gaischen博主

引用来自“yang009ww”的评论

根据你的文章已初步成功搭建分布式环境,现在搭建开发环境时,在linux下kafka-0.7.2-incubating-src根目录下没有你所说的javatest,所以找不到开发所依赖的jar包

肯定有的 这个在./sbt的时候都是下载好的 估计就是./sbt update的时候没有完全
2013/03/07 10:48
回复
举报
更多评论
打赏
17 评论
26 收藏
4
分享
返回顶部
顶部