Python异步编程终极指南:asyncio、aiohttp与多线程混合应用详解

OldEar
OldEar 2026-02-08T18:05:09+08:00
0 0 2

引言

在现代软件开发中,I/O密集型应用的性能优化已成为一个关键议题。Python作为一门广泛使用的编程语言,在处理并发任务方面有着独特的优势。异步编程作为一种高效的并发解决方案,能够显著提升程序的执行效率和资源利用率。本文将深入探讨Python异步编程的核心概念,详细介绍asyncio事件循环、aiohttp异步HTTP客户端以及异步数据库操作等关键技术,并提供实际的应用场景和最佳实践。

什么是异步编程

异步编程的基本概念

异步编程是一种编程范式,允许程序在等待I/O操作完成时执行其他任务,而不是阻塞等待。传统的同步编程模型中,当一个函数需要等待网络请求、文件读写或数据库查询等I/O操作时,整个线程会被阻塞,直到操作完成。而异步编程通过事件循环机制,让程序能够在等待期间处理其他任务,从而显著提高资源利用率和程序性能。

异步编程的优势

  1. 高并发性:单个线程可以同时处理大量并发请求
  2. 资源效率:减少线程创建和切换的开销
  3. 响应性:避免长时间阻塞导致的界面卡顿
  4. 可扩展性:能够轻松处理更多并发连接

asyncio核心概念详解

事件循环(Event Loop)

事件循环是异步编程的核心组件,它负责管理并调度所有异步任务。在Python中,asyncio提供了完整的事件循环实现。

import asyncio
import time

async def say_hello(name, delay):
    """Print a greeting, pause asynchronously for `delay` seconds, then print a farewell."""
    stamp = time.time
    print(f"Hello {name} at {stamp()}")
    await asyncio.sleep(delay)
    print(f"Goodbye {name} at {stamp()}")

async def main():
    """Run three say_hello coroutines concurrently and wait for all of them."""
    greetings = [
        say_hello("Alice", 1),
        say_hello("Bob", 2),
        say_hello("Charlie", 0.5),
    ]
    # gather schedules all coroutines at once and awaits their completion.
    await asyncio.gather(*greetings)

# Start the event loop only when this snippet runs as a script.
if __name__ == "__main__":
    asyncio.run(main())

协程(Coroutine)

协程是异步编程的基础单元,使用 async def 关键字定义。协程可以被暂停和恢复执行,通过 await 关键字来等待其他协程或异步操作的完成。

import asyncio

# 定义协程函数
async def fetch_data(url, delay=1):
    """Simulate an asynchronous network fetch.

    Args:
        url: Resource identifier to "fetch".
        delay: Seconds of simulated latency. Defaults to 1, matching the
            previously hard-coded value, so existing callers are unaffected.

    Returns:
        A placeholder string describing the fetched resource.
    """
    print(f"Starting fetch from {url}")
    # Simulated network latency — now parameterized for reuse and testing.
    await asyncio.sleep(delay)
    return f"Data from {url}"

async def process_data():
    """Fetch three URLs concurrently and return their results in submission order."""
    urls = ["http://api1.com", "http://api2.com", "http://api3.com"]
    # gather preserves the order of the supplied coroutines.
    return await asyncio.gather(*(fetch_data(u) for u in urls))

# Demo: drive the coroutine to completion.
asyncio.run(process_data())

任务(Task)与未来对象(Future)

在asyncio中,Task是Future的子类,用于管理协程的执行。通过create_task()函数可以将协程包装成任务。

import asyncio

async def long_running_task(name, duration):
    """Announce start, sleep `duration` seconds, announce completion, return a result tag."""
    print(f"Task {name} started")
    await asyncio.sleep(duration)
    done_msg = f"Task {name} completed after {duration} seconds"
    print(done_msg)
    return f"Result from {name}"

