文档章节

爬虫代理小记与aiohttp代理尝试

mickelfeng
 mickelfeng
发布于 2017/05/16 17:54
字数 3289
阅读 179
收藏 0

代理IP方案简述

  • 快速抓取网站时,要应对IP每分钟访问次数有限制甚至会封IP的服务器,使用代理IP可以帮助我们。
  • Kaito爬虫代理服务讲得很清楚,推荐。
  • Github上现成的开源代理爬虫也不少,如:
    qiyeboy/IPProxyPool
    jhao104/proxy_pool
    awolfly9/IPProxyTool
    fancoo/Proxy
    derekhe/mobike-crawler/modules/ProxyProvider.py
  • 思路也都很清楚,从更多的代理网站上爬取免费代理,存入自己的数据库,定时更新。在拿到代理IP后,验证该代理的有效性。并且提供简单的API来获取代理IP。
  • 七夜的博客python开源IP代理池--IPProxys详细地阐释了自己的代码。
  • 大部分的代理网站的爬取还是比较简单的,在上述开源的代码中包含了不少代理网站的爬取与解析。困难点的有js反爬机制,也都被用selenium操作无头webkit或者js代码解析以及python的js代码执行库所解决。此外有趣的是,在上面的开源代码中出现了用爬取得到的代理来访问代理网站的情况。
  • 定时刷新代理,有自定义的代理定时刷新模块,也可用celery定时任务。
  • 验证有效性的方式有:
  • API的提供可以用BaseHTTPServer拓展下,也可用简便的flask或者Django加上插件提供restful api服务。
  • 对于免费的代理IP来说,最重要的一是量大,就算有很大比例无效的,还是能拿到一些高质量的代理。一个网站的未筛选代理能有几千个,多个网站就很可观了,当然要考虑到重复。
  • 再就是代理IP的筛选机制,不少开源库都添加了评分机制,这是非常重要的。例如利用对累计超时次数以及成功率的加权来评判代理IP的质量。在每次使用后都对代理IP的情况进行评价,以此来刷新数据库,方便下一次选取优质的代理IP。
  • 如何对代理IP进行评价,在成功和失败的各种情况中如何奖惩,筛选出最优质的代理IP是非常重要的。
  • 此外,每个代理IP的使用也要考虑是否要设置一定的使用间隔,避免过于频繁导致失效。

尝试

  • 自然,首先要做的就是从免费代理网站上获取大量代理IP,我选择了最方便的66ip,接口很简单,一次性访问可以拿到3000左右的代理,当然,频繁访问会导致js反爬机制,这时再简单地使用selenium+phantomJs即可。
      url = ("http://m.66ip.cn/mo.php?tqsl={proxy_number}")
      url = url.format(proxy_number=10000)
      html = requests.get(url, headers=headers).content
      html = html.decode(chardet.detect(html)['encoding'])
      pattern = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}'
      all_ip = re.findall(pattern, html)
  • 然后就是设置代理IP的奖惩机制,我参考了摩拜单车爬虫源码及解析使用类,看起来简单清晰,每个代理IP对象拥有自己的ip与分数,@property令它们可以被点操作符访问。初始分数为100分,这里分数都弄成整数。如果代理成功,按照延时的大小来给予奖励,即调用代理的相应方法,最高10分,而超时扣10分,连接错误扣30分,其它错误扣50分(可以酌情修改)。为了便于代理IP之间的比较,修改了__lt__方法。

    class Proxy:
      def __init__(self, ip):
          self._url = 'http://' + ip
          self._score = 100
    
      @property
      def url(self):
          return self._url
    
      @property
      def score(self):
          return self._score
    
      def __lt__(self, other):
          '''
          由于优先队列是返回最小的,而这里分数高的代理优秀
          所以比较时反过来
          '''
          return self._score > other._score
    
      def success(self, time):
          self._score += int(10 / int(time + 1))
    
      def timeoutError(self):
          self._score -= 10
    
      def connectError(self):
          self._score -= 30
    
      def otherError(self):
          self._score -= 50
  • 感觉上,最好的验证方式是直接访问目标网站。除去根本不能用的一部分,代理IP的有效性对不同的目标网站是有区别的,在我的尝试中,豆瓣相比摩拜对代理IP的应对明显更好,这里代码为访问摩拜。在第一轮的筛选中,对每个代理IP,访问两次目标网站,超时时间为10秒,依情况奖惩,保留分数大于50的。总共花费22秒左右时间,排除了一小部分根本不能使用的代理,也对所有代理的分数初步更新。
