爬虫代理小记与aiohttp代理尝试

总结了一些爬虫代理的资料和知识,并尝试使用asyncio和aiohttp使用代理ip访问目标网站,按代理IP的访问效果实时更新代理IP得分,初始获取3000左右代理IP,在稳定后,对摩拜单车信息的访问可以达到40次/秒-100次/秒。

代理IP方案简述

  • 快速抓取网站时,要应对IP每分钟访问次数有限制甚至会封IP的服务器,使用代理IP可以帮助我们。
  • Kaito爬虫代理服务讲得很清楚,推荐。
  • Github上现成的开源代理爬虫也不少,如:
    qiyeboy/IPProxyPool
    jhao104/proxy_pool
    awolfly9/IPProxyTool
    fancoo/Proxy
    derekhe/mobike-crawler/modules/ProxyProvider.py
  • 思路也都很清楚,从更多的代理网站上爬取免费代理,存入自己的数据库,定时更新。在拿到代理IP后,验证该代理的有效性。并且提供简单的API来获取代理IP。
  • 七夜的博客python开源IP代理池--IPProxys详细地阐释了自己的代码。
  • 大部分的代理网站的爬取还是比较简单的,在上述开源的代码中包含了不少代理网站的爬取与解析。困难点的有js反爬机制,也都被用selenium操作无头webkit或者js代码解析以及python的js代码执行库所解决。此外有趣的是,在上面的开源代码中出现了用爬取得到的代理来访问代理网站的情况。
  • 定时刷新代理,有自定义的代理定时刷新模块,也可用celery定时任务。
  • 验证有效性的方式有:
  • API的提供可以用BaseHTTPServer拓展下,也可用简便的flask或者Django加上插件提供restful api服务。
  • 对于免费的代理IP来说,最重要的一是量大,就算有很大比例无效的,还是能拿到一些高质量的代理。一个网站的未筛选代理能有几千个,多个网站就很可观了,当然要考虑到重复。
  • 再就是代理IP的筛选机制,不少开源库都添加了评分机制,这是非常重要的。例如利用对累计超时次数以及成功率的加权来评判代理IP的质量。在每次使用后都对代理IP的情况进行评价,以此来刷新数据库,方便下一次选取优质的代理IP。
  • 如何对代理IP进行评价,在成功和失败的各种情况中如何奖惩,筛选出最优质的代理IP是非常重要的。
  • 此外,每个代理IP的使用也要考虑是否要设置一定的使用间隔,避免过于频繁导致失效。

尝试

  • 自然,首先要做的就是从免费代理网站上获取大量代理IP,我选择了最方便的66ip,接口很简单,一次性访问可以拿到3000左右的代理,当然,频繁访问会导致js反爬机制,这时再简单地使用selenium+phantomJs即可。
    url = ("http://m.66ip.cn/mo.php?tqsl={proxy_number}")
    url = url.format(proxy_number=10000)
    html = requests.get(url, headers=headers).content
    html = html.decode(chardet.detect(html)['encoding'])
    pattern = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}'
    all_ip = re.findall(pattern, html)
  • 然后就是设置代理IP的奖惩机制,我参考了摩拜单车爬虫源码及解析使用类,看起来简单清晰,每个代理IP对象拥有自己的ip与分数,@property令它们可以被点操作符访问。初始分数为100分,这里分数都弄成整数。如果代理成功,按照延时的大小来给予奖励,即调用代理的相应方法,最高10分,而超时扣10分,连接错误扣30分,其它错误扣50分(可以酌情修改)。为了便于代理IP之间的比较,修改了__lt__方法。
class Proxy:
    def __init__(self, ip):
        self._url = 'http://' + ip
        self._score = 100

    @property
    def url(self):
        return self._url

    @property
    def score(self):
        return self._score

    def __lt__(self, other):
        '''
        由于优先队列是返回最小的,而这里分数高的代理优秀
        所以比较时反过来
        '''
        return self._score > other._score

    def success(self, time):
        self._score += int(10 / int(time + 1))

    def timeoutError(self):
        self._score -= 10

    def connectError(self):
        self._score -= 30

    def otherError(self):
        self._score -= 50
  • 感觉上,最好的验证方式是直接访问目标网站。除去根本不能用的一部分,代理IP的有效性对不同的目标网站是有区别的,在我的尝试中,豆瓣相比摩拜对代理IP的应对明显更好,这里代码为访问摩拜。在第一轮的筛选中,对每个代理IP,访问两次目标网站,超时时间为10秒,依情况奖惩,保留分数大于50的。总共花费22秒左右时间,排除了一小部分根本不能使用的代理,也对所有代理的分数初步更新。
