文档章节

python多进程处理大量日志

mahengyang
 mahengyang
发布于 2016/11/13 13:09
字数 1191
阅读 154
收藏 0
点赞 0
评论 0

nginx日志格式如下

{"serverType":"nginx","meta":{"clientIp":"36.163.27.71","accessTime":"10/Nov/2016:23:01:32 +0800","accessTime_unix":"1478790092.299","request":"POST /api/v250/index.api?a=1&b=2 HTTP/1.1","status":403,"bodySizes":578,"referer":"-","userAgent":"qt-android/2.5.0","host":"www.youpiaole.com","xForwardedFor":"36.163.27.71, 223.245.176.31","reqTime":"0.000","upRespTime":"-","httpVersion":"2.5.0","devType":"t_android","devId":"353591913759","channel":"t_android","appVer":"250","iosVer":"-","userId":"-","userType":"-","devToken":"-","srcType":"client","cookie":"JSESSIONID=D2D7E71BBCA2461A48B4D5F7015A2B3A; route=8bf44254c3316f9c4f0c454d3","upstreamIp":"192.168.1.101","scheme":"http"}}

原来使用cut、sed提取所需字段

zcat n_log/www.youpiaole.com/20161027/access/* | cut -d , -f 2,5,6,13,14,26 | sed 's/"//g;s/meta:{//;s/clientIp://;s/,request:/,/;s/?.*status:/,/;s/reqTime://;s/,upRespTime:/,/;s/,upstreamIp:/,/;s/GET //;s/POST //' > /tmp/20161027.log

再使用awk做统计

awk -F"," '{a[$1]+=1} END {for(i in a) {if(a[i] > 10) print a[i],i}}' /tmp/20161027.log

但由于日志是json格式,使用cut和sed提取所需字段过于繁琐,也不便于进一步的分析,后面改为使用python做统计分析。

python的多线程由于存在全局锁,效率不高,使用多进程的方式处理大量的日志数据

# -*- coding: utf-8 -*-
import string,gzip,json,sys,os,re,threading
import multiprocessing
from multiprocessing import Process,Queue,Lock,Manager
from Queue import Empty
import multiprocessing.pool
import sys 
reload(sys) 
# 设置字符编码格式为utf-8
sys.setdefaultencoding('utf-8')

test = len(sys.argv)
# 字符串转为数字,float类型
def time2float(t) :
  if t == '-':
    tmp = 0
  else :
    tmp = float(t)      
  return tmp

logfiles = []

destDir = sys.argv[1]
listfile = os.listdir(destDir)
# 遍历日志目录,获取每一个日志文件全路径
for path in listfile:
  path = destDir + path
  logfiles.append(path)

def parseLine (line, lock, urlQueue, clientIpQueue, upstreamIpQueue) :
  line = re.sub(r'\\x','',line)
  meta = json.loads(line)["meta"]
  # 提取所需字段
  clientIp = meta["clientIp"]
  # 提取请求url,并去掉无关紧要的信息
  request = meta["request"].replace("HTTP/1.1","").replace("POST ","").replace("GET ","")
  # 去掉url后面跟的参数
  request = re.sub(r'\?.*$',"",request)
  status = meta["status"]
  reqTime = time2float(meta["reqTime"])
  upRespTime = time2float(meta["upRespTime"])
  upstreamIp = meta["upstreamIp"]
  # 分别放入队列中
  urlQueue.put((request, status, reqTime, upRespTime))
  clientIpQueue.put(clientIp)
  upstreamIpQueue.put(upstreamIp)
 
def parse (path, lock, urlQueue, clientIpQueue, upstreamIpQueue) :
  fileContent = gzip.open(path, 'rt')
  a = 1
  for line in fileContent:
    a = a + 1
    # 如果是测试用的,到达指定行数(命令行最后一个参数)就退出
    if test == 3:
      if a > int(sys.argv[2]):
        return 1 
    parseLine(line, lock, urlQueue, clientIpQueue, upstreamIpQueue)

def readUrlQueue (urlQueue, urlsResultQueue, lock) :
  urls = {}
  while True:
    try:
      request,status,reqTime,upRespTime = urlQueue.get(True, 2)
      # 存入字典中   
      urlCount,urlReqTime,urlUpRespTime,urlStatus = urls.get(request,(1,reqTime,upRespTime,{}))
      urlStatus[status] = urlStatus.get(status,1) + 1
      urls[request] = (urlCount + 1, urlReqTime + reqTime, urlUpRespTime + upRespTime, urlStatus)
    except Empty :
      break
  urlsResultQueue.put(urls)

def readClientIpQueue (clientIpQueue, clientIpsResultQueue, lock) :
  clientIps = {}
  while True:
    try:
      clientIp = clientIpQueue.get(True, 2)
      clientIps[clientIp] = clientIps.get(clientIp,1) + 1
    except Empty :
      break
  # 把最终结果放入队列,返回给主线程
  clientIpsResultQueue.put(clientIps)

def readUpstreamIpQueue (upstreamIpQueue, upstreamIpsResultQueue, lock) :
  upstreamIps = {}
  while True:
    try:
      upstreamIp = upstreamIpQueue.get(True, 2)
      upstreamIps[upstreamIp] = upstreamIps.get(upstreamIp,1) + 1
    except Empty :
      break
  # 把最终结果放入队列,返回给主线程
  upstreamIpsResultQueue.put(upstreamIps)

# 队列读取进程
readThreads = []
manager = Manager()
lock = manager.Lock()
urlsResultQueue = manager.Queue(1)
clientIpsResultQueue = manager.Queue(1)
upstreamIpsResultQueue = manager.Queue(1)

# 每个收集项一个队列
urlQueue = manager.Queue(500)
clientIpQueue = manager.Queue(500)
upstreamIpQueue = manager.Queue(500)
# 使用进程池,读取日志文件,每个文件一个进程
pool = Pool(24)
for f in logfiles :
  pool.apply_async(parse, (f, lock, urlQueue, clientIpQueue, upstreamIpQueue))

# 分别启动队列读取进程
urlP = Process(target=readUrlQueue, args=(urlQueue, urlsResultQueue, lock))
urlP.start()
readThreads.append(urlP)
clientIpP = Process(target=readClientIpQueue, args=(clientIpQueue, clientIpsResultQueue, lock))
clientIpP.start()
readThreads.append(clientIpP)
upstreamIpP = Process(target=readUpstreamIpQueue, args=(upstreamIpQueue, upstreamIpsResultQueue, lock))
upstreamIpP.start()
readThreads.append(upstreamIpP)
# 由于队列读取进程中是死循环(while True),并设置了读取超时时间是2s,所以等待读取进程结束即可确保所有数据被处理完
for t in readThreads : 
  t.join()

urls = urlsResultQueue.get(False, 1)
clientIps = clientIpsResultQueue.get(False, 1)
upstreamIps = upstreamIpsResultQueue.get(False, 1)

# 排序,根据访问量从高到底排序
finalUrls = sorted(urls.iteritems(), key=lambda d:d[1][0], reverse = True)
finalClientIps = sorted(clientIps.iteritems(), key=lambda d:d[1], reverse = True)
finalUpstreamIps = sorted(upstreamIps.iteritems(), key=lambda d:d[1], reverse = True)

# 打印最终的统计数据
print "upstreamIp","count"
for key,value in finalUpstreamIps:
  print key, value
print
print "count", "status", "reqTime", "upRespTime", "url"
for key,value in finalUrls :
  urlCount,urlReqTime, urlUpRespTime, status = value 
  print urlCount, round(urlReqTime / urlCount,2), round(urlUpRespTime / urlCount,2), str.join(',', map(lambda x: '{0}:{1}'.format(x,status[x]),status)), key
print 
print "clientIp","count"
for key,value in finalClientIps:
  if value > 10 :
    print key,value

统计结果如下

upstreamIp count
- 385
192.168.1.101:8080 26
192.168.1.102:8080 25
192.168.1.103:8080 21

count status reqTime upRespTime url
410 403:410,200:102 0.03 0.0 /api/v250/index.api
67 200:42 0.2 0.2 /api/wap1.1.0/login/wap.api

clientIp count
36.163.27.71 33
  • reqTime指总的接口耗时时间

  • upRespTime指从nginx向后端服务器建立连接,到后端服务器返回完数据,并关闭连接为止的时间

  • 第一段是通过统计每个应用服务器处理的请求数,了解应用服务器的压力分布情况

  • 第二段是统计每个url的请求次数,http code状态分布,掌握应用服务器的处理能力

  • 第三段是统计客户端ip的访问次数,由于ip较多,只打印请求次数大于10的ip,可以找到恶意请求的ip

© 著作权归作者所有

共有 人打赏支持
mahengyang
粉丝 52
博文 46
码字总数 32090
作品 0
苏州
程序员
一次批量重启引发的 Neutron 大面积网络故障

现场回顾 故事发生于某个下午,采用 salt 更新某集群的 neutron.conf (log 相关配置项) 并批量重启 neutron-openvswitch-agent(以下简称 neutron-ovs-agent),不久便有人反馈云主机宕机。 立...

koala bear ⋅ 2015/10/10 ⋅ 0

python--多进程的用法详解实例

想让python实现多进程(multiprocessing),我们要先区分不同的操作系统的不同之处。 Linux操作系统下提供了一个fork()系统调用,普通函数调用一次返回一次,fork()调用一次返回两次,因为操作...

山有木兮有木兮 ⋅ 05/14 ⋅ 0

python学习笔记 | Python中的线程与进程简介

近日,我开始对代码的各个部分进行计时,以了解我是否可以加快速度。 令我惊讶的是,我发现数据增强是最大的瓶颈。我使用的方法:旋转,翻转,缩放。依靠Numpy并在CPU上运行。Numpy在某些情况...

跨界的聚能 ⋅ 05/25 ⋅ 0

教程 | 如何使用Docker、TensorFlow目标检测API和OpenCV实现实时目标检测和视频处理

  选自TowardsDataScience   作者:Léo Beaucourt   机器之心编译   参与:李诗萌、路雪      本文展示了如何使用 Docker 容器中的 TensorFlow 目标检测 API,通过网络摄像头执...

机器之心 ⋅ 04/20 ⋅ 0

王老板Python面试(10):17道python笔试面试真题

1、一行代码实现1--100之和 利用sum()函数求和 2、如何在一个函数内部修改全局变量 利用global 修改全局变量 3、列出5个python标准库 os:提供了不少与操作系统相关联的函数 sys: 通常用于命...

程序员八阿哥 ⋅ 05/22 ⋅ 0

团队拙作《Python机器学习实战》

之前看国内外的 Python 机器学习的书,鲜有将机器学习到底怎么做人脸识别、怎么做风险控制、怎么做 OCR 算法模型列出的,并且真正的一个 Python 应用,不止是从机器学习库中导入一下配置一下...

yijun2018 ⋅ 04/20 ⋅ 0

Python:经过了十几年,你们还没有消除的对我的误解吗?

摘要: 大学毕业到现在用的最多的编程语言还是C,C++,后来学习了一下Python,觉得Python是门学了不后悔的语言。尤其适合非程序员学习,作为青少年学习计算机的首门语言也不错,大学生学习计...

雁横 ⋅ 05/03 ⋅ 0

5本必读Python入门书籍,你都看过吗?(附福利)

今天技术学派为大家准备了5本Python入门书籍,除了书籍小编还整理了3个常用的资源网站分享给大家。 1.Python基础教程 《Python基础教程》是经典的Python入门教程书籍,本书层次鲜明,结构严谨...

Python燕大侠 ⋅ 06/07 ⋅ 0

使用Logging Handler自动上传Python程序日志到日志服务

想要日志上云,又不想修改程序代码? 或者不希望进行相对复杂的客户端部署?那么您需要使用Logging Handler,现在Python程序也支持了! 概述 使用Python SDK提供的Log Handler可以实现每一条P...

成喆 ⋅ 04/13 ⋅ 0

Python怎么利用多核cpu

原文链接http://www.cnblogs.com/stubborn412/p/4033651.html def dead_loop(): def dead_loop(): {void DeadLoop() { while (true); } } from threading import Thread lib = cdll.LoadLibr......

dby_freedom ⋅ 05/06 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Redis 单线程 为何却需要事务处理并发问题

Redis是单线程处理,也就是命令会顺序执行。那么为什么会存在并发问题呢? 个人理解是,虽然redis是单线程,但是可以同时有多个客户端访问,每个客户端会有 一个线程。客户端访问之间存在竞争...

码代码的小司机 ⋅ 35分钟前 ⋅ 0

到底会改名吗?微软GVFS 改名之争

微软去年透露了 Git Virtual File System(GVFS)项目,GVFS 是 Git 版本控制系统的一个开源插件,允许 Git 处理 TB 规模的代码库,比如 270 GB 的 Windows 代码库。该项目公布之初就引发了争...

linux-tao ⋅ 45分钟前 ⋅ 0

笔试题之Java基础部分【简】【二】

1.静态变量和实例变量的区别 在语法定义上的区别:静态变量前要加static关键字,而实例变量前则不加。在程序运行时的区别:实例变量属于某个对象的属性,必须创建了实例对象,其中的实例变...

anlve ⋅ 今天 ⋅ 0

Lombok简单介绍及使用

官网 通过简单注解来精简代码达到消除冗长代码的目的 优点 提高编程效率 使代码更简洁 消除冗长代码 避免修改字段名字时忘记修改方法名 4.idea中安装lombnok pom.xml引入 <dependency> <grou...

to_ln ⋅ 今天 ⋅ 0

【转】JS浮点数运算Bug的解决办法

37.5*5.5=206.08 (JS算出来是这样的一个结果,我四舍五入取两位小数) 我先怀疑是四舍五入的问题,就直接用JS算了一个结果为:206.08499999999998 怎么会这样,两个只有一位小数的数字相乘,怎...

NickSoki ⋅ 今天 ⋅ 0

table eg

user_id user_name full_name 1 zhangsan 张三 2 lisi 李四 `` ™ [========] 2018-06-18 09:42:06 星期一½ gdsgagagagdsgasgagadsgdasgagsa...

qwfys ⋅ 今天 ⋅ 0

一个有趣的Java问题

先来看看源码: public class TestDemo { public static void main(String[] args) { Integer a = 10; Integer b = 20; swap(a, b); System.out......

linxyz ⋅ 今天 ⋅ 0

十五周二次课

十五周二次课 17.1mysql主从介绍 17.2准备工作 17.3配置主 17.4配置从 17.5测试主从同步 17.1mysql主从介绍 MySQL主从介绍 MySQL主从又叫做Replication、AB复制。简单讲就是A和B两台机器做主...

河图再现 ⋅ 今天 ⋅ 0

docker安装snmp rrdtool环境

以Ubuntu16:04作为基础版本 docker pull ubuntu:16.04 启动一个容器 docker run -d -i -t --name flow_mete ubuntu:16.04 bash 进入容器 docker exec -it flow_mete bash cd ~ 安装基本软件 ......

messud4312 ⋅ 今天 ⋅ 0

OSChina 周一乱弹 —— 快别开心了,你还没有女友呢。

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @莱布妮子 :分享吴彤的单曲《好春光》 《好春光》- 吴彤 手机党少年们想听歌,请使劲儿戳(这里) @clouddyy :小萝莉街上乱跑,误把我认错成...

小小编辑 ⋅ 今天 ⋅ 9

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部