async def douban(proxy, session):
# 使用代理访问目标网站,并按情况奖惩代理
    try:
        start = time.time()
        async with session.post(mobike_url,
                                data=data,
                                proxy=proxy.url,
                                headers=headers,  # 可以引用到外部的headers
                                timeout=10) as resp:
            end = time.time()
            # print(resp.status)
            if resp.status == 200:
                proxy.success(end - start)
                print('%6.3d' % proxy._score, 'Used time-->', end - start, 's')
            else:
                proxy.otherError()
                print('*****', resp.status, '*****')
    except TimeoutError as te:
        print('%6.3d' % proxy._score, 'timeoutError')
        proxy.timeoutError()
    except ClientConnectionError as ce:
        print('%6.3d' % proxy._score, 'connectError')
        proxy.connectError()
    except Exception as e:
        print('%6.3d' % proxy._score, 'otherError->', e)
        proxy.otherError()
# ClientHttpProxyError

# TCPConnector维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求,500和100相差不大
# ClientSession调用TCPConnector构造连接,Session可以共用
# Semaphore限制同时请求构造连接的数量,Semphore充足时,总时间与timeout差不多


async def initDouban():

    conn = aiohttp.TCPConnector(verify_ssl=False,
                                limit=100,  # 连接池在windows下不能太大
                                use_dns_cache=True)
    tasks = []
    async with aiohttp.ClientSession(loop=loop, connector=conn) as session:
        for p in proxies:
            task = asyncio.ensure_future(douban(p, session))
            tasks.append(task)

        responses = asyncio.gather(*tasks)
        await responses
    conn.close()


def firstFilter():
    for i in range(2):
        s = time.time()
        future = asyncio.ensure_future(initDouban())
        loop.run_until_complete(future)
        e = time.time()
        print('----- init time %s-----\n' % i, e - s, 's')

    num = 0
    pq = PriorityQueue()
    for proxy in proxies:
        if proxy._score > 50:
            pq.put_nowait(proxy)
            num += 1
    print('原始ip数:%s' % len(all_ip), '; 筛选后:%s' % num)
    return pq
  • 然后就是正式的访问了,这里我使用了基于堆的asyncio优先队列(非线程安全)。通过asyncio.Semaphore限制并发请求连接的数量,不断地从队列中拿取最优质的代理IP,在访问结束后再将它放回队列。结果是,多个连接不会同时使用一个代理IP,如果代理成功,它将会很快被放回队列,再次使用。(如果需要设置成功代理的使用间隔,可以改为在访问成功后,先释放连接与信号量,然后使用asyncio.sleep(x)等待一段时间再放入优先队列,如果在genDouban函数里实现,可设置为range(concurrency)一定程度上大于Semaphore(concurrency))奖惩一直进行,一开始会有一段筛选的过程,稳定后的输出如下:

pq = firstFilter()


async def genDouban(sem, session):
    # Getter function with semaphore.
    while True:
        async with sem:
            proxy = await pq.get()
            await douban(proxy, session)
            await pq.put(proxy)


async def dynamicRunDouban(concurrency):
    '''
    TCPConnector维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求
    ClientSession调用TCPConnector构造连接,Session可以共用
    Semaphore限制同时请求构造连接的数量,Semphore充足时,总时间与timeout差不多
    '''
    conn = aiohttp.TCPConnector(verify_ssl=False,
                                limit=concurrency,
                                use_dns_cache=True)
    tasks = []
    sem = asyncio.Semaphore(concurrency)

    async with aiohttp.ClientSession(loop=loop, connector=conn) as session:
        try:
            for i in range(concurrency):
                task = asyncio.ensure_future(genDouban(sem, session))
                tasks.append(task)

            responses = asyncio.gather(*tasks)
            await responses
        except KeyboardInterrupt:
            print('-----finishing-----\n')
            for task in tasks:
                task.cancel()
            if not conn.closed:
                conn.close()


