文档章节

java操作kafka

Zero零_度
 Zero零_度
发布于 2016/03/07 11:46
字数 268
阅读 74
收藏 0

maven依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>

生产者:

package com.sniper.kafka.helloworld;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class KafkaProducer {
    
    public static void main(String[] args) {
        Properties prop = new Properties();
        //生成者可以不设置zookeeper
        //prop.setProperty("zookeeper.connect", "sniper5:2181");
        prop.setProperty("serializer.class", StringEncoder.class.getName());
        prop.setProperty("metadata.broker.list", "sniper5:9092, sniper6:9092, sniper7:9092");
        
        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(prop));
        
        for(int i=0; i<3; i++) {
            KeyedMessage<Integer, String> msg = new KeyedMessage<Integer, String>("test1", "hi-" + i);
            producer.send(msg);
        }
        
        producer.close();
        
    }
    
}

消费者:

package com.sniper.kafka.helloworld;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
    
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.setProperty("zookeeper.connect", "sniper5:2181,sniper6:2181,sniper7:2181");
        prop.put("zk.connectiontimeout.ms", "1000000");  
        prop.setProperty("group.id", "group1");
        
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
        
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test1", 1);
        
        Map<String, List<KafkaStream<byte[], byte[]>>> msgStreams = consumer.createMessageStreams(map);
        KafkaStream<byte[], byte[]> kafkaStreams = msgStreams.get("test1").get(0);
        ConsumerIterator<byte[], byte[]> iterator = kafkaStreams.iterator();
        while(iterator.hasNext()) {
            String msg = new String(iterator.next().message());
            System.out.println(msg);
        }
    }
    
}

遇到问题:

1、消费端设置zookeeper集群,逗号之间不允许有空格

2、没有主题的情况下,启动消费端会报错,顺序为:创建主题,启动消费端关注主题,生产端生产消息到主题

© 著作权归作者所有

共有 人打赏支持
Zero零_度
粉丝 67
博文 1245
码字总数 252866
作品 0
程序员
请教Kafka在window下运行错误的问题

我在window server 2012 上运行 kafka 的时候 出现这个错误 我本地是可以的,但是在线上服务器就出错了 应该如何解决,请教下 !! Java的版本: C:UsersAdministrator>java -version java ...

Macrotea
04/21
0
0
Zookeeper+Kafka集群搭建

Zookeeper集群搭建 Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群。 1、软件环境 (3台服务器-我的测试) 192.168.30.204 server1 192.168.30.205 server2 192.168.30.206...

qianghong000
06/26
0
0
kafka开源管理工具Kafka-manager部署

  简介      Kafka-manager是雅虎开源的apache-kafka管理工具,是用Scala写,所以在web页面进行操作即可。   Githubhttps://github.com/yahoo/kafka-manager   主要特性:   1. ...

linux运维菜
04/19
0
0
Grafana、elasticsearch、kafka、logstash和pinpoint结合

一、Grafana 1)下载安装 wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-4.2.0-1.x8664.rpm sudo yum localinstall grafana-4.2.0-1.x86_64.rpm 2)启动 serv......

半船水
2017/10/25
0
0
Windows 安装运行 Apache Kafka 教程

下面是分步指南,教你如何在Windows OS上安装运行Apache Zookeeper和Apache Kafka。 简介 本文讲述了如何在Windows OS上配置并启动Apache Kafka,这篇指南将会指导你安装Java和Apache Zookee...

大数据之路
2012/08/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

c语言之内存分配笔记

先看一个数组: short array[5] = {1,2} // 这儿定义的一个int类型的数组,数组第1和第2个元素值是1和2.其余后面默认会给值为0; 或者 short array[] = {1,2};//这儿数组第1和第2个元素,数组...

DannyCoder
今天
2
0
Shell | linux安装包不用选择Y/N的方法

apt-get install -y packageOR echo "y" | sudo apt-get install package

云迹
今天
2
0
Hadoop的大数据生态圈

基于Hadoop的大数据的产品圈 大数据产品的一句话概括 Apache Hadoop: 是Apache开源组织的一个分布式计算开源框架,提供了一个分布式文件系统子项目(HDFS)和支持MapReduce分布式计算的软件架...

zimingforever
今天
5
0
八大包装类型的equals方法

先看其中一个源码 结论:八大包装类型的equals方法都是先判断类型是否相同,不相同则是false,相同则判断值是否相等 注意:包装类型不能直接用==来等值比较,否则编译报错,但是数值的基本类型...

xuklc
今天
2
0
NoSQL , Memcached介绍

什么是NoSQL 非关系型数据库就是NoSQL,关系型数据库代表MySQL 对于关系型数据库来说,是需要把数据存储到库、表、行、字段里,查询的时候根据条件一行一行地去匹配,当量非常大的时候就很耗...

TaoXu
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部