引言
在现代Python开发中,异步编程已成为提升应用性能和响应能力的关键技术。随着网络请求、数据库操作等I/O密集型任务的增多,传统的同步编程模式已经无法满足高性能应用的需求。Python的asyncio库为开发者提供了强大的异步编程支持,通过事件循环机制和协程,能够有效处理高并发场景下的任务执行。
本文将深入探讨Python异步编程的核心概念和最佳实践,从基础的asyncio使用到复杂的并发控制策略,再到实际的性能优化技巧,帮助开发者构建高效、可靠的异步应用。
1. 异步编程基础概念
1.1 协程与异步函数
在Python中,协程(Coroutine)是一种可以暂停执行并在稍后恢复的函数。异步函数使用async def关键字定义,返回一个协程对象而不是直接执行结果。
import asyncio
# 定义异步函数
async def fetch_data(url):
    """Simulate fetching *url* and return a result string.

    Sleeps one second to stand in for network latency.
    """
    outcome = f"数据来自 {url}"
    print(f"开始获取 {url}")
    # Stand-in for a network round-trip.
    await asyncio.sleep(1)
    print(f"完成获取 {url}")
    return outcome
# Create a coroutine object — calling an async function does NOT run its body.
coroutine = fetch_data("https://api.example.com/data")
# NOTE(review): this coroutine is never awaited, so Python will emit a
# "coroutine was never awaited" RuntimeWarning at interpreter exit.
print(type(coroutine))  # <class 'coroutine'>
1.2 事件循环
事件循环是异步编程的核心,负责调度和执行协程。Python的asyncio库提供了完整的事件循环实现:
import asyncio
async def main():
    """Demo coroutine: print, pause one second, print again."""
    print("开始执行")
    await asyncio.sleep(1)
    print("执行完成")

# Create an event loop and run main() to completion.
asyncio.run(main())
1.3 await关键字
await关键字用于等待协程的执行结果,只有在异步函数中才能使用:
import asyncio
async def calculate(x, y):
    """Return x + y after a half-second simulated delay."""
    # Pretend this is expensive async work.
    await asyncio.sleep(0.5)
    total = x + y
    return total
async def main():
    """Await calculate() and print the sum."""
    # Suspend here until the awaited coroutine finishes.
    result = await calculate(10, 20)
    print(f"计算结果: {result}")

asyncio.run(main())
2. asyncio核心组件详解
2.1 Task对象
Task是Future的子类,用于包装协程并允许在事件循环中调度执行:
import asyncio
import time
async def slow_operation(name):
    """Simulate a two-second job called *name*; return its result string."""
    outcome = f"{name} 的结果"
    print(f"开始 {name}")
    # Two seconds standing in for real work.
    await asyncio.sleep(2)
    print(f"完成 {name}")
    return outcome
async def main():
    """Run two slow_operation() calls concurrently via Tasks."""
    # create_task() schedules the coroutines immediately, so they overlap.
    task1 = asyncio.create_task(slow_operation("任务1"))
    task2 = asyncio.create_task(slow_operation("任务2"))
    # Await both results; total wall time is ~2s, not 2s + 2s.
    result1 = await task1
    result2 = await task2
    print(f"结果: {result1}, {result2}")

asyncio.run(main())
2.2 Future对象
Future代表一个异步操作的结果,可以被回调函数处理:
import asyncio
def callback(future):
    """Done-callback: print the completed future's result."""
    print(f"Future完成,结果: {future.result()}")


async def async_operation():
    """Sleep one second, then report completion."""
    message = "异步操作完成"
    await asyncio.sleep(1)
    return message
async def main():
    """Wrap a coroutine in a Future, attach a callback, await the result."""
    # ensure_future() wraps the coroutine in a Task (a Future subclass)
    # and schedules it on the running loop.
    future = asyncio.ensure_future(async_operation())
    # callback fires when the future completes.
    future.add_done_callback(callback)
    # Awaiting yields the same value the callback sees via future.result().
    result = await future
    print(f"最终结果: {result}")