async def douban(proxy, session):
# 使用代理访问目标网站,并按情况奖惩代理
    try:
        start = time.time()
        async with session.post(mobike_url,
                                data=data,
                                proxy=proxy.url,
                                headers=headers,  # 可以引用到外部的headers
                                timeout=10) as resp:
            end = time.time()
            # print(resp.status)
            if resp.status == 200:
                proxy.success(end - start)
                print('%6.3d' % proxy._score, 'Used time-->', end - start, 's')
            else:
                proxy.otherError()
                print('*****', resp.status, '*****')
    except TimeoutError as te:
        print('%6.3d' % proxy._score, 'timeoutError')
        proxy.timeoutError()
    except ClientConnectionError as ce:
        print('%6.3d' % proxy._score, 'connectError')
        proxy.connectError()
    except Exception as e:
        print('%6.3d' % proxy._score, 'otherError->', e)
        proxy.otherError()
# ClientHttpProxyError

# TCPConnector维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求,500和100相差不大
# ClientSession调用TCPConnector构造连接,Session可以共用
# Semaphore限制同时请求构造连接的数量,Semphore充足时,总时间与timeout差不多


async def initDouban():

    conn = aiohttp.TCPConnector(verify_ssl=False,
                                limit=100,  # 连接池在windows下不能太大
                                use_dns_cache=True)
    tasks = []
    async with aiohttp.ClientSession(loop=loop, connector=conn) as session:
        for p in proxies:
            task = asyncio.ensure_future(douban(p, session))
            tasks.append(task)

        responses = asyncio.gather(*tasks)
        await responses
    conn.close()


def firstFilter():
    for i in range(2):
        s = time.time()
        future = asyncio.ensure_future(initDouban())
        loop.run_until_complete(future)
        e = time.time()
        print('----- init time %s-----\n' % i, e - s, 's')

    num = 0
    pq = PriorityQueue()
    for proxy in proxies:
        if proxy._score > 50:
            pq.put_nowait(proxy)
            num += 1
    print('原始ip数:%s' % len(all_ip), '; 筛选后:%s' % num)
    return pq
  • 然后就是正式的访问了,这里我使用了基于堆的asyncio优先队列(非线程安全)。通过asyncio.Semaphore限制并发请求连接的数量,不断地从队列中拿取最优质的代理IP,在访问结束后再将它放回队列。结果是,多个连接不会同时使用一个代理IP,如果代理成功,它将会很快被放回队列,再次使用。(如果需要设置成功代理的使用间隔,可以改为在访问成功后,先释放连接与信号量,然后使用asyncio.sleep(x)等待一段时间再放入优先队列,如果在genDouban函数里实现,可设置为range(concurrency)一定程度上大于Semaphore(concurrency))奖惩一直进行,一开始会有一段筛选的过程,稳定后的输出如下:
pq = firstFilter()


async def genDouban(sem, session):
    # Getter function with semaphore.
    while True:
        async with sem:
            proxy = await pq.get()
            await douban(proxy, session)
            await pq.put(proxy)


async def dynamicRunDouban(concurrency):
    '''
    TCPConnector维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求
    ClientSession调用TCPConnector构造连接,Session可以共用
    Semaphore限制同时请求构造连接的数量,Semphore充足时,总时间与timeout差不多
    '''
    conn = aiohttp.TCPConnector(verify_ssl=False,
                                limit=concurrency,
                                use_dns_cache=True)
    tasks = []
    sem = asyncio.Semaphore(concurrency)

    async with aiohttp.ClientSession(loop=loop, connector=conn) as session:
        try:
            for i in range(concurrency):
                task = asyncio.ensure_future(genDouban(sem, session))
                tasks.append(task)

            responses = asyncio.gather(*tasks)
            await responses
        except KeyboardInterrupt:
            print('-----finishing-----\n')
            for task in tasks:
                task.cancel()
            if not conn.closed:
                conn.close()


future = asyncio.ensure_future(dynamicRunDouban(200))
loop.run_until_complete(future)
  • 最后,我们中断程序,查看下代理IP的得分情况:
scores = [p.score for p in proxies]
scores.sort(reverse=True)
print('Most popular IPs:\n ------------\n', scores[:50],
      [i for i in scores if i > 100])
loop.is_closed()

