引言
在现代Python开发中,异步编程已经成为处理IO密集型任务的核心技术。无论是网络爬虫、API服务还是实时数据处理,异步编程都能显著提升应用性能和资源利用率。本文将深入探讨Python异步编程的精髓,从基础概念到高级实践,系统性地介绍如何利用asyncio构建高性能的网络爬虫和API服务。
什么是异步编程
异步编程的核心概念
异步编程是一种编程范式,允许程序在等待某些操作完成时执行其他任务,而不是阻塞等待。在传统的同步编程中,当一个函数需要等待IO操作完成时(如网络请求、文件读写),整个线程都会被阻塞,直到操作完成。
相比之下,异步编程通过协程(coroutines)和事件循环(event loop)机制,使得程序可以在等待IO操作的同时执行其他任务。这种非阻塞的特性极大地提高了程序的并发处理能力。
异步编程的优势
- 高并发性:能够同时处理大量并发连接
- 资源效率:减少线程切换开销,降低内存占用
- 响应性:应用程序不会因为单个慢操作而阻塞整个系统
- 可扩展性:更容易构建大规模的IO密集型应用
asyncio模块详解
asyncio基础概念
asyncio是Python标准库中用于编写异步代码的核心模块。它提供了事件循环、协程、任务和未来对象等核心组件。
import asyncio
import time

# Define a basic coroutine with async def / await.
async def hello_world():
    print("Hello")
    await asyncio.sleep(1)  # simulate an asynchronous operation
    print("World")

# Drive the coroutine to completion on a fresh event loop.
asyncio.run(hello_world())
事件循环(Event Loop)
事件循环是异步编程的核心,它负责调度和执行协程。Python的asyncio模块自动管理事件循环,但在某些情况下,我们可能需要手动控制。
import asyncio

# Create and install a dedicated event loop explicitly.
# NOTE: asyncio.get_event_loop() is deprecated outside a running loop since
# Python 3.10; the original snippet also created `new_loop`, installed it,
# but then ran on the stale `loop` and never closed either one.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

async def main():
    """Sleep briefly, then confirm the loop executed us."""
    await asyncio.sleep(1)
    print("Hello from event loop")

# Run the coroutine on the loop we created, always releasing resources.
try:
    loop.run_until_complete(main())
finally:
    loop.close()
    asyncio.set_event_loop(None)
协程(Coroutines)
协程是异步编程的基础构建块。它们是特殊的函数,可以暂停执行并在稍后恢复。
import asyncio

async def fetch_data(url):
    """Simulate downloading *url* and return a placeholder payload."""
    print(f"Starting to fetch data from {url}")
    await asyncio.sleep(2)  # simulate network latency
    print(f"Finished fetching data from {url}")
    return f"Data from {url}"

async def main():
    # Build the coroutine objects first, then run them all concurrently.
    tasks = [
        fetch_data("http://api1.com"),
        fetch_data("http://api2.com"),
        fetch_data("http://api3.com")
    ]
    results = await asyncio.gather(*tasks)
    print(results)

# Entry point: run the demo.
asyncio.run(main())
异步编程基础实践
定义和运行异步函数
import asyncio

async def simple_async_function():
    """Print progress markers around a simulated async operation."""
    print("Starting async function")
    await asyncio.sleep(1)
    print("Async function completed")
    return "Result"

async def main():
    # Calling style 1: await the coroutine directly.
    result = await simple_async_function()
    # Calling style 2: wrap it in a Task, then await the task.
    task = asyncio.create_task(simple_async_function())
    result = await task
    print(result)

# Run the demo.
asyncio.run(main())
任务和未来对象
在异步编程中,Task是Future的子类,用于管理协程的执行。
import asyncio

async def long_running_task(name, delay):
    """Sleep for *delay* seconds, logging start/end, then return a result string."""
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} completed after {delay} seconds")
    return f"Result from {name}"

async def main():
    # create_task schedules each coroutine immediately.
    pending = (
        asyncio.create_task(long_running_task("A", 2)),
        asyncio.create_task(long_running_task("B", 1)),
        asyncio.create_task(long_running_task("C", 3)),
    )
    # Wait for every task and collect results in submission order.
    results = await asyncio.gather(*pending)
    print("All tasks completed:", results)

