文档章节

Kafkactl

China_OS
 China_OS
发布于 2017/03/21 14:20
字数 431
阅读 1
收藏 0

实时监控topic消息进入条数

#!/usr/local/bin/python2.7 
# coding:utf-8

import re
import os
import sys
import argparse
import kafka
import shlex
import time
import datetime
import netifaces
import subprocess
import ConfigParser
from  kazoo.client import KazooClient
from termcolor import cprint 

action = ['list', 'real', 'count']
broker = '10.205.151.11:9092'

def localip():
    allfaces = netifaces.interfaces()
    if 'bond0' in allfaces:
        bond_ip = netifaces.ifaddresses('bond0')[netifaces.AF_INET][0]['addr']
    elif 'eth0' in allfaces:
        bond_ip = netifaces.ifaddresses('eth0')[netifaces.AF_INET][0]['addr']
    elif 'eth1' in allfaces:
        bond_ip = netifaces.ifaddresses('eth1')[netifaces.AF_INET][0]['addr']
    elif 'eth2' in allfaces:
        bond_ip = netifaces.ifaddresses('eth2')[netifaces.AF_INET][0]['addr']
    else:
        cprint ("ERROR,Please check local ip address",'red')
        sys.exit()
    return bond_ip

def get_zkadd():

    kafkaconf = '/opt/programs/kafka_0.10.0/config/server.properties'
    command = "grep '^zookeeper.connect=' %s " % kafkaconf
    command = shlex.split(command)
    zkadd = subprocess.Popen(command,stdout=subprocess.PIPE).communicate()[0].split('=')[1] 

    return zkadd

def get_brokers():

    ip1,ip2,ip3,ip4 = localip().split('.')
    ips = '.'.join([ip1,ip2,ip3])
  
    zkadd = get_zkadd()

    zk = KazooClient(hosts='%s' % zkadd,read_only=True)
    zk.start()
    ids = zk.get_children('/brokers/ids')
    zk.stop()
    zk.close()
    
    ip = ''
    for i in ids:
	ip = ip + '%s.%s:9092,' % (ips,i)
    return ip.strip(',')

def list_topic():

    ip = localip()

    zkadd = get_zkadd()
    zk = KazooClient(hosts='%s' % zkadd,read_only=True)
    zk.start()
    topics = zk.get_children('/brokers/topics')
    topics.sort()
    zk.stop()
    zk.close()
    cprint('===== All Topics =====', 'yellow') 
    for topic in topics:
	cprint ('%s' % topic, 'cyan')

def count_topic(topic):

    consumer=kafka.KafkaConsumer(group_id='kafkactl',bootstrap_servers=['%s' % broker],enable_auto_commit=False)

    alls = int(0)
    cprint('===== Count Topics =====', 'yellow')
    datenow = datetime.datetime.now().strftime('%H:%M:%S')
    for ids in consumer.partitions_for_topic(topic):
        partition = kafka.structs.TopicPartition(topic,ids)
        consumer.assign([partition])
        alls = alls + consumer.position(partition)
    cprint('%s======[%d]' % (datenow,int(alls)), 'cyan')



def count_real(topic):

    consumer=kafka.KafkaConsumer(group_id='kafkactl',bootstrap_servers=['10.205.151.11:9092'],enable_auto_commit=False)
    
    cprint('===== Real Count Topics =====', 'yellow')
    while True:
	all1 = int(0)
	all2 = int(0)
        datenow1 = datetime.datetime.now().strftime('%H:%M:%S')
        for ids in consumer.partitions_for_topic(topic):
            partition = kafka.structs.TopicPartition(topic,ids)
            consumer.assign([partition])
            all1 = all1 + consumer.position(partition)
    
        time.sleep(2)
  
        datenow2 = datetime.datetime.now().strftime('%H:%M:%S')
        for ids in consumer.partitions_for_topic(topic):
	    partition = kafka.structs.TopicPartition(topic,ids)
	    consumer.assign([partition])
	    all2 = all2 + consumer.position(partition)
	alls = int(all2) - int(all1)
        cprint('%s======[%d]' % (datenow2,alls), 'cyan')


def main():
    parser = argparse.ArgumentParser(description='Count Kafka Topic message')
    parser.add_argument('topic', metavar='TopicName', help='Topic Name or all')
    parser.add_argument('action', choices=action, metavar='Action', help='Action in %s' % ', '.join(action))
    args = parser.parse_args()
   
    if args.topic:
	if args.action == 'list':
	    list_topic()
        if args.action == 'count':
	    count_topic(args.topic)	
        if args.action == 'real':
	    count_real(args.topic)
 

if __name__ == '__main__':
    main()

 

© 著作权归作者所有

China_OS
粉丝 427
博文 463
码字总数 519985
作品 0
静安
技术主管
私信 提问
如何在Rails应用程序中使用Kafka?

背景介绍 有那么一段时间,我们的系统需要用到分布式流式处理和消息系统,而 Apache Kafka 似乎成了我们建立业务关键型应用程序的坚实基础。它可用于很多场景下,比如产品更新管道、订单跟踪...

java菜分享
2018/12/16
11
0

没有更多内容

加载失败,请刷新页面

加载更多

没有更多内容

64.监控平台介绍 安装zabbix 忘记admin密码

19.1 Linux监控平台介绍 19.2 zabbix监控介绍 19.3/19.4/19.6 安装zabbix 19.5 忘记Admin密码如何做 19.1 Linux监控平台介绍: 常见开源监控软件 ~1.cacti、nagios、zabbix、smokeping、ope...

oschina130111
今天
13
0
当餐饮遇上大数据,嗯真香!

之前去开了一场会,主题是「餐饮领袖新零售峰会」。认真听完了餐饮前辈和新秀们的分享,觉得获益匪浅,把脑子里的核心纪要整理了一下,今天和大家做一个简单的分享,欢迎感兴趣的小伙伴一起交...

数澜科技
今天
7
0
DNS-over-HTTPS 的下一代是 DNS ON BLOCKCHAIN

本文作者:PETER LAI ,是 Diode 的区块链工程师。在进入软件开发领域之前,他主要是在做工商管理相关工作。Peter Lai 也是一位活跃的开源贡献者。目前,他正在与 Diode 团队一起开发基于区块...

红薯
今天
12
0
CC攻击带来的危害我们该如何防御?

随着网络的发展带给我们很多的便利,但是同时也带给我们一些网站安全问题,网络攻击就是常见的网站安全问题。其中作为站长最常见的就是CC攻击,CC攻击是网络攻击方式的一种,是一种比较常见的...

云漫网络Ruan
今天
12
0
实验分析性专业硕士提纲撰写要点

为什么您需要研究论文的提纲? 首先当您进行研究时,您需要聚集许多信息和想法,研究论文提纲可以较好地组织你的想法, 了解您研究资料的流畅度和程度。确保你写作时不会错过任何重要资料以此...

论文辅导员
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部