引言
在现代Web应用开发中,处理高并发请求已成为一项核心技能。Python作为一门广泛使用的编程语言,在异步编程领域有着强大的支持,特别是通过asyncio库提供的异步I/O框架。本文将深入探讨Python异步编程的核心技术,包括asyncio事件循环、并发任务管理、异步数据库操作等,并结合实际案例展示如何构建高效稳定的异步应用架构。
什么是异步编程
异步编程是一种编程范式,允许程序在等待某些操作完成时执行其他任务,而不是阻塞等待。传统的同步编程模型中,当一个函数需要等待网络请求、文件读写或数据库查询等I/O操作完成时,整个线程会被阻塞,直到操作结束。而在异步编程中,程序可以在发起I/O请求后立即返回控制权给事件循环,继续执行其他任务。
异步编程的优势
- 高并发处理能力:单个线程可以同时处理数千个并发连接
- 资源效率:避免了创建大量线程带来的内存开销和上下文切换成本
- 响应性提升:应用能够快速响应用户请求,提供更好的用户体验
- 性能优化:充分利用CPU和I/O资源,减少等待时间
asyncio基础概念与事件循环
什么是asyncio
asyncio是Python标准库中用于编写异步I/O应用程序的框架。它提供了事件循环、协程、任务(Task)以及Future对象等核心组件,使得编写高效的异步代码成为可能。
import asyncio
async def hello_world():
    """Print "Hello", suspend for one second, then print "World"."""
    print("Hello")
    await asyncio.sleep(1)
    print("World")


# Run only when executed as a script, so importing this module
# does not start an event loop as a side effect.
if __name__ == "__main__":
    asyncio.run(hello_world())
事件循环机制
事件循环是asyncio的核心,它负责调度和执行协程。在Python中,每个线程同一时刻至多运行一个事件循环(asyncio.run()会在当前线程创建并管理一个事件循环),多个任务(Task)可以在同一个事件循环中并发执行。
import asyncio
import time
async def fetch_data(url):
    """Simulate fetching *url* (~1s latency); return a message string."""
    print(f"开始获取 {url}")
    await asyncio.sleep(1)  # stand-in for a real network request
    print(f"完成获取 {url}")
    return f"数据来自 {url}"


async def main():
    """Run three simulated fetches concurrently and report total time."""
    start_time = time.time()
    # Bare coroutines are fine here: asyncio.gather wraps them in Tasks.
    tasks = [
        fetch_data("https://api1.com"),
        fetch_data("https://api2.com"),
        fetch_data("https://api3.com"),
    ]
    results = await asyncio.gather(*tasks)
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f}秒")
    print(results)


# Run only as a script; importing must not trigger the event loop.
if __name__ == "__main__":
    asyncio.run(main())
协程与异步函数
协程的基本概念
协程是异步编程的核心概念,它是一种可以暂停执行并在稍后恢复的函数。在Python中,使用async def定义协程函数,使用await关键字来暂停和恢复协程的执行。
import asyncio
async def simple_coroutine():
    """Demonstrate suspend/resume: sleep once, then return a marker string."""
    print("协程开始")
    await asyncio.sleep(1)
    print("协程结束")
    return "完成"


async def main():
    # Awaiting the coroutine runs it to completion and yields its return value.
    result = await simple_coroutine()
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
协程的执行控制
import asyncio
import time
async def task_with_delay(name, delay):
    """Sleep *delay* seconds, then return a result string for *name*."""
    print(f"任务 {name} 开始")
    await asyncio.sleep(delay)
    print(f"任务 {name} 完成")
    return f"结果来自 {name}"


