Python异步编程实战:asyncio、aiohttp与异步数据库连接池优化

HotNina
HotNina 2026-02-01T21:04:00+08:00
0 0 2

引言

在现代Web开发中,处理高并发I/O密集型应用已成为开发者面临的核心挑战之一。传统的同步编程模型在面对大量并发请求时往往表现不佳,导致系统响应缓慢甚至崩溃。Python作为一门广泛应用的编程语言,其异步编程能力为解决这一问题提供了强有力的工具。

本文将深入探讨Python异步编程的核心技术栈,包括asyncio事件循环、aiohttp异步HTTP客户端以及异步数据库连接池等关键技术,帮助开发者构建高性能、高并发的I/O密集型应用。

Python异步编程基础:asyncio详解

asyncio模块概述

asyncio是Python标准库中用于编写异步I/O程序的核心模块。它基于事件循环(Event Loop)机制,允许程序在等待I/O操作完成时执行其他任务,从而显著提升并发处理能力。

import asyncio
import time

async def fetch_data(url, delay=1):
    """Simulate fetching data from *url* asynchronously.

    Args:
        url: Identifier of the resource being "fetched".
        delay: Seconds to sleep to simulate network latency.  Defaults to 1,
            matching the previously hard-coded value, so existing callers
            are unaffected.

    Returns:
        A string describing where the data came from.
    """
    print(f"开始请求 {url}")
    # Yield control to the event loop while "waiting" on the network.
    await asyncio.sleep(delay)
    print(f"完成请求 {url}")
    return f"数据来自 {url}"

async def main():
    """Launch five fetch_data coroutines concurrently and time them.

    Because gather() schedules all coroutines at once, the total wall time
    is roughly the longest single delay (~1s), not the sum (~5s).
    """
    start_time = time.time()
    
    # Concurrent execution via gather (NOTE(review): the original comment
    # called this "sequential/synchronous", which is incorrect).
    tasks = [fetch_data(f"url_{i}") for i in range(5)]
    results = await asyncio.gather(*tasks)
    
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f}秒")
    print("结果:", results)

# Start the event loop and run the program to completion.
asyncio.run(main())

事件循环机制

asyncio的核心是事件循环,它负责调度和执行协程。事件循环通常通过 asyncio.run() 函数启动(它会创建并管理事件循环),管理着所有待执行的协程任务。

import asyncio
import aiohttp
import time

class AsyncEventLoopExample:
    """Demonstrates concurrent coroutine execution with asyncio.gather.

    NOTE(review): the original __init__ called asyncio.get_event_loop()
    and stored the result in an unused ``self.loop`` attribute.  Calling
    get_event_loop() with no running loop is deprecated (Python 3.10+),
    so the attribute has been removed; inside a coroutine, use
    asyncio.get_running_loop() if the loop object is ever needed.
    """

    async def simple_coroutine(self, name, delay):
        """Sleep for *delay* seconds, then return a result string for *name*."""
        print(f"协程 {name} 开始执行")
        await asyncio.sleep(delay)
        print(f"协程 {name} 执行完成")
        return f"结果来自 {name}"

    async def event_loop_demo(self):
        """Run three coroutines concurrently; return results in task order."""
        # Build coroutine objects; gather() wraps and schedules them together.
        tasks = [
            self.simple_coroutine("A", 1),
            self.simple_coroutine("B", 2),
            self.simple_coroutine("C", 1)
        ]

        # gather preserves input order regardless of completion order.
        results = await asyncio.gather(*tasks)
        return results

# 使用示例
async def run_event_loop_demo():
    """Drive the AsyncEventLoopExample demo and print the gathered results."""
    example = AsyncEventLoopExample()
    outcome = await example.event_loop_demo()
    print("并发执行结果:", outcome)

# asyncio.run(run_event_loop_demo())

协程与任务管理

在异步编程中,协程(Coroutine)是异步函数的执行单位,而任务(Task)则是对协程的包装,提供更多的控制能力。

import asyncio
import time

async def task_with_delay(name, delay):
    """Simulate a unit of work that takes *delay* seconds; return its result."""
    result = f"任务 {name} 的结果"
    print(f"任务 {name} 开始")
    await asyncio.sleep(delay)
    print(f"任务 {name} 完成")
    return result

