文档章节

kafka

xiongsheng
 xiongsheng
发布于 2016/03/07 10:55
字数 676
阅读 114
收藏 3

二、kafka


1、开发流程

    produce

Properties props = new Properties();
// 此处配置的是kafka的端口
props.put("metadata.broker.list", broker_list);
// 配置value的序列化类
props.put("serializer.class", serializer);
// 配置key的序列化类
props.put("key.serializer.class", key);
props.put("request.required.acks", acks);
producer = new Producer<String, String>(new ProducerConfig(props));
KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(topic, key, data);
producer.send(keyedMessage);
consumer
    Properties props = new Properties();
        //zookeeper 配置
        props.put("zookeeper.connect", connect);
        //group 代表一个消费组
        props.put("group.id", group_id);
        //zk连接超时
        props.put("zookeeper.session.timeout.ms", session_timeout);
        props.put("zookeeper.sync.time.ms", sync);
        props.put("auto.commit.interval.ms", interval);
        props.put("auto.offset.reset", offset_reset);
        //序列化类
        props.put("serializer.class", serializer);
        ConsumerConfig config = new ConsumerConfig(props);
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
        Map<String, List<KafkaStream<String, String>>> consumerMap =  consumer.createMessageStreams(topicCountMap,keyDecoder,valueDecoder);
        KafkaStream<String, String> stream = consumerMap.get(topic).get(0);

2、高吞吐量原理:  数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能。

  支持数据批量发送和拉取。

  支持数据压缩。

                   Topic划分为多个partition,提高并行处理能力。

  因为每条消息都被append到该Partition中,属于顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证

  offet由Consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些消费过,也不需要通过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障

3、特新

Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略删除旧数据。

一是基于时间,二是基于Partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties

log.segment.bytes

消息被路由到哪个partition上,有producer客户端决定.比如可以采用"random""key-hash""轮询"等

在JMS实现中,Topic模型基于push方式,即broker将消息推送给consumer端.不过在kafka中,采用了pull方式,即consumer在和broker建立连接之后,主动去pull(或者说fetch)消息;

这中模式有些优点,首先consumer端可以根据自己的消费能力适时的去fetch消息并处理,且可以控制消息消费的进度(offset);

4.HA机制

Kafka分配Replica的算法如下:


将所有Broker(假设共n个Broker)和待分配的Partition排序

将第i个Partition分配到第(i mod n)个Broker上

将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上


© 著作权归作者所有

上一篇: java基础
xiongsheng
粉丝 2
博文 68
码字总数 35128
作品 0
程序员
私信 提问

暂无文章

JS基础-该如何理解原型、原型链?

JS的原型、原型链一直是比较难理解的内容,不少初学者甚至有一定经验的老鸟都不一定能完全说清楚,更多的"很可能"是一知半解,而这部分内容又是JS的核心内容,想要技术进阶的话肯定不能对这个...

OBKoro1
今天
7
0
高防CDN的出现是为了解决网站的哪些问题?

高防CDN是为了更好的服务网络而出现的,是通过高防DNS来实现的。高防CDN是通过智能化的系统判断来路,再反馈给用户,可以减轻用户使用过程的复杂程度。通过智能DNS解析,能让网站访问者连接到...

云漫网络Ruan
今天
14
0
OSChina 周一乱弹 —— 熟悉的味道,难道这就是恋爱的感觉

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @xiaoshiyue :好久没分享歌了分享张碧晨的单曲《今后我与自己流浪》 《今后我与自己流浪》- 张碧晨 手机党少年们想听歌,请使劲儿戳(这里)...

小小编辑
今天
3K
24
SpringBoot中 集成 redisTemplate 对 Redis 的操作(二)

SpringBoot中 集成 redisTemplate 对 Redis 的操作(二) List 类型的操作 1、 向列表左侧添加数据 Long leftPush = redisTemplate.opsForList().leftPush("name", name); 2、 向列表右......

TcWong
今天
46
0
排序––快速排序(二)

根据排序––快速排序(一)的描述,现准备写一个快速排序的主体框架: 1、首先需要设置一个枢轴元素即setPivot(int i); 2、然后需要与枢轴元素进行比较即int comparePivot(int j); 3、最后...

FAT_mt
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部