python 多进程爬虫

原创
2017/03/27 15:59
阅读数 670

1.在用python抓取图片的时候,很慢,因为是国外的网站,所以考虑使用分布式多进程的方式来提高抓取效率。

2.废话不多说,直接上demo,这个是核心的抓取图片类,wpd.py

# -*- coding:utf-8 -*-
import urllib
import urllib2
import re
import thread
import time

import logging


class Wpd:
    # 初始化方法,定义一些变量
    def __init__(self):
        # 定义一个Handler打印INFO及以上级别的日志到sys.stderr
        console = logging.StreamHandler()
        # 设置日志打印格式
        formatter = logging.Formatter('%(asctime)s-[%(name)s]-[%(funcName)s] - %(levelname)s - %(message)s')
        console.setFormatter(formatter)
        # 将定义好的console日志handler添加到root logg
        self.wpdlogger = logging.getLogger('wpd application')
        self.wpdlogger.setLevel(logging.INFO)
        self.wpdlogger.addHandler(console)

        self.pageIndex = 2
        self.hasReadPage = 0
        self.user_agent = 'Mozilla/4.0 (compatible; MSIE 5.5; Windows NT)'
        # 初始化headers
        self.headers = {'User-Agent': self.user_agent}
        # 存放段子的变量,每一个元素是每一页的段子们
        self.stories = []
        # 存放程序是否继续运行的变量
        self.enable = False

    # 传入某一页的索引获得页面代码
    def getPage(self, pageIndex):
        try:
            url = 'http://www.2wpd.com/advanced-search?q=&category=2&resolution=&license=&page=' + str(pageIndex)
            # 构建请求的request
            request = urllib2.Request(url, headers=self.headers)
            # 利用urlopen获取页面代码
            response = urllib2.urlopen(request)
            # 将页面转化为UTF-8编码
            pageCode = response.read().decode('utf-8')
            return pageCode

        except urllib2.URLError, e:
            if hasattr(e, "reason"):
                self.wpdlogger.error(u"连接原服务器失败,错误原因"+e.reason)
                self.wpdlogger.error(url)
                return None

    def getdetailPage(self, prefix):
        try:
            url = 'http://www.2wpd.com/landscape/'+prefix+'.htm'
            # 构建请求的request
            request = urllib2.Request(url, headers=self.headers)
            # 利用urlopen获取页面代码
            response = urllib2.urlopen(request)
            # 将页面转化为UTF-8编码
            pageCode = response.read().decode('utf-8')
            return pageCode

        except urllib2.URLError, e:
            if hasattr(e, "reason"):
                self.wpdlogger.error(u"连接原服务器失败,错误原因"+e.reason)
                self.wpdlogger.error(url)
                return None

    # 获取图片并存入
    def getImg(self, html):
        flag =False
        prefix = html.lower();
        pageinfo = self.getdetailPage(prefix)

        if not pageinfo:
            return None
        # pattern = re.compile('<div.*?author">.*?<a.*?<img.*?>(.*?)</a>.*?<div.*?'+
        #                  'content">(.*?)<!--(.*?)-->.*?</div>(.*?)<div class="stats.*?class="number">(.*?)</i>',re.S)
        pattern = re.compile('<a href="/preview/(.*?).htm".*?</a>',
                             re.S)
        items = re.findall(pattern, pageinfo)
        for item in items:
            #仅爬取1280*800图片
            if re.search('1280x800',item):
                if len(items)>0:
                    imagesrc = 'http://www.2wpd.com/images/%s.jpg' % item
                    flag = self.download(imagesrc,prefix)
                    break
        return flag
    def download(self,imagesrc,prefix):
        return urllib.urlretrieve(imagesrc, 'D:\wpd\%s.jpg' % prefix)

    # 传入某一页代码
    def getPageItems(self,pageindex):
        pi =self.hasReadPage+1
        for pagenumber in range(pi,pageindex):
            self.hasReadPage=pi
            pageCode = self.getPage(pagenumber)
            pageStories = []
            if not pageCode:
                self.wpdlogger.error(u"页面加载失败,错误原因")
                return None
            pattern = re.compile('<img itemprop=".*?src="/uploads/wallpapers/.*?/.*?/.*?/(.*?)/.*?alt="(.*?)" .*?>',re.S)
            items = re.findall(pattern, pageCode)

            # 用来存储每页图片alt
            pageAlt = []
            # 遍历正则表达式匹配的信息
            for item in items:
                # 是否含有图片
                # haveImg = re.search("img",item[3])
                # #如果不含有图片,把它加入list中
                text = re.sub("\s+", "-", item[1])
                text = re.sub("-Wallpaper", "", text)
                text = text + '-' + item[0]
                # self.getImg(text)
                pageStories.append(text);
                # replaceBR = re.compile('<br/>')

        return pageStories

    # 开始方法
    def start(self):
        return self.getPageItems()