async def main():
    # Bug fix: asyncio.wait() no longer accepts bare coroutines
    # (deprecated in 3.8, removed in 3.11) — wrap each one in a Task.
    tasks = [
        asyncio.create_task(task_with_delay("A", 1)),
        asyncio.create_task(task_with_delay("B", 2)),
        asyncio.create_task(task_with_delay("C", 0.5)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    for task in done:
        print(f"结果: {task.result()}")


if __name__ == "__main__":
    asyncio.run(main())
并发任务管理
任务创建与管理
在asyncio中,任务(Task)是协程的包装器,它允许我们更好地控制和管理异步操作。
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
    """Fetch *url* using the shared aiohttp *session*; return the body text."""
    async with session.get(url) as response:
        return await response.text()


async def main():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1',
    ]
    # One ClientSession shared by every request (connection pooling).
    async with aiohttp.ClientSession() as session:
        # create_task() schedules each fetch immediately.
        tasks = [asyncio.create_task(fetch_url(session, url)) for url in urls]
        results = await asyncio.gather(*tasks)
        print(f"获取到 {len(results)} 个响应")
        return results


# Example run (requires network access):
# asyncio.run(main())
任务取消与超时处理
import asyncio
async def long_running_task():
    """Sleep 10s; on cancellation, log and re-raise CancelledError."""
    try:
        print("任务开始")
        await asyncio.sleep(10)
        print("任务完成")
        return "成功"
    except asyncio.CancelledError:
        print("任务被取消")
        raise  # always re-raise so cancellation propagates correctly


async def main():
    task = asyncio.create_task(long_running_task())
    try:
        # Give the task at most 3 seconds to finish.
        result = await asyncio.wait_for(task, timeout=3.0)
        print(f"结果: {result}")
    except asyncio.TimeoutError:
        print("任务超时,正在取消...")
        # NOTE: wait_for already cancels the task on timeout in modern
        # asyncio; the explicit cancel() below is harmless belt-and-braces.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            print("任务已成功取消")


# asyncio.run(main())
异步数据库操作
使用异步数据库驱动
异步数据库操作是提高应用性能的重要手段。以下示例展示如何使用aiomysql和asyncpg进行异步数据库操作。
import asyncio
import aiomysql
import asyncpg
# Async MySQL example (aiomysql)
async def mysql_example():
    """Create a pool, ensure a table exists, insert one row, read it back."""
    try:
        # A pool reuses connections across queries instead of reconnecting.
        pool = await aiomysql.create_pool(
            host='localhost',
            port=3306,
            user='root',
            password='password',
            db='testdb',
            autocommit=True,
        )
        async with pool.acquire() as conn:
            cursor = await conn.cursor()
            await cursor.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(100),
                    email VARCHAR(100)
                )
            """)
            # Parameterized query — never interpolate values into SQL.
            await cursor.execute(
                "INSERT INTO users (name, email) VALUES (%s, %s)",
                ("Alice", "alice@example.com"),
            )
            await cursor.execute("SELECT * FROM users")
            result = await cursor.fetchall()
            print("查询结果:", result)
        pool.close()
        await pool.wait_closed()
    except Exception as e:
        print(f"数据库错误: {e}")
# Async PostgreSQL example (asyncpg)
async def postgresql_example():
    """Connect with asyncpg, ensure a table exists, insert a row, query it."""
    try:
        conn = await asyncpg.connect(
            host='localhost',
            port=5432,
            user='postgres',
            password='password',
            database='testdb',
        )
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS products (
                id SERIAL PRIMARY KEY,
                name VARCHAR(100),
                price DECIMAL(10, 2)
            )
        """)
        # asyncpg uses $1/$2 placeholders with positional arguments.
        await conn.execute(
            "INSERT INTO products (name, price) VALUES ($1, $2)",
            "笔记本电脑", 5999.99,
        )
        rows = await conn.fetch("SELECT * FROM products")
        for row in rows:
            print(f"产品: {row['name']}, 价格: {row['price']}")
        await conn.close()
    except Exception as e:
        print(f"数据库错误: {e}")


