文档章节

spring kakfa集成

Mr-Bird-Lee
 Mr-Bird-Lee
发布于 2017/09/05 11:15
字数 1652
阅读 18
收藏 0
点赞 0
评论 0

一、生产端

1.1 kafka-producer.xml配置说明

<!-- spring的属性加载器,加载多个properties文件中的属性 ,
 如果只有一个properties文件则用<context />就行了,用了这个加载器过后不用在其他xml中再使用了-->
<bean id="propertyConfigurer"
 class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="locations">
        <list>
            <value>classpath:/properties/kafka-producer.properties</value>
        </list>
    </property>
    <property name="fileEncoding" value="utf-8" />
</bean>
<!-- 定义producer的参数 -->
<bean id="producerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${bootstrap.servers}" /> //kafka服务集群
            <entry key="group.id" value="${group.id}" /> //分组
            <entry key="retries" value="${retries}" /> //重试次数
            <entry key="batch.size" value="${batch.size}" /> //批量数量
            <entry key="linger.ms" value="${linger.ms}" />
            <entry key="buffer.memory" value="${buffer.memory}" />
            <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
            <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" />
        </map>
    </constructor-arg>
</bean>

<!-- 创建kafkatemplate需要使用的producerfactory bean -->
<bean id="producerFactory"
 class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
    <constructor-arg ref="producerProperties" />
</bean>

<!-- 创建kafkatemplate -->
<bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg index="0" ref="producerFactory" />
    <constructor-arg index="1" value="true" />
    <property name="defaultTopic" value="${defaultTopic}" /> //topic名称
</bean>

<bean id="kafkaProducerServer" class="com.rkhd.ienterprise.kafka.producer.KafkaProducerServer">
    <property name="kafkaTemplate" ref="kafkaTemplate"/>
</bean>

1.2 kafka-producer.properties属性文件

bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094
group.id=testGroup
retries=1
batch.size=16384
linger.ms=1
buffer.memory=33554432
defaultTopic=topic-test

1.3 生产端接口封装说明:

1)类名:
com.rkhd.ienterprise.kafka.producer.KafkaProducerServer

2)方法:

/**
 * 发送信息(不分区)
 * @param data 要发送的数据
 * @return 返回一个map。如果成功code为0,其他则为失败
 */
public Map<String, Object> sendDefault(Object data);
/**
 * 发送信息(不分区)
 * @param key 要发送的键
 * @param data 要发送的数据
 * @return 返回一个map。如果成功code为0,其他则为失败
 */
public Map<String, Object> sendDefault(Object key, Object data);
/**
 * 发送信息(分区)
 * @param partitionNum 分区数(大于1),请注意分区数是在topic创建的时候就指定了,不能改变了
 * @param key 要发送的键
 * @param data 要发送的数据
 * @return 返回一个map。如果成功code为0,其他则为失败
 */
public Map<String, Object> sendDefault(int partitionNum, Object key, Object data);
/**
 * 发送信息(不分区)
 * @param topic 发送目的topic名称,如果topic为null或者是为"",则会使用xml中配置的defaultTopic
 * @param data 要发送的数据
 * @return 返回一个map。如果成功code为0,其他则为失败
 */
public Map<String, Object> sendMessage(String topic, Object data);
/**
 * 发送信息(不分区)
 * @param topic 发送目的topic名称,如果topic为null或者是为"",则会使用xml中配置的defaultTopic
 * @param key 要发送的键
 * @param data 要发送的数据
 * @return 返回一个map。如果成功code为0,其他则为失败
 * */
public Map<String, Object> sendMessage(String topic, Object key, Object data);
/**
 * 发送信息(分区)
 * @param topic 发送目的topic名称,如果topic为null或者是为"",则会使用xml中配置的defaultTopic
 * @param partitionNum 分区数(大于1),请注意分区数是在topic创建的时候就指定了,不能改变了
 * @param data 要发送的数据
 * @return 返回一个map。如果成功code为0,其他则为失败
 */
public Map<String, Object> sendMessage(String topic, Integer partitionNum, Object data);
/**
 * 发送信息(分区)
 * @param topic 发送目的topic名称,如果topic为null或者是为"",则会使用xml中配置的defaultTopic
 * @param key 要发送的键
 * @param value 要发送的数据
 * @param partitionNum 分区数(大于1),请注意分区数是在topic创建的时候就指定了,不能改变了
 * @return 返回一个map。如果成功code为0,其他则为失败
 * */
public Map<String, Object> sendMessage(String topic, int partitionNum, Object key, Object value);

二、消费端

2.1 kafka-consumer.xml配置说明