asyncio.run(main())
2.3 事件循环管理
正确管理事件循环对于异步程序的性能至关重要:
import asyncio
import time
class AsyncManager:
    """Small lifecycle helper: setup, run named tasks, clean up."""

    def __init__(self):
        # Filled in by setup() once an event loop is running.
        self.loop = None

    async def setup(self):
        """Record the running event loop and announce readiness."""
        self.loop = asyncio.get_running_loop()
        print("异步环境已准备就绪")

    async def cleanup(self):
        """Release resources, but only if setup() was ever called."""
        if not self.loop:
            return
        print("正在清理资源...")
        # Brief pause standing in for real teardown work.
        await asyncio.sleep(0.1)

    async def run_task(self, task_name):
        """Simulate one second of work for *task_name*; return a summary."""
        print(f"开始执行 {task_name}")
        await asyncio.sleep(1)
        print(f"完成执行 {task_name}")
        return f"{task_name} 完成"
# Usage example
async def main():
    """Drive AsyncManager through setup -> concurrent tasks -> cleanup."""
    manager = AsyncManager()
    await manager.setup()
    try:
        # Bare coroutines; gather() schedules them to run concurrently.
        tasks = [
            manager.run_task("任务A"),
            manager.run_task("任务B"),
            manager.run_task("任务C")
        ]
        results = await asyncio.gather(*tasks)
        print(f"所有任务结果: {results}")
    finally:
        # Always release resources, even if a task raised.
        await manager.cleanup()

asyncio.run(main())
3. 并发控制策略
3.1 信号量控制并发数量
信号量(Semaphore)是控制并发访问资源的重要工具:
import asyncio
import aiohttp
import time
class ConcurrencyController:
    """Async context manager that caps concurrent HTTP fetches with a semaphore."""

    def __init__(self, max_concurrent=5):
        # At most max_concurrent fetch_url() bodies run at once.
        self.semaphore = asyncio.Semaphore(max_concurrent)
        # aiohttp session; created on context entry, closed on exit.
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch_url(self, url):
        """Fetch *url* under the semaphore; return body text, or None on error."""
        async with self.semaphore:
            try:
                print(f"开始请求 {url}")
                async with self.session.get(url) as response:
                    body = await response.text()
                    print(f"完成请求 {url}")
                    return body
            except Exception as e:
                # Best-effort: log and swallow so one failure doesn't abort the batch.
                print(f"请求失败 {url}: {e}")
                return None
async def main():
    """Fetch ten delayed URLs with at most three requests in flight."""
    urls = [
        f"https://httpbin.org/delay/{i%3+1}"
        for i in range(10)
    ]
    async with ConcurrencyController(max_concurrent=3) as controller:
        tasks = [controller.fetch_url(url) for url in urls]
        # return_exceptions=True keeps one failure from cancelling the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        print(f"总共获取 {len(results)} 个结果")
        # NOTE(review): fetch_url() swallows errors and returns None, so
        # exceptions rarely reach this count — failed requests show up as None
        # and are still counted as "successful" here.
        successful = sum(1 for r in results if not isinstance(r, Exception))
        print(f"成功: {successful}, 失败: {len(results) - successful}")

# asyncio.run(main())
3.2 限制并发任务数
使用队列和工作协程模式控制并发:
import asyncio
import time
class TaskQueue:
    """Bounded-worker task queue: N worker coroutines drain an asyncio.Queue.

    Work items are (name, delay) tuples; result strings accumulate in
    self.results.
    """

    def __init__(self, max_workers=5):
        self.queue = asyncio.Queue()
        self.workers = max_workers
        self.results = []

    async def worker(self, worker_id):
        """Worker coroutine: process queue items until a None sentinel arrives."""
        while True:
            task_data = await self.queue.get()
            if task_data is None:  # shutdown sentinel
                break
            try:
                task_name, delay = task_data
                print(f"工人 {worker_id} 开始处理 {task_name}")
                # Simulate the actual work.
                await asyncio.sleep(delay)
                result = f"{task_name} 完成,耗时 {delay}s"
                self.results.append(result)
                print(f"工人 {worker_id} 完成 {task_name}")
            except Exception as e:
                print(f"工人 {worker_id} 发生错误: {e}")
            finally:
                # BUGFIX: always mark the item done, even when processing
                # raised; the original skipped task_done() on error, which
                # left queue.join() in run() waiting forever.
                self.queue.task_done()

    async def add_task(self, task_name, delay):
        """Enqueue one (name, delay) work item."""
        await self.queue.put((task_name, delay))

    async def run(self):
        """Start the workers, wait for the queue to drain, then shut them down."""
        workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.workers)
        ]
        # Blocks until task_done() has been called for every queued item.
        await self.queue.join()
        # One sentinel per worker tells each to exit its loop.
        for _ in range(self.workers):
            await self.queue.put(None)
        # Let every worker finish cleanly.
        await asyncio.gather(*workers)
