文档章节

java操作kafka

Zero零_度
 Zero零_度
发布于 2016/03/07 11:46
字数 268
阅读 78
收藏 0

maven依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>

生产者:

package com.sniper.kafka.helloworld;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class KafkaProducer {
    
    public static void main(String[] args) {
        Properties prop = new Properties();
        //生成者可以不设置zookeeper
        //prop.setProperty("zookeeper.connect", "sniper5:2181");
        prop.setProperty("serializer.class", StringEncoder.class.getName());
        prop.setProperty("metadata.broker.list", "sniper5:9092, sniper6:9092, sniper7:9092");
        
        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(prop));
        
        for(int i=0; i<3; i++) {
            KeyedMessage<Integer, String> msg = new KeyedMessage<Integer, String>("test1", "hi-" + i);
            producer.send(msg);
        }
        
        producer.close();
        
    }
    
}

消费者:

package com.sniper.kafka.helloworld;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
    
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.setProperty("zookeeper.connect", "sniper5:2181,sniper6:2181,sniper7:2181");
        prop.put("zk.connectiontimeout.ms", "1000000");  
        prop.setProperty("group.id", "group1");
        
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
        
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test1", 1);
        
        Map<String, List<KafkaStream<byte[], byte[]>>> msgStreams = consumer.createMessageStreams(map);
        KafkaStream<byte[], byte[]> kafkaStreams = msgStreams.get("test1").get(0);
        ConsumerIterator<byte[], byte[]> iterator = kafkaStreams.iterator();
        while(iterator.hasNext()) {
            String msg = new String(iterator.next().message());
            System.out.println(msg);
        }
    }
    
}

遇到问题:

1、消费端设置zookeeper集群,逗号之间不允许有空格

2、没有主题的情况下,启动消费端会报错,顺序为:创建主题,启动消费端关注主题,生产端生产消息到主题

© 著作权归作者所有

共有 人打赏支持
Zero零_度
粉丝 68
博文 1246
码字总数 252959
作品 0
程序员
请教Kafka在window下运行错误的问题

我在window server 2012 上运行 kafka 的时候 出现这个错误 我本地是可以的,但是在线上服务器就出错了 应该如何解决,请教下 !! Java的版本: C:UsersAdministrator>java -version java ...

Macrotea
04/21
0
0
Zookeeper+Kafka集群搭建

Zookeeper集群搭建 Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群。 1、软件环境 (3台服务器-我的测试) 192.168.30.204 server1 192.168.30.205 server2 192.168.30.206...

qianghong000
06/26
0
0
kafka开源管理工具Kafka-manager部署

  简介      Kafka-manager是雅虎开源的apache-kafka管理工具,是用Scala写,所以在web页面进行操作即可。   Githubhttps://github.com/yahoo/kafka-manager   主要特性:   1. ...

linux运维菜
04/19
0
0
Grafana、elasticsearch、kafka、logstash和pinpoint结合

一、Grafana 1)下载安装 wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-4.2.0-1.x8664.rpm sudo yum localinstall grafana-4.2.0-1.x86_64.rpm 2)启动 serv......

半船水
2017/10/25
0
0
Windows 安装运行 Apache Kafka 教程

下面是分步指南,教你如何在Windows OS上安装运行Apache Zookeeper和Apache Kafka。 简介 本文讲述了如何在Windows OS上配置并启动Apache Kafka,这篇指南将会指导你安装Java和Apache Zookee...

大数据之路
2012/08/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

【大福利】极客时间专栏返现二维码大汇总

我已经购买了如下专栏,大家通过我的二维码你可以获得一定额度的返现! 然后,再给大家来个福利,只要你通过我的二维码购买,并且关注了【飞鱼说编程】公众号,可以加我微信或者私聊我,我再...

飞鱼说编程
今天
1
0
Spring5对比Spring3.2源码之容器的基本实现

最近看了《Spring源码深度解析》,该书是基于Spring3.2版本的,其中关于第二章容器的基本实现部分,目前spring5的实现方式已有较大改变。 Spring3.2的实现: public void testSimpleLoad(){...

Ilike_Java
今天
1
0
【王阳明心学语录】-001

1.“破山中贼易,破心中贼难。” 2.“夫万事万物之理不外于吾心。” 3.“心即理也。”“心外无理,心外无物,心外无事。” 4.“人心之得其正者即道心;道心之失其正者即人心。” 5.“无...

卯金刀GG
今天
2
0
OSChina 周三乱弹 —— 我们无法成为野兽

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @ _刚刚好: 霸王洗发水这波很骚 手机党少年们想听歌,请使劲儿戳(这里) hahahahahahh @嘻酱:居然忘了喝水。 让你喝可乐的话, 你准忘不了...

小小编辑
今天
9
0
vm GC 日志 配置及查看

-XX:+PrintGCDetails 打印 gc 日志 -XX:+PrintTenuringDistribution 监控晋升分布 -XX:+PrintGCTimeStamps 包含时间戳 -XX:+printGCDateStamps 包含时间 -Xloggc:<filename> 可以将数据保存为......

Canaan_
昨天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部