
Using Kafka

osc_odyg6b92
Published 2018/07/13 13:59

Kafka depends on ZooKeeper, so both Kafka and ZooKeeper need to be installed.

For installation steps, see: http://tzz6.iteye.com/blog/2401197

Start ZooKeeper: run zkServer.cmd.

Start Kafka:
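The original post showed the start command only in a screenshot. On Windows, Kafka is typically started from its install directory with the bundled batch script (the E:\kafka_2.11-1.0.0 path below is taken from the error message later in this post; adjust it for your own install):

```shell
# Start ZooKeeper first (from the ZooKeeper bin directory):
zkServer.cmd

# Then start the Kafka broker from its install directory:
cd /d E:\kafka_2.11-1.0.0
.\bin\windows\kafka-server-start.bat .\config\server.properties
```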

If Kafka reports an error like the following on startup:

ERROR Error while deleting the clean shutdown file in dir E:\kafka_2.11-1.0.0\tmp\kafka-logs (kafka.server.LogDirFailureChannel)
java.nio.file.FileSystemException: E:\kafka_2.11-1.0.0\tmp\kafka-logs\__consumer_offsets-9\00000000000000000000.timeindex: The process cannot access the file because it is being used by another process.

Solution: delete the log directory.

The log path is configured in Kafka's server.properties as log.dirs=/tmp/kafka-logs. Delete the kafka-logs folder under the tmp directory, then restart Kafka.
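Sketched as Windows commands, assuming the install path from the error message above (stop the Kafka process first, or the deletion will hit the same file lock):

```shell
rem Stop the broker first, then remove the on-disk log directory
rmdir /s /q E:\kafka_2.11-1.0.0\tmp\kafka-logs

rem Restart the broker afterwards; it will recreate the directory
```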

Kafka configuration:

kafka-beans.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
                           http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd">

    <!-- Scan the producer and consumer packages; these must match the actual package names of the classes below. -->
    <context:component-scan base-package="com.test.www.unionpay.producer"/>
    <context:component-scan base-package="com.test.www.unionpay.consumer"/>

    <bean id="kafkaProducerDemo" class="com.test.www.unionpay.producer.KafkaProducerDemo">
        <property name="properties">
            <props>
                <prop key="topic">my-replicated-topic</prop>
                <prop key="bootstrap.servers">127.0.0.1:9092</prop>
                <prop key="acks">all</prop>
                <!-- No stray whitespace inside the class names: the values must match the classes exactly. -->
                <prop key="key.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
                <prop key="value.serializer">org.apache.kafka.common.serialization.StringSerializer</prop>
                <prop key="buffer.memory">33554432</prop>
            </props>
        </property>
    </bean>

    <bean id="kafkaConsumerDemo" class="com.test.www.unionpay.consumer.KafkaConsumerDemo">
        <property name="props">
            <props>
                <prop key="topic">my-replicated-topic</prop>
                <prop key="bootstrap.servers">127.0.0.1:9092</prop>
                <prop key="group.id">group1</prop>
                <prop key="enable.auto.commit">true</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
                <prop key="session.timeout.ms">30000</prop>
                <prop key="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
                <prop key="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</prop>
            </props>
        </property>
    </bean>
</beans>
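For readers not using Spring, the producer wiring above is equivalent to building a java.util.Properties by hand. This is a sketch (the class name ProducerConfigSketch is illustrative): the values mirror the XML, and the "topic" key is a custom entry read by the demo code, not a Kafka setting.

```java
import java.util.Properties;

public class ProducerConfigSketch {

    // Build the same key/value pairs the Spring XML injects into KafkaProducerDemo.
    public static Properties producerProps() {
        Properties p = new Properties();
        p.setProperty("topic", "my-replicated-topic"); // custom key used by the demo, not by Kafka itself
        p.setProperty("bootstrap.servers", "127.0.0.1:9092");
        p.setProperty("acks", "all");
        p.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        p.setProperty("buffer.memory", "33554432");
        return p;
    }

    public static void main(String[] args) {
        System.out.println(producerProps().getProperty("bootstrap.servers"));
    }
}
```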

redis.properties:

# Maximum number of jedis instances the pool may allocate
redis.pool.maxTotal=1000
# Maximum number of idle jedis instances kept in the pool
redis.pool.maxIdle=200
# Maximum wait time (ms) when borrowing a jedis instance; beyond this a JedisConnectionException is thrown
redis.pool.maxWaitMillis=2000
# Whether to validate a jedis instance on borrow; if true, every instance obtained is usable
redis.pool.testOnBorrow=true
# Standalone redis
# standalone host
jedis.host=127.0.0.1
# standalone port
jedis.port=6379
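The post does not show how redis.properties is consumed (presumably by a Jedis pool factory elsewhere in the project). Reading and parsing it needs nothing beyond java.util.Properties; the class name RedisPropsSketch below is illustrative:

```java
import java.io.StringReader;
import java.util.Properties;

public class RedisPropsSketch {

    // Parse properties text; in the real app this would read the file, e.g.
    // props.load(new FileInputStream("redis.properties")).
    public static Properties load(String text) throws Exception {
        Properties p = new Properties();
        p.load(new StringReader(text));
        return p;
    }

    public static void main(String[] args) throws Exception {
        String text = "redis.pool.maxTotal=1000\nredis.pool.maxIdle=200\njedis.port=6379\n";
        Properties p = load(text);
        // Numeric settings are stored as strings and parsed by the consumer of the config.
        int maxTotal = Integer.parseInt(p.getProperty("redis.pool.maxTotal"));
        System.out.println(maxTotal);
    }
}
```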

KafkaProducerDemo.java:

package com.test.www.unionpay.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaProducerDemo {

    private Properties properties;

    public KafkaProducerDemo() {
    }

    public KafkaProducerDemo(Properties properties) {
        this.properties = properties;
    }

    public Properties getProperties() {
        return properties;
    }

    public void setProperties(Properties properties) {
        this.properties = properties;
    }

    // Note: creating and closing a KafkaProducer per message keeps the demo simple,
    // but it is expensive; a production producer is created once and reused.
    public void sendMessage(String msg) {
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>(properties.getProperty("topic"), msg);
        producer.send(record);
        producer.close();
    }
}

KafkaConsumerDemo.java:

package com.test.www.unionpay.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class KafkaConsumerDemo {

    private Properties props;

    public KafkaConsumerDemo() {
    }

    public KafkaConsumerDemo(Properties props) {
        this.props = props;
    }

    public Properties getProps() {
        return props;
    }

    public void setProps(Properties props) {
        this.props = props;
    }

    // Polls once, concatenates whatever records came back, then closes the consumer.
    // A single 100 ms poll may return nothing if no message has arrived yet.
    public String receive() {
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList(props.getProperty("topic")));
        StringBuilder msg = new StringBuilder();
        ConsumerRecords<String, String> consumerRecords = consumer.poll(100);
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            msg.append(consumerRecord.value());
        }
        consumer.close();
        return msg.toString();
    }
}

KafkaController.java:

package com.test.www.web.controller;

import java.text.SimpleDateFormat;
import java.util.Date;

import javax.annotation.Resource;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.servlet.ModelAndView;

import com.test.www.unionpay.consumer.KafkaConsumerDemo;
import com.test.www.unionpay.producer.KafkaProducerDemo;

@Controller
public class KafkaController {

    @Resource(name = "kafkaProducerDemo")
    KafkaProducerDemo producer;

    @Resource(name = "kafkaConsumerDemo")
    KafkaConsumerDemo consumer;

    @RequestMapping(value = "/welcome")
    public ModelAndView welcome() {
        System.out.println("--------welcome--------");
        ModelAndView mv = new ModelAndView();
        mv.setViewName("welcome");
        return mv;
    }

    @RequestMapping(value = "/sendmessage", method = RequestMethod.GET)
    public ModelAndView sendMessage() {
        System.out.println("--------sendmessage--------");
        Date date = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String now = sdf.format(date);
        ModelAndView mv = new ModelAndView();
        mv.addObject("time", now);
        mv.setViewName("kafka_send");
        return mv;
    }

    @RequestMapping(value = "/onsend", method = RequestMethod.POST)
    public ModelAndView onsend(@RequestParam("message") String msg) {
        System.out.println("--------onsend--------");
        producer.sendMessage(msg);
        ModelAndView mv = new ModelAndView();
        mv.setViewName("welcome");
        return mv;
    }

    @RequestMapping(value = "/receive")
    public ModelAndView receive() {
        System.out.println("--------receive--------");
        String msg = consumer.receive();
        ModelAndView mv = new ModelAndView();
        mv.addObject("msg", msg);
        mv.setViewName("kafka_receive");
        return mv;
    }
}
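Once deployed, the controller endpoints can be exercised from the command line. The host, port, and .html suffix below are assumptions based on the links in the JSP pages; adjust them to your servlet container's context path:

```shell
# Send a message through the producer endpoint (same form POST as kafka_send.jsp)
curl -X POST -d "message=hello kafka" http://localhost:8080/onsend.html

# Read messages back through the consumer endpoint
curl http://localhost:8080/receive.html
```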

Pages:

welcome.jsp:

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>welcome</title>
</head>
<body>
    <h1>Welcome</h1>
    <h2><a href="sendmessage.html">Send a Message</a></h2>
    <h2><a href="receive.html">Get a Message</a></h2>
</body>
</html>

kafka_send.jsp:

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>kafka_send</title>
</head>
<body>
    <h1>Send a Message</h1>
    <form action="onsend.html" method="post">
        MessageText:<textarea name="message">${time}</textarea>
        <br>
        <input type="submit" value="Submit">
    </form>
    <h2><a href="welcome.html">RETURN HOME</a></h2>
</body>
</html>

kafka_receive.jsp:

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>kafka_receive</title>
</head>
<body>
    <h1>Kafka_Receive!!!</h1>
    <h2>Receive Message : ${msg}</h2>
    <h2><a href="welcome.html">RETURN HOME</a></h2>
</body>
</html>

Result:

As the screenshots showed, sending and receiving messages through Kafka both work.

 
