如何构建分布式网络爬虫的系统和架构
正在寻找大规模构建分布式爬虫架构和解析器的指南?了解如何实现分布式爬虫,包括 Web 抓取、提取内容并以容错方式存储具有可扩展性的内容。我们将结合以前帖子中的所有知识来创建一个分布式爬虫系统。
首先,我们了解了抓取内容的专业技术,尽管我们今天只使用 CSS 选择器。然后是避免阻塞的技巧,我们将从中添加代理、标头和无头浏览器。最后,我们构建了一个并行爬虫,这篇博文就是从该代码开始的。
如果您不理解某些部分或片段,它可能在较早的帖子中。振作起来;冗长的片段来了。
先决条件
要使代码正常工作,您需要安装Redis和python3。有些系统已经预装了它。之后,通过运行安装所有必需的库pip install
。
pip install install requests beautifulsoup4 playwright "celery[redis]" npx playwright install
Celery 和 Redis 简介
Celery “是一个开源的异步任务队列”。我们在上一篇博文中创建了一个简单的并行版本。Celery 通过提供实际的分布式队列更进一步。我们将使用它在工作人员和服务器之间分配我们的负载。在现实世界中,我们将有多个节点来制作分布式网络爬虫。
Redis “是一种开源的内存数据结构存储,用作数据库、缓存和消息代理。” 我们将使用 Redis 作为数据库,而不是使用数组和集合来存储所有内容(在内存中)。此外,Celery 可以使用Redis 作为代理,因此我们不需要其他软件来运行它。
用python搭建一个分布式网络爬虫不是一件容易的事,但你要勇于尝试!我们走吧!
简单的任务
我们的第一步是在 Celery 中创建一个任务,打印参数接收到的值。将代码片段保存在名为的文件中tasks.py
并运行它。如果将它作为常规 python 文件运行,则只会打印一个字符串。如果您使用 运行它,控制台将打印两行不同的内容celery -A tasks worker
。
区别在于demo
函数调用。直接调用意味着“执行该任务”,而delay
意味着“将其排入队列以供工作人员处理”。查看 Celery 的 API 以获取有关调用任务的更多信息。
from celery import Celery app = Celery('tasks', broker_url='redis://127.0.0.1:6379/1') @app.task def demo(str): print(f'Str: {str}') demo('ZenRows') # Str: ZenRows demo.delay('ZenRows') # ?
命令celery
不会结束;我们需要通过退出控制台(即ctrl + C
)来杀死它。我们将多次使用它,因为 Celery 在代码更改后不会重新加载。
分布式任务爬取
下一步是将Celery 任务与爬网过程连接起来。这次我们将使用在上一篇文章中看到的辅助函数的一个稍微改变的版本。extract_links
将获取页面上除那些之外的所有链接nofollow
。我们稍后会添加过滤选项。
为简单起见,我们将在单个节点中执行所有操作。Celery 使我们可以轻松地在多个节点上运行分布式爬虫。留到最后才知道怎么做。
from celery import Celery import requests from bs4 import BeautifulSoup from urllib.parse import urljoin app = Celery('tasks', broker_url='redis://127.0.0.1:6379/1') @app.task def crawl(url): html = get_html(url) soup = BeautifulSoup(html, 'html.parser') links = extract_links(url, soup) print(links) def get_html(url): try: response = requests.get(url) return response.content except Exception as e: print(e) return '' def extract_links(url, soup): return list({ urljoin(url, a.get('href')) for a in soup.find_all('a') if a.get('href') and not(a.get('rel') and 'nofollow' in a.get('rel')) }) starting_url = 'https://scrapeme.live/shop/page/1/' crawl.delay(starting_url)
我们可以遍历检索到的链接并将它们排入队列,但这最终会重复抓取相同的页面。我们了解了执行任务的基础知识,现在我们将开始将代码拆分为文件并跟踪 Redis 上的页面。
用于跟踪 URL 的 Redis
我们已经说过依赖内存变量不是分布式系统中的一个选项。我们将需要保留所有这些数据:访问过的页面、当前正在抓取的页面、保留“访问”列表,并在以后存储一些内容。
尽管如此,我们将使用 Redis 来避免重新抓取和重复,而不是直接排队到 Celery。并且只对 URL 进行一次排队。
我们不会详细介绍 Redis,但我们将使用lists、sets和hashes。
获取最后一个片段并删除最后两行,即调用任务的行。main.py
使用以下内容创建一个新文件。
我们将创建一个名为的列表crawling:to_visit
并推送起始 URL。然后我们将进入一个循环,该循环将查询该列表中的项目并阻塞一分钟直到项目准备就绪。当检索到一个项目时,我们调用该crawl
函数,将其执行排队。
from redis import Redis from tasks import crawl connection = Redis(db=1) starting_url = 'https://scrapeme.live/shop/page/1/' connection.rpush('crawling:to_visit', starting_url) while True: # timeout after 1 minute item = connection.blpop('crawling:to_visit', 60) if item is None: print('Timeout! No more items to process') break url = item[1].decode('utf-8') print('Pop URL', url) crawl.delay(url)
它和以前几乎一样,但允许我们将项目添加到列表中,这将被自动处理。我们可以通过遍历并将links
它们全部推送来轻松地做到这一点,但如果没有重复数据删除和最大页面数,这不是一个好主意。我们将跟踪所有queued
正在visited
使用的集合,并在它们的总和超过允许的最大值时退出。
from redis import Redis # ... connection = Redis(db=1) @app.task def crawl(url): connection.sadd('crawling:queued', url) # add URL to set html = get_html(url) soup = BeautifulSoup(html, 'html.parser') links = extract_links(url, soup) for link in links: if allow_url_filter(link) and not seen(link): print('Add URL to visit queue', link) add_to_visit(link) # atomically move a URL from queued to visited connection.smove('crawling:queued', 'crawling:visited', url) def allow_url_filter(url): return '/shop/page/' in url and '#' not in url def seen(url): return connection.sismember('crawling:visited', url) or connection.sismember('crawling:queued', url) def add_to_visit(url): # LPOS command is not available in Redis library if connection.execute_command('LPOS', 'crawling:to_visit', url) is None: connection.rpush('crawling:to_visit', url) # add URL to the end of the list
maximum_items = 5 while True: visited = connection.scard('crawling:visited') # count URLs in visited queued = connection.scard('crawling:queued') if queued + visited > maximum_items: print('Exiting! Over maximum') break # ...
执行完之后,一切都在Redis中,所以再次运行并不能正常运行。我们需要手动清理爬行队列。我们可以通过使用或像redis-commanderredis-cli
这样的 GUI来做到这一点。有删除键(即)或刷新数据库(小心这个)的命令。DEL crawling:to_visit
职责分离
我们将在项目增长之前开始分离概念。我们已经有两个文件:tasks.py
和main.py
. 我们将创建另外两个来承载与爬虫相关的功能 ( crawler.py
) 和数据库访问 ( repo.py
)。请查看下面的回购文件片段,它不完整,但你明白了。有一个包含最终内容的GitHub 存储库,以备您查看。
from redis import Redis connection = Redis(db=1) to_visit_key = 'crawling:to_visit' visited_key = 'crawling:visited' queued_key = 'crawling:queued' def pop_to_visit_blocking(timeout=0): return connection.blpop(to_visit_key, timeout) def count_visited(): return connection.scard(visited_key) def is_queued(value): return connection.sismember(queued_key, value)
并且该crawler
文件将具有爬取、提取链接等功能。
允许解析器自定义
如上所述,我们需要一些方法来提取和存储内容并仅将特定链接子集添加到队列中。为此我们需要一个新概念:默认解析器 ( parsers/defaults.py
)。
import repo def extract_content(url, soup): return soup.title.string # extract page's title def store_content(url, content): # store in a hash with the URL as the key and the title as the content repo.set_content(url, content) def allow_url_filter(url): return True # allow all by default def get_html(url): # ... same as before
在repo.py
文件中:
content_key = 'crawling:content' # .. def set_content(key, value): connection.hset(content_key, key=key, value=value)
这里没有什么新东西,但是它将允许我们抽象链接和内容提取。它不是在爬虫中硬编码,而是一组作为参数传递的函数。现在我们可以用导入替换对这些函数的调用(暂时)。
为了将其完全抽象,我们需要一个生成器或工厂。我们将创建一个新文件来托管它 – parserlist.py
. 为了简化一点,我们允许每个域一个自定义解析器。该演示包含两个用于测试的域:scrapeme.live和quotes.toscrape.com。
尚未对每个域执行任何操作,因此我们将为它们使用默认的解析器。
from urllib.parse import urlparse from parsers import defaults parsers = { 'scrapeme.live': defaults, 'quotes.toscrape.com': defaults, } def get_parser(url): hostname = urlparse(url).hostname # extract domain from URL if hostname in parsers: # use the dict above to return the custom parser if present return parsers[hostname] return defaults
我们现在可以使用新的每个域解析器修改任务。
@app.task def crawl(url): parser = get_parser(url) # get the parser, either custom or the default one html = parser.get_html(url) # ... for link in links: if parser.allow_url_filter(link) and not seen(link): # ...
自定义解析器
我们将以scrapeme
first 为例。检查最终版本和其他自定义解析器的repo 。
这部分需要了解页面及其 HTML。想感受一下就看一看吧。总而言之,我们将获取产品列表中每个项目的产品 ID、名称和价格。然后使用 id 作为键将其存储在一个集合中。至于允许的链接,只有分页的链接才会被过滤。
import json import defaults import repo def extract_content(url, soup): return [{ 'id': product.find('a', attrs={'data-product_id': True})['data-product_id'], 'name': product.find('h2').text, 'price': product.find(class_='amount').text } for product in soup.select('.product')] def store_content(url, content): for item in content: if item['id']: repo.set_content(item['id'], json.dumps(item)) def allow_url_filter(url): return '/shop/page/' in url and '#' not in url def get_html(url): return defaults.get_html(url)
在quotes
站点中,我们需要以不同的方式处理它,因为每个报价都没有 ID。我们将提取列表中每个条目的作者和引用。然后,在该store_content
函数中,我们将为每个作者创建一个列表并添加该引用。Redis 在必要时处理列表的创建。
def extract_content(url, soup): return [{ 'quote': product.find(class_='text').text, 'author': product.find(class_='author').text } for product in soup.select('.quote')] def store_content(url, content): for item in content: if item['quote'] and item['author']: list_key = f"crawling:quote:{item['author']}" repo.add_to_list(list_key, item['quote'])
通过最后几项更改,我们引入了易于扩展的自定义解析器。添加新站点时,我们必须为每个新域创建一个文件并parserlist.py
引用它一行。我们可以更进一步并“自动发现”它们,但无需将其复杂化。
获取 HTML:无头浏览器
到目前为止,访问的每个页面都是使用 完成的requests.get
,这在某些情况下可能是不够的。假设我们想使用不同的库或无头浏览器,但只是针对某些情况或领域。加载浏览器很耗内存而且很慢,所以我们应该在非强制性的情况下避免它。解决方案?更多定制。新概念:收集器。
我们将创建一个名为的文件collectors/basic.py
并粘贴已知get_html
函数。然后通过导入更改默认值以使用它。接下来,创建一个新文件 ,collectors/headless_chromium.py
用于获取目标 HTML 的新方法。与上一篇文章一样,我们将使用playwright。如果我们想使用它们,我们还将参数化标头和代理。
from playwright.sync_api import sync_playwright def get_html(url, headers=None, proxy=None, timeout=10000): html = '' with sync_playwright() as p: browser_type = p.chromium browser = browser_type.launch(proxy=proxy) page = browser.new_page() page.set_extra_http_headers(headers) page.goto(url) page.wait_for_timeout(timeout) html = page.content() browser.close() return html
如果我们想为某些域使用无头 Chromium,只需修改get_html
该解析器(即parsers/scrapemelive.py
)。
from collectors import headless_chromium # ... def get_html(url): return headless_chromium.get_html(url)
正如您在最终回购中看到的那样,我们还有一个fake.py
收集器用于scrapemelive.py
. 由于我们使用该网站进行密集测试,因此我们在第一时间下载了所有产品页面并将它们存储在一个data
文件夹中。我们可以使用无头浏览器进行自定义,但我们可以使用文件阅读器进行相同的操作,因此使用了假名。
import time import re import random def get_html(url): try: page = int(re.search(r'd+', url).group()) with open('./data/' + str(page) + '.html') as fp: time.sleep(random.randint(1, 10) / 10) return fp.read() except Exception as e: print(e) return ''
避免使用标头和代理进行检测
你猜对了:我们想要添加自定义标头并使用代理。我们将从创建一个文件开始headers.py
。我们不会在这里粘贴全部内容,Linux 机器有三组不同的标头,而且很长。检查回购协议的详细信息。
import random chrome_linux_88 = { # ... 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.96 Safari/537.36', } chromium_linux_92 = { # ... 'user-agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.107 Safari/537.36', } firefox_linux_88 = { # ... 'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0', } headers = [ chrome_linux_88, chromium_linux_92, firefox_linux_88 ] def random_headers(): return random.choice(headers)
我们可以导入一组具体的标头或调用random_headers
来获取可用选项之一。稍后我们将看到一个用法示例。
这同样适用于代理:创建一个新文件,proxies.py
. 它将包含由提供商分组的列表。在我们的示例中,我们将仅包含免费代理。在字典中添加您的付费类型proxies
,并将默认类型更改为您喜欢的类型。如果我们想让事情复杂化,我们可以在失败的情况下添加与不同提供者的重试。
请注意,这些免费代理可能不适合您。他们的寿命很短。
import random free_proxies = [ {'http': 'http://62.33.210.34:58918', 'https': 'http://194.233.69.41:443'}, {'http': 'http://190.64.18.177:80', 'https': 'http://203.193.131.74:3128'}, ] proxies = { 'free': free_proxies, } def random_proxies(type='free'): return random.choice(proxies[type])
以及解析器中的用法:
from headers import random_headers from proxies import random_proxies # ... def get_html(url): return basic.get_html(url, headers=random_headers(), proxies=random_proxies())
把它们放在一起
这是一次漫长而多事的旅行。是时候通过完成拼图来结束它了。我们希望您了解分布式网络抓取和爬行的过程以及所有挑战。我们无法在此处显示最终代码,因此请查看存储库,如有任何疑问,请随时发表评论或联系我们。
这两个入口点tasks.py
用于 Celery 和main.py
开始排队 URL。从那里开始,我们开始将 URL 存储在 Redis 中以跟踪并开始抓取第一个 URL。自定义或默认解析器将获取 HTML,提取和过滤链接,并生成和存储适当的内容。我们将这些链接添加到列表中并再次开始该过程。
多亏了 Celery,一旦队列中有多个链接,分布式网络爬虫过程就开始了。
积分仍然缺失
我们已经涵盖了很多内容,但总有更多的步骤。这里有一些我们没有包括的功能。另外请注意,为简洁起见,大部分代码不包含错误处理或重试。
分布式网络爬虫
Celery 为我们提供开箱即用的分布式抓取和爬行。代码是一样的,但是执行是不同的,因为分布式爬虫有多种策略。
对于本地测试,我们可以启动两个不同的 workercelery -A tasks worker --concurrency=20 -n worker1
和... -n worker2
。但这并不是真正的分布式网络爬虫设计。
需要注意的是,worker 的名字是必不可少的,尤其是在同一台机器上启动多个 worker 的时候。如果我们在不更改 worker 名称的情况下执行上述命令两次,Celery 将无法正确识别它们。因此启动第二个作为-n worker2
.
如果项目增长,我们唯一的节点将成为瓶颈。要正确制作分布式爬虫,我们需要多个节点。他们每个人都将运行相同的代码并可以访问代理 – 在我们的例子中是 Redis。Celery 处理工人并分配负载。
机器人.txt
除了这一allow_url_filter
部分,我们还应该添加一个 robots.txt 检查器。为此,robotparser 库可以获取一个 URL 并告诉我们是否允许抓取它。我们可以将它添加到默认或作为一个独立的功能,然后每个爬虫决定是否使用它。我们认为它足够复杂,没有实现这个功能。
如果您要这样做,请考虑上次访问文件的时间,mtime()
并时不时地重新阅读它。而且,缓存它以避免为每个单独的 URL 请求它。
限制分布式爬虫的速率
Celery 的速率限制 API不允许对每个任务和参数(在我们的例子中是域)进行自定义。这意味着我们可以限制工作人员或队列,但不能像我们希望的那样限制到细粒度的细节。这意味着我们不能将我们的分布式系统作为一个整体来限制。
有几个未解决的问题和解决方法。通过阅读其中的一些内容,我们得出的结论是,如果不自己跟踪请求,我们就无法做到这一点。
我们可以使用 param 轻松地将每个任务的速率限制为每分钟 30 个请求@app.task(rate_limit="30/m")
。但请记住,它会影响任务,而不是爬网域。
结论
构建自定义的分布式网络爬虫和解析器并不容易或直接。我们提供了一些指导和技巧,希望能帮助大家完成日常任务。
在开发用于大规模生产和高性能的东西之前,请考虑一些重要的要点:
- 职责分开。
- 必要时使用抽象,但不要过度设计。
- 不要害怕使用专门的软件而不是构建一切。
- 考虑扩展,即使你现在不需要它;请记住。
感谢您加入我们直到最后。这是一个有趣的系列,我们希望它对您也有吸引力。如果你喜欢它,你可能会对Javascript Web Scraping 指南感兴趣。