引言
在现代Python开发中,异步编程已成为提高程序性能和响应能力的关键技术。随着网络请求、数据库操作等I/O密集型任务的增多,传统的同步编程方式已无法满足高性能应用的需求。Python的asyncio库为我们提供了强大的异步编程支持,让我们能够编写出更加高效、响应更快的应用程序。
本文将深入探讨Python异步编程的核心技术,从基础概念到高级应用,通过实际的网络爬虫开发实例,展示异步编程在实际项目中的应用价值。我们将涵盖asyncio的工作原理、协程的使用、异步数据库操作等关键技术点,帮助读者全面掌握异步编程的精髓。
一、异步编程基础概念
1.1 什么是异步编程
异步编程是一种编程范式,允许程序在等待某些操作完成时执行其他任务,而不是阻塞等待。在传统的同步编程中,当一个函数需要等待网络请求或文件读取等I/O操作完成时,整个程序会被阻塞,直到操作完成。而异步编程允许程序在等待期间执行其他任务,从而提高整体效率。
1.2 同步 vs 异步对比
让我们通过一个简单的例子来理解同步和异步的区别:
import time
import requests
# 同步方式
def sync_requests():
    """Fetch three slow endpoints one after another and report the wall time."""
    urls = ['https://httpbin.org/delay/1'] * 3
    start_time = time.time()
    # Each request blocks until it completes, so total time is the sum.
    for url in urls:
        response = requests.get(url)
        print(f"Status: {response.status_code}")
    end_time = time.time()
    print(f"同步方式耗时: {end_time - start_time:.2f}秒")
# 异步方式
import asyncio
import aiohttp
async def async_requests():
    """Fetch the three slow endpoints concurrently and report the wall time.

    All requests are in flight at once, so total time is roughly the
    slowest single request (~1s) rather than the sum (~3s).
    """
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/1'
    ]
    start_time = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            print(f"Status: {response.status}")
            # Release each connection back to the pool. The original never
            # read or released the responses, leaking pooled connections
            # until the session itself closed.
            response.release()
    end_time = time.time()
    print(f"异步方式耗时: {end_time - start_time:.2f}秒")
在同步方式中,三个请求需要依次执行,总耗时约为3秒。而在异步方式中,三个请求可以并行执行,总耗时约为1秒。
1.3 异步编程的核心概念
异步编程涉及几个核心概念:
- 协程(Coroutine):异步编程的基础单元,可以暂停和恢复执行
- 事件循环(Event Loop):管理协程执行的循环机制
- 任务(Task):包装协程的高级对象,提供更多的控制能力
- 异步上下文管理器:提供异步的资源管理
二、asyncio核心原理详解
2.1 事件循环机制
asyncio的核心是事件循环,它负责调度和执行协程。事件循环的工作原理如下:
import asyncio
import time
async def task(name, delay):
    """Simulate an I/O-bound job that waits `delay` seconds before finishing."""
    print(f"任务 {name} 开始执行")
    await asyncio.sleep(delay)
    print(f"任务 {name} 执行完成")
    result = f"结果来自 {name}"
    return result
async def main():
    """Schedule three tasks concurrently and collect their results in order."""
    # create_task schedules each coroutine on the event loop immediately.
    jobs = [
        asyncio.create_task(task("任务1", 2)),
        asyncio.create_task(task("任务2", 1)),
        asyncio.create_task(task("任务3", 3)),
    ]
    results = await asyncio.gather(*jobs)
    print("所有任务完成:", results)
# 运行事件循环
# asyncio.run(main())
2.2 协程的创建和执行
协程可以通过async def关键字定义,使用await关键字来暂停和恢复执行:
import asyncio
# 定义协程
async def my_coroutine():
    """Minimal coroutine demo: announce start, pause one second, announce end."""
    message = "协程返回值"
    print("协程开始执行")
    await asyncio.sleep(1)
    print("协程执行完毕")
    return message
# 执行协程
async def execute_coroutine():
    """Demonstrate the two standard ways of running a coroutine."""
    # Approach 1: await the coroutine object directly.
    result = await my_coroutine()
    print(f"结果: {result}")
    # Approach 2: wrap it in a Task, which starts it running immediately.
    wrapped = asyncio.create_task(my_coroutine())
    result = await wrapped
    print(f"任务结果: {result}")