async def task_manager():
    """Show two ways of driving tasks: asyncio.gather() and asyncio.wait()."""

    # Approach 1: wrap coroutines in Tasks explicitly, then gather them.
    first = asyncio.create_task(task_with_delay("Task-1", 2))
    second = asyncio.create_task(task_with_delay("Task-2", 1))
    gathered = await asyncio.gather(first, second)
    print("任务结果:", gathered)

    # Approach 2: asyncio.wait() yields (done, pending) sets of Tasks.
    waiting = [
        asyncio.create_task(task_with_delay("Task-3", 1)),
        asyncio.create_task(task_with_delay("Task-4", 2)),
    ]
    done, _pending = await asyncio.wait(waiting, return_when=asyncio.ALL_COMPLETED)

    for finished in done:
        print(f"完成的任务结果: {finished.result()}")

# asyncio.run(task_manager())

异步HTTP客户端:aiohttp实战

aiohttp基础使用

aiohttp是Python中用于异步HTTP客户端和服务器的库,基于asyncio构建,能够高效处理大量并发HTTP请求。

import aiohttp
import asyncio
import time

class AsyncHTTPClient:
    """Thin aiohttp wrapper for fetching one or many URLs concurrently."""

    def __init__(self, timeout=10):
        # Total per-request timeout, applied to every GET issued below.
        self.timeout = aiohttp.ClientTimeout(total=timeout)

    async def fetch_single_url(self, session, url):
        """Fetch *url* using *session*; return a summary dict (never raises)."""
        try:
            async with session.get(url, timeout=self.timeout) as response:
                if response.status != 200:
                    return {
                        'url': url,
                        'status': response.status,
                        'error': f'HTTP {response.status}'
                    }
                body = await response.text()
                return {
                    'url': url,
                    'status': response.status,
                    'data_length': len(body)
                }
        except Exception as exc:
            return {
                'url': url,
                'error': str(exc)
            }

    async def fetch_multiple_urls(self, urls):
        """Fetch all *urls* concurrently inside one client session."""
        async with aiohttp.ClientSession() as session:
            pending = [self.fetch_single_url(session, url) for url in urls]
            return await asyncio.gather(*pending, return_exceptions=True)

# 使用示例
async def demo_aiohttp():
    """Fetch a handful of httpbin endpoints and report timing and statuses."""
    client = AsyncHTTPClient()

    test_urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/json',
        'https://httpbin.org/uuid'
    ]

    started = time.time()
    results = await client.fetch_multiple_urls(test_urls)
    finished = time.time()

    print(f"总耗时: {finished - started:.2f}秒")
    for item in results:
        if isinstance(item, dict):
            print(f"URL: {item['url']}, 状态: {item.get('status', 'N/A')}")
        else:
            print(f"错误: {item}")

# asyncio.run(demo_aiohttp())

高级HTTP客户端配置

为了更好地控制并发请求和处理异常,我们需要对aiohttp进行更精细的配置。

import aiohttp
import asyncio
from typing import List, Dict, Any
import logging

