Kafkactl

原创
2017/03/21 14:20
阅读数 73

实时监控topic消息进入条数

#!/usr/local/bin/python2.7 
# coding:utf-8

import re
import os
import sys
import argparse
import kafka
import shlex
import time
import datetime
import netifaces
import subprocess
import ConfigParser
from  kazoo.client import KazooClient
from termcolor import cprint 

# Sub-commands accepted as the second command-line argument.
action = [
    'list',
    'real',
    'count',
]

# Kafka bootstrap server address, as host:port.
broker = '10.205.151.11:9092'

def localip():
    """Return the IPv4 address of the first usable local network interface.

    The interfaces ``bond0``, ``eth0``, ``eth1`` and ``eth2`` are tried in
    that order. An interface that does not exist, or that exists but has no
    IPv4 address bound, is skipped.

    Returns:
        The first IPv4 address of the selected interface, as a string.

    Exits:
        Prints an error and terminates the process with status 1 when none
        of the candidate interfaces provides an IPv4 address.
    """
    present = netifaces.interfaces()
    for name in ('bond0', 'eth0', 'eth1', 'eth2'):
        if name not in present:
            continue
        # ifaddresses() only contains the AF_INET key when the interface
        # actually has an IPv4 address; fall through to the next candidate
        # instead of raising KeyError.
        inet = netifaces.ifaddresses(name).get(netifaces.AF_INET)
        if inet:
            return inet[0]['addr']

    cprint("ERROR,Please check local ip address", 'red')
    # Non-zero status so that callers and shell scripts can detect the failure.
    sys.exit(1)

def get_zkadd(kafkaconf='/opt/programs/kafka_0.10.0/config/server.properties'):
    """Return the ZooKeeper connect string from a Kafka broker config file.

    The file is read directly rather than through an external ``grep``
    process, and the value is returned without the trailing newline.

    Args:
        kafkaconf: Path to the broker's ``server.properties`` file.

    Returns:
        The value of the first ``zookeeper.connect`` entry, with surrounding
        whitespace removed. Everything after the first ``=`` is kept, so a
        value that itself contains ``=`` is returned intact.

    Raises:
        IOError: If the file cannot be opened.
        ValueError: If the file contains no ``zookeeper.connect`` entry.
    """
    with open(kafkaconf) as conf:
        for line in conf:
            key, sep, value = line.partition('=')
            # Commented-out entries have a key such as '#zookeeper.connect'
            # and therefore never match.
            if sep and key.strip() == 'zookeeper.connect':
                return value.strip()
    raise ValueError('zookeeper.connect not found in %s' % kafkaconf)

def get_brokers():
    """Return a comma-separated ``ip:9092`` list, one entry per broker id.

    Broker addresses are derived rather than looked up: the first three
    octets of this host's IPv4 address are combined with each child name
    found under ``/brokers/ids`` in ZooKeeper, and port 9092 is appended.

    NOTE(review): this is only correct if every broker id equals the last
    octet of that broker's address and all brokers share this host's /24
    network -- confirm against the deployment.

    Returns:
        A string such as ``'10.0.0.1:9092,10.0.0.2:9092'``; empty when no
        broker ids are registered.
    """
    # Everything before the last dot, e.g. '10.205.151' for '10.205.151.11'.
    subnet = localip().rsplit('.', 1)[0]

    zk = KazooClient(hosts=get_zkadd().strip(), read_only=True)
    zk.start()
    try:
        ids = zk.get_children('/brokers/ids')
    finally:
        # Always release the ZooKeeper session, even if the read fails.
        zk.stop()
        zk.close()

    return ','.join('%s.%s:9092' % (subnet, i) for i in ids)

def list_topic():
    """Print the name of every topic registered in ZooKeeper, sorted.

    Topic names are the children of ``/brokers/topics``. A yellow header
    line is printed first, followed by one cyan line per topic.
    """
    zk = KazooClient(hosts=get_zkadd().strip(), read_only=True)
    zk.start()
    try:
        topics = sorted(zk.get_children('/brokers/topics'))
    finally:
        # Always release the ZooKeeper session, even if the read fails.
        zk.stop()
        zk.close()

    cprint('===== All Topics =====', 'yellow')
    for topic in topics:
        cprint('%s' % topic, 'cyan')