async def main():
    """Queue six timed jobs and drain them with three workers."""
    task_queue = TaskQueue(max_workers=3)
    # (task name, simulated duration in seconds)
    tasks = [
        ("任务1", 1),
        ("任务2", 2),
        ("任务3", 1),
        ("任务4", 3),
        ("任务5", 1),
        ("任务6", 2),
    ]
    for task_name, delay in tasks:
        await task_queue.add_task(task_name, delay)
    # Time the drain: 10s of serial work across 3 workers finishes in ~4s.
    start_time = time.time()
    await task_queue.run()
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f}s")
    for result in task_queue.results:
        print(result)

# asyncio.run(main())
3.3 动态并发控制
根据系统负载动态调整并发数量:
import asyncio
import time
import psutil
from collections import deque
class DynamicConcurrencyController:
    """Semaphore-based limiter whose permit count tracks system load.

    Concurrency floats between min_concurrent and max_concurrent based on
    CPU/memory readings from psutil.
    """

    def __init__(self, max_concurrent=10, min_concurrent=1):
        self.max_concurrent = max_concurrent
        self.min_concurrent = min_concurrent
        self.current_concurrent = min_concurrent
        # BUGFIX: keep ONE semaphore for the controller's lifetime and grow/
        # shrink its permit count. The original rebuilt the Semaphore on every
        # adjustment, discarding permits still held by in-flight tasks, so the
        # concurrency limit was never actually enforced.
        self.semaphore = asyncio.Semaphore(self.current_concurrent)
        self.load_history = deque(maxlen=10)

    def get_system_load(self):
        """Return (cpu_percent, memory_percent) from psutil.

        NOTE(review): interval=0.1 blocks the event loop ~100ms per call;
        consider psutil.cpu_percent(interval=None) in latency-sensitive code.
        """
        cpu_percent = psutil.cpu_percent(interval=0.1)
        memory_percent = psutil.virtual_memory().percent
        return cpu_percent, memory_percent

    def adjust_concurrency(self):
        """Grow or shrink the permit count according to current load."""
        cpu_load, memory_load = self.get_system_load()
        previous = self.current_concurrent
        if cpu_load > 80 or memory_load > 80:
            # High load: back off to 70% (never below the floor).
            self.current_concurrent = max(
                self.min_concurrent,
                int(self.current_concurrent * 0.7)
            )
        elif cpu_load < 30 and memory_load < 30:
            # Low load: grow by 20%, but at least by one — BUGFIX: the
            # original's int(c * 1.2) truncates to c for c < 5, so small
            # concurrency levels could never grow at all.
            self.current_concurrent = min(
                self.max_concurrent,
                max(self.current_concurrent + 1,
                    int(self.current_concurrent * 1.2))
            )
        self._resize_semaphore(self.current_concurrent - previous)
        print(f"当前并发数: {self.current_concurrent}")

    def _resize_semaphore(self, delta):
        """Add (release) or remove (acquire in background) *delta* permits."""
        if delta > 0:
            for _ in range(delta):
                self.semaphore.release()
        elif delta < 0:
            # Consume permits without blocking the caller; each background
            # acquire permanently removes one permit (until a later grow
            # releases again). Requires a running event loop.
            for _ in range(-delta):
                asyncio.ensure_future(self.semaphore.acquire())

    async def execute_with_control(self, task_func, *args, **kwargs):
        """Run task_func(*args, **kwargs) once a permit is available."""
        self.adjust_concurrency()
        async with self.semaphore:
            return await task_func(*args, **kwargs)
# 使用示例
async def sample_task(task_id, delay):
    """Sleep *delay* seconds, then return a result string for *task_id*."""
    outcome = f"任务 {task_id} 结果"
    print(f"任务 {task_id} 开始")
    await asyncio.sleep(delay)
    print(f"任务 {task_id} 完成")
    return outcome
async def main():
    """Run twenty sample tasks through the dynamic controller."""
    controller = DynamicConcurrencyController(max_concurrent=10, min_concurrent=2)
    tasks = [
        controller.execute_with_control(sample_task, i, 0.5)
        for i in range(20)
    ]
    results = await asyncio.gather(*tasks)
    print(f"所有任务完成,结果数量: {len(results)}")

