自制爬虫框架

2020/10/21 11:30
阅读数 20

自制python爬虫程序模板(爬虫小白亦可用)


  在平时挥手大干项目的过程中,时不时会有一些小的爬虫任务需要处理,因此专门写了一个爬虫框架,基本覆盖平常用到的网站,觉得使用效果不错,分享出来给大家使用,也请各路大神走过路过提些好的意见。
  接下来为大家简单介绍一下每个模块实现过程及思路。本文结束后处会附全部代码,前面代码只是便于大家理解,无需挨个粘贴。

1.mysql数据库链接

  本程序使用mysql数据库读取和保存数据,为了工作过程中的安全和方便,我们通过另外一个程序将数据库链接账号密码等数据,保存中windows注册表中,可通过 win+regedit 调出查看。(此块仅适用于windows系统,若需在linux上使用,则不使用此模块链接数据库),本模块中数据库链接方式见代码:

    def read_setttings_zhuce(self, file, winn_c_u=winreg.HKEY_CURRENT_USER):
        """
            读取注册表中的设置
        """
        parentkey = winreg.OpenKey(winn_c_u, file)
        # 获取该键的所有键值,因为没有方法可以获取键值的个数,所以只能用这种方法进行遍历
        item = dict()
        try:
            i = 0
            while True:
                # EnumValue方法用来枚举键值,EnumKey用来枚举子键
                name, value, type = winreg.EnumValue(parentkey, i)
                item[name] = value
                i += 1
        except Exception as e:
            pass
        return item
    
    def __init__(self, start_p):
        # 注意,super().__init__() 一定要写
        # 而且要写在最前面,否则会报错。
        super().__init__()
        self.item_fwq = self.read_setttings_zhuce("Software\lxl\lxl_program")
        # 链接数据库
        self.conn = pymysql.connect(
            user=self.item_fwq["user"], password=self.item_fwq["password"], host=self.item_fwq["host"], port=int(self.item_fwq["port"]),
            database=self.item_fwq["database"], use_unicode=True,
            charset="utf8")
        self.start_p = start_p
        print("数据库开启中......")
        # 获取游标
        self.cursor = self.conn.cursor()

2. 页面请求过程

  此处说明一下,整个模块是通过dict来传递数据的,因此在使用过程中,可以随时随地添加我们需要传递的参数。我们平常用到的页面一般是get或post请求方式,get方式通过修改传递的url链接即可请求获取数据,post方式通过data参数传递获取数据。因此将两种方式分开处理。同时将请求回来的数据做deocde解码处理,一般遇到的有utf8或者GBK的,我写了两种,如果你们使用过程中出现其他的解码,添加上去即可,此处代码比较low我就不贴在此处了,各位结尾处直接复制即可,(我贴几行重点吧,否则好像显得此处特殊)。

        item_fwq_ip = read_setttings_zhuce("Software\lxl\lxl_program")
        # 读取实时写入windows注册表中的ip代理  本人喜欢使用无忧代理 不是打广告,而是品质确实好
        proxies = {"http": "%s" % item_fwq_ip["ip"], "https": "%s" % item_fwq_ip["ip"]}
        headers = {
             "user-agent": item_fwq_ip['user_agent']
        }
        try:
            response = requests.get(url=url, headers=headers, timeout=20).content
            if response:
                return response
        except Exception as f:
            print("重新请求")
        try:
            response = requests.post(url=url, headers=headers, data=data,timeout=20).content
            if response:
                return response
        except Exception as f:
            print("重新请求")

