引言
在现代Web应用开发中,性能和响应速度已经成为用户体验的关键因素。传统的同步编程模型虽然简单直观,但在面对高并发请求时往往显得力不从心。Python作为一门广泛应用的编程语言,其异步编程能力为构建高性能应用提供了强有力的支持。
本文将深入探讨Python异步编程的核心技术,从底层的asyncio库开始,逐步深入到实际应用中的FastAPI框架,通过丰富的代码示例和最佳实践,帮助开发者掌握构建响应式、高吞吐量异步应用系统的完整技能栈。
Python异步编程基础
什么是异步编程
异步编程是一种编程范式,它允许程序在等待某些操作完成时执行其他任务,而不是阻塞整个线程。这种模式特别适用于I/O密集型任务,如网络请求、文件读写等场景。
在Python中,异步编程主要通过async和await关键字实现,配合asyncio库来管理协程的执行。
协程基础概念
协程(Coroutine)是异步编程的核心概念。与普通函数不同,协程可以在执行过程中暂停并恢复,允许在等待I/O操作时让出控制权给其他任务。
import asyncio
# Define a simple coroutine.
async def simple_coroutine():
    """Print, suspend for one second at the await point, then print again."""
    print("开始执行协程")
    await asyncio.sleep(1)  # simulate an asynchronous operation (non-blocking wait)
    print("协程执行完成")

# Drive the coroutine on a fresh event loop (blocks until it completes).
asyncio.run(simple_coroutine())
异步函数与同步函数的区别
import asyncio
import time
def sync_function():
    """Blocking variant: sleeps one second on the calling thread."""
    time.sleep(1)
    return "同步完成"


async def async_function():
    """Non-blocking variant: yields to the event loop for one second."""
    await asyncio.sleep(1)
    return "异步完成"


async def compare_execution():
    """Time three serial blocking calls against three concurrent coroutines."""
    started = time.time()
    # Synchronous calls run back-to-back: ~3 seconds in total.
    for _ in range(3):
        sync_function()
    print(f"同步方式耗时: {time.time() - started:.2f}秒")

    started = time.time()
    # The three coroutines are awaited concurrently: ~1 second in total.
    await asyncio.gather(async_function(), async_function(), async_function())
    print(f"异步方式耗时: {time.time() - started:.2f}秒")

# asyncio.run(compare_execution())
asyncio核心机制详解
事件循环(Event Loop)
事件循环是异步编程的核心引擎,它负责调度和执行协程。Python的asyncio库提供了一个事件循环来管理所有异步任务。
import asyncio
import time
async def task(name, delay):
    """Simulated unit of work: wait `delay` seconds, then return a label."""
    print(f"任务 {name} 开始")
    await asyncio.sleep(delay)
    print(f"任务 {name} 完成")
    return f"结果: {name}"


async def main():
    """Launch three tasks concurrently and print the collected results."""
    # gather() runs the coroutines together; wall time ≈ the longest delay.
    specs = [("A", 1), ("B", 2), ("C", 1)]
    results = await asyncio.gather(*(task(n, d) for n, d in specs))
    print(results)

# Run the event loop (kept commented so importing this snippet has no side effects).
# asyncio.run(main())
任务管理(Task)
在asyncio中,任务是协程的包装器,提供了更多的控制能力。
import asyncio
async def fetch_data(url, delay):
    """Pretend to fetch `url`, taking `delay` seconds."""
    await asyncio.sleep(delay)
    return f"数据来自 {url}"


async def main():
    """Wrap coroutines in Tasks (scheduled immediately) and await them all."""
    pending = [
        asyncio.create_task(fetch_data(host, d))
        for host, d in (("api1.com", 1), ("api2.com", 2), ("api3.com", 1))
    ]
    results = await asyncio.gather(*pending)
    print(results)

# asyncio.run(main())
异步上下文管理器
import asyncio
import aiohttp
class AsyncDatabase:
    """Toy async context manager modelling a database connection lifecycle."""

    def __init__(self, connection_string):
        self.connection_string = connection_string
        self.connection = None  # set only while inside the `async with` block

    async def __aenter__(self):
        print("建立数据库连接")
        await asyncio.sleep(0.1)  # simulate connection-establishment latency
        self.connection = "连接已建立"
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("关闭数据库连接")
        await asyncio.sleep(0.1)  # simulate teardown latency
        self.connection = None