async def main():
    """Schedule two tasks eagerly with create_task, then await their results."""
    # create_task starts both coroutines running as soon as the loop is free.
    pending = [
        asyncio.create_task(long_running_task("Task-1", 2)),
        asyncio.create_task(long_running_task("Task-2", 3)),
    ]
    result1 = await pending[0]
    result2 = await pending[1]
    print(f"Results: {result1}, {result2}")

# Demo entry point.
asyncio.run(main())

aiohttp异步HTTP客户端

基础使用

aiohttp是Python中功能强大的异步HTTP客户端和服务器库,基于asyncio构建,能够高效处理大量并发HTTP请求。

import aiohttp
import asyncio
import time

async def fetch_url(session, url):
    """Fetch `url` through `session`; returns a summary dict and never raises."""
    try:
        async with session.get(url) as response:
            status = response.status
            if status != 200:
                # Non-200 responses are reported, not raised.
                return {'url': url, 'status': status, 'error': 'HTTP Error'}
            # Only read the body on success.
            body = await response.text()
            return {'url': url, 'status': status, 'content_length': len(body)}
    except Exception as e:
        # Network/protocol failures collapse into an error record.
        return {'url': url, 'error': str(e)}

async def fetch_multiple_urls():
    """Fetch a fixed set of test URLs concurrently and print a summary of each."""
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/json',
        'https://httpbin.org/uuid'
    ]

    # One shared session for the whole batch (enables connection pooling).
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(fetch_url(session, url) for url in urls)
        )

        for result in results:
            print(f"URL: {result['url']}")
            if 'error' in result:
                print(f"  Error: {result['error']}")
            else:
                print(f"  Status: {result['status']}, Length: {result['content_length']}")

# Demo entry point.
asyncio.run(fetch_multiple_urls())

高级配置与错误处理

import aiohttp
import asyncio
from typing import List, Dict

class AsyncHttpClient:
    """HTTP client that bounds request concurrency with a semaphore.

    Args:
        timeout: Total per-request timeout in seconds.
        max_concurrent: Maximum number of in-flight requests.
    """

    def __init__(self, timeout: int = 30, max_concurrent: int = 100):
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_with_semaphore(self, session: aiohttp.ClientSession, url: str) -> Dict:
        """Fetch one URL while holding a semaphore slot; never raises."""
        async with self.semaphore:  # cap concurrent requests
            try:
                async with session.get(url, timeout=self.timeout) as response:
                    content = await response.text()
                    return {
                        'url': url,
                        'status': response.status,
                        'content_length': len(content),
                        'success': True
                    }
            except Exception as e:
                return {
                    'url': url,
                    'error': str(e),
                    'success': False
                }

    async def fetch_batch(self, urls: List[str]) -> List[Dict]:
        """Fetch many URLs concurrently; returns one result dict per URL.

        BUGFIX: this method used to build a fresh hard-coded 30-second
        ClientTimeout, silently ignoring the `timeout` passed to __init__.
        The configured self.timeout is now applied to the session too.
        """
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=30)

        async with aiohttp.ClientSession(
            connector=connector,
            timeout=self.timeout
        ) as session:
            tasks = [self.fetch_with_semaphore(session, url) for url in urls]
            results = await asyncio.gather(*tasks, return_exceptions=True)

            # gather(return_exceptions=True) may hand back Exception objects;
            # normalize them so callers always see a uniform dict shape.
            processed_results = []
            for result in results:
                if isinstance(result, Exception):
                    processed_results.append({'error': str(result)})
                else:
                    processed_results.append(result)

            return processed_results

# Usage example
async def main():
    """Demo: batch-fetch a few test endpoints and print each result dict."""
    client = AsyncHttpClient(timeout=10, max_concurrent=50)
    targets = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/json',
        'https://httpbin.org/uuid'
    ]

    for outcome in await client.fetch_batch(targets):
        print(outcome)

# asyncio.run(main())

异步文件下载