<!-- 定义consumer的参数 -->
<bean id="consumerProperties" class="java.util.HashMap">
    <constructor-arg>
        <map>
            <entry key="bootstrap.servers" value="${bootstrap.servers}"/> //kafka服务集群
            <entry key="group.id" value="${group.id}"/> //分组
            <entry key="enable.auto.commit" value="${enable.auto.commit}"/> //是否自动提交
            <entry key="auto.commit.interval.ms" value="${auto.commit.interval.ms}"/> //自动提交间隔时间
            <entry key="session.timeout.ms" value="${session.timeout.ms}"/> //session过期时间
            <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
        </map>
    </constructor-arg>
</bean>

<!-- 创建consumerFactory bean -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
    <constructor-arg>
        <ref bean="consumerProperties"/>
    </constructor-arg>
</bean>

<!-- 实际执行消息消费的类 -->
<bean id="messageListernerConsumerService" class="com.rkhd.ienterprise.mq.client.consumer.generalFormula.GeneralFormulaConsumer"/>

<!-- 消费者容器配置信息 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
    <constructor-arg value="${topicName}"/>
    <property name="messageListener" ref="messageListernerConsumerService"/>
    <!--<property name="ackMode" value="COUNT"/> //手动提交模式分为三种:(1)模式COUNT:数量达到COUNT时提交;(2)模式TIME:时间达到TIME;(3)模式COUNT_TIME:数量达到COUNT或时间达到TIME是提交;
 
<property name="ackCount" value="90"/>-->
 <!--<property name="ackMode" value="TIME"/>
 <property name="ackTime" value="5000"/>-->
</bean>

<!-- 创建单实例KafkaMessageListenerContainer-->
<!--<bean id="messageListenerContainer_trade" class="org.springframework.kafka.listener.KafkaMessageListenerContainer"
 init-method="doStart">
 <constructor-arg ref="consumerFactory"/>
 <constructor-arg ref="containerProperties_trade"/>
</bean>-->
<!-- 创建多实例ConcurrentMessageListenerContainer-->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
 init-method="doStart" >
    <constructor-arg ref="consumerFactory"/>
    <constructor-arg ref="containerProperties"/>
    <property name="concurrency" value="1"/> //配置消费端数量
</bean>

2.2 kafka-consumer.properties属性文件

bootstrap.servers=192.168.0.75:9092,192.168.0.75:9093,192.168.0.75:9094
group.id=testGroup
enable.auto.commit=false
auto.commit.interval.ms=1000
session.timeout.ms=15000
topicName=ahao-test

2.3 消费端接口封装说明

1)类名:com.rkhd.ienterprise.mq.client.consumer.client.KafkaConsumerClient

2)对外提供抽象方法(根据不同的业务实现):

public abstract void onConsumer(ConsumerRecord<String, String> record);

3)实现说明:各业务线通过继承该类实现该抽象方法;

三、Kafka技术概览

3.1 Kafka的特性

  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒
  • 可扩展性:kafka集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写
3.2 Kafka架构组件

       Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和                consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。

  • topic:消息存放的目录即主题
  • Producer:生产消息到topic的一方
  • Consumer:订阅topic消费消息的一方
  • Broker:Kafka的服务实例就是一个broker

3.3  kafka 应用场景

  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和storm
  • 事件源

© 著作权归作者所有

共有 人打赏支持
Mr-Bird-Lee
粉丝 1
博文 20
码字总数 33438
作品 0
海淀
程序员
Spring Cloud简介/版本选择/ZooKeeper例子搭建简单说明

一、什么是Spring Cloud 官方的说法就是Spring Cloud 给开发者提供一套按照一定套路快速开发分布式系统的工具。 具体点就是Spring Boot实现的微服务架构开发工具。它为微服务架构中涉及的配置...

easonjim ⋅ 2017/09/18 ⋅ 0

Spring集成Redis方案(spring-data-redis)(基于Jedis的单机模式)(待实践)

说明:请注意Spring Data Redis的版本以及Spring的版本!最新版本的Spring Data Redis已经去除Jedis的依赖包,需要自行引入,这个是个坑点。并且会与一些低版本的Spring有冲突,要看官方文档...

easonjim ⋅ 2017/10/05 ⋅ 0

Spring Boot集成Spring Data Reids和Spring Session实现Session共享

首先,需要先集成Redis的支持,参考:http://www.cnblogs.com/EasonJim/p/7805665.html Spring Boot集成Spring Data Redis+Spring Session非常的简单,也不用担心版本问题,只需要引入相应的...

easonjim ⋅ 2017/11/10 ⋅ 0

Mybatis-Generator插件的使用与Spring集成Mybatis的配置

Mybatis-Generator插件 Mybatis-Generator是一个用于自动生成dao层接口、pojo以及mapper xml的一个Mybatis插件,该插件有三种用法:命令行运行、Eclipse插件、maven插件。个人觉得maven插件最...

ZeroOne01 ⋅ 04/15 ⋅ 0

Spring Framework体系结构简介

