大规模异步新闻爬虫: 用asyncio实现异步爬虫

2021/06/23 17:19
阅读数 117

“等了好久终于等到今天,梦里好久终于把梦实现”,脑海里不禁响起来刘德华这首歌。是啊,终于可以写我最喜欢的异步爬虫了。前面那么多章节,一步一步、循序渐进的讲解,实在是“唠叨”了不少,可是为了小猿们能由浅入深的学习爬虫,老猿我又不得不说那么多“唠叨”,可把我给憋死了,今天就大书特书异步爬虫,说个痛快!

用asyncio实现一个异步新闻爬虫

关于异步IO这个概念,可能有些小猿们不是非常明白,那就先来看看异步IO是怎么回事儿。
为了大家能够更形象得理解这个概念,我们拿放羊来打个比方:

  • 下载请求开始,就是放羊出去吃草;
  • 下载任务完成,就是羊吃饱回羊圈。

同步放羊的过程就是这样的:
羊倌儿小同要放100只羊,他就先放一只羊出去吃草,等羊吃饱了回来在放第二只羊,等第二只羊吃饱了回来再放第三只羊出去吃草……这样放羊的羊倌儿实在是……

再看看异步放羊的过程:
羊倌儿小异也要放100只羊,他观察后发现,小同放羊的方法比较笨,他觉得草地一下能容下10只羊(带宽)吃草,所以它就一次放出去10只羊等它们回来,然后他还可以给羊剪剪羊毛。有的羊吃得快回来的早,他就把羊关到羊圈接着就再放出去几只,尽量保证草地上都有10只羊在吃草。

很明显,异步放羊的效率高多了。同样的,网络世界里也是异步的效率高。

到了这里,可能有小猿要问,为什么不用多线程、多进程实现爬虫呢? 没错,多线程和多进程也可以提高前面那个同步爬虫的抓取效率,但是异步IO提高的更多,也更适合爬虫这个场景。后面机会我们可以对比一下三者抓取的效率。

1. 异步的downloader

还记得我们之前使用requests实现的那个downloader吗?同步情况下,它很好用,但不适合异步,所以我们要先改造它。幸运的是,已经有aiohttp模块来支持异步http请求了,那么我们就用aiohttp来实现异步downloader。

async def fetch(session, url, headers=None, timeout=9):
    _headers = {
        'User-Agent': ('Mozilla/5.0 (compatible; MSIE 9.0; '
                       'Windows NT 6.1; Win64; x64; Trident/5.0)'),
    }
    if headers:
        _headers = headers
    try:
        async with session.get(url, headers=_headers, timeout=timeout) as response:
            status = response.status
            html = await response.read()
            encoding = response.get_encoding()
            if encoding == 'gb2312':
                encoding = 'gbk'
            html = html.decode(encoding, errors='ignore')
            redirected_url = str(response.url)
    except Exception as e:
        msg = 'Failed download: {} | exception: {}, {}'.format(url, str(type(e)), str(e))
        print(msg)
        html = ''
        status = 0
        redirected_url = url
    return status, html, redirected_url

这个异步的downloader,我们称之为fetch(),它有两个必须参数:

  • seesion: 这是一个aiohttp.ClientSession的对象,这个对象的初始化在crawler里面完成,每次调用fetch()时,作为参数传递。
  • url:这是需要下载的网址。

实现中使用了异步上下文管理器(async with),编码的判断我们还是用cchardet来实现。
有了异步下载器,我们的异步爬虫就可以写起来啦~

2. 异步新闻爬虫

跟同步爬虫一样,我们还是把整个爬虫定义为一个类,它的主要成员有:

  • self.urlpool 网址池
  • self.loop 异步的事件循环
  • self.seesion aiohttp.ClientSession的对象,用于异步下载
  • self.db 基于aiomysql的异步数据库连接
  • self._workers 当前并发下载(放出去的羊)的数量

通过这几个主要成员来达到异步控制、异步下载、异步存储(数据库)的目的,其它成员作为辅助。爬虫类的相关方法,参加下面的完整实现代码:

#!/usr/bin/env python3
# File: news-crawler-async.py
# Author: veelion

import traceback
import time
import asyncio
import aiohttp
import urllib.parse as urlparse
import farmhash
import lzma

import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

import sanicdb

from urlpool import UrlPool
import functions as fn
import config


