Python异步编程最佳实践:asyncio、并发控制与高并发场景优化

Violet340
Violet340 2026-01-29T05:16:16+08:00
0 0 1

引言

在现代Python开发中,异步编程已成为提升应用性能和响应能力的关键技术。随着网络请求、数据库操作等I/O密集型任务的增多,传统的同步编程模式已经无法满足高性能应用的需求。Python的asyncio库为开发者提供了强大的异步编程支持,通过事件循环机制和协程,能够有效处理高并发场景下的任务执行。

本文将深入探讨Python异步编程的核心概念和最佳实践,从基础的asyncio使用到复杂的并发控制策略,再到实际的性能优化技巧,帮助开发者构建高效、可靠的异步应用。

1. 异步编程基础概念

1.1 协程与异步函数

在Python中,协程(Coroutine)是一种可以暂停执行并在稍后恢复的函数。异步函数使用async def关键字定义,返回一个协程对象而不是直接执行结果。

import asyncio

# 定义异步函数
async def fetch_data(url):
    """Simulate fetching *url* over the network and return a result string."""
    print(f"开始获取 {url}")
    await asyncio.sleep(1)  # stand-in for real network latency
    print(f"完成获取 {url}")
    payload = f"数据来自 {url}"
    return payload

# Create a coroutine object. Note: it is never awaited here, so Python will
# emit a "coroutine ... was never awaited" RuntimeWarning — this demo only
# inspects the object's type.
coroutine = fetch_data("https://api.example.com/data")
print(type(coroutine))  # <class 'coroutine'>

1.2 事件循环

事件循环是异步编程的核心,负责调度和执行协程。Python的asyncio库提供了完整的事件循环实现:

import asyncio

async def main():
    """Minimal event-loop demo: two prints separated by a one-second pause."""
    print("开始执行")
    await asyncio.sleep(1)
    print("执行完成")

# 运行事件循环
asyncio.run(main())

1.3 await关键字

await关键字用于等待协程的执行结果,只有在异步函数中才能使用:

import asyncio

async def calculate(x, y):
    """Asynchronously compute x + y after 0.5 s of simulated work."""
    await asyncio.sleep(0.5)
    total = x + y
    return total

async def main():
    """Demo: await the calculate() coroutine and print its sum."""
    # Awaiting suspends main() until calculate() produces its value
    result = await calculate(10, 20)
    print(f"计算结果: {result}")

asyncio.run(main())

2. asyncio核心组件详解

2.1 Task对象

Task是Future的子类,用于包装协程并允许在事件循环中调度执行:

import asyncio
import time

async def slow_operation(name):
    """Simulate a two-second job identified by *name* and return its result."""
    print(f"开始 {name}")
    await asyncio.sleep(2)  # pretend to do slow work
    print(f"完成 {name}")
    outcome = f"{name} 的结果"
    return outcome

async def main():
    """Run two slow operations concurrently as Tasks and print both results."""
    # create_task schedules both coroutines on the loop immediately,
    # so they run in parallel rather than one after the other
    task1 = asyncio.create_task(slow_operation("任务1"))
    task2 = asyncio.create_task(slow_operation("任务2"))
    
    # Awaiting collects the results; total wait is ~2 s, not ~4 s
    result1 = await task1
    result2 = await task2
    
    print(f"结果: {result1}, {result2}")

asyncio.run(main())

2.2 Future对象

Future代表一个异步操作的结果,可以被回调函数处理:

import asyncio

def callback(future):
    """Done-callback (attached via add_done_callback): print the result."""
    print(f"Future完成,结果: {future.result()}")

async def async_operation():
    """Simulated one-second asynchronous job."""
    await asyncio.sleep(1)  # pretend work
    return "异步操作完成"

async def main():
    """Wrap a coroutine in a Task, attach a done-callback, await the result."""
    # ensure_future schedules the coroutine as a Task on the running loop
    future = asyncio.ensure_future(async_operation())
    
    # Invoked automatically by the loop when the task finishes
    future.add_done_callback(callback)
    
    # Await the task directly to obtain its return value
    result = await future
    print(f"最终结果: {result}")

asyncio.run(main())

2.3 事件循环管理

正确管理事件循环对于异步程序的性能至关重要:

import asyncio
import time