async def database_operation():
    """Use `async with` so the connection is released even if the body raises."""
    async with AsyncDatabase("postgresql://localhost:5432/mydb") as db:
        print(f"当前连接状态: {db.connection}")
        await asyncio.sleep(1)
        print("执行数据库操作")

# asyncio.run(database_operation())
异步IO处理实战
网络请求异步化
在实际应用中,网络请求是最常见的I/O密集型操作。使用aiohttp库可以轻松实现异步HTTP请求。
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
    """Fetch one URL using the shared aiohttp session.

    Returns a dict with either status/content_length or an 'error' key, so a
    single failed request never aborts the whole batch.
    """
    try:
        async with session.get(url) as response:
            return {
                'url': url,
                'status': response.status,
                'content_length': len(await response.text())
            }
    except Exception as e:
        return {
            'url': url,
            'error': str(e)
        }

async def fetch_multiple_urls():
    """Issue several HTTP requests concurrently and report per-URL outcomes."""
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404'
    ]
    # One session is reused for every request (connection pooling).
    async with aiohttp.ClientSession() as session:
        start_time = time.time()
        # All requests run concurrently; wall time ≈ the slowest single request.
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        end_time = time.time()
        print(f"总耗时: {end_time - start_time:.2f}秒")
        for result in results:
            if 'error' in result:
                print(f"URL: {result['url']} - 错误: {result['error']}")
            else:
                print(f"URL: {result['url']} - 状态: {result['status']}")

# asyncio.run(fetch_multiple_urls())
文件操作异步化
import asyncio
import aiofiles
import os
async def read_file_async(filename):
    """Read a file asynchronously; return its size and a content preview.

    Never raises: failures are reported via an 'error' key in the result dict.
    """
    try:
        async with aiofiles.open(filename, 'r') as file:
            content = await file.read()
            return {
                'filename': filename,
                'size': len(content),
                # Truncate long files so the demo output stays readable.
                'content': content[:100] + '...' if len(content) > 100 else content
            }
    except Exception as e:
        return {
            'filename': filename,
            'error': str(e)
        }

async def write_file_async(filename, content):
    """Write `content` to `filename` asynchronously; report success or error."""
    try:
        async with aiofiles.open(filename, 'w') as file:
            await file.write(content)
        return {'filename': filename, 'status': '写入成功'}
    except Exception as e:
        return {
            'filename': filename,
            'error': str(e)
        }

async def file_operations_demo():
    """Write three files concurrently, then read them back concurrently."""
    test_files = ['test1.txt', 'test2.txt', 'test3.txt']
    # Concurrent writes.
    write_tasks = []
    for i, filename in enumerate(test_files):
        # BUGFIX: use get_running_loop() — calling get_event_loop() from inside
        # a coroutine is deprecated; within a running loop they are equivalent.
        content = f"这是第{i+1}个测试文件的内容\n时间: {asyncio.get_running_loop().time()}"
        write_tasks.append(write_file_async(filename, content))
    write_results = await asyncio.gather(*write_tasks)
    print("写入结果:", write_results)
    # Concurrent reads.
    read_tasks = [read_file_async(filename) for filename in test_files]
    read_results = await asyncio.gather(*read_tasks)
    print("读取结果:")
    for result in read_results:
        if 'error' in result:
            print(f"错误: {result['filename']} - {result['error']}")
        else:
            print(f"文件: {result['filename']} - 大小: {result['size']} 字符")

# asyncio.run(file_operations_demo())
数据库异步操作
import asyncio
import asyncpg
import time
class AsyncDatabaseManager:
    """Thin wrapper around an asyncpg connection pool.

    NOTE(review): the query/update helpers swallow exceptions and return
    None — callers must check for None rather than catching errors.
    """

    def __init__(self, connection_string):
        self.connection_string = connection_string
        self.pool = None  # created lazily by connect()

    async def connect(self):
        """Create the connection pool."""
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=5,
            max_size=20
        )
        print("数据库连接池已建立")

    async def close(self):
        """Close the connection pool (no-op if connect() never succeeded)."""
        if self.pool:
            await self.pool.close()
            print("数据库连接池已关闭")

    async def execute_query(self, query, *args):
        """Run a SELECT and return the fetched rows, or None on error."""
        try:
            async with self.pool.acquire() as connection:
                result = await connection.fetch(query, *args)
                return result
        except Exception as e:
            print(f"查询错误: {e}")
            return None

    async def execute_update(self, query, *args):
        """Run an INSERT/UPDATE/DELETE; return the status string, or None on error."""
        try:
            async with self.pool.acquire() as connection:
                result = await connection.execute(query, *args)
                return result
        except Exception as e:
            print(f"更新错误: {e}")
            return None