# asyncio.run(main())
4. 异步数据库操作
4.1 使用asyncpg进行PostgreSQL异步操作
import asyncio
import asyncpg
from typing import List, Dict, Any
class AsyncDatabaseManager:
    """asyncpg-backed helper: pooled queries, updates, and batch inserts."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        # Connection pool; created by connect(), closed by disconnect().
        self.pool = None

    async def connect(self):
        """Create the connection pool."""
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=5,
            max_size=20,
            command_timeout=60
        )
        print("数据库连接池已建立")

    async def disconnect(self):
        """Close the connection pool, if one was created."""
        if self.pool:
            await self.pool.close()
            print("数据库连接池已关闭")

    async def execute_query(self, query: str, params: tuple = None) -> List[Dict[str, Any]]:
        """Run a SELECT and return the rows as a list of dicts.

        Raises if the pool is not connected or the query fails.
        """
        if not self.pool:
            raise Exception("数据库未连接")
        try:
            async with self.pool.acquire() as connection:
                # BUGFIX: the original called fetch(query, *params), which
                # raised TypeError whenever params was left as None.
                rows = await connection.fetch(query, *(params or ()))
            return [dict(row) for row in rows]
        except Exception as e:
            print(f"查询错误: {e}")
            raise

    async def execute_update(self, query: str, params: tuple = None) -> int:
        """Run an INSERT/UPDATE/DELETE/DDL statement.

        Returns the affected-row count when the asyncpg status tag carries
        one (e.g. "UPDATE 3", "INSERT 0 3"); returns 0 for tag-less DDL
        such as "CREATE TABLE".
        """
        if not self.pool:
            raise Exception("数据库未连接")
        try:
            async with self.pool.acquire() as connection:
                # BUGFIX: handle params=None, and actually return an int as
                # annotated. connection.execute() yields a status string; the
                # count (when present) is the LAST token — the original's
                # split()[1] returned "0" for "INSERT 0 3" and crashed intent
                # for "CREATE TABLE".
                status = await connection.execute(query, *(params or ()))
            parts = status.split()
            return int(parts[-1]) if parts and parts[-1].isdigit() else 0
        except Exception as e:
            print(f"更新错误: {e}")
            raise

    async def batch_insert(self, table: str, data: List[Dict[str, Any]]) -> int:
        """Insert all rows of *data* (list of column->value dicts) in a single
        transaction; return the number of rows inserted (0 for empty input).

        NOTE(review): *table* and the column names are interpolated into the
        SQL string — pass only trusted identifiers, never user input.
        """
        if not self.pool or not data:
            return 0
        try:
            columns = list(data[0].keys())
            placeholders = ', '.join([f'${i+1}' for i in range(len(columns))])
            column_names = ', '.join(columns)
            query = f"INSERT INTO {table} ({column_names}) VALUES ({placeholders})"
            async with self.pool.acquire() as connection:
                # One transaction: either every row lands or none do.
                async with connection.transaction():
                    for row in data:
                        await connection.execute(query, *row.values())
            return len(data)
        except Exception as e:
            print(f"批量插入错误: {e}")
            raise
# Usage example
async def database_example():
    """End-to-end demo: connect, create table, batch insert, query."""
    # Replace with a real connection string before running.
    db_manager = AsyncDatabaseManager("postgresql://user:password@localhost/dbname")
    try:
        await db_manager.connect()
        # Create the demo table (IF NOT EXISTS makes this idempotent).
        await db_manager.execute_update("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name VARCHAR(100),
                email VARCHAR(100),
                created_at TIMESTAMP DEFAULT NOW()
            )
        """)
        # Seed rows for the demo.
        sample_data = [
            {"name": "Alice", "email": "alice@example.com"},
            {"name": "Bob", "email": "bob@example.com"},
            {"name": "Charlie", "email": "charlie@example.com"}
        ]
        inserted_count = await db_manager.batch_insert("users", sample_data)
        print(f"插入了 {inserted_count} 条记录")
        # Parameterized query: $1 is bound by the driver (no SQL injection).
        users = await db_manager.execute_query("SELECT * FROM users WHERE name = $1", ("Alice",))
        print(f"查询结果: {users}")
    except Exception as e:
        print(f"数据库操作错误: {e}")
    finally:
        # Always return pool resources, even on failure.
        await db_manager.disconnect()

