文档章节

kafka java 示例

Yulong_
 Yulong_
发布于 2017/04/21 13:28
字数 269
阅读 115
收藏 1

producer Java 示例

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


import java.util.Properties;
import java.util.Scanner;

/**
 * Created by hadoop on 17-4-20.
 */
public class TestProducer {
    private  kafka.javaapi.producer.Producer<Integer, String> producer;
    private  String topic ;
    private  Properties props = new Properties();

    public TestProducer(String topic)
    {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Use random partitioner. Don't need the key type. Just set it to Integer.
        // The message is of type String.
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    public static void main(String[] args){
        String topic = "test";
        TestProducer testProducer = new TestProducer(topic);
        Scanner s = new Scanner(System.in);
        System.out.println("please input:");
        while (true) {
            String line = s.nextLine();
            if (line.equals("exit")) break;
            System.out.println(">>>" + line);
            testProducer.send(line);
        }

    }

    public void send(String messagestr){
        String messageStr = messagestr;
        producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
    }

}

consumer Java 示例

import bonc.kafka.test.KafkaProperties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Created by hadoop on 17-4-21.
 */
public class TestConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public TestConsumer(String topic)
    {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig()
    {
        Properties props = new Properties();
        props.put("zookeeper.connect", "127.0.0.1:2181");
        props.put("group.id", "group1");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset","smallest");
        return new ConsumerConfig(props);

    }

    public static void main(String[] args){
        String topic = "test";
        TestConsumer testConsumer = new TestConsumer(topic);
        testConsumer.receive();


    }
    public void receive() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while(it.hasNext())
            System.out.println(new String(it.next().message()));
    }
}

© 著作权归作者所有

上一篇: Kafka MirrorMaker
下一篇: kafka sink
Yulong_
粉丝 10
博文 145
码字总数 253510
作品 0
朝阳
部门经理
私信 提问
Kafka连接器深度解读之JDBC源连接器

在现实业务中,Kafka经常会遇到的一个集成场景就是,从数据库获取数据,因为关系数据库是一个非常丰富的事件源。数据库中的现有数据以及对该数据的任何更改都可以流式传输到Kafka主题中,在这...

李玉珏
03/12
1K
4
Grafana、elasticsearch、kafka、logstash和pinpoint结合

一、Grafana 1)下载安装 wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-4.2.0-1.x8664.rpm sudo yum localinstall grafana-4.2.0-1.x86_64.rpm 2)启动 serv......

半船水
2017/10/25
263
0
不使用 Cygwin 的情况下在Windows 运行 Apache Kafka

引言 本教程的目的是提供在 Windows 操作系统上运行 Apache Kafka 的一个手把手的指南。本指南也将提供设置 Java 与 ZooKeeper 的指导。Apache kafka 是一个快速且可伸缩的消息队列系统,具有...

oschina
2016/01/08
4K
9
Kafka与操作系统适配性研究

在《Apache Kafka实战》一书中,提到了操作系统与Kafka的适配性。涉及的的epoll与select问题,以及linux零拷贝技术较为重要,因此在此处将操作系统相关特性部分进行记录。 clients 底层网络库...

analanxingde
04/24
0
0
请教Kafka在window下运行错误的问题

我在window server 2012 上运行 kafka 的时候 出现这个错误 我本地是可以的,但是在线上服务器就出错了 应该如何解决,请教下 !! Java的版本: C:\Users\Administrator>java -version jav...

Macrotea
2018/04/21
653
0

没有更多内容

加载失败,请刷新页面

加载更多

OpenStack 简介和几种安装方式总结

OpenStack :是一个由NASA和Rackspace合作研发并发起的,以Apache许可证授权的自由软件和开放源代码项目。项目目标是提供实施简单、可大规模扩展、丰富、标准统一的云计算管理平台。OpenSta...

小海bug
昨天
5
0
DDD(五)

1、引言 之前学习了解了DDD中实体这一概念,那么接下来需要了解的就是值对象、唯一标识。值对象,值就是数字1、2、3,字符串“1”,“2”,“3”,值时对象的特征,对象是一个事物的具体描述...

MrYuZixian
昨天
6
0
数据库中间件MyCat

什么是MyCat? 查看官网的介绍是这样说的 一个彻底开源的,面向企业应用开发的大数据库集群 支持事务、ACID、可以替代MySQL的加强版数据库 一个可以视为MySQL集群的企业级数据库,用来替代昂贵...

沉浮_
昨天
6
0
解决Mac下VSCode打开zsh乱码

1.乱码问题 iTerm2终端使用Zsh,并且配置Zsh主题,该主题主题需要安装字体来支持箭头效果,在iTerm2中设置这个字体,但是VSCode里这个箭头还是显示乱码。 iTerm2展示如下: VSCode展示如下: 2...

HelloDeveloper
昨天
7
0
常用物流快递单号查询接口种类及对接方法

目前快递查询接口有两种方式可以对接,一是和顺丰、圆通、中通、天天、韵达、德邦这些快递公司一一对接接口,二是和快递鸟这样第三方集成接口一次性对接多家常用快递。第一种耗费时间长,但是...

程序的小猿
昨天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部