asyncio.run(main())
高性能网络爬虫构建
异步HTTP请求处理
构建高性能爬虫的核心是使用异步HTTP客户端来减少等待时间。
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
    """Fetch one URL via *session*, returning a summary dict (never raising)."""
    try:
        async with session.get(url) as response:
            status = response.status
            if status != 200:
                # Non-200: report the status code as an error entry.
                return {
                    'url': url,
                    'status': status,
                    'error': f'HTTP {status}'
                }
            body = await response.text()
            return {
                'url': url,
                'status': status,
                'content_length': len(body)
            }
    except Exception as exc:
        # Network / timeout errors are folded into the result instead of raised.
        return {
            'url': url,
            'error': str(exc)
        }
async def fetch_multiple_urls(urls, max_concurrent=10):
    """Fetch every URL in *urls* concurrently; return one summary dict per URL."""
    # Cap concurrency at the connector level and bound total request time.
    connector = aiohttp.TCPConnector(limit=max_concurrent)
    timeout = aiohttp.ClientTimeout(total=30)
    async with aiohttp.ClientSession(
        connector=connector,
        timeout=timeout
    ) as session:
        jobs = [fetch_url(session, target) for target in urls]
        # return_exceptions=True keeps one failure from cancelling the rest.
        return await asyncio.gather(*jobs, return_exceptions=True)
# Usage example
async def main():
    """Time a small batch of httpbin requests and print per-URL statuses."""
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/3',
        'https://httpbin.org/delay/1'
    ]
    started = time.time()
    results = await fetch_multiple_urls(urls, max_concurrent=5)
    elapsed = time.time() - started
    print(f"Processed {len(urls)} URLs in {elapsed:.2f} seconds")
    for result in results:
        if isinstance(result, dict):
            print(f"URL: {result['url']}, Status: {result.get('status', 'N/A')}")
# asyncio.run(main())
高级爬虫架构设计
import asyncio
import aiohttp
from typing import List, Dict, Optional
import time
from dataclasses import dataclass
from urllib.parse import urljoin, urlparse
@dataclass
class CrawlResult:
    """Outcome of crawling a single URL; *error* is None on success."""
    url: str
    status_code: int
    content_length: int
    response_time: float
    error: Optional[str] = None
class AsyncCrawler:
    """Async-context-managed crawler built on a pooled aiohttp session."""

    def __init__(self, max_concurrent: int = 10, timeout: int = 30):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.session = None  # created in __aenter__

    async def __aenter__(self):
        # Pool connections and cache DNS lookups across requests.
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent,
            limit_per_host=10,
            ttl_dns_cache=300,
            use_dns_cache=True
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def crawl_single(self, url: str) -> CrawlResult:
        """Crawl one URL; failures are captured in the result's error field."""
        started = time.time()
        try:
            async with self.session.get(url) as response:
                body_len = len(await response.text()) if response.content else 0
                return CrawlResult(
                    url=url,
                    status_code=response.status,
                    content_length=body_len,
                    response_time=time.time() - started
                )
        except Exception as exc:
            return CrawlResult(
                url=url,
                status_code=0,
                content_length=0,
                response_time=time.time() - started,
                error=str(exc)
            )

    async def crawl_batch(self, urls: List[str]) -> List[CrawlResult]:
        """Crawl many URLs concurrently, logging and dropping stray exceptions."""
        outcomes = await asyncio.gather(
            *(self.crawl_single(u) for u in urls),
            return_exceptions=True
        )
        kept = []
        for outcome in outcomes:
            if isinstance(outcome, CrawlResult):
                kept.append(outcome)
            else:
                print(f"Error processing request: {outcome}")
        return kept
# Usage example
async def demo_crawler():
    """Crawl a handful of httpbin endpoints, printing one summary line each."""
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404',
        'https://httpbin.org/delay/1'
    ]
    async with AsyncCrawler(max_concurrent=3) as crawler:
        for result in await crawler.crawl_batch(urls):
            if result.error:
                print(f"Error crawling {result.url}: {result.error}")
            else:
                print(f"Crawled {result.url} - Status: {result.status_code}, "
                      f"Time: {result.response_time:.2f}s")
