KafkaConfig

原创
02/01 11:22
阅读数 72
package com.qianlima.solr.qy.configuration ;

import org.apache.kafka.clients.consumer.ConsumerConfig ;

import org.apache.kafka.common.serialization.StringDeserializer ;

import org.springframework.context.annotation.Bean ;

import org.springframework.context.annotation.Configuration ;

import org.springframework.kafka.annotation.EnableKafka ;

import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory ;

import org.springframework.kafka.config.KafkaListenerContainerFactory ;

import org.springframework.kafka.core.ConsumerFactory ;

import org.springframework.kafka.core.DefaultKafkaConsumerFactory ;

import org.springframework.kafka.listener.AbstractMessageListenerContainer ;

import org.springframework.kafka.listener.ConcurrentMessageListenerContainer ;

import java.util.HashMap ;

import java.util.Map ;
 /** * @author liweihai * @date 2020/12/23 15:42 * @desc */ 
 @ Configuration @ EnableKafka 
 public class KafkaConfig {
	 @ Bean public KafkaListenerContainerFactory > kafkaListenerContainerFactory() {
		ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory <  > () ;

		factory.setConsumerFactory(consumerFactory()) ;

		factory.setConcurrency(10) ;

		factory.getContainerProperties().setPollTimeout(15000) ;

		factory.setBatchListener(true) ;
 
		//@KafkaListener 批量消费 每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG 
		factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE) ;
 
		//设置提交偏移量的方式 
		return factory ;
 } 
		public ConsumerFactory consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()) ;
 } 
		public Map consumerConfigs() { 
		Map propsMap = new HashMap<>(16) ;
 
		propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.117.118:9092") ;
 
		propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false) ;
 
		propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 10000) ;
 
		propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 198000) ;
 
		propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,200000) ;
 
		propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) ;
 
		propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) ;
 
		propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "groupIdEnterprise") ;
 
		propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") ;
 
		propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000) ;
 
		propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 200000) ;
 
		propsMap.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024) ;
 propsMap.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 190000) ;
 //每个批次获取数 return propsMap ;
 } } 
package com.qianlima.solr.qy.service.indexer.impl;

import com.alibaba.fastjson.JSONObject;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.qianlima.solr.qy.bean.companys.EnterpriseAttachedEntity;
import com.qianlima.solr.qy.bean.companys.EnterprisePositionEntity;
import com.qianlima.solr.qy.bean.companys.EnterpriseProducerEntity;
import com.qianlima.solr.qy.configuration.rocketmq.config.MqConfig;
import com.qianlima.solr.qy.configuration.rocketmq.producer.ProducerClient;
import com.qianlima.solr.qy.dto.DataDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

/**
 * Publishes enterprise data as JSON messages, either to Kafka topics through
 * {@link KafkaTemplate} or through the injected {@code ProducerClient} / {@code ProducerBean}.
 *
 * @author liweihai
 * @date 2020/11/30 13:53
 */
@Slf4j
@Service
public class EnterpriseProducerService {

    private static final String TOPIC_CONTACT = "topicEnterpriseContact";
    private static final String TOPIC_NOTICE = "topicEnterpriseNotice";
    private static final String TOPIC_POSITION = "topicEnterprisePosition";
    private static final String TOPIC_ATTACHED = "topicEnterpriseAttached";
    private static final String TOPIC_RELATION = "topicEnterpriseRelation";

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    @Autowired
    private ProducerClient producerClient;
    @Autowired
    private ProducerBean producer;
    @Autowired
    private MqConfig mqConfig;

    /**
     * Serializes the given object to JSON and sends it to the named Kafka topic.
     * The send result is not inspected.
     *
     * @param topic   destination Kafka topic
     * @param payload object to serialize with fastjson
     */
    private void publishToKafka(String topic, Object payload) {
        final String json = JSONObject.toJSONString(payload);
        kafkaTemplate.send(topic, json);
    }

    /**
     * Sends contact (phone) data to the {@code topicEnterpriseContact} Kafka topic.
     *
     * @param entity the contact data to publish
     */
    public void sendEnterpriseContactData(EnterpriseProducerEntity entity) {
        publishToKafka(TOPIC_CONTACT, entity);
    }