import aiohttp
import asyncio
import os
from pathlib import Path

async def download_file(session: aiohttp.ClientSession, url: str, filename: str) -> bool:
    """Stream-download `url` to `filename`.

    Args:
        session: Shared aiohttp session.
        url: Source URL.
        filename: Destination path; parent directories are created.

    Returns:
        True on success, False on HTTP error or exception.
    """
    try:
        async with session.get(url) as response:
            if response.status == 200:
                # Make sure the target directory exists.
                Path(filename).parent.mkdir(parents=True, exist_ok=True)

                # Stream the body to disk in chunks to bound memory use.
                # NOTE(review): the file write itself is synchronous and
                # briefly blocks the event loop; acceptable for small files.
                with open(filename, 'wb') as f:
                    async for chunk in response.content.iter_chunked(1024):
                        f.write(chunk)

                # BUGFIX: the success message printed a literal "(unknown)"
                # placeholder instead of the actual file name.
                print(f"Downloaded: {filename}")
                return True
            else:
                print(f"Failed to download {url}: Status {response.status}")
                return False
    except Exception as e:
        print(f"Error downloading {url}: {e}")
        return False

async def download_multiple_files():
    """Download a fixed list of (url, path) pairs concurrently and report the tally."""
    files_to_download = [
        ('https://httpbin.org/json', 'downloads/data.json'),
        ('https://httpbin.org/uuid', 'downloads/uuid.txt'),
    ]

    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(
            *(download_file(session, url, path) for url, path in files_to_download)
        )
        # True counts as 1, so sum(...) is the number of successes.
        print(f"Successfully downloaded {sum(results)} out of {len(results)} files")

# asyncio.run(download_multiple_files())

异步数据库操作

使用asyncpg进行PostgreSQL异步操作

import asyncio
import asyncpg
from typing import List, Dict

