文档章节

(三) 分布式基础爬虫

acutesun
 acutesun
发布于 2017/07/22 21:50
字数 606
阅读 11
收藏 0
点赞 0
评论 0

在基于前面的基础爬虫,实现分布式的爬虫

节点管理,需要四个队列进行通信

  • url_queue:  URL管理进程将URL传递给爬虫节点的通道
  • result_queue: 用于数据处理进程获取数据的通道
  • conn_queue: 数据处理进程将新的URL传递给 URL管理进程的通道
  • store_queue: 数据处理进程将数据传递给数据存储进程的通道

多个进程协调进行工作

  • url_manager_proc进程: 两个作用:一是将url放入 url_queue队列,让工作节点获取url。二是将数据处理进程提取的urls集合存放到 URLS集合中去
  • result_solve_proc 进程:1. 获取数据后(result_queue.get() )将数据存放到 store_queue给数据存储进程使用。2. 将urls存放到conn_queue给url_manager_proc进程使用
  • store_proc 进程:将数据存储到文件或者数据库中

NodeManager.oy:

from multiprocessing import Queue, Process
from multiprocessing.managers import BaseManager
import time
from DataOutput import DataOutput
from URLManager import UrlManager
class NodeManager(object):
    def start_manager(self, url_que, result_que):
        '''
        创建分布式管理器
        :param url_que: url队列
        :param result_que: 结果队列
        :return: 返回管理器对象
        '''
        # 将队列注册到网络上
        BaseManager.register('get_task_queue', callable=lambda: url_que)
        BaseManager.register('get_result_queue', callable=lambda: result_que)

        manager = BaseManager(address=('', 8000), authkey='123'.encode('utf-8'))
        return manager

    def url_manager_proc(self, url_que, conn_que, root_url):
        url_manager = UrlManager()
        url_manager.add_new_url(root_url)
        while True:
            while url_manager.has_new_url():
                new_url = url_manager.get_new_url()
                # 将新的url发给工作节点
                url_que.put(new_url)
                print('old_url=', url_manager.old_urls_size())
                if url_manager.old_urls_size()>2000:
                    url_que.put('end')
                    print('控制节点发出结束通知')
                    # 关闭管理节点,同时存储set状态
                    url_manager.save_progress('new_urls.txt', url_manager.new_urls)
                    url_manager.save_progress('old_urls.txt', url_manager.old_urls)
                    return
            # 将从result_solve_proc  获取的urls添加到URL管理器
            try:
                if not conn_que.empty():
                    urls = conn_que.get()
                    for url in urls:
                        url_manager.add_new_url(url)

            except BaseException:
                time.sleep(0.1)

    def result_solve_proc(self, result_que, conn_que, store_que):
        while True:
            try:
                if not result_que.empty():
                    content = result_que.get(True)
                    if content['new_urls'] == 'end':
                        # 结果分析进程接收通知然后结束
                        print('结果分析进程接收通知然后结束')
                        store_que.put('end')
                        return
                    conn_que.put(content['new_urls'])  # url为set类型
                    store_que.put(content['data'])     # 解析的数据为dict类型
                else:
                    time.sleep(0.1)
            except BaseException as e:
                time.sleep(0.1)

    def store_proc(self, store_que):
        out = DataOutput()
        while True:
            if not store_que.empty():
                data = store_que.get()
                if data == 'end':
                    print('存储进程接收通知结束')
                    out.ouput_end(out.filepath)
                    return
                out.store_data(data)
            else:
                time.sleep(0.1)

if __name__ == '__main__':
    url_que = Queue()
    result_que = Queue()
    store_que = Queue()
    conn_que = Queue()

    node = NodeManager()
    manager = node.start_manager(url_que, result_que)
    # 创建URL管理进程, 数据提取进程, 数据存储进程
    url_manager_proc = Process(target=node.url_manager_proc,
                               args=(url_que, conn_que, 'http://baike.baidu.com/view/284853.htm'))
    result_solve_proc = Process(target=node.result_solve_proc, args=(result_que, conn_que, store_que))
    store_proc = Process(target=node.store_proc, args=(store_que, ))

    # 启动进程和分布式管理器
    url_manager_proc.start()
    result_solve_proc.start()
    store_proc.start()
    manager.get_server().serve_forever()

© 著作权归作者所有

共有 人打赏支持
acutesun
粉丝 0
博文 71
码字总数 83152
作品 0
程序员
Python3爬虫视频学习教程

大家好哈,现在呢静觅博客已经两年多啦,可能大家过来更多看到的是爬虫方面的博文,首先非常感谢大家的支持,希望我的博文对大家有帮助! 最近,主要的任务就是开发性感美女图片大全,使用p...

yangjiyue0520
2017/11/18
0
0
一个月入门Python爬虫,快速获取大规模数据

数据是创造和决策的原材料,高质量的数据都价值不菲。而利用爬虫,我们可以获取大量的价值数据,经分析可以发挥巨大的价值,比如: 豆瓣、知乎:爬取优质答案,筛选出各话题下热门内容,探索...

Python开发者
04/25
0
0
python-02:学习路线

随时更新的学习路线 1. python基础知识 麦子学院的几个短视频 python 爬虫基础 2. 以python爬虫作为入手点深入学习 1. 爬虫基础知识,最简单的爬虫程序,理解最简单的爬虫程序 2. 丑事百科实...

达岭凹老大
2015/11/23
270
0
分布式网络爬虫实例——获取静态数据和动态数据

