Batch Downloading Subtitles
Posted by quminzi, 6 months ago

Requirement: download a large number of English subtitles.

Site: subsmax.com

Approach: the key is using HTTP proxies to get around the site's download limit (24 subtitles per hour). On top of that, multiple threads are used for extra speed (this is an IO-bound task), and a requests Session reuses HTTP connections to cut down on latency.
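
Before the full script, here is a minimal sketch of the core idea (the proxy address and subtitle URL below are placeholders, not real values): a requests Session keeps the underlying HTTP connection alive between requests, and its proxies mapping can be swapped whenever the current proxy hits the hourly limit.

import requests

session = requests.Session()  # a Session reuses TCP connections across requests
session.proxies.update({'http': 'http://1.2.3.4:8080'})  # placeholder proxy; replace it when it gets blocked
resp = session.get('http://subsmax.com/some-subtitle', timeout=5)  # placeholder URL

The full script: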

 

import io
import logging
import os
import threading
import time
import zipfile
from queue import Queue

import requests
from freeproxy import (fetch_proxies, from_cn_proxy, init_db, read_proxies,
                       test_proxies, enable_logging)

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
if not log.handlers:
    stream = logging.StreamHandler()
    stream.setFormatter(logging.Formatter(
        fmt='[%(levelname)-5s] %(asctime)s %(threadName)s: %(message)s'))
    log.addHandler(stream)


# enable_logging()


class MyProxy(object):
    TEST_URL = 'http://subsmax.com/'

    def __init__(self, test_url=None, load_from_db=True):
        if test_url:
            self.test_url = test_url
        else:
            self.test_url = self.TEST_URL
        init_db()
        self.proxies = set()
        if load_from_db:
            self.proxies.update(read_proxies())  # init from db
        log.debug('loaded %d proxies' % len(self.proxies))

    def fetch(self):
        # return fetch_proxies()
        return from_cn_proxy()

    def test(self, proxies):
        return test_proxies(proxies, timeout=5, single_url=self.test_url)

    def update(self):
        log.debug('updating proxies ...')
        # ret = subprocess.call(['freeproxy', '-l', '-t', self.test_url])
        # self.proxies.extend(read_proxies())
        self.proxies.update(self.test(self.fetch()))
        log.debug('updated %d proxies' % len(self.proxies))
        return self.proxies


def get_proxy():
    # g_lock.acquire()
    # while len(g_proxy.proxies) < TOTAL_THREAD:
    #     g_proxy.update()
    # g_lock.release()
    while not g_proxy.proxies:
        time.sleep(1)
    return g_proxy.proxies.pop()


def change_proxy(session):
    session.proxies.update({'http': get_proxy()})
    log.debug('session changed proxy to %s' % session.proxies)


def get_session():
    s = requests.Session()
    change_proxy(s)
    return s


def load_urls(filename):
    with open(filename) as f:
        for line in f:
            yield line.rstrip()


def is_zipfile(data):
    return zipfile.is_zipfile(io.BytesIO(data))
    # return data[:4] == b'\x50\x4b\x03\x04'  # fast dirty check


def save_zipfile(url, data):
    sub_id = url.rsplit('/', 1)[-1]
    filename = os.path.join(OUT_DIR, sub_id)
    with open(filename, 'wb') as f:
        f.write(data)
    log.info('saved data to %s' % filename)


def download_subtitle(session, url):
    failed = False
    try:
        r = session.get(url, timeout=5)
    except requests.exceptions.RequestException as e:
        log.error('request error: <%s> %s' % (e.__class__.__name__, e))
        failed = True
    else:
        if r.status_code != 200:
            log.warning('response status code: %s' % r.status_code)
            failed = True
        elif not is_zipfile(r.content):
            log.warning('response data is not a zip archive')
            failed = True
    if failed:
        change_proxy(session)
    else:
        save_zipfile(url, r.content)
    return not failed


# def message(s):
#     print('{}: {}'.format(threading.current_thread().name, s))


def download_worker(url_queue):
    session = get_session()
    q = url_queue
    while True:
        url = q.get()
        # re-read the proxy each round, since download_subtitle() may have switched it
        proxy = session.proxies.get('http')
        log.debug('proxy={}, url={}'.format(proxy, url))
        if not download_subtitle(session, url):
            q.put(url)  # re-queue the url so it is retried with a new proxy
        q.task_done()


def update_proxy_pool():
    log.debug('start')
    check_point = int(TOTAL_THREAD * 1.2)
    while True:
        with g_lock:  # hold the lock while refreshing the shared proxy pool
            if len(g_proxy.proxies) <= check_point:
                g_proxy.update()
        time.sleep(60)


def start_proxy_pool():
    proxy_worker = threading.Thread(
        target=update_proxy_pool,
        name='proxy-worker',
    )
    proxy_worker.daemon = True
    proxy_worker.start()


def main():
    os.makedirs(OUT_DIR, exist_ok=True)  # make sure the output directory exists before workers write to it
    jobs = []
    for i in range(TOTAL_THREAD):
        t = threading.Thread(
            group=None,
            target=download_worker,
            name='worker-%d' % i,
            args=(g_url_queue,),
        )
        t.daemon = True
        t.start()
        jobs.append(t)

    log.debug('putting urls in queue ...')
    total_url = 0
    for url in load_urls(LINK_FILENAME):
        g_url_queue.put(url)
        total_url += 1
    log.info('enqueued %d urls' % total_url)

    start_proxy_pool()

    log.debug('waiting url queue to join')
    g_url_queue.join()
    log.debug('all task done')


# def main():
#     urls = load_urls('./en.links')
#     session = get_session()
#     succs = fails = 0
#     for url in urls:
#         if download_subtitle(session, url):
#             succs += 1
#         else:
#             fails += 1
#     log.debug('succs={}, fails={}'.format(succs, fails))


LINK_FILENAME = './todo.links'  # one subtitle URL per line
OUT_DIR = './episodes'          # downloaded zip archives are saved here
TOTAL_THREAD = 4                # number of download worker threads
g_proxy = MyProxy(load_from_db=True)
g_url_queue = Queue()
g_lock = threading.Lock()


if __name__ == '__main__':
    main()
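
The downloaded files under OUT_DIR are still raw zip archives. To get the actual subtitle files out of them, something like the following sketch works with just the standard library (the ./extracted destination is my own example, not part of the script above):

import os
import zipfile

OUT_DIR = './episodes'

for name in os.listdir(OUT_DIR):
    path = os.path.join(OUT_DIR, name)
    if zipfile.is_zipfile(path):  # skip anything that is not a valid zip archive
        with zipfile.ZipFile(path) as zf:
            zf.extractall('./extracted')  # example destination directory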

 

Tags: Subtitle Scrapy Proxy