class AdvancedAsyncHTTPClient:
    """aiohttp client with a bounded connection pool, retries and batching.

    NOTE(review): the ClientSession is created in __init__, i.e. possibly
    outside a running event loop; aiohttp expects sessions to be created
    from within a coroutine.  Kept as-is to preserve the public interface,
    but prefer constructing this client inside an async context.
    """

    def __init__(self, 
                 max_concurrent=100,
                 timeout=30,
                 retry_attempts=3,
                 backoff_factor=1):
        """
        Initialize the advanced async HTTP client.

        Args:
            max_concurrent: upper bound on simultaneous connections
            timeout: total per-request timeout in seconds
            retry_attempts: total number of attempts per request
            backoff_factor: base multiplier for exponential retry back-off
        """
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.retry_attempts = retry_attempts
        self.backoff_factor = backoff_factor
        
        # Connection pool shared by every request made through this client.
        connector = aiohttp.TCPConnector(
            limit=max_concurrent,                # total connection cap
            limit_per_host=max_concurrent // 4,  # per-host connection cap
            ttl_dns_cache=300,                   # DNS cache TTL (seconds)
            use_dns_cache=True,
            # WARNING: ssl=False disables certificate verification — only
            # acceptable for trusted/internal endpoints.
            ssl=False
        )
        
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout,
            headers={
                'User-Agent': 'Python-Async-Client/1.0',
                'Accept': 'application/json'
            }
        )
    
    async def fetch_with_retry(self, url: str, **kwargs) -> Dict[str, Any]:
        """GET *url*, retrying on 5xx responses and on exceptions.

        Returns:
            A dict always containing 'url' and 'attempt', plus either
            'status'+'data' on success or 'error' on failure.  Never raises.
        """
        for attempt in range(self.retry_attempts):
            try:
                async with self.session.get(url, **kwargs) as response:
                    if response.status == 200:
                        data = await response.json()
                        return {
                            'url': url,
                            'status': response.status,
                            'data': data,
                            'attempt': attempt + 1
                        }
                    elif response.status >= 500 and attempt < self.retry_attempts - 1:
                        # Server-side error: exponential back-off, then retry.
                        await asyncio.sleep(self.backoff_factor * (2 ** attempt))
                        continue
                    else:
                        return {
                            'url': url,
                            'status': response.status,
                            'error': f'HTTP {response.status}',
                            'attempt': attempt + 1
                        }
            except Exception as e:
                if attempt < self.retry_attempts - 1:
                    await asyncio.sleep(self.backoff_factor * (2 ** attempt))
                    continue
                else:
                    return {
                        'url': url,
                        'error': str(e),
                        'attempt': attempt + 1
                    }
        # Reached only when retry_attempts <= 0; the original implicitly
        # returned None here, breaking the documented return contract.
        return {'url': url, 'error': 'no attempts made', 'attempt': 0}
    
    async def fetch_batch(self, urls: List[str], batch_size: int = 50) -> List[Dict[str, Any]]:
        """Fetch *urls* in chunks of *batch_size* to bound task fan-out.

        BUG FIX: the original discarded an ENTIRE batch whenever its first
        result happened to be an exception.  Since gather() is called with
        return_exceptions=True, exceptions are embedded in-line; all
        results are now kept and left to the caller to inspect.
        """
        results = []
        
        # Process in slices to avoid creating too many tasks at once.
        for i in range(0, len(urls), batch_size):
            batch = urls[i:i + batch_size]
            tasks = [self.fetch_with_retry(url) for url in batch]
            batch_results = await asyncio.gather(*tasks, return_exceptions=True)
            results.extend(batch_results)
            
        return results
    
    async def close(self):
        """Release the underlying session and its connection pool."""
        await self.session.close()

# 使用示例
async def advanced_client_demo():
    """Exercise AdvancedAsyncHTTPClient against a few httpbin endpoints."""
    client = AdvancedAsyncHTTPClient(max_concurrent=20, retry_attempts=3)

    test_urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/json',
        'https://httpbin.org/uuid',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/500'
    ]

    try:
        batch = await client.fetch_batch(test_urls, batch_size=3)
        for entry in batch:
            if isinstance(entry, dict):
                print(f"URL: {entry['url']}, 状态: {entry.get('status', 'N/A')}")
    finally:
        # Always release the session, even when the batch fails.
        await client.close()

# asyncio.run(advanced_client_demo())

异步HTTP服务端实现

除了客户端,aiohttp也提供了强大的异步服务器支持。

import aiohttp
from aiohttp import web
import json
import asyncio