# asyncio.run(demo_crawler())
爬虫监控和错误处理
import asyncio
import aiohttp
import time
from collections import defaultdict
from typing import Dict, List, Tuple
import logging
class AdvancedCrawler:
    """Crawler with retries, per-status statistics, and request staggering.

    Use as an async context manager so the underlying aiohttp session is
    always closed.
    """

    def __init__(self, max_concurrent: int = 10, timeout: int = 30):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.session = None           # created in __aenter__
        self.stats = defaultdict(int)  # per-status-code and error counters
        self.errors = []               # human-readable error log
        # Configure logging once for this crawler.
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent,
            limit_per_host=5,
            ttl_dns_cache=300,
            use_dns_cache=True,
            ssl=False  # WARNING: disables certificate checks; enable in production
        )
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def crawl_with_retry(self, url: str, max_retries: int = 3) -> Dict:
        """Fetch *url*, retrying up to *max_retries* times with exponential backoff.

        Returns a result dict; 'success' is False only after every attempt fails.
        """
        # Defined before the loop so the final failure path can always read it.
        start_time = time.time()
        for attempt in range(max_retries):
            try:
                start_time = time.time()
                async with self.session.get(url) as response:
                    content_length = len(await response.text()) if response.content else 0
                    response_time = time.time() - start_time
                    # Tally the status code for the final report.
                    self.stats[f'status_{response.status}'] += 1
                    return {
                        'url': url,
                        'status_code': response.status,
                        'content_length': content_length,
                        'response_time': response_time,
                        'attempt': attempt + 1,
                        'success': True,
                        'error': None
                    }
            except Exception as e:
                self.stats['errors'] += 1
                self.errors.append(f"{url}: {str(e)}")
                if attempt < max_retries - 1:
                    # Exponential backoff before the next attempt.
                    await asyncio.sleep(2 ** attempt)
                    continue
                return {
                    'url': url,
                    'status_code': 0,
                    'content_length': 0,
                    'response_time': time.time() - start_time,
                    'attempt': attempt + 1,
                    'success': False,
                    'error': str(e)
                }

    async def crawl_with_rate_limiting(self, urls: List[str], delay: float = 0.1) -> List[Dict]:
        """Start one request every *delay* seconds and gather all results.

        BUGFIX: each request is wrapped in asyncio.create_task() so it starts
        during the staggered loop; the original appended bare coroutines,
        which do not run until gather(), so the delays rate-limited nothing.
        """
        tasks = []
        for i, url in enumerate(urls):
            if i > 0:
                await asyncio.sleep(delay)  # space out request start times
            tasks.append(asyncio.create_task(self.crawl_with_retry(url)))
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if isinstance(r, dict)]

    def get_stats(self) -> Dict:
        """Return a plain-dict snapshot of the counters."""
        return dict(self.stats)

    def print_report(self):
        """Print total/error counts and a per-status-code breakdown."""
        stats = self.get_stats()
        # Counts one entry per attempt, so retried failures inflate the total.
        total_requests = sum(stats.values())
        print(f"\n=== Crawler Report ===")
        print(f"Total requests: {total_requests}")
        print(f"Total errors: {stats.get('errors', 0)}")
        for key, value in stats.items():
            if not key.startswith('status_'):
                continue
            status_code = key.replace('status_', '')
            print(f"Status {status_code}: {value} requests")
# Usage example
async def advanced_crawler_demo():
    """Exercise AdvancedCrawler against httpbin and print the final report."""
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/status/500'
    ]
    async with AdvancedCrawler(max_concurrent=3) as crawler:
        results = await crawler.crawl_with_rate_limiting(urls, delay=0.5)
        # Per-URL outcome lines.
        for result in results:
            if not result['success']:
                print(f"✗ {result['url']} - Error: {result['error']}")
            else:
                print(f"✓ {result['url']} - Status: {result['status_code']}, "
                      f"Time: {result['response_time']:.2f}s")
        # Aggregate report.
        crawler.print_report()
# asyncio.run(advanced_crawler_demo())
异步API服务构建
基础异步Web服务器
import asyncio
import aiohttp
from aiohttp import web
import json
from datetime import datetime
class AsyncAPIServer:
    """Small JSON API built on aiohttp.web with a few demo endpoints."""

    def __init__(self):
        self.app = web.Application()
        self.setup_routes()

    def setup_routes(self):
        """Register all HTTP routes on the application."""
        router = self.app.router
        router.add_get('/', self.home_handler)
        router.add_get('/health', self.health_handler)
        router.add_get('/api/data/{id}', self.data_handler)
        router.add_post('/api/data', self.create_data_handler)

    async def home_handler(self, request):
        """GET / — welcome message."""
        return web.json_response({
            'message': 'Welcome to Async API Server',
            'timestamp': datetime.now().isoformat()
        })

    async def health_handler(self, request):
        """GET /health — liveness probe."""
        return web.json_response({
            'status': 'healthy',
            'timestamp': datetime.now().isoformat(),
            'service': 'async-api-server'
        })

    async def data_handler(self, request):
        """GET /api/data/{id} — return data for one id."""
        data_id = request.match_info['id']
        await asyncio.sleep(0.1)  # simulate an async database query
        return web.json_response({
            'id': data_id,
            'data': f'Some data for ID {data_id}',
            'timestamp': datetime.now().isoformat()
        })

    async def create_data_handler(self, request):
        """POST /api/data — echo the posted JSON back with a generated id."""
        try:
            payload = await request.json()
            await asyncio.sleep(0.2)  # simulate async processing
            return web.json_response({
                'message': 'Data created successfully',
                'id': f'data_{datetime.now().timestamp()}',
                'data': payload,
                'timestamp': datetime.now().isoformat()
            })
        except Exception as e:
            # Malformed body -> 400 with the parse error.
            return web.json_response({
                'error': str(e)
            }, status=400)

    def run(self, host='localhost', port=8080):
        """Block and serve the application."""
        web.run_app(self.app, host=host, port=port)
