文档章节

kafka学习(二)---- Kafka简单的Java版本的Hello World实例

火龙战士
 火龙战士
发布于 2016/08/12 15:43
字数 1788
阅读 2047
收藏 7

kafka学习(二)---- Kafka简单的Java版本的Hello World实例

源码git地址:http://git.oschina.net/zhengweishan/Kafka_study_demo

github下载地址

1、开发环境

我使用的是官网的kafka_2.11-0.10.0.0版本,最新的是kafka_2.11-0.10.0.1版本,大家自行下载安装配置。点击进入下载地址点击进入如何win下配置开发环境 ##2、 创建项目 ## 两种方式:

(a)普通的方式创建

注意:开发时候,需要将下载kafka-2.11-0.10.0.0.jar包加入到classpath下面,这个包包含了所有Kafka的api的实现。由于kafka是使用Scala编写的,所以可能下载的kafka中的libs文件中的kafka-2.11-0.10.0.0.jar放到项目中不能用,而且还依赖scala-library-2.11.8.jar,所以推荐使用第二种方式构建项目。

项目结构图:

(b)maven构建项目 maven下载配置这里不再叙述,请参看:eclipse创建maven多模块项目中有关maven的介绍。好处在于不用自己去添加依赖了,maven自己帮我们加载依赖。

项目结构图:

3、实例源码

3.1 生产者

package com.kafka.demo;

import java.util.Date;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

/**
 * [@see](http://my.oschina.net/weimingwei) https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example
 * [@see](http://my.oschina.net/weimingwei) http://kafka.apache.org/documentation.html#producerapi
 * [@author](http://my.oschina.net/arthor) wesley
 *
 */
public class ProducerDemo {
	@SuppressWarnings("deprecation")
	public static void main(String[] args) {
		int events = 20;
		// [@see](http://my.oschina.net/weimingwei) http://kafka.apache.org/08/configuration.html-- 3.3 Producer
		// Configs
		// @see http://kafka.apache.org/documentation.html#producerconfigs
		// 设置配置属性
		Properties props = new Properties();
		props.put("metadata.broker.list", "127.0.0.1:9092"); // 配置kafka的IP和端口
		props.put("serializer.class", "kafka.serializer.StringEncoder");
		// key.serializer.class默认为serializer.class
		props.put("key.serializer.class", "kafka.serializer.StringEncoder");
		// 可选配置,如果不配置,则使用默认的partitioner
		props.put("partitioner.class", "com.kafka.demo.PartitionerDemo");
		// 触发acknowledgement机制,否则是fire and forget,可能会引起数据丢失
		// 值为0,1,-1,可以参考
		props.put("request.required.acks", "1");
		ProducerConfig config = new ProducerConfig(props);

		// 创建producer
		Producer<String, String> producer = new Producer<String, String>(config);
		// 产生并发送消息
		long start = System.currentTimeMillis();
		for (long i = 0; i < events; i++) {
			long runtime = new Date().getTime();
			String ip = "192.168.1." + i;
			String msg = runtime + "--www.kafkademo.com--" + ip;
			// 如果topic不存在,则会自动创建,默认replication-factor为1,partitions为0
			KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
			System.out.println("-----Kafka Producer----createMessage----" + data);
			producer.send(data);
		}
		System.out.println("Time consuming:" + (System.currentTimeMillis() - start));
		// 关闭producer
		producer.close();
	}
}

3.2 生产者需要配置的Partition类

package com.kafka.demo;

import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

@SuppressWarnings("deprecation")
public class PartitionerDemo implements Partitioner {
	
    public PartitionerDemo (VerifiableProperties props) {
 
    }
 
    public int partition(Object key, int a_numPartitions) {
        int partition = 0;
        String stringKey = (String) key;
        int offset = stringKey.lastIndexOf('.');
        if (offset > 0) {
           partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
        }
       return partition;
  }
 
}

运行之后的效果:

查看控制台:

红色部分就是新生成的待消费的信息。

3.3 消费者(单线程实例)

package com.kafka.demo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
 * @see http://kafka.apache.org/documentation.html#consumerapi
 * @see https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 * @see https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 * @author wesley
 *
 */
public class ConsumerSimpleDemo extends Thread {
	// 消费者连接
	private final ConsumerConnector consumer;
	// 要消费的话题
	private final String topic;

	public ConsumerSimpleDemo(String topic) {
		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
		this.topic = topic;
	}