# asyncio.run(database_example())
4.2 异步Redis操作
import asyncio
import aioredis
from typing import Any, Optional
class AsyncRedisManager:
    """aioredis-backed helper: get/set, batch get, and pipelined operations.

    NOTE(review): the standalone aioredis package is deprecated; its API now
    lives in redis.asyncio (redis-py >= 4.2) — consider migrating.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_url = redis_url
        # Client handle; created by connect(), closed by disconnect().
        self.redis = None

    async def connect(self):
        """Open the Redis connection (utf-8, decoded str responses)."""
        self.redis = await aioredis.from_url(
            self.redis_url,
            encoding="utf-8",
            decode_responses=True
        )
        print("Redis连接成功")

    async def disconnect(self):
        """Close the Redis connection, if one was opened."""
        if self.redis:
            await self.redis.close()
            print("Redis连接已关闭")

    async def get_data(self, key: str) -> Optional[str]:
        """Return the value for *key*, or None if missing or on error."""
        if not self.redis:
            raise Exception("Redis未连接")
        try:
            return await self.redis.get(key)
        except Exception as e:
            print(f"获取数据错误: {e}")
            return None

    async def set_data(self, key: str, value: str, expire: int = 3600) -> bool:
        """Set *key* with a TTL of *expire* seconds; True on success."""
        if not self.redis:
            raise Exception("Redis未连接")
        try:
            await self.redis.set(key, value, ex=expire)
            return True
        except Exception as e:
            print(f"设置数据错误: {e}")
            return False

    async def batch_get(self, keys: List[str]) -> Dict[str, str]:
        """Return {key: value} for *keys* via MGET; {} on error or empty input."""
        if not self.redis:
            raise Exception("Redis未连接")
        # BUGFIX: MGET requires at least one key — mget(*[]) errors out;
        # return an empty mapping directly instead.
        if not keys:
            return {}
        try:
            results = await self.redis.mget(*keys)
            return dict(zip(keys, results))
        except Exception as e:
            print(f"批量获取错误: {e}")
            return {}

    async def pipeline_operations(self, operations: List[tuple]) -> List[Any]:
        """Execute ('get', key) / ('set', key, value) tuples in one pipeline;
        return the per-command results, or [] on error.
        """
        if not self.redis:
            raise Exception("Redis未连接")
        try:
            pipe = self.redis.pipeline()
            for op in operations:
                operation_type, *args = op
                if operation_type == 'get':
                    pipe.get(args[0])
                elif operation_type == 'set':
                    pipe.set(args[0], args[1])
                # Unknown operation types are silently skipped (as before).
            return await pipe.execute()
        except Exception as e:
            print(f"管道操作错误: {e}")
            return []
# Usage example
async def redis_example():
    """Demo: connect, set/get, batch get, pipelined set+get."""
    redis_manager = AsyncRedisManager("redis://localhost:6379")
    try:
        await redis_manager.connect()
        # Write two keys with a one-hour TTL.
        await redis_manager.set_data("user:1", "Alice", 3600)
        await redis_manager.set_data("user:2", "Bob", 3600)
        # Single read.
        user1 = await redis_manager.get_data("user:1")
        print(f"获取用户1: {user1}")
        # Batch read — "user:3" was never written, so it should come back
        # as None in the mapping.
        users = await redis_manager.batch_get(["user:1", "user:2", "user:3"])
        print(f"批量获取结果: {users}")
        # Pipeline: both commands travel in one round-trip.
        operations = [
            ('set', 'counter', '100'),
            ('get', 'counter')
        ]
        results = await redis_manager.pipeline_operations(operations)
        print(f"管道操作结果: {results}")
    except Exception as e:
        print(f"Redis操作错误: {e}")
    finally:
        await redis_manager.disconnect()

# asyncio.run(redis_example())
5. 高并发性能优化
5.1 连接池优化
import asyncio
import aiohttp
from typing import Dict, Any
class OptimizedHttpClient:
    """aiohttp client tuned for high fan-out: pooled connector, DNS cache,
    keep-alive, and a global timeout. Use as an async context manager.
    """

    def __init__(self,
                 max_connections: int = 100,
                 timeout: int = 30,
                 keepalive_timeout: int = 60):
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,
            # BUGFIX: floor at 1 — with max_connections < 4 the original
            # computed limit_per_host=0, which aiohttp treats as *unlimited*
            # per host, defeating the cap entirely.
            limit_per_host=max(1, max_connections // 4),
            ttl_dns_cache=300,
            use_dns_cache=True,
            keepalive_timeout=keepalive_timeout
        )
        # Session; created on context entry, closed on exit.
        self.session = None

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            connector=self.connector
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def fetch(self, url: str, **kwargs) -> Dict[str, Any]:
        """GET *url*; return a result dict with 'success' True/False.

        Never raises — failures are reported in the returned dict.
        """
        try:
            async with self.session.get(url, **kwargs) as response:
                return {
                    'url': url,
                    'status': response.status,
                    'headers': dict(response.headers),
                    'content': await response.text(),
                    'success': True
                }
        except Exception as e:
            return {
                'url': url,
                'error': str(e),
                'success': False
            }
async def performance_test():
    """Time 50 delayed requests through a 20-connection client."""
    urls = [
        f"https://httpbin.org/delay/{i%3+1}"
        for i in range(50)
    ]
    # NOTE(review): inside a coroutine, asyncio.get_running_loop() is the
    # preferred spelling; get_event_loop() works here but is a deprecated
    # pattern in modern Python.
    start_time = asyncio.get_event_loop().time()
    async with OptimizedHttpClient(max_connections=20) as client:
        tasks = [client.fetch(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    end_time = asyncio.get_event_loop().time()
    print(f"总耗时: {end_time - start_time:.2f}s")
    # fetch() never raises, so count the 'success' flag in each result dict.
    successful = sum(1 for r in results if isinstance(r, dict) and r.get('success'))
    print(f"成功请求: {successful}/{len(urls)}")

# asyncio.run(performance_test())
5.2 缓存策略优化
import asyncio
import time
from typing import Dict, Any, Optional
from collections import OrderedDict
class AsyncCache:
    """Async-safe LRU cache with per-entry TTL expiry.

    An asyncio.Lock serializes all mutations; OrderedDict order tracks
    recency (most recently used at the end).
    """

    def __init__(self, max_size: int = 1000, ttl: int = 3600):
        self.max_size = max_size
        self.ttl = ttl
        # key -> value, ordered oldest-used first.
        self.cache = OrderedDict()
        # key -> insertion time (seconds since epoch) for TTL checks.
        self.timestamps = {}
        self.lock = asyncio.Lock()

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value, or None if absent or expired."""
        async with self.lock:
            if key not in self.cache:
                return None
            # Expired entries are evicted lazily on access.
            if time.time() - self.timestamps.get(key, 0) > self.ttl:
                del self.cache[key]
                del self.timestamps[key]
                return None
            # Mark as most recently used.
            self.cache.move_to_end(key)
            return self.cache[key]

    async def set(self, key: str, value: Any):
        """Insert or refresh *key*, evicting the LRU entry when full."""
        async with self.lock:
            if key in self.cache:
                # Refreshing an existing key: no growth, so no eviction
                # needed. BUGFIX: the original also evicted the oldest entry
                # in this case when at capacity, dropping a live entry.
                self.cache.move_to_end(key)
            elif len(self.cache) >= self.max_size:
                # Evict the least recently used entry to make room.
                oldest_key = next(iter(self.cache))
                del self.cache[oldest_key]
                del self.timestamps[oldest_key]
            self.cache[key] = value
            self.timestamps[key] = time.time()

    async def invalidate(self, key: str):
        """Remove *key* from the cache, if present."""
        async with self.lock:
            if key in self.cache:
                del self.cache[key]
                del self.timestamps[key]

    async def cleanup_expired(self):
        """Proactively drop every entry older than the TTL."""
        async with self.lock:
            current_time = time.time()
            # Collect first: can't delete while iterating the dict.
            expired_keys = [
                key for key, timestamp in self.timestamps.items()
                if current_time - timestamp > self.ttl
            ]
            for key in expired_keys:
                del self.cache[key]
                del self.timestamps[key]