# 启动示例
# server = AsyncAPIServer()
# server.run()
异步数据库操作
import asyncio
import asyncpg
from typing import List, Dict, Optional
import time
class AsyncDatabaseManager:
    """Thin async wrapper around an asyncpg connection pool."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None  # created lazily in __aenter__

    async def __aenter__(self):
        self.pool = await asyncpg.create_pool(self.connection_string)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.pool:
            await self.pool.close()

    async def fetch_user(self, user_id: int) -> Optional[Dict]:
        """Look up a single user row by id; None when absent."""
        query = """
            SELECT id, username, email, created_at
            FROM users
            WHERE id = $1
        """
        async with self.pool.acquire() as connection:
            row = await connection.fetchrow(query, user_id)
        return dict(row) if row else None

    async def fetch_users_batch(self, user_ids: List[int]) -> List[Dict]:
        """Fetch several users in one round trip using ANY($1)."""
        query = """
            SELECT id, username, email, created_at
            FROM users
            WHERE id = ANY($1)
        """
        async with self.pool.acquire() as connection:
            rows = await connection.fetch(query, user_ids)
        return [dict(row) for row in rows]

    async def create_user(self, username: str, email: str) -> Dict:
        """Insert one user and return the stored row."""
        query = """
            INSERT INTO users (username, email, created_at)
            VALUES ($1, $2, NOW())
            RETURNING id, username, email, created_at
        """
        async with self.pool.acquire() as connection:
            row = await connection.fetchrow(query, username, email)
        return dict(row)

    async def batch_create_users(self, users_data: List[Dict]) -> List[Dict]:
        """Insert many users on one pooled connection, returning the new rows."""
        # The statement is loop-invariant, so build it once.
        query = """
            INSERT INTO users (username, email, created_at)
            VALUES ($1, $2, NOW())
            RETURNING id, username, email, created_at
        """
        created = []
        async with self.pool.acquire() as connection:
            for user_data in users_data:
                row = await connection.fetchrow(
                    query,
                    user_data['username'],
                    user_data['email']
                )
                created.append(dict(row))
        return created
# Usage example
async def db_example():
    """Demonstrate single and batch inserts against a local Postgres."""
    # Example connection string — replace with real credentials.
    db_string = "postgresql://user:password@localhost:5432/mydb"
    async with AsyncDatabaseManager(db_string) as db:
        # Single insert.
        user = await db.create_user("john_doe", "john@example.com")
        print(f"Created user: {user}")
        # Batch insert.
        users_data = [
            {"username": "alice", "email": "alice@example.com"},
            {"username": "bob", "email": "bob@example.com"},
            {"username": "charlie", "email": "charlie@example.com"}
        ]
        results = await db.batch_create_users(users_data)
        print(f"Created {len(results)} users")
# asyncio.run(db_example())
性能优化技巧
连接池管理
import asyncio
import aiohttp
from typing import Optional
class OptimizedHTTPClient:
    """Pooled aiohttp client with semaphore-bounded fan-out fetching.

    FIX: the original annotated fetch_many with typing.List/typing.Dict, but
    this snippet only imports Optional from typing, so the class failed to
    define (NameError). Builtin generics (list/dict, Python 3.9+) are used
    instead.
    """

    def __init__(self, max_connections: int = 100, timeout: int = 30):
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,
            limit_per_host=20,
            ttl_dns_cache=300,
            use_dns_cache=True,
            ssl=False  # WARNING: disables certificate verification
        )
        self.session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=self.timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (compatible; AsyncCrawler/1.0)',
                'Accept': 'application/json',
                'Connection': 'keep-alive'
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def get(self, url: str, **kwargs):
        """GET request. NOTE: the caller must release/close the returned response."""
        return await self.session.get(url, **kwargs)

    async def post(self, url: str, **kwargs):
        """POST request. NOTE: the caller must release/close the returned response."""
        return await self.session.post(url, **kwargs)

    async def fetch_many(self, urls: list[str], max_concurrent: int = 10) -> list[dict]:
        """Fetch all *urls*, allowing at most *max_concurrent* in flight at once."""
        semaphore = asyncio.Semaphore(max_concurrent)

        async def fetch_with_semaphore(url):
            # The semaphore caps in-flight requests beyond the connector limit.
            async with semaphore:
                try:
                    async with self.session.get(url) as response:
                        content = await response.text()
                        return {
                            'url': url,
                            'status': response.status,
                            'content_length': len(content),
                            'success': True
                        }
                except Exception as e:
                    return {
                        'url': url,
                        'error': str(e),
                        'success': False
                    }

        tasks = [fetch_with_semaphore(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Drop anything that is not a normal result dict.
        processed_results = []
        for result in results:
            if isinstance(result, dict):
                processed_results.append(result)
            else:
                print(f"Error in fetch: {result}")
        return processed_results
# Usage example
async def optimized_client_example():
    """Fetch a few httpbin URLs through OptimizedHTTPClient and print outcomes."""
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/status/200',
        'https://httpbin.org/delay/1'
    ]
    async with OptimizedHTTPClient(max_connections=50) as client:
        for result in await client.fetch_many(urls, max_concurrent=5):
            if result['success']:
                print(f"✓ {result['url']} - Status: {result['status']}")
            else:
                print(f"✗ {result['url']} - Error: {result.get('error', 'Unknown error')}")
# asyncio.run(optimized_client_example())
缓存和中间件
import asyncio
import aiohttp
from typing import Dict, Any, Optional
import hashlib
import time
class CachingHTTPClient:
    """HTTP client with a TTL-bounded, size-capped in-memory response cache."""

    def __init__(self, max_cache_size: int = 1000, cache_ttl: int = 3600):
        self.cache = {}                     # cache_key -> {'data': ..., 'timestamp': ...}
        self.max_cache_size = max_cache_size
        self.cache_ttl = cache_ttl          # seconds a cached entry stays valid
        self.session = None

    async def __aenter__(self):
        connector = aiohttp.TCPConnector(limit=100)
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(total=30)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    def _generate_cache_key(self, url: str, params: Dict = None) -> str:
        """Derive a stable cache key from the URL plus its query params."""
        return hashlib.md5(f"{url}{str(params or {})}".encode()).hexdigest()

    def _is_cache_valid(self, cache_entry: Dict) -> bool:
        """True when the entry exists and is younger than the TTL."""
        if not cache_entry:
            return False
        age = time.time() - cache_entry['timestamp']
        return age < self.cache_ttl

    async def get(self, url: str, params: Dict = None, use_cache: bool = True) -> Dict:
        """GET *url*, serving from cache when possible; errors become result dicts."""
        cache_key = self._generate_cache_key(url, params)
        # Serve a fresh cached copy if we have one.
        if use_cache and cache_key in self.cache:
            entry = self.cache[cache_key]
            if self._is_cache_valid(entry):
                print(f"Cache hit for {url}")
                return entry['data']
            # Expired: drop the stale entry and fall through to a live request.
            del self.cache[cache_key]
        try:
            async with self.session.get(url, params=params) as response:
                body = await response.text()
                data = {
                    'url': url,
                    'status': response.status,
                    'content': body,
                    'timestamp': time.time()
                }
        except Exception as e:
            return {
                'url': url,
                'error': str(e),
                'timestamp': time.time()
            }
        if use_cache:
            if len(self.cache) >= self.max_cache_size:
                # Evict the oldest entry to respect the size cap.
                oldest = min(self.cache, key=lambda k: self.cache[k]['timestamp'])
                del self.cache[oldest]
            self.cache[cache_key] = {
                'data': data,
                'timestamp': time.time()
            }
        return data
# Usage example
async def caching_client_example():
    """Issue the same request twice; the second should be served from cache."""
    client = CachingHTTPClient(max_cache_size=10, cache_ttl=60)
    async with client:
        # First request — goes to the network and is cached.
        result1 = await client.get('https://httpbin.org/delay/1')
        print(f"First request: {result1['status']}")
        # Second request — served from the cache.
        result2 = await client.get('https://httpbin.org/delay/1')
        print(f"Second request: {result2['status']}")
# asyncio.run(caching_client_example())
最佳实践和注意事项
异常处理和错误恢复
import asyncio
import aiohttp
import logging
from typing import List, Dict
import time
评论 (0)