文档章节

【SpringBoot整合】1.整合kafka

Areya
 Areya
发布于 03/14 09:34
字数 706
阅读 99
收藏 5

简介

kafka简介。

再次之前,先安装kafka服务。

参考文档:

  1. spring for kafka文档

  2. spring boot for kafka文档

1、依赖包

    <!-- kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

2、配置组件

注意一些配置可以移除到配置文件中。

2.1、配置生产者组件

package com.sunrun.emailanalysis.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
@EnableKafka
public class KafkaProducerConfigure {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Kafka配置信息
     * @return
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.21.1.24:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * Spring Kafka的template
     * @return
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

}

2.2、配置消费者组件

package com.sunrun.emailanalysis.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.21.1.24:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return propsMap;
    }
}

定义了ProducerConfig和ConsumerConfig后我们需要实现具体的生产者和消费者。其中,我们在KafkaListenerContainerFactory中使用了ConcurrentKafkaListenerContainer, 我们将使用多线程消费消息。

3、编写实例

定义好了组件后,我们就可以在程序中使用它们了。

3.1、编写Service

package com.sunrun.emailanalysis.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Service
public class MyProducerService {
    @Autowired
    private KafkaTemplate template;

    //发送消息方法
    public void send(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = template.send(topic,message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                System.out.println("msg OK." + result.toString());
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("msg send failed: ");
            }
        });
    }
}

3.2、编写Controller

package com.sunrun.emailanalysis.controller;

import com.sunrun.emailanalysis.service.MyProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
@RequestMapping("kafka")
public class KafkaController {

    @Autowired
    private MyProducerService myProducerService;
    @RequestMapping("index")
    public void index(){
        myProducerService.send("test","你好啊,整合Spring KAFKA");
    }
}

3.3、编写监听者

我们可以使用我们的consumer配置创建消费者组件(@Compenent、@Bean),Spring项目启动的时候加载消费者。

还可以直接使用@KafkaListener(topics = {"topicName"})注解。

package com.sunrun.emailanalysis.cosumer;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class EAConsumer {
    @KafkaListener(topics = {"test"})
    public void receive(String message){
        System.out.println("接收到信息:" + message);
    }
}

在spring kafka中,可以把cosumer看做是listener。

接下来启动SpringBoot项目,输入网址

http://127.0.0.1/kafka/index

即可激活生产消息的方式,在控制台由监听程序打印出发送的消息

msg OK.SendResult [producerRecord=ProducerRecord(topic=test, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=你好啊,整合Spring KAFKA, timestamp=null), recordMetadata=test-0@6]
接收到信息:你好啊,整合Spring KAFKA

总结

1、参考文档:

© 著作权归作者所有

Areya
粉丝 28
博文 95
码字总数 164784
作品 0
广州
私信 提问
springboot整合kafka应用

1、kafka在消息传递的使用非常普遍,相对于activemq来说kafka的分布式管理和使用更加灵活。 2、activemq的搭建和使用可以参考:   activemq搭建和springmvc的整合:http://www.cnblogs.co...

小不点丶
2017/11/09
0
0
搞懂分布式技术23:SpringBoot Kafka 整合使用

Spring Boot系列文章(一):SpringBoot Kafka 整合使用2018-01-05 ×文章目录 1. 前提 2. 创建项目 3. Kafka 设置 4. 运行 5. 关注我 6. 最后 前提 假设你了解过 SpringBoot 和 Kafka。 1、...

你的猫大哥
2018/07/05
0
0
SpringBoot 学习二:操作数据库

本文将从以下几个方面介绍: 前言 配置数据源 SpringBoot 整合 Mybatis SpringBoot 整合 JdbcTemplate SpringBoot 整合 Redis 前言 在上篇文章 SpringBoot 学习一 中已经学习了 SpringBoot的...

tsmyk0715
2018/09/26
642
0
SpringBoot 整合(六)Security & Oauth2.0(完整篇)

1. 快速实现篇(实现最基本的登录): SpringSecurity 快速实现项目 2. 企业级封装篇 我的 Spring Security 文集 SpringBoot 整合 Security(一)实现用户认证并判断返回json还是view SpringBo...

FantJ
2018/05/22
0
0
关于springboot整合kafka遇到的问题

版本信息: kafka:2.9.2-0.8.11 springboot:2.0.4 zookeeper:3.4.6 我在linux里面测了kafka是否成功安装配置是可以用的,然后在spring boot里面的配置 然后我启动springboot的入口类 下面是启...

喻湘东
2018/09/07
1K
1

没有更多内容

加载失败,请刷新页面

加载更多

聊聊nacos的NacosDiscoveryAutoConfiguration

序 本文主要研究一下nacos的NacosDiscoveryAutoConfiguration NacosDiscoveryAutoConfiguration nacos-spring-boot-project/nacos-discovery-spring-boot-autoconfigure/src/main/java/com/a......

go4it
18分钟前
4
0
如何保证消息的顺序性?

面试题 如何保证消息的顺序性? 面试官心理分析 其实这个也是用 MQ 的时候必问的话题,第一看看你了不了解顺序这个事儿?第二看看你有没有办法保证消息是有顺序的?这是生产系统中常见的问题...

米兜
23分钟前
6
0
变量求解:RMT函数

1. RMT函数:计算贷款每月付款额 = PMT (贷款利率,付款限期,本金) 2.单变量求解: 数据选项卡----> 模拟分析------>单变量求解:单变量求解前必须先执行PMT函数...

东方墨天
24分钟前
2
2
网络安全市场需求

最近,网络安全技能差距的热门话题流传开来。技能差距经常被紧急讨论,可以看出它在实践中的作用是很大的。但信息安全是一门广泛的学科,所以在谈论“技能差距”时需要更具体。有专家表示,真...

linuxCool
44分钟前
3
0
饿了么快应用初体验

作者:饿了么 顾诚 为什么我们选择了快应用 在很长一段时间里,原生饿了么应用对于新用户来说体验成本略高,对于迫切想要点餐的老用户操作有点繁琐;而 Web 版的饿了么应用在体验、速度、功能...

前端老手
46分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部