class AsyncManager:
    """Small helper grouping setup, task execution and cleanup phases."""

    def __init__(self):
        # Populated by setup() with the running event loop.
        self.loop = None

    async def setup(self):
        """Capture the running event loop and announce readiness."""
        self.loop = asyncio.get_running_loop()
        print("异步环境已准备就绪")

    async def cleanup(self):
        """Release resources held by the manager (simulated)."""
        if self.loop:
            print("正在清理资源...")
            await asyncio.sleep(0.1)  # simulated teardown work

    async def run_task(self, task_name):
        """Execute one named task (1 s of simulated work); return its result."""
        print(f"开始执行 {task_name}")
        await asyncio.sleep(1)
        print(f"完成执行 {task_name}")
        return f"{task_name} 完成"

# 使用示例
async def main():
    """Drive an AsyncManager through setup, three parallel tasks, and cleanup."""
    manager = AsyncManager()
    await manager.setup()

    try:
        jobs = [manager.run_task(name) for name in ("任务A", "任务B", "任务C")]
        results = await asyncio.gather(*jobs)
        print(f"所有任务结果: {results}")
    finally:
        # Cleanup runs even if a task raised
        await manager.cleanup()

asyncio.run(main())

3. 并发控制策略

3.1 信号量控制并发数量

信号量(Semaphore)是控制并发访问资源的重要工具:

import asyncio
import aiohttp
import time

class ConcurrencyController:
    """Async context manager that caps concurrent HTTP fetches with a semaphore."""

    def __init__(self, max_concurrent=5):
        # The semaphore bounds the number of in-flight requests
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
    
    async def __aenter__(self):
        # One shared aiohttp session for all fetches
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def fetch_url(self, url):
        """Fetch *url*; at most ``max_concurrent`` fetches run at once."""
        async with self.semaphore:  # acquire a slot before issuing the request
            try:
                print(f"开始请求 {url}")
                async with self.session.get(url) as response:
                    result = await response.text()
                    print(f"完成请求 {url}")
                    return result
            except Exception as e:
                # Best-effort: report and return None rather than fail the batch
                print(f"请求失败 {url}: {e}")
                return None

async def main():
    """Fetch 10 URLs with at most 3 concurrent requests and summarise results."""
    urls = [
        f"https://httpbin.org/delay/{i%3+1}" 
        for i in range(10)
    ]
    
    async with ConcurrencyController(max_concurrent=3) as controller:
        tasks = [controller.fetch_url(url) for url in urls]
        # return_exceptions=True keeps one failure from cancelling the batch
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        print(f"总共获取 {len(results)} 个结果")
        successful = sum(1 for r in results if not isinstance(r, Exception))
        print(f"成功: {successful}, 失败: {len(results) - successful}")

# asyncio.run(main())

3.2 限制并发任务数

使用队列和工作协程模式控制并发:

import asyncio
import time

class TaskQueue:
    """Bounded-worker task queue: N worker coroutines drain a shared queue."""

    def __init__(self, max_workers=5):
        self.queue = asyncio.Queue()
        self.workers = max_workers   # number of worker coroutines to spawn
        self.results = []            # collected result strings

    async def worker(self, worker_id):
        """Worker coroutine: process items until a None sentinel arrives."""
        while True:
            task_data = await self.queue.get()
            if task_data is None:  # shutdown sentinel enqueued by run()
                break
            try:
                task_name, delay = task_data
                print(f"工人 {worker_id} 开始处理 {task_name}")

                # Simulate the actual work
                await asyncio.sleep(delay)

                result = f"{task_name} 完成,耗时 {delay}s"
                self.results.append(result)
                print(f"工人 {worker_id} 完成 {task_name}")
            except Exception as e:
                print(f"工人 {worker_id} 发生错误: {e}")
            finally:
                # BUG FIX: task_done() must run even when the task raises,
                # otherwise queue.join() in run() blocks forever.
                self.queue.task_done()

    async def add_task(self, task_name, delay):
        """Enqueue one (name, delay) work item."""
        await self.queue.put((task_name, delay))

    async def run(self):
        """Spawn workers, wait for the queue to drain, then shut workers down."""
        workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.workers)
        ]

        # Block until every queued item has been marked done
        await self.queue.join()

        # One sentinel per worker tells each loop to exit
        for _ in range(self.workers):
            await self.queue.put(None)

        # Wait for all workers to finish cleanly
        await asyncio.gather(*workers)