class AsyncHTTPServer:
    """Small aiohttp-based JSON API server with a handful of demo routes.

    BUG FIX: the handlers below use time.time(), but this snippet never
    imported ``time`` (NameError at request time).  The import is now
    performed locally inside the handlers that need it.
    """

    def __init__(self, host='localhost', port=8080):
        self.host = host
        self.port = port
        self.app = web.Application()
        self.setup_routes()

    def setup_routes(self):
        """Register all URL routes on the application."""
        self.app.router.add_get('/', self.handle_root)
        self.app.router.add_get('/api/users/{user_id}', self.handle_user)
        self.app.router.add_post('/api/users', self.handle_create_user)
        self.app.router.add_get('/health', self.handle_health)

    async def handle_root(self, request):
        """GET / — static greeting payload."""
        return web.json_response({
            'message': 'Hello from Async HTTP Server',
            'version': '1.0'
        })

    async def handle_user(self, request):
        """GET /api/users/{user_id} — return a synthetic user record."""
        user_id = request.match_info['user_id']

        # Simulate database-lookup latency without blocking the loop.
        await asyncio.sleep(0.1)

        user_data = {
            'id': user_id,
            'name': f'User {user_id}',
            'email': f'user{user_id}@example.com'
        }

        return web.json_response(user_data)

    async def handle_create_user(self, request):
        """POST /api/users — echo back a created-user payload (HTTP 201)."""
        import time  # local import: ``time`` is missing from this snippet's imports

        try:
            data = await request.json()

            # Simulate asynchronous processing work.
            await asyncio.sleep(0.2)

            response_data = {
                'id': str(int(time.time())),
                'name': data.get('name', ''),
                'email': data.get('email', ''),
                'created_at': time.time()
            }

            return web.json_response(response_data, status=201)
        except Exception as e:
            return web.json_response(
                {'error': str(e)}, 
                status=400
            )

    async def handle_health(self, request):
        """GET /health — liveness probe with a timestamp."""
        import time  # local import: ``time`` is missing from this snippet's imports

        return web.json_response({'status': 'healthy', 'timestamp': time.time()})

    async def start_server(self):
        """Bind and start the server; returns the AppRunner for later cleanup."""
        runner = web.AppRunner(self.app)
        await runner.setup()

        site = web.TCPSite(runner, self.host, self.port)
        await site.start()

        print(f"Server started at http://{self.host}:{self.port}")
        return runner

    async def stop_server(self, runner):
        """Tear down the runner and release the listening socket."""
        await runner.cleanup()

# 使用示例
async def run_server():
    """Start the demo server, keep it alive for an hour, then clean up."""
    server = AsyncHTTPServer('localhost', 8080)
    runner = await server.start_server()

    try:
        await asyncio.sleep(3600)  # keep serving for one hour
    except KeyboardInterrupt:
        print("停止服务器...")
    finally:
        await server.stop_server(runner)

# NOTE: uncomment the line below to actually start the server.
# asyncio.run(run_server())

异步数据库连接池优化

异步数据库连接基础

在I/O密集型应用中,数据库操作往往是性能瓶颈。使用异步数据库连接可以显著提升并发处理能力。

import asyncio
import asyncpg
import time
from typing import List, Dict, Any

class AsyncDatabaseManager:
    """asyncpg-backed helper owning a connection pool and a simple query API."""

    def __init__(self, 
                 host: str = 'localhost',
                 port: int = 5432,
                 database: str = 'testdb',
                 user: str = 'user',
                 password: str = 'password',
                 min_connections: int = 10,
                 max_connections: int = 20):
        """
        Initialize the async database manager (no I/O happens here).

        Args:
            host: database host
            port: database port
            database: database name
            user: user name
            password: password
            min_connections: minimum pool size
            max_connections: maximum pool size
        """
        self.connection_string = f"postgresql://{user}:{password}@{host}:{port}/{database}"
        self.min_connections = min_connections
        self.max_connections = max_connections
        # Created lazily by create_pool(); None until then.
        self.pool = None

    async def create_pool(self):
        """Create the asyncpg connection pool."""
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=self.min_connections,
            max_size=self.max_connections,
            command_timeout=60,
            statement_cache_size=100
        )
        print(f"数据库连接池创建成功,最小连接数: {self.min_connections}, 最大连接数: {self.max_connections}")

    async def close_pool(self):
        """Close the pool if it was created."""
        if self.pool:
            await self.pool.close()
            print("数据库连接池已关闭")

    @staticmethod
    def _affected_rows(status: str) -> int:
        """Parse the affected-row count out of an asyncpg command-status string.

        asyncpg's ``Connection.execute`` returns strings such as
        ``'UPDATE 3'``, ``'INSERT 0 5'`` or ``'CREATE TABLE'``; when a row
        count is present it is always the LAST token.  (BUG FIX: the
        original took token [1], which is the OID field for INSERT and
        crashes on single-word statuses.)
        """
        tokens = status.split()
        tail = tokens[-1] if tokens else ''
        return int(tail) if tail.isdigit() else 0

    async def execute_query(self, query: str, params: tuple = None) -> List[Dict[str, Any]]:
        """Run a SELECT-style query; return rows as plain dicts.

        BUG FIX: ``params`` defaults to None and the original passed
        ``*params`` straight to fetch(), raising TypeError for
        parameterless queries; None is now treated as an empty tuple.
        """
        if not self.pool:
            raise RuntimeError("数据库连接池未初始化")

        try:
            async with self.pool.acquire() as connection:
                rows = await connection.fetch(query, *(params or ()))
                return [dict(row) for row in rows]
        except Exception as e:
            print(f"查询执行失败: {e}")
            raise

    async def execute_update(self, query: str, params: tuple = None) -> int:
        """Run an INSERT/UPDATE/DDL statement; return the affected row count."""
        if not self.pool:
            raise RuntimeError("数据库连接池未初始化")

        try:
            async with self.pool.acquire() as connection:
                status = await connection.execute(query, *(params or ()))
                return self._affected_rows(status)
        except Exception as e:
            print(f"更新执行失败: {e}")
            raise

