文档章节

kafka/zookeeper学习记录

skanda
 skanda
发布于 2017/07/20 20:30
字数 488
阅读 16
收藏 1
点赞 0
评论 0

 

1,kafka依赖于zookeeper,下载:

kafka2.10-0.10.00包下载zookeeper3.4.10下载

2,配置启动ZOOKEEPER

    配置项:ZOOKEEPER_HOME,和PATH;参考:

修改zookeeper-3.4.10/conf下,zoo.conf文件:

设置项:

dataDir=/home/t/source/zookeeper-3.4.10/dataDir
dataLogDir=/home/t/source/zookeeper-3.4.10/dataLogDir

zookeeper启动:

./zkServer.sh start

3,配置启动kafka

修改kafka配置项:

启动kafka

./kafka-server-start.sh  ../config/server.properties

创建topic(消息类型)

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

生产消息:

./kafka-console-producer.sh  --broker-list localhost:9092 --topic test

消费消息:

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning  

最终效果:

生产端输入什么,消费端输出什么。

partition

分区,对于一个topic,3个分区,则同一组消费者数量应当<=3,否则有消费者接受不到数据;

http://www.cnblogs.com/liuwei6/p/6900686.html

Topic在逻辑上可以被认为是一个queue,每条消费都必须指定它的Topic,可以简单理解为必须指明把这条消息放进哪个queue里。为了使得Kafka的吞吐率可以线性提高,物理上把Topic分成一个或多个Partition,每个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的所有消息和索引文件。

kafka外网访问 advertised.listeners=PLAINTEXT://x.x.x.x:9092

kafka读写

import org.apache.kafka.clients.consumer.ConsumerRecord;  
import org.apache.kafka.clients.consumer.ConsumerRecords;  
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Date;
import java.util.Properties;

import javax.print.attribute.standard.PrinterLocation;  
  
public class KafkaConsumerExample {  
    public static void main(String[] args) throws InterruptedException {  
        Properties props = new Properties();  
        props.put("bootstrap.servers", "192.168.1.166:9092");  
        props.put("group.id", "test13");  
        props.put("enable.auto.commit", "true");  
        props.put("auto.commit.interval.ms", "1000");  
        props.put("session.timeout.ms", "30000");  
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
        props.put("auto.offset.reset", "earliest");        
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024*1024*5);  
        //往kafka服务器提交消息间隔时间,0则立即提交不等待  
        props.put(ProducerConfig.LINGER_MS_CONFIG,0);  
        
        
        //Kafka Reader
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);  
        consumer.subscribe(Arrays.asList("test"));  
        consumer.seek(new TopicPartition("test", 1), 1);
        while (true) {  
            ConsumerRecords<String, String> records = consumer.poll(2000);
            System.out.println("-------------"+new Date());
            for (ConsumerRecord<String, String> record : records)  
                System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());  
        }
        
        /*
        //KafkaWriter
        KafkaProducer<String, String> productor = new KafkaProducer<>(props);
        productor.send(new ProducerRecord<String, String>("test", "aaa", "xiaoxiaoxiao2018"));
        */
    }  
}  

名词解释:

bootstrap.servers:Kafka集群连接串,可以由多个host:port组成【your.host.name:9092】

 

© 著作权归作者所有

共有 人打赏支持
skanda
粉丝 9
博文 75
码字总数 50007
作品 0
厦门
使用docker安装kafka

我们这里使用第三方docker镜像来安装kafka环境,分别是:wurstmeister/kafka 和 wurstmeister/zookeeper ,如需修改和自定义请查看对应的Dockerfile。 下载镜像 sudo docker pull wurstmeis...

John ⋅ 06/15 ⋅ 0

CentOs7 Kafka单机消息的发布-订阅

这段时间一直在学习大数据相关的知识,从Spark,Spark Streaming,Scala到Kafka等等,涉及到的知识面很多,总体看下来,觉得大数据还是很好玩的,在现在及以后的方方面面都很适用。下面说下K...

海岸线的曙光 ⋅ 03/02 ⋅ 0

docker容器中搭建kafka集群环境

Kafka集群管理、状态保存是通过zookeeper实现,所以先要搭建zookeeper集群 zookeeper集群搭建 一、软件环境: zookeeper集群需要超过半数的的node存活才能对外服务,所以服务器的数量应该是2...

qq_41587243 ⋅ 05/25 ⋅ 0

Kafka 单机和分布式环境搭建与案例使用

Kafka 单机和分布式环境搭建与案例使用 目录(?)[+] 一、单机环境搭建 官方参考文章: http://kafka.apache.org/quickstart 1、下载和解压安装包 这里下载了zookeeper和kafaka两个安装包,下载...

yucaifu1989 ⋅ 04/19 ⋅ 0

docker入门到实战(6)在docker中安装和使用kafka

下载镜像 这里使用了wurstmeister/kafka和wurstmeister/zookeeper这两个版本的镜像,在hub.docker.com中可以搜索到。 1、docker pull wurstmeister/zookeeper 2、docker pull wurstmeister/...

