
SpringBoot: a solution for consuming Kafka messages from multiple server addresses

哎小艾
Published on 2017/09/08 15:44

I. Background

In real-world Spring Boot development, a project may need to consume Kafka messages from several clusters at different server addresses, so knowing how to set up the configuration is essential.

II. Configuration

1. KafkaConfig.java configuration

package com.lantaiyuan.ebus.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConfig {
	
	@Value("${kafka.bootstrap.servers}")
	private String kafkaBootstrapServers;
	@Value("${kafka.session.timeout-ms}")
	private Integer sessionTimeoutMs;
	@Value("${kafka.auto-commit.enable}")
	private boolean enableAutoCommit;
	@Value("${kafka.auto-commit.interval-ms}")
	private Integer autoCommitIntervalMs;
	@Value("${kafka.auto-offset.reset}")
	private String autoOffsetReset;
	@Value("${kafka.group.id}")
	private String groupId;
	@Value("${kafka.topic.city-code}")
	private String topicCityCodeStr;

	/**
	 * Newly added configuration for consuming driver clock-in/clock-out messages
	 * (injects the bootstrap address of the additional Kafka cluster).
	 */
	@Value("${kafka.bootstrap.driver-servers}")
	private String kafkaBootstrapDriverServers;
	
	@Bean
	@Primary // Important: marks this factory as the primary container factory; @KafkaListener consumers bind to it by default
	KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		factory.setConcurrency(3);
		factory.getContainerProperties().setPollTimeout(3000);
		return factory;
	}

	@Bean
	public ConsumerFactory<Integer, String> consumerFactory() {
		return new DefaultKafkaConsumerFactory<>(consumerConfigs());
	}

	@Bean
	public Map<String, Object> consumerConfigs() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
		props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return props;
	}


	/**
	 * The following replicates the original consumer configuration for the driver
	 * clock-in Kafka cluster (three methods in total; remember to rename each copy).
	 */


	@Bean
	KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryForDriverSchedule() {
		ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactoryForDriverSchedule());
		factory.setConcurrency(3);
		factory.getContainerProperties().setPollTimeout(3000);
		return factory;
	}

	@Bean
	public ConsumerFactory<Integer, String> consumerFactoryForDriverSchedule() {
		return new DefaultKafkaConsumerFactory<>(consumerConfigsForDriverSchedule());
	}


	/**
	 * Kafka consumer configuration for driver clock-in messages.
	 */
	@Bean
	public Map<String, Object> consumerConfigsForDriverSchedule() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapDriverServers);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
		props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return props;
	}
	
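	/**
	 * Parses the kafka.topic.city-code property, formatted as
	 * "topic1:city1|topic2:city2", into a topic -> cityCode map.
	 */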
	@Bean(name="topicCityMap")
	public Map<String, String> topicCityMap() {
		Map<String, String> map = new HashMap<>();
		String[] topicCityArray = topicCityCodeStr.split("\\|");
		for (String topicCity : topicCityArray) {
			String[]  array = topicCity.split(":");
			map.put(array[0], array[1]);
		}
		return map;
	}
	
	
}
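For reference, here is a sketch of the matching application.properties. Every host, port, group id, and topic name below is a placeholder, not the original project's value:

# original Kafka cluster
kafka.bootstrap.servers=host1:9092,host2:9092
kafka.session.timeout-ms=15000
kafka.auto-commit.enable=true
kafka.auto-commit.interval-ms=1000
kafka.auto-offset.reset=latest
kafka.group.id=ebus-consumer-group
# "topic:cityCode" pairs separated by '|', parsed by topicCityMap()
kafka.topic.city-code=topic1:city1|topic2:city2
kafka.topic.od-topic=od-topic
# additional Kafka cluster for driver clock-in messages
kafka.bootstrap.driver-servers=driver-host1:9092
kafka.topic.driverSheduleTopic=driver-schedule-topic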

2. KafkaConsumer.java changes

package com.lantaiyuan.ebus.kafka;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;

import com.lantaiyuan.ebus.dao.BaseDriverScheduleMapper;
import com.lantaiyuan.ebus.model.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lantaiyuan.ebus.constants.KafkaConsts;
import com.lantaiyuan.ebus.service.BaseRouteServiceI;
import com.lantaiyuan.ebus.service.BaseStationServiceI;
import com.lantaiyuan.ebus.service.JpushServiceI;
import com.lantaiyuan.ebus.service.MyTrailServiceI;
import com.lantaiyuan.ebus.service.NoticeHistoryServiceI;
import com.lantaiyuan.ebus.service.OnBusInfoServiceI;
import com.lantaiyuan.ebus.service.RelRouteBusServiceI;
import com.lantaiyuan.ebus.service.TaskTimerServiceI;
import com.lantaiyuan.ebus.util.Tools;
import com.lantaiyuan.ebus.virtual.card.model.VirtualCardSwipeHistory;
import com.lantaiyuan.ebus.virtual.card.model.enummodel.VirtualCardRecordHeaderTypeEnum;
import com.lantaiyuan.ebus.virtual.card.service.VirtualCardSwipeHistoryServiceI;


@Component
public class KafkaConsumer {


	//......................... (omitted: autowired fields such as myTrailService and baseDriverScheduleMapper, and other listeners)


	// containerFactory defaults to the kafkaListenerContainerFactory bean (the @Primary factory above)
	@KafkaListener(topics = "${kafka.topic.od-topic}")
	public void odConsumer(ConsumerRecord<Integer, String> msg) {
		String record = msg.value();
		JSONObject jsonObject = JSONObject.parseObject(record);
		if (!StringUtils.isEmpty(jsonObject.getString("startStationId"))) { // origin (O) point
			ProduceOriginPoint originPoint = JSONObject.parseObject(record, ProduceOriginPoint.class);
			myTrailService.insertOriginPoint(originPoint);
		} else { // destination (D) point
			ProduceDestPoint destPoint = JSONObject.parseObject(record, ProduceDestPoint.class);
			myTrailService.updateDestPoint(destPoint);
		}
	}
	

    //.........................
	

	/**
	 * Consumes driver attendance (clock-in/clock-out) messages.
	 */
	// containerFactory is explicitly bound to the kafkaListenerContainerFactoryForDriverSchedule factory
	@KafkaListener(topics = "${kafka.topic.driverSheduleTopic}", containerFactory = "kafkaListenerContainerFactoryForDriverSchedule")
	public void driverSheduleConsumer(ConsumerRecord<Integer, String> msg) {
		String record = msg.value();
		JSONObject jsonObject = JSONObject.parseObject(record);

		BaseDriverSchedule baseDriverSchedule = new BaseDriverSchedule();
		baseDriverSchedule.setCityCode(jsonObject.getString("cityCode"));
		baseDriverSchedule.setDriverName(jsonObject.getString("driverName"));
		baseDriverSchedule.setOccurTime(jsonObject.getDate("occurTime"));

		// onBoardId is an int in the JSON but a String on the model; jsonObject.getString() converts automatically
		baseDriverSchedule.setOnboardId(jsonObject.getString("onBoardId"));
        
		// typeId is an int in the JSON but a byte on the model; jsonObject.getByte() converts automatically
		baseDriverSchedule.setTypeId(jsonObject.getByte("typeId"));
		baseDriverSchedule.setWorkId(jsonObject.getString("workerId"));

		this.baseDriverScheduleMapper.insertSelective(baseDriverSchedule);
	}
}
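For reference, the driver clock-in message this listener expects has roughly the following shape, reconstructed from the fields read above (all values are invented examples):

{
    "cityCode": "0755",
    "driverName": "Zhang San",
    "occurTime": "2017-09-08 09:00:00",
    "onBoardId": 10086,
    "typeId": 1,
    "workerId": "W001"
}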

III. Appendix: @KafkaListener annotation source

/*
 * Copyright 2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.messaging.handler.annotation.MessageMapping;

/**
 * Annotation that marks a method to be the target of a Kafka message listener on the
 * specified topics.
 *
 * The {@link #containerFactory()} identifies the
 * {@link org.springframework.kafka.config.KafkaListenerContainerFactory
 * KafkaListenerContainerFactory} to use to build the Kafka listener container. If not
 * set, a <em>default</em> container factory is assumed to be available with a bean name
 * of {@code kafkaListenerContainerFactory} unless an explicit default has been provided
 * through configuration.
 *
 * <p>
 * Processing of {@code @KafkaListener} annotations is performed by registering a
 * {@link KafkaListenerAnnotationBeanPostProcessor}. This can be done manually or, more
 * conveniently, through {@link EnableKafka} annotation.
 *
 * <p>
 * Annotated methods are allowed to have flexible signatures similar to what
 * {@link MessageMapping} provides, that is
 * <ul>
 * <li>{@link org.apache.kafka.clients.consumer.ConsumerRecord} to access to the raw Kafka
 * message</li>
 * <li>{@link org.springframework.kafka.support.Acknowledgment} to manually ack</li>
 * <li>{@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated
 * method arguments including the support of validation</li>
 * <li>{@link org.springframework.messaging.handler.annotation.Header @Header}-annotated
 * method arguments to extract a specific header value, defined by
 * {@link org.springframework.kafka.support.KafkaHeaders KafkaHeaders}</li>
 * <li>{@link org.springframework.messaging.handler.annotation.Headers @Headers}-annotated
 * argument that must also be assignable to {@link java.util.Map} for getting access to
 * all headers.</li>
 * <li>{@link org.springframework.messaging.MessageHeaders MessageHeaders} arguments for
 * getting access to all headers.</li>
 * <li>{@link org.springframework.messaging.support.MessageHeaderAccessor
 * MessageHeaderAccessor} for convenient access to all method arguments.</li>
 * </ul>
 *
 * <p>When defined at the method level, a listener container is created for each method.
 * The {@link org.springframework.kafka.listener.MessageListener} is a
 * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter},
 * configured with a {@link org.springframework.kafka.config.MethodKafkaListenerEndpoint}.
 *
 * <p>When defined at the class level, a single message listener container is used to
 * service all methods annotated with {@code @KafkaHandler}. Method signatures of such
 * annotated methods must not cause any ambiguity such that a single method can be
 * resolved for a particular inbound message. The
 * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter} is
 * configured with a
 * {@link org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint}.
 *
 * @author Gary Russell
 *
 * @see EnableKafka
 * @see KafkaListenerAnnotationBeanPostProcessor
 * @see KafkaListeners
 */
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {

	/**
	 * The unique identifier of the container managing for this endpoint.
	 * <p>If none is specified an auto-generated one is provided.
	 * @return the {@code id} for the container managing for this endpoint.
	 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
	 */
	String id() default "";

	/**
	 * The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
	 * to use to create the message listener container responsible to serve this endpoint.
	 * <p>If not specified, the default container factory is used, if any.
	 * @return the container factory bean name.
	 */
	String containerFactory() default "";

	/**
	 * The topics for this listener.
	 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
	 * Expression must be resolved to the topic name.
	 * Mutually exclusive with {@link #topicPattern()} and {@link #topicPartitions()}.
	 * @return the topic names or expressions (SpEL) to listen to.
	 */
	String[] topics() default {};

	/**
	 * The topic pattern for this listener.
	 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
	 * Expression must be resolved to the topic pattern.
	 * Mutually exclusive with {@link #topics()} and {@link #topicPartitions()}.
	 * @return the topic pattern or expression (SpEL).
	 */
	String topicPattern() default "";

	/**
	 * The topicPartitions for this listener.
	 * Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
	 * @return the topic names or expressions (SpEL) to listen to.
	 */
	TopicPartition[] topicPartitions() default {};

	/**
	 * If provided, the listener container for this listener will be added to a bean
	 * with this value as its name, of type {@code Collection<MessageListenerContainer>}.
	 * This allows, for example, iteration over the collection to start/stop a subset
	 * of containers.
	 * @return the bean name for the group.
	 */
	String group() default "";

}
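As the Javadoc above describes, @KafkaListener can also be placed at the class level, with a single listener container serving several @KafkaHandler methods. A minimal sketch of that style against the driver-schedule factory (the class name and handler body are hypothetical, not part of the original project):

import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@KafkaListener(topics = "${kafka.topic.driverSheduleTopic}", containerFactory = "kafkaListenerContainerFactoryForDriverSchedule")
public class DriverScheduleClassLevelConsumer {

	// With the StringDeserializer configured above, every record's value arrives as a String,
	// so this single handler receives all messages on the topic.
	@KafkaHandler
	public void onMessage(String record) {
		// parse and persist the record here
	}
}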

 