	// 配置相关信息
	private static ConsumerConfig createConsumerConfig() {
		Properties props = new Properties();
		// props.put("zookeeper.connect","localhost:2181,10.XX.XX.XX:2181,10.XX.XX.XX:2181");
		// 配置要连接的zookeeper地址与端口
		props.put("zookeeper.connect", "127.0.0.1:2181");
		// 配置zookeeper的组id
		props.put("group.id", "group-1");
		// 配置zookeeper连接超时间隔
		props.put("zookeeper.session.timeout.ms", "10000");
		// 配置zookeeper异步执行时间
		props.put("zookeeper.sync.time.ms", "200");
		// 配置自动提交时间间隔
		props.put("auto.commit.interval.ms", "1000");
		return new ConsumerConfig(props);
	}

	public void run() {

		Map<String, Integer> topickMap = new HashMap<String, Integer>();
		topickMap.put(topic, 1);
		Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);

		KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
		ConsumerIterator<byte[], byte[]> it = stream.iterator();
		System.out.println("*********Results********");
		while (true) {
			if (it.hasNext()) {
				// 打印得到的消息
				System.err.println(Thread.currentThread() + " get data:" + new String(it.next().message()));
			}
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	public static void main(String[] args) {
		ConsumerSimpleDemo consumerThread = new ConsumerSimpleDemo("page_visits");
		consumerThread.start();
	}
}

运行之后的效果:

3.4 消费者(线程池实例)

package com.kafka.demo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/* https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
 * http://kafka.apache.org/documentation.html#consumerapi
 */
public class ConsumerDemo {
	private final ConsumerConnector consumer;
	private final String topic;
	private ExecutorService executor;

	public ConsumerDemo(String a_zookeeper, String a_groupId, String a_topic) {
		consumer = Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper,a_groupId));
		this.topic = a_topic;
	}

	public void shutdown() {
		if (consumer != null)
			consumer.shutdown();
		if (executor != null)
			executor.shutdown();
		try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
	}

	public void run(int numThreads) {
		System.out.println("-----Consumers begin to execute-------");
		Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
		topicCountMap.put(topic, new Integer(numThreads));
		Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer
				.createMessageStreams(topicCountMap);
		List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
		System.err.println("-----Need to consume content----"+streams);

		// now launch all the threads
		executor = Executors.newFixedThreadPool(numThreads);

		// now create an object to consume the messages
		int threadNumber = 0;
		for (final KafkaStream<byte[], byte[]> stream : streams) {
			System.out.println("-----Consumers begin to consume-------"+stream);
			executor.submit(new ConsumerMsgTask(stream, threadNumber));
			threadNumber++;
		}
	}

	private static ConsumerConfig createConsumerConfig(String a_zookeeper,
			String a_groupId) {
		Properties props = new Properties();
		// see http://kafka.apache.org/08/configuration.html --3.2 Consumer Configs
		// http://kafka.apache.org/documentation.html#consumerconfigs
		props.put("zookeeper.connect", a_zookeeper); //配置ZK地址
		props.put("group.id", a_groupId); //必填字段
		props.put("zookeeper.session.timeout.ms", "400");
		props.put("zookeeper.sync.time.ms", "200");
		props.put("auto.commit.interval.ms", "1000");
		return new ConsumerConfig(props);
	}

	public static void main(String[] arg) {
		String[] args = { "127.0.0.1:2181", "group-1", "page_visits", "10" };
		String zooKeeper = args[0];
		String groupId = args[1];
		String topic = args[2];
		int threads = Integer.parseInt(args[3]);

		ConsumerDemo demo = new ConsumerDemo(zooKeeper, groupId, topic);
		demo.run(threads);

		try {
			Thread.sleep(10000);
		} catch (InterruptedException ie) {

		}
		demo.shutdown();
	}
}

注意:这里要调用处理消息的类

3.5 处理消息的类

package com.kafka.demo;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerMsgTask implements Runnable {

	private KafkaStream<byte[], byte[]> m_stream;
	private int m_threadNumber;

	public ConsumerMsgTask(KafkaStream<byte[], byte[]> stream, int threadNumber) {
		m_threadNumber = threadNumber;
		m_stream = stream;
	}

	public void run() {
		System.out.println("-----Consumers begin to consume-------");
		ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
		while (it.hasNext()){
			System.out.println("Thread " + m_threadNumber + ": "+ new String(it.next().message()));
		}
		System.out.println("Shutting down Thread: " + m_threadNumber);
	}

}

运行效果图:

实例到此结束,大家可以多看看kafka的文档,多了解一些kafka的知识,这里只是演示了怎么用,其实也都是文档中的东西,自己总结了一下。

说明

为什么使用High Level Consumer?

