文档章节

python多进程处理大量日志

mahengyang
 mahengyang
发布于 2016/11/13 13:09
字数 1191
阅读 284
收藏 0

nginx日志格式如下

{"serverType":"nginx","meta":{"clientIp":"36.163.27.71","accessTime":"10/Nov/2016:23:01:32 +0800","accessTime_unix":"1478790092.299","request":"POST /api/v250/index.api?a=1&b=2 HTTP/1.1","status":403,"bodySizes":578,"referer":"-","userAgent":"qt-android/2.5.0","host":"www.youpiaole.com","xForwardedFor":"36.163.27.71, 223.245.176.31","reqTime":"0.000","upRespTime":"-","httpVersion":"2.5.0","devType":"t_android","devId":"353591913759","channel":"t_android","appVer":"250","iosVer":"-","userId":"-","userType":"-","devToken":"-","srcType":"client","cookie":"JSESSIONID=D2D7E71BBCA2461A48B4D5F7015A2B3A; route=8bf44254c3316f9c4f0c454d3","upstreamIp":"192.168.1.101","scheme":"http"}}

原来使用cut、sed提取所需字段

zcat n_log/www.youpiaole.com/20161027/access/* | cut -d , -f 2,5,6,13,14,26 | sed 's/"//g;s/meta:{//;s/clientIp://;s/,request:/,/;s/?.*status:/,/;s/reqTime://;s/,upRespTime:/,/;s/,upstreamIp:/,/;s/GET //;s/POST //' > /tmp/20161027.log

再使用awk做统计

awk -F"," '{a[$1]+=1} END {for(i in a) {if(a[i] > 10) print a[i],i}}' /tmp/20161027.log

但由于日志是json格式,使用cut和sed提取所需字段过于繁琐,也不便于进一步的分析,后面改为使用python做统计分析。

python的多线程由于存在全局锁,效率不高,使用多进程的方式处理大量的日志数据

# -*- coding: utf-8 -*-
import string,gzip,json,sys,os,re,threading
import multiprocessing
from multiprocessing import Process,Queue,Lock,Manager
from Queue import Empty
import multiprocessing.pool
import sys 
reload(sys) 
# 设置字符编码格式为utf-8
sys.setdefaultencoding('utf-8')

test = len(sys.argv)
# 字符串转为数字,float类型
def time2float(t) :
  if t == '-':
    tmp = 0
  else :
    tmp = float(t)      
  return tmp

logfiles = []

destDir = sys.argv[1]
listfile = os.listdir(destDir)
# 遍历日志目录,获取每一个日志文件全路径
for path in listfile:
  path = destDir + path
  logfiles.append(path)

def parseLine (line, lock, urlQueue, clientIpQueue, upstreamIpQueue) :
  line = re.sub(r'\\x','',line)
  meta = json.loads(line)["meta"]
  # 提取所需字段
  clientIp = meta["clientIp"]
  # 提取请求url,并去掉无关紧要的信息
  request = meta["request"].replace("HTTP/1.1","").replace("POST ","").replace("GET ","")
  # 去掉url后面跟的参数
  request = re.sub(r'\?.*$',"",request)
  status = meta["status"]
  reqTime = time2float(meta["reqTime"])
  upRespTime = time2float(meta["upRespTime"])
  upstreamIp = meta["upstreamIp"]
  # 分别放入队列中
  urlQueue.put((request, status, reqTime, upRespTime))
  clientIpQueue.put(clientIp)
  upstreamIpQueue.put(upstreamIp)
 
def parse (path, lock, urlQueue, clientIpQueue, upstreamIpQueue) :
  fileContent = gzip.open(path, 'rt')
  a = 1
  for line in fileContent:
    a = a + 1
    # 如果是测试用的,到达指定行数(命令行最后一个参数)就退出
    if test == 3:
      if a > int(sys.argv[2]):
        return 1 
    parseLine(line, lock, urlQueue, clientIpQueue, upstreamIpQueue)

def readUrlQueue (urlQueue, urlsResultQueue, lock) :
  urls = {}
  while True:
    try:
      request,status,reqTime,upRespTime = urlQueue.get(True, 2)
      # 存入字典中   
      urlCount,urlReqTime,urlUpRespTime,urlStatus = urls.get(request,(1,reqTime,upRespTime,{}))
      urlStatus[status] = urlStatus.get(status,1) + 1
      urls[request] = (urlCount + 1, urlReqTime + reqTime, urlUpRespTime + upRespTime, urlStatus)
    except Empty :
      break
  urlsResultQueue.put(urls)

def readClientIpQueue (clientIpQueue, clientIpsResultQueue, lock) :
  clientIps = {}
  while True:
    try:
      clientIp = clientIpQueue.get(True, 2)
      clientIps[clientIp] = clientIps.get(clientIp,1) + 1
    except Empty :
      break
  # 把最终结果放入队列,返回给主线程
  clientIpsResultQueue.put(clientIps)

def readUpstreamIpQueue (upstreamIpQueue, upstreamIpsResultQueue, lock) :
  upstreamIps = {}
  while True:
    try:
      upstreamIp = upstreamIpQueue.get(True, 2)
      upstreamIps[upstreamIp] = upstreamIps.get(upstreamIp,1) + 1
    except Empty :
      break
  # 把最终结果放入队列,返回给主线程
  upstreamIpsResultQueue.put(upstreamIps)

# 队列读取进程
readThreads = []
manager = Manager()
lock = manager.Lock()
urlsResultQueue = manager.Queue(1)
clientIpsResultQueue = manager.Queue(1)
upstreamIpsResultQueue = manager.Queue(1)

# 每个收集项一个队列
urlQueue = manager.Queue(500)
clientIpQueue = manager.Queue(500)
upstreamIpQueue = manager.Queue(500)
# 使用进程池,读取日志文件,每个文件一个进程
pool = Pool(24)
for f in logfiles :
  pool.apply_async(parse, (f, lock, urlQueue, clientIpQueue, upstreamIpQueue))

# 分别启动队列读取进程
urlP = Process(target=readUrlQueue, args=(urlQueue, urlsResultQueue, lock))
urlP.start()
readThreads.append(urlP)
clientIpP = Process(target=readClientIpQueue, args=(clientIpQueue, clientIpsResultQueue, lock))
clientIpP.start()
readThreads.append(clientIpP)
upstreamIpP = Process(target=readUpstreamIpQueue, args=(upstreamIpQueue, upstreamIpsResultQueue, lock))
upstreamIpP.start()
readThreads.append(upstreamIpP)
# 由于队列读取进程中是死循环(while True),并设置了读取超时时间是2s,所以等待读取进程结束即可确保所有数据被处理完
for t in readThreads : 
  t.join()

urls = urlsResultQueue.get(False, 1)
clientIps = clientIpsResultQueue.get(False, 1)
upstreamIps = upstreamIpsResultQueue.get(False, 1)

# 排序,根据访问量从高到底排序
finalUrls = sorted(urls.iteritems(), key=lambda d:d[1][0], reverse = True)
finalClientIps = sorted(clientIps.iteritems(), key=lambda d:d[1], reverse = True)
finalUpstreamIps = sorted(upstreamIps.iteritems(), key=lambda d:d[1], reverse = True)

# 打印最终的统计数据
print "upstreamIp","count"
for key,value in finalUpstreamIps:
  print key, value
print
print "count", "status", "reqTime", "upRespTime", "url"
for key,value in finalUrls :
  urlCount,urlReqTime, urlUpRespTime, status = value 
  print urlCount, round(urlReqTime / urlCount,2), round(urlUpRespTime / urlCount,2), str.join(',', map(lambda x: '{0}:{1}'.format(x,status[x]),status)), key
print 
print "clientIp","count"
for key,value in finalClientIps:
  if value > 10 :
    print key,value

统计结果如下

upstreamIp count
- 385
192.168.1.101:8080 26
192.168.1.102:8080 25
192.168.1.103:8080 21

count status reqTime upRespTime url
410 403:410,200:102 0.03 0.0 /api/v250/index.api
67 200:42 0.2 0.2 /api/wap1.1.0/login/wap.api

clientIp count
36.163.27.71 33
  • reqTime指总的接口耗时时间

  • upRespTime指从nginx向后端服务器建立连接,到后端服务器返回完数据,并关闭连接为止的时间

  • 第一段是通过统计每个应用服务器处理的请求数,了解应用服务器的压力分布情况

  • 第二段是统计每个url的请求次数,http code状态分布,掌握应用服务器的处理能力

  • 第三段是统计客户端ip的访问次数,由于ip较多,只打印请求次数大于10的ip,可以找到恶意请求的ip

© 著作权归作者所有

共有 人打赏支持
mahengyang
粉丝 56
博文 46
码字总数 32090
作品 0
苏州
程序员
私信 提问
一次批量重启引发的 Neutron 大面积网络故障

现场回顾 故事发生于某个下午,采用 salt 更新某集群的 neutron.conf (log 相关配置项) 并批量重启 neutron-openvswitch-agent(以下简称 neutron-ovs-agent),不久便有人反馈云主机宕机。 立...

koala bear
2015/10/10
0
0
Python 2.6 亮点:multiprocessing模块

本来以为Python 2.6只是Python 3.0的过渡版本,不会有太多的新功能。但看到这个2.6的重大改动列表,才发现自己挺落后的。在2.6中新增的multiprocessing模块也绝对是Python 2.6的杀手级应用(...

索隆
2012/05/02
0
0
有轻功:用3行代码让Python数据处理脚本获得4倍提速

Python是一门非常适合处理数据和自动化完成重复性工作的编程语言,我们在用数据训练机器学习模型之前,通常都需要对数据进行预处理,而Python就非常适合完成这项工作,比如需要重新调整几十万...

爱喵的程序员
07/26
0
0
python--多进程的用法详解实例

想让python实现多进程(multiprocessing),我们要先区分不同的操作系统的不同之处。 Linux操作系统下提供了一个fork()系统调用,普通函数调用一次返回一次,fork()调用一次返回两次,因为操作...

山有木兮有木兮
05/14
0
0
Python 并发模型

Python及线程 vs 微线程(微进程) vs Greenlets 最近,我有注意到论坛上有很多询问线程,微线程及绿色线程并发模型之间的具体区别的问题。问题诸如 它们的实现有什么不同? microthreads/g...

oschina
2013/05/30
2.3K
0

没有更多内容

加载失败,请刷新页面

加载更多

eslint rules 规则

'rules': { "comma-dangle": ["error", "never"], //是否允许对象中出现结尾逗号 "no-cond-assign": 2, //条件语句的条件中不允许出现赋值运算符 "no-console": 2, //不允许出现console语句 ...

agenyun
16分钟前
1
0
类型判断时instanceof和equals的不同用法

接口设计时为了避免序列化的麻烦,将接口定义为参数为map<String,String>类型的接口,但是现在调用时需要转换当前的实体Bean为Map,接口接收方再把Map转换为另一个Bean实体。过程中的需要对类...

wangtx
22分钟前
1
0
vue 组件间传值(个人精编)

1.父组件向子组件传值 1⃣️.子组件标签绑定需要传递的参数名2⃣️.子组件页面使用props 接收参数 2.子组件向父组件传值  1⃣️.子组件使用$emit来触发一个自定义事件,并传递一个参...

MrBoyce
32分钟前
1
0
(荷兰)彼得·冯·门施著:博物馆学研究的目的

博物馆学研究的目的 (荷)彼得·冯·门施 尽管诸多关于博物馆学认知目的的不同看法可以被归纳为数个主要群体,但没有一个群体可以被称为“学派”。一般来说,学派是由于博物馆学研究目的的不...

乔老哥
42分钟前
2
0
Vue slot的用法

之前看官方文档,由于自己理解的偏差,不知道slot是干嘛的,看到小标题,使用Slot分发内容,就以为 是要往下派发内容。然后就没有理解插槽的概念。其实说白了,使用slot就是先圈一块地,将来...

peakedness丶
54分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部