async def database_demo():
    """Run several queries concurrently against one pool, then clean up."""
    db_manager = AsyncDatabaseManager("postgresql://user:password@localhost:5432/testdb")
    try:
        await db_manager.connect()
        queries = [
            "SELECT * FROM users LIMIT 1",
            "SELECT count(*) FROM orders",
            "SELECT * FROM products WHERE price > $1"
        ]
        start_time = time.time()
        tasks = []
        for query in queries:
            # The parameterized query ($1) needs its argument supplied.
            if '$1' in query:
                task = db_manager.execute_query(query, 100)
            else:
                task = db_manager.execute_query(query)
            tasks.append(task)
        # All three queries run concurrently on separate pool connections.
        results = await asyncio.gather(*tasks)
        end_time = time.time()
        print(f"数据库查询耗时: {end_time - start_time:.2f}秒")
        for i, result in enumerate(results):
            if result:
                print(f"查询 {i+1}: 返回 {len(result)} 条记录")
            else:
                print(f"查询 {i+1}: 执行失败")
    finally:
        # Always release the pool, even if connect() or a query raised.
        await db_manager.close()

# asyncio.run(database_demo())
FastAPI高性能Web框架
FastAPI基础概念
FastAPI是一个现代、快速(高性能)的Web框架,基于Python 3.7+的类型提示。它使用asyncio来处理异步请求,并提供了自动化的API文档生成、数据验证等功能。
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List
import asyncio
# Create the FastAPI application instance.
app = FastAPI(title="异步API示例", version="1.0.0")

# Request/response data models (validated by pydantic).
class User(BaseModel):
    id: int
    name: str
    email: str

class UserCreate(BaseModel):
    # Same as User minus `id`, which the server assigns on creation.
    name: str
    email: str

# In-memory stand-in for a database table.
users_db = [
    User(id=1, name="张三", email="zhangsan@example.com"),
    User(id=2, name="李四", email="lisi@example.com")
]
# Asynchronous route handlers.
@app.get("/")
async def root():
    """Root path: simple welcome payload."""
    return {"message": "欢迎使用异步FastAPI应用"}

@app.get("/users")
async def get_users():
    """Return every user (after a small simulated I/O delay)."""
    await asyncio.sleep(0.1)
    return users_db

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    """Return the user with the given id, or respond 404 if absent."""
    await asyncio.sleep(0.05)
    found = next((u for u in users_db if u.id == user_id), None)
    if found is None:
        raise HTTPException(status_code=404, detail="用户不存在")
    return found
@app.post("/users")
async def create_user(user: UserCreate):
"""创建新用户"""
await asyncio.sleep(0.05)
new_id = max([u.id for u in users_db]) + 1 if users_db else 1
new_user = User(id=new_id, name=user.name, email=user.email)
users_db.append(new_user)
return new_user
# NOTE(review): this route is registered AFTER "/users/{user_id}" above, and
# that parameterized route matches first — GET /users/batch therefore hits
# get_user with user_id="batch" and fails int validation (422). Declare this
# route before the parameterized one to make it reachable.
@app.get("/users/batch")
async def get_users_batch():
    """Fan out ten simulated async operations concurrently."""
    tasks = []
    for i in range(10):
        task = asyncio.sleep(0.1)  # placeholder async operation
        tasks.append(task)
    # The ten sleeps run concurrently: ~0.1s total, not 1s.
    await asyncio.gather(*tasks)
    return {
        "message": "批量处理完成",
        "count": len(users_db)
    }