async def main():
    """Queue six jobs, run them through TaskQueue(3 workers), report timing."""
    task_queue = TaskQueue(max_workers=3)
    
    # (name, simulated duration in seconds) pairs
    tasks = [
        ("任务1", 1),
        ("任务2", 2),
        ("任务3", 1),
        ("任务4", 3),
        ("任务5", 1),
        ("任务6", 2),
    ]
    
    for task_name, delay in tasks:
        await task_queue.add_task(task_name, delay)
    
    # Run the queue and measure wall-clock time
    start_time = time.time()
    await task_queue.run()
    end_time = time.time()
    
    print(f"总耗时: {end_time - start_time:.2f}s")
    for result in task_queue.results:
        print(result)

# asyncio.run(main())

3.3 动态并发控制

根据系统负载动态调整并发数量:

import asyncio
import time
import psutil
from collections import deque

class DynamicConcurrencyController:
    """Semaphore-based limiter whose capacity tracks system load.

    BUG FIX: the original recreated ``asyncio.Semaphore`` on every adjust
    call, discarding the permits held by in-flight tasks — so the
    concurrency cap was never actually enforced. This version keeps one
    long-lived semaphore and resizes it incrementally.
    """

    def __init__(self, max_concurrent=10, min_concurrent=1):
        self.max_concurrent = max_concurrent
        self.min_concurrent = min_concurrent
        self.current_concurrent = min_concurrent
        # Single long-lived semaphore; capacity changes are applied to it
        # via _resize() instead of replacing the object.
        self.semaphore = asyncio.Semaphore(self.current_concurrent)
        # Permits that must be retired (not released) to honour a shrink.
        self._shrink_debt = 0
        self.load_history = deque(maxlen=10)

    def get_system_load(self):
        """Return (cpu%, memory%) without blocking the event loop.

        interval=None makes cpu_percent non-blocking (interval=0.1 would
        stall the loop for 100 ms on every call).
        """
        cpu_percent = psutil.cpu_percent(interval=None)
        memory_percent = psutil.virtual_memory().percent
        return cpu_percent, memory_percent

    def adjust_concurrency(self):
        """Grow/shrink the target concurrency based on current load."""
        cpu_load, memory_load = self.get_system_load()

        target = self.current_concurrent
        if cpu_load > 80 or memory_load > 80:
            # High load: back off
            target = max(self.min_concurrent, int(self.current_concurrent * 0.7))
        elif cpu_load < 30 and memory_load < 30:
            # Low load: ramp up
            target = min(self.max_concurrent, int(self.current_concurrent * 1.2))

        self._resize(target)
        print(f"当前并发数: {self.current_concurrent}")

    def _resize(self, target):
        """Apply a capacity change to the live semaphore."""
        delta = target - self.current_concurrent
        if delta > 0:
            # Growing: cancel pending shrink first, then hand out permits.
            for _ in range(delta):
                if self._shrink_debt:
                    self._shrink_debt -= 1
                else:
                    self.semaphore.release()
        elif delta < 0:
            # Shrinking: retire permits lazily as running tasks finish.
            self._shrink_debt += -delta
        self.current_concurrent = target

    async def execute_with_control(self, task_func, *args, **kwargs):
        """Run task_func under the (dynamically sized) concurrency limit."""
        self.adjust_concurrency()

        await self.semaphore.acquire()
        try:
            return await task_func(*args, **kwargs)
        finally:
            if self._shrink_debt:
                self._shrink_debt -= 1  # retire this permit to honour a shrink
            else:
                self.semaphore.release()

# 使用示例
async def sample_task(task_id, delay):
    """Toy workload: sleep for *delay* seconds, then return a result string."""
    print(f"任务 {task_id} 开始")
    await asyncio.sleep(delay)
    print(f"任务 {task_id} 完成")
    outcome = f"任务 {task_id} 结果"
    return outcome