class AsyncDatabaseManager:
    """Thin wrapper around an asyncpg connection pool for the `users` table."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None  # created lazily by create_pool()

    async def create_pool(self):
        """Create the connection pool; call once before any query."""
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=5,
            max_size=20,
            command_timeout=60
        )

    async def close_pool(self):
        """Close the pool; safe to call even if create_pool never ran."""
        if self.pool:
            await self.pool.close()

    async def fetch_users(self) -> List[Dict]:
        """Return every row of `users` as a list of plain dicts."""
        async with self.pool.acquire() as connection:
            rows = await connection.fetch('SELECT * FROM users')
            return [dict(row) for row in rows]

    async def insert_user(self, name: str, email: str) -> Dict:
        """Insert one user and return the stored row (including DB defaults)."""
        async with self.pool.acquire() as connection:
            row = await connection.fetchrow(
                'INSERT INTO users (name, email) VALUES ($1, $2) RETURNING *',
                name, email
            )
            return dict(row)

    async def batch_insert_users(self, users: List[Dict]) -> int:
        """Insert many users atomically; returns the number inserted.

        IMPROVED: uses executemany inside a single transaction instead of a
        Python-level loop of execute() calls — one prepared statement and
        far fewer round trips, same observable result.
        """
        async with self.pool.acquire() as connection:
            # The transaction keeps the batch all-or-nothing.
            async with connection.transaction():
                await connection.executemany(
                    'INSERT INTO users (name, email) VALUES ($1, $2)',
                    [(u['name'], u['email']) for u in users]
                )
                return len(users)

# Usage example
async def database_example():
    """Demo: pool setup, single insert, batch insert, query, teardown."""
    db_manager = AsyncDatabaseManager('postgresql://user:password@localhost/db')

    try:
        await db_manager.create_pool()

        inserted = await db_manager.insert_user('Alice', 'alice@example.com')
        print(f"Inserted user: {inserted}")

        batch = [
            {'name': 'Bob', 'email': 'bob@example.com'},
            {'name': 'Charlie', 'email': 'charlie@example.com'}
        ]
        count = await db_manager.batch_insert_users(batch)
        print(f"Batch inserted {count} users")

        everyone = await db_manager.fetch_users()
        print(f"Fetched {len(everyone)} users")

    except Exception as e:
        print(f"Database error: {e}")
    finally:
        # Teardown runs even on failure; close_pool tolerates pool=None.
        await db_manager.close_pool()

# asyncio.run(database_example())

异步Redis操作

import asyncio
import aioredis
from typing import Dict, List, Optional

class AsyncRedisManager:
    """Async Redis helper built on aioredis.

    NOTE(review): the standalone `aioredis` package is deprecated; its API
    now lives in `redis.asyncio`. Kept as-is to match this snippet's import.
    """

    def __init__(self, redis_url: str = 'redis://localhost:6379'):
        self.redis_url = redis_url
        self.redis = None  # set by connect()

    async def connect(self):
        """Open the Redis connection with decoded UTF-8 responses."""
        self.redis = await aioredis.from_url(
            self.redis_url,
            encoding='utf-8',
            decode_responses=True
        )

    async def close(self):
        """Close the connection if it was opened."""
        if self.redis:
            await self.redis.close()

    async def set_key_value(self, key: str, value: str, expire: Optional[int] = None) -> bool:
        """Set key=value, optionally with a TTL in seconds; True on success.

        BUGFIX: the annotation was `expire: int = None`; the correct type
        for an optional TTL is Optional[int].
        """
        try:
            if expire:
                await self.redis.set(key, value, ex=expire)
            else:
                await self.redis.set(key, value)
            return True
        except Exception as e:
            print(f"Error setting key {key}: {e}")
            return False

    async def get_key_value(self, key: str) -> Optional[str]:
        """Return the value for `key`, or None on miss or error."""
        try:
            return await self.redis.get(key)
        except Exception as e:
            print(f"Error getting key {key}: {e}")
            return None

    async def batch_set(self, data: Dict[str, str]) -> int:
        """Set many keys in one pipeline round trip; returns count set (0 on error)."""
        try:
            pipe = self.redis.pipeline()
            for key, value in data.items():
                pipe.set(key, value)
            await pipe.execute()
            return len(data)
        except Exception as e:
            print(f"Error in batch set: {e}")
            return 0

    async def get_multiple_keys(self, keys: List[str]) -> List[Optional[str]]:
        """MGET the given keys; on error returns one None per key."""
        try:
            results = await self.redis.mget(*keys)
            return results
        except Exception as e:
            print(f"Error in batch get: {e}")
            return [None] * len(keys)

# Usage example
async def redis_example():
    """Demo: connect, single set/get, pipelined batch set, mget, teardown."""
    redis_manager = AsyncRedisManager()

    try:
        await redis_manager.connect()

        # Single key with a one-hour TTL.
        await redis_manager.set_key_value('user:1', 'Alice', expire=3600)
        value = await redis_manager.get_key_value('user:1')
        print(f"Retrieved value: {value}")

        # Pipelined batch write.
        sessions = {
            'session:user1': 'active',
            'session:user2': 'inactive',
            'session:user3': 'active'
        }
        count = await redis_manager.batch_set(sessions)
        print(f"Batch set {count} keys")

        # Batch read back.
        keys = ['session:user1', 'session:user2', 'session:user3']
        results = await redis_manager.get_multiple_keys(keys)
        print(f"Batch get results: {results}")

    except Exception as e:
        print(f"Redis error: {e}")
    finally:
        await redis_manager.close()

# asyncio.run(redis_example())

异步与多线程混合应用

线程池与异步的协作

在某些场景下,我们需要将异步编程与传统的多线程编程相结合。例如,处理CPU密集型任务时可以使用线程池,而I/O密集型任务则使用异步。

import asyncio
import concurrent.futures
import time
from typing import List

# Example CPU-bound workload
def cpu_intensive_task(n: int) -> int:
    """Burn CPU deliberately: sum of i*i for i in range(n * 1_000_000)."""
    # Kept as a real loop on purpose — this function exists to consume CPU.
    return sum(i * i for i in range(n * 1000000))

class AsyncWithThreadPool:
    """Bridge CPU-bound work into async code via a thread pool.

    NOTE(review): threads cannot parallelize pure-Python CPU work under the
    GIL; a ProcessPoolExecutor would give true parallelism. The thread pool
    is kept to preserve the original semantics (no pickling requirements).
    """

    def __init__(self):
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    async def run_cpu_task(self, n: int) -> int:
        """Run cpu_intensive_task(n) on the pool without blocking the loop.

        BUGFIX: uses get_running_loop() instead of get_event_loop(); the
        latter is deprecated inside coroutines since Python 3.10 and this
        method only ever runs inside a running loop.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            cpu_intensive_task,
            n
        )

    async def process_multiple_tasks(self, numbers: List[int]) -> List[int]:
        """Fan out one pool job per number; results keep input order."""
        tasks = [self.run_cpu_task(n) for n in numbers]
        return await asyncio.gather(*tasks)

    async def cleanup(self):
        """Shut the pool down, waiting for in-flight jobs to finish."""
        self.executor.shutdown(wait=True)