3. 数据提取处理

  页面请求成功之后,会返回三种格式,一种是html格式,一种是json格式,还有一种是我请求不到数据返回的无数据结果(未针对此处如何处理,若有需要,自行处理)。针对html格式我们使用xpath解析数据(本来想着能不能通过代码去自动处理xpath,太忙没时间,以后补上吧);针对json格式,就简单许多了,直接对应读取出来即可。两种格式处理之后,将数据以dict格式传递至数据保存处理中即可 见代码:

    def response_json(self, response, meta={}):
        """
            json 格式解析
        """
        list_data = response['result']['data']
        for ds in list_data:
            item = dict()
            """
                此处可以对数据进行处理,若不需特殊处理的 则直接合并到item字典中,保存入数据库
                列: item["pid] = ds['id']
            """
            item = {**item, **meta}
            where_list = ["pid"]  # 此处添加mysql保存判断条件中查询的字段 可写多个字段
            table_name = 'your_databases_tablename'  # 此处添加你需要保存的数据表名称 注: 若没有新建数据表, 代码可自动建立新的数据表
            self.mysql_f_item(item, table_name=table_name, where_list=where_list)
    
    def response_html(self, response, meta={}):
        """
            html 格式解析
        """
        list_response = response.xpath('//div[@class="name"]')
        for resp in list_response:
            item = dict()
            """
                此处可以对数据进行xpath解析处理,保存入数据库
                列: item["pid] = resp.xpath('./a/@href')[0]
            """
            print(item)
            item = {**item, **meta}
            where_list = ["pid"]  # 此处添加mysql保存判断条件中查询的字段 可写多个字段
            table_name = "your_databases_tablename" # 此处添加你需要保存的数据表名称 注: 若没有新建数据表, 代码可自动建立新的数据表
            self.mysql_f_item(item, table_name, where_list=where_list)