# asyncio.run(execute_coroutine())
2.3 任务和未来对象
在asyncio中,任务(Task)是协程的包装器,提供了更多的控制能力:
import asyncio
import time
async def fetch_data(url, delay):
    """Pretend to download `url`, taking `delay` seconds of simulated I/O."""
    print(f"开始获取 {url}")
    await asyncio.sleep(delay)
    print(f"完成获取 {url}")
    payload = f"数据来自 {url}"
    return payload
async def main():
    """Run three simulated downloads concurrently and time the batch."""
    start_time = time.time()
    specs = [("url1", 1), ("url2", 2), ("url3", 1)]
    tasks = [asyncio.create_task(fetch_data(u, d)) for u, d in specs]
    # return_exceptions=True keeps one failure from cancelling the others.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f}秒")
    print(f"结果: {results}")
# asyncio.run(main())
三、异步网络编程实践
3.1 异步HTTP客户端
异步HTTP客户端是异步编程中最常用的应用场景之一。aiohttp是Python中最流行的异步HTTP库:
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
    """Fetch a single URL and summarize the outcome as a dict.

    Returns url/status/content_length on success, url/status/error on a
    non-200 response, and url/error if the request itself raised.
    """
    try:
        async with session.get(url) as response:
            # Guard clause: report non-200 responses without reading the body.
            if response.status != 200:
                return {
                    'url': url,
                    'status': response.status,
                    'error': 'HTTP错误'
                }
            content = await response.text()
            return {
                'url': url,
                'status': response.status,
                'content_length': len(content)
            }
    except Exception as e:
        return {
            'url': url,
            'error': str(e)
        }
async def fetch_multiple_urls(urls, max_concurrent=5):
    """Fetch many URLs concurrently, capped at `max_concurrent` in flight.

    A semaphore bounds concurrency so we never hold more than
    `max_concurrent` sockets open at once.
    """
    semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_semaphore(session, url):
        # Take `session` as a parameter instead of closing over a name that
        # is only bound later in the enclosing scope — the original relied
        # on late-binding closure semantics, which breaks if the helper is
        # ever awaited before the session exists.
        async with semaphore:
            return await fetch_url(session, url)

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_semaphore(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
# 使用示例
async def main():
    """Fetch five slow endpoints with at most three concurrent requests."""
    urls = ['https://httpbin.org/delay/1'] * 5
    start_time = time.time()
    results = await fetch_multiple_urls(urls, max_concurrent=3)
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f}秒")
    for result in results:
        if isinstance(result, dict):
            print(f"URL: {result['url']}, 状态: {result.get('status', '未知')}")
        else:
            print(f"错误: {result}")
# asyncio.run(main())
3.2 异步请求的错误处理
在异步网络编程中,错误处理尤为重要:
import asyncio
import aiohttp
from typing import Optional, Dict, Any
class AsyncHttpClient:
    """Async HTTP client with a per-request timeout and retry-with-backoff.

    Use as an async context manager so the underlying aiohttp session is
    always closed.
    """

    def __init__(self, timeout: int = 30, max_retries: int = 3):
        # Total wall-clock budget per request, in seconds.
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.max_retries = max_retries
        # Created lazily in __aenter__, closed in __aexit__.
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch_with_retry(self, url: str) -> Optional[Dict[str, Any]]:
        """Fetch *url*, retrying up to max_retries times with exponential backoff.

        Returns a dict (url/status/content/attempt) on success, or None on
        permanent failure: 404/403, exhausted retries, or an unexpected error.
        """
        for attempt in range(self.max_retries):
            try:
                async with self.session.get(url) as response:
                    if response.status == 200:
                        content = await response.text()
                        return {
                            'url': url,
                            'status': response.status,
                            'content': content,
                            'attempt': attempt + 1
                        }
                    else:
                        print(f"HTTP {response.status} for {url}")
                        if response.status in [404, 403]:
                            # Client errors that will not change on retry.
                            return None
                        # Re-raise as a ClientResponseError so the shared
                        # retry/backoff branch below handles it.
                        raise aiohttp.ClientResponseError(
                            request_info=response.request_info,
                            history=response.history,
                            status=response.status
                        )
            except aiohttp.ClientError as e:
                print(f"请求失败 (尝试 {attempt + 1}): {url} - {e}")
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s...
                    continue
                return None
            except asyncio.TimeoutError:
                # Raised by aiohttp when the ClientTimeout budget is exceeded.
                print(f"请求超时 (尝试 {attempt + 1}): {url}")
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                return None
            except Exception as e:
                # Unknown errors are not retried.
                print(f"未知错误: {url} - {e}")
                return None
        return None