async def main():
    """Run 20 sample tasks under the dynamic concurrency controller."""
    controller = DynamicConcurrencyController(max_concurrent=10, min_concurrent=2)
    
    # Each call re-evaluates system load before acquiring a slot
    tasks = [
        controller.execute_with_control(sample_task, i, 0.5)
        for i in range(20)
    ]
    
    results = await asyncio.gather(*tasks)
    print(f"所有任务完成,结果数量: {len(results)}")

# asyncio.run(main())

4. 异步数据库操作

4.1 使用asyncpg进行PostgreSQL异步操作

import asyncio
import asyncpg
from typing import List, Dict, Any

class AsyncDatabaseManager:
    """asyncpg-backed helper: connection pool plus query/update/batch-insert."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None  # created by connect()

    async def connect(self):
        """Create the connection pool."""
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=5,
            max_size=20,
            command_timeout=60
        )
        print("数据库连接池已建立")

    async def disconnect(self):
        """Close the connection pool."""
        if self.pool:
            await self.pool.close()
            print("数据库连接池已关闭")

    async def execute_query(self, query: str, params: tuple = None) -> List[Dict[str, Any]]:
        """Run a SELECT and return the rows as a list of dicts.

        ``params`` may be None; it is normalised to an empty tuple —
        BUG FIX: the original did ``*params`` on None and raised TypeError
        for parameter-less queries.
        """
        if not self.pool:
            raise Exception("数据库未连接")

        params = params or ()
        try:
            async with self.pool.acquire() as connection:
                rows = await connection.fetch(query, *params)
                return [dict(row) for row in rows]
        except Exception as e:
            print(f"查询错误: {e}")
            raise

    async def execute_update(self, query: str, params: tuple = None) -> int:
        """Run an INSERT/UPDATE/DELETE/DDL statement; return affected rows.

        asyncpg's ``execute`` returns a status string such as ``"UPDATE 5"``,
        ``"INSERT 0 3"`` or ``"CREATE TABLE"``; the row count, when present,
        is the LAST token. BUG FIX: the original ``split()[1]`` returned a
        string (not int) and broke on DDL statuses like ``CREATE TABLE``.
        """
        if not self.pool:
            raise Exception("数据库未连接")

        params = params or ()
        try:
            async with self.pool.acquire() as connection:
                status = await connection.execute(query, *params)
                last = status.split()[-1] if status else ""
                return int(last) if last.isdigit() else 0
        except Exception as e:
            print(f"更新错误: {e}")
            raise

    async def batch_insert(self, table: str, data: List[Dict[str, Any]]) -> int:
        """Insert many rows inside one transaction; return the number inserted.

        NOTE(review): ``table`` and the column names are interpolated into
        the SQL text — pass only trusted identifiers; row values themselves
        are safely parameterized.
        """
        if not self.pool or not data:
            return 0

        try:
            columns = list(data[0].keys())
            placeholders = ', '.join(f'${i + 1}' for i in range(len(columns)))
            column_names = ', '.join(columns)

            query = f"INSERT INTO {table} ({column_names}) VALUES ({placeholders})"

            async with self.pool.acquire() as connection:
                # One transaction for the whole batch: all-or-nothing
                async with connection.transaction():
                    for row in data:
                        await connection.execute(query, *row.values())

                return len(data)
        except Exception as e:
            print(f"批量插入错误: {e}")
            raise

# 使用示例
async def database_example():
    """End-to-end demo: connect, create table, batch insert, query, disconnect."""
    # Connect to the database (replace with a real connection string)
    db_manager = AsyncDatabaseManager("postgresql://user:password@localhost/dbname")
    
    try:
        await db_manager.connect()
        
        # Create the demo table if it does not exist yet
        await db_manager.execute_update("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name VARCHAR(100),
                email VARCHAR(100),
                created_at TIMESTAMP DEFAULT NOW()
            )
        """)
        
        # Insert some sample rows in one transaction
        sample_data = [
            {"name": "Alice", "email": "alice@example.com"},
            {"name": "Bob", "email": "bob@example.com"},
            {"name": "Charlie", "email": "charlie@example.com"}
        ]
        
        inserted_count = await db_manager.batch_insert("users", sample_data)
        print(f"插入了 {inserted_count} 条记录")
        
        # Query one user back with a parameterized SELECT
        users = await db_manager.execute_query("SELECT * FROM users WHERE name = $1", ("Alice",))
        print(f"查询结果: {users}")
        
    except Exception as e:
        print(f"数据库操作错误: {e}")
    finally:
        # Always release the pool, even on failure
        await db_manager.disconnect()

