Processing large volumes of logs with Python multiprocessing

mahengyang · posted 1 year ago

Abstract: the nginx logs are large, several tens of GB per day. Request analysis used to be done with awk, sort, and uniq, but as the business grew more complex that became hard to maintain, so the analysis was moved to Python. Python is much slower than awk for this kind of work, but running it across multiple processes improves throughput considerably.

The nginx log format looks like this:

{"serverType":"nginx","meta":{"clientIp":"36.163.27.71","accessTime":"10/Nov/2016:23:01:32 +0800","accessTime_unix":"1478790092.299","request":"POST /api/v250/index.api?a=1&b=2 HTTP/1.1","status":403,"bodySizes":578,"referer":"-","userAgent":"qt-android/2.5.0","host":"www.youpiaole.com","xForwardedFor":"36.163.27.71, 223.245.176.31","reqTime":"0.000","upRespTime":"-","httpVersion":"2.5.0","devType":"t_android","devId":"353591913759","channel":"t_android","appVer":"250","iosVer":"-","userId":"-","userType":"-","devToken":"-","srcType":"client","cookie":"JSESSIONID=D2D7E71BBCA2461A48B4D5F7015A2B3A; route=8bf44254c3316f9c4f0c454d3","upstreamIp":"192.168.1.101","scheme":"http"}}

Originally, cut and sed were used to extract the required fields:

zcat n_log/www.youpiaole.com/20161027/access/* | cut -d , -f 2,5,6,13,14,26 | sed 's/"//g;s/meta:{//;s/clientIp://;s/,request:/,/;s/?.*status:/,/;s/reqTime://;s/,upRespTime:/,/;s/,upstreamIp:/,/;s/GET //;s/POST //' > /tmp/20161027.log
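
Applied to the sample line above, the pipeline yields roughly one CSV row per request (clientIp, url, status, reqTime, upRespTime, upstreamIp):

36.163.27.71,/api/v250/index.api,403,0.000,-,192.168.1.101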

Then awk did the counting, printing any first-field value (here the client ip) seen more than 10 times:

awk -F"," '{a[$1]+=1} END {for(i in a) {if(a[i] > 10) print a[i],i}}' /tmp/20161027.log

But because the logs are JSON, extracting fields with cut and sed is fragile and fiddly: the comma-based field positions silently shift whenever a value itself contains a comma (xForwardedFor, for instance), and the flat output is awkward for any further analysis. The statistics were therefore rewritten in Python.
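
For comparison, the same per-ip tally takes only a few lines of single-process Python. A minimal sketch (not the article's script), assuming the gzipped JSON logs shown above:

# -*- coding: utf-8 -*-
import gzip, json, glob, re
from collections import Counter

counts = Counter()
for path in glob.glob('n_log/www.youpiaole.com/20161027/access/*'):
  for line in gzip.open(path, 'rb'):
    line = re.sub(r'\\x', '', line)  # strip literal \x escapes that break json.loads
    counts[json.loads(line)["meta"]["clientIp"]] += 1

for ip, n in counts.most_common():
  if n > 10:
    print n, ip

A single process like this decodes all the JSON on one core, which is the bottleneck the multi-process design below removes.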

Python threads are throttled by the global interpreter lock (GIL) and gain little on CPU-bound work, so multiple processes are used to chew through the large log volume; the quick benchmark below illustrates the difference, and the full parser follows it.
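
A minimal benchmark sketch (not from the original article; numbers vary by machine) that decodes the same JSON line a million times, first with 4 threads and then with 4 processes:

import json, time
from multiprocessing.pool import Pool, ThreadPool

line = '{"serverType":"nginx","meta":{"clientIp":"36.163.27.71","status":403}}'

def work(n):
  # CPU-bound loop: decode the same JSON line n times
  for _ in xrange(n):
    json.loads(line)

if __name__ == '__main__':
  for PoolCls in (ThreadPool, Pool):
    start = time.time()
    p = PoolCls(4)
    p.map(work, [250000] * 4)  # ~1M decodes split across 4 workers
    p.close(); p.join()
    print PoolCls.__name__, round(time.time() - start, 2), 's'

Under the GIL the thread pool runs the decodes essentially serially, while the process pool spreads them across cores. The full parser: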

# -*- coding: utf-8 -*-
import gzip, json, os, re, sys
from multiprocessing import Process, Pool, Manager
from Queue import Empty

reload(sys)
# Set the default string encoding to utf-8
sys.setdefaultencoding('utf-8')

# Number of command-line arguments; 3 means a line limit was passed for a test run
test = len(sys.argv)
# Convert a string to a float; nginx logs '-' when no value is available, which maps to 0
def time2float(t):
  if t == '-':
    tmp = 0
  else:
    tmp = float(t)
  return tmp

logfiles = []

destDir = sys.argv[1]
listfile = os.listdir(destDir)
# Walk the log directory and collect the full path of every log file
for path in listfile:
  path = os.path.join(destDir, path)
  logfiles.append(path)

def parseLine(line, lock, urlQueue, clientIpQueue, upstreamIpQueue):
  # Strip literal \x escape sequences that would break json.loads
  line = re.sub(r'\\x', '', line)
  meta = json.loads(line)["meta"]
  # Pull out the fields we need
  clientIp = meta["clientIp"]
  # Extract the request url, dropping the method and protocol
  request = meta["request"].replace("HTTP/1.1", "").replace("POST ", "").replace("GET ", "")
  # Drop the query string
  request = re.sub(r'\?.*$', "", request)
  status = meta["status"]
  reqTime = time2float(meta["reqTime"])
  upRespTime = time2float(meta["upRespTime"])
  upstreamIp = meta["upstreamIp"]
  # Hand each item to its own queue
  urlQueue.put((request, status, reqTime, upRespTime))
  clientIpQueue.put(clientIp)
  upstreamIpQueue.put(upstreamIp)
 
def parse(path, lock, urlQueue, clientIpQueue, upstreamIpQueue):
  fileContent = gzip.open(path, 'rb')
  a = 0
  for line in fileContent:
    a = a + 1
    # In test mode, stop once the line limit (the last command-line argument) is reached
    if test == 3:
      if a > int(sys.argv[2]):
        return 1
    parseLine(line, lock, urlQueue, clientIpQueue, upstreamIpQueue)

def readUrlQueue(urlQueue, urlsResultQueue, lock):
  urls = {}
  while True:
    try:
      request, status, reqTime, upRespTime = urlQueue.get(True, 2)
      # Accumulate per-url totals; a url not seen before starts from zero
      urlCount, urlReqTime, urlUpRespTime, urlStatus = urls.get(request, (0, 0.0, 0.0, {}))
      urlStatus[status] = urlStatus.get(status, 0) + 1
      urls[request] = (urlCount + 1, urlReqTime + reqTime, urlUpRespTime + upRespTime, urlStatus)
    except Empty:
      # Nothing arrived for 2s: the producers are done
      break
  urlsResultQueue.put(urls)

def readClientIpQueue(clientIpQueue, clientIpsResultQueue, lock):
  clientIps = {}
  while True:
    try:
      clientIp = clientIpQueue.get(True, 2)
      clientIps[clientIp] = clientIps.get(clientIp, 0) + 1
    except Empty:
      break
  # Put the final tally on the result queue for the main process
  clientIpsResultQueue.put(clientIps)

def readUpstreamIpQueue(upstreamIpQueue, upstreamIpsResultQueue, lock):
  upstreamIps = {}
  while True:
    try:
      upstreamIp = upstreamIpQueue.get(True, 2)
      upstreamIps[upstreamIp] = upstreamIps.get(upstreamIp, 0) + 1
    except Empty:
      break
  # Put the final tally on the result queue for the main process
  upstreamIpsResultQueue.put(upstreamIps)

# Consumer processes that read the queues
readThreads = []
manager = Manager()
lock = manager.Lock()
urlsResultQueue = manager.Queue(1)
clientIpsResultQueue = manager.Queue(1)
upstreamIpsResultQueue = manager.Queue(1)

# One queue per statistic
urlQueue = manager.Queue(500)
clientIpQueue = manager.Queue(500)
upstreamIpQueue = manager.Queue(500)
# Read the log files with a process pool, one task per file
pool = Pool(24)
for f in logfiles:
  pool.apply_async(parse, (f, lock, urlQueue, clientIpQueue, upstreamIpQueue))
pool.close()

# Start one consumer process per queue
urlP = Process(target=readUrlQueue, args=(urlQueue, urlsResultQueue, lock))
urlP.start()
readThreads.append(urlP)
clientIpP = Process(target=readClientIpQueue, args=(clientIpQueue, clientIpsResultQueue, lock))
clientIpP.start()
readThreads.append(clientIpP)
upstreamIpP = Process(target=readUpstreamIpQueue, args=(upstreamIpQueue, upstreamIpsResultQueue, lock))
upstreamIpP.start()
readThreads.append(upstreamIpP)
# The consumers loop forever (while True) with a 2s read timeout, so once they exit, every queued item is guaranteed to have been processed
for t in readThreads : 
  t.join()

# After join() each result queue holds exactly one item, so a plain get() suffices
urls = urlsResultQueue.get()
clientIps = clientIpsResultQueue.get()
upstreamIps = upstreamIpsResultQueue.get()

# Sort by request count, descending
finalUrls = sorted(urls.iteritems(), key=lambda d:d[1][0], reverse = True)
finalClientIps = sorted(clientIps.iteritems(), key=lambda d:d[1], reverse = True)
finalUpstreamIps = sorted(upstreamIps.iteritems(), key=lambda d:d[1], reverse = True)

# Print the final statistics
print "upstreamIp", "count"
for key, value in finalUpstreamIps:
  print key, value
print
print "count", "status", "reqTime", "upRespTime", "url"
for key, value in finalUrls:
  urlCount, urlReqTime, urlUpRespTime, status = value
  statusStr = ','.join(map(lambda x: '{0}:{1}'.format(x, status[x]), status))
  # Columns follow the header: count, status breakdown, average reqTime, average upRespTime, url
  print urlCount, statusStr, round(urlReqTime / urlCount, 2), round(urlUpRespTime / urlCount, 2), key
print
print "clientIp", "count"
for key, value in finalClientIps:
  if value > 10:
    print key, value
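
The script takes the log directory as its first argument; an optional second argument caps the number of lines read from each file, which is handy for a quick test run (the script filename here is hypothetical):

python parse_nginx_log.py n_log/www.youpiaole.com/20161027/access/
python parse_nginx_log.py n_log/www.youpiaole.com/20161027/access/ 1000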

The resulting statistics look like this:

upstreamIp count
- 385
192.168.1.101:8080 26
192.168.1.102:8080 25
192.168.1.103:8080 21

count status reqTime upRespTime url
410 403:410,200:102 0.03 0.0 /api/v250/index.api
67 200:42 0.2 0.2 /api/wap1.1.0/login/wap.api

clientIp count
36.163.27.71 33
  • reqTime is the total time the request took, end to end, as measured by nginx

  • upRespTime is the time from nginx opening a connection to the backend server until the backend has returned all its data and the connection is closed

  • The first section counts the requests handled by each application server, showing how load is distributed across them

  • The second section counts requests and the http status code distribution per url, showing how the application servers are coping

  • The third section counts requests per client ip; since there are many ips, only those with more than 10 requests are printed, which helps identify abusive clients