future = asyncio.ensure_future(dynamicRunDouban(200))
loop.run_until_complete(future)
  • 最后,我们中断程序,查看下代理IP的得分情况:
    scores = [p.score for p in proxies]
    scores.sort(reverse=True)
    print('Most popular IPs:\n ------------\n', scores[:50],
        [i for i in scores if i > 100])
    loop.is_closed()

其它方案概览:

  • 在Scrapy官方文档避免被封的建议提到了Tor - 洋葱路由

    use a pool of rotating IPs. For example, the free Tor project or paid services like ProxyMesh. An open source alterantive is scrapoxy, a super proxy that you can attach your own proxies to.

  • 在知乎python 爬虫 ip池怎么做?的回答中,提到了Squid与修改x-forward-for标签的方法:
    1. 使用squid的cache_peer机制,把这些代理按照一定格式(具体格式参考文档)写入到配置文件中,配置好squid的端口,那么squid就可以帮你调度代理了,而且还可以摒弃失效的代理。
    2. 在访问的http request里添加x-forward-for标签client随机生成,宣称自己是一台透明代理服务器
  • 如何突破豆瓣爬虫限制频率?中提到:

    用带 bid (可以伪造)的 cookie 去访问 - github

其他资料

代码

from selenium import webdriver
import time
import aiohttp
from aiohttp.client_exceptions import ClientConnectionError
from aiohttp.client_exceptions import TimeoutError
import asyncio
from asyncio.queues import PriorityQueue
import chardet
import re
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

headers = {'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) ')}
loop = asyncio.get_event_loop()


class Proxy:
    def __init__(self, ip):
        self._url = 'http://' + ip
        self._score = 100

    @property
    def url(self):
        return self._url

    @property
    def score(self):
        return self._score

    def __lt__(self, other):
        '''
        由于优先队列是返回最小的,而这里分数高的代理优秀
        所以比较时反过来
        '''
        return self._score > other._score

    def success(self, time):
        self._score += int(10 / int(time + 1))

    def timeoutError(self):
        self._score -= 10

    def connectError(self):
        self._score -= 30

    def otherError(self):
        self._score -= 50


def getProxies():
    url = ("http://m.66ip.cn/mo.php?tqsl={proxy_number}")
    url = url.format(proxy_number=10000)
    html = requests.get(url, headers=headers).content
    html = html.decode(chardet.detect(html)['encoding'])
    pattern = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}'
    all_ip = re.findall(pattern, html)
    if len(all_ip) == 0:
        driver = webdriver.PhantomJS(
            executable_path=r'D:/phantomjs/bin/phantomjs.exe')
        driver.get(url)
        time.sleep(12)  # js等待5秒
        html = driver.page_source
        driver.quit()
        all_ip = re.findall(pattern, html)
    with open('66ip_' + str(time.time()), 'w', encoding='utf-8') as f:
        f.write(html)
    return all_ip


all_ip = set(getProxies()) | set(getProxies())
proxies = [Proxy(proxy) for proxy in all_ip]

mobike_url = "https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.do"
data = {  # 请求参数: 纬度,经度!
    'latitude': '33.2',
    'longitude': '113.4',
}
headers = {
    'referer': "https://servicewechat.com/",
}


async def douban(proxy, session):

    try:
        start = time.time()
        async with session.post(mobike_url,
                                data=data,
                                proxy=proxy.url,
                                headers=headers,  # 可以引用到外部的headers
                                timeout=10) as resp:
            end = time.time()
            # print(resp.status)
            if resp.status == 200:
                proxy.success(end - start)
                print('%6.3d' % proxy._score, 'Used time-->', end - start, 's')
            else:
                proxy.otherError()
                print('*****', resp.status, '*****')
    except TimeoutError as te:
        print('%6.3d' % proxy._score, 'timeoutError')
        proxy.timeoutError()
    except ClientConnectionError as ce:
        print('%6.3d' % proxy._score, 'connectError')
        proxy.connectError()
    except Exception as e:
        print('%6.3d' % proxy._score, 'otherError->', e)
        proxy.otherError()
# ClientHttpProxyError

# TCPConnector维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求,500和100相差不大
# ClientSession调用TCPConnector构造连接,Session可以共用
# Semaphore限制同时请求构造连接的数量,Semphore充足时,总时间与timeout差不多