有些场景下,从Kafka中读取消息的逻辑不处理消息的offset,仅仅是获取消息数据。High Level Consumer就提供了这种功能。首先要知道的是,High Level Consumer在ZooKeeper上保存最新的offset(从指定的分区中读取)。这个offset基于consumer group名存储。Consumer group名在Kafka集群上是全局性的,在启动新的consumer group的时候要小心集群上没有关闭的consumer。当一个consumer线程启动了,Kafka会将它加入到相同的topic下的相同consumer group里,并且触发重新分配。在重新分配时,Kafka将partition分配给consumer,有可能会移动一个partition给另一个consumer。如果老的、新的处理逻辑同时存在,有可能一些消息传递到了老的consumer上。使用High LevelConsumer首先要知道的是,它应该是多线程的。消费者线程的数量跟tipic的partition数量有关,它们之间有一些特定的规则:

  • 如果线程数量大于主题的分区数量,一些线程将得不到任何消息

  • 如果分区数大于线程数,一些线程将得到多个分区的消息

  • 如果一个线程处理多个分区的消息,它接收到消息的顺序是不能保证的。比如,先从分区10获取了5条消息,从分区11获取了6条消息,然后从分区10获取了5条,紧接着又从分区10获取了5条,虽然分区11还有消息。

  • 添加更多了同consumer group的consumer将触发Kafka重新分配,某个分区本来分配给a线程的,从新分配后,有可能分配给了b线程。

4、参考资料:

  1. http://kafka.apache.org/documentation.html
  2. https://cwiki.apache.org/confluence/display/KAFKA/Index

© 著作权归作者所有

火龙战士

火龙战士

粉丝 122
博文 138
码字总数 101234
作品 0
北京
后端工程师
私信 提问
【Kafka】阿里云消息队列kafka 结合 spring cloud stream

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/kisscatforever/article/details/86231039 一、前言 在以前的博客中,小编使用过spring cloud stream 结合rab...

AresCarry
01/17
0
0
Grafana、elasticsearch、kafka、logstash和pinpoint结合

一、Grafana 1)下载安装 wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-4.2.0-1.x8664.rpm sudo yum localinstall grafana-4.2.0-1.x86_64.rpm 2)启动 serv......

半船水
2017/10/25
291
0
Windows 安装运行 Apache Kafka 教程

下面是分步指南,教你如何在Windows OS上安装运行Apache Zookeeper和Apache Kafka。 简介 本文讲述了如何在Windows OS上配置并启动Apache Kafka,这篇指南将会指导你安装Java和Apache Zookee...

大数据之路
2012/08/26
668
0
dubbo+zipkin调用链监控

收集器抽象 由于zipkin支持http以及kafka两种方式上报数据,所以在配置上需要做下抽象。 AbstractZipkinCollectorConfiguration 主要是针对下面两种收集方式的一些配置上的定义,最核心的是S...

微笑向暖wx
2018/10/09
240
0
一步步动手实现高并发的Reactor模型 —— Kafka底层如何充分利用多线程优势去处理网络I/O与业务分发

一、从《Apeche Kafka源码剖析》上搬来的概念和图 Kafka网络采用的是Reactor模式,是一种基于事件驱动的模式。熟悉Java编程的读者应该了解Java NIO提供了Reactor模式的API。常见的单线程Jav...

Anur
2018/12/13
4K
7

没有更多内容

加载失败,请刷新页面

加载更多

IT兄弟连 HTML5教程 HTML5表单 小结及习题

小结 HTML表单提交的方法有get方法和post方法,get方法的作用是从指定的资源请求数据,post方法的作用是向指定的资源提交要被处理的数据。HTML表单一直都是Web的核心技术之一,有了它我们才能...

老码农的一亩三分地
24分钟前
13
0
向maven工程中导入自己封装好的jar包方法

1.打开cmd窗口 输入并执行:mvn install:install-file -DgroupId=com.test   -DartifactId=ptest -Dversion=0.1  -Dfile=E:\test\test-0.1.0.jar    -Dpackaging=jar注:Dgr......

gantaos
26分钟前
3
0
【jQuery基础学习】09 jQuery与前端(这章很水)

本文转载于:专业的前端网站➨【jQuery基础学习】09 jQuery与前端(这章很水) 这章主要是将如何将jQuery应用到网站中,或者说其实就是一些前端知识,对于我这种后端程序来说其实还是蛮有用的...

前端老手
38分钟前
11
0
深度科技与金山云完成兼容互认证 共同促进我国软件生态发展

近日,深度科技与金山云完成兼容互认证工作,经双方共同严格测试,深度操作系统ARM服务器版软件V15与金山云分布式数据库软件DragonBase V1.0相互兼容、稳定运行,可以为企业级应用提供全面保...

后浪涛涛
39分钟前
8
0
Less导入选项

Less 提供了CSS @import CSS规则的几个扩展,以提供更多的灵活性来处理外部文件。 语法: @import (keyword) "filename"; 以下是导入指令的相关详情: reference,使用较少的文件但不输出。 ...

凌兮洛
55分钟前
16
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部