Message Queue 之 Kafka
Author: Lijb
Email: lijb1121@163.com
Message:一般用于系统间通信,通常系统间通信之间通过两种方式发送消息.即时消息:消息发送方和接收方必须同时在线,例如WebService、Dubbo等这些常见的RPC框架一般发送都是即时消息(类似打电话)。离线消息:消息发送方和消息接受方不需要同时在线,实现消息异步接收。例如:Message Queue就是专门用于发送异步消息(类似发短信)。
消息队列:消息遵循先进先出的原则FIFO ,kafka 属于发布与订阅的
kafka 特点:
1.高吞吐量、低延迟:(把消息书写到本地磁盘上,持久化消息,利用磁盘的顺序读写,追加文件)
Message Queue使用场景
参考:http://www.cnblogs.com/linjiqin/p/5720865.html
消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ
- 异步通信
- 系统间解耦
- 削峰填谷
Kafka 系统架构
生产者:发送消息的进程,将消息发送到topic中
消费者:消费消息的进程,从topic中读取消息。
ConsumerGroup:由多个消费者组成的组,每一个组中的所有消费者共同消费一个完整的topic,彼此消费的消息不重复。
broker:对应一个kafka实例。kafka集群负责分区leader的选举和迁移。上右图中红线对应的是leader。
record:对应一个消息,key,value,ts(timestamp).
topic:kafka对消息进行分类,每一类对应一个topic,一个topic至少被分成一个分区partition,所有的分区的消息加起来组成一个topic。
replication:副本,每个分区都有多个副本。
leader、follower:leader负责消息的读写,follower负责从leader复制消息。
offset: 偏移量,每个分区中的offset是有序的,局部性的,对应每个record的唯一标识。
topic默认的分区策略:根据输入的key的hash值%分区个数。
zookeeper:负责kafka元数据的管理及Consumer相关数据的管理。
消费者的负载均衡:rang 均分| round-robin 轮询
总结:当一个ConsumerGroup中,有消费者成员Consumer加入或者离开时,就会触发kafka分区(partition)的重新分配,也就是partition的均衡,均衡的目的是为了提升topic的并发(多个线程并发消费分区)消费能力。至于哪个消费者消费哪一个分区,这是有一个算法的,这个算法是能够保证一个分区一定只能被一个消费者消费,而不能被多个消费者消费,还能够保证一个消费者可以消费多个分区。也就是分区和消费者之间是多对一(包含一对一)的关系。这个算法的最终结果是,当一个消费者组(ConsumerGroup)中消费者成员(Consumer)的数量大于一个topic分区的数量时,多余的消费者就没有办法消费到数据了。
搭建Kaka集群
-
搭建zookeeper集群并且启动(略)
-
解压并安装kafka集群(务必配置主机名和IP映射关系)
[root@CentOSX ~]# tar -zxf kafka_2.11-0.11.0.0.tgz -C /usr/ [root@CentOSX ~]# vi /usr/kafka_2.11-0.11.0.0/config/server.properties
#####Server Basics ######## broker.id=
0
# 每个节点都要修改,且不同 delete.topic.enable=trueSocket Server Settings
listeners=PLAINTEXT://
CentOSC
:9092 # 每个节点都要修改,且不同 ######### Log Basics ########## log.dirs=/usr/kafka-logsLog Retention Policy
log.retention.hours=168 ########## Zookeeper ########## zookeeper.connect=CentOSA:2181,CentOSB:2181,CentOSC:2181
-
启动Kafka集群
[root@CentOSX ~]# cd /usr/kafka_2.11-0.11.0.0/ [root@CentOSX kafka_2.11-0.11.0.0]# ./bin/kafka-server-start.sh -daemon config/server.properties
-
关闭kafka
[root@CentOSX ~]# vi /usr/kafka_2.11-0.11.0.0/bin/kafka-server-stop.sh PIDS=$(jps| grep Kafka | awk '{print $1}') # 修改 获取进程id 号
if [ -z "$PIDS" ]; then echo "No kafka server to stop" exit 1 else kill -s TERM $PIDS fi [root@CentOSX ~]# cd /usr/kafka_2.11-0.11.0.0/ [root@CentOSX kafka_2.11-0.11.0.0]# ./bin/kafka-server-stop.sh
Kafka测试
//创建分区
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --create --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 --partitions 3 --replication-factor 3
//启动消费者
[root@CentOSB kafka_2.11-0.11.0.0]# ./bin/kafka-console-consumer.sh --bootstrap-server CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01 --from-beginning
//启动生产者
[root@CentOSC kafka_2.11-0.11.0.0]# ./bin/kafka-console-producer.sh --broker-list CentOSA:9092,CentOSB:9092,CentOSC:9092 --topic topic01
> hello kafka
Topic基本操作
-
创建topic
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --create --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 --partitions 3 --replication-factor 3
partitions:分区的个数,replication-factor副本因子
-
查看topic详情
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --describe --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic01 Topic:topic01 PartitionCount:3 ReplicationFactor:3 Configs: Topic: topic01 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 Topic: topic01 Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0 Topic: topic01 Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
-
查看所有Topic
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --list --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 topic01 topic02 topic03
-
删除topic
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --delete --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic03 Topic topic03 is marked for deletion. Note: This will have no impact if delete.topic.enable is not set to true.
-
修改Topic
[root@CentOSA kafka_2.11-0.11.0.0]# ./bin/kafka-topics.sh --alter --zookeeper CentOSA:2181,CentOSB:2181,CentOSC:2181 --topic topic02 --partitions 2 WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
分区数目只允许增加,不允许减少。
Java 连接Kafka集群
<!--kafka依赖-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.2</version>
</dependency>
生产者
//1.创建Properties对象
Properties props=new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
//2.序列化
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
KafkaProducer<String,String> kafkaProducer=new KafkaProducer<String, String>(props);
//3.封装Record
ProducerRecord<String,String> record=
new ProducerRecord<String, String>("topic01","0100","zhangsan 男 18");
//4.发送消息
kafkaProducer.send(record);
kafkaProducer.flush();
kafkaProducer.close();
消费者
//1.创建Properties对象
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"CentOSA:9092,CentOSB:9092,CentOSC:9092");
//2.反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG,"g1");
KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(props);
//3.订阅相应的topic
kafkaConsumer.subscribe(Arrays.asList("topic01"));
//4.开始获取消息
while(true){
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
String key=record.key();
String value = record.value();
long ts = record.timestamp();
int partition = record.partition();
long offset = record.offset();
System.out.println(key+"=>"+value+"\t offset "+offset+" ,partition:"+partition+"\t"+ts);
}
}
自定义序列化发送对象
public class ObjectSerializer implements Serializer<Object> { // Serializer该接口是kafka的类
public void configure(Map<String, ?> map, boolean isKey) {}
public byte[] serialize(String topic, Object o) {
return SerializationUtils.serialize((Serializable) o);
}
public void close() {}
}
---
public class ObjectDeserializer implements Deserializer<Object> {
public void configure(Map<String, ?> configs, boolean isKey) {
}
public Object deserialize(String topic, byte[] data) {
return SerializationUtils.deserialize(data);
}
public void close() {
}
}
如何干预Kafka分区策略 提高 record 的并行度
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,XxxPartitioner.class)
//自定义分区类实现Partitioner接口,然后指定分区策略
public class XxxPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster){
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
return (key.hashCode()&Integer.MAX_VALUE)%numPartitions;
}
public void close(){
}
}
订阅形式
-
subscribe(订阅)方式
props.put(ConsumerConfig.GROUP_ID_CONFIG,"g1"); kafkaConsumer.subscribe(Arrays.asList("topic02"));
优点:可是自动实现 组内负载均衡和故障转移。
-
assign (分配)方式
TopicPartition part02 = new TopicPartition("topic02", 2); // 分区个数 kafkaConsumer.assign(Arrays.asList(part02));
优点:手动指定分区信息,缺点:无法实现负载均衡和故障转移
Offset自动提交
默认客户端自动开启了自动提交功能,默认提交时间间隔是5秒钟,用户可以采取手动提交的方式实现。开启手动提交如下:
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);//关闭自动提交
//消费代码后追加
kafkaConsumer.commitAsync();
或者适当调小自动提交时间间隔:
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//关闭自动提交
Kafka Stream-High Level
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Properties;
public class KafkaStreamDemo {
public static void main(String[] args) {
//1.创建Properties对象
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
//2.添加kafka集群信息
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "CentOSA:9092,CentOSB:9092,CentOSC:9092");
//3.添加数据类型
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
//4.创建KStreamBuilder并读取数据文件
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
//5.对数据文件进行切分计算
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count("counts");
wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic");
//6.把KStreaamBuilder和Properties对象放入stream流
KafkaStreams streams = new KafkaStreams(builder, props);
//7.启动stream
streams.start();
}
}