文档章节

kafka消息中间件-python实现 -实现数据生产和消费-实用的脚本

o
 osc_wws45aot
发布于 2019/08/20 15:18
字数 256
阅读 5
收藏 0

精选30+云产品,助力企业轻松上云!>>>

kafka实现推送生产数据的tmp_sc.py脚本文件
import socket

import json
from kafka import KafkaProducer
from kafka.errors import KafkaError
#kafka的配置文件:(bootstrap_servers:kafka的集群地址,topic_name:主题,consumer_id:消费分组)
KAFKA_SETTING = {
'bootstrap_servers': ["172.x.x.x:9092", "172.x.x.x:9092", "172.x.x.x:9092"],
'topic_name': 'user_data',
'consumer_id': 'consumer_ai'
}
conf= KAFKA_SETTING

print("[setting] =", conf)


producer = KafkaProducer(bootstrap_servers=conf['bootstrap_servers'],
api_version = (0,10),
retries=5)

partitions = producer.partitions_for(conf['topic_name'])
print('Topic下分区: %s' % partitions)
#需要推送的数据:(推送到kafka的数据类型必须的json类型)
user_data = {
"appToken": "d23ea83dbf7c411aa36e5ab519f41818",
"appId": "JF_WK_001",
"mobile": "15950857927",
"isRealTimeReturn": True,
"applyTime": 15100226057,
"uuid": "a91140f54b898w85d7a50d4b95994",
"customerNo": 1153265851
}

send_data = bytes(json.dumps(user_data), encoding="utf-8")

try:
future = producer.send(conf['topic_name'], send_data)
future.get()
print('send message succeed.')
except KafkaError as e:
print('send message failed. [e] ='),

kafka实现推送生产数据的tmp_xf.py脚本文件
import socket
from kafka import KafkaConsumer
from kafka.errors import KafkaError


KAFKA_SETTING = {
'bootstrap_servers': ["172.x.x.x:9092", "172.x.x.x:9092", "172.x.x.x:9092"],
'topic_name': 'result_data',
'topic_name_user': 'user_data',
'consumer_id': 'consumer_ai'
}

conf = KAFKA_SETTING

consumer = KafkaConsumer(bootstrap_servers=conf['bootstrap_servers'],
group_id=conf['consumer_id'],
api_version = (0,10))

print('consumer start to consuming...')
consumer.subscribe((conf['topic_name'], ))
from IPython import embed
# embed()

print("consumer = ", consumer)
for message in consumer:
print(message.topic, message.offset, message.key, message.value, message.partition)
 
o
粉丝 0
博文 500
码字总数 0
作品 0
私信 提问
加载中
请先登录后再评论。

暂无文章

告别传统机房:3D 机房数据可视化实现智能化与VR技术的新碰撞

前言 随着各行业对计算机依赖性的日益提高,计算机信息系统的发展使得作为其网络设备、主机服务器、数据存储设备、网络安全设备等核心设备存放地的计算机机房日益显现出它的重要地位,而机房...

xhload3d
昨天
13
0
如何使用.css()应用!important? - How to apply !important using .css()?

问题: I am having trouble applying a style that is !important . 我在应用!important样式时遇到麻烦。 I've tried: 我试过了: $("#elem").css("width", "100px !important"); This doe......

富含淀粉
昨天
5
0
spring源码解析-xml配置文件读取

整个 XML配置文件读取的大致流程如下: 通过继承自AbstractBeanDefinitionReader中的方法,来使用ResourLoader将资源文件路径转换为对应的Resource文件(读取资源文件并将其转为Resource) ...

wc_飞豆
昨天
16
0
salesforce community cloud 1

NO.1 Universal Containers has a Community for their partners. They would like to add a new partner company and grant their users access to the Community. What is the first step ......

jinzongyu
昨天
11
0
如何使用PHP计算两个日期之间的差异? - How to calculate the difference between two dates using PHP?

问题: I have two dates of the form: 我有两个日期格式: Start Date: 2007-03-24 End Date: 2009-06-26 Now I need to find the difference between these two in the following form:......

技术盛宴
昨天
14
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部