async def initDouban():

    conn = aiohttp.TCPConnector(verify_ssl=False,
                                limit=100,  # 连接池在windows下不能太大, <500
                                use_dns_cache=True)
    tasks = []
    async with aiohttp.ClientSession(loop=loop, connector=conn) as session:
        for p in proxies:
            task = asyncio.ensure_future(douban(p, session))
            tasks.append(task)

        responses = asyncio.gather(*tasks)
        await responses
    conn.close()


def firstFilter():
    for i in range(2):
        s = time.time()
        future = asyncio.ensure_future(initDouban())
        loop.run_until_complete(future)
        e = time.time()
        print('----- init time %s-----\n' % i, e - s, 's')

    num = 0
    pq = PriorityQueue()
    for proxy in proxies:
        if proxy._score > 50:
            pq.put_nowait(proxy)
            num += 1
    print('原始ip数:%s' % len(all_ip), '; 筛选后:%s' % num)
    return pq


pq = firstFilter()


async def genDouban(sem, session):
    # Getter function with semaphore.
    while True:
        async with sem:
            proxy = await pq.get()
            await douban(proxy, session)
            await pq.put(proxy)


async def dynamicRunDouban(concurrency):
    '''
    TCPConnector维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求
    ClientSession调用TCPConnector构造连接,Session可以共用
    Semaphore限制同时请求构造连接的数量,Semphore充足时,总时间与timeout差不多
    '''
    conn = aiohttp.TCPConnector(verify_ssl=False,
                                limit=concurrency,
                                use_dns_cache=True)
    tasks = []
    sem = asyncio.Semaphore(concurrency)

    async with aiohttp.ClientSession(loop=loop, connector=conn) as session:
        try:
            for i in range(concurrency):
                task = asyncio.ensure_future(genDouban(sem, session))
                tasks.append(task)

            responses = asyncio.gather(*tasks)
            await responses
        except KeyboardInterrupt:
            print('-----finishing-----\n')
            for task in tasks:
                task.cancel()
            if not conn.closed:
                conn.close()


future = asyncio.ensure_future(dynamicRunDouban(200))
loop.run_until_complete(future)


scores = [p.score for p in proxies]
scores.sort(reverse=True)
print('Most popular IPs:\n ------------\n', scores[:50],
      [i for i in scores if i > 100])
loop.is_closed()

访问百度

async def baidu(proxy):
  '''
  验证是否可以访问百度
  '''
  async with aiohttp.ClientSession(loop=loop) as session:
      async with session.get("http://baidu.com",
                             proxy='http://' + proxy,
                             timeout=5) as resp:
          text = await resp.text()
          if 'baidu.com' not in text:
              print(proxy,
                    '\n----\nis bad for baidu.com\n')
              return False
          return True

访问icanhazip

async def testProxy(proxy):
  '''
  http://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientSession.request
  '''
  async with aiohttp.ClientSession(loop=loop) as session:
      async with session.get("http://icanhazip.com",
                             proxy='http://' + proxy,
                             timeout=5) as resp:
          text = await resp.text()
          if len(text) > 20:
              return
          else:
              if await baidu(proxy):
                  firstFilteredProxies.append(proxy)
                  # print('原始:', proxy, '; 结果:', text)

访问HttpBin

async def httpbin(proxy):
  '''
  访问httpbin获取headers详情, 注意访问https 代理仍为http
  参考资料: https://imququ.com/post/x-forwarded-for-header-in-http.html
  http://www.cnblogs.com/wenthink/p/HTTTP_Proxy_TCP_Http_Headers_Check.html
  '''
  async with aiohttp.ClientSession(loop=loop) as session:
      async with session.get("https://httpbin.org/get?show_env=1",
                             proxy='http://' + proxy,
                             timeout=4) as resp:
          json_ = await resp.json()
          origin_ip = json_['origin']
          proxy_ip = json_['headers']['X-Forwarded-For']
          via = json_['headers'].get('Via', None)
          print('原始IP:', origin_ip,
                '; 代理IP:', proxy_ip,
                '---Via:', via)
          if proxy_ip != my_ip and origin_ip == proxy_ip:
              annoy_proxies.append(proxy)

访问豆瓣API

