package com.qianlima.solr.qy.configuration;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;
/**
 * @author liweihai
 * @date 2020/12/23 15:42
 * @desc
 */
@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(10);
        factory.getContainerProperties().setPollTimeout(15000);
        // @KafkaListener consumes in batches; the per-batch record count is set via ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        // Offsets are committed manually, immediately after each acknowledgment
        factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>(16);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.117.118:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 198000);
        propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 200000);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "groupIdEnterprise");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // Number of records fetched per batch
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 200000);
        propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 190000);
        return propsMap;
    }
}
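For reference, a minimal listener sketch that would pair with the container factory above: batch consumption plus manual, immediate offset commits. The listener class itself is not part of the original code; the topic and group id are the ones used elsewhere in this article, everything else is an assumption.

import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class EnterpriseContactConsumer {

    // Hypothetical batch listener; matches setBatchListener(true) and AckMode.MANUAL_IMMEDIATE
    @KafkaListener(topics = "topicEnterpriseContact",
            groupId = "groupIdEnterprise",
            containerFactory = "kafkaListenerContainerFactory")
    public void onMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        for (ConsumerRecord<String, String> record : records) {
            // Process each record of the batch here (e.g. parse the JSON payload and index it)
            log.info("offset={}, value={}", record.offset(), record.value());
        }
        // MANUAL_IMMEDIATE: the batch's offsets are committed as soon as acknowledge() is called
        ack.acknowledge();
    }
}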
package com.qianlima.solr.qy.service.indexer.impl;
import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.qianlima.solr.qy.bean.companys.EnterpriseAttachedEntity;
import com.qianlima.solr.qy.bean.companys.EnterprisePositionEntity;
import com.qianlima.solr.qy.bean.companys.EnterpriseProducerEntity;
import com.qianlima.solr.qy.configuration.rocketmq.config.MqConfig;
import com.qianlima.solr.qy.configuration.rocketmq.producer.ProducerClient;
import com.qianlima.solr.qy.dto.DataDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* @author liweihai
* @date 2020/11/30 13:53
* @desc
*/
@Slf4j
@Service
public class EnterpriseProducerService {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
@Autowired
private ProducerClient producerClient;
@Autowired
private ProducerBean producer;
@Autowired
private MqConfig mqConfig;
/**
 * Contact phone number
 *
 * @param entity
 */
public void sendEnterpriseContactData(EnterpriseProducerEntity entity) {
kafkaTemplate.send("topicEnterpriseContact", JSONObject.toJSONString(entity));
}
/**
 * Announcement
 *
 * @param entity
 */
public void sendEnterpriseNoticeData(DataDTO entity) {
kafkaTemplate.send("topicEnterpriseNotice", JSONObject.toJSONString(entity));
}
/**
 * Job position
 *
 * @param entity
 */
public void sendEnterprisePositionData(EnterprisePositionEntity entity) {
kafkaTemplate.send("topicEnterprisePosition", JSONObject.toJSONString(entity));
}
/**
 * Attached data
 *
 * @param entity
 */
public void sendEnterpriseAttachedData(EnterpriseAttachedEntity entity) {
kafkaTemplate.send("topicEnterpriseAttached", JSONObject.toJSONString(entity));
}
/**
 * Relation
 *
 * @param entity
 */
public void sendEnterpriseRelationData(EnterpriseProducerEntity entity) {
kafkaTemplate.send("topicEnterpriseRelation", JSONObject.toJSONString(entity));
}
/**
 * Contact phone number (RocketMQ)
 *
 * @param entity
 */
public void sendEnterpriseContactDataRocket(EnterpriseProducerEntity entity) {
kafkaTemplate: producerClient.send(producer, JSONObject.toJSONString(entity), mqConfig.getTagContact(), entity.getId() + "-" + entity.getCate());
}
/**
 * Announcement (RocketMQ)
 *
 * @param entity
 */
public void sendEnterpriseNoticeDataRocket(DataDTO entity) {
producerClient.send(producer, JSONObject.toJSONString(entity), mqConfig.getTagNotice(), String.valueOf(entity.getId()));
}
/**
 * Job position (RocketMQ)
 *
 * @param entity
 */
public void sendEnterprisePositionDataRocket(EnterprisePositionEntity entity) {
producerClient.send(producer, JSONObject.toJSONString(entity), mqConfig.getTagPosition(), entity.getId() + "-" + entity.getCate());
}
/**
 * Attached data (RocketMQ)
 *
 * @param entity
 */
public void sendEnterpriseAttachedDataRocket(EnterpriseAttachedEntity entity) {
producerClient.send(producer, JSONObject.toJSONString(entity), mqConfig.getTagAttached(), String.valueOf(entity.getId()));
}
}
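The Kafka send calls above are fire-and-forget. Below is a minimal sketch, assuming Spring Kafka 2.x where KafkaTemplate.send() returns a ListenableFuture, of attaching a callback so failures are at least logged. The method name is hypothetical and would sit inside EnterpriseProducerService, reusing its fields and imports.

public void sendEnterpriseContactDataWithCallback(EnterpriseProducerEntity entity) {
    String payload = JSONObject.toJSONString(entity);
    kafkaTemplate.send("topicEnterpriseContact", payload).addCallback(
            // Success: log the destination topic and the offset assigned by the broker
            result -> log.debug("sent to {} at offset {}",
                    result.getRecordMetadata().topic(),
                    result.getRecordMetadata().offset()),
            // Failure: keep the payload in the log so the message can be replayed
            ex -> log.error("failed to send enterprise contact data: {}", payload, ex));
}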
kafka:
  bootstrap-servers: 192.168.117.118:9092
  producer:
    retries: 0
    batch-size: 16384
    buffer-memory: 33554432
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    properties:
      linger.ms: 1
  consumer:
    enable-auto-commit: true
    auto-commit-interval: 100ms
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      session.timeout.ms: 15000
    max-poll-records: 1000
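The broker address in this YAML duplicates the one hardcoded in KafkaConfig.consumerConfigs(). A small sketch of injecting it from the same property instead, assuming the snippet above sits under the standard spring: prefix; this field is not in the original code.

import org.springframework.beans.factory.annotation.Value;

// Inject the broker list from application.yml instead of hardcoding it
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

// then, inside consumerConfigs():
// propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);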