文档章节

python通过Pykafka库来连接kafka并收发消息

啊哈关关
 啊哈关关
发布于 2016/11/15 11:31
字数 444
阅读 7194
收藏 1

1.安装pykafka

pip install pykafka 

2.下载安装

git clone https://github.com/Parsely/pykafka.git

然后将下载下来的pykafka文件夹下的pykafka文件(pykafka的库文件)放到/Library/Python/2.7/site-packages/路径下即可

3.假设你有至少一个卡夫卡实例在本地运行,你可以使用pykafka连接它。

consumer.py 消费者

#!/usr/bin/python
# -*- coding:utf-8 -*-
from pykafka import KafkaClient

#kafka默认端口为9092
client = KafkaClient(hosts='192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092')#这里连接多个客户端
topic = client.topics['test_kafka_topic']

#从zookeeper消费,zookeeper的默认端口为2181
balanced_consumer = topic.get_balanced_consumer(
    consumer_group='test_kafka_group',
    auto_commit_enable=True,  # 设置为False的时候不需要添加consumer_group,直接连接topic即可取到消息
    zookeeper_connect='192.168.1.140:2181,192.168.1.141:2181,192.168.1.142:2181'#这里就是连接多个zk
)

for message in balanced_consumer:
    # print message
    if message is not None:
        print message.offset, message.value#打印接收到的消息体的偏移个数和值

producer.py 生产者

#!/usr/bin/python
# -*- coding:utf-8 -*-

from pykafka import KafkaClient
 
client = KafkaClient(hosts ="192.168.1.140:9092,192.168.1.141:9092,192.168.1.142:9092") #可接受多个client
#查看所有的topic
client.topics
print client.topics


topic = client.topics['test_kafka_topic']#选择一个topic

message ="test message test message"
#当有了topic之后呢,可以创建一个producer,来发消息,生产kafka数据,通过字符串形式,
with topic.get_sync_producer() as producer:
    producer.produce(message)
#The example above would produce to kafka synchronously - 
#the call only returns after we have confirmation that the message made it to the cluster.
#以上的例子将产生kafka同步消息,这个调用仅仅在我们已经确认消息已经发送到集群之后

#但生产环境,为了达到高吞吐量,要采用异步的方式,通过delivery_reports =True来启用队列接口;
with topic.get_sync_producer() as producer:
     producer.produce('test message',partition_key='{}'.)
producer=topic.get_producer()
producer.produce(message)
print message

 

© 著作权归作者所有

啊哈关关
粉丝 8
博文 176
码字总数 77974
作品 0
深圳
程序员
私信 提问
加载中

评论(1)

Jack0606
Jack0606
哥哥 `producer.produce('test message',partition_key='{}' ` 能解释一下 `partition_key` 的作用吗
python kafka kerberos 验证 消费 生产

[toc] 安装 pykafkagithub 注意kafka版本只支持 kafka 1.1, 1.0,0.11, 0.10, 0.9,0.8 (201902) 该作者在https://github.com/dpkp/kafka-python/pull/1152 这个推送增加了kerberos支持 验证......

stys35
02/28
0
0
打造基于Python的流式数据分析平台

基于Python已经有多个科学研究和数据分析库,使用非常方便。结合OpenStack(http://www.openstack.org)、RabbitMQ(http://www.rabbitmq.com)、Celery(http://www.celeryproject.org)可以...

openthings
2015/05/21
0
3
spark hive python依赖第三方包

下载python对应版本源代码,https://www.python.org/downloads/source/ 构建过程: 3. 安装需要用到的库,以 pykafka 为例 4. 打包 5. 上传到hdfs spark yarn client模式 spark yarn cluster...

张欢19933
01/10
0
0
Apache Kafka 客户端 SDK - KafkaBridge

KafkaBridge 是奇虎 360 开源的 Kafka 客户端 SDK ,底层基于 librdkafka ,与之相比封装了大量的使用细节,简单易用,使用者无需了解过多的 Kafka 系统细节,只需调用极少量的接口,就可完成...

匿名
2018/10/16
0
0
小简历一份有意思的看看

简 历 基 本 信 息 姓 名 蒋宁 性 别 男 政治面貌 团员 出生日期 1991.11.20 院校专业 计算机网络技术 地 址 北京海淀区菊园东站 电子邮件 591508750@qq.com 联系电话 18211103203 求 职 意 ...

宁宁123韩
2013/10/16
5K
37

没有更多内容

加载失败,请刷新页面

加载更多

android6.0源码分析之Camera API2.0下的Preview(预览)流程分析

本文将基于android6.0的源码,对Camera API2.0下Camera的preview的流程进行分析。在文章android6.0源码分析之Camera API2.0下的初始化流程分析中,已经对Camera2内置应用的Open即初始化流程进...

天王盖地虎626
31分钟前
2
0
java 序列化和反序列化

1. 概述 序列恢复为Java对象的过程。 对象的序列化主要有两 首先我们介绍下序列化和反序列化的概念: 序列化:把Java对象转换为字节序列的过程。 反序列化:把字节序列恢复为Java对象的过程。...

edison_kwok
43分钟前
1
0
分布式数据一致性

狼王黄师傅
今天
2
0
经验

相信每位开发者在自己开发的过程中,都会反思一些问题,比如怎样提高编程能力、如何保持心态不砍产品经理、996 之后怎样恢复精力……最近开发者 Tomasz Łakomy 将他 7 年的开发生涯中学习到...

WinkJie
今天
4
0
从源码的角度来看SpringMVC

SpringMVC核心流程图 简单总结 首先请求进入DispatcherServlet 由DispatcherServlet 从HandlerMappings中提取对应的Handler 此时只是获取到了对应的Handle,然后得去寻找对应的适配器,即:H...

骚年锦时
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部