Introduction
In modern web application development, handling large numbers of concurrent requests and I/O-bound tasks is a challenge developers face constantly. The traditional synchronous programming model often struggles under high concurrency, while Python's asynchronous programming support offers a more efficient and elegant alternative. This article digs into the core concepts of asynchronous programming in Python, focuses on how to use the asyncio library, and works through practical examples of building a high-performance asynchronous web crawler and API service.
What Is Asynchronous Programming?
Basic Concepts
Asynchronous programming is a paradigm in which a program keeps doing other work while it waits for certain operations to complete, instead of blocking on them. It is particularly well suited to I/O-bound workloads such as network requests and file reads and writes.
In traditional synchronous code, a function that waits for a network response blocks its entire thread until the response arrives. In asynchronous code, the program can move on to other tasks as soon as the request is issued and handle the result when the response comes back.
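As a minimal sketch of the difference: while one coroutine is suspended on await asyncio.sleep, the event loop runs the other, so the total runtime is roughly the longest individual wait rather than the sum (the coroutine names below are illustrative):

import asyncio
import time

async def wait_and_report(name, seconds):
    # await suspends this coroutine and lets the event loop run the other one
    await asyncio.sleep(seconds)
    print(f"{name} finished after {seconds}s")

async def main():
    start = time.perf_counter()
    # Both coroutines make progress concurrently on a single thread
    await asyncio.gather(wait_and_report("task-1", 1), wait_and_report("task-2", 2))
    print(f"Total elapsed: {time.perf_counter() - start:.2f}s")  # roughly 2s, not 3s

asyncio.run(main())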
Advantages of Asynchronous Programming
- High concurrency: a single process can handle a large number of concurrent requests
- Resource efficiency: less overhead from creating threads and switching contexts
- Responsiveness: the application does not block other work while waiting on I/O
- Scalability: large-scale network applications are easier to build
A Closer Look at the asyncio Library
asyncio Fundamentals
asyncio is the core module in the Python standard library for writing asynchronous I/O programs. It is built around an event loop and provides a complete set of building blocks for asynchronous programming.
import asyncio
import time

# A basic asynchronous function (coroutine) definition
async def hello_world():
    print("Hello")
    await asyncio.sleep(1)  # simulate an asynchronous wait
    print("World")

# Run the coroutine
asyncio.run(hello_world())
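A common pitfall worth noting: calling an async def function does not run its body; it merely creates a coroutine object, which only executes once it is awaited or handed to the event loop. A minimal sketch, reusing hello_world from the block above:

coro = hello_world()   # nothing runs yet; this is just a coroutine object
print(type(coro))      # <class 'coroutine'>
asyncio.run(coro)      # the body executes only when the event loop drives it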
The Event Loop
The event loop is the core component of asyncio: it schedules and executes asynchronous tasks. Each thread runs at most one event loop at a time; asyncio.run() creates a fresh loop, runs the given coroutine to completion, and then closes the loop.
import asyncio

async def main():
    # Get the event loop that is currently running (only valid inside a coroutine)
    loop = asyncio.get_running_loop()
    print(f"Running inside event loop: {loop!r}")
    # Create tasks so both fetches run concurrently
    task1 = asyncio.create_task(fetch_data("url1"))
    task2 = asyncio.create_task(fetch_data("url2"))
    # Wait for all tasks to complete
    results = await asyncio.gather(task1, task2)
    return results

async def fetch_data(url):
    print(f"Fetching data from {url}")
    await asyncio.sleep(1)  # simulate a network request
    return f"Data from {url}"

# Run the entry-point coroutine
asyncio.run(main())
Managing Asynchronous Tasks
asyncio offers more than one way to manage asynchronous tasks:
import asyncio

async def task_with_delay(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} completed")
    return f"Result from {name}"

async def manage_tasks():
    # Create several tasks
    tasks = [
        asyncio.create_task(task_with_delay("A", 2)),
        asyncio.create_task(task_with_delay("B", 1)),
        asyncio.create_task(task_with_delay("C", 3))
    ]
    # Wait for all tasks to finish; results come back in the order the tasks were passed
    results = await asyncio.gather(*tasks)
    print("All tasks completed:", results)
    # Alternatively, asyncio.wait returns (done, pending) sets; since gather already
    # awaited these tasks, wait returns immediately with nothing pending
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    print(f"Done: {len(done)}, Pending: {len(pending)}")

# Run the task-management example
asyncio.run(manage_tasks())
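Besides gather and wait, asyncio.as_completed is handy when you want to process results as each task finishes rather than waiting for all of them. A minimal sketch, reusing task_with_delay from the block above:

import asyncio

async def process_as_completed():
    tasks = [
        asyncio.create_task(task_with_delay("A", 2)),
        asyncio.create_task(task_with_delay("B", 1)),
        asyncio.create_task(task_with_delay("C", 3)),
    ]
    # Results arrive in completion order: B, then A, then C
    for finished in asyncio.as_completed(tasks):
        result = await finished
        print("Got:", result)

asyncio.run(process_as_completed())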
Concurrency Control and Resource Management
Limiting Concurrency
When issuing a large number of concurrent requests, it is important to cap how many run at once so that neither the client nor the target servers run out of resources. Python offers several ways to implement this kind of concurrency control.
import asyncio
import aiohttp
from typing import List

class ConcurrencyController:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_limit(self, session: aiohttp.ClientSession, url: str) -> dict:
        # Acquire the semaphore; at most max_concurrent requests run at a time
        async with self.semaphore:
            try:
                async with session.get(url) as response:
                    data = await response.json()
                    return {
                        'url': url,
                        'status': response.status,
                        'data': data
                    }
            except Exception as e:
                return {
                    'url': url,
                    'error': str(e)
                }

async def fetch_multiple_urls(urls: List[str], max_concurrent: int = 5):
    controller = ConcurrencyController(max_concurrent)
    async with aiohttp.ClientSession() as session:
        tasks = [controller.fetch_with_limit(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

# Usage example
async def main():
    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
        'https://jsonplaceholder.typicode.com/posts/3',
        # ... more URLs
    ]
    results = await fetch_multiple_urls(urls, max_concurrent=3)
    for result in results:
        print(result)

# asyncio.run(main())
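A semaphore is not the only way to bound concurrency. Another common pattern is a fixed pool of worker tasks pulling URLs from an asyncio.Queue, which keeps the number of in-flight requests equal to the number of workers. A minimal sketch of that pattern (fetch_one and crawl_with_workers are illustrative names, not part of the code above):

import asyncio
import aiohttp

async def fetch_one(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as response:
        return {'url': url, 'status': response.status}

async def worker(session: aiohttp.ClientSession, queue: asyncio.Queue, results: list):
    while True:
        url = await queue.get()
        try:
            results.append(await fetch_one(session, url))
        except Exception as e:
            results.append({'url': url, 'error': str(e)})
        finally:
            queue.task_done()

async def crawl_with_workers(urls, num_workers: int = 3):
    queue: asyncio.Queue = asyncio.Queue()
    for url in urls:
        queue.put_nowait(url)
    results: list = []
    async with aiohttp.ClientSession() as session:
        workers = [asyncio.create_task(worker(session, queue, results))
                   for _ in range(num_workers)]
        await queue.join()   # block until every queued URL has been processed
        for w in workers:
            w.cancel()       # the workers loop forever; stop them once the queue is drained
        await asyncio.gather(*workers, return_exceptions=True)
    return results

# asyncio.run(crawl_with_workers(['https://httpbin.org/get'] * 5))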
Task Timeouts
Sensible timeouts keep slow requests from hanging around and wasting resources:
import asyncio
import aiohttp

async def fetch_with_timeout(session: aiohttp.ClientSession, url: str, timeout: int = 10) -> dict:
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response:
            data = await response.json()
            return {
                'url': url,
                'status': response.status,
                'data': data
            }
    except asyncio.TimeoutError:
        return {
            'url': url,
            'error': 'Timeout'
        }
    except Exception as e:
        return {
            'url': url,
            'error': str(e)
        }

async def fetch_with_timeout_control():
    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://httpbin.org/delay/2',  # this one will exceed the 1-second timeout
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_timeout(session, url, timeout=1) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

# asyncio.run(fetch_with_timeout_control())
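The example above relies on aiohttp's ClientTimeout. For awaitables that are not HTTP requests, asyncio itself provides asyncio.wait_for (and, on Python 3.11+, the asyncio.timeout context manager) to cancel anything that runs too long. A minimal sketch:

import asyncio

async def slow_operation():
    await asyncio.sleep(5)  # stands in for any long-running awaitable
    return "done"

async def main():
    try:
        # Cancel slow_operation if it has not finished within 2 seconds
        result = await asyncio.wait_for(slow_operation(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out and was cancelled")

asyncio.run(main())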
Optimizing Asynchronous Database Operations
Asynchronous Database Connection Pools
In an asynchronous application, using a database connection pool properly can significantly improve performance:
import asyncio
import asyncpg
from datetime import date
from typing import List, Dict

class AsyncDatabaseManager:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None

    async def create_pool(self, min_size: int = 10, max_size: int = 20):
        """Create the connection pool."""
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=min_size,
            max_size=max_size
        )

    async def fetch_users(self, limit: int = 100) -> List[Dict]:
        """Fetch user rows asynchronously."""
        if not self.pool:
            raise Exception("Database pool not initialized")
        async with self.pool.acquire() as connection:
            rows = await connection.fetch('SELECT * FROM users LIMIT $1', limit)
            return [dict(row) for row in rows]

    async def insert_users(self, users_data: List[Dict]) -> int:
        """Insert user rows in bulk."""
        if not self.pool:
            raise Exception("Database pool not initialized")
        async with self.pool.acquire() as connection:
            # Batch insert for efficiency
            query = """
                INSERT INTO users (name, email, created_at)
                VALUES ($1, $2, $3)
            """
            # Prepare the parameter tuples
            data_tuples = [
                (user['name'], user['email'], user['created_at'])
                for user in users_data
            ]
            # Run the statement once per tuple
            await connection.executemany(query, data_tuples)
            return len(data_tuples)

# Usage example
async def database_example():
    db_manager = AsyncDatabaseManager('postgresql://user:password@localhost/db')
    await db_manager.create_pool()
    # Read data
    users = await db_manager.fetch_users(10)
    print(f"Fetched {len(users)} users")
    # Insert data (asyncpg expects Python date/datetime objects for date columns)
    new_users = [
        {'name': f'User{i}', 'email': f'user{i}@example.com', 'created_at': date(2023, 1, 1)}
        for i in range(5)
    ]
    inserted_count = await db_manager.insert_users(new_users)
    print(f"Inserted {inserted_count} users")

# asyncio.run(database_example())
Asynchronous Transaction Handling
Handling transactions correctly matters just as much in an asynchronous environment:
import asyncio
import asyncpg

class AsyncTransactionManager:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None

    async def create_pool(self):
        self.pool = await asyncpg.create_pool(self.connection_string)

    async def transfer_money(self, from_account: int, to_account: int, amount: float) -> bool:
        """Transfer money between two accounts atomically."""
        async with self.pool.acquire() as connection:
            try:
                # Open a transaction; it commits on success and rolls back on error
                async with connection.transaction():
                    # Check the balance
                    balance = await connection.fetchval(
                        'SELECT balance FROM accounts WHERE id = $1',
                        from_account
                    )
                    if balance < amount:
                        raise ValueError("Insufficient funds")
                    # Debit the source account
                    await connection.execute(
                        'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
                        amount, from_account
                    )
                    # Credit the destination account
                    await connection.execute(
                        'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
                        amount, to_account
                    )
                return True
            except Exception as e:
                print(f"Transaction failed: {e}")
                return False
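The original run comment referred to a transfer_money_example entry point that is not defined above; a minimal sketch of what it might look like, assuming an accounts table with id and balance columns exists (the connection string and account ids are placeholders):

async def transfer_money_example():
    manager = AsyncTransactionManager('postgresql://user:password@localhost/db')
    await manager.create_pool()
    # Move 100.0 from account 1 to account 2; returns False if the transfer rolled back
    ok = await manager.transfer_money(from_account=1, to_account=2, amount=100.0)
    print("Transfer succeeded" if ok else "Transfer rolled back")

# asyncio.run(transfer_money_example())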
Building a High-Performance Web Crawler
A Basic Asynchronous Crawler
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin, urlparse
import logging

class AsyncWebScraper:
    def __init__(self, max_concurrent: int = 10, timeout: int = 30):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
        # Configure logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    async def __aenter__(self):
        """Async context manager entry: open the HTTP session."""
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the HTTP session."""
        if self.session:
            await self.session.close()

    async def fetch_page(self, url: str) -> dict:
        """Fetch a single page."""
        async with self.semaphore:
            try:
                start_time = time.time()
                async with self.session.get(url) as response:
                    content = await response.text()
                    elapsed_time = time.time() - start_time
                    return {
                        'url': url,
                        'status': response.status,
                        'content': content,
                        'elapsed_time': elapsed_time,
                        'timestamp': time.time()
                    }
            except Exception as e:
                self.logger.error(f"Error fetching {url}: {e}")
                return {
                    'url': url,
                    'error': str(e),
                    'timestamp': time.time()
                }

    async def parse_page(self, page_data: dict) -> dict:
        """Parse a fetched page."""
        if 'error' in page_data:
            return page_data
        try:
            soup = BeautifulSoup(page_data['content'], 'html.parser')
            # Extract the title
            title = soup.find('title')
            title_text = title.get_text().strip() if title else ''
            # Extract all links
            links = []
            for link in soup.find_all('a', href=True):
                absolute_url = urljoin(page_data['url'], link['href'])
                links.append({
                    'text': link.get_text().strip(),
                    'url': absolute_url
                })
            return {
                **page_data,
                'title': title_text,
                'links_count': len(links),
                'links': links[:10]  # keep only the first 10 links
            }
        except Exception as e:
            self.logger.error(f"Error parsing {page_data['url']}: {e}")
            return {
                **page_data,
                'error': f'Parse error: {e}'
            }

# Usage example
async def basic_crawler_example():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1',
    ]
    async with AsyncWebScraper(max_concurrent=3) as scraper:
        # Fetch pages concurrently
        fetch_tasks = [scraper.fetch_page(url) for url in urls]
        page_data_list = await asyncio.gather(*fetch_tasks)
        # Parse the fetched pages
        parse_tasks = [scraper.parse_page(data) for data in page_data_list]
        parsed_results = await asyncio.gather(*parse_tasks)
        for result in parsed_results:
            print(f"URL: {result['url']}")
            print(f"Title: {result.get('title', 'N/A')}")
            print(f"Links: {result.get('links_count', 0)}")
            print("-" * 50)

# asyncio.run(basic_crawler_example())
Advanced Crawler Features
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin, urlparse
from dataclasses import dataclass
from typing import List, Set, Optional
import json

@dataclass
class CrawlResult:
    url: str
    title: str
    content: str
    links: List[str]
    status_code: int
    crawl_time: float
    error: Optional[str] = None

class AdvancedAsyncScraper:
    def __init__(self,
                 max_concurrent: int = 10,
                 timeout: int = 30,
                 delay: float = 0.1,
                 respect_robots: bool = True):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.delay = delay
        # Placeholder flag: robots.txt handling is not implemented in this example
        self.respect_robots = respect_robots
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
        self.visited_urls: Set[str] = set()
        self.crawled_count = 0

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            headers={
                'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                               'AppleWebKit/537.36 (KHTML, like Gecko) '
                               'Chrome/91.0.4472.124 Safari/537.36')
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch_page(self, url: str) -> CrawlResult:
        """Fetch and parse a single page."""
        async with self.semaphore:
            try:
                # Skip URLs that have already been crawled
                if url in self.visited_urls:
                    return CrawlResult(
                        url=url,
                        title='',
                        content='',
                        links=[],
                        status_code=200,
                        crawl_time=0,
                        error='Already visited'
                    )
                self.visited_urls.add(url)
                start_time = time.time()
                await asyncio.sleep(self.delay)  # throttle so requests are not too frequent
                async with self.session.get(url) as response:
                    content = await response.text()
                    elapsed_time = time.time() - start_time
                    # Parse the HTML
                    soup = BeautifulSoup(content, 'html.parser')
                    # Extract the title
                    title = soup.find('title')
                    title_text = title.get_text().strip() if title else ''
                    # Extract the text content
                    text_content = ' '.join([
                        p.get_text().strip()
                        for p in soup.find_all('p')
                    ])
                    # Extract links
                    links = []
                    for link in soup.find_all('a', href=True):
                        absolute_url = urljoin(url, link['href'])
                        links.append(absolute_url)
                    self.crawled_count += 1
                    return CrawlResult(
                        url=url,
                        title=title_text,
                        content=text_content[:500],  # cap the stored content length
                        links=links,
                        status_code=response.status,
                        crawl_time=elapsed_time
                    )
            except Exception as e:
                return CrawlResult(
                    url=url,
                    title='',
                    content='',
                    links=[],
                    status_code=0,
                    crawl_time=0,
                    error=str(e)
                )

    async def crawl_urls(self, urls: List[str], max_depth: int = 1) -> List[CrawlResult]:
        """Crawl a list of URLs, optionally following discovered links one level deeper."""
        results = []
        # First pass
        tasks = [self.fetch_page(url) for url in urls]
        batch_results = await asyncio.gather(*tasks)
        results.extend(batch_results)
        # For deeper crawls, extract the discovered links and crawl them as well
        if max_depth > 1:
            all_links = set()
            for result in batch_results:
                if not result.error and result.links:
                    all_links.update(result.links)
            # Filter out links that were already visited
            new_urls = [url for url in all_links if url not in self.visited_urls]
            if new_urls:
                print(f"Found {len(new_urls)} new URLs to crawl")
                depth_tasks = [self.fetch_page(url) for url in new_urls[:10]]  # cap the fan-out
                depth_results = await asyncio.gather(*depth_tasks)
                results.extend(depth_results)
        return results

    def save_results(self, results: List[CrawlResult], filename: str):
        """Save the crawl results to a JSON file."""
        data = []
        for result in results:
            data.append({
                'url': result.url,
                'title': result.title,
                'content': result.content,
                'links_count': len(result.links),
                'status_code': result.status_code,
                'crawl_time': result.crawl_time,
                'error': result.error
            })
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        print(f"Saved {len(results)} results to {filename}")

# Usage example
async def advanced_crawler_example():
    urls = [
        'https://httpbin.org/html',
        'https://httpbin.org/links/10/0',
    ]
    async with AdvancedAsyncScraper(max_concurrent=5, delay=0.5) as scraper:
        results = await scraper.crawl_urls(urls, max_depth=1)
        # Display the results
        for result in results:
            if not result.error:
                print(f"✓ {result.url}")
                print(f"  Title: {result.title[:50]}...")
                print(f"  Links: {len(result.links)}")
                print(f"  Time: {result.crawl_time:.2f}s")
            else:
                print(f"✗ {result.url} - Error: {result.error}")
            print()
        # Save the results
        scraper.save_results(results, 'crawl_results.json')

# asyncio.run(advanced_crawler_example())
Performance Optimization Techniques
Connection Pool Tuning
import asyncio
import time
import aiohttp
from typing import Optional

class OptimizedAsyncClient:
    def __init__(self,
                 max_concurrent: int = 100,
                 timeout: int = 30,
                 connection_timeout: int = 5):
        self.session: Optional[aiohttp.ClientSession] = None
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.connection_timeout = connection_timeout

    async def __aenter__(self):
        """Create a tuned session."""
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent,   # total connection pool size
            limit_per_host=30,           # maximum connections per host
            ttl_dns_cache=300,           # DNS cache TTL in seconds
            use_dns_cache=True,          # enable DNS caching
            force_close=False,           # keep connections alive so they can be reused
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(
                total=self.timeout,
                connect=self.connection_timeout
            ),
            headers={
                'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                               'AppleWebKit/537.36 (KHTML, like Gecko) '
                               'Chrome/91.0.4472.124 Safari/537.36')
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Release resources."""
        if self.session:
            await self.session.close()

    async def fetch(self, url: str) -> dict:
        """A single request through the tuned session."""
        try:
            async with self.session.get(url) as response:
                content = await response.text()
                return {
                    'url': url,
                    'status': response.status,
                    'content': content,
                    'headers': dict(response.headers)
                }
        except Exception as e:
            return {
                'url': url,
                'error': str(e),
                'status': 0
            }

# Performance test example
async def performance_test():
    urls = [
        f'https://httpbin.org/delay/{i % 3 + 1}'
        for i in range(50)
    ]
    start_time = time.time()
    async with OptimizedAsyncClient(max_concurrent=20) as client:
        tasks = [client.fetch(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    end_time = time.time()
    print(f"Processed {len(results)} requests in {end_time - start_time:.2f} seconds")
Implementing a Cache
import asyncio
import aiohttp
import hashlib
import time
from typing import Dict, Optional, Any

class CachedAsyncClient:
    def __init__(self,
                 cache_ttl: int = 300,  # cache entries live for 5 minutes
                 max_concurrent: int = 10):
        self.cache_ttl = cache_ttl
        self.cache: Dict[str, tuple] = {}
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    def _get_cache_key(self, url: str, params: Optional[Dict] = None) -> str:
        """Build a cache key from the URL and query parameters."""
        key_string = f"{url}:{params}" if params else url
        return hashlib.md5(key_string.encode()).hexdigest()

    def _is_expired(self, timestamp: float) -> bool:
        """Check whether a cache entry has expired."""
        return time.time() - timestamp > self.cache_ttl

    async def fetch(self, url: str, params: Optional[Dict] = None) -> dict:
        """An asynchronous request with response caching."""
        cache_key = self._get_cache_key(url, params)
        # Check the cache first
        if cache_key in self.cache:
            cached_data, timestamp = self.cache[cache_key]
            if not self._is_expired(timestamp):
                print(f"Cache hit for {url}")
                return {**cached_data, 'cached': True}
        # Cache miss: issue the request
        async with self.semaphore:
            try:
                print(f"Fetching {url}")
                async with self.session.get(url, params=params) as response:
                    content = await response.text()
                    result = {
                        'url': url,
                        'status': response.status,
                        'content': content,
                        'cached': False
                    }
                    # Store the result in the cache
                    self.cache[cache_key] = (result, time.time())
                    return result
            except Exception as e:
                error_result = {
                    'url': url,
                    'error': str(e),
                    'cached': False
                }
                return error_result

    def clear_cache(self):
        """Drop all cached entries."""
        self.cache.clear()

# Usage example
async def cached_crawler_example():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/1',  # same URL, should be served from the cache
    ]
    async with CachedAsyncClient(cache_ttl=60) as client:
        results = []
        for url in urls:
            result = await client.fetch(url)
            results.append(result)
        print("Results:", len(results))
        for result in results:
            print(f"URL: {result['url']}, Status: {result.get('status', 'Error')}")

# asyncio.run(cached_crawler_example())
Error Handling and Monitoring
A Robust Error-Handling Mechanism
import asyncio
import aiohttp
import logging
from typing import List, Dict, Any
import time

class RobustAsyncClient:
    def __init__(self,
                 max_retries: int = 3,
                 backoff_factor: float = 1.0,
                 timeout: int = 30):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.session = None
        self.logger = logging.getLogger(__name__)

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch_with_retry(self,
                               url: str,
                               method: str = 'GET',
                               **kwargs) -> Dict[str, Any]:
        """An asynchronous request with retries and exponential backoff."""
        last_exception = None
        for attempt in range(1, self.max_retries + 1):
            try:
                async with self.session.request(method, url, **kwargs) as response:
                    content = await response.text()
                    return {
                        'url': url,
                        'status': response.status,
                        'content': content,
                        'attempts': attempt
                    }
            except Exception as e:
                last_exception = e
                # Exponential backoff: wait longer after each failed attempt
                wait_time = self.backoff_factor * (2 ** (attempt - 1))
                self.logger.warning(f"Attempt {attempt} for {url} failed: {e}")
                if attempt < self.max_retries:
                    await asyncio.sleep(wait_time)
        # Every attempt failed; report the last error
        return {'url': url, 'error': str(last_exception), 'attempts': self.max_retries}
