kafka, zeromq demo
kafka, zeromq demo
git.toutiao.im 发表于1个月前
kafka, zeromq demo
  • 发表于 1个月前
  • 阅读 24
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 十分钟定制你的第一个小程序>>>   

kafka:

修改 bin/windows/kafka-server-start.bat:set KAFKA_HEAP_OPTS=-Xmx512M -Xms512M

server.properties:port=9092

启动 zk:zookeeper-server-start.bat ../../config/zookeeper.properties

启动 server:kafka-server-start.bat ../../config/server.properties

 

 

Dependency代码 

 收藏代码

  1. <dependency>  
  2.             <groupId>org.apache.kafka</groupId>  
  3.             <artifactId>kafka_2.8.2</artifactId>  
  4.             <version>0.8.0</version>  
  5.             <exclusions>  
  6.                 <exclusion>  
  7.                     <groupId>log4j</groupId>  
  8.                     <artifactId>log4j</artifactId>  
  9.                 </exclusion>  
  10.             </exclusions>  
  11.         </dependency>  
  12.         <dependency>  
  13.             <groupId>org.scala-lang</groupId>  
  14.             <artifactId>scala-library</artifactId>  
  15.             <version>2.8.2</version>  
  16.         </dependency>  
  17.         <dependency>  
  18.             <groupId>com.yammer.metrics</groupId>  
  19.             <artifactId>metrics-core</artifactId>  
  20.             <version>2.2.0</version>  
  21.         </dependency>  
  22.         <dependency>  
  23.             <groupId>com.101tec</groupId>  
  24.             <artifactId>zkclient</artifactId>  
  25.             <version>0.3</version>  
  26.         </dependency>  

 

 

Producer:

 

Java代码 

 收藏代码

  1. import kafka.javaapi.producer.Producer;  
  2. import kafka.producer.KeyedMessage;  
  3. import kafka.producer.ProducerConfig;  
  4.   
  5. import java.util.Properties;  
  6.   
  7. public class KafkaProducer {  
  8.   
  9.     public static void main(String[] args) {  
  10.         Properties props = new Properties();  
  11.         props.put("metadata.broker.list", "127.0.0.1:9092");  
  12.         props.put("serializer.class", "kafka.serializer.StringEncoder");  
  13.         props.put("key.serializer.class", "kafka.serializer.StringEncoder");  
  14.         props.put("request.required.acks","-1");  
  15.   
  16.         Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));  
  17.   
  18.         int messageNo = 100;  
  19.         final int COUNT = 1000;  
  20.         while (messageNo < COUNT) {  
  21.             String key = String.valueOf(messageNo);  
  22.             String data = "hello kafka message " + key;  
  23.             producer.send(new KeyedMessage<String, String>("TestTopic", key ,data));  
  24.             System.out.println(data);  
  25.             messageNo ++;  
  26.         }  
  27.     }    
  28. }  

 Consumer:

 

 

Java代码 

 收藏代码

  1. import kafka.consumer.ConsumerConfig;  
  2. import kafka.consumer.ConsumerIterator;  
  3. import kafka.consumer.KafkaStream;  
  4. import kafka.javaapi.consumer.ConsumerConnector;  
  5. import kafka.serializer.StringDecoder;  
  6. import kafka.utils.VerifiableProperties;  
  7.   
  8. import java.util.HashMap;  
  9. import java.util.List;  
  10. import java.util.Map;  
  11. import java.util.Properties;  
  12.   
  13. public class KafkaConsumer {  
  14.   
  15.     public static void main(String[] args) {  
  16.         Properties props = new Properties();  
  17.         props.put("zookeeper.connect", "127.0.0.1:2181");  
  18.         //group 代表一个消费组  
  19.         props.put("group.id", "jd-group");  
  20.         props.put("zookeeper.session.timeout.ms", "4000");  
  21.         props.put("zookeeper.sync.time.ms", "200");  
  22.         props.put("auto.commit.interval.ms", "1000");  
  23.         props.put("auto.offset.reset", "smallest");  
  24.         props.put("serializer.class", "kafka.serializer.StringEncoder");  
  25.   
  26.         ConsumerConfig config = new ConsumerConfig(props);  
  27.   
  28.         ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);  
  29.   
  30.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
  31.         topicCountMap.put("TestTopic", 1);  
  32.   
  33.         StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());  
  34.         StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());  
  35.   
  36.         Map<String, List<KafkaStream<String, String>>> consumerMap =  
  37.                 consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);  
  38.         KafkaStream<String, String> stream = consumerMap.get("TestTopic").get(0);  
  39.         ConsumerIterator<String, String> it = stream.iterator();  
  40.         while (it.hasNext()) {  
  41.             System.out.println(it.next().message());  
  42.         }  
  43.         System.out.println("finished");  
  44.     }    
  45. }  

 

 

zeromq:

 

Dependency代码 

 收藏代码

  1. <dependency>  
  2.             <groupId>org.zeromq</groupId>  
  3.             <artifactId>jeromq</artifactId>  
  4.             <version>0.4.2</version>  
  5.         </dependency>  

 

Java代码 

 收藏代码

  1. import org.zeromq.ZMQ;  
  2.   
  3. public class Publisher {  
  4.   
  5.     public static void main(String args[]) {  
  6.         ZMQ.Context context = ZMQ.context(1); // 创建包含一个I/O线程的context  
  7.         ZMQ.Socket publisher = context.socket(ZMQ.PUB);  
  8.         publisher.bind("tcp://*:5555");  
  9.    
  10.         while (!Thread.currentThread ().isInterrupted()) {  
  11.             String message = "toutiao hello";  
  12.             publisher.send(message.getBytes());  
  13.             System.out.println("sent : " + message);  
  14.         }    
  15.    
  16.         publisher.close();    
  17.         context.term();    
  18.     }    
  19. }  

 

 

Java代码 

 收藏代码

  1. import org.zeromq.ZMQ;  
  2.   
  3. public class Subscriber {  
  4.   
  5.     public static void main(String args[]) {    
  6.         for (int j = 0; j < 10; j++) {  
  7.             new Thread(new Runnable(){  
  8.                 public void run() {    
  9.                     ZMQ.Context context = ZMQ.context(1);  
  10.                     ZMQ.Socket subscriber = context.socket(ZMQ.SUB);  
  11.                     subscriber.connect("tcp://127.0.0.1:5555");  
  12.                     subscriber.subscribe("toutiao".getBytes());  
  13.   
  14.                     try {  
  15.                         while (true) {  
  16.                             byte[] message = subscriber.recv();  
  17.                             System.out.println(Thread.currentThread().getName() + " receive : " + new String(message));  
  18.                         }  
  19.                     } finally {  
  20.                         subscriber.close();  
  21.                         context.term();  
  22.                     }  
  23.                 }  
  24.             }).start();    
  25.         }  
  26.     }    
  27. }  

 

共有 人打赏支持
粉丝 133
博文 273
码字总数 784444
×
git.toutiao.im
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: