文档章节

SpringBoot ——kafka消费多个不同服务器地址消息解决方案

哎小艾
 哎小艾
发布于 2017/09/08 15:44
字数 1328
阅读 164
收藏 0

一、背景

       在springboot实际项目开发中,kafka可能需要消费多个不同服务器地址的数据,这时懂得如何进行配置就显得非常必要了。

二、配置

       1、KafkaConfig.java配置

package com.lantaiyuan.ebus.kafka;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class KafkaConfig {
	
	@Value("${kafka.bootstrap.servers}")
	private String kafkaBootstrapServers;
	@Value("${kafka.session.timeout-ms}")
	private Integer sessionTimeoutMs;
	@Value("${kafka.auto-commit.enable}")
	private boolean enableAutoCommit;
	@Value("${kafka.auto-commit.interval-ms}")
	private Integer autoCommitIntervalMs;
	@Value("${kafka.auto-offset.reset}")
	private String autoOffsetReset;
	@Value("${kafka.group.id}")
	private String groupId;
	@Value("${kafka.topic.city-code}")
	private String topicCityCodeStr;

	/**
	 * 以下是新增配置,获取司机上下班打卡信息(注入新增的kafka服务消费地址)
	 */
	@Value("${kafka.bootstrap.driver-servers}")
	private String kafkaBootstrapDriverServers;
	
	@Bean
	@Primary//重要!!!指定该ContainerFactory为主要的容器工厂,kafka消费者默认关联该容器
	KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactory());
		factory.setConcurrency(3);
		factory.getContainerProperties().setPollTimeout(3000);
		return factory;
	}

	@Bean
	public ConsumerFactory<Integer, String> consumerFactory() {
		return new DefaultKafkaConsumerFactory<>(consumerConfigs());
	}

	@Bean
	public Map<String, Object> consumerConfigs() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
		props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return props;
	}


	/**
	 * 以下为司机打卡kafka配置,即在原有kafka消费配置基础上重新再复制一份(共3个方法,记得同步更改方法名)
	 */


	@Bean
	KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaListenerContainerFactoryForDriverSchedule() {
		ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
		factory.setConsumerFactory(consumerFactoryForDriverSchedule());
		factory.setConcurrency(3);
		factory.getContainerProperties().setPollTimeout(3000);
		return factory;
	}

	@Bean
	public ConsumerFactory<Integer, String> consumerFactoryForDriverSchedule() {
		return new DefaultKafkaConsumerFactory<>(consumerConfigsForDriverSchedule());
	}


	/**
	 *
	 * 司机打卡信息kafka消费者配置
	 */
	@Bean
	public Map<String, Object> consumerConfigsForDriverSchedule() {
		Map<String, Object> props = new HashMap<>();
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapDriverServers);
		props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
		props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
		return props;
	}
	
	@Bean(name="topicCityMap")
	public Map<String, String> topicCityMap() {
		Map<String, String> map = new HashMap<>();
		String[] topicCityArray = topicCityCodeStr.split("\\|");
		for (String topicCity : topicCityArray) {
			String[]  array = topicCity.split(":");
			map.put(array[0], array[1]);
		}
		return map;
	}
	
	
}

      2、KafkaConsumer.java更改说明

package com.lantaiyuan.ebus.kafka;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import javax.annotation.Resource;

import com.lantaiyuan.ebus.dao.BaseDriverScheduleMapper;
import com.lantaiyuan.ebus.model.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lantaiyuan.ebus.constants.KafkaConsts;
import com.lantaiyuan.ebus.service.BaseRouteServiceI;
import com.lantaiyuan.ebus.service.BaseStationServiceI;
import com.lantaiyuan.ebus.service.JpushServiceI;
import com.lantaiyuan.ebus.service.MyTrailServiceI;
import com.lantaiyuan.ebus.service.NoticeHistoryServiceI;
import com.lantaiyuan.ebus.service.OnBusInfoServiceI;
import com.lantaiyuan.ebus.service.RelRouteBusServiceI;
import com.lantaiyuan.ebus.service.TaskTimerServiceI;
import com.lantaiyuan.ebus.util.Tools;
import com.lantaiyuan.ebus.virtual.card.model.VirtualCardSwipeHistory;
import com.lantaiyuan.ebus.virtual.card.model.enummodel.VirtualCardRecordHeaderTypeEnum;
import com.lantaiyuan.ebus.virtual.card.service.VirtualCardSwipeHistoryServiceI;


@Component
public class KafkaConsumer {


	//.........................


    //containerFactory默认关联kafkaListenerContainerFactory容器工厂
	@KafkaListener(topics = "${kafka.topic.od-topic}")
	public void odConsumer(ConsumerRecord<Integer, String> msg) {
		String record = msg.value();
		JSONObject jsonObject = JSONObject.parseObject(record);
		if (!StringUtils.isEmpty(jsonObject.getString("startStationId"))) { // O点
			ProduceOriginPoint originPoint = JSONObject.parseObject(record, ProduceOriginPoint.class);
			myTrailService.insertOriginPoint(originPoint);
		} else { // D点
			ProduceDestPoint destPoint = JSONObject.parseObject(record, ProduceDestPoint.class);
			myTrailService.updateDestPoint(destPoint);
		}
	}
	

