如何在Python中使用并发加速网页抓取
抓取网站数据是开发人员的典型用例。无论是业余项目还是您正在建立一家初创公司,都有很多理由来抓取网络。
例如,如果您想启动一个价格比较网站,您需要从各种电子商务网站上抓取价格。也许你想构建一个可以识别产品并在亚马逊上查询价格的人工智能。可能性是无止境。
但是你有没有注意到获取所有页面的速度有多慢?你会一个接一个地刮掉所有的产品吗?一定有更好的解决办法吧?正确的?!
抓取网站可能很耗时,因为您必须处理等待服务器响应和速率限制的问题。这就是为什么我们将向您展示如何通过在 Python 中使用并发来加速您的 Web 抓取项目。
先决条件
要使代码正常工作,您需要安装 python3。有些系统已经预装了它。之后,通过运行安装所有必需的库pip install
。
pip install requests beautifulsoup4 aiohttp numpy
并发
并发是一个处理同时运行多个计算任务的能力的术语。
当您按顺序向网站发出请求时,您一次发送一个请求,等待它返回,然后再发送下一个。
但是,您可以同时发送多个并发请求,并在它们返回时处理所有请求。这种方法带来的速度提升令人难以置信。与顺序请求相比,并发请求将更快,无论它们是否并行运行(多个 CPU)——稍后会详细介绍。
要了解并发的好处,我们需要了解顺序处理任务和并发处理任务之间的区别。例如,假设我们有五个任务,每个任务需要 10 秒才能完成。
按顺序处理时,完成所有五个所需的时间为 50 秒。但是,并发处理时,5 个任务全部完成仅需 10 秒。
为什么要异步?
要决定使用什么技术,我们必须了解asyncio
和之间的区别multiprocessing
。还有 I/O 绑定和 CPU 绑定。
asyncio “是一个使用 async/await 语法编写并发代码的库”。它在单个处理器上运行。
multiprocessing “是一个使用 API […] 支持生成进程的包,允许程序员在给定机器上充分利用多个处理器”。每个进程都会在不同的 CPU 中启动自己的 Python 解释器。
I/O-bound意味着程序将由于输入/输出操作而运行得更慢。在我们的例子中,主要是网络请求。
受 CPU 限制意味着程序将由于中央处理器的使用而运行得更慢——例如,数学计算。
为什么这会影响我们将用于并发的库?因为并发成本的很大一部分是创建和维护线程/进程。对于受 CPU 限制的问题,在不同的 CPU 中拥有多个进程会有所回报。但对于受 I/O 限制的场景,情况可能并非如此。
由于抓取主要是 I/O 绑定,我们选择了asyncio
. 但是如果有疑问(或者只是为了好玩),您可以使用多处理复制这个想法并比较结果。
顺序版本
我们将以抓取scrapeme.live
作为示例开始,这是一个准备测试的假 Pokémon 电子商务。
首先,我们将从scraper 的顺序版本开始。几个片段是所有案例的一部分,因此它们将保持不变。
通过访问页面,我们看到有48个页面。由于它是一个测试环境,因此不会很快改变。我们的第一个常量将是基本 URL 和页面范围。
base_url = "https://scrapeme.live/shop/page" pages = range(1, 49) # max page (48) + 1
现在,从产品中提取基础知识。为此,使用requests.get
获取 HTML 然后BeautifulSoup
解析它。我们将遍历每个产品并从中获取一些基本信息。所有选择器都来自对内容的手动审查(使用 DevTools),但为了简洁起见,我们不会在这里详细介绍。
import requests from bs4 import BeautifulSoup def extract_details(page): # concatenate page number to base URL response = requests.get(f"{base_url}/{page}/") soup = BeautifulSoup(response.text, "html.parser") pokemon_list = [] for pokemon in soup.select(".product"): # loop each product pokemon_list.append({ "id": pokemon.find(class_="add_to_cart_button").get("data-product_id"), "name": pokemon.find("h2").text.strip(), "price": pokemon.find(class_="price").text.strip(), "url": pokemon.find(class_="woocommerce-loop-product__link").get("href"), }) return pokemon_list
该extract_details
函数将获取一个页码并将其连接起来以创建一个具有之前看到的基础的 URL。获取内容并创建一系列产品后,返回它们。这意味着返回值将是一个字典列表。这是以后必不可少的细节。
我们需要为每个页面运行上面的函数,获取所有结果,并存储它们。
import csv # modified to avoid running all the pages unintentionally pages = range(1, 3) def store_results(list_of_lists): pokemon_list = sum(list_of_lists, []) # flatten lists with open("pokemon.csv", "w") as pokemon_file: # get dictionary keys for the CSV header fieldnames = pokemon_list[0].keys() file_writer = csv.DictWriter(pokemon_file, fieldnames=fieldnames) file_writer.writeheader() file_writer.writerows(pokemon_list) list_of_lists = [ extract_details(page) for page in pages ] store_results(list_of_lists)
运行上面的代码将得到两个产品页面,提取产品(总共 32 个),并将它们存储在一个名为 .csv 的 CSV 文件中pokemon.csv
。该store_results
函数不影响顺序或并发模式下的抓取。你可以跳过它。
由于结果是列表,我们必须将它们展平以允许writerows
完成它的工作。这就是我们命名变量的原因list_of_lists
(即使它有点奇怪),只是为了提醒大家它不是扁平的。
输出 CSV 文件的示例:
ID | 姓名 | 价格 | 网址 |
---|---|---|---|
759 | 妙蛙种子 | £63.00 | https://scrapeme.live/shop/Bulbasaur/ |
729 | 藤龙 | £87.00 | https://scrapeme.live/shop/Ivysaur/ |
730 | 金龙 | £105.00 | https://scrapeme.live/shop/Venusaur/ |
731 | 小火龙 | £48.00 | https://scrapeme.live/shop/Charmander/ |
732 | 变色龙 | £165.00 | https://scrapeme.live/shop/Charmeleon/ |
如果您要为总共的每一页 (48) 运行该脚本,它将生成一个包含 755 种产品的 CSV,并花费大约 30 秒。
time python script.py real 0m31,806s user 0m1,936s sys 0m0,073s
引入异步
我们知道我们可以做得更好。如果我们同时执行所有请求,应该会少很多,对吧?也许只要最慢的请求?
并发确实应该运行得更快,但它也涉及一些开销。所以它不是一个线性的数学改进。但我们会改进的。
为此,我们将使用提到的asyncio
. 它允许我们在事件循环中的同一个线程上运行多个任务(就像 Javascript 那样)。它将运行一个函数,并在允许时将上下文切换到不同的上下文。在我们的例子中,HTTP 请求允许这种切换。
我们将开始看到一个会休眠一秒钟的例子。并且脚本应该需要一秒钟才能运行。请注意,我们不能main
直接调用。我们需要让我们asyncio
知道这是一个需要执行的异步函数。
import asyncio async def main(): print("Hello ...") await asyncio.sleep(1) print("... World!") asyncio.run(main())
time python script.py Hello ... ... World! real 0m1,054s user 0m0,045s sys 0m0,008s
并行的简单代码
接下来,我们将扩展一个示例案例来运行一百个功能。他们每个人都会睡一秒钟并打印一段文字。如果我们按顺序运行它们,大约需要一百秒。使用asyncio
,只需一个!
这就是并发背后的力量。如前所述,对于纯 I/O 绑定任务,它会执行得更快——睡眠不是,但它对示例很重要。
我们需要创建一个辅助函数,它会休眠一秒钟并打印一条消息。然后,我们编辑main
以调用该函数一百次并将每次调用存储在任务列表中。最后也是关键的部分是执行并等待所有任务完成。就是这样asyncio.gather
。
import asyncio async def demo_function(i): await asyncio.sleep(1) print(f"Hello {i}") async def main(): tasks = [ demo_function(i) for i in range(0, 100) ] await asyncio.gather(*tasks) asyncio.run(main())
正如预期的那样,一百条消息和一秒钟的时间来执行。
time python script.py Hello 0 ... Hello 99 real 0m1,065s user 0m0,063s sys 0m0,000s
用异步抓取
我们需要将这些知识应用到抓取中。遵循的方法是同时请求和返回产品列表。所有请求完成后,存储它们。最好在每次请求后或批量保存数据,以避免真实案例中的数据丢失。
我们的第一次尝试不会有并发限制,所以使用时要小心。如果用数千个 URL 运行它……好吧,它几乎可以同时执行所有这些请求。这可能会给服务器带来巨大的负载,并可能烧毁您的计算机。
requests
不支持开箱即用的异步,因此我们将使用它aiohttp
来避免并发症。requests
可以完成这项工作,并且没有实质性的性能差异。但是使用 . 代码更具可读性aiohttp
。
import asyncio import aiohttp from bs4 import BeautifulSoup async def extract_details(page, session): # similar to requests.get but with a different syntax async with session.get(f"{base_url}/{page}/") as response: # notice that we must await the .text() function soup = BeautifulSoup(await response.text(), "html.parser") # [...] same as before return pokemon_list async def main(): # create an aiohttp session and pass it to each function execution async with aiohttp.ClientSession() as session: tasks = [ extract_details(page, session) for page in pages ] list_of_lists = await asyncio.gather(*tasks) store_results(list_of_lists) asyncio.run(main())
CSV 文件应该像以前一样包含每个产品 (755)。由于我们同时执行所有页面调用,因此结果不会按顺序到达。如果我们要将结果添加到内部文件中,extract_details
它们可能是无序的。由于我们是等所有任务完成后再进行处理,所以顺序不会有问题。
time python script.py real 0m11,442s user 0m1,332s sys 0m0,060s
我们做到了!快3 倍很好,但是……不应该是 40 倍吗?没那么简单。许多因素都会影响性能(网络、CPU、RAM 等)。
在这个演示页面中,我们注意到当我们执行多个调用时响应时间会变慢。这可能是设计使然。一些服务器/提供商可以限制并发请求的数量,以避免来自同一 IP 的流量过多。它不是一个块,而是一个队列。你会得到服务,但要等一会儿。
要查看真正的加速,您可以针对延迟页面进行测试。这是另一个测试页面,将等待 2 秒然后返回响应。
base_url = "https://httpbin.org/delay/2" #... async def extract_details(page, session): async with session.get(base_url) as response: #...
去掉了所有的提取和存储逻辑,只调用了 48 次延迟 URL。它运行不到 3 秒。
time python script.py real 0m2,865s user 0m0,245s sys 0m0,031s
使用信号量限制并发
如前所述,我们应该限制并发请求的数量,尤其是针对单个域。
asyncio
带有Semaphore,一个将获取和释放锁的对象。它的内部功能将阻塞一些调用,直到获得锁,从而创建最大的并发性。
我们需要创建我们想要的最大值的信号量。然后等待提取函数,直到它可以使用async with sem
.
max_concurrency = 3 sem = asyncio.Semaphore(max_concurrency) async def extract_details(page, session): async with sem: # semaphore limits num of simultaneous downloads async with session.get(f"{base_url}/{page}/") as response: # ... async def main(): # ... loop = asyncio.get_event_loop() loop.run_until_complete(main())
它完成了工作,而且实施起来相对容易!这是最大并发设置为 3 的输出。
time python script.py real 0m13,062s user 0m1,455s sys 0m0,047s
说明无限并发的版本没有全速运行🤦。如果我们将限制增加到 10,则总时间类似于未绑定脚本。
使用 TCPConnector 限制并发
aiohttp
提供了提供进一步配置的替代解决方案。我们可以创建传入自定义TCPConnector 的客户端会话。
我们可以使用适合我们需要的两个参数来构建它:
limit
– “同时连接总数”。limit_per_host
– “限制同时连接到同一端点”(相同的主机、端口和is_ssl
)。max_concurrency = 10 max_concurrency_per_host = 3 async def main(): connector = aiohttp.TCPConnector(limit=max_concurrency, limit_per_host=max_concurrency_per_host) async with aiohttp.ClientSession(connector=connector) as session: # ... asyncio.run(main())
也易于实施和维护!这是最大并发设置为每个主机 3 的输出。
time python script.py real 0m16,188s user 0m1,311s sys 0m0,065s
优势在于Semaphore
可以选择限制每个域的并发调用和请求总量。我们可以使用相同的方式session
来抓取不同的站点,并且每个站点都有自己的限制。
缺点是看起来有点慢。针对真实案例场景,使用更多页面和实际数据运行一些测试。
多重处理
就像我们之前看到的那样,抓取是 I/O 绑定的。但是,如果我们需要将它与一些 CPU 密集型计算混合使用怎么办?为了测试这种情况,我们将使用一个函数,该函数将count_a_lot
在每个抓取的页面之后(达到一亿)。这是一种强制 CPU 忙碌一段时间的简单(而且愚蠢)的方法。
def count_a_lot(): count_to = 100_000_000 counter = 0 while counter < count_to: counter = counter + 1 async def extract_details(page, session): async with session.get(f"{base_url}/{page}/") as response: # ... count_a_lot() return pokemon_list
对于 asyncio 版本,只需像以前一样运行它。可能需要很长时间⏳。
time python script.py real 2m37,827s user 2m35,586s sys 0m0,244s
添加multiprocessing
有点困难。我们需要创建一个ProcessPoolExecutor,它“使用进程池异步执行调用”。它将在不同的 CPU 中处理每个进程的创建和控制。
但它不会分配负载。为此,我们将使用NumPy
s array_split
,它会pages
根据 CPU 的数量将范围分成相等的块。
函数的其余部分main
与版本类似asyncio
,但更改了一些语法以匹配multiprocessing
样式。
本质区别是我们不能extract_details
直接调用。multiprocessing
我们可以,但我们会尝试通过与混合来获得最大功率asyncio
。
from concurrent.futures import ProcessPoolExecutor from multiprocessing import cpu_count import numpy as np num_cores = cpu_count() # number of CPU cores def main(): executor = ProcessPoolExecutor(max_workers=num_cores) tasks = [ executor.submit(asyncio_wrapper, pages_for_task) for pages_for_task in np.array_split(pages, num_cores) ] doneTasks, _ = concurrent.futures.wait(tasks) results = [ item.result() for item in doneTasks ] store_results(results) main()
长话短说,每个 CPU 进程都会有几个页面要抓取。共有 48 个页面,假设您的机器有 8 个 CPU,每个进程将请求六个页面 (6 * 8 = 48)。
这六个页面将同时运行!之后,计算将不得不等待,因为它们是 CPU 密集型的。但是我们有很多 CPU,所以它们应该比纯asyncio
版本运行得更快。
async def extract_details_task(pages_for_task): async with aiohttp.ClientSession() as session: tasks = [ extract_details(page, session) for page in pages_for_task ] list_of_lists = await asyncio.gather(*tasks) return sum(list_of_lists, []) def asyncio_wrapper(pages_for_task): return asyncio.run(extract_details_task(pages_for_task))
每个 CPU 进程将从asyncio
页面的子集开始(例如,第一个页面从 1 到 6)。
然后,其中的每一个都将调用多个 URL,使用已知的extract_details
函数。
花点时间吸收它。整个过程是这样的:
- 创建执行器。
- 拆分页面。
asyncio
每个过程开始。- 创建
aiohttp
会话并创建页面子集的任务。 - 提取每个页面的数据。
- 合并并存储结果。
这是执行时间。我们没有提到它,但user
这里的时间起着显着的作用。对于仅运行的脚本asyncio
:
time python script.py real 2m37,827s user 2m35,586s sys 0m0,244s
asyncio
多进程版本:
time python script.py real 0m38,048s user 3m3,147s sys 0m0,532s
你发现区别了吗?第一个用了两分多钟,第二个用了 40 秒。但在总 CPU 时间(user
时间)中,第二个超过了三分钟!由于系统开销和所有其他原因,这要多一些。
这表明并行处理“浪费”了更多时间(总计)但之前完成了。然后由您决定选择哪种方法。还要考虑到开发和调试更加复杂。
结论
我们已经看到这asyncio
可能足以进行抓取,因为大部分运行时间都花在了网络上。它是 I/O 绑定的,并且可以很好地与单核中的并发处理配合使用。
如果收集的数据需要一些 CPU 密集型工作,这种情况就会改变。我们已经看到了一个关于计数的愚蠢例子,但你明白了。
在大多数情况下,asyncio
使用aiohttp
– 比异步工作更适合requests
– 可以完成工作。添加自定义连接器以限制每个域的请求数和并发请求数。有了这三个部分,您就可以开始构建一个可以扩展的爬虫了。一个重要的部分是允许新的 URL/任务(类似于队列)