异步依赖注入
from fastapi import Depends, FastAPI, HTTPException
from typing import AsyncGenerator
import asyncio
# Fresh FastAPI app for the dependency-injection examples below.
app = FastAPI()
class DatabaseConnection:
    """Fake async database client used to demonstrate dependency injection."""

    def __init__(self):
        # True only between connect() and disconnect().
        self.connected = False

    async def connect(self):
        """Open the (simulated) connection."""
        await asyncio.sleep(0.1)  # simulated connection latency
        self.connected = True
        print("数据库已连接")

    async def disconnect(self):
        """Close the (simulated) connection."""
        await asyncio.sleep(0.05)  # simulated shutdown latency
        self.connected = False
        print("数据库已断开")

    async def execute_query(self, query: str):
        """Run a (simulated) query and echo it back."""
        await asyncio.sleep(0.02)  # simulated query latency
        return f"查询结果: {query}"
# Dependency provider: yields a connected DatabaseConnection and always
# disconnects it after the request, even if the handler raises.
async def get_db_connection() -> AsyncGenerator[DatabaseConnection, None]:
    db = DatabaseConnection()
    await db.connect()
    try:
        yield db
    finally:
        await db.disconnect()
# Routes that receive a live connection via dependency injection.
@app.get("/data")
async def get_data(db: DatabaseConnection = Depends(get_db_connection)):
    """Run a single query through the injected connection."""
    result = await db.execute_query("SELECT * FROM users")
    return {"data": result}

@app.get("/async-data")
async def get_async_data(db: DatabaseConnection = Depends(get_db_connection)):
    """Run several queries concurrently on the injected connection."""
    queries = [
        "SELECT count(*) FROM users",
        "SELECT * FROM orders LIMIT 5",
        "SELECT * FROM products WHERE status = 'active'"
    ]
    # gather() keeps result order aligned with `queries`.
    results = await asyncio.gather(*(db.execute_query(q) for q in queries))
    return {
        "queries": len(queries),
        "results": results
    }
高性能异步API设计
from fastapi import FastAPI, BackgroundTasks, HTTPException
import asyncio
import time
from typing import Optional
app = FastAPI()
# In-memory processing-queue placeholder.
# NOTE(review): nothing in this snippet reads or writes this list — confirm
# whether it is dead code or used by code outside this excerpt.
processing_queue = []
@app.post("/process-data")
async def process_data(data: dict, background_tasks: BackgroundTasks):
"""异步数据处理,使用后台任务"""
# 将处理任务添加到后台任务中
background_tasks.add_task(process_background_task, data)
return {
"message": "数据已提交处理",
"timestamp": time.time(),
"data_id": data.get("id", "unknown")
}
async def process_background_task(data: dict):
    """Simulate a long-running processing pipeline for one payload."""
    print(f"开始处理数据: {data}")
    await asyncio.sleep(2)  # main processing delay
    # Three follow-up operations run concurrently (~1s: the longest of them).
    await asyncio.gather(
        asyncio.sleep(0.5),
        asyncio.sleep(0.3),
        asyncio.sleep(1.0),
    )
    print(f"数据处理完成: {data}")
@app.get("/stream-data")
async def stream_data():
"""流式数据返回"""
async def data_generator():
for i in range(10):
await asyncio.sleep(0.1) # 模拟延迟
yield f"数据块 {i}: {time.time()}\n"
return data_generator()
# 异步限流和并发控制
from fastapi import Request
from starlette.middleware.base import BaseHTTPMiddleware
class AsyncRateLimitMiddleware(BaseHTTPMiddleware):
    """Naive per-IP sliding-window rate limiter.

    Keeps an in-memory list of request timestamps per client IP and rejects
    requests once `max_requests` fall within the last `window_seconds`.
    NOTE(review): state is per-process and grows with the number of distinct
    IPs — use a shared store (e.g. Redis) in production.
    """

    def __init__(self, app, max_requests: int = 100, window_seconds: int = 60):
        super().__init__(app)
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = {}  # client ip -> list of request timestamps

    async def dispatch(self, request: Request, call_next):
        client_ip = request.client.host
        current_time = time.time()
        if client_ip not in self.requests:
            self.requests[client_ip] = []
        # Drop timestamps that have fallen out of the window.
        self.requests[client_ip] = [
            req_time for req_time in self.requests[client_ip]
            if current_time - req_time < self.window_seconds
        ]
        if len(self.requests[client_ip]) >= self.max_requests:
            # BUGFIX: raising HTTPException inside BaseHTTPMiddleware bypasses
            # FastAPI's exception handlers and surfaces as a 500; return a
            # real 429 response instead.
            from starlette.responses import JSONResponse  # local: only used here
            return JSONResponse(status_code=429, content={"detail": "请求过于频繁"})
        # Record the current request and continue down the stack.
        self.requests[client_ip].append(current_time)
        response = await call_next(request)
        return response