    //.........................
	

	/***
	 *
	 * <p>Title: </p>
	 * <p>Description: 消费司机考勤消息</p>
	 */
    //containerFactory手动关联到kafkaListenerContainerFactoryForDriverSchedule容器工厂
	@KafkaListener(topics = "${kafka.topic.driverSheduleTopic}", containerFactory = "kafkaListenerContainerFactoryForDriverSchedule")
	public void driverSheduleConsumer(ConsumerRecord<Integer, String> msg) {
		String record = msg.value();
		JSONObject jsonObject = JSONObject.parseObject(record);

		BaseDriverSchedule baseDriverSchedule = new BaseDriverSchedule();
		baseDriverSchedule.setCityCode(jsonObject.getString("cityCode"));
		baseDriverSchedule.setDriverName(jsonObject.getString("driverName"));
		baseDriverSchedule.setOccurTime(jsonObject.getDate("occurTime"));

        //jsonObject的typeId为int类型,model为String类型,但是可以jsonObject.getString()进行自动类型转换
		baseDriverSchedule.setOnboardId(jsonObject.getString("onBoardId"));
        
        //jsonObject的typeId为int类型,model为byte类型,但是可以jsonObject.getByte()进行自动类型转换
		baseDriverSchedule.setTypeId(jsonObject.getByte("typeId"));
		baseDriverSchedule.setWorkId(jsonObject.getString("workerId"));

		this.baseDriverScheduleMapper.insertSelective(baseDriverSchedule);
	}
}

三、附KafkaListener注解源码

/*
 * Copyright 2016 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.annotation;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import org.springframework.messaging.handler.annotation.MessageMapping;

/**
 * Annotation that marks a method to be the target of a Kafka message listener on the
 * specified topics.
 *
 * The {@link #containerFactory()} identifies the
 * {@link org.springframework.kafka.config.KafkaListenerContainerFactory
 * KafkaListenerContainerFactory} to use to build the Kafka listener container. If not
 * set, a <em>default</em> container factory is assumed to be available with a bean name
 * of {@code kafkaListenerContainerFactory} unless an explicit default has been provided
 * through configuration.
 *
 * <p>
 * Processing of {@code @KafkaListener} annotations is performed by registering a
 * {@link KafkaListenerAnnotationBeanPostProcessor}. This can be done manually or, more
 * conveniently, through {@link EnableKafka} annotation.
 *
 * <p>
 * Annotated methods are allowed to have flexible signatures similar to what
 * {@link MessageMapping} provides, that is
 * <ul>
 * <li>{@link org.apache.kafka.clients.consumer.ConsumerRecord} to access to the raw Kafka
 * message</li>
 * <li>{@link org.springframework.kafka.support.Acknowledgment} to manually ack</li>
 * <li>{@link org.springframework.messaging.handler.annotation.Payload @Payload}-annotated
 * method arguments including the support of validation</li>
 * <li>{@link org.springframework.messaging.handler.annotation.Header @Header}-annotated
 * method arguments to extract a specific header value, defined by
 * {@link org.springframework.kafka.support.KafkaHeaders KafkaHeaders}</li>
 * <li>{@link org.springframework.messaging.handler.annotation.Headers @Headers}-annotated
 * argument that must also be assignable to {@link java.util.Map} for getting access to
 * all headers.</li>
 * <li>{@link org.springframework.messaging.MessageHeaders MessageHeaders} arguments for
 * getting access to all headers.</li>
 * <li>{@link org.springframework.messaging.support.MessageHeaderAccessor
 * MessageHeaderAccessor} for convenient access to all method arguments.</li>
 * </ul>
 *
 * <p>When defined at the method level, a listener container is created for each method.
 * The {@link org.springframework.kafka.listener.MessageListener} is a
 * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter},
 * configured with a {@link org.springframework.kafka.config.MethodKafkaListenerEndpoint}.
 *
 * <p>When defined at the class level, a single message listener container is used to
 * service all methods annotated with {@code @KafkaHandler}. Method signatures of such
 * annotated methods must not cause any ambiguity such that a single method can be
 * resolved for a particular inbound message. The
 * {@link org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter} is
 * configured with a
 * {@link org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint}.
 *
 * @author Gary Russell
 *
 * @see EnableKafka
 * @see KafkaListenerAnnotationBeanPostProcessor
 * @see KafkaListeners
 */
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {

	/**
	 * The unique identifier of the container managing for this endpoint.
	 * <p>If none is specified an auto-generated one is provided.
	 * @return the {@code id} for the container managing for this endpoint.
	 * @see org.springframework.kafka.config.KafkaListenerEndpointRegistry#getListenerContainer(String)
	 */
	String id() default "";

	/**
	 * The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
	 * to use to create the message listener container responsible to serve this endpoint.
	 * <p>If not specified, the default container factory is used, if any.
	 * @return the container factory bean name.
	 */
	String containerFactory() default "";

	/**
	 * The topics for this listener.
	 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
	 * Expression must be resolved to the topic name.
	 * Mutually exclusive with {@link #topicPattern()} and {@link #topicPartitions()}.
	 * @return the topic names or expressions (SpEL) to listen to.
	 */
	String[] topics() default {};

	/**
	 * The topic pattern for this listener.
	 * The entries can be 'topic name', 'property-placeholder keys' or 'expressions'.
	 * Expression must be resolved to the topic pattern.
	 * Mutually exclusive with {@link #topics()} and {@link #topicPartitions()}.
	 * @return the topic pattern or expression (SpEL).
	 */
	String topicPattern() default "";

	/**
	 * The topicPartitions for this listener.
	 * Mutually exclusive with {@link #topicPattern()} and {@link #topics()}.
	 * @return the topic names or expressions (SpEL) to listen to.
	 */
	TopicPartition[] topicPartitions() default {};

	/**
	 * If provided, the listener container for this listener will be added to a bean
	 * with this value as its name, of type {@code Collection<MessageListenerContainer>}.
	 * This allows, for example, iteration over the collection to start/stop a subset
	 * of containers.
	 * @return the bean name for the group.
	 */
	String group() default "";

}

 

