Kafka Java 示例

原创
2017/04/21 13:28
阅读数 124

Kafka Producer（生产者）Java 示例

import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


import java.util.Properties;
import java.util.Scanner;

/**
 * Created by hadoop on 17-4-20.
 */
public class TestProducer {
    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    /**
     * Creates a producer that publishes String messages to the given topic
     * on the broker at localhost:9092.
     *
     * @param topic the Kafka topic to send messages to
     */
    public TestProducer(String topic)
    {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        // Use random partitioner. Don't need the key type. Just set it to Integer.
        // The message is of type String.
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    public static void main(String[] args){
        String topic = "test";
        TestProducer testProducer = new TestProducer(topic);
        Scanner s = new Scanner(System.in);
        System.out.println("please input:");
        try {
            // hasNextLine() guards against NoSuchElementException when stdin
            // reaches EOF (the original while(true) loop crashed on EOF).
            while (s.hasNextLine()) {
                String line = s.nextLine();
                if (line.equals("exit")) break;
                System.out.println(">>>" + line);
                testProducer.send(line);
            }
        } finally {
            // Release the scanner and the producer's network resources —
            // the original leaked both.
            s.close();
            testProducer.close();
        }
    }

    /**
     * Sends one message to the configured topic. No key is supplied, so the
     * random partitioner picks the partition.
     *
     * @param messagestr the message payload
     */
    public void send(String messagestr){
        producer.send(new KeyedMessage<Integer, String>(topic, messagestr));
    }

    /** Closes the underlying producer connection. */
    public void close() {
        producer.close();
    }

}

Kafka Consumer（消费者）Java 示例

import bonc.kafka.test.KafkaProperties;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Created by hadoop on 17-4-21.
 */
public class TestConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    /**
     * Creates a high-level consumer for the given topic, connecting to
     * ZooKeeper at 127.0.0.1:2181 as consumer group "group1".
     *
     * @param topic the Kafka topic to read from
     */
    public TestConsumer(String topic)
    {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }

    /** Builds the ZooKeeper-based configuration for the high-level consumer. */
    private static ConsumerConfig createConsumerConfig()
    {
        Properties props = new Properties();
        props.put("zookeeper.connect", "127.0.0.1:2181");
        props.put("group.id", "group1");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        // Start from the earliest available offset when the group has no
        // committed offset yet.
        props.put("auto.offset.reset", "smallest");
        return new ConsumerConfig(props);
    }

    public static void main(String[] args){
        String topic = "test";
        TestConsumer testConsumer = new TestConsumer(topic);
        try {
            testConsumer.receive();
        } finally {
            // Release the ZooKeeper session and fetcher threads — the
            // original never shut the connector down.
            testConsumer.shutdown();
        }
    }

    /**
     * Blocks indefinitely, printing each message received on the topic as a
     * UTF-8 string. The stream iterator's hasNext() blocks until a message
     * arrives.
     */
    public void receive() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Request a single stream (consumer thread) for this topic.
        // Autoboxing replaces the deprecated `new Integer(1)`.
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream =  consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            // Decode explicitly as UTF-8 instead of the platform-default
            // charset, so output is stable across machines.
            System.out.println(new String(it.next().message(), StandardCharsets.UTF_8));
        }
    }

    /** Shuts down the underlying consumer connector. */
    public void shutdown() {
        consumer.shutdown();
    }
}
展开阅读全文
打赏
0
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部