def count_topic(topic):
    """Print the summed consumer position over all partitions of *topic*.

    A consumer in group ``kafkactl`` is assigned to each partition in turn
    and ``position()`` is added up. The result is printed together with the
    time at which counting started.

    NOTE(review): the sum equals the total of the partitions' end offsets
    only while group ``kafkactl`` has no committed offsets and the
    consumer's offset reset policy is "latest" -- confirm.

    Args:
        topic: Name of the topic to count.

    Exits:
        Prints an error and terminates with status 1 when the broker
        reports no partitions for *topic*.
    """
    consumer = kafka.KafkaConsumer(
        group_id='kafkactl',
        bootstrap_servers=['%s' % broker],
        enable_auto_commit=False,
    )
    try:
        cprint('===== Count Topics =====', 'yellow')
        datenow = datetime.datetime.now().strftime('%H:%M:%S')

        # partitions_for_topic() returns None for an unknown topic.
        partitions = consumer.partitions_for_topic(topic)
        if not partitions:
            cprint("ERROR,Topic %s not found" % topic, 'red')
            sys.exit(1)

        total = 0
        for pid in partitions:
            partition = kafka.structs.TopicPartition(topic, pid)
            consumer.assign([partition])
            total += consumer.position(partition)

        cprint('%s======[%d]' % (datenow, int(total)), 'cyan')
    finally:
        # Release the broker connections on every exit path.
        consumer.close()



def _sum_positions(consumer, topic):
    """Return the consumer position summed over every partition of *topic*.

    Prints an error and exits with status 1 when the broker reports no
    partitions for *topic*.
    """
    # partitions_for_topic() returns None for an unknown topic.
    partitions = consumer.partitions_for_topic(topic)
    if not partitions:
        cprint("ERROR,Topic %s not found" % topic, 'red')
        sys.exit(1)

    total = 0
    for pid in partitions:
        partition = kafka.structs.TopicPartition(topic, pid)
        consumer.assign([partition])
        total += consumer.position(partition)
    return total


def count_real(topic):
    """Continuously print how far the position of *topic* advanced.

    Roughly every two seconds the summed partition position is sampled and
    the difference to the previous sample is printed with a timestamp. Each
    sample is reused as the baseline of the next interval, so growth between
    two printed lines is not lost. The loop runs until the process is
    interrupted.

    NOTE(review): the figures reflect newly produced messages only if
    ``position()`` is refreshed on every re-assignment; for a topic with a
    single partition verify that successive samples actually advance.

    Args:
        topic: Name of the topic to monitor.
    """
    consumer = kafka.KafkaConsumer(
        group_id='kafkactl',
        bootstrap_servers=['%s' % broker],
        enable_auto_commit=False,
    )
    try:
        cprint('===== Real Count Topics =====', 'yellow')
        previous = _sum_positions(consumer, topic)
        while True:
            time.sleep(2)
            current = _sum_positions(consumer, topic)
            stamp = datetime.datetime.now().strftime('%H:%M:%S')
            cprint('%s======[%d]' % (stamp, int(current) - int(previous)), 'cyan')
            previous = current
    finally:
        # Runs on Ctrl-C as well, so the broker connections are released.
        consumer.close()


def main():
    """Parse the command line and run the requested action.

    Two positional arguments are expected: a topic name followed by one of
    the entries in ``action``.
    """
    cli = argparse.ArgumentParser(description='Count Kafka Topic message')
    cli.add_argument('topic', metavar='TopicName', help='Topic Name or all')
    cli.add_argument(
        'action',
        choices=action,
        metavar='Action',
        help='Action in %s' % ', '.join(action),
    )
    parsed = cli.parse_args()

    # An empty topic string means no action is run at all.
    if not parsed.topic:
        return

    requested = parsed.action
    if requested == 'list':
        list_topic()
    elif requested == 'count':
        count_topic(parsed.topic)
    elif requested == 'real':
        count_real(parsed.topic)
 

# Run the command-line interface only when this file is executed as a script.
if __name__ == '__main__':
    main()

 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部