其它方案概览:

  • 在Scrapy官方文档避免被封的建议提到了Tor - 洋葱路由

    use a pool of rotating IPs. For example, the free Tor project or paid services like ProxyMesh. An open source alterantive is scrapoxy, a super proxy that you can attach your own proxies to.

  • 在知乎python 爬虫 ip池怎么做?的回答中,提到了Squid与修改x-forward-for标签的方法:
    1. 使用squid的cache_peer机制,把这些代理按照一定格式(具体格式参考文档)写入到配置文件中,配置好squid的端口,那么squid就可以帮你调度代理了,而且还可以摒弃失效的代理。
    2. 在访问的http request里添加x-forward-for标签client随机生成,宣称自己是一台透明代理服务器
  • 如何突破豆瓣爬虫限制频率?中提到:

    用带 bid (可以伪造)的 cookie 去访问 - github

其他资料

代码

from selenium import webdriver
import time
import aiohttp
from aiohttp.client_exceptions import ClientConnectionError
from aiohttp.client_exceptions import TimeoutError
import asyncio
from asyncio.queues import PriorityQueue
import chardet
import re
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)

headers = {'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) ')}
loop = asyncio.get_event_loop()


class Proxy:
    def __init__(self, ip):
        self._url = 'http://' + ip
        self._score = 100

    @property
    def url(self):
        return self._url

    @property
    def score(self):
        return self._score

    def __lt__(self, other):
        '''
        由于优先队列是返回最小的,而这里分数高的代理优秀
        所以比较时反过来
        '''
        return self._score > other._score

    def success(self, time):
        self._score += int(10 / int(time + 1))

    def timeoutError(self):
        self._score -= 10

    def connectError(self):
        self._score -= 30

    def otherError(self):
        self._score -= 50


def getProxies():
    url = ("http://m.66ip.cn/mo.php?tqsl={proxy_number}")
    url = url.format(proxy_number=10000)
    html = requests.get(url, headers=headers).content
    html = html.decode(chardet.detect(html)['encoding'])
    pattern = r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}:\d{1,5}'
    all_ip = re.findall(pattern, html)
    if len(all_ip) == 0:
        driver = webdriver.PhantomJS(
            executable_path=r'D:/phantomjs/bin/phantomjs.exe')
        driver.get(url)
        time.sleep(12)  # js等待5秒
        html = driver.page_source
        driver.quit()
        all_ip = re.findall(pattern, html)
    with open('66ip_' + str(time.time()), 'w', encoding='utf-8') as f:
        f.write(html)
    return all_ip


all_ip = set(getProxies()) | set(getProxies())
proxies = [Proxy(proxy) for proxy in all_ip]

mobike_url = "https://mwx.mobike.com/mobike-api/rent/nearbyBikesInfo.do"
data = {  # 请求参数: 纬度,经度!
    'latitude': '33.2',
    'longitude': '113.4',
}
headers = {
    'referer': "https://servicewechat.com/",
}


async def douban(proxy, session):

    try:
        start = time.time()
        async with session.post(mobike_url,
                                data=data,
                                proxy=proxy.url,
                                headers=headers,  # 可以引用到外部的headers
                                timeout=10) as resp:
            end = time.time()
            # print(resp.status)
            if resp.status == 200:
                proxy.success(end - start)
                print('%6.3d' % proxy._score, 'Used time-->', end - start, 's')
            else:
                proxy.otherError()
                print('*****', resp.status, '*****')
    except TimeoutError as te:
        print('%6.3d' % proxy._score, 'timeoutError')
        proxy.timeoutError()
    except ClientConnectionError as ce:
        print('%6.3d' % proxy._score, 'connectError')
        proxy.connectError()
    except Exception as e:
        print('%6.3d' % proxy._score, 'otherError->', e)
        proxy.otherError()
# ClientHttpProxyError

# TCPConnector维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求,500和100相差不大
# ClientSession调用TCPConnector构造连接,Session可以共用
# Semaphore限制同时请求构造连接的数量,Semphore充足时,总时间与timeout差不多


async def initDouban():

    conn = aiohttp.TCPConnector(verify_ssl=False,
                                limit=100,  # 连接池在windows下不能太大, <500
                                use_dns_cache=True)
    tasks = []
    async with aiohttp.ClientSession(loop=loop, connector=conn) as session:
        for p in proxies:
            task = asyncio.ensure_future(douban(p, session))
            tasks.append(task)

        responses = asyncio.gather(*tasks)
        await responses
    conn.close()


