引言
在现代Web开发和高性能计算领域,异步编程已成为提升应用性能的关键技术。Python作为一门广泛使用的编程语言,在异步编程方面也展现出了强大的能力。从Python 3.4引入的asyncio模块,到如今流行的FastAPI、Django ASGI等框架,异步编程正在改变我们构建高性能应用的方式。
本文将深入探讨Python异步编程的核心概念和最佳实践,从基础的asyncio事件循环开始,逐步深入到协程管理、异步数据库操作等高级主题,并结合现代Web框架如FastAPI,为读者提供一套完整的异步编程实战指南。
什么是异步编程
异步编程的基本概念
异步编程是一种编程范式,允许程序在等待某些操作完成时继续执行其他任务,而不是阻塞等待。在传统的同步编程中,当一个函数需要等待I/O操作完成时,整个线程都会被阻塞,直到操作完成。而异步编程通过事件循环机制,让程序可以在等待I/O操作的同时执行其他任务,从而显著提高程序的并发处理能力。
异步编程的优势
异步编程的主要优势包括:
- 高并发性:在单个线程中即可并发处理多个任务(任务在等待I/O时交替执行,而不是真正的并行)
- 资源效率:减少线程创建和切换的开销
- 响应性:应用程序可以更快地响应用户输入
- 可扩展性:能够处理更多的并发连接
asyncio基础:事件循环详解
事件循环的核心机制
asyncio模块是Python异步编程的核心,它提供了一个事件循环来管理异步任务的执行。事件循环是异步编程的基础,它负责调度和执行协程任务。
import asyncio
import time
async def say_hello(name, delay):
    """Greet *name*, pause *delay* seconds without blocking the loop, then say goodbye.

    Both messages carry a wall-clock timestamp so interleaving between
    concurrently running greetings is visible in the output.
    """
    print(f"Hello {name} at {time.time()}")
    # Suspends only this coroutine; the event loop keeps running the others.
    await asyncio.sleep(delay)
    print(f"Goodbye {name} at {time.time()}")
async def main():
    """Run three ``say_hello`` coroutines concurrently.

    The total run time is roughly the longest individual delay (3 s), not the
    sum of all delays, because ``gather`` runs them on the loop at the same time.
    """
    # Calling a coroutine function only creates a coroutine object; nothing
    # executes until gather() schedules them on the event loop.
    greetings = [
        say_hello("Alice", 2),
        say_hello("Bob", 1),
        say_hello("Charlie", 3),
    ]
    await asyncio.gather(*greetings)


# Start the event loop only when run as a script, not as an import side effect.
if __name__ == "__main__":
    asyncio.run(main())
事件循环的类型和选择
Python在不同平台上默认使用不同的事件循环实现(Python 3.8+ 在Windows上为ProactorEventLoop,在Unix上为SelectorEventLoop):
import asyncio
import sys
# Create the loop explicitly. Calling asyncio.get_event_loop() when no loop is
# running (or set) is deprecated since Python 3.10, and newer versions stop
# creating one implicitly, so new_event_loop() + set_event_loop() is the
# portable way to obtain a loop outside of a coroutine.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    # Inspect the concrete loop class in use.
    print(f"Event loop type: {type(loop)}")
    # True here: get_event_loop() returns the loop installed by set_event_loop().
    print(f"Is default loop: {loop is asyncio.get_event_loop()}")
    # Default loop implementations differ per platform (Python 3.8+).
    if sys.platform == "win32":
        # Windows defaults to ProactorEventLoop
        print("Using ProactorEventLoop")
    else:
        # Unix-like platforms default to SelectorEventLoop
        print("Using SelectorEventLoop")
finally:
    # Uninstall and close the loop so its resources (selector, sockets) are released.
    asyncio.set_event_loop(None)
    loop.close()
事件循环的生命周期管理
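正确管理事件循环的生命周期对于避免资源泄漏和性能问题至关重要。大多数情况下应直接使用asyncio.run(),它会自动创建、运行并关闭事件循环;只有在需要从同步代码中多次驱动同一个事件循环时,才需要像下面这样手动管理(注意:这类管理器只能在同步代码中调用,不能在正在运行的事件循环内部使用):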
import asyncio
import weakref
class AsyncManager:
    """Own a private event loop and run batches of coroutines on it from sync code.

    ``run_tasks`` blocks via ``loop.run_until_complete`` and therefore must be
    called from ordinary (non-async) code, never from inside a running loop.
    Call ``close()`` when done to release the loop.
    """

    def __init__(self):
        self.loop = None
        # Tasks of the batch currently in flight; cleared after every batch so
        # finished Task objects (and their results) are not retained forever.
        self.tasks = []

    def create_loop(self):
        """Create a new event loop, install it as the current loop and return it."""
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self.loop)
        return self.loop

    def run_tasks(self, coroutines):
        """Run *coroutines* concurrently on the managed loop and return their results.

        Results are returned in the same order as *coroutines*. If any coroutine
        raises, the remaining tasks are cancelled (and awaited) and the first
        exception is re-raised.

        Raises:
            RuntimeError: if called while an event loop is already running in
                this thread (``run_until_complete`` cannot nest).
        """
        coroutines = list(coroutines)
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass  # No loop running in this thread: safe to block below.
        else:
            for coro in coroutines:
                coro.close()  # avoid "coroutine ... was never awaited" warnings
            raise RuntimeError(
                "AsyncManager.run_tasks() must be called from synchronous code, "
                "not from inside a running event loop"
            )

        if self.loop is None or self.loop.is_closed():
            self.create_loop()
        loop = self.loop
        if not coroutines:
            return []

        # asyncio.create_task() requires a *running* loop; schedule the
        # coroutines on our own (not yet running) loop explicitly instead.
        tasks = [loop.create_task(coro) for coro in coroutines]
        self.tasks.extend(tasks)
        try:
            return loop.run_until_complete(asyncio.gather(*tasks))
        except Exception as e:
            print(f"Error running tasks: {e}")
            raise
        finally:
            pending = [task for task in tasks if not task.done()]
            for task in pending:
                task.cancel()
            if pending:
                # Let the cancellations be processed so no task is destroyed
                # while still pending.
                loop.run_until_complete(
                    asyncio.gather(*pending, return_exceptions=True)
                )
            finished = set(tasks)
            self.tasks = [task for task in self.tasks if task not in finished]

    def close(self):
        """Cancel leftover tasks, finalize async generators and close the loop.

        Safe to call more than once; afterwards ``loop`` is None and no event
        loop is installed for the current thread.
        """
        if self.loop is None:
            return
        loop, self.loop = self.loop, None
        try:
            if not loop.is_closed():
                leftovers = [task for task in self.tasks if not task.done()]
                for task in leftovers:
                    task.cancel()
                if leftovers:
                    loop.run_until_complete(
                        asyncio.gather(*leftovers, return_exceptions=True)
                    )
                loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            self.tasks = []
            asyncio.set_event_loop(None)
            loop.close()
# 使用示例
async def sample_task(name, delay):
    """Wait *delay* seconds, then report that task *name* has finished."""
    await asyncio.sleep(delay)
    message = f"Task {name} completed"
    return message
def main():
    """Drive ``AsyncManager`` from synchronous code and print the batch results.

    This must be a plain function: ``AsyncManager.run_tasks`` blocks on
    ``loop.run_until_complete``, which raises RuntimeError when called while
    another event loop (e.g. one started by ``asyncio.run``) is running.
    """
    manager = AsyncManager()
    try:
        results = manager.run_tasks([
            sample_task("A", 1),
            sample_task("B", 2),
            sample_task("C", 1.5),
        ])
        print(results)
    finally:
        manager.close()
# main()
协程管理与任务调度
协程的基本概念和语法
协程是异步编程的核心概念,它允许函数在执行过程中暂停并稍后恢复执行:
import asyncio
import time
async def fetch_data(url, delay):
    """Pretend to download *url*; the network wait is simulated with a sleep of *delay* s."""
    print(f"Starting fetch from {url}")
    await asyncio.sleep(delay)  # stand-in for network latency
    print(f"Completed fetch from {url}")
    payload = f"Data from {url}"
    return payload
async def process_data():
    """Fetch three URLs one after another and report the elapsed time.

    Each ``await`` completes before the next fetch starts, so the total time is
    roughly the sum of the individual delays (~3 s here).
    """
    # perf_counter() is monotonic and high-resolution; time.time() can jump
    # when the wall clock is adjusted, corrupting the measured duration.
    started = time.perf_counter()
    # Sequential execution
    data1 = await fetch_data("url1", 1)
    data2 = await fetch_data("url2", 1)
    data3 = await fetch_data("url3", 1)
    elapsed = time.perf_counter() - started
    print(f"Sequential execution took {elapsed:.2f} seconds")
    return [data1, data2, data3]
async def process_data_concurrent():
    """Fetch the same three URLs concurrently and report the elapsed time.

    ``gather`` runs the three coroutines on the event loop at the same time, so
    the total time is roughly the longest single delay (~1 s) instead of the sum.
    Results are returned in argument order.
    """
    # Monotonic clock: the right tool for measuring elapsed durations.
    started = time.perf_counter()
    results = await asyncio.gather(
        fetch_data("url1", 1),
        fetch_data("url2", 1),
        fetch_data("url3", 1),
    )
    elapsed = time.perf_counter() - started
    print(f"Concurrent execution took {elapsed:.2f} seconds")
    return results
# asyncio.run(process_data_concurrent())
任务的创建和管理
在asyncio中,任务(Task)是对协程的包装:通过asyncio.create_task()创建的任务会被立即调度到事件循环中并发执行,并支持取消、超时和状态查询等控制操作:
import asyncio
import time
async def background_task(name, duration):
    """Simulate background work lasting *duration* seconds and return a result label."""
    print(f"Task {name} started")
    await asyncio.sleep(duration)
    print(f"Task {name} completed")
    outcome = f"Result from {name}"
    return outcome
async def task_management_example():
    """Show concurrent Task execution, then cancellation of a long-running Task."""
    # create_task() schedules each coroutine immediately; they start running as
    # soon as this coroutine yields to the event loop.
    specs = [("A", 2), ("B", 1), ("C", 3)]
    scheduled = [asyncio.create_task(background_task(label, secs)) for label, secs in specs]

    # Wait for all three; results keep the creation order.
    results = await asyncio.gather(*scheduled)
    print(f"All results: {results}")

    # Cancellation demo: D would take 5 s, but it is cancelled after 2 s.
    slow = asyncio.create_task(background_task("D", 5))
    await asyncio.sleep(2)
    if not slow.done():
        slow.cancel()
    try:
        await slow
    except asyncio.CancelledError:
        print("Task D was cancelled")
async def task_with_timeout(timeout: float = 3.0):
    """Run a 5-second coroutine under ``asyncio.wait_for`` with a *timeout* deadline.

    With the default of 3.0 s the inner coroutine is cancelled and
    "Task timed out" is printed; pass a timeout above 5 s to see it complete.

    Args:
        timeout: seconds to wait before cancelling the inner coroutine.
    """
    async def long_running_task():
        await asyncio.sleep(5)
        return "Long task completed"

    try:
        # wait_for cancels long_running_task if it has not finished by the deadline.
        result = await asyncio.wait_for(long_running_task(), timeout=timeout)
    except asyncio.TimeoutError:
        print("Task timed out")
    else:
        print(result)
# asyncio.run(task_with_timeout())
任务队列和生产者-消费者模式
在实际应用中,经常需要实现任务队列来管理异步任务:
import asyncio
import random
import time
class TaskQueue:
def __init__(self):
self.queue = asyncio.Queue()
self.results = []
async def producer(self, name, num_items):
"""生产者协程"""
for i in range(num_items):
task_data = {
'id': i,
'name': name,
'data': f"Task data {i}",
'timestamp': time.time()
}
await self.queue.put(task_data)
print(f"Producer {name} produced task {i}")
await asyncio.sleep(random.uniform(0.1, 0.5))
async def consumer(self, name):
"""消费者协程"""
while True:
try:
# 从队列中获取任务,设置超时
task_data = await asyncio.wait_for(self.queue.get(), timeout=1.0)
print(f"Consumer {name} processing {task_data['id']}")
# 模拟处理时间
await asyncio.sleep(random.uniform(0.5, 1.5))
# 标记任务完成
self.queue.task_done()
self.results.append(f"Consumer {name} completed task {task_data['id']}")
except asyncio.TimeoutError:
print(f"Consumer {name} timeout, checking for more tasks")
continue
except Exception as e:
print(f"Consumer {name} error: {e}")
break
async def run(self, num_producers=2, num_consumers=3, items_per_producer=5):
"""运行队列系统"""
# 创建生产者任务
producers = [
self.producer(f"Producer-{i}", items_per_producer)
for i in range(num_producers)
]
# 创建消费者任务
consumers = [
self.consumer(f"Consumer-{i}")
for i in range(num_consumers)
]
# 并发运行所有任务
await asyncio.gather(*producers, *consumers)
return self.results
async def queue_example():
    """Demo: two producers with three items each, drained by three consumers."""
    demo_queue = TaskQueue()
    outcome = await demo_queue.run(num_producers=2, num_consumers=3, items_per_producer=3)
    print("Final results:", outcome)
# asyncio.run(queue_example())
异步数据库操作
异步数据库连接池
数据库操作通常是异步应用中的性能瓶颈,合理的异步数据库操作可以显著提升应用性能:
import asyncio
import asyncpg
import time
from typing import List, Dict
class AsyncDatabaseManager:
    """Thin wrapper around an asyncpg connection pool.

    Await ``create_pool()`` before running queries, and ``close_pool()`` when done.
    """

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None

    async def create_pool(self, min_size: int = 10, max_size: int = 20,
                          command_timeout: float = 60):
        """Create the asyncpg pool.

        An already-open pool is closed first so its connections are not leaked.

        Args:
            min_size: connections kept open in the pool.
            max_size: upper bound on pooled connections.
            command_timeout: default per-statement timeout passed to asyncpg.
        """
        if self.pool is not None:
            await self.close_pool()
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=min_size,
            max_size=max_size,
            command_timeout=command_timeout,
        )
        print(f"Database pool created with size {min_size}-{max_size}")

    async def close_pool(self):
        """Close the pool if one is open. Safe to call more than once."""
        if self.pool is not None:
            # Detach before awaiting so a second call cannot close it again.
            pool, self.pool = self.pool, None
            await pool.close()
            print("Database pool closed")

    def _require_pool(self):
        """Return the open pool or raise if ``create_pool()`` has not been awaited."""
        if self.pool is None:
            # RuntimeError subclasses Exception, so existing `except Exception` callers still work.
            raise RuntimeError("Database pool not initialized")
        return self.pool

    async def execute_query(self, query: str, *args):
        """Run *query* (``$1``-style placeholders bound to *args*) via ``fetch`` and return its rows."""
        pool = self._require_pool()
        async with pool.acquire() as connection:
            try:
                return await connection.fetch(query, *args)
            except Exception as e:
                print(f"Query execution error: {e}")
                raise

    async def execute_update(self, query: str, *args):
        """Run a statement via ``execute`` and return asyncpg's command status result."""
        pool = self._require_pool()
        async with pool.acquire() as connection:
            try:
                return await connection.execute(query, *args)
            except Exception as e:
                print(f"Update execution error: {e}")
                raise
# 数据库操作示例
async def database_example():
    """Create a table, insert sample users concurrently, and query one back.

    Errors are reported rather than raised. The pool is always closed exactly
    once, in ``finally``.
    """
    # Initialise the database manager
    db_manager = AsyncDatabaseManager("postgresql://user:password@localhost:5432/mydb")
    try:
        # Create the connection pool
        await db_manager.create_pool()

        # Create the example table
        create_table_query = """
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100),
            email VARCHAR(100),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
        await db_manager.execute_update(create_table_query)

        insert_query = """
        INSERT INTO users (name, email) VALUES ($1, $2)
        """
        users_data = [
            ("Alice", "alice@example.com"),
            ("Bob", "bob@example.com"),
            ("Charlie", "charlie@example.com"),
            ("David", "david@example.com"),
            ("Eve", "eve@example.com"),
        ]

        # Run the inserts concurrently; each one acquires its own pooled connection.
        # NOTE(review): for large batches a single executemany() on one
        # connection is usually cheaper; consider exposing it on the manager.
        start_time = time.perf_counter()  # monotonic clock for durations
        await asyncio.gather(*(
            db_manager.execute_update(insert_query, name, email)
            for name, email in users_data
        ))
        elapsed = time.perf_counter() - start_time
        print(f"Batch insert completed in {elapsed:.2f} seconds")

        # Query the data back
        select_query = "SELECT * FROM users WHERE name = $1"
        results = await db_manager.execute_query(select_query, "Alice")
        print(f"Found {len(results)} users")
    except Exception as e:
        print(f"Database operation failed: {e}")
    finally:
        # Runs on success and failure alike, so the pool is closed exactly once.
        await db_manager.close_pool()
# asyncio.run(database_example())
异步ORM操作
使用异步ORM可以更方便地管理数据库操作:
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy import select, update, delete
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from typing import Optional
# 定义数据库模型
class Base(DeclarativeBase):
    """Declarative base class for the ORM models of this example."""
    pass


class User(Base):
    """ORM mapping for the ``users`` table.

    NOTE(review): the asyncpg example creates a ``users`` table without an
    ``age`` column in the same database; ``create_all`` does not alter an
    existing table, so running both examples against one DB will conflict.
    """
    __tablename__ = 'users'
    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column()
    email: Mapped[str] = mapped_column()
    # Optional[...] makes this column nullable; the plain Mapped[...] columns are NOT NULL.
    age: Mapped[Optional[int]] = mapped_column()
class AsyncORMManager:
    """CRUD helper built on a SQLAlchemy ``AsyncEngine`` and ``async_sessionmaker``.

    Each method opens its own short-lived session. Await ``dispose()`` when
    finished so pooled connections are released while the event loop is alive.
    """

    def __init__(self, database_url: str, echo: bool = True):
        # echo=True logs every emitted SQL statement: useful in a demo, noisy in production.
        self.engine = create_async_engine(database_url, echo=echo)
        # expire_on_commit=False keeps loaded attributes readable after commit,
        # so objects returned from already-closed sessions remain usable.
        self.async_session = async_sessionmaker(
            self.engine, expire_on_commit=False
        )

    async def create_tables(self):
        """Create every table registered on ``Base.metadata`` (existing tables are left unchanged)."""
        async with self.engine.begin() as conn:
            # metadata.create_all is synchronous; run_sync adapts it to the async connection.
            await conn.run_sync(Base.metadata.create_all)

    async def create_user(self, name: str, email: str, age: Optional[int] = None):
        """Insert a User, commit, and return it with DB-generated fields (e.g. ``id``) loaded."""
        async with self.async_session() as session:
            user = User(name=name, email=email, age=age)
            session.add(user)
            await session.commit()
            await session.refresh(user)
            return user

    async def get_user(self, user_id: int):
        """Return the User whose id is *user_id*, or None if there is no such row."""
        async with self.async_session() as session:
            stmt = select(User).where(User.id == user_id)
            result = await session.execute(stmt)
            return result.scalar_one_or_none()

    async def update_user(self, user_id: int, **kwargs):
        """Apply ``values(**kwargs)`` to the User with *user_id* and commit.

        Keyword names are passed straight to ``update(User).values()``. Calling
        with no keywords is a no-op.
        """
        if not kwargs:
            # Nothing to change: do not issue an UPDATE without explicit values.
            return
        async with self.async_session() as session:
            stmt = update(User).where(User.id == user_id).values(**kwargs)
            await session.execute(stmt)
            await session.commit()

    async def delete_user(self, user_id: int):
        """Delete the User with *user_id* (no-op if absent) and commit."""
        async with self.async_session() as session:
            stmt = delete(User).where(User.id == user_id)
            await session.execute(stmt)
            await session.commit()

    async def dispose(self):
        """Close all pooled connections held by the engine."""
        await self.engine.dispose()
# 异步ORM操作示例
async def orm_example():
    """Create tables, add two users, then read and update one of them.

    Errors are reported rather than raised. The engine is always disposed.
    """
    # Initialise the ORM manager
    orm_manager = AsyncORMManager("postgresql+asyncpg://user:password@localhost:5432/mydb")
    try:
        # Create tables
        await orm_manager.create_tables()

        # Create users
        user1 = await orm_manager.create_user("Alice", "alice@example.com", 25)
        user2 = await orm_manager.create_user("Bob", "bob@example.com", 30)
        print(f"Created users: {user1.id}, {user2.id}")

        # Fetch a user
        retrieved_user = await orm_manager.get_user(user1.id)
        print(f"Retrieved user: {retrieved_user.name}")

        # Update the user and read the new values back
        await orm_manager.update_user(user1.id, age=26, email="alice.updated@example.com")
        updated_user = await orm_manager.get_user(user1.id)
        print(f"Updated user age: {updated_user.age}, email: {updated_user.email}")
    except Exception as e:
        print(f"ORM operation failed: {e}")
    finally:
        # Dispose the engine while the loop is still running; otherwise pooled
        # connections are only cleaned up after the loop has been closed.
        await orm_manager.engine.dispose()
# asyncio.run(orm_example())
异步HTTP客户端
aiohttp基础使用
在异步应用中,HTTP客户端的性能同样重要:
import asyncio
import aiohttp
import time
from typing import List, Dict
class AsyncHTTPClient:
    """Minimal aiohttp wrapper. Use as ``async with AsyncHTTPClient() as client``."""

    def __init__(self, timeout: int = 30):
        # Total per-request timeout, in seconds, applied to the whole session.
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.session = None

    async def __aenter__(self):
        """Open the underlying ClientSession."""
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the ClientSession and forget it."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    async def fetch_url(self, url: str, **kwargs) -> Dict:
        """GET *url* and summarise the response; never raises for request errors.

        Returns on success a dict with:
            ``url``, ``status``, ``content_length`` (characters of the decoded
            body), ``response_time`` (seconds, float), and ``server`` (the
            ``Server`` header, or 'Unknown').
        Returns on failure:
            ``{'url': url, 'error': str(exc), 'status': 'error'}``.
        """
        started = time.perf_counter()
        try:
            async with self.session.get(url, **kwargs) as response:
                content = await response.text()
                return {
                    'url': url,
                    'status': response.status,
                    'content_length': len(content),
                    # Elapsed time from sending the request to reading the full body.
                    'response_time': time.perf_counter() - started,
                    'server': response.headers.get('Server', 'Unknown'),
                }
        except Exception as e:
            return {
                'url': url,
                'error': str(e),
                'status': 'error',
            }

    async def fetch_multiple_urls(self, urls: List[str], concurrent: int = 10) -> List[Dict]:
        """Fetch all *urls* with at most *concurrent* requests in flight at once.

        Results are returned in the same order as *urls*. A failure becomes an
        error dict that keeps the URL.

        Raises:
            ValueError: if *concurrent* < 1 (a zero-slot semaphore would block forever).
        """
        if concurrent < 1:
            raise ValueError("concurrent must be >= 1")
        urls = list(urls)  # iterated twice below; also accepts generators
        semaphore = asyncio.Semaphore(concurrent)

        async def fetch_with_semaphore(url):
            async with semaphore:
                return await self.fetch_url(url)

        results = await asyncio.gather(
            *(fetch_with_semaphore(url) for url in urls),
            return_exceptions=True,
        )
        # BaseException also covers CancelledError, which gather returns as a value here.
        return [
            {'url': url, 'error': str(result), 'status': 'error'}
            if isinstance(result, BaseException) else result
            for url, result in zip(urls, results)
        ]
return processed_results
# HTTP客户端使用示例
async def http_client_example():
    """Fetch several httpbin delay endpoints, at most 3 at a time, and print a summary."""
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/3',
        'https://httpbin.org/delay/1',
    ]
    async with AsyncHTTPClient(timeout=10) as client:
        # perf_counter() is monotonic, unlike time.time(), so it is the right
        # clock for elapsed durations.
        start_time = time.perf_counter()
        # Fetch the URLs concurrently
        results = await client.fetch_multiple_urls(urls, concurrent=3)
        elapsed = time.perf_counter() - start_time
        print(f"Completed {len(results)} requests in {elapsed:.2f} seconds")
        for result in results:
            if 'error' in result:
                # .get(): an error entry is not guaranteed to carry the URL.
                print(f"Error ({result.get('url', 'unknown URL')}): {result['error']}")
            else:
                print(f"URL: {result['url']}, Status: {result['status']}, Length: {result['content_length']}")
# asyncio.run(http_client_example())
高级HTTP客户端功能
import asyncio
import json
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional

import aiohttp
@dataclass
class APIResponse:
    """Result of one HTTP request, as built by AdvancedAsyncHTTPClient.

    Field order defines the positional constructor signature.
    """
    status_code: int  # HTTP status code of the response
    data: Any  # the client stores decoded JSON for application/json responses, text otherwise
    headers: Dict[str, str]  # response headers copied into a plain dict
    url: str  # full URL that was requested
    response_time: float  # seconds, as measured by the code that builds the response
class AdvancedAsyncHTTPClient:
    """aiohttp client bound to a base URL that records simple latency statistics.

    Use as ``async with AdvancedAsyncHTTPClient(base_url) as client``. Each
    successful request increments ``request_count`` and adds its latency to
    ``total_response_time``.
    """

    def __init__(self, base_url: str, timeout: int = 30, headers: Optional[Dict[str, str]] = None):
        self.base_url = base_url.rstrip('/')
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.default_headers = headers or {}
        self.session = None
        self.request_count = 0
        self.total_response_time = 0.0

    async def __aenter__(self):
        """Open a ClientSession that sends ``default_headers`` with every request."""
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            headers=self.default_headers
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the ClientSession and forget it."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    def _build_url(self, endpoint: str) -> str:
        """Join *endpoint* onto ``base_url`` with exactly one slash between them."""
        return f"{self.base_url}/{endpoint.lstrip('/')}"

    async def _request(self, method: str, endpoint: str, **kwargs) -> APIResponse:
        """Send one request and wrap it in an APIResponse (shared by get/post).

        The body is decoded as JSON when the Content-Type is application/json,
        and as text otherwise. Failures are logged and re-raised.
        """
        url = self._build_url(endpoint)
        # Monotonic clock for latency measurement.
        start_time = time.perf_counter()
        try:
            async with self.session.request(method, url, **kwargs) as response:
                # Latency up to receipt of the response headers; body decoding is excluded.
                response_time = time.perf_counter() - start_time
                if response.content_type == 'application/json':
                    data = await response.json()
                else:
                    data = await response.text()
                api_response = APIResponse(
                    status_code=response.status,
                    data=data,
                    headers=dict(response.headers),
                    url=url,
                    response_time=response_time
                )
        except Exception as e:
            print(f"Request to {url} failed: {e}")
            raise
        self.request_count += 1
        self.total_response_time += response_time
        return api_response

    async def get(self, endpoint: str, params: Optional[Dict] = None,
                  headers: Optional[Dict] = None) -> APIResponse:
        """Send a GET request to *endpoint* with optional query *params* and extra *headers*."""
        return await self._request('GET', endpoint, params=params, headers=headers)

    async def post(self, endpoint: str, data: Any = None,
                   headers: Optional[Dict] = None) -> APIResponse:
        """Send a POST request; *data* is serialised as a JSON body (``json=``), not form data."""
        return await self._request('POST', endpoint, json=data, headers=headers)

    async def get_with_retry(self, endpoint: str, max_retries: int = 3,
                             delay: float = 1.0, params: Optional[Dict] = None,
                             headers: Optional[Dict] = None) -> APIResponse:
        """GET with up to *max_retries* attempts and exponential backoff.

        Only transport-level failures (``aiohttp.ClientError``, timeouts) are
        retried. Other exceptions propagate immediately, and non-2xx statuses
        are returned, not retried. The wait starts at *delay* seconds and
        doubles after each failed attempt.

        Raises:
            ValueError: if *max_retries* < 1.
        """
        if max_retries < 1:
            raise ValueError("max_retries must be >= 1")
        for attempt in range(1, max_retries + 1):
            try:
                return await self.get(endpoint, params=params, headers=headers)
            except (aiohttp.ClientError, asyncio.TimeoutError):
                if attempt == max_retries:
                    raise
                print(f"Attempt {attempt} failed, retrying in {delay}s...")
                await asyncio.sleep(delay)
                delay *= 2  # exponential backoff

    def get_stats(self) -> Dict[str, float]:
        """Return the request count plus total and average latency in seconds (average is 0.0 with no requests)."""
        avg_response_time = (
            self.total_response_time / self.request_count if self.request_count > 0 else 0.0
        )
        return {
            'total_requests': self.request_count,
            'average_response_time': avg_response_time,
            'total_response_time': self.total_response_time
        }
# 高级HTTP客户端使用示例
async def advanced_http_example():
    """Exercise GET, POST, retrying GET and the statistics of AdvancedAsyncHTTPClient."""
    api_base = "https://jsonplaceholder.typicode.com"
    agent_headers = {"User-Agent": "AsyncClient/1.0"}
    async with AdvancedAsyncHTTPClient(base_url=api_base, headers=agent_headers) as client:
        try:
            # Plain GET
            fetched = await client.get("/posts/1")
            print(f"GET request - Status: {fetched.status_code}")
            print(f"Response time: {fetched.response_time:.2f}s")

            # POST with a JSON body
            new_post = {"title": "foo", "body": "bar", "userId": 1}
            created = await client.post("/posts", data=new_post)
            print(f"POST request - Status: {created.status_code}")
            print(f"Response time: {created.response_time:.2f}s")

            # GET with retries and exponential backoff
            retried = await client.get_with_retry("/posts/1", max_retries=3)
            print(f"Retry request - Status: {retried.status_code}")

            # Aggregate statistics collected by the client
            print(f"Client stats: {client.get_stats()}")
        except Exception as e:
            print(f"Request failed: {e}")
# asyncio.run(advanced_http_example())
FastAPI异步Web框架实战
FastAPI基础架构
FastAPI是一个现代的Python异步Web框架,它基于Starlette和Pydantic构建,能够自动生成API文档,并基于类型注解对请求数据进行校验:
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional
import asyncio
import time
# 创建FastAPI应用
# Create the FastAPI application. title/description/version are app metadata
# that FastAPI shows in its auto-generated API docs.
app = FastAPI(
    title="Async API Example",
    description="A demonstration of async programming with FastAPI",
    version="1.0.0"
)
# 数据模型
class User(BaseModel):
    """A stored user, as kept in ``fake_database`` and returned by the user routes."""
    id: int
    name: str
    email: str
    age: Optional[int] = None  # may be omitted; defaults to None


class UserCreate(BaseModel):
    """Same fields as ``User`` minus ``id``.

    Presumably the create-user request body; its route is not visible here (TODO confirm).
    """
    name: str
    email: str
    age: Optional[int] = None
# 模拟数据库
# In-memory stand-in for a user table, keyed by each user's id.
fake_database = {
    seeded.id: seeded
    for seeded in (
        User(id=1, name="Alice", email="alice@example.com", age=25),
        User(id=2, name="Bob", email="bob@example.com", age=30),
        User(id=3, name="Charlie", email="charlie@example.com", age=35),
    )
}
# 异步依赖注入
async def get_user_by_id(user_id: int):
    """Dependency: resolve *user_id* to a stored User, or fail with HTTP 404."""
    # Simulated database round-trip latency
    await asyncio.sleep(0.1)
    found = fake_database.get(user_id)
    if found is None:
        raise HTTPException(status_code=404, detail="User not found")
    return found
# 异步路由处理
@app.get("/")
async def root():
    """Landing endpoint that returns a static welcome payload."""
    payload = {"message": "Welcome to Async API"}
    return payload
@app.get("/users/{user_id}")
async def get_user(user_id: int, user: User = Depends(get_user_by_id)):
    """Return the user resolved by the ``get_user_by_id`` dependency.

    The lookup, including the 404 for unknown ids, happens inside the
    dependency. ``user_id`` is declared here only so the path parameter appears
    on this route's signature.
    """
    resolved = user
    return resolved
@app
评论 (0)