2.抓取的图片服务端,taskmanager.py,这里创建了两个队列,一个发送下载任务的队列、另一个接受下载结果的队列。

# -*- coding:utf-8 -*-

#################分布式爬虫demo,manager###########################
from spider import wpd
import threading
import random, time, Queue,logging
import dill
from multiprocessing.managers import BaseManager

# 定义一个Handler打印INFO及以上级别的日志到sys.stderr
console = logging.StreamHandler()
# 设置日志打印格式
formatter = logging.Formatter('%(asctime)s -[%(threadName)s] - %(levelname)s - %(message)s')
console.setFormatter(formatter)
# 将定义好的console日志handler添加到root logg
logger = logging.getLogger('manager')
logger.setLevel(logging.INFO)
logger.addHandler(console)

# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):
    pass
def make_server_manager():

    # 发送任务的队列:
    task_queue = Queue.Queue()
    # 接收结果的队列:
    result_queue = Queue.Queue()

    # 把两个Queue都注册到网络上, callable参数关联了Queue对象:
    QueueManager.register('get_task_queue', callable=lambda: task_queue)
    QueueManager.register('get_result_queue', callable=lambda: result_queue)
    # 绑定端口5000, 设置验证码'abc':
    manager = QueueManager(address=('127.0.0.1', 5000), authkey='abc')
    # 启动Queue:
    manager.start()
    t1 = threading.Thread(target=startTask, name='TaskQueue',args=(manager,))
    t2 = threading.Thread(target=startresultQueue, name='ResultQueue',args=(manager,))
    t1.start()
    t2.start()
    # t1.join()
    # t2.join()

def startTask(manager):

    # 初始化爬虫
    spider1 = wpd.Wpd()
    # 获得通过网络访问的Queue对象:
    task = manager.get_task_queue()
    # 爬取页数:
    page =10
    n=1
    while page>=n:

        logger.info(u'读取第 %d页....' % n)
        n += 1
        imgs = spider1.getPageItems(n)
        for v in imgs:
            # n = random.randint(0, 10000)
            logger.info(u'下载任务放入队列 %s...' % v)
            task.put(v)
# 从result队列读取结果:
def startresultQueue(manager):
    result = manager.get_result_queue()
    logger.info(u'尝试获取下次结果...')
    while True:
        try:
            r = result.get(timeout=10)
            logger.info(u'结果: %s' % r)
        except Queue.Empty:
            logger.warning('task queue is empty.')
    # 关闭:
    manager.shutdown()

if __name__=='__main__':
    make_server_manager()

3.启动多个worker下载图片,并把下载结果返回给服务端

# -*- coding:utf-8 -*-
#################分布式爬虫demo,worker###########################3
import time, sys, Queue
from multiprocessing.managers import BaseManager
from multiprocessing import Process
import logging

from spider import wpd

# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass

def start_client():
    # 定义一个Handler打印INFO及以上级别的日志到sys.stderr
    console = logging.StreamHandler()
    # 设置日志打印格式
    formatter = logging.Formatter('%(asctime)s -[%(name)s] - %(levelname)s - %(message)s')
    console.setFormatter(formatter)
    # 将定义好的console日志handler添加到root logg
    logger = logging.getLogger('client')
    logger.setLevel(logging.INFO)
    logger.addHandler(console)

    # 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')

    # 连接到服务器,也就是运行taskmanager.py的机器:
    server_addr = '127.0.0.1'
    logger.info('Connect to server %s...' % server_addr)
    # 端口和验证码注意保持与taskmanager.py设置的完全一致:
    m = QueueManager(address=(server_addr, 5000), authkey='abc')
    # 从网络连接:
    m.connect()
    # 获取Queue的对象:
    task = m.get_task_queue()
    result = m.get_result_queue()

    spider1 = wpd.Wpd()
    # 从task队列取任务,并把结果写入result队列:
    while True:
        try:
            n = task.get()
            logger.info(u'开始下载图片 %s...' % n)
            flag = spider1.getImg(n)
            #result.put('cuck')
            r = u'图片 '+ n +u' 下载 失败'
            if flag:
                r = u'图片 ' + n + u' 下载 成功'
                logger.info(u'下载图片完成 %s.............finish' % n)
            result.put(r)
        except Queue.Empty:
            logger.info('task queue is empty.')

if __name__ == '__main__':
    #启动10个进程做这个事情
    for n in range(10):
        p1 = Process(target=start_client)
        p1.start()

4,分别启动worker和mananger,获取结果

客户端运行截图

服务器端运行截图

展开阅读全文
打赏
1
3 收藏
分享
加载中
很专业,可不可以和我这样的小白多多分享一下思路啊,经验啊什么的。。。
2017/07/20 17:55
回复
举报
更多评论
打赏
1 评论
3 收藏
1
分享
返回顶部
顶部