© 著作权归作者所有

共有 人打赏支持
哎小艾
粉丝 12
博文 320
码字总数 162589
作品 0
深圳
程序员
SpringBoot开发案例之整合Kafka实现消息队列

前言 最近在做一款秒杀的案例,涉及到了同步锁、数据库锁、分布式锁、进程内队列以及分布式消息队列,这里对SpringBoot集成Kafka实现消息队列做一个简单的记录。Kafka简介 Kafka是由Apache软...

小柒2012
05/18
0
0
搞懂分布式技术23:SpringBoot Kafka 整合使用

Spring Boot系列文章(一):SpringBoot Kafka 整合使用2018-01-05 ×文章目录 1. 前提 2. 创建项目 3. Kafka 设置 4. 运行 5. 关注我 6. 最后 前提 假设你了解过 SpringBoot 和 Kafka。 1、...

你的猫大哥
07/05
0
0
SpringBoot整合ActiveMq要分以下几个步骤:

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/u013115157/article/details/79413429 第一步:从ActiveMq官方上下载ActiveMq服务 http://activemq.apache.o...

MorganLai
03/01
0
0
恒宇少年/spring-boot-chapter

简书整套文档以及源码解析 专题 专题名称 专题描述 001 Spring Boot 核心技术 讲解SpringBoot一些企业级层面的核心组件 002 Spring Cloud 核心技术 对Spring Cloud核心技术全面讲解 003 Quer...

恒宇少年
04/19
0
0
Spring Boot学习笔记

文件上传与下载 springboot 上传文件到服务器 Spring Boot2.0连载(33)-- Spring Boot文件上传下载 SpringBoot项目的The temporary upload location ***is not valid 问题 /tmp/tomcat.8483...

OSC_fly
07/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

docker多容器部署lnmp环境

环境:RHEL7.5 ip:192.168.10.102,主机名:lb02 一、创建web、数据库目录 web网站目录为:/wwwroot,属主属组:www [root@lb02 ~]# mkdir /wwwroot[root@lb02 ~]# useradd -s /sbin/nolo...

人在艹木中
12分钟前
0
0
eclipse运行springboot项目报错‘找不到或无法加载主类’

这是一个很烦躁的问题~,往往困住大家好长时间,然后各种百度。借此,咱将这个问题有可能产生的原因进行一下总结。若有不完善之处欢迎大家在下面留言指出~~ Duang!问题出现 然后开始尝试解决...

Code辉
33分钟前
0
0
springboot oauth2 跨域设置

@Overridepublic void configure(HttpSecurity http) throws Exception { http .authorizeRequests() .antMatchers("/security/**") .authentica......

昆虫大侠
35分钟前
0
0
08-利用思维导图梳理JavaSE-泛型

08-利用思维导图梳理JavaSE-泛型 主要内容 1.泛型的基本概念 1.1.定义 1.2.使用前提 1.3.使用泛型的好处 2.泛型的使用 2.1.泛型类定义 2.2.泛型对象定义 2.3.泛型中的构造方法 2.4.泛型方法的...

飞鱼说编程
37分钟前
0
0
Docker 部署 Spring Boot 项目指南

仅想在Docker里运行一个Spring Boot项目,捣鼓了许久。。。 本文主要适用于Windows环境下的Docker 一、运行环境 Windows 10 Maven 3.5 Docker 18.06.1-ce-win73 (19507) 二、创建Spring Boot...

AmosWang
43分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部