# asyncio.run(database_example())

4.2 异步Redis操作

import asyncio
import aioredis
from typing import Any, Optional

class AsyncRedisManager:
    """Async Redis helper wrapping connect/disconnect and common commands.

    BUG FIX: the original annotated ``batch_get``/``pipeline_operations``
    with ``List``/``Dict`` without importing them from ``typing``, raising
    NameError as soon as the class body executed; builtin generics
    (``list``/``dict``) need no import.

    NOTE(review): the ``aioredis`` package is deprecated — it was merged
    into ``redis`` as ``redis.asyncio``; consider migrating.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis_url = redis_url
        self.redis = None  # created by connect()

    async def connect(self):
        """Open the Redis connection."""
        self.redis = await aioredis.from_url(
            self.redis_url,
            encoding="utf-8",
            decode_responses=True
        )
        print("Redis连接成功")

    async def disconnect(self):
        """Close the Redis connection."""
        if self.redis:
            await self.redis.close()
            print("Redis连接已关闭")

    async def get_data(self, key: str) -> Optional[str]:
        """Return the value stored at *key*, or None on miss or error."""
        if not self.redis:
            raise Exception("Redis未连接")

        try:
            return await self.redis.get(key)
        except Exception as e:
            print(f"获取数据错误: {e}")
            return None

    async def set_data(self, key: str, value: str, expire: int = 3600) -> bool:
        """Store *value* at *key* with a TTL (seconds); True on success."""
        if not self.redis:
            raise Exception("Redis未连接")

        try:
            await self.redis.set(key, value, ex=expire)
            return True
        except Exception as e:
            print(f"设置数据错误: {e}")
            return False

    async def batch_get(self, keys: list[str]) -> dict[str, str]:
        """Fetch several keys in one MGET round-trip; {} on error."""
        if not self.redis:
            raise Exception("Redis未连接")

        try:
            results = await self.redis.mget(*keys)
            return dict(zip(keys, results))
        except Exception as e:
            print(f"批量获取错误: {e}")
            return {}

    async def pipeline_operations(self, operations: list[tuple]) -> list[Any]:
        """Queue ('get', key) / ('set', key, value) ops and execute as one pipeline."""
        if not self.redis:
            raise Exception("Redis未连接")

        try:
            pipe = self.redis.pipeline()
            for op in operations:
                operation_type, *args = op
                if operation_type == 'get':
                    pipe.get(args[0])
                elif operation_type == 'set':
                    pipe.set(args[0], args[1])

            return await pipe.execute()
        except Exception as e:
            print(f"管道操作错误: {e}")
            return []

# 使用示例
async def redis_example():
    """Demo: set/get, batch mget, and pipelined operations against Redis."""
    redis_manager = AsyncRedisManager("redis://localhost:6379")
    
    try:
        await redis_manager.connect()
        
        # Store two keys with a one-hour TTL
        await redis_manager.set_data("user:1", "Alice", 3600)
        await redis_manager.set_data("user:2", "Bob", 3600)
        
        # Single-key read
        user1 = await redis_manager.get_data("user:1")
        print(f"获取用户1: {user1}")
        
        # Batch read — note user:3 was never set and will map to None
        users = await redis_manager.batch_get(["user:1", "user:2", "user:3"])
        print(f"批量获取结果: {users}")
        
        # Pipelined set + get in one round-trip
        operations = [
            ('set', 'counter', '100'),
            ('get', 'counter')
        ]
        results = await redis_manager.pipeline_operations(operations)
        print(f"管道操作结果: {results}")
        
    except Exception as e:
        print(f"Redis操作错误: {e}")
    finally:
        await redis_manager.disconnect()

# asyncio.run(redis_example())

5. 高并发性能优化

5.1 连接池优化

import asyncio
import aiohttp
from typing import Dict, Any

class OptimizedHttpClient:
    """aiohttp client tuned with connection pooling, DNS caching and keep-alive.

    NOTE(review): the TCPConnector is created once in __init__ and is closed
    together with the session in __aexit__, so an instance appears to be
    single-use (cannot be re-entered after its first ``async with``) —
    confirm callers treat it that way.
    """

    def __init__(self, 
                 max_connections: int = 100,
                 timeout: int = 30,
                 keepalive_timeout: int = 60):
        # Total per-request timeout, in seconds
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.connector = aiohttp.TCPConnector(
            limit=max_connections,                 # pool-wide connection cap
            limit_per_host=max_connections // 4,   # per-host cap
            ttl_dns_cache=300,                     # cache DNS lookups for 5 min
            use_dns_cache=True,
            keepalive_timeout=keepalive_timeout
        )
        self.session = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            connector=self.connector
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def fetch(self, url: str, **kwargs) -> Dict[str, Any]:
        """GET *url*; return a dict describing either the response or the error."""
        try:
            async with self.session.get(url, **kwargs) as response:
                return {
                    'url': url,
                    'status': response.status,
                    'headers': dict(response.headers),
                    'content': await response.text(),
                    'success': True
                }
        except Exception as e:
            # Errors are reported in-band rather than raised
            return {
                'url': url,
                'error': str(e),
                'success': False
            }

async def performance_test():
    """Fire 50 requests through the pooled client and report timing stats."""
    urls = [
        f"https://httpbin.org/delay/{i%3+1}" 
        for i in range(50)
    ]
    
    # BUG FIX: asyncio.get_event_loop() inside a coroutine is deprecated
    # (DeprecationWarning since 3.10); get_running_loop() is the supported way
    # to reach the loop's monotonic clock.
    start_time = asyncio.get_running_loop().time()
    
    async with OptimizedHttpClient(max_connections=20) as client:
        tasks = [client.fetch(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    
    end_time = asyncio.get_running_loop().time()
    
    print(f"总耗时: {end_time - start_time:.2f}s")
    successful = sum(1 for r in results if isinstance(r, dict) and r.get('success'))
    print(f"成功请求: {successful}/{len(urls)}")

# asyncio.run(performance_test())

5.2 缓存策略优化

import asyncio
import time
from typing import Dict, Any, Optional
from collections import OrderedDict

class AsyncCache:
    """Coroutine-safe LRU cache with per-entry TTL expiry."""

    def __init__(self, max_size: int = 1000, ttl: int = 3600):
        self.max_size = max_size
        self.ttl = ttl
        self.cache = OrderedDict()  # key -> value, ordered oldest -> newest
        self.timestamps = {}        # key -> last set() time
        self.lock = asyncio.Lock()

    async def get(self, key: str) -> Optional[Any]:
        """Return the cached value, or None if absent or expired."""
        async with self.lock:
            if key not in self.cache:
                return None

            # Lazily drop the entry if its TTL has elapsed
            if time.time() - self.timestamps.get(key, 0) > self.ttl:
                del self.cache[key]
                del self.timestamps[key]
                return None

            self.cache.move_to_end(key)  # mark as most recently used
            return self.cache[key]

    async def set(self, key: str, value: Any):
        """Insert or update an entry, evicting the LRU entry only when needed."""
        async with self.lock:
            if key in self.cache:
                # BUG FIX: updating an existing key does not change the cache
                # size, so no eviction is needed — the original evicted here
                # too, silently dropping an unrelated entry when at capacity.
                self.cache.move_to_end(key)
            elif len(self.cache) >= self.max_size:
                # Evict the least recently used entry to make room
                oldest_key, _ = self.cache.popitem(last=False)
                del self.timestamps[oldest_key]

            self.cache[key] = value
            self.timestamps[key] = time.time()

    async def invalidate(self, key: str):
        """Remove *key* from the cache if present."""
        async with self.lock:
            if key in self.cache:
                del self.cache[key]
                del self.timestamps[key]

    async def cleanup_expired(self):
        """Purge every expired entry in one pass."""
        async with self.lock:
            current_time = time.time()
            expired_keys = [
                key for key, timestamp in self.timestamps.items()
                if current_time - timestamp > self.ttl
            ]

            for key in expired_keys:
                del self.cache[key]
                del self.timestamps[key]

# 使用示例
async def cache_example():
    """Demo AsyncCache: set/get round-trip, TTL expiry, bulk population."""
    cache = AsyncCache(max_size=100, ttl=10)
    
    # Basic set/get round-trip
    await cache.set("key1", "value1")
    result = await cache.get("key1")
    print(f"获取缓存: {result}")
    
    # Wait past the 10 s TTL so the entry expires
    await asyncio.sleep(11)
    result = await cache.get("key1")
    print(f"过期后获取: {result}")
    
    # Populate 50 entries concurrently
    tasks = [
        cache.set(f"key{i}", f"value{i}") 
        for i in range(50)
    ]
    await asyncio.gather(*tasks)
    
    print(f"缓存大小: {len(cache.cache)}")

# asyncio.run(cache_example())

5.3 负载均衡与错误处理

import asyncio
import random
from typing import List, Dict, Any
from dataclasses import dataclass

@dataclass
class Server:
    """One backend endpoint plus its health-tracking state."""

    url: str              # base URL of the backend
    weight: int = 1       # relative share used by weighted selection
    healthy: bool = True  # flipped to False after max_errors failures
    error_count: int = 0  # consecutive error counter
    max_errors: int = 3   # failure threshold before marking unhealthy

class LoadBalancer:
    """Distributes requests across servers via round-robin or weighted random pick."""

    def __init__(self, servers: List[Server]):
        self.servers = servers
        self.current_index = 0  # cursor for round-robin selection

    def get_next_server(self) -> Server:
        """Plain round-robin: return each server in turn (health is ignored)."""
        chosen = self.servers[self.current_index]
        self.current_index = (self.current_index + 1) % len(self.servers)
        return chosen

    def get_weighted_server(self) -> Server:
        """Randomly pick a healthy server, biased by its weight."""
        candidates = [srv for srv in self.servers if srv.healthy]
        if not candidates:
            raise Exception("没有可用的服务器")

        # Draw a ticket in [1, total_weight] and walk the cumulative weights
        total = sum(srv.weight for srv in candidates)
        ticket = random.randint(1, total)

        running = 0
        for srv in candidates:
            running += srv.weight
            if ticket <= running:
                return srv

        return candidates[-1]  # defensive fallback; unreachable in practice

    def mark_server_unhealthy(self, server_url: str):
        """Record an error for *server_url*; disable it after max_errors."""
        for srv in self.servers:
            if srv.url != server_url:
                continue
            srv.error_count += 1
            if srv.error_count >= srv.max_errors:
                srv.healthy = False
            break

    def mark_server_healthy(self, server_url: str):
        """Reset *server_url* to healthy with a clean error count."""
        for srv in self.servers:
            if srv.url != server_url:
                continue
            srv.healthy = True
            srv.error_count = 0
            break

class RobustAsyncClient:
    """HTTP client with weighted load balancing, retries and exponential backoff."""

    def __init__(self, servers: List[str], max_retries: int = 3):
        self.load_balancer = LoadBalancer([
            Server(url, weight=1) for url in servers
        ])
        self.max_retries = max_retries
        self.session = None  # created in __aenter__

    async def __aenter__(self):
        import aiohttp
        self.session = aiohttp.ClientSession()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def request(self, endpoint: str, **kwargs) -> Dict[str, Any]:
        """GET *endpoint*, retrying on another server with backoff.

        Raises the last request error after max_retries failed attempts, or
        the load balancer's error when no healthy server remains.
        """
        for attempt in range(self.max_retries):
            # BUG FIX: pick the server OUTSIDE the try block. In the original,
            # if get_weighted_server() raised ("没有可用的服务器"), the except
            # clause referenced ``server`` before it was ever bound (NameError).
            server = self.load_balancer.get_weighted_server()

            try:
                url = f"{server.url}{endpoint}"
                print(f"请求 {url} (尝试 {attempt + 1})")

                async with self.session.get(url, **kwargs) as response:
                    return {
                        'server': server.url,
                        'status': response.status,
                        'data': await response.text(),
                        'success': True
                    }

            except Exception as e:
                print(f"请求失败 {server.url}: {e}")
                self.load_balancer.mark_server_unhealthy(server.url)

                if attempt < self.max_retries - 1:
                    # Exponential backoff: 1 s, 2 s, 4 s, ...
                    await asyncio.sleep(2 ** attempt)
                else:
                    raise

# 使用示例
async def load_balancer_example():
    servers = [
相关推荐
广告位招租

相似文章

    评论 (0)

    0/2000