文档章节

kafka将offset自动重置为最新

ktlb
 ktlb
发布于 2017/05/26 15:48
字数 347
阅读 315
收藏 0

 1.可以将kafka中的偏移量自动重置为最新的

 2.使用于kafka有积压,但是也不想处理积压,直接消费最新的数据

 3.此版本只支持offset存储在zk中, 暂未提供offset存储在kafka中的版本

# -*- coding:utf-8 -*-
import time

import sys
from kafka.client_async import KafkaClient
from kafka.protocol.commit import OffsetCommitRequest_v0
from kafka.protocol.offset import OffsetRequest_v0, OffsetResponse_v0
from kafka.structs import TopicPartition

servers = '192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092'
gid = 'group_id'
topic = 'topic'

# 手动重置时需要配置,同时修改main方法中调用方法
manual_logsize = sys.maxint


def parse_logsize(t, p, responses):
    """
        单个broker中单个partition的logsize
    :param responses:
    :param p:
    :param t:
    :return:
    """
    for response in responses:
        if not isinstance(response, OffsetResponse_v0):
            continue
        tps = response.topics
        tpc = tps[0][0]
        partition_list = tps[0][1]
        parti = partition_list[0][0]
        if tpc == t and parti == p and partition_list[0][1] == 0:
            logsize_list = partition_list[0][2]
            logsize = logsize_list[0]
            return logsize
    return None


def auto_lateset(g, t):
	"""
		自动重置为最新的offset
	"""
    client = KafkaClient(bootstrap_servers=servers, request_timeout_ms=3000)
    partitions = client.cluster.partitions_for_topic(t)
    for partition in partitions:
        nodeId = client.cluster.leader_for_partition(TopicPartition(topic=t, partition=partition))
        while not client.is_ready(nodeId):
            client.ready(nodeId)
            time.sleep(1)
        client.send(nodeId, OffsetRequest_v0(replica_id=-1, topics=[(t, [(partition, -1, 1)])]))
        log_size = parse_logsize(t, partition, client.poll(timeout_ms=3000))
        if log_size:
            client.send(nodeId, OffsetCommitRequest_v0(consumer_group=g,
                                                       topics=[(t, [(partition, log_size, '')])]))
            print client.poll()


def manual(g, t, log_size):
	"""
		手动重置offset为 manual_logsize的值,注意:所有分区都会重置
	"""
    client = KafkaClient(bootstrap_servers=servers, request_timeout_ms=3000)
    partitions = client.cluster.partitions_for_topic(t)
    for partition in partitions:
        nodeId = client.cluster.leader_for_partition(TopicPartition(topic=t, partition=partition))
        while not client.is_ready(nodeId):
            client.ready(nodeId)
            time.sleep(1)
        client.send(nodeId, OffsetCommitRequest_v0(consumer_group=g,
                                                   topics=[(t, [(partition, log_size, '')])]))
        print client.poll()


if __name__ == "__main__":
    auto_lateset(gid, topic)
    # manual(gid, topic, manual_logsize)

 

© 著作权归作者所有

ktlb
粉丝 5
博文 14
码字总数 5517
作品 0
南京
程序员
私信 提问
kafka 的offset的重置

转载自:https://www.cnblogs.com/hd-zg/p/5831219.html 最近在spark读取kafka消息时,每次读取都会从kafka最新的offset读取。但是如果数据丢失,如果在使用Kafka来分发消息,在数据处理的过...

weixin_37589896
2017/11/29
0
0
Learning Apache Kafka 2nd Edition读书笔记

Chap 1 Kafka简介 1.Apache Kafka是一款开源的,分布式的,基于分区、日志提交和订阅推送的消息系统。设计用于: 持久化消息到硬盘,TB级别 高吞吐量,每秒数百M读写 分布式,支持弹性伸缩 ...

GunnerAha
2018/04/10
0
0
spark streaming从指定offset处消费Kafka数据

一、情景:当Spark streaming程序意外退出时,数据仍然再往Kafka中推送,然而由于Kafka默认是从latest的offset读取,这会导致数据丢失。为了避免数据丢失,那么我们需要记录每次消费的offse...

刺猬一号
2018/07/19
0
0
Kafka: Consumer

Kafka Consumer 通过之前的架构介绍,对Consumer有了一个初步的了解。这里再深入一点来了解一下Consumer。 1、Consumer Group与Topic订阅 1.1 Consumer与partition 1.2 Consumer与Consumer ...

LUIS1983
2018/08/10
16
0
Kafka offset存储方式与获取消费实现

转载:http://www.aboutyun.com/thread-21104-1-1.html 1.概述 目前,Kafka 官网最新版[0.10.1.1],已默认将消费的 offset 迁入到了 Kafka 一个名为 __consumer_offsets 的Topic中。其实,早...

xiaomin0322
2018/05/10
34
0

没有更多内容

加载失败,请刷新页面

加载更多

DDD(五)

1、引言 之前学习了解了DDD中实体这一概念,那么接下来需要了解的就是值对象、唯一标识。值对象,值就是数字1、2、3,字符串“1”,“2”,“3”,值时对象的特征,对象是一个事物的具体描述...

MrYuZixian
今天
3
0
数据库中间件MyCat

什么是MyCat? 查看官网的介绍是这样说的 一个彻底开源的,面向企业应用开发的大数据库集群 支持事务、ACID、可以替代MySQL的加强版数据库 一个可以视为MySQL集群的企业级数据库,用来替代昂贵...

沉浮_
今天
4
0
解决Mac下VSCode打开zsh乱码

1.乱码问题 iTerm2终端使用Zsh,并且配置Zsh主题,该主题主题需要安装字体来支持箭头效果,在iTerm2中设置这个字体,但是VSCode里这个箭头还是显示乱码。 iTerm2展示如下: VSCode展示如下: 2...

HelloDeveloper
今天
6
0
常用物流快递单号查询接口种类及对接方法

目前快递查询接口有两种方式可以对接,一是和顺丰、圆通、中通、天天、韵达、德邦这些快递公司一一对接接口,二是和快递鸟这样第三方集成接口一次性对接多家常用快递。第一种耗费时间长,但是...

程序的小猿
今天
6
0
Python机器学习之数据探索可视化库yellowbrick

背景介绍 从学sklearn时,除了算法的坎要过,还得学习matplotlib可视化,对我的实践应用而言,可视化更重要一些,然而matplotlib的易用性和美观性确实不敢恭维。陆续使用过plotly、seaborn,...

yeayee
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部