def firstFilter():
    for i in range(2):
        s = time.time()
        future = asyncio.ensure_future(initDouban())
        loop.run_until_complete(future)
        e = time.time()
        print('----- init time %s-----\n' % i, e - s, 's')

    num = 0
    pq = PriorityQueue()
    for proxy in proxies:
        if proxy._score > 50:
            pq.put_nowait(proxy)
            num += 1
    print('原始ip数:%s' % len(all_ip), '; 筛选后:%s' % num)
    return pq


pq = firstFilter()


async def genDouban(sem, session):
    # Getter function with semaphore.
    while True:
        async with sem:
            proxy = await pq.get()
            await douban(proxy, session)
            await pq.put(proxy)


async def dynamicRunDouban(concurrency):
    '''
    TCPConnector维持链接池,限制并行连接的总量,当池满了,有请求退出再加入新请求
    ClientSession调用TCPConnector构造连接,Session可以共用
    Semaphore限制同时请求构造连接的数量,Semphore充足时,总时间与timeout差不多
    '''
    conn = aiohttp.TCPConnector(verify_ssl=False,
                                limit=concurrency,
                                use_dns_cache=True)
    tasks = []
    sem = asyncio.Semaphore(concurrency)

    async with aiohttp.ClientSession(loop=loop, connector=conn) as session:
        try:
            for i in range(concurrency):
                task = asyncio.ensure_future(genDouban(sem, session))
                tasks.append(task)

            responses = asyncio.gather(*tasks)
            await responses
        except KeyboardInterrupt:
            print('-----finishing-----\n')
            for task in tasks:
                task.cancel()
            if not conn.closed:
                conn.close()


future = asyncio.ensure_future(dynamicRunDouban(200))
loop.run_until_complete(future)


scores = [p.score for p in proxies]
scores.sort(reverse=True)
print('Most popular IPs:\n ------------\n', scores[:50],
      [i for i in scores if i > 100])
loop.is_closed()
  • 访问百度
async def baidu(proxy):
    '''
    验证是否可以访问百度
    '''
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get("http://baidu.com",
                               proxy='http://' + proxy,
                               timeout=5) as resp:
            text = await resp.text()
            if 'baidu.com' not in text:
                print(proxy,
                      '\n----\nis bad for baidu.com\n')
                return False
            return True
  • 访问icanhazip
async def testProxy(proxy):
    '''
    http://aiohttp.readthedocs.io/en/stable/client_reference.html#aiohttp.ClientSession.request
    '''
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get("http://icanhazip.com",
                               proxy='http://' + proxy,
                               timeout=5) as resp:
            text = await resp.text()
            if len(text) > 20:
                return
            else:
                if await baidu(proxy):
                    firstFilteredProxies.append(proxy)
                    # print('原始:', proxy, '; 结果:', text)
  • 访问HttpBin
async def httpbin(proxy):
    '''
    访问httpbin获取headers详情, 注意访问https 代理仍为http
    参考资料: https://imququ.com/post/x-forwarded-for-header-in-http.html
    http://www.cnblogs.com/wenthink/p/HTTTP_Proxy_TCP_Http_Headers_Check.html
    '''
    async with aiohttp.ClientSession(loop=loop) as session:
        async with session.get("https://httpbin.org/get?show_env=1",
                               proxy='http://' + proxy,
                               timeout=4) as resp:
            json_ = await resp.json()
            origin_ip = json_['origin']
            proxy_ip = json_['headers']['X-Forwarded-For']
            via = json_['headers'].get('Via', None)
            print('原始IP:', origin_ip,
                  '; 代理IP:', proxy_ip,
                  '---Via:', via)
            if proxy_ip != my_ip and origin_ip == proxy_ip:
                annoy_proxies.append(proxy)
  • 访问豆瓣API
async def douban(proxy):

    async with aiohttp.ClientSession(loop=loop) as session:
        try:
            async with session.get(('https://api.douban.com/v2/movie/top250'
                                    '?count=10'),
                                   proxy='http://' + proxy,
                                   headers=headers,
                                   timeout=4) as resp:
                print(resp.status)
        except TimeoutError as te:
            print(proxy, te, 'timeoutError')
        except ClientProxyConnectionError as pce:
            print(proxy, pce, 'proxyError')
        except ClientConnectionError as ce:
            print(proxy, ce, 'connectError')
  • 循环访问豆瓣导致暂时被封IP
headers = {'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 (KHTML, like Gecko) ')}
while True:
    r = requests.get('http://douban.com', headers=headers)
    print(r.status_code)
    r = requests.get('https://movie.douban.com/j/search_subjects?'
                     'type=movie&tag=%E8%B1%86%E7%93%A3%E9%AB%9',
                     headers=headers)
    print(r.status_code)
最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容