前言 刚刚介绍完基于PyHusky的分布式爬虫原理及实现,让我们具备了设计分布式网络爬虫方便地调动计算资源来实现高效率的数据获取能力。可以说,有了前面的基础,已经能够解决互联网上的绝大部...

happengft
2017/04/11
0
0
python3 scrapy_redis 分布式爬取房天下存mongodb

(一)scrapy_redis 简单介绍 scrapy_redis基于scrapy框架的基础上集成了redis,通过了redis实现了去重,多台服务器进行分布式的爬取数据。 (二)scrapy_redis 简单配置 (1)settings.py 文...

徐代龙
05/02
0
0
爬虫面试遇到外行面试官

爬虫岗位很少,我总共也就面过五六家,其中某金融互联网公司技术最好,虽然他们的爬虫人员也是后来转的 问题一:如果连接断了怎么办? 什么连接呢,猜是tcp 连接吧,tcp连接不是时时联通的,...

HZCoder
2016/01/18
1K
0
python全栈和python自动化课程的区别在哪?

老男孩算是国内组早的做python培训的机构了,下面小编对于python自动化课程及全栈课程做了一个总结,希望能帮到你们: python全栈开发: 适合人群:应届本科生,专科,及零基础学员 学习基础...

运维自动化
2017/04/25
0
0
基于PyHusky的分布式爬虫原理及实现

原理 爬虫是我们获取互联网数据的一个非常有效的方法,而分布式爬虫则是利用许多台机器协调工作来加快抓取数据效率的不二途径。分布式爬虫是由访问某些原始网址开始,在获取这些网址的内容后...

happengft
2017/04/06
0
0
scrapy-redis分布式爬虫的搭建过程

scrapy-redis分布式爬虫的搭建过程 Scrapy 是一个通用的爬虫框架,但是不支持分布式,Scrapy-redis是为了更方便地实现Scrapy分布式爬取,而提供了一些以redis为基础的组件(仅有组件)。 pip ...

zwq912318834
2017/12/20
0
0
2个月精通Python爬虫——3大爬虫框架+6场实战+分布式爬虫,包教包会

阿里云大学在线工作坊上线,原理精讲+实操演练,让你真正掌握云计算、大数据技能。 在第一批上线的课程中,有一个Python爬虫的课程,畅销书《精通Python网络爬虫》作者韦玮,带你两个月从入门...

云木西
06/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

扫码二维码跳转到某个网站

添加maven依赖 <dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.0.0</version></dependency><dependency><groupId>com.goog......

gaomq
7分钟前
0
0
Windows平台下搭建Git服务器的图文教程

Git没有客户端服务器端的概念,但是要共享Git仓库,就需要用到SSH协议(FTP , HTTPS , SFTP等协议也能实现Git共享,此文档不讨论),但是SSH有客户端服务器端,所以在windows下的开发要把自己...

MKChan
13分钟前
0
0
告警系统主脚本&告警系统配置文件&告警系统监控项目

20.20 告警系统主脚本 准备工作 定义监控系统的各个目录,然后再去定义主脚本,因为是分布式的,所以需要每一台机器都需要定义,事先创建好各个脚本和各个目录,随后脚本直接拷贝过去即可,然...

影夜Linux
14分钟前
0
0
谈谈神秘的ES6——(一)初识ECMAScript

谈谈神秘的ES6——(一)初识ECMAScript 在《零基础入门JavaScript》我们就说过,ECMAScript是JavaScript的核心,是JavaScript语法和语义的解释器,同时也是一个标准。而ECMAScript标准其实也...

JandenMa
今天
1
0
第16章 Tomcat配置

16.1 Tomcat介绍 ####Tomcat介绍 LNMP架构针对的开发语言是PHP语言,php 是一门开发web程序非常流行的语言,早些年流行的是asp,在Windows平台上运行的一种编程语言,但安全性差,就网站开发...

Linux学习笔记
今天
0
0
流利阅读笔记29-20180718待学习

高等教育未来成谜,前景到底在哪里? Ray 2018-07-18 1.今日导读 在这个信息爆炸的年代,获取知识是一件越来越容易的事情。人们曾经认为,如此的时代进步会给高等教育带来众多便利。但事实的...

aibinxiao
今天
12
0
OSChina 周三乱弹 —— 你被我从 osc 老婆们名单中踢出了

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @小鱼丁:分享五月天的单曲《后来的我们 (电影《后来的我们》片名曲)》: 《后来的我们 (电影《后来的我们》片名曲)》- 五月天 手机党少年们想...

小小编辑
今天
604
19
Spring Boot Admin 2.0开箱体验

概述 在我之前的 《Spring Boot应用监控实战》 一文中,讲述了如何利用 Spring Boot Admin 1.5.X 版本来可视化地监控 Spring Boot 应用。说时迟,那时快,现在 Spring Boot Admin 都更新到 ...

CodeSheep
今天
5
0
Python + Selenium + Chrome 使用代理 auth 的用户名密码授权

米扑代理,全球领导的代理品牌,专注代理行业近十年,提供开放、私密、独享代理,并可免费试用 米扑代理官网:https://proxy.mimvp.com 本文示例,是结合米扑代理的私密、独享、开放代理,专...

sunboy2050
今天
0
0
实现异步有哪些方法

有哪些方法可以实现异步呢? 方式一:java 线程池 示例: @Test public final void test_ThreadPool() throws InterruptedException { ScheduledThreadPoolExecutor scheduledThre......

黄威
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部