
Spring Boot: a solution for consuming Kafka messages from multiple different server addresses

哎小艾 · Published 2017/09/08 15:44

I. Background

In real-world Spring Boot development, a Kafka consumer may need to consume data from multiple clusters at different server addresses, so knowing how to configure this becomes essential.

II. Configuration

1. KafkaConfig.java configuration

package com.lantaiyuan.ebus.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConfig {
	
	@Value("${kafka.bootstrap.servers}")
	private String kafkaBootstrapServers;
	@Value("${kafka.session.timeout-ms}")
	private Integer sessionTimeoutMs;
	@Value("${kafka.auto-commit.enable}")
	private boolean enableAutoCommit;
	@Value("${kafka.auto-commit.interval-ms}")
	private Integer autoCommitIntervalMs;
	@Value("${kafka.auto-offset.reset}")
	private String autoOffsetReset;
	@Value("${kafka.group.id}")
	private String groupId;
	@Value("${kafka.topic.city-code}")
	private String topicCityCodeStr;

	/**
	 * Newly added configuration for drivers' clock-in/clock-out messages
	 * (injects the additional Kafka bootstrap address to consume from)
	 */
	@Value("${kafka.bootstrap.driver-servers}")
	private String kafkaBootstrapDriverServers;
	
	@Bean
	@Primary // Important! Marks this factory as the primary container factory; @KafkaListener methods bind to it by default
	KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		factory.setConcurrency(3);
		factory.getContainerProperties().setPollTimeout(3000);
		return factory;
	}

	@Bean
	public ConsumerFactory<Integer, String> consumerFactory() {
		return new DefaultKafkaConsumerFactory<>(consumerConfigs());
	}

	@Bean
	public Map<String, Object> consumerConfigs() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
		props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return props;
	}


	/**
	 * Kafka configuration for driver clock-ins: a second copy of the original
	 * consumer configuration (three methods in total; remember to rename each one)
	 */


	@Bean
	KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryForDriverSchedule() {
		ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactoryForDriverSchedule());
		factory.setConcurrency(3);
		factory.getContainerProperties().setPollTimeout(3000);
		return factory;
	}

	@Bean
	public ConsumerFactory<Integer, String> consumerFactoryForDriverSchedule() {
		return new DefaultKafkaConsumerFactory<>(consumerConfigsForDriverSchedule());
	}


	/**
	 * Kafka consumer configuration for driver clock-in messages
	 */
	@Bean
	public Map<String, Object> consumerConfigsForDriverSchedule() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapDriverServers);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
		props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return props;
	}
	
	@Bean(name="topicCityMap")
	public Map<String, String> topicCityMap() {
		Map<String, String> map = new HashMap<>();
		String[] topicCityArray = topicCityCodeStr.split("\\|");
		for (String topicCity : topicCityArray) {
			String[] array = topicCity.split(":");
			map.put(array[0], array[1]);
		}
		return map;
	}
	
	
}
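
For reference, here is a minimal application.properties sketch matching the placeholders used above. Every value is an illustrative assumption (none come from the original project), and the kafka.topic.city-code format is inferred from the parsing logic in topicCityMap(): entries separated by "|", with topic and city code separated by ":".

# application.properties (sketch; all values are illustrative assumptions)

# Original Kafka cluster
kafka.bootstrap.servers=192.168.1.101:9092,192.168.1.102:9092
kafka.session.timeout-ms=15000
kafka.auto-commit.enable=true
kafka.auto-commit.interval-ms=1000
kafka.auto-offset.reset=latest
kafka.group.id=ebus-consumer-group

# Topic placeholders referenced by the listeners (names are hypothetical)
kafka.topic.od-topic=od-topic
kafka.topic.driverSheduleTopic=driver-schedule-topic
kafka.topic.city-code=topicA:0755|topicB:0760

# Newly added cluster for driver clock-in/clock-out messages
kafka.bootstrap.driver-servers=192.168.2.201:9092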

2. KafkaConsumer.java changes

package com.lantaiyuan.ebus.kafka;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;

import com.lantaiyuan.ebus.dao.BaseDriverScheduleMapper;
import com.lantaiyuan.ebus.model.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lantaiyuan.ebus.constants.KafkaConsts;
import com.lantaiyuan.ebus.service.BaseRouteServiceI;
import com.lantaiyuan.ebus.service.BaseStationServiceI;
import com.lantaiyuan.ebus.service.JpushServiceI;
import com.lantaiyuan.ebus.service.MyTrailServiceI;
import com.lantaiyuan.ebus.service.NoticeHistoryServiceI;
import com.lantaiyuan.ebus.service.OnBusInfoServiceI;
import com.lantaiyuan.ebus.service.RelRouteBusServiceI;
import com.lantaiyuan.ebus.service.TaskTimerServiceI;
import com.lantaiyuan.ebus.util.Tools;
import com.lantaiyuan.ebus.virtual.card.model.VirtualCardSwipeHistory;
import com.lantaiyuan.ebus.virtual.card.model.enummodel.VirtualCardRecordHeaderTypeEnum;
import com.lantaiyuan.ebus.virtual.card.service.VirtualCardSwipeHistoryServiceI;


@Component
public class KafkaConsumer {


	// ......................... (other members omitted)

	// The two fields below are not shown in the original excerpt; they are
	// inferred from their usage in the listener methods of this class.
	@Resource
	private MyTrailServiceI myTrailService;

	@Resource
	private BaseDriverScheduleMapper baseDriverScheduleMapper;


	// containerFactory defaults to the kafkaListenerContainerFactory bean when not specified
	@KafkaListener(topics = "${kafka.topic.od-topic}")
	public void odConsumer(ConsumerRecord<Integer, String> msg) {
		String record = msg.value();
		JSONObject jsonObject = JSONObject.parseObject(record);
		if (!StringUtils.isEmpty(jsonObject.getString("startStationId"))) { // origin (O) point
			ProduceOriginPoint originPoint = JSONObject.parseObject(record, ProduceOriginPoint.class);
			myTrailService.insertOriginPoint(originPoint);
		} else { // destination (D) point
			ProduceDestPoint destPoint = JSONObject.parseObject(record, ProduceDestPoint.class);
			myTrailService.updateDestPoint(destPoint);
		}
	}
	

    //.........................
	

	/**
	 * Consumes driver attendance (clock-in/clock-out) messages.
	 */
	// containerFactory is explicitly bound to the kafkaListenerContainerFactoryForDriverSchedule factory
	@KafkaListener(topics = "${kafka.topic.driverSheduleTopic}", containerFactory = "kafkaListenerContainerFactoryForDriverSchedule")
	public void driverSheduleConsumer(ConsumerRecord<Integer, String> msg) {
		String record = msg.value();
		JSONObject jsonObject = JSONObject.parseObject(record);

		BaseDriverSchedule baseDriverSchedule = new BaseDriverSchedule();
		baseDriverSchedule.setCityCode(jsonObject.getString("cityCode"));
		baseDriverSchedule.setDriverName(jsonObject.getString("driverName"));
		baseDriverSchedule.setOccurTime(jsonObject.getDate("occurTime"));

		// onBoardId is an int in the JSON but a String in the model;
		// jsonObject.getString() performs the conversion automatically
		baseDriverSchedule.setOnboardId(jsonObject.getString("onBoardId"));
        
		// typeId is an int in the JSON but a byte in the model;
		// jsonObject.getByte() performs the conversion automatically
		baseDriverSchedule.setTypeId(jsonObject.getByte("typeId"));
		baseDriverSchedule.setWorkId(jsonObject.getString("workerId"));

		this.baseDriverScheduleMapper.insertSelective(baseDriverSchedule);
	}
}
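
The copy-the-beans pattern above extends to any number of clusters: each additional broker address gets its own ConsumerFactory plus container factory pair, and a listener opts in through the containerFactory attribute. Below is a minimal sketch for a hypothetical third cluster, to be added inside the KafkaConfig class shown earlier; the kafka.bootstrap.third-servers and kafka.topic.third-topic keys and the ...ForThirdCluster bean names are assumptions, not part of the original project. Unlike the original, this variant copies the shared consumerConfigs() map and overrides only the broker list, so the common properties are not repeated:

	// Sketch: hypothetical third cluster, added inside the KafkaConfig class above
	@Value("${kafka.bootstrap.third-servers}") // hypothetical property key
	private String kafkaBootstrapThirdServers;

	@Bean
	KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryForThirdCluster() {
		ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactoryForThirdCluster());
		factory.setConcurrency(3);
		factory.getContainerProperties().setPollTimeout(3000);
		return factory;
	}

	@Bean
	public ConsumerFactory<Integer, String> consumerFactoryForThirdCluster() {
		// Reuse the shared settings and override only the bootstrap address
		Map<String, Object> props = new HashMap<>(consumerConfigs());
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapThirdServers);
		return new DefaultKafkaConsumerFactory<>(props);
	}

A listener then binds to the new factory by bean name, exactly as driverSheduleConsumer does above:

	@KafkaListener(topics = "${kafka.topic.third-topic}", containerFactory = "kafkaListenerContainerFactoryForThirdCluster")
	public void thirdClusterConsumer(ConsumerRecord<Integer, String> msg) {
		// handle msg.value() ...
	}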

III. Appendix: @KafkaListener annotation source

/*
 * Copyright 2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.messaging.handler.annotation.MessageMapping;

/**
 * Annotation that marks a method to be the target of a Kafka message listener on the
 * specified topics.
 *
 * The {@link #containerFactory()} identifies the
 * {@link org.springframework.kafka.config.KafkaListenerContainerFactory
 * KafkaListenerContainerFactory} to use to build the Kafka listener container. If not
 * set, a <em>default</em> container factory is assumed to be available with a bean name
 * of {@code kafkaListenerContainerFactory} unless an explicit default has been provided
 * through configuration.
 *
 * <p>
 * Processing of {@code @KafkaListener} annotations is performed by registering a
 * {@link KafkaListenerAnnotationBeanPostProcessor}. This can be done manually or, more
 * conveniently, through {@link EnableKafka} annotation.
 *
 * <p>
 * Annotated methods are allowed to have flexible signatures similar to what
 * {@link MessageMapping} provides, that is
 * <ul>
 * <li>{@link org.apache.kafka.clients.consumer.ConsumerRecord} to access to the raw Kafka
 * message</li>
 * <li>{@link org.springframework.kafka.support.Acknowledgment} to manually ack</li>
 * <li>{@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated
 * method arguments including the support of validation</li>
 * <li>{@link org.springframework.messaging.handler.annotation.Header @Header}-annotated
 * method arguments to extract a specific header value, defined by
 * {@link org.springframework.kafka.support.KafkaHeaders KafkaHeaders}</li>
 * <li>{@link org.springframework.messaging.handler.annotation.Headers @Headers}-annotated
 * argument that must also be assignable to {@link java.util.Map} for getting access to
 * all headers.</li>
 * <li>{@link org.springframework.messaging.MessageHeaders MessageHeaders} arguments for
 * getting access to all headers.</li>
 * <li>{@link org.springframework.messaging.support.MessageHeaderAccessor
 * MessageHeaderAccessor} for convenient access to all method arguments.</li>
 * </ul>
 *
 * <p>When defined at the method level, a listener container is created for each method.
 * The {@link org.springframework.kafka.listener.MessageListener} is a
 * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter},
 * configured with a {@link org.springframework.kafka.config.MethodKafkaListenerEndpoint}.
 *
 * <p>When defined at the class level, a single message listener container is used to
 * service all methods annotated with {@code @KafkaHandler}. Method signatures of such
 * annotated methods must not cause any ambiguity such that a single method can be
 * resolved for a particular inbound message. The
 * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter} is
 * configured with a
 * {@link org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint}.
 *
 * @author Gary Russell
 *
 * @see EnableKafka
 * @see KafkaListenerAnnotationBeanPostProcessor
 * @see KafkaListeners
 */
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {

	/**
	 * The unique identifier of the container managing for this endpoint.
	 * <p>If none is specified an auto-generated one is provided.
	 * @return the {@code id} for the container managing for this endpoint.
	 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
	 */
	String id() default "";

	/**
	 * The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
	 * to use to create the message listener container responsible to serve this endpoint.
	 * <p>If not specified, the default container factory is used, if any.
	 * @return the container factory bean name.
	 */
	String containerFactory() default "";

	/**
	 * The topics for this listener.
	 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
	 * Expression must be resolved to the topic name.
	 * Mutually exclusive with {@link #topicPattern()} and {@link #topicPartitions()}.
	 * @return the topic names or expressions (SpEL) to listen to.
	 */
	String[] topics() default {};

	/**
	 * The topic pattern for this listener.
	 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
	 * Expression must be resolved to the topic pattern.
	 * Mutually exclusive with {@link #topics()} and {@link #topicPartitions()}.
	 * @return the topic pattern or expression (SpEL).
	 */
	String topicPattern() default "";

	/**
	 * The topicPartitions for this listener.
	 * Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
	 * @return the topic names or expressions (SpEL) to listen to.
	 */
	TopicPartition[] topicPartitions() default {};

	/**
	 * If provided, the listener container for this listener will be added to a bean
	 * with this value as its name, of type {@code Collection<MessageListenerContainer>}.
	 * This allows, for example, iteration over the collection to start/stop a subset
	 * of containers.
	 * @return the bean name for the group.
	 */
	String group() default "";

}

 

© All rights reserved by the author.