component# Python异步编程实战:从asyncio到FastAPI高性能API开发最佳实践
引言
在现代Web开发中,高性能和高并发是系统设计的核心要求。Python作为一门广泛使用的编程语言,在处理I/O密集型任务时面临着传统同步编程的性能瓶颈。异步编程技术的出现为解决这一问题提供了有效方案。本文将深入探讨Python异步编程的核心概念,从基础的asyncio库开始,逐步过渡到基于FastAPI的高性能API开发实践,帮助开发者掌握异步编程的精髓和最佳实践。
什么是异步编程
异步编程基础概念
异步编程是一种编程范式,它允许程序在等待某些操作完成时执行其他任务,而不是阻塞等待。在传统的同步编程中,当一个函数需要等待网络请求、数据库查询或文件读写等I/O操作完成时,整个线程会被阻塞,直到操作完成。而异步编程通过事件循环机制,让程序可以在等待I/O操作的同时执行其他任务,从而显著提高程序的并发处理能力。
异步编程的优势
异步编程的主要优势包括:
- 高并发处理:一个线程可以同时处理多个I/O操作
- 资源利用率高:避免了线程阻塞造成的资源浪费
- 响应速度快:用户请求能够更快得到响应
- 可扩展性强:能够轻松处理大量并发连接
Python异步编程核心库:asyncio
asyncio基础概念
Python的asyncio库是异步编程的核心实现,它提供了事件循环、协程、任务和异步上下文管理器等关键组件。asyncio基于事件循环机制,通过loop.run_until_complete()来运行异步任务。
协程定义与使用
import asyncio
import aiohttp
import time
# 定义协程函数
async def fetch_data(url):
"""异步获取数据"""
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
# 运行异步任务
async def main():
urls = [
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1',
'https://httpbin.org/delay/1'
]
# 并发执行多个任务
tasks = [fetch_data(url) for url in urls]
results = await asyncio.gather(*tasks)
return results
# 执行异步主函数
# asyncio.run(main())
事件循环管理
import asyncio
async def task(name, delay):
print(f"Task {name} starting")
await asyncio.sleep(delay)
print(f"Task {name} completed")
return f"Result from {name}"
async def main():
# 创建多个任务
task1 = asyncio.create_task(task("A", 2))
task2 = asyncio.create_task(task("B", 1))
task3 = asyncio.create_task(task("C", 3))
# 等待所有任务完成
results = await asyncio.gather(task1, task2, task3)
print("All tasks completed:", results)
# 运行事件循环
# asyncio.run(main())
异步任务处理与并发控制
任务并发控制
在高并发场景下,需要对并发数量进行控制,避免资源耗尽。asyncio.Semaphore是控制并发数量的有效工具:
import asyncio
import aiohttp
import time
class AsyncConcurrentProcessor:
def __init__(self, max_concurrent=10):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def fetch_with_limit(self, url):
"""带并发限制的异步请求"""
async with self.semaphore: # 获取信号量
try:
async with self.session.get(url) as response:
return await response.text()
except Exception as e:
print(f"Error fetching {url}: {e}")
return None
async def process_urls_concurrent(urls, max_concurrent=5):
"""并发处理URL列表"""
async with AsyncConcurrentProcessor(max_concurrent) as processor:
tasks = [processor.fetch_with_limit(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
# 使用示例
# urls = [f'https://httpbin.org/delay/1' for _ in range(20)]
# results = asyncio.run(process_urls_concurrent(urls, 5))
异步任务取消与超时
import asyncio
async def long_running_task():
"""长时间运行的任务"""
try:
for i in range(100):
await asyncio.sleep(0.1)
print(f"Task progress: {i}")
return "Task completed"
except asyncio.CancelledError:
print("Task was cancelled")
raise
async def main():
# 设置任务超时
try:
task = asyncio.create_task(long_running_task())
result = await asyncio.wait_for(task, timeout=5.0)
print(result)
except asyncio.TimeoutError:
print("Task timed out")
task.cancel()
try:
await task
except asyncio.CancelledError:
print("Task cancelled successfully")
# asyncio.run(main())
连接池优化与数据库异步操作
异步数据库连接池
import asyncio
import asyncpg
import aiomysql
class AsyncDatabaseManager:
def __init__(self, db_config):
self.db_config = db_config
self.pool = None
async def connect(self):
"""建立数据库连接池"""
if self.db_config['type'] == 'postgresql':
self.pool = await asyncpg.create_pool(
host=self.db_config['host'],
port=self.db_config['port'],
user=self.db_config['user'],
password=self.db_config['password'],
database=self.db_config['database'],
min_size=5,
max_size=20
)
elif self.db_config['type'] == 'mysql':
self.pool = await aiomysql.create_pool(
host=self.db_config['host'],
port=self.db_config['port'],
user=self.db_config['user'],
password=self.db_config['password'],
db=self.db_config['database'],
minsize=5,
maxsize=20
)
async def execute_query(self, query, params=None):
"""执行查询"""
async with self.pool.acquire() as conn:
if self.db_config['type'] == 'postgresql':
result = await conn.fetch(query, *params) if params else await conn.fetch(query)
else:
result = await conn.fetchall(query, params) if params else await conn.fetchall(query)
return result
async def execute_update(self, query, params=None):
"""执行更新操作"""
async with self.pool.acquire() as conn:
if self.db_config['type'] == 'postgresql':
result = await conn.execute(query, *params) if params else await conn.execute(query)
else:
result = await conn.execute(query, params) if params else await conn.execute(query)
return result
async def close(self):
"""关闭连接池"""
if self.pool:
self.pool.close()
await self.pool.wait_closed()
# 使用示例
async def example_usage():
db_config = {
'type': 'postgresql',
'host': 'localhost',
'port': 5432,
'user': 'username',
'password': 'password',
'database': 'mydb'
}
db_manager = AsyncDatabaseManager(db_config)
await db_manager.connect()
try:
# 执行查询
users = await db_manager.execute_query(
"SELECT * FROM users WHERE age > $1",
[18]
)
print(f"Found {len(users)} users")
# 执行更新
result = await db_manager.execute_update(
"UPDATE users SET last_login = NOW() WHERE id = $1",
[1]
)
print(f"Updated {result} rows")
finally:
await db_manager.close()
异步缓存操作
import asyncio
import aioredis
from typing import Any, Optional
class AsyncCacheManager:
def __init__(self, redis_url: str):
self.redis_url = redis_url
self.redis = None
async def connect(self):
"""连接Redis"""
self.redis = await aioredis.from_url(self.redis_url)
async def get(self, key: str) -> Optional[Any]:
"""获取缓存数据"""
try:
value = await self.redis.get(key)
return value.decode('utf-8') if value else None
except Exception as e:
print(f"Cache get error: {e}")
return None
async def set(self, key: str, value: Any, expire: int = 3600):
"""设置缓存数据"""
try:
await self.redis.set(key, str(value), ex=expire)
except Exception as e:
print(f"Cache set error: {e}")
async def delete(self, key: str):
"""删除缓存数据"""
try:
await self.redis.delete(key)
except Exception as e:
print(f"Cache delete error: {e}")
async def close(self):
"""关闭连接"""
if self.redis:
await self.redis.close()
# 使用示例
async def cache_example():
cache = AsyncCacheManager("redis://localhost:6379")
await cache.connect()
try:
# 设置缓存
await cache.set("user:123", {"name": "John", "age": 30}, 3600)
# 获取缓存
user_data = await cache.get("user:123")
print(f"User data: {user_data}")
finally:
await cache.close()
FastAPI高性能API开发
FastAPI基础概念
FastAPI是一个现代、快速(高性能)的Web框架,基于Python 3.7+的类型提示。它使用Starlette作为底层ASGI框架,使用Pydantic进行数据验证和序列化。FastAPI的核心优势包括:
- 自动API文档:自动生成交互式API文档(Swagger UI和ReDoc)
- 高性能:基于Starlette和Uvicorn,性能接近Node.js和Go
- 类型安全:基于Python类型提示的自动数据验证
- 异步支持:原生支持异步编程
FastAPI异步路由定义
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
from typing import List, Optional
import asyncio
import aiohttp
from datetime import datetime
app = FastAPI(title="Async API Example", version="1.0.0")
# 数据模型
class User(BaseModel):
id: int
name: str
email: str
created_at: datetime
class UserCreate(BaseModel):
name: str
email: str
# 模拟数据库
fake_users_db = [
{"id": 1, "name": "John Doe", "email": "john@example.com", "created_at": datetime.now()},
{"id": 2, "name": "Jane Smith", "email": "jane@example.com", "created_at": datetime.now()},
]
# 异步路由示例
@app.get("/users", response_model=List[User])
async def get_users():
"""异步获取用户列表"""
# 模拟数据库查询延迟
await asyncio.sleep(0.1)
return fake_users_db
@app.get("/users/{user_id}", response_model=User)
async def get_user(user_id: int):
"""异步获取单个用户"""
await asyncio.sleep(0.05)
for user in fake_users_db:
if user["id"] == user_id:
return user
raise HTTPException(status_code=404, detail="User not found")
@app.post("/users", response_model=User)
async def create_user(user: UserCreate):
"""异步创建用户"""
await asyncio.sleep(0.05)
new_user = {
"id": len(fake_users_db) + 1,
"name": user.name,
"email": user.email,
"created_at": datetime.now()
}
fake_users_db.append(new_user)
return new_user
# 异步任务处理
@app.get("/users/{user_id}/profile")
async def get_user_profile(user_id: int):
"""异步获取用户详细信息"""
# 模拟多个异步操作
tasks = [
asyncio.create_task(fetch_user_data(user_id)),
asyncio.create_task(fetch_user_preferences(user_id)),
asyncio.create_task(fetch_user_activity(user_id))
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {
"user": results[0] if not isinstance(results[0], Exception) else None,
"preferences": results[1] if not isinstance(results[1], Exception) else None,
"activity": results[2] if not isinstance(results[2], Exception) else None
}
async def fetch_user_data(user_id: int):
"""异步获取用户数据"""
await asyncio.sleep(0.1)
return {"id": user_id, "name": "User Name", "email": "user@example.com"}
async def fetch_user_preferences(user_id: int):
"""异步获取用户偏好"""
await asyncio.sleep(0.05)
return {"theme": "dark", "language": "en"}
async def fetch_user_activity(user_id: int):
"""异步获取用户活动"""
await asyncio.sleep(0.08)
return {"last_login": "2023-10-01", "login_count": 15}
异步中间件与依赖注入
from fastapi import Depends, FastAPI, HTTPException, Request
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.cors import CORSMiddleware
import time
from typing import AsyncGenerator
# 异步依赖注入
async def get_db_connection():
"""异步数据库连接依赖"""
# 这里可以实现实际的数据库连接逻辑
db = {"connection": "async_connection"}
try:
yield db
finally:
# 连接关闭逻辑
pass
async def get_current_user(token: str = None):
"""获取当前用户"""
# 模拟用户验证
await asyncio.sleep(0.01)
if token == "valid_token":
return {"id": 1, "name": "John"}
raise HTTPException(status_code=401, detail="Invalid token")
# 异步中间件
class AsyncMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
# 请求开始时间
start_time = time.time()
# 处理请求
await self.app(scope, receive, send)
# 记录处理时间
process_time = time.time() - start_time
print(f"Request processed in {process_time:.2f} seconds")
# 应用配置
app = FastAPI(
title="Async API with Middleware",
description="FastAPI application with async middleware and dependencies"
)
# 添加中间件
app.add_middleware(AsyncMiddleware)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# 使用依赖
@app.get("/protected")
async def protected_endpoint(current_user: dict = Depends(get_current_user)):
"""需要认证的端点"""
return {"message": "Hello, authenticated user!", "user": current_user}
@app.get("/async-data")
async def get_async_data(db = Depends(get_db_connection)):
"""使用异步依赖的端点"""
await asyncio.sleep(0.1)
return {"data": "async_response", "db": db["connection"]}
高性能优化策略
异步任务调度与并发控制
from fastapi import FastAPI, BackgroundTasks
import asyncio
import logging
app = FastAPI()
logger = logging.getLogger(__name__)
# 限制并发任务数
semaphore = asyncio.Semaphore(10)
async def process_background_task(task_id: str, data: dict):
"""后台任务处理"""
async with semaphore: # 限制并发数
try:
# 模拟处理时间
await asyncio.sleep(1)
logger.info(f"Task {task_id} completed")
return {"task_id": task_id, "status": "completed", "data": data}
except Exception as e:
logger.error(f"Task {task_id} failed: {e}")
raise
@app.post("/background-process")
async def background_process(data: dict, background_tasks: BackgroundTasks):
"""后台任务处理"""
task_id = f"task_{int(time.time())}"
# 添加后台任务
background_tasks.add_task(process_background_task, task_id, data)
return {"message": "Task started", "task_id": task_id}
# 批量处理优化
@app.post("/batch-process")
async def batch_process(items: List[dict]):
"""批量处理优化"""
# 使用异步批量处理
tasks = [process_item(item) for item in items]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {"results": results, "total": len(items)}
async def process_item(item: dict):
"""处理单个项目"""
# 模拟异步处理
await asyncio.sleep(0.01)
return {"processed": True, "item": item}
缓存策略与性能监控
from fastapi import FastAPI, HTTPException
import asyncio
import time
from typing import Dict, Any
app = FastAPI()
# 简单内存缓存
class SimpleCache:
def __init__(self):
self.cache: Dict[str, Any] = {}
self.expire_time: Dict[str, float] = {}
def get(self, key: str) -> Any:
if key in self.cache:
if time.time() < self.expire_time.get(key, 0):
return self.cache[key]
else:
del self.cache[key]
del self.expire_time[key]
return None
def set(self, key: str, value: Any, expire: int = 3600):
self.cache[key] = value
self.expire_time[key] = time.time() + expire
cache = SimpleCache()
@app.get("/cached-data/{item_id}")
async def get_cached_data(item_id: str):
"""使用缓存的数据"""
# 检查缓存
cached_data = cache.get(item_id)
if cached_data:
return {"data": cached_data, "source": "cache"}
# 模拟数据获取
await asyncio.sleep(0.1)
data = {"id": item_id, "value": f"data_for_{item_id}"}
# 设置缓存
cache.set(item_id, data, expire=60)
return {"data": data, "source": "database"}
# 性能监控装饰器
def monitor_performance(func):
"""性能监控装饰器"""
async def wrapper(*args, **kwargs):
start_time = time.time()
try:
result = await func(*args, **kwargs)
execution_time = time.time() - start_time
print(f"{func.__name__} executed in {execution_time:.4f} seconds")
return result
except Exception as e:
execution_time = time.time() - start_time
print(f"{func.__name__} failed after {execution_time:.4f} seconds: {e}")
raise
return wrapper
@app.get("/monitored-endpoint")
@monitor_performance
async def monitored_endpoint():
"""受监控的端点"""
await asyncio.sleep(0.05)
return {"message": "Monitored endpoint"}
实际应用案例
微服务异步调用
from fastapi import FastAPI
import aiohttp
import asyncio
from typing import List, Dict
app = FastAPI()
class MicroserviceClient:
def __init__(self):
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.session:
await self.session.close()
async def call_service(self, service_url: str, endpoint: str, data: dict = None):
"""调用微服务"""
url = f"{service_url}/{endpoint}"
try:
async with self.session.post(url, json=data) as response:
if response.status == 200:
return await response.json()
else:
raise HTTPException(status_code=response.status, detail=f"Service error: {response.status}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Service call failed: {str(e)}")
@app.get("/composite-service")
async def composite_service():
"""组合多个服务的响应"""
async with MicroserviceClient() as client:
# 并发调用多个服务
tasks = [
client.call_service("http://service1:8000", "user/profile", {"user_id": 1}),
client.call_service("http://service2:8000", "user/preferences", {"user_id": 1}),
client.call_service("http://service3:8000", "user/notifications", {"user_id": 1})
]
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
user_data = results[0] if not isinstance(results[0], Exception) else None
preferences = results[1] if not isinstance(results[1], Exception) else None
notifications = results[2] if not isinstance(results[2], Exception) else None
return {
"user": user_data,
"preferences": preferences,
"notifications": notifications
}
# 异步任务队列处理
from celery import Celery
import asyncio
# 配置Celery
celery_app = Celery('async_tasks', broker='redis://localhost:6379/0')
@app.post("/queue-task")
async def queue_task(data: dict):
"""将任务放入队列"""
# 异步发送到任务队列
task = celery_app.send_task('process_data', args=[data])
return {"task_id": task.id, "status": "queued"}
@app.get("/task-status/{task_id}")
async def get_task_status(task_id: str):
"""获取任务状态"""
result = celery_app.AsyncResult(task_id)
return {
"task_id": task_id,
"status": result.status,
"result": result.result if result.ready() else None
}
实时数据处理
from fastapi import FastAPI, WebSocket
import asyncio
import json
from datetime import datetime
app = FastAPI()
# WebSocket连接管理
connected_clients = set()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
"""WebSocket实时连接"""
await websocket.accept()
connected_clients.add(websocket)
try:
while True:
# 接收客户端消息
data = await websocket.receive_text()
message = json.loads(data)
# 处理消息
response = {
"timestamp": datetime.now().isoformat(),
"client_id": client_id,
"received": message,
"processed": "success"
}
# 广播给所有连接的客户端
await broadcast_message(response)
except Exception as e:
print(f"WebSocket error: {e}")
finally:
connected_clients.remove(websocket)
async def broadcast_message(message: dict):
"""广播消息给所有连接的客户端"""
if connected_clients:
tasks = [
client.send_text(json.dumps(message))
for client in connected_clients
]
await asyncio.gather(*tasks, return_exceptions=True)
# 异步数据流处理
@app.get("/stream-data")
async def stream_data():
"""异步数据流"""
async def data_stream():
for i in range(100):
await asyncio.sleep(0.1) # 模拟数据生成延迟
yield {
"id": i,
"timestamp": datetime.now().isoformat(),
"value": i * 10
}
return data_stream()
性能测试与调优
异步性能测试
import asyncio
import time
import aiohttp
from concurrent.futures import ThreadPoolExecutor
import statistics
class AsyncPerformanceTester:
def __init__(self, max_concurrent=100):
self.max_concurrent = max_concurrent
self.semaphore = asyncio.Semaphore(max_concurrent)
async def test_single_request(self, session, url):
"""测试单个请求"""
start_time = time.time()
try:
async with self.semaphore:
async with session.get(url) as response:
await response.text()
return time.time() - start_time
except Exception as e:
return time.time() - start_time
async def run_performance_test(self, urls, concurrent_requests=100):
"""运行性能测试"""
async with aiohttp.ClientSession() as session:
tasks = [self.test_single_request(session, url) for url in urls]
times = await asyncio.gather(*tasks)
return {
"total_requests": len(times),
"avg_time": statistics.mean(times),
"min_time": min(times),
"max_time": max(times),
"total_time": sum(times)
}
# 使用示例
async def performance_test_example():
tester = AsyncPerformanceTester(max_concurrent=50)
urls = [f"https://httpbin.org/delay/1" for _ in range(100)]
result = await tester.run_performance_test(urls, concurrent_requests=50)
print("Performance Test Results:")
print(f"Total Requests: {result['total_requests']}")
print(f"Average Time: {result['avg_time']:.4f}s")
print(f"Min Time: {result['min_time']:.4f}s")
print(f"Max Time: {result['max_time']:.4f}s")
print(f"Total Time: {result['total_time']:.4f}s")
资源监控与优化
import psutil
import asyncio
import time
from fastapi import FastAPI
import logging
app = FastAPI()
logger = logging.getLogger(__name__)
class ResourceMonitor:
def __init__(self):
self.monitoring = False
self.monitoring_task = None
async def start_monitoring(self):
"""开始资源监控"""
self.monitoring = True
self.monitoring_task = asyncio.create
评论 (0)