# Usage example
async def mixed_example():
    """Demo: an async I/O wait followed by fanned-out CPU work on the pool."""
    manager = AsyncWithThreadPool()

    try:
        print("Starting async I/O operations...")
        await asyncio.sleep(1)

        results = await manager.process_multiple_tasks([1, 2, 3, 4])
        print(f"CPU intensive task results: {results}")

    finally:
        # Always release the pool threads, even on error.
        await manager.cleanup()

# asyncio.run(mixed_example())

异步HTTP请求与同步数据库操作结合

import aiohttp
import asyncio
import sqlite3
from contextlib import contextmanager
from typing import List, Dict

class AsyncApiWithDatabase:
    """Fetch JSON over HTTP asynchronously and persist responses to SQLite.

    SQLite access is synchronous; process_api_with_db pushes it onto a
    worker thread so the event loop is never blocked on disk I/O.
    """

    def __init__(self):
        self.db_file = 'example.db'
        self.session = None  # created by create_session()

    async def create_session(self):
        """Open the shared aiohttp session."""
        self.session = aiohttp.ClientSession()

    async def close_session(self):
        """Close the session if it was opened."""
        if self.session:
            await self.session.close()

    async def fetch_api_data(self, url: str) -> Dict:
        """GET `url` and decode JSON; never raises, returns a status dict."""
        try:
            async with self.session.get(url) as response:
                if response.status == 200:
                    data = await response.json()
                    return {
                        'url': url,
                        'data': data,
                        'success': True
                    }
                else:
                    return {
                        'url': url,
                        'error': f'HTTP {response.status}',
                        'success': False
                    }
        except Exception as e:
            return {
                'url': url,
                'error': str(e),
                'success': False
            }

    @contextmanager
    def get_db_connection(self):
        """Yield a SQLite connection that is always closed afterwards."""
        conn = sqlite3.connect(self.db_file)
        try:
            yield conn
        finally:
            conn.close()

    def save_to_database(self, data: Dict) -> bool:
        """Synchronously persist one response row; True on success."""
        try:
            with self.get_db_connection() as conn:
                cursor = conn.cursor()
                # Idempotent schema setup before each insert.
                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS api_responses (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        url TEXT,
                        response TEXT,
                        timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                    )
                ''')

                cursor.execute(
                    'INSERT INTO api_responses (url, response) VALUES (?, ?)',
                    (data['url'], str(data.get('data', {})))
                )
                conn.commit()
            return True
        except Exception as e:
            print(f"Database error: {e}")
            return False

    async def process_api_with_db(self, urls: List[str]) -> List[Dict]:
        """Fetch all `urls` concurrently, then store each successful response.

        IMPROVED: save_to_database used to be called directly from the
        coroutine, blocking the event loop on file I/O; it now runs in a
        worker thread via asyncio.to_thread (Python 3.9+).
        """
        results = await asyncio.gather(
            *(self.fetch_api_data(url) for url in urls)
        )

        saved_results = []
        for result in results:
            if result['success']:
                success = await asyncio.to_thread(self.save_to_database, result)
                saved_results.append({
                    'url': result['url'],
                    'saved': success,
                    'data': result.get('data')
                })
            else:
                saved_results.append({
                    'url': result['url'],
                    'saved': False,
                    'error': result.get('error')
                })

        return saved_results

# Usage example
async def combined_example():
    """Demo: fetch two endpoints, persist them, and report the save status."""
    api_manager = AsyncApiWithDatabase()

    try:
        await api_manager.create_session()

        endpoints = [
            'https://httpbin.org/json',
            'https://httpbin.org/uuid'
        ]

        for result in await api_manager.process_api_with_db(endpoints):
            print(f"URL: {result['url']}")
            print(f"Saved: {result['saved']}")
            if not result['saved']:
                print(f"Error: {result.get('error')}")

    except Exception as e:
        print(f"Error: {e}")
    finally:
        await api_manager.close_session()

# asyncio.run(combined_example())

性能优化与最佳实践

连接池管理

import aiohttp
import asyncio
from typing import Optional

class OptimizedHttpClient:
    """aiohttp client with tuned connection pooling and timeouts.

    Intended for use as an async context manager:
        async with OptimizedHttpClient() as client: ...
    """

    def __init__(self):
        self.session: Optional[aiohttp.ClientSession] = None
        self.connector = aiohttp.TCPConnector(
            limit=100,          # total simultaneous connections
            limit_per_host=30,  # per-host connection cap
            ttl_dns_cache=300,  # DNS cache TTL (seconds)
            use_dns_cache=True,
            force_close=False   # keep-alive: reuse connections
        )
        self.timeout = aiohttp.ClientTimeout(
            total=30,           # whole-request deadline
            connect=10,         # connection-establishment deadline
            sock_read=15,       # per-read deadline
            sock_connect=10     # socket connect deadline
        )

    async def __aenter__(self):
        """Open the pooled session on context entry."""
        self.session = aiohttp.ClientSession(
            connector=self.connector,
            timeout=self.timeout
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session on context exit."""
        if self.session:
            await self.session.close()

    async def fetch(self, url: str) -> Optional[dict]:
        """GET `url`; returns a summary dict and never raises.

        BUGFIX: the return annotation used `Dict`, which this snippet never
        imports (its typing import is only `Optional`), so defining the
        class raised NameError. The builtin `dict` is used instead.
        """
        try:
            async with self.session.get(url) as response:
                if response.status == 200:
                    content = await response.text()
                    return {
                        'url': url,
                        'status': response.status,
                        'content_length': len(content),
                        'success': True
                    }
                else:
                    return {
                        'url': url,
                        'status': response.status,
                        'success': False
                    }
        except Exception as e:
            return {
                'url': url,
                'error': str(e),
                'success': False
            }

# Usage example
async def optimized_example():
    """Demo: fetch two endpoints through the pooled client and print results."""
    urls = [
        'https://httpbin.org/json',
        'https://httpbin.org/uuid'
    ]

    async with OptimizedHttpClient() as client:
        results = await asyncio.gather(*(client.fetch(u) for u in urls))

        for result in results:
            print(result)

# asyncio.run(optimized_example())

重试机制与超时处理

import aiohttp
import asyncio
from typing import Optional, Dict
import random

class RobustHttpClient:
    """HTTP client with exponential-backoff retries.

    Retries on 5xx responses and on aiohttp.ClientError; 4xx responses and
    unexpected exceptions are returned immediately without retrying.
    """

    def __init__(self, max_retries: int = 3, base_delay: float = 1.0):
        self.max_retries = max_retries  # retries *after* the first attempt
        self.base_delay = base_delay    # backoff base in seconds
        self.session = None

    async def create_session(self):
        """Open the shared aiohttp session."""
        self.session = aiohttp.ClientSession()

    async def close_session(self):
        """Close the session if it was opened."""
        if self.session:
            await self.session.close()

    async def fetch_with_retry(self, url: str) -> Optional[Dict]:
        """GET `url` with up to max_retries retries and jittered backoff.

        Returns a result dict, or None when every attempt ended in a
        retried 5xx (retries exhausted without a terminal result).

        BUGFIX: the final `except Exception` branch was missing the closing
        brace of its return dict — a SyntaxError in the original.
        """
        for attempt in range(self.max_retries + 1):
            try:
                async with self.session.get(url, timeout=10) as response:
                    if response.status == 200:
                        content = await response.text()
                        return {
                            'url': url,
                            'status': response.status,
                            'content_length': len(content),
                            'success': True,
                            'attempt': attempt + 1
                        }
                    elif response.status >= 500:  # server error: back off, retry
                        if attempt < self.max_retries:
                            delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
                            await asyncio.sleep(delay)
                            continue
                    else:  # client error (4xx): retrying won't help
                        return {
                            'url': url,
                            'status': response.status,
                            'success': False,
                            'attempt': attempt + 1
                        }
            except aiohttp.ClientError as e:
                if attempt < self.max_retries:
                    delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
                    await asyncio.sleep(delay)
                    continue
                else:
                    return {
                        'url': url,
                        'error': str(e),
                        'success': False,
                        'attempt': attempt + 1
                    }
            except Exception as e:
                return {
                    'url': url,
                    'error': str(e),
                    'success': False,
                    'attempt': attempt + 1
                }

        return None

# Usage example
async def retry_example():
    """Demo: fetch one slow and one failing endpoint with retries enabled."""
    client = RobustHttpClient(max_retries=3, base_delay=0.5)

    try:
        await client.create_session()

        targets = [
            'https://httpbin.org/delay/1',
            'https://httpbin.org/status/500'
        ]

        for outcome in await asyncio.gather(
            *(client.fetch_with_retry(u) for u in targets)
        ):
            print(outcome)

    finally:
        await client.close_session()

# asyncio.run(retry_example())

实际应用案例

Web爬虫系统

import aiohttp
import asyncio
from urllib.parse import urljoin, urlparse
import time
from collections import deque
from typing import Set, List, Dict

class AsyncWebCrawler:
    def __init__(self, max_concurrent: int = 10):
        self.max_concurrent = max_concurrent
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.visited_urls: Set[str] = set()
        self.session = None
    
    async def create_session(self):
        """创建HTTP会话"""
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30),
            headers={'User-Agent': 'AsyncCrawler/1.0'}
        )
    
    async def close_session(self):
        """关闭HTTP会话"""
        if self.session:
            await self.session.close()
    
    async def fetch_page(self, url: str) -> Dict:
        """获取网页内容"""
        async with self.semaphore:  # 限制并发数
            try:
                start_time = time.time()
                async with self.session.get(url) as response:
                    if response.status == 200:
                        content = await response.text()
                        fetch_time = time.time() - start_time
                        
                        return {
                            'url': url,
                            'status': response.status,
                            'content_length': len(content),
                            'fetch_time': fetch_time,
                            'success': True
                        }
                    else:
                        return {
                            'url': url,
                            'status': response.status,
                            'success': False
                        }
            except Exception as e:
                return {
                    'url': url,
                    'error': str(e),
                    'success': False
相关推荐
广告位招租

相似文章

    评论 (0)

    0/2000