# 使用示例
async def database_demo():
    """End-to-end demo: create pool, create table, insert rows, query them."""
    db_manager = AsyncDatabaseManager(
        host='localhost',
        port=5432,
        database='testdb',
        user='user',
        password='password',
        min_connections=5,
        max_connections=15
    )

    try:
        await db_manager.create_pool()

        # Create the demo table if it does not exist yet.
        create_table_query = """
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100),
            email VARCHAR(100),
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """
        await db_manager.execute_update(create_table_query)

        # Seed ten sample rows.
        insert_query = "INSERT INTO users (name, email) VALUES ($1, $2)"
        for idx in range(10):
            await db_manager.execute_update(insert_query, (f"User {idx}", f"user{idx}@example.com"))

        # Read back every row past id 5.
        select_query = "SELECT * FROM users WHERE id > $1 ORDER BY id"
        rows = await db_manager.execute_query(select_query, (5,))

        print(f"查询结果: {len(rows)} 条记录")
        for row in rows:
            print(f"ID: {row['id']}, Name: {row['name']}")

    except Exception as e:
        print(f"数据库操作失败: {e}")
    finally:
        await db_manager.close_pool()

# asyncio.run(database_demo())

高级连接池配置与优化

为了更好地利用异步数据库连接,需要对连接池进行精细化配置。

import asyncio
import asyncpg
import time
from typing import List, Dict, Any
import logging

class OptimizedAsyncDatabaseManager:
    """asyncpg pool wrapper with tuned settings, batching and transactions."""

    # Keys in the config dict that size the POOL rather than describe the
    # connection; they must not be forwarded to asyncpg.create_pool as-is.
    _POOL_ONLY_KEYS = ('min_connections', 'max_connections')

    def __init__(self, connection_config: Dict[str, Any]):
        """
        Optimized async database manager.

        Args:
            connection_config: asyncpg connection kwargs (host, port, user,
                password, database, ...) plus optional 'min_connections'
                and 'max_connections' pool-size overrides.
        """
        self.config = connection_config
        self.pool = None
        self.logger = logging.getLogger(__name__)
    
    async def create_optimized_pool(self):
        """Create the tuned connection pool.

        BUG FIX: the original expanded **self.config directly, so the
        pool-sizing keys ('min_connections'/'max_connections') reached
        asyncpg.create_pool as unknown keyword arguments and raised
        TypeError; they are stripped out first now.
        """
        connect_kwargs = {
            k: v for k, v in self.config.items() if k not in self._POOL_ONLY_KEYS
        }
        try:
            self.pool = await asyncpg.create_pool(
                **connect_kwargs,
                # Pool sizing
                min_size=self.config.get('min_connections', 10),
                max_size=self.config.get('max_connections', 50),
                max_inactive_connection_lifetime=300,  # close idle conns after 5 min
                setup=self._setup_connection,  # runs on every acquired connection
                init=self._init_connection,    # runs once per new connection
                # Performance tuning
                command_timeout=60,
                statement_cache_size=1000,
                max_cached_statement_lifetime=300,
                max_cacheable_statement_size=1024
            )
            
            self.logger.info("连接池创建成功")
            
        except Exception as e:
            self.logger.error(f"连接池创建失败: {e}")
            raise

    async def close_pool(self):
        """Close the pool if present.

        BUG FIX: the demo code calls close_pool(), but this class did not
        define it (AttributeError); added for parity with AsyncDatabaseManager.
        """
        if self.pool:
            await self.pool.close()
            self.pool = None
    
    async def _setup_connection(self, connection):
        """Per-connection setup: jsonb codec and a statement timeout."""
        import json  # BUG FIX: json was used here but never imported in this snippet

        await connection.set_type_codec(
            'jsonb', encoder=json.dumps, decoder=json.loads, schema='pg_catalog'
        )
        
        # Cap any single statement at 30 seconds.
        await connection.execute("SET statement_timeout = 30000")
    
    async def _init_connection(self, connection):
        """Per-connection initialization: search path and UTC timezone."""
        await connection.execute("SET search_path TO public")
        await connection.execute("SET timezone = 'UTC'")
    
    async def execute_batch_queries(self, queries: List[tuple]) -> List[Any]:
        """
        Run several queries back-to-back on one pooled connection.

        Args:
            queries: list of (query, params) tuples; params may be None.

        Returns:
            One list of row-dicts per query, in input order.
        """
        if not self.pool:
            raise RuntimeError("数据库连接池未初始化")
        
        results = []
        
        try:
            async with self.pool.acquire() as connection:
                for query, params in queries:
                    rows = await connection.fetch(query, *(params or ()))
                    results.append([dict(row) for row in rows])
                    
        except Exception as e:
            self.logger.error(f"批量查询执行失败: {e}")
            raise
        
        return results
    
    async def execute_transaction(self, transaction_queries: List[tuple]) -> bool:
        """
        Run all queries inside a single transaction.

        Args:
            transaction_queries: list of (query, params) tuples.

        Returns:
            True on commit; False if any statement failed (transaction rolled back).
        """
        if not self.pool:
            raise RuntimeError("数据库连接池未初始化")
        
        try:
            async with self.pool.acquire() as connection:
                async with connection.transaction():
                    for query, params in transaction_queries:
                        await connection.execute(query, *(params or ()))
            return True
            
        except Exception as e:
            self.logger.error(f"事务执行失败: {e}")
            return False
    
    async def get_connection_stats(self) -> Dict[str, Any]:
        """Snapshot of pool usage.

        BUG FIX: asyncpg's Pool exposes get_size/get_min_size/get_max_size/
        get_idle_size; the original called non-existent get_idle() and
        get_in_use().  'in_use' is derived as size - idle.
        """
        if not self.pool:
            raise RuntimeError("数据库连接池未初始化")
        
        try:
            size = self.pool.get_size()
            idle = self.pool.get_idle_size()
            return {
                'size': size,
                'max_size': self.pool.get_max_size(),
                'min_size': self.pool.get_min_size(),
                'idle': idle,
                'in_use': size - idle
            }
        except Exception as e:
            self.logger.error(f"获取连接池统计信息失败: {e}")
            return {}