编程老司机 ⋅ 05/14 ⋅ 0

Kafka offset存储方式与获取消费实现

转载:http://www.aboutyun.com/thread-21104-1-1.html 1.概述 目前,Kafka 官网最新版[0.10.1.1],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。其实,早...

xiaomin0322 ⋅ 05/10 ⋅ 0

【Kafka 1.x】快速入门

本博客文章如无特别说明,均为原创!转载请注明出处:Big data enthusiast(http://www.lubinsu.com/) 本文链接地址:【Kafka 1.x】快速入门(http://www.lubinsu.com/index.php/archives/475)...

snoopy93 ⋅ 05/08 ⋅ 0

kafka的简单shell命令管理

启动kafka步骤 1、先启动zookeeper(kafka自带zookeeper的,可以启动自身的)当前目录为bin目录上一级 ./bin/zookeeper-server-start.sh config/zookeeper.properties & (启动后,基本上当前...

qq_38872310 ⋅ 04/16 ⋅ 0

kafka在windows上的安装、运行

kafka在windows上的安装、运行 目录(?)[+] 1.简介 Kafka是一种高吞吐量的分布式发布订阅消息系统。详细介绍可查阅官网:kafka官网 2.环境搭建 2.1 安装JDK 下载地址:jre下载 有关jdk的安装不...

yucaifu1989 ⋅ 04/21 ⋅ 0

从程序安装到设置,Kafka的配置属性解析!

  【IT168 技术】Kafka是由Scala和Java编写的最流行的发布者 - 订阅者模型之一。它最初由LinkedIn开发,后来经过开源。Kafka是一种高吞吐量的分布式发布订阅消息系统,因可以处理重负载而出...

it168网站 ⋅ 05/22 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

个人博客的运营模式能否学习TMALL天猫质量为上?

心情随笔|个人博客的运营模式能否学习TMALL天猫质量为上? 中国的互联网已经发展了很多年了,记得在十年前,个人博客十分流行,大量的人都在写博客,而且质量还不错,很多高质量的文章都是在...

原创小博客 ⋅ 今天 ⋅ 0

JavaScript零基础入门——(十一)JavaScript的DOM操作

JavaScript零基础入门——(十一)JavaScript的DOM操作 大家好,欢迎回到我们的JavaScript零基础入门。最近有些同学问我说,我讲的的比书上的精简不少。其实呢,我主要讲的是我在开发中经常会...

JandenMa ⋅ 今天 ⋅ 0

volatile和synchronized的区别

volatile和synchronized的区别 在讲这个之前需要先了解下JMM(Java memory Model :java内存模型):并发过程中如何处理可见性、原子性、有序性的问题--建立JMM模型 详情请看:https://baike.b...

MarinJ_Shao ⋅ 今天 ⋅ 0

深入分析Kubernetes Critical Pod(一)

Author: xidianwangtao@gmail.com 摘要:大家在部署Kubernetes集群AddOn组件的时候,经常会看到Annotation scheduler.alpha.kubernetes.io/critical-pod"="",以表示这是一个关键服务,那你知...

WaltonWang ⋅ 今天 ⋅ 0

原子性 - synchronized关键词

原子性概念 原子性提供了程序的互斥操作,同一时刻只能有一个线程能对某块代码进行操作。 原子性的实现方式 在jdk中,原子性的实现方式主要分为: synchronized:关键词,它依赖于JVM,保证了同...

dotleo ⋅ 今天 ⋅ 0

【2018.06.22学习笔记】【linux高级知识 14.4-15.3】

14.4 exportfs命令 14.5 NFS客户端问题 15.1 FTP介绍 15.2/15.3 使用vsftpd搭建ftp

lgsxp ⋅ 今天 ⋅ 0

JeeSite 4.0 功能权限管理基础(Shiro)

Shiro是Apache的一个开源框架,是一个权限管理的框架,实现用户认证、用户授权等。 只要有用户参与一般都要有权限管理,权限管理实现对用户访问系统的控制,按照安全规则或者安全策略控制用户...

ThinkGem ⋅ 昨天 ⋅ 0

python f-string 字符串格式化

主要内容 从Python 3.6开始,f-string是格式化字符串的一种很好的新方法。与其他格式化方式相比,它们不仅更易读,更简洁,不易出错,而且速度更快! 在本文的最后,您将了解如何以及为什么今...

阿豪boy ⋅ 昨天 ⋅ 0

Python实现自动登录站点

如果我们想要实现自动登录,那么我们就需要能够驱动浏览器(比如谷歌浏览器)来实现操作,ChromeDriver 刚好能够帮助我们这一点(非谷歌浏览器的驱动有所不同)。 一、确认软件版本 首先我们...

blackfoxya ⋅ 昨天 ⋅ 0

线性回归原理和实现基本认识

一:介绍 定义:线性回归在假设特证满足线性关系,根据给定的训练数据训练一个模型,并用此模型进行预测。为了了解这个定义,我们先举个简单的例子;我们假设一个线性方程 Y=2x+1, x变量为商...

wangxuwei ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部