# asyncio.run(postgresql_example())
高性能网络爬虫构建
基础异步爬虫实现
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin, urlparse
class AsyncWebCrawler:
    """Small concurrent crawler with semaphore-bounded concurrency."""

    def __init__(self, max_concurrent=10, timeout=30):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        # Caps the number of in-flight requests.
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_page(self, session, url):
        """Fetch one page; return a result dict, or None on any failure."""
        async with self.semaphore:  # limit concurrency
            try:
                async with session.get(url, timeout=self.timeout) as response:
                    if response.status == 200:
                        content = await response.text()
                        return {
                            'url': url,
                            'status': response.status,
                            'content': content,
                            'timestamp': time.time(),
                        }
                    print(f"HTTP {response.status} for {url}")
                    return None
            except Exception as e:
                print(f"请求失败 {url}: {e}")
                return None

    async def parse_links(self, content, base_url):
        """Extract absolute, well-formed links from an HTML document."""
        soup = BeautifulSoup(content, 'html.parser')
        links = []
        for link in soup.find_all('a', href=True):
            absolute_url = urljoin(base_url, link['href'])
            if self.is_valid_url(absolute_url):
                links.append(absolute_url)
        return links

    def is_valid_url(self, url):
        """Return True when *url* has both a scheme and a network location."""
        try:
            parsed = urlparse(url)
            return bool(parsed.netloc) and bool(parsed.scheme)
        except Exception:
            return False

    async def crawl_urls(self, urls):
        """Fetch all *urls* concurrently; keep only successful result dicts."""
        async with aiohttp.ClientSession(timeout=self.timeout) as session:
            tasks = [self.fetch_page(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # Bug fix: with return_exceptions=True, Exception objects could
            # slip through a plain `is not None` filter — drop them too.
            return [
                r for r in results
                if r is not None and not isinstance(r, Exception)
            ]
# Usage example
async def main():
    """Crawl a few test URLs and report how long the batch took."""
    crawler = AsyncWebCrawler(max_concurrent=5)
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/3',
    ]
    start_time = time.time()
    results = await crawler.crawl_urls(urls)
    end_time = time.time()
    print(f"爬取完成,耗时: {end_time - start_time:.2f}秒")
    print(f"成功获取 {len(results)} 个页面")


# asyncio.run(main())
高级爬虫功能实现
import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
import json
from collections import defaultdict
from dataclasses import dataclass
from typing import List, Dict, Optional
@dataclass
class CrawlResult:
    """Outcome of crawling one page: metadata plus its first links."""

    url: str
    status_code: int  # 0 when the request itself failed
    title: str
    content_length: int
    crawl_time: float  # seconds spent fetching the page
    links: List[str]
    error: Optional[str] = None  # exception text when the fetch failed
class AdvancedCrawler:
    """Crawler with rate limiting, depth-limited crawling and JSON export.

    Concurrency is bounded by a semaphore, and request *starts* are spaced
    so that at most ``rate_limit`` requests begin per second.
    """

    def __init__(self, max_concurrent=10, timeout=30, rate_limit=1):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.rate_limit = rate_limit  # max requests started per second
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None  # created lazily on first fetch
        self.request_count = 0
        self.last_request_time = 0
        # Bug fix: serializes rate-limit bookkeeping and lazy session
        # creation — without it, concurrent tasks race on
        # last_request_time (breaking the rate limit) and can create
        # (and leak) duplicate ClientSession objects.
        self._lock = asyncio.Lock()

    async def _rate_limited_request(self):
        """Sleep just long enough to keep request starts under the limit."""
        async with self._lock:
            min_interval = 1.0 / self.rate_limit
            elapsed = time.time() - self.last_request_time
            if elapsed < min_interval:
                await asyncio.sleep(min_interval - elapsed)
            self.last_request_time = time.time()

    async def _get_session(self):
        """Lazily create the shared ClientSession (race-safe)."""
        if self.session is None:
            async with self._lock:
                if self.session is None:
                    self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self.session

    async def fetch_page(self, url: str) -> CrawlResult:
        """Fetch *url*, parse its title and links, return a CrawlResult."""
        await self._rate_limited_request()
        async with self.semaphore:
            try:
                session = await self._get_session()
                start_time = time.time()
                async with session.get(url) as response:
                    content = await response.text()
                    crawl_time = time.time() - start_time
                    soup = BeautifulSoup(content, 'html.parser')
                    # Bug fix: soup.title.string is None for an empty
                    # <title>, which would violate the `title: str` field.
                    if soup.title and soup.title.string:
                        title = soup.title.string
                    else:
                        title = "无标题"
                    links = [a['href'] for a in soup.find_all('a', href=True)]
                    return CrawlResult(
                        url=url,
                        status_code=response.status,
                        title=title,
                        content_length=len(content),
                        crawl_time=crawl_time,
                        links=links[:10],  # keep only the first 10 links
                    )
            except Exception as e:
                return CrawlResult(
                    url=url,
                    status_code=0,
                    title="",
                    content_length=0,
                    crawl_time=0,
                    links=[],
                    error=str(e),
                )

    async def crawl_urls(self, urls: List[str]) -> List[CrawlResult]:
        """Crawl all *urls* concurrently, dropping raised exceptions."""
        tasks = [self.fetch_page(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if not isinstance(r, Exception)]

    async def crawl_with_depth(self, start_urls: List[str], max_depth: int = 2) -> Dict:
        """Breadth-first crawl: follow up to 5 links per page, *max_depth* levels."""
        all_results = []
        current_urls = set(start_urls)
        for depth in range(max_depth):
            if not current_urls:
                break
            print(f"开始第 {depth + 1} 层爬取,共 {len(current_urls)} 个URL")
            results = await self.crawl_urls(list(current_urls))
            all_results.extend(results)
            # Collect the next frontier from the pages just crawled.
            next_urls = set()
            for result in results:
                for link in result.links[:5]:  # at most 5 outgoing links/page
                    next_urls.add(link)
            current_urls = next_urls
        return {
            'results': all_results,
            'total_pages': len(all_results),
            'crawl_time': time.time(),
        }

    async def save_results(self, results: List[CrawlResult], filename: str):
        """Write the crawl results to *filename* as pretty-printed JSON."""
        data = [
            {
                'url': r.url,
                'status_code': r.status_code,
                'title': r.title,
                'content_length': r.content_length,
                'crawl_time': r.crawl_time,
                'error': r.error,
            }
            for r in results
        ]
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    async def close(self):
        """Close the shared HTTP session, if one was created."""
        if self.session:
            await self.session.close()
# Usage example
async def advanced_crawler_example():
    """Crawl a few test URLs, print a per-page summary, save results to JSON."""
    crawler = AdvancedCrawler(max_concurrent=5, rate_limit=2)
    test_urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1',
    ]
    try:
        print("开始高级爬取...")
        results = await crawler.crawl_urls(test_urls)
        for result in results:
            if not result.error:
                print(f"URL: {result.url}")
                print(f"标题: {result.title}")
                print(f"状态码: {result.status_code}")
                print(f"内容长度: {result.content_length}")
                print(f"爬取时间: {result.crawl_time:.2f}秒")
                print("-" * 50)
            else:
                print(f"错误URL: {result.url}, 错误: {result.error}")
        await crawler.save_results(results, 'crawl_results.json')
        print("结果已保存到 crawl_results.json")
    finally:
        # Always release the HTTP session, even on failure.
        await crawler.close()


# asyncio.run(advanced_crawler_example())
性能优化与最佳实践
并发控制与资源管理
import asyncio
import aiohttp
from typing import Optional
import time
class OptimizedCrawler:
    """Async-context-managed crawler with a tuned connection pool and retries."""

    def __init__(self, max_concurrent: int = 10, timeout: int = 30):
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        """Create the shared session when entering the async context."""
        if not self.session:
            self.session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=self.timeout),
                connector=aiohttp.TCPConnector(
                    limit=100,          # total pooled connections
                    limit_per_host=30,  # per-host connection cap
                    ttl_dns_cache=300,  # DNS cache TTL in seconds
                    use_dns_cache=True,
                ),
            )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the shared session when leaving the async context."""
        if self.session:
            await self.session.close()

    async def fetch_with_retry(self, url: str, max_retries: int = 3) -> Optional[dict]:
        """GET *url*, retrying up to *max_retries* times.

        5xx responses and raised exceptions are retried with exponential
        backoff; other non-200 statuses fail immediately. Returns None
        when every attempt fails.
        """
        for attempt in range(max_retries):
            try:
                async with self.semaphore:
                    async with self.session.get(url) as response:
                        if response.status == 200:
                            content = await response.text()
                            return {
                                'url': url,
                                'status': response.status,
                                'content': content,
                                'attempt': attempt + 1,
                            }
                        print(f"HTTP {response.status} for {url}")
                        if response.status < 500:
                            return None  # client error: retrying won't help
            except Exception as e:
                print(f"请求失败 {url} (尝试 {attempt + 1}): {e}")
            # Bug fix: the original retried 5xx responses immediately with
            # no delay (only the exception path slept). Back off before
            # every retry.
            if attempt < max_retries - 1:
                await asyncio.sleep(2 ** attempt)
        return None

    async def fetch_batch(self, urls: list) -> list:
        """Fetch all *urls* concurrently; keep only successful dict results."""
        tasks = [self.fetch_with_retry(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if r is not None and not isinstance(r, Exception)]
# Usage example
async def optimized_crawler_example():
    """Exercise OptimizedCrawler against httpbin, including a 500 response."""
    async with OptimizedCrawler(max_concurrent=5, timeout=10) as crawler:
        urls = [
            'https://httpbin.org/delay/1',
            'https://httpbin.org/status/500',  # simulated server error
            'https://httpbin.org/delay/2',
        ]
        start_time = time.time()
        results = await crawler.fetch_batch(urls)
        end_time = time.time()
        print(f"批量获取完成,耗时: {end_time - start_time:.2f}秒")
        print(f"成功获取 {len(results)} 个页面")


# asyncio.run(optimized_crawler_example())
监控与错误处理
import asyncio
import aiohttp
import time
from collections import defaultdict
from typing import Dict, List
import logging
# Configure module-level logging: INFO level with the default handler/format.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MonitoredCrawler:
    """Crawler that records per-request statistics and error details."""

    def __init__(self):
        self.stats = defaultdict(int)  # request/latency counters
        self.errors = []               # one dict per failed request
        self.session = None

    async def fetch_with_monitoring(self, session: aiohttp.ClientSession, url: str) -> Dict:
        """GET *url*, recording status and latency in stats/errors."""
        start_time = time.time()
        try:
            async with session.get(url) as response:
                content_length = len(await response.text())
                response_time = time.time() - start_time
                self.stats['total_requests'] += 1
                self.stats['total_response_time'] += response_time
                if response.status == 200:
                    self.stats['successful_requests'] += 1
                else:
                    self.stats['failed_requests'] += 1
                    self.errors.append({
                        'url': url,
                        'status': response.status,
                        'error_time': time.time(),
                    })
                logger.info(f"URL: {url}, Status: {response.status}, Time: {response_time:.2f}s")
                return {
                    'url': url,
                    'status': response.status,
                    'content_length': content_length,
                    'response_time': response_time,
                    'timestamp': time.time(),
                }
        except Exception as e:
            error_time = time.time()
            self.stats['failed_requests'] += 1
            self.errors.append({
                'url': url,
                'error': str(e),
                'error_time': error_time,
            })
            logger.error(f"请求失败 {url}: {e}")
            return {
                'url': url,
                'status': -1,  # sentinel for "request never completed"
                'error': str(e),
                'timestamp': error_time,
            }

    async def crawl_with_monitoring(self, urls: List[str]) -> Dict:
        """Crawl *urls* concurrently; return results plus aggregate stats."""
        if not self.session:
            self.session = aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=30)
            )
        start_time = time.time()
        try:
            tasks = [self.fetch_with_monitoring(self.session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            valid_results = [r for r in results if not isinstance(r, Exception)]
            avg_response_time = (
                self.stats['total_response_time'] / self.stats['successful_requests']
                if self.stats['successful_requests'] > 0 else 0
            )
            return {
                'results': valid_results,
                'stats': dict(self.stats),
                'avg_response_time': avg_response_time,
                'errors': self.errors,
                # Bug fix: the original stored time.time() here, so
                # print_report's elapsed-time figure was always ~0.
                # Store the actual elapsed seconds instead.
                'total_time': time.time() - start_time,
            }
        finally:
            if self.session:
                await self.session.close()
                self.session = None  # allow the crawler to be reused

    def print_report(self, report: Dict):
        """Pretty-print the report returned by crawl_with_monitoring."""
        stats = report['stats']
        print("\n=== 爬取报告 ===")
        print(f"总请求数: {stats['total_requests']}")
        print(f"成功请求数: {stats['successful_requests']}")
        print(f"失败请求数: {stats['failed_requests']}")
        print(f"平均响应时间: {report['avg_response_time']:.2f}秒")
        # Bug fix: 'total_time' now holds elapsed seconds — print directly.
        print(f"总耗时: {report['total_time']:.2f}秒")
        if self.errors:
            print(f"\n错误详情 ({len(self.errors)}个):")
            for error in self.errors[:5]:  # show at most 5 errors
                print(f" URL: {error.get('url', 'Unknown')}")
                print(f" 错误: {error.get('error', 'Unknown')}")
                print()
# Usage example
async def monitored_crawler_example():
    """Crawl a mixed list of URLs and print the monitoring report."""
    crawler = MonitoredCrawler()
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/200',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/status/500',
    ]
    try:
        report = await crawler.crawl_with_monitoring(urls)
        crawler.print_report(report)
        print(f"成功获取 {len(report['results'])} 个页面")
    except Exception as e:
        logger.error(f"爬取过程中发生错误: {e}")


# asyncio.run(monitored_crawler_example())
高级异步模式与设计模式
异步工厂模式
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
class AsyncService(ABC):
    """Abstract base for async services: initialize → execute → cleanup."""

    @abstractmethod
    async def initialize(self):
        """Acquire whatever resources the service needs before first use."""

    @abstractmethod
    async def execute(self, data: Any) -> Any:
        """Perform one unit of work with *data* and return its result."""

    @abstractmethod
    async def cleanup(self):
        """Release the resources acquired by initialize()."""


class AsyncDatabaseService(AsyncService):
    """Simulated asynchronous database service."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection = None  # placeholder; no real connection is made

    async def initialize(self):
        # Simulate establishing a connection asynchronously.
        await asyncio.sleep(0.1)
        print(f"数据库服务已初始化: {self.connection_string}")

    async def execute(self, data: Dict) -> Dict:
        # Bug fix: the original snippet called time.time() but never
        # imported `time`, raising NameError at runtime.
        import time
        # Simulate an asynchronous database operation.
        await asyncio.sleep(0.2)
        return {
            'result': f"数据库操作完成,数据: {data}",
            'timestamp': time.time(),
        }

    async def cleanup(self):
        print("数据库服务已清理")