# 使用示例
async def optimized_database_demo():
    """Walk through pool creation, batched queries, a transaction and stats."""
    config = {
        'host': 'localhost',
        'port': 5432,
        'database': 'testdb',
        'user': 'user',
        'password': 'password',
        'min_connections': 5,
        'max_connections': 20,
        'connect_timeout': 10
    }

    db_manager = OptimizedAsyncDatabaseManager(config)

    try:
        await db_manager.create_optimized_pool()

        # Several read queries executed back-to-back on one connection.
        queries = [
            ("SELECT COUNT(*) as count FROM users", None),
            ("SELECT * FROM users WHERE id > $1 LIMIT 5", (10,)),
            ("SELECT email FROM users WHERE name LIKE $1", ('%User%',))
        ]

        batch_output = await db_manager.execute_batch_queries(queries)
        print("批量查询结果:")
        for index, rows in enumerate(batch_output):
            print(f"查询 {index+1}: {len(rows)} 条记录")

        # Two inserts that must commit or roll back together.
        transaction_queries = [
            ("INSERT INTO users (name, email) VALUES ($1, $2)", ('Transaction User 1', 'trans1@example.com')),
            ("INSERT INTO users (name, email) VALUES ($1, $2)", ('Transaction User 2', 'trans2@example.com'))
        ]

        ok = await db_manager.execute_transaction(transaction_queries)
        print(f"事务执行结果: {'成功' if ok else '失败'}")

        # Pool usage snapshot.
        stats = await db_manager.get_connection_stats()
        print("连接池统计:", stats)

    except Exception as e:
        print(f"数据库操作失败: {e}")
    finally:
        await db_manager.close_pool()

# asyncio.run(optimized_database_demo())

异步数据库连接池监控与维护

为了确保异步数据库连接池的稳定运行,需要实现监控和维护机制。

import asyncio
import asyncpg
import time
from typing import Dict, Any
import logging
import threading
from concurrent.futures import ThreadPoolExecutor