# Register the middleware with tighter limits than its defaults.
app.add_middleware(AsyncRateLimitMiddleware, max_requests=50, window_seconds=60)

@app.get("/health")
async def health_check():
    """Liveness probe: cheap async check plus service metadata."""
    await asyncio.sleep(0.01)  # simulated async health check
    return {
        "status": "healthy",
        "timestamp": time.time(),
        "service": "async-fastapi-service"
    }
实际应用案例
构建高并发异步API服务
import asyncio
import time
from typing import Any, Dict, List, Optional

import aiohttp
from fastapi import BackgroundTasks, FastAPI, HTTPException, WebSocket
from pydantic import BaseModel
app = FastAPI(title="高并发异步服务", version="1.0.0")
class DataRequest(BaseModel):
urls: List[str]
timeout: int = 5
class ProcessingResult(BaseModel):
url: str
status: int
response_time: float
content_length: int
error: Optional[str] = None
class BatchProcessResult(BaseModel):
total_requests: int
successful_requests: int
failed_requests: int
total_processing_time: float
results: List[ProcessingResult]
# External API call helper.
async def fetch_external_api(url: str, timeout: int = 5) -> ProcessingResult:
    """Fetch `url` and record status, latency and body size.

    Never raises: any failure (timeout, DNS, connection error) is folded into
    a ProcessingResult with status=0 and the error message set.
    """
    start_time = time.time()
    try:
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) as session:
            async with session.get(url) as response:
                content = await response.text()
                end_time = time.time()
                return ProcessingResult(
                    url=url,
                    status=response.status,
                    response_time=end_time - start_time,
                    content_length=len(content)
                )
    except Exception as e:
        end_time = time.time()
        return ProcessingResult(
            url=url,
            status=0,
            response_time=end_time - start_time,
            content_length=0,
            error=str(e)
        )
@app.post("/batch-fetch", response_model=BatchProcessResult)
async def batch_fetch_data(request: DataRequest, background_tasks: BackgroundTasks):
"""批量获取数据"""
start_time = time.time()
# 创建所有异步任务
tasks = [fetch_external_api(url, request.timeout) for url in request.urls]
# 并发执行所有任务
results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
processed_results = []
successful_count = 0
failed_count = 0
for i, result in enumerate(results):
if isinstance(result, Exception):
# 处理异常情况
error_result = ProcessingResult(
url=request.urls[i],
status=0,
response_time=0,
content_length=0,
error=str(result)
)
processed_results.append(error_result)
failed_count += 1
else:
if result.status != 0: # 成功的请求
successful_count += 1
processed_results.append(result)
end_time = time.time()
return BatchProcessResult(
total_requests=len(request.urls),
successful_requests=successful_count,
failed_requests=failed_count,
total_processing_time=end_time - start_time,
results=processed_results
)
# Real-time processing service.
@app.websocket("/ws/realtime")
async def realtime_data_websocket(websocket: WebSocket):
    """Echo-style WebSocket: annotate each received message and send it back.

    BUGFIX: the parameter must be annotated as WebSocket — without the type
    hint FastAPI does not inject the connection object into this handler.
    """
    await websocket.accept()
    try:
        while True:
            # Receive a client message.
            data = await websocket.receive_text()
            await asyncio.sleep(0.1)  # simulate per-message processing
            # Send the annotated response.
            response = {
                "timestamp": time.time(),
                "received_data": data,
                "processed": True
            }
            await websocket.send_json(response)
    except Exception as e:
        print(f"WebSocket错误: {e}")
    finally:
        await websocket.close()
# Monitoring / metrics endpoint.
@app.get("/metrics")
async def get_metrics():
    """Return static service metadata.

    NOTE(review): the counters below are hard-coded to 0 — wire them to real
    request/error tracking before relying on this endpoint.
    """
    return {
        "service": "async-data-processor",
        "version": "1.0.0",
        "timestamp": time.time(),
        "active_connections": 0,
        "request_count": 0,
        "error_count": 0
    }
