文档章节

springboot+kafka(ip地址瞎写的)

我真是小菜鸡
 我真是小菜鸡
发布于 2018/10/23 19:11
字数 630
阅读 58
收藏 0

1,首先springboot对kafka的支持也很好,同样是在配置文件中配置好参数,然后就可以直接使用。先说一下,很简单,,,不要怕

2,我用的依赖是

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

配置文件

kafka:
  bootstrap-servers: 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092
  producer:
    retries: 1
    batch-size: 16384
    buffer-memory: 33554432
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers: 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092
  consumer:
    bootstrap-servers:  12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092
    enable-auto-commit: true
    auto-offset-reset: latest
    auto-commit-interval: 1000
    group-id: gzj

然后在需要往kafka发送数据的地方,也就是生产者,直接注入即可

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

消费者,监听

@KafkaListener(topics = {"gzj"})
public void receive(String content){
    System.err.println("Receive:" + content);
}

消费者还有另一种方法,

package com.gzj.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Description
 * <p>
 * </p>
 * DATE 2018/10/23.
 *
 * @author guozhenjiang.
 */
@Component
public class KafkaConsumerTask implements Runnable,InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTask.class);
    private Thread thread;
    @Resource(name="kafkaConsumer")
    private KafkaConsumer<String,String> kafkaConsumer;

    @Override
    public void run() {
        logger.info("消费数据任务启动");
        while(true){
            try{
                ConsumerRecords<String ,String > records=kafkaConsumer.poll(1000);
                if(records!=null){
                    for(ConsumerRecord<String ,String > record:records){
                        logger.error(record.key());
                        logger.error(record.topic());
                        logger.error(record.value());
                    }
                }
            }catch(Exception e){
               // logger.error("我也不知道哪儿错了");
            }finally {
               // logger.error("不放弃");
            }
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        this.thread=new Thread(this);
        this.thread.start();
    }

}

 

package com.gzj.demo.config;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.Properties;

/**
 * Description
 * <p>
 * </p>
 * DATE 2018/10/23.
 *
 * @author guozhenjiang.
 */
@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaConnectConfig {

    @Bean(name = "kafkaConsumer")
    public KafkaConsumer<String, String> kafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", "ggg");
        props.setProperty("enable.auto.commit", enableAutoCommit);
        props.setProperty("auto.offset.reset", autoOffsetReset);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("gzj"));

        return consumer;
    }
    @Value("${server.port}")
    private String port;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public String getBootstrapServers() {
        return bootstrapServers;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }


    public String getEnableAutoCommit() {
        return enableAutoCommit;
    }

    public void setEnableAutoCommit(String enableAutoCommit) {
        this.enableAutoCommit = enableAutoCommit;
    }

    public String getAutoOffsetReset() {
        return autoOffsetReset;
    }

    public void setAutoOffsetReset(String autoOffsetReset) {
        this.autoOffsetReset = autoOffsetReset;
    }
}

后一种暂未发现有什么优点。都可以实现监听kafka,充当消费者

 

3,现在我有两个消费者,之前一直好奇如果多个消费者,如何让他们重复消费,或协同消费,听说是通过配置groupid,亲自试验了一下,确实是,同一个groupid里是协同的,不通的是重复的。

也没什么,挺简单的,有什么问题可以提问,开源中国的app我下好了,应该经常登录

© 著作权归作者所有

共有 人打赏支持
我真是小菜鸡
粉丝 1
博文 18
码字总数 15522
作品 0
吕梁
私信 提问
用telnet传输文件

用telnet传输文件 解决libcrypto.so.1.0.1e丢失导致ssh连不上 libcrypto.so.1.0.1e丢失,ssh连不上,而且wget,yum,scp等等命令都没法使用了,这时候该怎么办呢,可以利用telnet来进行文件传...

卢禹
2016/10/20
190
0
java程序怎么实现的双击直接运行

平时不写单机程序,但是记得以前瞎写的时候遇到一个问题,就是怎么让一个java程序双击直接运行,而且可以四处拷贝。 求解,不要鄙视低等问题哦,哈哈

再见理想
2011/06/17
3.7K
21
问一下springboot基础配置的问题

上面是我瞎写的,实际上就是想简单限制一下并发用户数.. 然而后面发现一个问题,比如设定为1,确实同一时刻只能有一个访问,然而另外的访问会像进入队列一样在那里转圈圈,如何直接拒绝这些访...

Anur
2017/11/21
55
1
《PHP和MySQL Web 开发》 第12章 MySQL高级管理

我决定好好写学习笔记了,对应上书上的目录和重要信息。不瞎jb写了。从这章开始吧,然后之前写的会编辑后重发。嗯,就酱。 12.1 深入理解权限系统 妈蛋 开头就卡住了。。。我先回去修改之前的...

十万猛虎下画山
2018/07/19
0
0
关于echarts 词云鼠标悬浮错位的问题

版权声明:本文为博主瞎写的,请随便转载 https://blog.csdn.net/minose/article/details/83589194 最近使用echarts插件做了一个数据可视化的demo,发现使用词云时,数据错位了。 解决方法就是...

minose
2018/10/31
0
0

没有更多内容

加载失败,请刷新页面

加载更多

刚入职阿里,告诉你真实的职场生活,兼谈P6、P7、P8的等级

一:拿下offer的人,基本上都有什么特征? 二:为什么选择阿里? 三:阿里的工作氛围什么样? 四:阿里的薪资情况? 五:阿里的晋升空间有多大? 最近部门招聘,很多工程师,包括我在内都参与...

java知识分子
13分钟前
1
0

中国龙-扬科
16分钟前
1
0
深入理解定时器系列第一篇——理解setTimeout和setInterval

很长时间以来,定时器一直是javascript动画的核心技术。但是,关于定时器,人们通常只了解如何使用setTimeout()和setInterval(),对它们的内在运行机制并不理解,对于与预想不同的实际运行状...

Jack088
19分钟前
1
0
windows 安装nvm

1、nvw-windows的官网:https://github.com/coreybutler/nvm-windows/releases 2、选择nvm-setup.zip安装 3、配置环境变量 4、检查nvm是否安装成功 使用管理员权限打开一个命令行。输入nvm v...

灰白发
30分钟前
1
0
MySQL

慢日志查询作用 慢日志查询的主要功能就是,记录sql语句中超过设定的时间阈值的查询语句。例如,一条查询sql语句,我们设置的阈值为1s,当这条查询语句的执行时间超过了1s,则将被写入到慢查...

士兵7
31分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部