# Usage example
async def cache_example():
    """Demo: basic set/get, TTL expiry, and concurrent batch writes."""
    cache = AsyncCache(max_size=100, ttl=10)
    # Basic round-trip.
    await cache.set("key1", "value1")
    result = await cache.get("key1")
    print(f"获取缓存: {result}")
    # Sleep past the 10-second TTL so the entry expires.
    await asyncio.sleep(11)
    result = await cache.get("key1")
    print(f"过期后获取: {result}")
    # Concurrent writes — the internal lock serializes them safely.
    tasks = [
        cache.set(f"key{i}", f"value{i}")
        for i in range(50)
    ]
    await asyncio.gather(*tasks)
    print(f"缓存大小: {len(cache.cache)}")

# asyncio.run(cache_example())
5.3 负载均衡与错误处理
import asyncio
import random
from typing import List, Dict, Any
from dataclasses import dataclass
@dataclass
class Server:
    """One upstream server as tracked by the load balancer."""
    url: str
    # Relative weight for weighted-random selection.
    weight: int = 1
    # Cleared by the balancer once error_count reaches max_errors.
    healthy: bool = True
    # Consecutive failures recorded via mark_server_unhealthy().
    error_count: int = 0
    # Failure threshold before the server is marked unhealthy.
    max_errors: int = 3
