文档章节

springboot+kafka(ip地址瞎写的)

我真是小菜鸡
 我真是小菜鸡
发布于 10/23 19:11
字数 630
阅读 29
收藏 0

1,首先springboot对kafka的支持也很好,同样是在配置文件中配置好参数,然后就可以直接使用。先说一下,很简单,,,不要怕

2,我用的依赖是

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
</dependency>

配置文件

kafka:
  bootstrap-servers: 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092
  producer:
    retries: 1
    batch-size: 16384
    buffer-memory: 33554432
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers: 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092
  consumer:
    bootstrap-servers:  12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092, 12.34.56.78:9092
    enable-auto-commit: true
    auto-offset-reset: latest
    auto-commit-interval: 1000
    group-id: gzj

然后在需要往kafka发送数据的地方,也就是生产者,直接注入即可

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

消费者,监听

@KafkaListener(topics = {"gzj"})
public void receive(String content){
    System.err.println("Receive:" + content);
}

消费者还有另一种方法,

package com.gzj.demo.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Description
 * <p>
 * </p>
 * DATE 2018/10/23.
 *
 * @author guozhenjiang.
 */
@Component
public class KafkaConsumerTask implements Runnable,InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerTask.class);
    private Thread thread;
    @Resource(name="kafkaConsumer")
    private KafkaConsumer<String,String> kafkaConsumer;

    @Override
    public void run() {
        logger.info("消费数据任务启动");
        while(true){
            try{
                ConsumerRecords<String ,String > records=kafkaConsumer.poll(1000);
                if(records!=null){
                    for(ConsumerRecord<String ,String > record:records){
                        logger.error(record.key());
                        logger.error(record.topic());
                        logger.error(record.value());
                    }
                }
            }catch(Exception e){
               // logger.error("我也不知道哪儿错了");
            }finally {
               // logger.error("不放弃");
            }
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        this.thread=new Thread(this);
        this.thread.start();
    }

}

 

package com.gzj.demo.config;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;
import java.util.Properties;

/**
 * Description
 * <p>
 * </p>
 * DATE 2018/10/23.
 *
 * @author guozhenjiang.
 */
@Configuration
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaConnectConfig {

    @Bean(name = "kafkaConsumer")
    public KafkaConsumer<String, String> kafkaConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("group.id", "ggg");
        props.setProperty("enable.auto.commit", enableAutoCommit);
        props.setProperty("auto.offset.reset", autoOffsetReset);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("gzj"));

        return consumer;
    }
    @Value("${server.port}")
    private String port;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;
    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    public String getGroupId() {
        return groupId;
    }

    public void setGroupId(String groupId) {
        this.groupId = groupId;
    }

    public String getBootstrapServers() {
        return bootstrapServers;
    }

    public void setBootstrapServers(String bootstrapServers) {
        this.bootstrapServers = bootstrapServers;
    }


    public String getEnableAutoCommit() {
        return enableAutoCommit;
    }

    public void setEnableAutoCommit(String enableAutoCommit) {
        this.enableAutoCommit = enableAutoCommit;
    }

    public String getAutoOffsetReset() {
        return autoOffsetReset;
    }

    public void setAutoOffsetReset(String autoOffsetReset) {
        this.autoOffsetReset = autoOffsetReset;
    }
}

后一种暂未发现有什么优点。都可以实现监听kafka,充当消费者

 

3,现在我有两个消费者,之前一直好奇如果多个消费者,如何让他们重复消费,或协同消费,听说是通过配置groupid,亲自试验了一下,确实是,同一个groupid里是协同的,不通的是重复的。

也没什么,挺简单的,有什么问题可以提问,开源中国的app我下好了,应该经常登录

© 著作权归作者所有

共有 人打赏支持
我真是小菜鸡
粉丝 1
博文 18
码字总数 15242
作品 0
吕梁
私信 提问
用telnet传输文件

用telnet传输文件 解决libcrypto.so.1.0.1e丢失导致ssh连不上 libcrypto.so.1.0.1e丢失,ssh连不上,而且wget,yum,scp等等命令都没法使用了,这时候该怎么办呢,可以利用telnet来进行文件传...

卢禹
2016/10/20
190
0
java程序怎么实现的双击直接运行

平时不写单机程序,但是记得以前瞎写的时候遇到一个问题,就是怎么让一个java程序双击直接运行,而且可以四处拷贝。 求解,不要鄙视低等问题哦,哈哈

再见理想
2011/06/17
3.6K
21
问一下springboot基础配置的问题

上面是我瞎写的,实际上就是想简单限制一下并发用户数.. 然而后面发现一个问题,比如设定为1,确实同一时刻只能有一个访问,然而另外的访问会像进入队列一样在那里转圈圈,如何直接拒绝这些访...

Anur
2017/11/21
36
1
《PHP和MySQL Web 开发》 第12章 MySQL高级管理

我决定好好写学习笔记了,对应上书上的目录和重要信息。不瞎jb写了。从这章开始吧,然后之前写的会编辑后重发。嗯,就酱。 12.1 深入理解权限系统 妈蛋 开头就卡住了。。。我先回去修改之前的...

十万猛虎下画山
07/19
0
0
简历平淡的大三学生,如何在接下来的时间里给自己制造些闪光点?

大学生,一直憋在象牙塔里,最容易犯的一个错误是:想当然。 什么是闪光点? 不是自己觉得的,而是面试官认为的。到底闪不闪,人家说了算。 这也不用靠猜或推测,职位要求随便找个招聘网站,...

明哥聊求职
08/31
0
0

没有更多内容

加载失败,请刷新页面

加载更多

docker部署springboot项目

安装docker 菜鸟教程 springboot项目 maven依赖 <?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001......

yimingkeji
今天
10
0
ios多个target

1.建立3个target,分别为heroone,heroone test,heroone dev;分别为正式环境,test环境,dev环境 2.注意取消掉autocreate以防止名字不对,分别以Duplicate的方式建立另外两个scheme 3.创建...

HeroHY
今天
5
0
php获取客户端IP

php获取客户端IP 首先先阅读关于IP真实性安全的文章:如何正確的取得使用者 IP? 「任何從客戶端取得的資料都是不可信任的!」 HTTP_CLIENT_IP头是有的,但未成标准,不一定服务器都实现。 ...

DrChenXX
昨天
0
0
. The valid characters are defined in RFC 7230 and RFC 问题

通过这里的回答,我们可以知道: Tomcat在 7.0.73, 8.0.39, 8.5.7 版本后,添加了对于http头的验证。 具体来说,就是添加了些规则去限制HTTP头的规范性 参考这里 具体来说: org.apache.tom...

west_coast
昨天
1
0
刷leetcode第704题-二分查找

今天双十一买的算法书到货了,路上刷到有人说的这个题,借(chao)鉴(xi)一下别人的思路,这个是C++标准库里面的经典方法,思路精巧,优雅好品味 int search(int* nums, int numsSize, in...

锟斤拷烫烫烫
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部