# 使用示例
async def main():
    """Exercise AsyncHttpClient against a mix of good and failing URLs."""
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/404',
        'https://httpbin.org/delay/2'
    ]
    async with AsyncHttpClient(max_retries=3) as client:
        tasks = [client.fetch_with_retry(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for result in results:
            if isinstance(result, dict):
                print(f"成功: {result['url']} (尝试 {result['attempt']})")
            elif result is None:
                print("请求失败")
            else:
                print(f"异常: {result}")
# asyncio.run(main())
四、异步数据库操作
4.1 异步数据库连接
异步数据库操作可以显著提高应用性能,特别是在处理大量数据时:
import asyncio
import asyncpg
import time
from typing import List, Dict, Any
class AsyncDatabaseManager:
    """Async PostgreSQL helper built on an asyncpg connection pool.

    Use as an async context manager: the pool is created on enter and
    closed on exit.
    """

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None  # asyncpg.Pool, created in __aenter__

    async def __aenter__(self):
        self.pool = await asyncpg.create_pool(self.connection_string)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.pool:
            await self.pool.close()

    async def execute_query(self, query: str, *args) -> List[Dict[str, Any]]:
        """Run a SELECT and return the rows as plain dicts ([] on error)."""
        async with self.pool.acquire() as connection:
            try:
                rows = await connection.fetch(query, *args)
                return [dict(row) for row in rows]
            except Exception as e:
                print(f"查询执行失败: {e}")
                return []

    async def execute_update(self, query: str, *args) -> int:
        """Execute an INSERT/UPDATE/DELETE and return the affected row count."""
        async with self.pool.acquire() as connection:
            try:
                status = await connection.execute(query, *args)
                # asyncpg returns a command status tag such as "UPDATE 2"
                # or "INSERT 0 3"; the affected-row count is the last token.
                # (The original tested for a "... rows" suffix, which asyncpg
                # never produces, so it always returned 0.)
                last = status.rsplit(' ', 1)[-1]
                return int(last) if last.isdigit() else 0
            except Exception as e:
                print(f"更新执行失败: {e}")
                return 0

    async def batch_insert(self, table: str, data_list: List[Dict[str, Any]]) -> int:
        """Insert many rows in a single transaction.

        Column names come from the first dict; every dict must contain the
        same keys. Returns the number of rows inserted, or 0 on error.
        NOTE(review): `table` and column names are interpolated into the
        SQL string — callers must not pass untrusted identifiers.
        """
        if not data_list:
            return 0
        columns = list(data_list[0].keys())
        placeholders = ', '.join(f'${i + 1}' for i in range(len(columns)))
        columns_str = ', '.join(columns)
        query = f"INSERT INTO {table} ({columns_str}) VALUES ({placeholders})"
        async with self.pool.acquire() as connection:
            try:
                async with connection.transaction():
                    # executemany sends all rows through one prepared
                    # statement — fewer round-trips than per-row execute().
                    # Index by `columns` so row dicts with a different key
                    # insertion order still bind values to the right column
                    # (the original relied on each dict's values() order).
                    await connection.executemany(
                        query,
                        [tuple(row[col] for col in columns) for row in data_list],
                    )
                return len(data_list)
            except Exception as e:
                print(f"批量插入失败: {e}")
                return 0
# 使用示例
async def database_example():
    """Insert a few sample users, then query them back."""
    # Connection string for a local PostgreSQL instance.
    db_string = "postgresql://user:password@localhost:5432/mydb"
    test_data = [
        {'name': 'Alice', 'age': 25, 'email': 'alice@example.com'},
        {'name': 'Bob', 'age': 30, 'email': 'bob@example.com'},
        {'name': 'Charlie', 'age': 35, 'email': 'charlie@example.com'}
    ]
    async with AsyncDatabaseManager(db_string) as db:
        inserted_count = await db.batch_insert('users', test_data)
        print(f"插入了 {inserted_count} 条记录")
        results = await db.execute_query("SELECT * FROM users WHERE age > $1", 25)
        print(f"查询结果: {len(results)} 条记录")
# asyncio.run(database_example())
4.2 异步数据库连接池管理
连接池是提高数据库性能的重要手段:
import asyncio
import asyncpg
from typing import Optional
class AsyncConnectionPool:
    """Thin wrapper around an asyncpg pool exposing manual acquire/release."""

    def __init__(self, connection_string: str, min_size: int = 10, max_size: int = 20):
        self.connection_string = connection_string
        self.min_size = min_size
        self.max_size = max_size
        self.pool: Optional[asyncpg.Pool] = None

    async def __aenter__(self):
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=self.min_size,
            max_size=self.max_size,
            command_timeout=60
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.pool:
            await self.pool.close()

    async def get_connection(self):
        """Borrow a connection from the pool; raises if the pool is not open."""
        if not self.pool:
            raise RuntimeError("连接池未初始化")
        return await self.pool.acquire()

    async def release_connection(self, connection):
        """Return a previously borrowed connection to the pool."""
        if self.pool and connection:
            await self.pool.release(connection)

    async def execute_with_connection(self, func):
        """Run `func(connection)` on a pooled connection, always releasing it."""
        conn = None
        try:
            conn = await self.get_connection()
            return await func(conn)
        finally:
            if conn:
                await self.release_connection(conn)
# 使用示例
async def pool_example():
    """Open a pooled connection and print the PostgreSQL server version."""
    db_pool = AsyncConnectionPool(
        "postgresql://user:password@localhost:5432/mydb",
        min_size=5,
        max_size=15
    )
    async with db_pool:
        # The operation is passed as a callable so the pool handles
        # acquire/release on our behalf.
        async def db_operation(conn):
            rows = await conn.fetch("SELECT version()")
            return rows[0]['version']
        version = await db_pool.execute_with_connection(db_operation)
        print(f"数据库版本: {version}")
五、高性能网络爬虫开发
5.1 爬虫基础架构
一个高性能的异步爬虫需要考虑多个方面:并发控制、请求管理、数据存储等:
import asyncio
import aiohttp
import time
from urllib.parse import urljoin, urlparse
from bs4 import BeautifulSoup
import json
from dataclasses import dataclass
from typing import List, Set, Optional
@dataclass
class ScrapedData:
    """One scraped page: its URL, title, truncated text, and outbound links."""
    url: str  # page URL as requested
    title: str  # <title> text, '' when absent
    content: str  # plain text extracted from the page, truncated by the scraper
    links: List[str]  # absolute URLs resolved from the page's <a href> tags
    timestamp: float  # time.time() at the moment the page was fetched
class AsyncWebScraper:
    """Concurrent web scraper: bounded concurrency, URL dedup, depth crawling.

    Use as an async context manager so the aiohttp session is always closed.
    """

    def __init__(self,
                 base_url: str,
                 max_concurrent: int = 10,
                 delay: float = 0.1,
                 timeout: int = 30):
        self.base_url = base_url
        self.max_concurrent = max_concurrent
        # Politeness pause (seconds) inserted before every request.
        self.delay = delay
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        # URLs already requested — deduplicates across the whole crawl.
        self.visited_urls: Set[str] = set()
        # Caps the number of in-flight requests.
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch_page(self, url: str) -> Optional[ScrapedData]:
        """Download *url* and parse title/text/links; None on any failure."""
        async with self.semaphore:
            try:
                await asyncio.sleep(self.delay)  # throttle request rate
                async with self.session.get(url) as response:
                    if response.status != 200:
                        return None
                    content = await response.text()
                    soup = BeautifulSoup(content, 'html.parser')
                    # Extract page metadata.
                    title = soup.title.string if soup.title else ''
                    text_content = soup.get_text()[:1000]  # cap stored text at 1000 chars
                    # Resolve every anchor href to an absolute URL.
                    links = []
                    for link in soup.find_all('a', href=True):
                        absolute_url = urljoin(url, link['href'])
                        links.append(absolute_url)
                    return ScrapedData(
                        url=url,
                        title=title,
                        content=text_content,
                        links=links,
                        timestamp=time.time()
                    )
            except Exception as e:
                print(f"获取页面失败 {url}: {e}")
                return None

    async def scrape_url(self, url: str) -> Optional[ScrapedData]:
        """Fetch *url* unless it was already visited (then return None)."""
        if url in self.visited_urls:
            return None
        self.visited_urls.add(url)
        return await self.fetch_page(url)

    async def scrape_urls(self, urls: List[str]) -> List[ScrapedData]:
        """Fetch many URLs concurrently, dropping failures and exceptions."""
        tasks = [self.scrape_url(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # isinstance filter removes both None results and raised exceptions.
        valid_results = [r for r in results if isinstance(r, ScrapedData)]
        return valid_results

    async def scrape_with_depth(self, start_url: str, max_depth: int = 2) -> List[ScrapedData]:
        """Breadth-first crawl from *start_url*, at most *max_depth* levels deep."""
        all_data = []
        current_urls = [start_url]
        for depth in range(max_depth):
            if not current_urls:
                break
            print(f"爬取第 {depth + 1} 层,共 {len(current_urls)} 个URL")
            # Fetch every URL on the current level.
            data = await self.scrape_urls(current_urls)
            all_data.extend(data)
            # Collect next-level candidates from this level's outbound links.
            next_urls = set()
            for item in data:
                for link in item.links[:10]:  # take at most 10 links per page
                    if self.is_valid_url(link) and link not in self.visited_urls:
                        next_urls.add(link)
            current_urls = list(next_urls)
        return all_data

    def is_valid_url(self, url: str) -> bool:
        """True if *url* has both a scheme and a network location."""
        try:
            parsed = urlparse(url)
            return bool(parsed.netloc) and bool(parsed.scheme)
        except:
            return False
# 使用示例
async def main():
    """Demo: scrape one page, then a small batch, and persist the results."""
    async with AsyncWebScraper(
        base_url="https://example.com",
        max_concurrent=5,
        delay=0.5
    ) as scraper:
        # Single page.
        data = await scraper.scrape_url("https://httpbin.org/html")
        if data:
            print(f"标题: {data.title}")
            print(f"内容长度: {len(data.content)}")
        # Small batch of pages.
        urls = [
            "https://httpbin.org/html",
            "https://httpbin.org/json",
            "https://httpbin.org/xml"
        ]
        results = await scraper.scrape_urls(urls)
        print(f"共爬取 {len(results)} 个页面")
        # Persist a summary of each scraped page as JSON.
        summary = [{
            'url': item.url,
            'title': item.title,
            'content': item.content[:200],
            'timestamp': item.timestamp
        } for item in results]
        with open('scraped_data.json', 'w', encoding='utf-8') as f:
            json.dump(summary, f, ensure_ascii=False, indent=2)
# asyncio.run(main())
5.2 爬虫性能优化
为了进一步提高爬虫性能,我们可以实现更多的优化策略:
import asyncio
import aiohttp
import time
from collections import deque
from typing import Deque, List, Optional
import logging
class OptimizedWebScraper:
    """Scraper tuned for throughput: pooled connections, global rate
    limiting, retries with exponential backoff, and HTTP 429 handling.

    NOTE(review): depends on the RateLimiter class defined later in this
    module.
    """

    def __init__(self,
                 base_url: str,
                 max_concurrent: int = 10,
                 delay: float = 0.1,
                 timeout: int = 30,
                 retry_count: int = 3):
        self.base_url = base_url
        self.max_concurrent = max_concurrent
        self.delay = delay  # NOTE(review): stored but not referenced by any method below
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.retry_count = retry_count
        self.visited_urls: set = set()  # URLs fetched in any earlier batch
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session: Optional[aiohttp.ClientSession] = None
        # Global cap: at most 10 requests per 1-second sliding window.
        self.rate_limiter = RateLimiter(max_requests=10, time_window=1)
        self.logger = logging.getLogger(__name__)

    async def __aenter__(self):
        # Connection-pool tuning: cap total and per-host sockets, cache DNS.
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent,
            limit_per_host=5,
            ttl_dns_cache=300,
            use_dns_cache=True,
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch_with_retry(self, url: str) -> Optional[dict]:
        """Fetch *url* with rate limiting and up to retry_count attempts.

        Returns a dict (url/status/content/headers/attempt) on success,
        None on permanent failure.
        """
        for attempt in range(self.retry_count):
            try:
                # Respect the global rate limit before taking a concurrency slot.
                await self.rate_limiter.acquire()
                async with self.semaphore:
                    async with self.session.get(url) as response:
                        if response.status == 200:
                            content = await response.text()
                            return {
                                'url': url,
                                'status': response.status,
                                'content': content,
                                'headers': dict(response.headers),
                                'attempt': attempt + 1
                            }
                        elif response.status == 429:  # server-side rate limiting
                            # Honor Retry-After (default 1s), then retry.
                            wait_time = int(response.headers.get('Retry-After', 1))
                            await asyncio.sleep(wait_time)
                            continue
                        else:
                            self.logger.warning(f"HTTP {response.status} for {url}")
                            return None
            except asyncio.TimeoutError:
                self.logger.warning(f"超时: {url}")
                if attempt < self.retry_count - 1:
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
                    continue
                return None
            except Exception as e:
                self.logger.error(f"请求失败 {url}: {e}")
                if attempt < self.retry_count - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                return None
        return None

    async def scrape_batch(self, urls: List[str]) -> List[dict]:
        """Fetch a batch of URLs concurrently, skipping duplicates and
        anything fetched in an earlier batch."""
        # Deduplicate the input batch.
        unique_urls = list(set(urls))
        # Drop URLs already visited.
        not_visited = [url for url in unique_urls if url not in self.visited_urls]
        if not not_visited:
            return []
        # Mark them visited up front so overlapping batches do not repeat work.
        self.visited_urls.update(not_visited)
        # Fan out concurrently.
        tasks = [self.fetch_with_retry(url) for url in not_visited]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Keep only successful fetches that carry a non-empty body.
        valid_results = []
        for result in results:
            if isinstance(result, dict) and result.get('content'):
                valid_results.append(result)
        return valid_results
class RateLimiter:
    """Sliding-window rate limiter for coroutines.

    Allows at most `max_requests` acquisitions per `time_window` seconds;
    excess callers sleep until a slot frees up. An asyncio.Lock serializes
    access so concurrent coroutines cannot over-commit the window.
    """

    def __init__(self, max_requests: int, time_window: float):
        self.max_requests = max_requests
        self.time_window = time_window
        # Timestamps of recent acquisitions, oldest first.
        self.requests: Deque[float] = deque()
        self.lock = asyncio.Lock()

    async def acquire(self):
        """Block until a request slot is available, then claim it."""
        async with self.lock:
            now = time.time()
            # Drop timestamps that have aged out of the window.
            while self.requests and self.requests[0] <= now - self.time_window:
                self.requests.popleft()
            # If the window is full, wait until the oldest entry expires.
            if len(self.requests) >= self.max_requests:
                sleep_time = self.time_window - (now - self.requests[0])
                if sleep_time > 0:
                    await asyncio.sleep(sleep_time)
                # Re-read the clock and purge again after sleeping. The
                # original recorded the pre-sleep timestamp without purging,
                # so stale entries accumulated and bursts could exceed the
                # configured rate.
                now = time.time()
                while self.requests and self.requests[0] <= now - self.time_window:
                    self.requests.popleft()
            # Record this acquisition.
            self.requests.append(now)
# 使用示例
async def optimized_scrape_example():
    """Time a batch scrape of five slow endpoints with the optimized scraper."""
    # Configure logging so scraper warnings/errors are visible.
    logging.basicConfig(level=logging.INFO)
    async with OptimizedWebScraper(
        base_url="https://example.com",
        max_concurrent=5,
        delay=0.1,
        timeout=30,
        retry_count=3
    ) as scraper:
        urls = [
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/1"
        ]
        start_time = time.time()
        results = await scraper.scrape_batch(urls)
        end_time = time.time()
        print(f"批量爬取完成,耗时: {end_time - start_time:.2f}秒")
        print(f"成功获取 {len(results)} 个页面")
#
评论 (0)