    /**
     * Sends notice data to the {@code topicEnterpriseNotice} Kafka topic.
     *
     * @param entity the notice data to publish
     */
    public void sendEnterpriseNoticeData(DataDTO entity) {
        publishToKafka(TOPIC_NOTICE, entity);
    }

    /**
     * Sends position data to the {@code topicEnterprisePosition} Kafka topic.
     *
     * @param entity the position data to publish
     */
    public void sendEnterprisePositionData(EnterprisePositionEntity entity) {
        publishToKafka(TOPIC_POSITION, entity);
    }

    /**
     * Sends attached data to the {@code topicEnterpriseAttached} Kafka topic.
     *
     * @param entity the attached data to publish
     */
    public void sendEnterpriseAttachedData(EnterpriseAttachedEntity entity) {
        publishToKafka(TOPIC_ATTACHED, entity);
    }

    /**
     * Sends relation data to the {@code topicEnterpriseRelation} Kafka topic.
     *
     * @param entity the relation data to publish
     */
    public void sendEnterpriseRelationData(EnterpriseProducerEntity entity) {
        publishToKafka(TOPIC_RELATION, entity);
    }

    /**
     * Sends contact (phone) data through the producer client, using the contact tag
     * from {@code mqConfig} and {@code "<id>-<cate>"} as the final argument
     * (presumably the message key; confirm against ProducerClient).
     *
     * @param entity the contact data to publish
     */
    public void sendEnterpriseContactDataRocket(EnterpriseProducerEntity entity) {
        final String json = JSONObject.toJSONString(entity);
        producerClient.send(
                producer,
                json,
                mqConfig.getTagContact(),
                entity.getId() + "-" + entity.getCate());
    }

    /**
     * Sends notice data through the producer client, using the notice tag from
     * {@code mqConfig} and the entity id rendered as a string as the final argument.
     *
     * @param entity the notice data to publish
     */
    public void sendEnterpriseNoticeDataRocket(DataDTO entity) {
        final String json = JSONObject.toJSONString(entity);
        producerClient.send(
                producer,
                json,
                mqConfig.getTagNotice(),
                String.valueOf(entity.getId()));
    }

    /**
     * Sends position data through the producer client, using the position tag from
     * {@code mqConfig} and {@code "<id>-<cate>"} as the final argument.
     *
     * @param entity the position data to publish
     */
    public void sendEnterprisePositionDataRocket(EnterprisePositionEntity entity) {
        final String json = JSONObject.toJSONString(entity);
        producerClient.send(
                producer,
                json,
                mqConfig.getTagPosition(),
                entity.getId() + "-" + entity.getCate());
    }

    /**
     * Sends attached data through the producer client, using the attached tag from
     * {@code mqConfig} and the entity id rendered as a string as the final argument.
     *
     * @param entity the attached data to publish
     */
    public void sendEnterpriseAttachedDataRocket(EnterpriseAttachedEntity entity) {
        final String json = JSONObject.toJSONString(entity);
        producerClient.send(
                producer,
                json,
                mqConfig.getTagAttached(),
                String.valueOf(entity.getId()));
    }

}

 

 

 

# Kafka client settings (producer and consumer).
# NOTE(review): this fragment starts at `kafka:`; presumably it is nested under
# `spring:` in the full application config — confirm.
kafka:
  # Same broker address that KafkaConfig.consumerConfigs() hard-codes.
  bootstrap-servers: 192.168.117.118:9092
  producer:
    # 0 = failed sends are not retried by the producer.
    retries: 0
    batch-size: 16384
    buffer-memory: 33554432
    # Keys and values are sent as plain strings (the service serializes entities to JSON first).
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.apache.kafka.common.serialization.StringSerializer
    properties:
      linger.ms: 1
  consumer:
    # NOTE(review): KafkaConfig.consumerConfigs() sets enable.auto.commit to false and
    # a session timeout of 198000 ms; the values here differ — confirm which ones
    # actually apply to the listeners.
    enable-auto-commit: true
    auto-commit-interval: 100ms
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      session.timeout.ms: 15000
    # Same per-poll record limit as MAX_POLL_RECORDS_CONFIG in KafkaConfig.
    max-poll-records: 1000
展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部