文档章节

java操作kafka

Zero零_度
 Zero零_度
发布于 2016/03/07 11:46
字数 268
阅读 88
收藏 0

maven依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.9.0.0</version>
</dependency>

生产者:

package com.sniper.kafka.helloworld;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class KafkaProducer {
    
    public static void main(String[] args) {
        Properties prop = new Properties();
        //生成者可以不设置zookeeper
        //prop.setProperty("zookeeper.connect", "sniper5:2181");
        prop.setProperty("serializer.class", StringEncoder.class.getName());
        prop.setProperty("metadata.broker.list", "sniper5:9092, sniper6:9092, sniper7:9092");
        
        Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(prop));
        
        for(int i=0; i<3; i++) {
            KeyedMessage<Integer, String> msg = new KeyedMessage<Integer, String>("test1", "hi-" + i);
            producer.send(msg);
        }
        
        producer.close();
        
    }
    
}

消费者:

package com.sniper.kafka.helloworld;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer {
    
    public static void main(String[] args) {
        Properties prop = new Properties();
        prop.setProperty("zookeeper.connect", "sniper5:2181,sniper6:2181,sniper7:2181");
        prop.put("zk.connectiontimeout.ms", "1000000");  
        prop.setProperty("group.id", "group1");
        
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
        
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put("test1", 1);
        
        Map<String, List<KafkaStream<byte[], byte[]>>> msgStreams = consumer.createMessageStreams(map);
        KafkaStream<byte[], byte[]> kafkaStreams = msgStreams.get("test1").get(0);
        ConsumerIterator<byte[], byte[]> iterator = kafkaStreams.iterator();
        while(iterator.hasNext()) {
            String msg = new String(iterator.next().message());
            System.out.println(msg);
        }
    }
    
}

遇到问题:

1、消费端设置zookeeper集群,逗号之间不允许有空格

2、没有主题的情况下,启动消费端会报错,顺序为:创建主题,启动消费端关注主题,生产端生产消息到主题

© 著作权归作者所有

共有 人打赏支持
Zero零_度
粉丝 69
博文 1253
码字总数 256597
作品 0
程序员
私信 提问
kafka开源管理工具Kafka-manager部署

  简介      Kafka-manager是雅虎开源的apache-kafka管理工具,是用Scala写,所以在web页面进行操作即可。   Githubhttps://github.com/yahoo/kafka-manager   主要特性:   1. ...

linux运维菜
2018/04/19
0
0
Grafana、elasticsearch、kafka、logstash和pinpoint结合

一、Grafana 1)下载安装 wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-4.2.0-1.x8664.rpm sudo yum localinstall grafana-4.2.0-1.x86_64.rpm 2)启动 serv......

半船水
2017/10/25
0
0
不使用 Cygwin 的情况下在Windows 运行 Apache Kafka

引言 本教程的目的是提供在 Windows 操作系统上运行 Apache Kafka 的一个手把手的指南。本指南也将提供设置 Java 与 ZooKeeper 的指导。Apache kafka 是一个快速且可伸缩的消息队列系统,具有...

oschina
2016/01/08
3.9K
9
请教Kafka在window下运行错误的问题

我在window server 2012 上运行 kafka 的时候 出现这个错误 我本地是可以的,但是在线上服务器就出错了 应该如何解决,请教下 !! Java的版本: C:\Users\Administrator>java -version jav...

Macrotea
2018/04/21
311
0
kafka开发环境搭建

@FrankHui 你好,想跟你请教个问题:kafka开发环境搭建时,在windows下运行java项目,报:Unable to connect to zookeeper server within timeout: 6000,在windows下ping liunx地址是可以p...

yang009ww
2013/03/08
7.6K
5

没有更多内容

加载失败,请刷新页面

加载更多

Neo 虚拟机

上一篇《Neo 编译器》中说明了Neo编译器是怎么把CIL转成neo虚拟机的opcode,那么vm虚拟机又是怎么处理这些代码的,这篇文章我们看一下虚拟机的代码。 框架 虚拟机所处的位置 在框架图中,我们...

NEO-FANS
28分钟前
1
0
TiDB-Lightning Toolset & TiDB-DM 正式开源,前排开“坑”、PR 走起!

在刚刚结束的 TiDB DevCon 2019 上,我们宣布将大家期待已久的 TiDB-Ligthning Toolset 和 TiDB-DM 开源(惊不惊喜、意不意外?!),感兴趣的小伙伴们赶紧前排关注一波,开“坑(issues)”...

TiDB
42分钟前
2
0
人人都可以做深度学习应用:入门篇

本文由云+社区发表 作者:徐汉彬 一、人工智能和新科技革命 2017年围棋界发生了一件比较重要事,Master(Alphago)以60连胜横扫天下,击败各路世界冠军,人工智能以气势如虹的姿态出现在我们...

腾讯云加社区
46分钟前
1
0
C++ RAII

C++ RAII RAII是resource acquisition is initialization的缩写,意为“资源获取即初始化”。它是C++之父Bjarne Stroustrup提出的设计理念,其核心是把资源和对象的生命周期绑定,对象创建获...

mskk
49分钟前
1
0
web.xml is missing and is set to true一步解决

次报错说的是在WebContent/WEB-INF下面没有web.xml,而默认web.xml不在此路径,需要重新指定路径,操作如下: 先取消Dynamic Web Module勾选,点击apply,然后再勾上Dynamic Web Module,此时...

宇昕
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部