class NewsCrawlerAsync:
    def __init__(self, name):
        self._workers = 0
        self._workers_max = 30
        self.logger = fn.init_file_logger(name+ '.log')

        self.urlpool = UrlPool(name)

        self.loop = asyncio.get_event_loop()
        self.session = aiohttp.ClientSession(loop=self.loop)
        self.db = sanicdb.SanicDB(
            config.db_host,
            config.db_db,
            config.db_user,
            config.db_password,
            loop=self.loop
        )

    async def load_hubs(self,):
        sql = 'select url from crawler_hub'
        data = await self.db.query(sql)
        self.hub_hosts = set()
        hubs = []
        for d in data:
            host = urlparse.urlparse(d['url']).netloc
            self.hub_hosts.add(host)
            hubs.append(d['url'])
        self.urlpool.set_hubs(hubs, 300)

    async def save_to_db(self, url, html):
        urlhash = farmhash.hash64(url)
        sql = 'select url from crawler_html where urlhash=%s'
        d = await self.db.get(sql, urlhash)
        if d:
            if d['url'] != url:
                msg = 'farmhash collision: %s <=> %s' % (url, d['url'])
                self.logger.error(msg)
            return True
        if isinstance(html, str):
            html = html.encode('utf8')
        html_lzma = lzma.compress(html)
        sql = ('insert into crawler_html(urlhash, url, html_lzma) '
               'values(%s, %s, %s)')
        good = False
        try:
            await self.db.execute(sql, urlhash, url, html_lzma)
            good = True
        except Exception as e:
            if e.args[0] == 1062:
                # Duplicate entry
                good = True
                pass
            else:
                traceback.print_exc()
                raise e
        return good

    def filter_good(self, urls):
        goodlinks = []
        for url in urls:
            host = urlparse.urlparse(url).netloc
            if host in self.hub_hosts:
                goodlinks.append(url)
        return goodlinks

    async def process(self, url, ishub):
        status, html, redirected_url = await fn.fetch(self.session, url)
        self.urlpool.set_status(url, status)
        if redirected_url != url:
            self.urlpool.set_status(redirected_url, status)
        # 提取hub网页中的链接, 新闻网页中也有“相关新闻”的链接,按需提取
        if status != 200:
            return
        if ishub:
            newlinks = fn.extract_links_re(redirected_url, html)
            goodlinks = self.filter_good(newlinks)
            print("%s/%s, goodlinks/newlinks" % (len(goodlinks), len(newlinks)))
            self.urlpool.addmany(goodlinks)
        else:
            await self.save_to_db(redirected_url, html)
        self._workers -= 1

    async def loop_crawl(self,):
        await self.load_hubs()
        last_rating_time = time.time()
        counter = 0
        while 1:
            tasks = self.urlpool.pop(self._workers_max)
            if not tasks:
                print('no url to crawl, sleep')
                await asyncio.sleep(3)
                continue
            for url, ishub in tasks.items():
                self._workers += 1
                counter += 1
                print('crawl:', url)
                asyncio.ensure_future(self.process(url, ishub))

            gap = time.time() - last_rating_time
            if gap > 5:
                rate = counter / gap
                print('\tloop_crawl() rate:%s, counter: %s, workers: %s' % (round(rate, 2), counter, self._workers))
                last_rating_time = time.time()
                counter = 0
            if self._workers > self._workers_max:
                print('====== got workers_max, sleep 3 sec to next worker =====')
                await asyncio.sleep(3)

    def run(self):
        try:
            self.loop.run_until_complete(self.loop_crawl())
        except KeyboardInterrupt:
            print('stopped by yourself!')
            del self.urlpool
            pass


if __name__ == '__main__':
    nc = NewsCrawlerAsync('yrx-async')
    nc.run()

爬虫的主流程是在方法loop_crawl()里面实现的。它的主体是一个while循环,每次从self.urlpool里面获取定量的爬虫作为下载任务(从羊圈里面选出一批羊),通过ensure_future()开始异步下载(把这些羊都放出去)。而process()这个方法的流程是下载网页并存储、提取新的url,这就类似羊吃草、下崽等。

通过self._workersself._workers_max来控制并发量。不能一直并发,给本地CPU、网络带宽带来压力,同样也会给目标服务器带来压力。

至此,我们实现了同步和异步两个新闻爬虫,分别实现了NewsCrawlerSync和NewsCrawlerAsync两个爬虫类,他们的结构几乎完全一样,只是抓取流程一个是顺序的,一个是并发的。小猿们可以通过对比两个类的实现,来更好的理解异步的流程。

爬虫知识点

1. uvloop模块
uvloop这个模块是用Cython编写建立在libuv库之上,它是asyncio内置事件循环的替代,使用它仅仅是多两行代码而已:

import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

uvloop使得asyncio很快,比odejs、gevent和其它Python异步框架的快至少2倍,接近于Go语言的性能。

uvloop作者的性能测试

这是uvloop作者的性能对比测试。
目前,uvloop不支持Windows系统和Python 3.5 及其以上版本,这在它源码的setup.py文件中可以看到:

if sys.platform in ('win32', 'cygwin', 'cli'):
    raise RuntimeError('uvloop does not support Windows at the moment')

vi = sys.version_info
if vi < (3, 5):
    raise RuntimeError('uvloop requires Python 3.5 or greater')

所以,使用Windows的小猿们要运行异步爬虫,就要把uvloop那两行注释掉哦。

思考题

1. 给同步的downloader()或异步的fetch()添加功能
或许有些小猿还没见过这样的html代码,它出现在<head>里面:

<meta http-equiv="refresh" content="5; url=https://example.com/">

它的意思是,告诉浏览器在5秒之后跳转到另外一个url:https://example.com/
那么问题来了,请给downloader(fetch())添加代码,让它支持这个跳转。

2. 如何控制hub的刷新频率,及时发现最新新闻
这是我们写新闻爬虫要考虑的一个很重要的问题,我们实现的新闻爬虫中并没有实现这个机制,小猿们来思考一下,并对手实现实现。

到这老猿要讲的实现一个异步定向新闻爬虫已经讲完了,感谢你的阅读,有任何建议和问题请再下方留言,我会一一回复你,你也可以关注 猿人学 公众号,那里可以及时看到我新发的文章。

后面的章节,是介绍如何使用工具,比如如何使用charles抓包,如何管理浏览器cookie,如何使用selenium等等,也欢迎你的阅读。

 

猿人学banner宣传图

 

我的公众号:猿人学 Python 上会分享更多心得体会,敬请关注。

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部