class AsyncCacheService(AsyncService):
    """Simulated asynchronous in-memory cache service."""

    def __init__(self, cache_size: int = 100):
        self.cache_size = cache_size
        self.cache = {}

    async def initialize(self):
        # Simulate asynchronous initialization.
        await asyncio.sleep(0.1)
        print(f"缓存服务已初始化,大小: {self.cache_size}")

    async def execute(self, data: Dict) -> Dict:
        # Bug fix: `time` was never imported in the original snippet.
        import time
        # Simulate an asynchronous cache write.
        await asyncio.sleep(0.1)
        key = str(data.get('key', 'default'))
        self.cache[key] = data.get('value', '')
        return {
            'result': f"缓存操作完成,键: {key}",
            'timestamp': time.time(),
        }

    async def cleanup(self):
        print("缓存服务已清理")
class AsyncServiceFactory:
    """Factory that builds and initializes AsyncService instances."""

    @staticmethod
    async def create_service(service_type: str, **kwargs) -> AsyncService:
        """Create and initialize a service.

        Supported types: 'database' (kwargs: connection_string) and
        'cache' (kwargs: cache_size). Raises ValueError otherwise.
        """
        if service_type == 'database':
            service = AsyncDatabaseService(kwargs.get('connection_string', ''))
        elif service_type == 'cache':
            service = AsyncCacheService(kwargs.get('cache_size', 100))
        else:
            # Bug fix: the original ended with a bare `raise`, which has no
            # active exception here and so raises RuntimeError("No active
            # exception to re-raise") instead of a meaningful error.
            raise ValueError(f"未知服务类型: {service_type}")
        await service.initialize()
        return service
评论 (0)