async def douban(proxy):

  async with aiohttp.ClientSession(loop=loop) as session:
      try:
          async with session.get(('https://api.douban.com/v2/movie/top250'
                                  '?count=10'),
                                 proxy='http://' + proxy,
                                 headers=headers,
                                 timeout=4) as resp:
              print(resp.status)
      except TimeoutError as te:
          print(proxy, te, 'timeoutError')
      except ClientProxyConnectionError as pce:
          print(proxy, pce, 'proxyError')
      except ClientConnectionError as ce:
          print(proxy, ce, 'connectError')

循环访问豆瓣导致暂时被封IP

headers = {'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                        'AppleWebKit/537.36 (KHTML, like Gecko) ')}
while True:
  r = requests.get('http://douban.com', headers=headers)
  print(r.status_code)
  r = requests.get('https://movie.douban.com/j/search_subjects?'
                   'type=movie&tag=%E8%B1%86%E7%93%A3%E9%AB%9',
                   headers=headers)
  print(r.status_code)

本文转载自:http://www.jianshu.com/p/1cd6d34407e2

mickelfeng

mickelfeng

粉丝 234
博文 2769
码字总数 597345
作品 0
成都
高级程序员
私信 提问
#圆桌讨论#python3下如何编写异步爬虫?

#圆桌讨论#python3下如何编写异步爬虫? 以前一直在python2环境下,使用scrapy框架开发爬虫,近期想要把工作环境迁移到python3上来,顺便想要学习一下新的异步协程方面的库,不知爬虫方面有哪...

赤松
2018/10/27
21
0
利用aiohttp制作异步爬虫

简介 asyncio可以实现单线程并发IO操作,是Python中常用的异步处理模块。关于asyncio模块的介绍,笔者会在后续的文章中加以介绍,本文将会讲述一个基于asyncio实现的HTTP框架——aiohttp,它...

技术小能手
2018/11/29
0
0
Jianhui Zhao/rtty

rtty 通过Web浏览器访问你的终端。项目名称里面的“r”是指“反向代理”或者“远程”。它由客户端和服务端组成。 你可以根据MAC地址通过Web浏览器访问你的任意一台终端。 服务端依赖 python...

Jianhui Zhao
2018/01/12
0
0
Python aiohttp爬取必应背景图

Hello,大家好。上周末有事出去了,也没更新文章,所以这回就补一下文章,算是两周合为一篇吧【其实是有点懒】,国庆快乐哦! 页面分析 Bing的首页分析还算简单的,直接利用Chrome定位元素,...

EmptyChan
2017/10/01
0
0
分布式代理爬虫:架构篇

开了专栏这么久,这是专栏第一篇文章,也是一篇广告贴,写给有缘人看。 历时大致两个月,到现在终于完成了分布式代理抓取爬虫,目前开源在了Github上。写这个项目的原因主要有两点,一是自己...

resolvewang
2018/02/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Angular 英雄编辑器

应用程序现在有了基本的标题。 接下来你要创建一个新的组件来显示英雄信息并且把这个组件放到应用程序的外壳里去。 创建英雄组件 使用 Angular CLI 创建一个名为 heroes 的新组件。 ng gener...

honeymoose
31分钟前
3
0
Kernel DMA

为什么会有DMA(直接内存访问)?我们知道通常情况下,内存数据跟外设之间的通信是通过cpu来传递的。cpu运行io指令将数据从内存拷贝到外设的io端口,或者从外设的io端口拷贝到内存。由于外设...

yepanl
今天
6
0
hive

一、hive的定义: Hive是一个SQL解析引擎,将SQL语句转译成MR Job,然后再在Hadoop平台上运行,达到快速开发的目的 Hive中的表是纯逻辑表,就只是表的定义,即表的元数据。本质就是Hadoop的目...

霉男纸
今天
3
0
二、Spring Cloud—Eureka(Greenwich.SR1)

注:本系列文章所用工具及版本如下:开发工具(IDEA 2018.3.5),Spring Boot(2.1.3.RELEASE),Spring Cloud(Greenwich.SR1),Maven(3.6.0),JDK(1.8) Eureka: Eureka是Netflix开发...

倪伟伟
昨天
11
0
eclipse常用插件

amaterasUML https://takezoe.github.io/amateras-update-site/ https://github.com/takezoe/amateras-modeler modelGoon https://www.cnblogs.com/aademeng/articles/6890266.html......

大头鬼_yc
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部