class LoadBalancer:
    """Round-robin / weighted-random server selection with health tracking."""

    def __init__(self, servers: List[Server]):
        self.servers = servers
        self.current_index = 0

    def get_next_server(self) -> Server:
        """Plain round-robin: return servers in order, wrapping around.

        NOTE(review): unlike get_weighted_server(), this variant does not
        skip unhealthy servers.
        """
        chosen = self.servers[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.servers)
        return chosen

    def get_weighted_server(self) -> Server:
        """Weighted-random pick among healthy servers.

        Raises when every server is marked unhealthy.
        """
        candidates = [s for s in self.servers if s.healthy]
        if not candidates:
            raise Exception("没有可用的服务器")
        # Draw a ticket in [1, total_weight] and walk the cumulative weights.
        total_weight = sum(s.weight for s in candidates)
        ticket = random.randint(1, total_weight)
        running = 0
        for candidate in candidates:
            running += candidate.weight
            if ticket <= running:
                return candidate
        return candidates[-1]  # defensive fallback; the loop always matches

    def mark_server_unhealthy(self, server_url: str):
        """Record one failure for *server_url*; flip healthy off at the threshold."""
        for server in self.servers:
            if server.url != server_url:
                continue
            server.error_count += 1
            if server.error_count >= server.max_errors:
                server.healthy = False
            break

    def mark_server_healthy(self, server_url: str):
        """Reset *server_url* to healthy with a clean error count."""
        for server in self.servers:
            if server.url != server_url:
                continue
            server.healthy = True
            server.error_count = 0
            break
class RobustAsyncClient:
    """HTTP client with weighted server selection, retries with exponential
    backoff, and health bookkeeping on failures.
    """

    def __init__(self, servers: List[str], max_retries: int = 3):
        self.load_balancer = LoadBalancer([
            Server(url, weight=1) for url in servers
        ])
        self.max_retries = max_retries
        # Session; created on context entry, closed on exit.
        self.session = None

    async def __aenter__(self):
        import aiohttp
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def request(self, endpoint: str, **kwargs) -> Dict[str, Any]:
        """GET *endpoint* from a healthy server, retrying up to max_retries
        times with exponential backoff; re-raises the final error.
        """
        for attempt in range(self.max_retries):
            # BUGFIX: bind before the try — the original's except clause read
            # 'server.url' even when get_weighted_server() itself raised
            # (e.g. no healthy servers), producing an UnboundLocalError that
            # masked the real failure.
            server = None
            try:
                server = self.load_balancer.get_weighted_server()
                url = f"{server.url}{endpoint}"
                print(f"请求 {url} (尝试 {attempt + 1})")
                async with self.session.get(url, **kwargs) as response:
                    return {
                        'server': server.url,
                        'status': response.status,
                        'data': await response.text(),
                        'success': True
                    }
            except Exception as e:
                if server is not None:
                    print(f"请求失败 {server.url}: {e}")
                    self.load_balancer.mark_server_unhealthy(server.url)
                if attempt < self.max_retries - 1:
                    # Exponential backoff: 1s, 2s, 4s, ...
                    await asyncio.sleep(2 ** attempt)
                else:
                    raise
# 使用示例
async def load_balancer_example():
servers = [

评论 (0)