4. 数据保存处理。

  数据库选用mysql保存,在此模块中,我加入了自动创建表和自动拼接sql的功能,传入一个数据表名称,若存在则进行下一步处理,不存在会进行数据表创建,此时dict中的字段名称就起到了一定的作用,我通过字段中所带的值,作为创建字段的类型(此处也可自行添加);同时数据保存过程中,有时会需要做判重,通过在指定列表 where_list 中添加字段即可(默认为空,不判重。其他的没什么了都是一些常规操作了。见代码:

        sql = "insert into %s(" % table_name
        for item in lst:
            sql = sql + "`%s`," % item
        sql = sql.strip(',') + ") values ("
        if list_flag is False:
            for item in lst:
                sql = sql + "'{%s}'," % item
        else:
            for i in range(len(lst)):
                sql = sql + "'{0[%s]}'," % i
        sql = sql.strip(',') + ")"
        return sql

                sql_begin = """CREATE TABLE `%s` (  `id` int(11) NOT NULL AUTO_INCREMENT,""" % table_name
        sql_end = """ PRIMARY KEY (`id`)
                    ) ENGINE=%s AUTO_INCREMENT=0 DEFAULT CHARSET=%s;""" % (engine, charset)
        sql_temp = " `%s` varchar(256) DEFAULT NULL,"
        sql_temp_time = "`%s` datetime DEFAULT NULL,"
        sql_temp_content = "`%s` text,"
        sql_temp_sgin = "`%s` enum('0','1') DEFAULT '0',"
        sql = str()
        for item in lst:
            # 生成新的数据表时 可根据item中的字段名称 来决定数据库中字段的类型
            if "time" in item:
                sql += sql_temp_time % item
            elif "content" in item:
                sql += sql_temp_content % item
            elif "sgin" in item:
                sql += sql_temp_sgin % item
            else:
                sql += sql_temp % (item)

        sql = sql_begin + sql + sql_end
        return sql

  好了,这次就写到这里吧,如果之后对这个模块做大的更新或调整再说吧。 如果对以上代码有不懂之处,可以发送至邮件 xiang_long_liu@163.com,大家共同探讨吧。
结尾处付全部代码:

import requests, winreg, pymysql, re, json
from lxml import etree
from threading import Thread
import settings  # 将服务器数据库等链接方式写入windows注册表中,然后再在该程序中读取出来


def read_setttings_zhuce(file, winn_c_u=winreg.HKEY_CURRENT_USER):
    """
        读取注册表中的设置
    """
    parentkey = winreg.OpenKey(winn_c_u, file)
    # 获取该键的所有键值,因为没有方法可以获取键值的个数,所以只能用这种方法进行遍历
    item = dict()
    try:
        i = 0
        while True:
            # EnumValue方法用来枚举键值,EnumKey用来枚举子键
            name, value, type = winreg.EnumValue(parentkey, i)
            # print(name, value)
            item[name] = value
            i += 1
    except Exception as e:
        pass
    return item


class ALi_Main(Thread):

    def read_setttings_zhuce(self, file, winn_c_u=winreg.HKEY_CURRENT_USER):
        """
            读取注册表中的设置
        """
        parentkey = winreg.OpenKey(winn_c_u, file)
        # 获取该键的所有键值,因为没有方法可以获取键值的个数,所以只能用这种方法进行遍历
        item = dict()
        try:
            i = 0
            while True:
                # EnumValue方法用来枚举键值,EnumKey用来枚举子键
                name, value, type = winreg.EnumValue(parentkey, i)
                item[name] = value
                i += 1
        except Exception as e:
            pass
        return item

    def __init__(self, start_p):
        # 注意,super().__init__() 一定要写
        # 而且要写在最前面,否则会报错。
        super().__init__()
        self.item_fwq = self.read_setttings_zhuce("Software\lxl\lxl_program")
        # 链接数据库
        self.conn = pymysql.connect(
            user=self.item_fwq["user"], password=self.item_fwq["password"], host=self.item_fwq["host"], port=int(self.item_fwq["port"]),
            database=self.item_fwq["database"], use_unicode=True,
            charset="utf8")
        self.start_p = start_p
        print("数据库开启中......")
        # 获取游标
        self.cursor = self.conn.cursor()

    def main(self, url="https://www.baidu.com/", formdata={}, meta={}):
        """
            开关
        """
        response = self.url_f_requests(url, formdata)
        if response != "无结果":
            # 对返回的结果解码
            response = self.response_decode(response)
            print(response)
            response, fangshi = self.t_f_response_json_html(response)
            if fangshi is "json":
                self.response_json(response, meta)
            elif fangshi is "html":
                self.response_html(response, meta)
            else:
                print(fangshi)
                print("返回的页面数据有误请检查")
        else:
            print("数据无结果,未获取到")
            
    def url_f_requests(self, url, formdata):
        """
            get / post 请求发送
        """
        if formdata == {}:
            response = self.requests_url(url)
            print("{INFO}:url以 get 方式请求")
            # print(response)
        else:
            response = self.requests_url_post(url, formdata)
            print("{INFO}:url以 post 方式请求")
            # print(response)
        return response
            
    def t_f_response_json_html(self, response):
        """
            判断返回的结果
        """
        try:
            response = json.loads(response)
            print("{INFO}:数据以json格式返回")
            return response, "json"
        except Exception as f:
            try:
                response = etree.HTML(response)
                print("{INFO}:数据以html格式返回")
                return response, "html"
            except Exception as f:
                response = response
                return response, "None"
        
    def response_decode(self, response):
        """
            对返回的结果解码
        """
        try:
            response = response.decode()
            print("{INFO}:数据以utf-8解码")
        except Exception as f:
            try:
                response = response.decode("GBK")
                print("{INFO}:数据以 GBK 解码")
            except Exception as f:
                print("{INFO}:数据以未指定解码方式返回")
                response = response
        return response
        
    def response_json(self, response, meta={}):
        """
            json 格式解析
        """
        list_data = response['result']['data']
        for ds in list_data:
            item = dict()
            """
                此处可以对数据进行处理,若不需特殊处理的 则直接合并到item字典中,保存入数据库
                列: item["pid] = ds['id']
            """
            item = {**item, **meta}
            where_list = ["pid"]  # 此处添加mysql保存判断条件中查询的字段 可写多个字段
            table_name = 'your_databases_tablename'  # 此处添加你需要保存的数据表名称 注: 若没有新建数据表, 代码可自动建立新的数据表
            self.mysql_f_item(item, table_name=table_name, where_list=where_list)
    
    def response_html(self, response, meta={}):
        """
            html 格式解析
        """
        list_response = response.xpath('//div[@class="name"]')
        for resp in list_response:
            item = dict()
            """
                此处可以对数据进行xpath解析处理,保存入数据库
                列: item["pid] = resp.xpath('./a/@href')[0]
            """
            print(item)
            item = {**item, **meta}
            where_list = ["pid"]  # 此处添加mysql保存判断条件中查询的字段 可写多个字段
            table_name = "your_databases_tablename" # 此处添加你需要保存的数据表名称 注: 若没有新建数据表, 代码可自动建立新的数据表
            self.mysql_f_item(item, table_name, where_list=where_list)
        
    def mysql_f_item(self, item, table_name="new_table_name", where_list=[]):
        """
            保存创建mysql数据库
        """
        lst = item.keys()
        # print(lst)
        insert_sql = self.create_insert_sql_for_list(table_name=table_name, lst=lst)
        insert_sql = insert_sql.format(**item)
        # print(insert_sql)
        select_sql = self.create_select_sql(table_name=table_name, where_list=where_list)
        select_sql = select_sql.format(**item)
        # print(select_sql)
        self.insert_mysql(insert_sql=insert_sql, select_sql=select_sql, table_name=table_name, lst=lst)
        print("--------------------------------")

    def create_insert_sql_for_list(self, table_name, lst, list_flag=False):
        """
        动态生成sql文(单条)
        :param table_name:表名
        :param lst:插入的数据列表
        :param list_flag: true:代表lst字段是 list嵌套list,   false:代表list嵌套dict
        :return:返回单条插入的sql
        """
        sql = "insert into %s(" % table_name
        for item in lst:
            sql = sql + "`%s`," % item
        sql = sql.strip(',') + ") values ("
        if list_flag is False:
            for item in lst:
                sql = sql + "'{%s}'," % item
        else:
            for i in range(len(lst)):
                sql = sql + "'{0[%s]}'," % i
        sql = sql.strip(',') + ")"
        return sql

    def create_select_sql(self, table_name, where_list):
        """
            动态生成sql文
        """
        if where_list == []:
            return ""
        else:
            sql = 'select id from %s where' % table_name
            for i in range(len(where_list)):
                sql = sql + " `%s` = '{%s}' and " % (where_list[i], where_list[i])
            sql = sql.strip('and ')
            # print(sql)
            return sql

    def create_table(self, table_name, lst, engine='MyISAM', charset='utf8'):
        """
        生成建表sql
        :param table_name:表名
        :param lst:字段列表
        :param engine:数据库类型
        :param charset:字符集
        :return:sql
        """
        sql_begin = """CREATE TABLE `%s` (  `id` int(11) NOT NULL AUTO_INCREMENT,""" % table_name
        sql_end = """ PRIMARY KEY (`id`)
                    ) ENGINE=%s AUTO_INCREMENT=0 DEFAULT CHARSET=%s;""" % (engine, charset)
        sql_temp = " `%s` varchar(256) DEFAULT NULL,"
        sql_temp_time = "`%s` datetime DEFAULT NULL,"
        sql_temp_content = "`%s` text,"
        sql_temp_sgin = "`%s` enum('0','1') DEFAULT '0',"
        sql = str()
        for item in lst:
            # 生成新的数据表时 可根据item中的字段名称 来决定数据库中字段的类型
            if "time" in item:
                sql += sql_temp_time % item
            elif "content" in item:
                sql += sql_temp_content % item
            elif "sgin" in item:
                sql += sql_temp_sgin % item
            else:
                sql += sql_temp % (item)

        sql = sql_begin + sql + sql_end
        return sql

    def insert_mysql(self, insert_sql, select_sql='', update_sql='', table_name='', lst=()):
        """
            保存数据
        """
        while True:
            # 获取游标
            self.conn.ping(reconnect=True)
            if select_sql:
                try:
                    self.cursor.execute(select_sql)
                    if self.cursor.fetchone() is None:
                        print(insert_sql)
                        try:
                            self.cursor.execute(insert_sql)
                            self.conn.commit()
                            print("数据保存中......")
                            if update_sql:
                                self.cursor.execute(update_sql)
                                self.conn.commit()
                                print("数据更新中......")
                            break
                        except Exception as f:
                            # print(insert_sql)
                            print(f)
                            print("数据保存失败")
                            break
                    else:
                        print("数据已存在")
                    break
                except Exception as f:
                    print(f)
                    # 首次执行 创建一个新的数据表
                    if "Table" in str(f) and "doesn't exist" in str(f):
                        print("*" * 100)
                        print("创建数据库中......")
                        sql = self.create_table(table_name=table_name, lst=lst)
                        self.cursor.execute(sql)
                        self.conn.commit()
                    else:
                        break
            else:
                try:
                    print(insert_sql)
                    print("数据保存中......")
                    self.cursor.execute(insert_sql)
                    self.conn.commit()
                    break
                except Exception as f:
                    print(f)
                    # 首次执行 创建一个新的数据表
                    if "Table" in str(f) and "doesn't exist" in str(f):
                        print("*" * 100)
                        print("创建数据库中......")
                        sql = self.create_table(table_name=table_name, lst=lst)
                        self.cursor.execute(sql)
                        self.conn.commit()
                    else:
                        print("保存失败")
                        break

    def getDropStr(self, l_strHtml):
        """清洗字符串"""
        strList = re.findall(
            r'[\u4e00-\u9fa5a-zA-Z0-9,.;?!_\]\'\"\[{}+-\u2014\u2026\uff1b\u3002\uff1b\uff0c\uff1a\u201c\u201d\uff08\uff09\u3001\uff1f\u300a\u300b]',
            l_strHtml)
        return "".join(strList)

    def requests_url(self, url, data=None):
        """
            发送请求,返回相应
        """
        item_fwq_ip = read_setttings_zhuce("Software\lxl\lxl_program")
        # 读取实时写入windows注册表中的ip代理  本人喜欢使用无忧代理 不是打广告,而是品质确实好
        proxies = {"http": "%s" % item_fwq_ip["ip"], "https": "%s" % item_fwq_ip["ip"]}
        headers = {
             "user-agent": item_fwq_ip['user_agent']
        }
        try:
            response = requests.get(url=url, headers=headers, timeout=20).content
            if response:
                return response
        except Exception as f:
            print("重新请求")
            i = 0
            while True:
                i += 1
                if i >= 5:
                    return "无结果"
                try:
                    response = requests.get(url=url, headers=headers, proxies=proxies, timeout=20).content
                    if response:
                        return response
                except Exception as f:
                    print("重新请求")
        
    def requests_url_post(self, url, data):
        """
            发送请求,返回相应
        """
        item_fwq_ip = read_setttings_zhuce("Software\lxl\lxl_program")
        # 读取实时写入windows注册表中的ip代理  本人喜欢使用无忧代理 不是打广告,而是品质确实好
        proxies = {"http": "%s" % item_fwq_ip["ip"], "https": "%s" % item_fwq_ip["ip"]}
        headers = {
            "user-agent": item_fwq_ip['user_agent']
        }
        try:
            response = requests.post(url=url, headers=headers, data=data,timeout=20).content
            if response:
                return response
        except Exception as f:
            print("重新请求")
            i = 0
            while True:
                i += 1
                if i >= 5:
                    return "无结果"
                try:

                    response = requests.post(url=url, headers=headers, data=data, proxies=proxies, timeout=20).content
                    if response:
                        return response
                except Exception as f:
                    print("重新请求")
    
    def __del__(self):
        self.cursor.close()
        self.conn.close()
        print("数据库关闭中......")


def main_thread(number_p):
    """
        多线程启动
        若使用多线程爬取是 将 main 函数改为 run 函数 传递参数控制url使用个数从而决定多线程条数
    """
    print("多线程启动程序")
    list_thread = list()
    for p in range(0, number_p+1000, 1000):
        thread = ALi_Main(p)
        list_thread.append(thread)
    
    for threads in list_thread:
        threads.start()
    
    for threads in list_thread:
        threads.join()


if __name__ == '__main__':
    # 初始化
    # settings.main()
    
    alm = ALi_Main(0)

    meta = dict()
    meta["key_name"] = "传值"
    url = "https://search.sina.com.cn/?range=title&q=" + str(meta["key_name"]) + "&c=news&time=&ie=utf-8&col=&source=&from=&country=&size=&a=&page=1&pf=0&ps=0&dpc=1"
    print(url)
    alm.main(url=url, meta=meta)


展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部