# 异步缓存管理
import json
from typing import Any
class AsyncCache:
    """Minimal in-process TTL cache with an async interface.

    The methods are async only to mirror real cache clients (e.g. Redis);
    no actual awaiting happens inside them.
    """

    def __init__(self):
        self.cache = {}  # key -> (value, insertion timestamp)
        self.ttl = 3600  # entry lifetime in seconds (1 hour)

    async def get(self, key: str) -> Any:
        """Return the cached value, or None if absent or expired."""
        entry = self.cache.get(key)
        if entry is None:
            return None
        data, timestamp = entry
        if time.time() - timestamp < self.ttl:
            return data
        # Expired: drop the stale entry lazily.
        del self.cache[key]
        return None

    async def set(self, key: str, value: Any):
        """Store `value` under `key`, stamped with the current time."""
        self.cache[key] = (value, time.time())

    async def delete(self, key: str):
        """Remove `key` if present (silently ignores missing keys)."""
        self.cache.pop(key, None)

# Module-level cache instance shared by the endpoints below.
cache = AsyncCache()
@app.get("/cached-data/{key}")
async def get_cached_data(key: str):
"""获取缓存数据"""
# 尝试从缓存获取
cached_data = await cache.get(key)
if cached_data:
return {"data": cached_data, "from_cache": True}
# 模拟异步数据获取
await asyncio.sleep(0.1)
data = f"从数据库获取的数据: {key}"
# 存储到缓存
await cache.set(key, data)
return {"data": data, "from_cache": False}
# Performance-oriented example.
@app.get("/optimized-endpoint")
async def optimized_endpoint():
    """Consume an async generator in a bounded batch to cap memory use."""
    async def data_generator():
        # Lazily yields up to 1000 items; the consumer below stops early.
        for i in range(1000):
            await asyncio.sleep(0.001)  # simulated per-item work
            yield {"id": i, "value": f"item_{i}"}

    batch = []
    async for item in data_generator():
        batch.append(item)
        if len(batch) >= 100:  # take only the first batch of 100
            break

    return {
        "processed_items": len(batch),
        "first_100_items": batch[:100]
    }
高性能异步任务队列
from fastapi import FastAPI, BackgroundTasks
import asyncio
import time
from typing import Dict, List
from pydantic import BaseModel
app = FastAPI()

class Task(BaseModel):
    # NOTE(review): the created_at/updated_at defaults are evaluated ONCE at
    # class-definition time, so every Task built without explicit timestamps
    # shares the same stale value. Prefer
    # `Field(default_factory=time.time)` — confirm before relying on these.
    id: str
    name: str
    status: str = "pending"
    created_at: float = time.time()
    updated_at: float = time.time()

# In-memory task registry, and the ids of tasks currently being processed.
task_queue: Dict[str, Task] = {}
processing_tasks: List[str] = []
async def process_task(task_id: str):
    """Background worker: move a queued task through processing to a final state."""
    task = task_queue.get(task_id)
    if task is None:
        return  # unknown id — nothing to do

    # Mark the task as in-flight.
    task.status = "processing"
    task.updated_at = time.time()
    try:
        await asyncio.sleep(2)  # simulated main workload
        # Report progress in five small steps.
        for step in range(1, 6):
            await asyncio.sleep(0.1)
            print(f"任务 {task_id} 进度: {step}/5")
        task.status = "completed"
        task.updated_at = time.time()
        print(f"任务 {task_id} 处理完成")
    except Exception as e:
        task.status = "failed"
        task.updated_at = time.time()
        print(f"任务 {task_id} 处理失败: {e}")
@app.post("/tasks")
async def create_task(task_data: dict, background_tasks: BackgroundTasks):
"""创建异步任务"""
task_id = f"task_{int(time.time())}"
task = Task(
id=task_id,
name=task_data.get("name", "unknown"),
status="pending"
)
task_queue[task_id] = task
# 将任务添加到后台处理
background_tasks.add_task(process_task, task_id)
return {
"message": "任务已创建",
"task_id": task_id,
"status": task.status
}
@app.get("/tasks")
async def get_all_tasks():
"""获取所有任务"""
return list(task_queue.values())
@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
"""获取特定任务状态"""
if task_id not in task_queue:
raise HTTPException(status_code=404, detail="任务不存在")
return task_queue
评论 (0)