Learning from the Source Code of Python Pcap-Parsing Tools

 osc_n6euf5h6
Published 2019/03/19 23:19

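0x1. Preface

In on-site forensics it is fairly rare to have to analyse traffic captures by hand. Traffic-analysis appliances work by capturing all the data and parsing it, so a large part of what a person could do has already been handed over to machines.

Colasoft and Wireshark are both good tools for analysing PCAP files. While looking for code that parses pcap files programmatically, I found that many people have already written Python scripts to help with pcap parsing, and that there are also frameworks which extract information from a pcap and present it in a user interface.

This article is a study-oriented summary of using Python's Scapy library to extract protocol five-tuple information. None of it has been used in real casework, because in practice reading, unpacking and searching a PCAP this way turned out to be too slow.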

0x2. Reference libraries

Common Python libraries for parsing pcap files include Scapy, dpkt and Pyshark. Source code and tools worth consulting:

https://github.com/thepacketgeek/cloud-pcap
https://github.com/madpowah/ForensicPCAP
https://github.com/le4f/pcap-analyzer
https://github.com/HatBoy/Pcap-Analyzer
https://github.com/caesar0301/awesome-pcaptools
https://asecuritysite.com/forensics/pcap
https://github.com/DanMcInerney/net-creds

Websites that offer pcap analysis as a service: https://packettotal.com/, https://www.capanalysis.net/ca/, https://asecuritysite.com/forensics/pcap?infile=smtp.pcap&infile=smtp.pcap, https://app.any.run

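0x3. Mail protocols

To extract a sender's mailbox address, you first need to recognise mail traffic by its well-known ports.

Port 25 is used by SMTP (Simple Mail Transfer Protocol) for sending mail.
Port 110 is used by POP3 (Post Office Protocol version 3). POP2 (which used port 109) and POP3 are both mainly for receiving mail; POP3 is the one in common use today.
Port 143 is used by IMAP (Internet Message Access Protocol), which, like POP3, is a protocol for receiving mail.
Port 465 is used by SMTPS (SMTP over SSL), a variant of SMTP that runs on top of SSL/TLS. The session is encrypted from the very start, so the plaintext is not visible in a capture, which protects mail from disclosure.
Port 587 is used for mail submission with STARTTLS. It also relies on TLS, but only the traffic sent after the STARTTLS command has been executed is protected.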
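The port list above can be turned into a small lookup table. The following is a minimal Python 3 sketch, not code from any of the tools discussed here; the names MAIL_PORTS and classify_mail_port are made up for illustration, and matching on port numbers alone is only a heuristic, since services can run on non-standard ports.

```python
# Port-to-protocol table built from the list above (illustrative only).
MAIL_PORTS = {
    25: "SMTP",
    110: "POP3",
    143: "IMAP",
    465: "SMTPS",
    587: "SMTP submission (STARTTLS)",
}


def classify_mail_port(sport, dport):
    """Return the mail protocol name if either port is a known mail port, else None.

    The destination port is checked first, so client-to-server packets are
    classified by the server port.
    """
    for port in (dport, sport):
        if port in MAIL_PORTS:
            return MAIL_PORTS[port]
    return None


print(classify_mail_port(49152, 25))   # SMTP
print(classify_mail_port(110, 50000))  # POP3
print(classify_mail_port(49152, 80))   # None
```

Checking both ports matters because a capture contains traffic in both directions: requests carry the mail port as destination, replies carry it as source.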

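Common SMTP command syntax

In Wireshark, SMTP data appears once the TCP three-way handshake has completed. The commonly used commands are listed below.

SMTP commands are case-insensitive, but their arguments may be case-sensitive; see RFC 821 (since superseded by RFC 5321) for details.
HELO <domain> <CRLF>. Identifies the sender to the server. The sender can lie here, but in most cases the server can detect it.
MAIL FROM:<reverse-path> <CRLF>. <reverse-path> is the sender's address. This command starts a mail transaction, that is, it resets all state and buffers.
RCPT TO:<forward-path> <CRLF>. <forward-path> identifies a recipient's address. It follows MAIL FROM and may be repeated for multiple recipients.
DATA <CRLF>. Everything that follows is sent as the message data; <CRLF>.<CRLF> marks the end of the data.
RSET <CRLF>. Resets the session; the current mail transaction is cancelled.
NOOP <CRLF>. Asks the server to return an OK reply; generally used for testing.
QUIT <CRLF>. Ends the session.
VRFY <string> <CRLF>. Verifies whether the given mailbox exists. For security reasons most servers disable this command.
EXPN <string> <CRLF>. Expands the given mailing list to confirm its members. For security reasons most servers disable this command.
HELP <CRLF>. Asks which commands the server supports.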
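Given the command syntax above, the envelope sender and recipients can be pulled out of the client side of an SMTP session with two regular expressions. This is a minimal Python 3 sketch under stated assumptions: the session text has already been reassembled into one string, and the helper name extract_envelope and the sample addresses are invented for illustration.

```python
import re

# MAIL FROM / RCPT TO are matched case-insensitively, at the start of a line.
MAIL_FROM_RE = re.compile(r"^MAIL FROM:\s*<([^>]*)>", re.IGNORECASE | re.MULTILINE)
RCPT_TO_RE = re.compile(r"^RCPT TO:\s*<([^>]*)>", re.IGNORECASE | re.MULTILINE)


def extract_envelope(smtp_text):
    """Return (sender, [recipients]) found in reassembled SMTP client text."""
    sender = MAIL_FROM_RE.search(smtp_text)
    recipients = RCPT_TO_RE.findall(smtp_text)
    return (sender.group(1) if sender else None, recipients)


sample = (
    "HELO client.example.com\r\n"
    "MAIL FROM:<alice@example.com>\r\n"
    "RCPT TO:<bob@example.org>\r\n"
    "rcpt to: <carol@example.org>\r\n"
    "DATA\r\n"
)
print(extract_envelope(sample))
# ('alice@example.com', ['bob@example.org', 'carol@example.org'])
```

The envelope addresses can differ from the From: and To: headers inside the message body, so both are worth recording during an investigation.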

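Code

forensicPCAP parses a PCAP file and then runs an interactive command loop built on a cmd module; entering a command calls the corresponding function. The core idea is:

Use Scapy to find the packets whose source or destination port is 110 or 143 and save them as mail data. (These are the POP3 and IMAP ports, so SMTP traffic on port 25 is not covered by this filter.)

The key code is as follows.

self.pcap is loaded ahead of time in the constructor with self.pcap = rdpcap(namefile) and is then traversed with enumerate(). Packets whose destination or source port is 110 or 143 are selected.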

    def do_mail(self, arg, opts=None):
        """Print the number of mail's requests and store its
Usage :
- mail"""
        sys.stdout.write(bcolors.TXT + "## Searching mail's request ... ")
        sys.stdout.flush()
        con = []
        mailpkts = []
        for i, packet in enumerate(self.pcap):
            # Only TCP packets are of interest
            if TCP in packet:
                tcp = packet.getlayer('TCP')
                # Keep packets whose source or destination port is 110 (POP3) or 143 (IMAP)
                if tcp.dport in (110, 143) or tcp.sport in (110, 143):
                    # flags == 2 is a bare SYN, i.e. the start of a new connection
                    if tcp.flags == 2:
                        con.append(i)
                    mailpkts.append(packet)
        sys.stdout.write("OK.\n")
        print("## Result : Mail's request : " + str(len(con)))
        sys.stdout.write(bcolors.TXT + "## Saving mails ... ")
        sys.stdout.flush()
        res = ""
        for packet in mailpkts:
            # flags == 24 is PSH+ACK, i.e. a segment carrying application data;
            # the Raw check avoids an error on segments without a payload
            if packet.getlayer('TCP').flags == 24 and packet.haslayer('Raw'):
                res = res + packet.getlayer('Raw').load
                sys.stdout.write(".")
                sys.stdout.flush()
        sys.stdout.write("OK\n")
        sys.stdout.flush()
        self.cmd = "mail"
        self.last = res
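The listing compares the TCP flags field with the numbers 2 and 24. The following Python 3 sketch, which is not part of forensicPCAP, shows how those integers map to flag names; tcp_flag_names is a hypothetical helper using the standard TCP flag bit values.

```python
# Standard TCP flag bits, lowest bit first.
TCP_FLAG_BITS = [
    (0x01, "FIN"), (0x02, "SYN"), (0x04, "RST"), (0x08, "PSH"),
    (0x10, "ACK"), (0x20, "URG"), (0x40, "ECE"), (0x80, "CWR"),
]


def tcp_flag_names(flags):
    """Return the names of all flag bits set in the integer flags value."""
    return [name for bit, name in TCP_FLAG_BITS if flags & bit]


print(tcp_flag_names(2))   # ['SYN']
print(tcp_flag_names(24))  # ['PSH', 'ACK']
print(tcp_flag_names(18))  # ['SYN', 'ACK']
```

Note that an equality test such as flags == 2 only matches packets with exactly that flag combination. A SYN that also carries, say, ECN bits would be missed, so a bitmask test (flags & 0x02) may be the more robust choice depending on what should be counted.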

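0x4. DNS parsing

This is the DNS-parsing part of forensicPCAP. bcolors.TXT sets the colour used for terminal output, and res = packet.getlayer('DNS').qd.qname reads the queried domain name from the DNS question section.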

############# do_dns() ###########
    def do_dns(self, arg, opts=None):
        """Print all DNS requests in the PCAP file
Usage :
- dns"""
        sys.stdout.write(bcolors.TXT + "## Listing all DNS requests ...")
        sys.stdout.flush()
        dns = []
        dns.append([])

        # Walk through every packet in the capture
        for i, packet in enumerate(self.pcap):
            if DNS in packet:
                # Read the queried domain name from the question section
                res = packet.getlayer('DNS').qd.qname
                # Drop the trailing dot of the fully qualified name
                if res[len(res) - 1] == '.':
                    res = res[:-1]
                # Store the packet index together with the domain name
                dns.append([i, res])

        sys.stdout.write("OK.\n")
        # Report the number of DNS requests (the first list element is an empty placeholder)
        print(bcolors.TXT + "## Result : " + str(len(dns) - 1) + " DNS request(s)" + bcolors.ENDC)
        self.last = dns
        self.cmd = "dns"
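To make the qname handling more concrete, here is a minimal Python 3 sketch of what a DNS question name looks like on the wire and why a trailing dot shows up. It uses only the standard library; decode_qname and strip_trailing_dot are hypothetical helpers, and the decoder deliberately ignores DNS name compression, which a real parser has to handle.

```python
def decode_qname(data, offset=0):
    """Decode an uncompressed DNS name; return (name, offset after the name)."""
    labels = []
    while True:
        length = data[offset]
        offset += 1
        if length == 0:          # a zero-length label ends the name
            break
        if length & 0xC0:        # top bits set means a compression pointer
            raise ValueError("compression pointers are not handled in this sketch")
        labels.append(data[offset:offset + length].decode("ascii"))
        offset += length
    return ".".join(labels), offset


def strip_trailing_dot(name):
    """Accept bytes or str and return the name as str without a trailing dot."""
    if isinstance(name, bytes):
        name = name.decode("ascii", errors="replace")
    return name[:-1] if name.endswith(".") else name


wire = b"\x03www\x07example\x03com\x00"
print(decode_qname(wire))                       # ('www.example.com', 17)
print(strip_trailing_dot(b"www.example.com."))  # www.example.com
```

The bytes/str handling in strip_trailing_dot is there because, as far as I can tell, Scapy returns qname as bytes under Python 3, in which case a comparison like res[len(res) - 1] == '.' would no longer match.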

The core code of pcap-analyzer borrows from forensicPCAP. When it displays the DNS requests it has parsed, it reports the ten most frequently queried domain names.

Code

def get_dns(file):
    dns = []
    # Open the PCAP file
    pcap = rdpcap(UPLOAD_FOLDER+file)
    for packet in pcap:
        if DNS in packet:
            # Core logic: read the queried name and strip the trailing dot
            res = packet.getlayer('DNS').qd.qname
            if res[len(res) - 1] == '.':
                res = res[:-1]
            dns.append(res)
    # Keep the ten most frequently queried domain names
    dns = Counter(dns).most_common(10)
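The ranking step relies on collections.Counter. A small stand-alone Python 3 illustration, with made-up domain names and a hypothetical top_domains wrapper:

```python
from collections import Counter


def top_domains(names, n=10):
    """Return the n most common names as (name, count) pairs, most frequent first."""
    return Counter(names).most_common(n)


queries = [
    "a.example.com", "b.example.com", "a.example.com",
    "c.example.com", "a.example.com", "b.example.com",
]
print(top_domains(queries, 2))
# [('a.example.com', 3), ('b.example.com', 2)]
```

In a capture, a domain that is queried far more often than the rest, or at very regular intervals, is often a good starting point for spotting beaconing or tunnelling.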

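0x5. Extracting credentials

net-creds is a Scapy-based script that picks out password information from a PCAP.

Code

The main work is done by other_parser(), which splits the HTTP (and other) content of each packet and then searches for authentication-related keywords to pick out account names and passwords.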

def other_parser(src_ip_port, dst_ip_port, full_load, ack, seq, pkt, verbose):
    '''
    Pull out pertinent info from the parsed HTTP packet data
    '''
    user_passwd = None
    http_url_req = None
    method = None
    http_methods = ['GET ', 'POST ', 'CONNECT ', 'TRACE ', 'TRACK ', 'PUT ', 'DELETE ', 'HEAD ']
    http_line, header_lines, body = parse_http_load(full_load, http_methods)
    headers = headers_to_dict(header_lines)
    if 'host' in headers:
        host = headers['host']
    else:
        host = ''

    if http_line != None:
        method, path = parse_http_line(http_line, http_methods)
        http_url_req = get_http_url(method, host, path, headers)
        if http_url_req != None:
            if verbose == False:
                if len(http_url_req) > 98:
                    http_url_req = http_url_req[:99] + '...'
            printer(src_ip_port, None, http_url_req)

    # Print search terms
    searched = get_http_searches(http_url_req, body, host)
    if searched:
        printer(src_ip_port, dst_ip_port, searched)

    # Print user/pwds
    if body != '':
        user_passwd = get_login_pass(body)
        if user_passwd != None:
            try:
                http_user = user_passwd[0].decode('utf8')
                http_pass = user_passwd[1].decode('utf8')
                # Set a limit on how long they can be prevent false+
                if len(http_user) > 75 or len(http_pass) > 75:
                    return
                user_msg = 'HTTP username: %s' % http_user
                printer(src_ip_port, dst_ip_port, user_msg)
                pass_msg = 'HTTP password: %s' % http_pass
                printer(src_ip_port, dst_ip_port, pass_msg)
            except UnicodeDecodeError:
                pass

    # Print POST loads
    # ocsp is a common SSL post load that's never interesting
    if method == 'POST' and 'ocsp.' not in host:
        try:
            if verbose == False and len(body) > 99:
                # If it can't decode to utf8 we're probably not interested in it
                msg = 'POST load: %s...' % body[:99].encode('utf8')
            else:
                msg = 'POST load: %s' % body.encode('utf8')
            printer(src_ip_port, None, msg)
        except UnicodeDecodeError:
            pass

    # Kerberos over TCP
    decoded = Decode_Ip_Packet(str(pkt)[14:])
    kerb_hash = ParseMSKerbv5TCP(decoded['data'][20:])
    if kerb_hash:
        printer(src_ip_port, dst_ip_port, kerb_hash)

    # Non-NETNTLM NTLM hashes (MSSQL, DCE-RPC,SMBv1/2,LDAP, MSSQL)
    NTLMSSP2 = re.search(NTLMSSP2_re, full_load, re.DOTALL)
    NTLMSSP3 = re.search(NTLMSSP3_re, full_load, re.DOTALL)
    if NTLMSSP2:
        parse_ntlm_chal(NTLMSSP2.group(), ack)
    if NTLMSSP3:
        ntlm_resp_found = parse_ntlm_resp(NTLMSSP3.group(), seq)
        if ntlm_resp_found != None:
            printer(src_ip_port, dst_ip_port, ntlm_resp_found)

    # Look for authentication headers
    if len(headers) == 0:
        authenticate_header = None
        authorization_header = None
    for header in headers:
        authenticate_header = re.match(authenticate_re, header)
        authorization_header = re.match(authorization_re, header)
        if authenticate_header or authorization_header:
            break

    if authorization_header or authenticate_header:
        # NETNTLM
        netntlm_found = parse_netntlm(authenticate_header, authorization_header, headers, ack, seq)
        if netntlm_found != None:
            printer(src_ip_port, dst_ip_port, netntlm_found)

        # Basic Auth
        parse_basic_auth(src_ip_port, dst_ip_port, headers, authorization_header)
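As a simplified illustration of the two steps at the heart of this function (splitting an HTTP load into request line, headers and body, then decoding a Basic authentication header), here is a Python 3 sketch. It is not net-creds' implementation: split_http_load and decode_basic_auth are hypothetical names, and the sample request is invented.

```python
import base64


def split_http_load(load):
    """Split a raw HTTP request into (request_line, headers_dict, body)."""
    head, _, body = load.partition("\r\n\r\n")
    lines = head.split("\r\n")
    headers = {}
    for line in lines[1:]:
        name, sep, value = line.partition(":")
        if sep:
            headers[name.strip().lower()] = value.strip()
    return lines[0], headers, body


def decode_basic_auth(headers):
    """Return (user, password) from an 'Authorization: Basic ...' header, or None."""
    scheme, _, token = headers.get("authorization", "").partition(" ")
    if scheme.lower() != "basic" or not token:
        return None
    try:
        decoded = base64.b64decode(token).decode("utf-8")
    except (ValueError, UnicodeDecodeError):
        return None
    user, sep, password = decoded.partition(":")
    return (user, password) if sep else None


token = base64.b64encode(b"alice:s3cret").decode("ascii")
sample = "GET /admin HTTP/1.1\r\nHost: example.com\r\nAuthorization: Basic " + token + "\r\n\r\n"
request_line, headers, body = split_http_load(sample)
print(request_line)                # GET /admin HTTP/1.1
print(decode_basic_auth(headers))  # ('alice', 's3cret')
```

Basic authentication is only base64-encoded, not encrypted, which is why credentials sent this way over plain HTTP can be recovered directly from a capture.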

Keyword (regular expression) list:

# Regexs
authenticate_re = '(www-|proxy-)?authenticate'
authorization_re = '(www-|proxy-)?authorization'
ftp_user_re = r'USER (.+)\r\n'
ftp_pw_re = r'PASS (.+)\r\n'
irc_user_re = r'NICK (.+?)((\r)?\n|\s)'
irc_pw_re = r'NS IDENTIFY (.+)'
irc_pw_re2 = 'nickserv :identify (.+)'
mail_auth_re = '(\d+ )?(auth|authenticate) (login|plain)'
mail_auth_re1 =  '(\d+ )?login '
NTLMSSP2_re = 'NTLMSSP\x00\x02\x00\x00\x00.+'
NTLMSSP3_re = 'NTLMSSP\x00\x03\x00\x00\x00.+'
# Prone to false+ but prefer that to false-
http_search_re = '((search|query|&q|\?q|search\?p|searchterm|keywords|keyword|command|terms|keys|question|kwd|searchPhrase)=([^&][^&]*))'
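To show how patterns like these are applied, the following Python 3 sketch runs the two FTP expressions from the list against a sample payload. The find_ftp_credentials helper and the payload are assumptions for illustration; in a real capture the USER and PASS commands usually arrive in separate packets, so they would be matched per packet or on the reassembled stream.

```python
import re

# The two FTP patterns, copied from the list above.
ftp_user_re = r'USER (.+)\r\n'
ftp_pw_re = r'PASS (.+)\r\n'


def find_ftp_credentials(payload):
    """Return (user, password) found in the payload; either may be None."""
    user = re.search(ftp_user_re, payload)
    password = re.search(ftp_pw_re, payload)
    return (user.group(1) if user else None,
            password.group(1) if password else None)


print(find_ftp_credentials("USER anonymous\r\nPASS guest@example.com\r\n"))
# ('anonymous', 'guest@example.com')
```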

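0x6. Elements to focus on when extracting data

  • Source IP, destination IP, source port, destination port, protocol, packet size
Correlate these to identify the victim and the jump hosts controlled by the attacker
  • Protocols: HTTP, FTP, mail protocols
- Look at HTTP and HTTPS traffic first, then at packet lengths.
- Extract the related IP information.
- Decide whether the traffic involves a remote-access trojan, account credentials, or suspicious IPs
  • Account and password fields
- In penetration testing, analyse sniffed packets for account and password information
- IPs that are carrying out brute-force attacks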
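The five-tuple in the first bullet can be read straight from the raw bytes of an IPv4 packet. Below is a minimal Python 3 sketch using only the standard library; five_tuple is a hypothetical helper, it expects the bytes starting at the IP header (that is, after the 14-byte Ethernet header), and it handles only IPv4 with TCP or UDP.

```python
import socket
import struct


def five_tuple(ip_packet):
    """Return (src_ip, dst_ip, src_port, dst_port, protocol) for an IPv4 packet."""
    version_ihl = ip_packet[0]
    if version_ihl >> 4 != 4:
        raise ValueError("not an IPv4 packet")
    header_len = (version_ihl & 0x0F) * 4   # IHL is counted in 32-bit words
    protocol = ip_packet[9]
    src_ip = socket.inet_ntoa(ip_packet[12:16])
    dst_ip = socket.inet_ntoa(ip_packet[16:20])
    if protocol not in (6, 17):             # 6 = TCP, 17 = UDP
        return src_ip, dst_ip, None, None, protocol
    # TCP and UDP both start with source port and destination port
    src_port, dst_port = struct.unpack("!HH", ip_packet[header_len:header_len + 4])
    return src_ip, dst_ip, src_port, dst_port, protocol


# Build a fake 20-byte IPv4 header followed by a 20-byte TCP header.
ip_header = struct.pack(
    "!BBHHHBBH4s4s", 0x45, 0, 40, 0, 0, 64, 6, 0,
    socket.inet_aton("192.168.1.103"), socket.inet_aton("211.90.25.31"),
)
tcp_header = struct.pack("!HH", 50000, 80) + b"\x00" * 16
print(five_tuple(ip_header + tcp_header))
# ('192.168.1.103', '211.90.25.31', 50000, 80, 6)
```

Grouping packets by this tuple (ideally normalised so both directions map to the same key) is the usual first step before looking at packet sizes or payloads per conversation.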

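0x7. Trying dpkt for pcap parsing

I had hoped to use dpkt to parse mail, DNS and HTTP traffic as an aid to pcap analysis, but after reading up on it I found it less convenient than Scapy.

dpkt is a Python module for simple packet creation and parsing, with definitions for the basic TCP/IP protocols. It is fast.

dpkt manual: https://dpkt.readthedocs.io/en/latest/

dpkt download: https://pypi.org/project/dpkt/

According to the official manual, dpkt reads the contents of each packet in the pcap, uses isinstance to check whether the frame carries an IP packet, and then checks which protocol it belongs to. Each supported protocol is already wrapped in an API, so when a packet matches one of them the relevant values can be printed.

Extending this code requires learning what the fields of each protocol mean.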
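To give a feel for what a pcap reader has to do before any protocol parsing starts, here is a minimal Python 3 sketch that iterates over the records of a classic pcap file with the struct module. It is not dpkt's code: read_pcap is a hypothetical function, and it assumes the classic microsecond-resolution pcap format (a 24-byte global header followed by 16-byte record headers), not pcapng.

```python
import io
import struct


def read_pcap(fileobj):
    """Yield (timestamp, raw_bytes) for each record in a classic pcap file."""
    global_header = fileobj.read(24)
    if len(global_header) < 24:
        raise ValueError("file too short for a pcap global header")
    magic = struct.unpack("<I", global_header[:4])[0]
    if magic == 0xA1B2C3D4:
        endian = "<"      # file was written little-endian
    elif magic == 0xD4C3B2A1:
        endian = ">"      # file was written big-endian
    else:
        raise ValueError("not a classic microsecond-resolution pcap file")
    while True:
        record_header = fileobj.read(16)
        if len(record_header) < 16:
            break
        ts_sec, ts_usec, incl_len, _orig_len = struct.unpack(endian + "IIII", record_header)
        yield ts_sec + ts_usec / 1e6, fileobj.read(incl_len)


# Build a tiny in-memory pcap with one 4-byte "packet" to exercise the reader.
sample = struct.pack("<IHHiIII", 0xA1B2C3D4, 2, 4, 0, 0, 65535, 1)
sample += struct.pack("<IIII", 1500000000, 250000, 4, 4) + b"\xde\xad\xbe\xef"
packets = list(read_pcap(io.BytesIO(sample)))
print(len(packets), packets[0][1])
```

Because the generator reads one record at a time, memory use stays flat regardless of file size, which is one reason streaming readers tend to cope better with large captures than approaches that load every packet into a list first.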

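API reference:

https://dpkt.readthedocs.io/en/latest/api/api_auto.html#module-dpkt.qq

The manual points to example code on GitHub for part of the API, which is a useful reference.

https://github.com/jeffsilverm/dpkt_doc

Examples from the manual

The code below comes from the examples in the manual. inet_pton could not be used directly in my environment, so I adapted the examples following a workaround found online (the ctypes-based inet_pton and inet_ntop replacements, which are installed only when running on Windows).

Printing packets

Use dpkt to read a pcap file and print the contents of its packets: the fields of the Ethernet frame and of the IP packet.

Python 2 test code: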

#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os


def mac_addr(address):
    """Convert a MAC address to a readable/printable string

       Args:
           address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
       Returns:
           str: Printable/readable MAC address
    """
    return ':'.join('%02x' % compat_ord(b) for b in address)


class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]

if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]

# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop

def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)

def print_packets(pcap):
    """Print out information about each packet in a pcap

       Args:
           pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num=r_num+1
        print ('packet num count :' , r_num )
        # Print out the timestamp in UTC
        print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))

        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)
        print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)

        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
            continue

        # Now unpack the data within the Ethernet frame (the IP packet)
        # Pulling out src, dst, length, fragment info, TTL, and Protocol
        ip = eth.data

        # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
        do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
        more_fragments = bool(ip.off & dpkt.ip.IP_MF)
        fragment_offset = ip.off & dpkt.ip.IP_OFFMASK

        # Print out the info
        print('IP: %s -> %s   (len=%d ttl=%d DF=%d MF=%d offset=%d)\n' % \
              (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))



def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_packets(pcap)

if __name__ == '__main__':
    test()

Output:

('packet num count :', 4474)
('Timestamp: ', '2017-08-01 03:55:03.314832')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 211.90.25.31   (len=52 ttl=64 DF=1 MF=0 offset=0)

('packet num count :', 4475)
('Timestamp: ', '2017-08-01 03:55:03.485679')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 180.97.33.12   (len=114 ttl=64 DF=0 MF=0 offset=0)

('packet num count :', 4476)
('Timestamp: ', '2017-08-01 03:55:03.486141')
('Ethernet Frame: ', '9c:5c:8e:76:bf:24', 'ec:88:8f:86:14:5c', 2048)
IP: 192.168.1.103 -> 119.75.222.122   (len=52 ttl=64 DF=1 MF=0 offset=0)
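The DF, MF and offset values in this output come from a single 16-bit field of the IP header, which the example splits with bitmasks. The Python 3 sketch below shows the same decoding without dpkt. The three constants are written out by hand with the values I understand dpkt.ip.IP_DF, IP_MF and IP_OFFMASK to have, and decode_ip_off is a hypothetical helper.

```python
IP_DF = 0x4000       # "don't fragment" bit
IP_MF = 0x2000       # "more fragments" bit
IP_OFFMASK = 0x1FFF  # fragment offset, counted in units of 8 bytes


def decode_ip_off(off):
    """Split the 16-bit flags/offset field into (DF, MF, fragment_offset)."""
    return bool(off & IP_DF), bool(off & IP_MF), off & IP_OFFMASK


print(decode_ip_off(16384))         # (True, False, 0)
print(decode_ip_off(0x2000 | 185))  # (False, True, 185)
```

An off value of 16384 (0x4000) therefore means DF=1 with no fragmentation, which matches the DF=1 MF=0 offset=0 lines above.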

Printing ICMP

Check for ICMP packets and display their contents.

#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os


def mac_addr(address):
    """Convert a MAC address to a readable/printable string

       Args:
           address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
       Returns:
           str: Printable/readable MAC address
    """
    return ':'.join('%02x' % compat_ord(b) for b in address)


class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]

if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]

# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop

def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)

def print_icmp(pcap):
    """Print out information about each packet in a pcap

       Args:
           pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num=r_num+1
        print ('packet num count :' , r_num )
        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)

        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
            continue

        # Now grab the data within the Ethernet frame (the IP packet)
        ip = eth.data

        # Now check if this is an ICMP packet
        if isinstance(ip.data, dpkt.icmp.ICMP):
            icmp = ip.data

            # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
            do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
            more_fragments = bool(ip.off & dpkt.ip.IP_MF)
            fragment_offset = ip.off & dpkt.ip.IP_OFFMASK

            # Print out the info
            print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
            print( 'Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
            print( 'IP: %s -> %s   (len=%d ttl=%d DF=%d MF=%d offset=%d)' % \
                  (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))
            print('ICMP: type:%d code:%d checksum:%d data: %s\n' % (icmp.type, icmp.code, icmp.sum, repr(icmp.data)))



def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_icmp(pcap)



if __name__ == '__main__':
    test()

Output:

('packet num count :', 377)
('Timestamp: ', '2017-08-01 03:45:56.403640')
('Ethernet Frame: ', 'ec:88:8f:86:14:5c', '9c:5c:8e:76:bf:24', 2048)
IP: 202.118.168.73 -> 192.168.1.103   (len=56 ttl=253 DF=0 MF=0 offset=0)
ICMP: type:3 code:13 checksum:52074 data: Unreach(data=IP(len=28, id=2556, off=16384, ttl=61, p=6, sum=36831, src='\xc0\xa8\x01g', dst='\xcal\x17q', opts='', data='n\xb1\x00P\x85)=]'))
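The packed byte strings in this output can be made readable with a few standard-library calls. The Python 3 sketch below decodes the addresses and the first bytes of the embedded TCP header shown above; describe_unreach and the small code table are illustrative assumptions, and the table covers only a handful of the defined codes.

```python
import socket
import struct

# A few ICMP "destination unreachable" (type 3) codes; not an exhaustive table.
ICMP_UNREACH_CODES = {
    0: "network unreachable",
    1: "host unreachable",
    3: "port unreachable",
    13: "communication administratively prohibited",
}


def describe_unreach(code, src, dst):
    """Format an ICMP type 3 message using packed 4-byte IPv4 addresses."""
    reason = ICMP_UNREACH_CODES.get(code, "code %d" % code)
    return "%s -> %s: %s" % (socket.inet_ntoa(src), socket.inet_ntoa(dst), reason)


# src/dst of the embedded IP header, copied from the output above.
print(describe_unreach(13, b"\xc0\xa8\x01g", b"\xcal\x17q"))
# 192.168.1.103 -> 202.108.23.113: communication administratively prohibited

# The first four bytes of the embedded TCP header are the two port numbers.
print(struct.unpack("!HH", b"n\xb1\x00P"))  # (28337, 80)
```

Read this way, the record says that a device along the path refused, by policy, a TCP connection attempt from 192.168.1.103 to port 80 of 202.108.23.113.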

Printing HTTP requests

#!/usr/bin/env python
"""
Use DPKT to read in a pcap file and print out the contents of the packets
This example is focused on the fields in the Ethernet Frame and IP packet
"""
import dpkt
import datetime
import socket
from dpkt.compat import compat_ord
import ctypes
import os


def mac_addr(address):
    """Convert a MAC address to a readable/printable string

       Args:
           address (str): a MAC address in hex form (e.g. '\x01\x02\x03\x04\x05\x06')
       Returns:
           str: Printable/readable MAC address
    """
    return ':'.join('%02x' % compat_ord(b) for b in address)


class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]

if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]

# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop

def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)

def print_http_requests(pcap):
    """Print out information about each packet in a pcap

       Args:
           pcap: dpkt pcap reader object (dpkt.pcap.Reader)
    """
    # packet num count
    r_num = 0
    # For each packet in the pcap process the contents
    for timestamp, buf in pcap:
        r_num=r_num+1
        print ('packet num count :' , r_num )
        # Unpack the Ethernet frame (mac src/dst, ethertype)
        eth = dpkt.ethernet.Ethernet(buf)

        # Make sure the Ethernet data contains an IP packet
        if not isinstance(eth.data, dpkt.ip.IP):
            print('Non IP Packet type not supported %s\n' % eth.data.__class__.__name__)
            continue

        # Now grab the data within the Ethernet frame (the IP packet)
        ip = eth.data

        # Check for TCP in the transport layer
        if isinstance(ip.data, dpkt.tcp.TCP):

            # Set the TCP data
            tcp = ip.data

            # Now see if we can parse the contents as a HTTP request
            try:
                request = dpkt.http.Request(tcp.data)
            except (dpkt.dpkt.NeedData, dpkt.dpkt.UnpackError):
                continue

            # Pull out fragment information (flags and offset all packed into off field, so use bitmasks)
            do_not_fragment = bool(ip.off & dpkt.ip.IP_DF)
            more_fragments = bool(ip.off & dpkt.ip.IP_MF)
            fragment_offset = ip.off & dpkt.ip.IP_OFFMASK

            # Print out the info
            print('Timestamp: ', str(datetime.datetime.utcfromtimestamp(timestamp)))
            print('Ethernet Frame: ', mac_addr(eth.src), mac_addr(eth.dst), eth.type)
            print('IP: %s -> %s   (len=%d ttl=%d DF=%d MF=%d offset=%d)' %
                  (inet_to_str(ip.src), inet_to_str(ip.dst), ip.len, ip.ttl, do_not_fragment, more_fragments, fragment_offset))
            print('HTTP request: %s\n' % repr(request))

            # Check for Header spanning acrossed TCP segments
            if not tcp.data.endswith(b'\r\n'):
                print('\nHEADER TRUNCATED! Reassemble TCP segments!\n')




def test():
    """Open up a test pcap file and print out the packets"""
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        print_http_requests(pcap)

if __name__ == '__main__':
    test()

Output:

Timestamp:  2004-05-13 10:17:08.222534
Ethernet Frame:  00:00:01:00:00:00 fe:ff:20:00:01:00 2048
IP: 145.254.160.237 -> 65.208.228.223   (len=519 ttl=128 DF=1 MF=0 offset=0)
HTTP request: Request(body='', uri='/download.html', headers={'accept-language': 'en-us,en;q=0.5', 'accept-encoding': 'gzip,deflate', 'connection': 'keep-alive', 'keep-alive': '300', 'accept': 'text/xml,application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,image/jpeg,image/gif;q=0.2,*/*;q=0.1', 'user-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113', 'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'host': 'www.ethereal.com', 'referer': 'http://www.ethereal.com/development.html'}, version='1.1', data='', method='GET')

Timestamp:  2004-05-13 10:17:10.295515
Ethernet Frame:  00:00:01:00:00:00 fe:ff:20:00:01:00 2048
IP: 145.254.160.237 -> 216.239.59.99   (len=761 ttl=128 DF=1 MF=0 offset=0)
HTTP request: Request(body='', uri='/pagead/ads?client=ca-pub-2309191948673629&random=1084443430285&lmt=1082467020&format=468x60_as&output=html&url=http%3A%2F%2Fwww.ethereal.com%2Fdownload.html&color_bg=FFFFFF&color_text=333333&color_link=000000&color_url=666633&color_border=666633', headers={'accept-language': 'en-us,en;q=0.5', 'accept-encoding': 'gzip,deflate', 'connection': 'keep-alive', 'keep-alive': '300', 'accept': 'text/xml,application/xml,application/xhtml+xml,text/html;q=0.9,text/plain;q=0.8,image/png,image/jpeg,image/gif;q=0.2,*/*;q=0.1', 'user-agent': 'Mozilla/5.0 (Windows; U; Windows NT 5.1; en-US; rv:1.6) Gecko/20040113', 'accept-charset': 'ISO-8859-1,utf-8;q=0.7,*;q=0.7', 'host': 'pagead2.googlesyndication.com', 'referer': 'http://www.ethereal.com/download.html'}, version='1.1', data='', method='GET')

...

Printing IP addresses from Ethernet frames

Parsing a 594 MB pcap file took 127 seconds.

# coding=utf-8
import dpkt
import socket
import time
import ctypes
import os
import datetime

# Measure how long dpkt takes to extract IP addresses
# Use dpkt to get the timestamp, source IP and destination IP

class sockaddr(ctypes.Structure):
    _fields_ = [("sa_family", ctypes.c_short),
                ("__pad1", ctypes.c_ushort),
                ("ipv4_addr", ctypes.c_byte * 4),
                ("ipv6_addr", ctypes.c_byte * 16),
                ("__pad2", ctypes.c_ulong)]

if hasattr(ctypes, 'windll'):
    WSAStringToAddressA = ctypes.windll.ws2_32.WSAStringToAddressA
    WSAAddressToStringA = ctypes.windll.ws2_32.WSAAddressToStringA
else:
    def not_windows():
        raise SystemError(
            "Invalid platform. ctypes.windll must be available."
        )
    WSAStringToAddressA = not_windows
    WSAAddressToStringA = not_windows


def inet_pton(address_family, ip_string):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))

    if WSAStringToAddressA(
            ip_string,
            address_family,
            None,
            ctypes.byref(addr),
            ctypes.byref(addr_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    if address_family == socket.AF_INET:
        return ctypes.string_at(addr.ipv4_addr, 4)
    if address_family == socket.AF_INET6:
        return ctypes.string_at(addr.ipv6_addr, 16)

    raise socket.error('unknown address family')


def inet_ntop(address_family, packed_ip):
    addr = sockaddr()
    addr.sa_family = address_family
    addr_size = ctypes.c_int(ctypes.sizeof(addr))
    ip_string = ctypes.create_string_buffer(128)
    ip_string_size = ctypes.c_int(ctypes.sizeof(ip_string))

    if address_family == socket.AF_INET:
        if len(packed_ip) != ctypes.sizeof(addr.ipv4_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv4_addr, packed_ip, 4)
    elif address_family == socket.AF_INET6:
        if len(packed_ip) != ctypes.sizeof(addr.ipv6_addr):
            raise socket.error('packed IP wrong length for inet_ntoa')
        ctypes.memmove(addr.ipv6_addr, packed_ip, 16)
    else:
        raise socket.error('unknown address family')

    if WSAAddressToStringA(
            ctypes.byref(addr),
            addr_size,
            None,
            ip_string,
            ctypes.byref(ip_string_size)
    ) != 0:
        raise socket.error(ctypes.FormatError())

    return ip_string[:ip_string_size.value - 1]

# Adding our two functions to the socket library
if os.name == 'nt':
    socket.inet_pton = inet_pton
    socket.inet_ntop = inet_ntop

def inet_to_str(inet):
    return socket.inet_ntop(socket.AF_INET, inet)


def getip(pcap):
    """Print the timestamp, source IP and destination IP of every IPv4 packet."""
    num = 0
    for timestamp, buf in pcap:
        eth = dpkt.ethernet.Ethernet(buf)

        # Skip frames that do not carry an IPv4 packet
        if eth.type != dpkt.ethernet.ETH_TYPE_IP:
            continue

        ip = eth.data
        ip_src = inet_to_str(ip.src)
        ip_dst = inet_to_str(ip.dst)

        # Print the packet counter, the timestamp, and source --> destination
        num += 1
        print('{0}\ttime:{1}\tsrc:{2}-->dst:{3} '.format(num, timestamp, ip_src, ip_dst))

        # For TCP traffic to port 80, print the payload (the HTTP request data)
        if isinstance(ip.data, dpkt.tcp.TCP) and ip.data.dport == 80:
            print(ip.data.data)

if __name__ == '__main__':
    starttime = datetime.datetime.now()
    # The file must be opened in 'rb' mode; opening it with 'r' raises an error
    with open('pcap222.pcap', 'rb') as f:
        pcap = dpkt.pcap.Reader(f)
        getip(pcap)
    endtime = datetime.datetime.now()
    print('time : {0} seconds '.format(int((endtime - starttime).total_seconds())))

Output:

1290064	time:1501562988.75	src:113.142.85.151-->dst:192.168.1.103 
1290065	time:1501562988.75	src:192.168.1.103-->dst:113.142.85.151 

1290066	time:1501562988.75	src:192.168.1.103-->dst:113.142.85.151 

1290067	time:1501562988.75	src:113.142.85.151-->dst:192.168.1.103 
1290068	time:1501562988.75	src:192.168.1.103-->dst:113.142.85.151 

1290069	time:1501562988.76	src:192.168.1.103-->dst:113.142.85.151 

1290070	time:1501562988.76	src:122.228.91.14-->dst:192.168.1.103 
1290071	time:1501562988.76	src:192.168.1.103-->dst:113.142.85.151 

1290072	time:1501562988.76	src:113.142.85.151-->dst:192.168.1.103 
1290073	time:1501562988.76	src:192.168.1.103-->dst:113.142.85.151 

1290074	time:1501562988.76	src:192.168.1.103-->dst:113.142.85.151 
GET / HTTP/1.1
Accept: application/x-ms-application, image/jpeg, application/xaml+xml, image/gif, image/pjpeg, application/x-ms-xbap, application/vnd.ms-excel, application/vnd.ms-powerpoint, application/msword, */*
Accept-Language: zh-cn
User-Agent: Mozilla/4.0 (compatible; MSIE 6.0; Mac_PowerPC; en) Opera 9.24
Referer: -
Connection: Keep-Alive
Host: win7.shangshai-qibao.cn

0x8. References

SMTP protocol analysis: https://yq.aliyun.com/wenji/262429

Summary of parsing pcap files with Scapy: https://www.cnblogs.com/14061216chen/p/8093441.html

Python string formatting (format): https://www.cnblogs.com/benric/p/4965224.html