class MonitoredAsyncDatabaseManager:
    def __init__(self, connection_config: Dict[str, Any]):
        """Store config and zeroed performance counters; the pool is created later.

        Args:
            connection_config: asyncpg connection kwargs; presumably may also
                carry 'min_connections'/'max_connections' — confirm against
                create_monitored_pool's use of **self.config.
        """
        self.config = connection_config
        self.pool = None
        self.logger = logging.getLogger(__name__)
        self.monitoring_thread = None
        self.monitoring_active = False
        
        # Running counters exposed for monitoring.
        self.stats = {
            'queries_executed': 0,
            'transactions_completed': 0,
            'errors': 0,
            'connection_timeouts': 0,
            'avg_query_time': 0
        }
        
        # Recent query durations (seconds), used for the rolling average.
        self.query_times = []
    
    async def create_monitored_pool(self):
        """Create the pool, then start a daemon thread that watches it.

        NOTE(review): **self.config is expanded alongside explicit
        min_size/max_size — if the config dict contains
        'min_connections'/'max_connections' keys (as the other demos in
        this article do), asyncpg.create_pool will reject them as unknown
        keyword arguments; confirm the expected config shape.
        """
        try:
            self.pool = await asyncpg.create_pool(
                **self.config,
                min_size=self.config.get('min_connections', 10),
                max_size=self.config.get('max_connections', 50),
                max_inactive_connection_lifetime=300,
                setup=self._setup_connection,
                init=self._init_connection,
                command_timeout=60
            )
            
            self.logger.info("监控连接池创建成功")
            
            # Launch background monitoring; _monitor_pool is defined outside
            # the visible portion of this file.
            self.monitoring_active = True
            self.monitoring_thread = threading.Thread(target=self._monitor_pool, daemon=True)
            self.monitoring_thread.start()
            
        except Exception as e:
            self.logger.error(f"监控连接池创建失败: {e}")
            raise
    
    async def _setup_connection(self, connection):
        """Per-connection setup callback: cap any single statement at 30 seconds."""
        await connection.execute("SET statement_timeout = 30000")
    
    async def _init_connection(self, connection):
        """Per-connection init callback: default schema and UTC timezone."""
        await connection.execute("SET search_path TO public")
        await connection.execute("SET timezone = 'UTC'")
    
    async def execute_query_with_monitoring(self, query: str, params: tuple = None) -> Dict[str, Any]:
        """Run a query while timing it and recording success/error statistics.

        BUG FIX: ``params`` defaults to None and the original passed
        ``*params`` straight to fetch(), raising TypeError for
        parameterless queries; None is now treated as an empty tuple.

        Returns:
            {'success': True, 'data': rows-as-dicts, 'query_time': seconds,
             'timestamp': epoch-seconds} on success.

        Raises:
            Re-raises any database error after recording it in self.stats.
        """
        start_time = time.time()
        
        try:
            if not self.pool:
                raise RuntimeError("数据库连接池未初始化")
            
            async with self.pool.acquire() as connection:
                rows = await connection.fetch(query, *(params or ()))
                result = [dict(row) for row in rows]
                
                # Record the successful query's duration.
                query_time = time.time() - start_time
                self._update_stats(query_time, success=True)
                
                return {
                    'success': True,
                    'data': result,
                    'query_time': query_time,
                    'timestamp': time.time()
                }
                
        except asyncpg.exceptions.ConnectionDoesNotExistError:
            self._update_stats(time.time() - start_time, success=False, error_type='connection_lost')
            raise
        except asyncpg.exceptions.TimeoutError:
            self._update_stats(time.time() - start_time, success=False, error_type='timeout')
            raise
        except Exception:
            self._update_stats(time.time() - start_time, success=False, error_type='general')
            raise
    
    def _update_stats(self, query_time: float, success: bool = True, error_type: str = None):
        """更新统计信息"""
        self.stats['queries_executed'] += 1
        
        if not success:
            self.stats['errors'] += 1
            if error_type == 'timeout':
                self.stats['connection_timeouts'] += 1
        else:
            # 计算平均查询时间
            self.query_times.append(query_time)
            if len(self.query_times) > 100:
                self.query_times.pop(0)
            
            if self.query_times:
                self.stats['avg_query_time'] = sum(self.query_times
相关推荐
广告位招租

相似文章

    评论 (0)

    0/2000