说明:以下转自Spring官方文档,用的版本为4.3.11版本。 一、引用官方文档 所述核心容器由以下部分组成, ,,,和(弹簧表达式语言)模块。 的和模块提供框架的基本零件,包括IOC和依赖注入...

easonjim ⋅ 2017/09/16 ⋅ 0

一文读懂 Spring Boot、微服务架构和大数据治理三者之间的故事

微服务架构 微服务的诞生并非偶然,它是在互联网高速发展,技术日新月异的变化以及传统架构无法适应快速变化等多重因素的推动下诞生的产物。互联网时代的产品通常有两类特点:需求变化快和用...

ityouknow ⋅ 05/16 ⋅ 0

一文读懂 Spring Boot、微服务架构和大数据治理三者之间的故事

微服务的诞生并非偶然,它是在互联网高速发展,技术日新月异的变化以及传统架构无法适应快速变化等多重因素的推动下诞生的产物。互联网时代的产品通常有两类特点:需求变化快和用户群体庞大,...

java高级架构牛人 ⋅ 05/14 ⋅ 0

Spring MVC集成Spring Data Reids和Spring Session实现Session共享

说明:Spring MVC中集成Spring Data Redis和Spring Session时版本是一个坑点,比如最新版本的Spring Data Redis已经不包含Jedis了,需要自行引入。且最新版本的2.0.1会与Spring MVC 4.1.4有冲...

easonjim ⋅ 2017/11/10 ⋅ 0

Spring Boot Starters启动器

Starters是什么? Starters可以理解为启动器,它包含了一系列可以集成到应用里面的依赖包,你可以一站式集成Spring及其他技术,而不需要到处找示例代码和依赖包。如你想使用Spring JPA访问数...

Java技术栈 ⋅ 06/10 ⋅ 0

简化SSM搭建详细分析配置

一直使用SSH2 和 spring boot,最近换工作,使用新框架SpringMVC,带着SSH2的思路学习SpringMVC还是挺容易的,下面分享一下SSM的搭建 总体来说搭建SSM分五步 一:创建maven工程 这里是创建m...

红尾巴的猪 ⋅ 2017/11/10 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Java Web如何操作Cookie的添加修改和删除

创建Cookie对象 Cookie cookie = new Cookie("id", "1"); 修改Cookie值 cookie.setValue("2"); 设置Cookie有效期和删除Cookie cookie.setMaxAge(24*60*60); // Cookie有效时间 co......

二营长意大利炮 ⋅ 今天 ⋅ 0

【每天一个JQuery特效】淡入淡出显示或隐藏窗口

我是JQuery新手爱好者,有时间就练练代码,防止手生,争取每天一个JQuery练习,在这个博客记录下学习的笔记。 本特效主要采用fadeIn()和fadeOut()方法显示淡入淡出的显示效果显示或隐藏元...

Rhymo-Wu ⋅ 今天 ⋅ 0

Spring JDBC使用方法

普通实现: 1、创建数据表customer。 可以使用任何数据库实现,在项目中要引入相应数据库驱动包并配置相应数据库连接。 2、创建Customer pojo。 Customer类的属性对应数据库的属性,除了为每...

霍淇滨 ⋅ 今天 ⋅ 0

Contos 7 安装Jenkins

Jenkins是一款能提高效率的软件,它能帮你把软件开发过程形成工作流,典型的工作流包括以下几个步骤 开发 提交 编译 测试 发布 有了Jenkins的帮助,在这5步中,除了第1步,后续的4步都是自动...

欧虞山 ⋅ 今天 ⋅ 0

revel

revel install go get github.com/revel/revelgo get github.com/revel/cmd create new app revel new git.oschina.net/zdglf/myapp run app revel run git.oschina.net/zdglf/myapp ot......

zdglf ⋅ 今天 ⋅ 0

49. Group Anagrams - LeetCode

Question 49. Group Anagrams Solution 思路:维护一个map,key是输入数组中的字符串(根据字符排好序) Java实现: public List<List<String>> groupAnagrams(String[] strs) { Map<Strin......

yysue ⋅ 今天 ⋅ 0

spring Email

使用spring发Email其实就是使用spring自己封装携带的一个javamail.JavaMailSenderImpl类而已。这个类可以当一个普通的java对象来使用,也可以通过把它配置变成spring Bean的方式然后注入使用...

BobwithB ⋅ 今天 ⋅ 0

spark 整理的一些知识

Spark 知识点 请描述spark RDD原理与特征? RDD全称是resilient distributed dataset(具有弹性的分布式数据集)。一个RDD仅仅是一个分布式的元素集合。在Spark中,所有工作都表示为创建新的...

tuoleisi77 ⋅ 今天 ⋅ 0

思考

时间一天天过感觉自己有在成长吗?最怕的是时光匆匆而过,自己没有收获!下面总结下最近自己的思考。 认识自己 认识另一个自己,人们常说要虚心听取别人意见和建议。然而人往往是很难做到的,...

hello_hp ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部