Python异步编程最佳实践:asyncio、aiohttp与多线程协同处理高并发请求

ShallowFire
ShallowFire 2026-02-09T07:05:09+08:00
0 0 0

引言

在现代Web应用开发中,高并发请求处理已成为一个重要的技术挑战。传统的同步编程模型在面对大量并发请求时往往表现不佳,导致系统响应缓慢甚至崩溃。Python作为一门广泛应用的编程语言,在异步编程领域也提供了强大的支持。本文将深入探讨如何使用asyncio、aiohttp以及多线程技术来构建高性能的异步服务,为Web爬虫、API服务等场景提供最佳实践指南。

Python异步编程基础

什么是异步编程

异步编程是一种编程范式,允许程序在等待I/O操作完成的同时执行其他任务。与传统的同步编程不同,异步编程不会阻塞主线程,从而能够更高效地处理并发请求。

在Python中,异步编程主要通过async和await这两个关键字来实现。async用于定义协程函数,而await用于等待协程的执行结果。

asyncio模块详解

asyncio是Python标准库中用于编写异步I/O应用程序的核心模块。它提供了事件循环、协程、任务等基础组件,是构建异步应用的基础。

import asyncio

async def hello_world():
    """Print two words with a one-second non-blocking pause between them."""
    print("Hello")
    await asyncio.sleep(1)  # yield control to the event loop for 1 second
    print("World")

# Create the coroutine object and drive it on a fresh event loop
coro = hello_world()
asyncio.run(coro)

事件循环机制

事件循环是异步编程的核心,它负责调度和执行协程。Python的asyncio库通过事件循环来管理异步任务的执行顺序。

import asyncio
import time

async def task(name, delay):
    """Announce start, wait `delay` seconds without blocking, announce the end."""
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} completed after {delay}s")

async def main():
    """Run three tasks concurrently; wall time is roughly the longest delay."""
    pending = (
        task("A", 1),
        task("B", 2),
        task("C", 1.5),
    )
    # gather schedules every coroutine at once and awaits them all
    await asyncio.gather(*pending)

asyncio.run(main())

aiohttp异步HTTP框架

aiohttp简介

aiohttp是一个基于asyncio的异步HTTP客户端和服务器框架,专门用于构建高性能的Web应用和服务。它提供了完整的HTTP协议实现,并且能够很好地与asyncio集成。

from aiohttp import web
import asyncio

async def handle(request):
    """Greet the caller, using the optional `name` path segment."""
    name = request.match_info.get('name', 'Anonymous')
    greeting = f"Hello, {name}"
    return web.Response(text=greeting)

app = web.Application()
for pattern in ('/', '/{name}'):
    app.router.add_get(pattern, handle)

# Run the development server when executed as a script
if __name__ == '__main__':
    web.run_app(app, host='localhost', port=8080)

高性能异步Web服务

在构建高性能Web服务时,aiohttp能够充分利用异步特性来处理大量并发请求。

import asyncio
from aiohttp import web
import json
import time

class AsyncWebService:
    """Small aiohttp application exposing a home page and a fake users API."""

    def __init__(self):
        self.app = web.Application()
        self.setup_routes()

    def setup_routes(self):
        """Register every HTTP route on the application's router."""
        router = self.app.router
        router.add_get('/', self.home)
        router.add_get('/api/users/{user_id}', self.get_user)
        router.add_post('/api/users', self.create_user)

    async def home(self, request):
        """Plain-text landing page."""
        return web.Response(text="Welcome to Async Web Service")

    async def get_user(self, request):
        """Return a synthetic user record for the id in the URL path."""
        user_id = request.match_info['user_id']
        await asyncio.sleep(0.1)  # stand-in for an async database lookup
        payload = {
            "id": user_id,
            "name": f"User_{user_id}",
            "email": f"user{user_id}@example.com"
        }
        return web.Response(
            text=json.dumps(payload),
            content_type='application/json'
        )

    async def create_user(self, request):
        """Echo the posted JSON back inside a creation receipt."""
        body = await request.json()
        await asyncio.sleep(0.2)  # stand-in for async persistence work
        receipt = {
            "status": "created",
            "user": body
        }
        return web.Response(
            text=json.dumps(receipt),
            content_type='application/json'
        )

# 启动服务
async def main():
    """Start the service via AppRunner/TCPSite so it can share an event loop."""
    service = AsyncWebService()
    app_runner = web.AppRunner(service.app)
    await app_runner.setup()
    tcp_site = web.TCPSite(app_runner, 'localhost', 8080)
    await tcp_site.start()
    print("Server started at http://localhost:8080")

    # Park this coroutine forever so the server keeps accepting connections
    while True:
        await asyncio.sleep(3600)

# if __name__ == '__main__':
#     asyncio.run(main())

异步爬虫实战

高性能Web爬虫设计

异步爬虫能够显著提高数据抓取效率,特别是在处理大量网页时。通过aiohttp与asyncio,我们可以轻松构建高效的异步爬虫。

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin, urlparse

class AsyncWebCrawler:
    """Concurrent page fetcher/parser with a semaphore-bounded request budget."""

    def __init__(self, max_concurrent=10, timeout=5):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def fetch_page(self, session, url):
        """Download one URL; return a result dict, or None on any failure."""
        try:
            async with self.semaphore:  # cap the number of in-flight requests
                async with session.get(url, timeout=self.timeout) as response:
                    if response.status != 200:
                        print(f"Failed to fetch {url}: status {response.status}")
                        return None
                    body = await response.text()
                    return {
                        'url': url,
                        'status': response.status,
                        'content': body,
                        'timestamp': time.time()
                    }
        except Exception as e:
            print(f"Error fetching {url}: {e}")
            return None

    async def parse_page(self, page_data):
        """Extract the title and outbound links from a fetched page dict."""
        if not page_data:
            return None

        soup = BeautifulSoup(page_data['content'], 'html.parser')

        title_tag = soup.find('title')
        title_text = title_tag.get_text().strip() if title_tag else ''

        # Resolve every <a href> against the page URL to get absolute links
        hrefs = [
            urljoin(page_data['url'], anchor['href'])
            for anchor in soup.find_all('a', href=True)
        ]

        return {
            'url': page_data['url'],
            'title': title_text,
            'link_count': len(hrefs),
            'links': hrefs[:10],  # keep only the first 10 links
            'timestamp': page_data['timestamp']
        }

    async def crawl_urls(self, urls):
        """Fetch then parse every URL; failed pages are dropped from the output."""
        async with aiohttp.ClientSession() as session:
            # Phase 1: download everything concurrently
            pages = await asyncio.gather(
                *(self.fetch_page(session, u) for u in urls)
            )
            # Phase 2: parse the successful downloads concurrently
            parsed = await asyncio.gather(
                *(self.parse_page(p) for p in pages if p)
            )
            return [item for item in parsed if item]

# 使用示例
async def example_crawler():
    """Crawl a handful of slow httpbin endpoints and print a per-page summary."""
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/3'
    ]

    crawler = AsyncWebCrawler(max_concurrent=5)
    started = time.time()
    results = await crawler.crawl_urls(urls)
    finished = time.time()

    print(f"爬取完成,耗时: {finished - started:.2f}秒")
    print(f"成功获取 {len(results)} 个页面")

    for page in results:
        print(f"URL: {page['url']}")
        print(f"标题: {page['title']}")
        print(f"链接数: {page['link_count']}")
        print("-" * 50)

# asyncio.run(example_crawler())

异步爬虫优化技巧

为了进一步提升爬虫性能,我们可以采用多种优化策略:

import asyncio
import aiohttp
from collections import deque
import time

class OptimizedAsyncCrawler:
    """Async crawler with a shared session, a concurrency cap, simple
    per-second rate limiting, and exponential-backoff retries.

    Use as an async context manager so the aiohttp session is opened and
    closed deterministically.
    """

    def __init__(self, max_concurrent=20, rate_limit=10):
        self.max_concurrent = max_concurrent
        self.rate_limit = rate_limit  # max requests per rolling second
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.request_queue = deque()  # timestamps of recently completed requests
        self.session = None

    async def __aenter__(self):
        # One session for the crawler's lifetime: connection pooling + shared headers
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=10),
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()

    async def rate_limited_request(self, url):
        """GET `url`, pacing so at most `rate_limit` requests run per second.

        Returns a dict with 'url', 'status' and 'content', or None on error.
        """
        # Drain the timestamp window, sleeping while it is full.
        # BUG FIX: the current time must be re-read on every iteration —
        # the original captured `now = time.time()` once before the loop,
        # so after the first await every sleep was computed from a stale
        # timestamp.
        while len(self.request_queue) >= self.rate_limit:
            oldest = self.request_queue.popleft()
            sleep_time = 1.0 - (time.time() - oldest)
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)

        async with self.semaphore:  # bound the number of concurrent requests
            try:
                async with self.session.get(url) as response:
                    content = await response.text()
                    self.request_queue.append(time.time())
                    return {
                        'url': url,
                        'status': response.status,
                        'content': content
                    }
            except Exception as e:
                print(f"Request failed for {url}: {e}")
                return None

    async def crawl_with_retry(self, urls, max_retries=3):
        """Fetch all `urls`, retrying failed/non-200 responses with
        exponential backoff; returns only the successful result dicts."""

        async def fetch_with_retry(url, retries=0):
            try:
                result = await self.rate_limited_request(url)
                if result and result['status'] == 200:
                    return result
                elif retries < max_retries:
                    print(f"Retrying {url} (attempt {retries + 1})")
                    await asyncio.sleep(2 ** retries)  # exponential backoff
                    return await fetch_with_retry(url, retries + 1)
                else:
                    print(f"Failed to fetch {url} after {max_retries} attempts")
                    return None
            except Exception as e:
                print(f"Error fetching {url}: {e}")
                if retries < max_retries:
                    await asyncio.sleep(2 ** retries)
                    return await fetch_with_retry(url, retries + 1)
                return None

        tasks = [fetch_with_retry(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Drop exceptions and None placeholders; keep only successful dicts
        return [r for r in results if isinstance(r, dict) and r]

# 使用示例
async def optimized_crawler_example():
    """Exercise OptimizedAsyncCrawler against a few httpbin test endpoints."""
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404'
    ]

    async with OptimizedAsyncCrawler(max_concurrent=5, rate_limit=2) as crawler:
        pages = await crawler.crawl_with_retry(urls)
        print(f"成功获取 {len(pages)} 个页面")

        for page in pages:
            print(f"URL: {page['url']}, Status: {page['status']}")

# asyncio.run(optimized_crawler_example())

多线程与异步协同处理

异步与同步任务混合处理

在实际应用中,我们经常需要同时处理异步和同步任务。Python提供了多种方式来实现这种混合处理:

import asyncio
import concurrent.futures
import threading
import time
from typing import List

class MixedAsyncSyncProcessor:
    """Runs async coroutines and blocking/CPU-bound callables side by side.

    Blocking work is dispatched to a ThreadPoolExecutor so it never stalls
    the event loop. NOTE: the executor is not shut down explicitly; it is
    released when the processor is garbage-collected.
    """

    def __init__(self, max_workers=4):
        self.max_workers = max_workers
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    async def run_sync_task(self, sync_function, *args, **kwargs):
        """Await a synchronous callable by running it in the thread pool.

        FIX: `loop.run_in_executor` only forwards positional arguments, so
        the original `**kwargs` pass-through raised TypeError whenever a
        keyword argument was supplied; the call is now bound in a closure.
        Also uses get_running_loop() — get_event_loop() is deprecated
        inside coroutines since Python 3.10.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            lambda: sync_function(*args, **kwargs),
        )

    def cpu_intensive_task(self, data):
        """CPU-bound demo: sum of squares below `data`, plus a short block."""
        total = sum(i * i for i in range(data))
        time.sleep(0.1)  # simulate a blocking section
        return total

    async def process_mixed_tasks(self, data_list: List[int]):
        """Run one async task and one thread-pool task per item, concurrently.

        Returns results in submission order:
        [async_0, sync_0, async_1, sync_1, ...]
        """
        tasks = []

        for data in data_list:
            # FIX: bind the current `data` as a default parameter. The
            # original closure captured the loop variable late, so every
            # async task reported the *last* item in data_list.
            async def async_task(value=data):
                await asyncio.sleep(0.1)
                return f"Async result for {value}"

            tasks.append(async_task())
            # Sync task: scheduled on the thread pool
            tasks.append(self.run_sync_task(self.cpu_intensive_task, data))

        # Run everything concurrently; exceptions are returned, not raised
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def batch_process_with_concurrent_limit(self, data_list: List[int], batch_size=5):
        """Process items with at most `batch_size` in flight at once."""
        semaphore = asyncio.Semaphore(batch_size)

        async def limited_task(data):
            async with semaphore:
                await asyncio.sleep(0.1)  # simulated async I/O
                result = await self.run_sync_task(self.cpu_intensive_task, data)
                return f"Processed {data}: {result}"

        tasks = [limited_task(data) for data in data_list]
        return await asyncio.gather(*tasks, return_exceptions=True)

# 使用示例
async def mixed_processor_example():
    """Demo: run the mixed async/sync pipeline over sample sizes and report."""
    processor = MixedAsyncSyncProcessor(max_workers=3)

    test_data = [1000, 2000, 3000, 4000, 5000]

    print("开始混合处理任务...")
    t0 = time.time()
    results = await processor.process_mixed_tasks(test_data)
    t1 = time.time()
    print(f"处理完成,耗时: {t1 - t0:.2f}秒")

    # Report each slot, distinguishing raised exceptions from values
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i}: {result}")

# asyncio.run(mixed_processor_example())

线程池与异步事件循环的协调

在处理I/O密集型任务时,合理使用线程池可以有效提高性能:

import asyncio
import aiohttp
import concurrent.futures
from typing import List, Dict
import time

class AsyncHttpClientWithThreadPool:
    """aiohttp client that offloads blocking pre-processing to a thread pool.

    Use as an async context manager: the HTTP session and the executor are
    both released on exit.
    """

    def __init__(self, max_workers=10, timeout=30):
        self.max_workers = max_workers
        self.timeout = timeout  # total request timeout, in seconds
        self.session = None
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)

    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=self.timeout),
            # Bound total connections and per-host connections
            connector=aiohttp.TCPConnector(limit=100, limit_per_host=30)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
        self.executor.shutdown(wait=True)

    async def fetch_with_thread_pool(self, url: str, data: Dict = None) -> Dict:
        """POST `data` (after blocking pre-processing) or GET when data is None.

        Returns a dict with 'url', 'status', 'content', plus
        'processed_data' for POST requests.
        """
        # FIX: get_running_loop() is the supported call inside a coroutine;
        # asyncio.get_event_loop() is deprecated here since Python 3.10.
        loop = asyncio.get_running_loop()

        if data:
            # Run the blocking transformation off the event loop
            processed_data = await loop.run_in_executor(
                self.executor,
                self._process_data_sync,
                data
            )

            async with self.session.post(url, json=processed_data) as response:
                content = await response.text()
                return {
                    'url': url,
                    'status': response.status,
                    'content': content,
                    'processed_data': processed_data
                }
        else:
            async with self.session.get(url) as response:
                content = await response.text()
                return {
                    'url': url,
                    'status': response.status,
                    'content': content
                }

    def _process_data_sync(self, data: Dict) -> Dict:
        """Blocking data transformation; always runs inside the thread pool."""
        time.sleep(0.05)  # simulate expensive processing
        return {
            'processed_at': time.time(),
            'original_data': data,
            'processed': True
        }

    async def batch_fetch(self, urls: List[str], data_list: List[Dict] = None) -> List[Dict]:
        """Fetch all URLs concurrently; results pair positionally with
        `data_list`, and failed requests are silently dropped."""
        if data_list is None:
            data_list = [None] * len(urls)

        tasks = [
            self.fetch_with_thread_pool(url, data)
            for url, data in zip(urls, data_list)
        ]

        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if not isinstance(r, Exception)]

# 使用示例
async def http_client_example():
    """Demo: one GET and one POST issued through the thread-pool client."""
    urls = [
        'https://httpbin.org/get',
        'https://httpbin.org/post'
    ]

    payloads = [
        {'name': 'test1', 'value': 100},
        {'name': 'test2', 'value': 200}
    ]

    async with AsyncHttpClientWithThreadPool(max_workers=5) as client:
        t0 = time.time()
        results = await client.batch_fetch(urls, payloads)
        t1 = time.time()

        print(f"批量请求完成,耗时: {t1 - t0:.2f}秒")
        for result in results:
            print(f"URL: {result['url']}, Status: {result['status']}")

# asyncio.run(http_client_example())

性能优化与最佳实践

异步编程性能监控

import asyncio
import time
from functools import wraps
import logging

# One-time logging setup for the monitoring examples
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def async_timer(func):
    """Decorator: log how long an awaited call took, on success or failure."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        started = time.time()
        try:
            result = await func(*args, **kwargs)
        except Exception as e:
            elapsed = time.time() - started
            logger.error(f"{func.__name__} failed after {elapsed:.4f}s: {e}")
            raise
        else:
            elapsed = time.time() - started
            logger.info(f"{func.__name__} executed in {elapsed:.4f}s")
            return result
    return wrapper

class PerformanceMonitor:
    """Accumulates success/failure/timing metrics for batches of async
    operations."""

    def __init__(self):
        # Plain counters; avg_response_time is recomputed on every run
        self.metrics = {
            'total_requests': 0,
            'total_time': 0,
            'success_count': 0,
            'error_count': 0,
            'avg_response_time': 0
        }

    @async_timer
    async def monitored_async_function(self, delay: float):
        """Example coroutine whose duration is logged by @async_timer."""
        await asyncio.sleep(delay)
        return f"Completed after {delay}s"

    async def monitor_batch_operations(self, operations: list, batch_size: int = 10):
        """Run `operations` (zero-argument coroutine factories) in batches
        and record aggregate metrics.

        FIX: the original annotation was `List[callable]`, but this example
        never imports `List` from typing, so defining the class raised
        NameError when run standalone; the builtin `list` is used instead.
        """
        start_time = time.time()

        # Process in fixed-size batches to bound concurrency
        for i in range(0, len(operations), batch_size):
            batch = operations[i:i + batch_size]
            tasks = [op() for op in batch]

            try:
                results = await asyncio.gather(*tasks, return_exceptions=True)
                self.metrics['total_requests'] += len(results)

                # Tally successes vs. raised exceptions
                success_count = sum(1 for r in results if not isinstance(r, Exception))
                error_count = len(results) - success_count

                self.metrics['success_count'] += success_count
                self.metrics['error_count'] += error_count

            except Exception as e:
                logger.error(f"Batch processing failed: {e}")
                self.metrics['error_count'] += len(batch)

        end_time = time.time()
        total_time = end_time - start_time

        self.metrics['total_time'] = total_time
        # Guard against division by zero when no operations were given
        self.metrics['avg_response_time'] = total_time / max(self.metrics['total_requests'], 1)

        logger.info(f"Batch monitoring completed: {self.metrics}")

# 使用示例
async def performance_monitor_example():
    """Drive PerformanceMonitor with 20 sample operations in batches of 5."""
    monitor = PerformanceMonitor()

    async def operation(delay):
        await asyncio.sleep(delay)
        return f"Operation with delay {delay}"

    # FIX: monitor_batch_operations calls each entry (`op()`), so it needs
    # zero-argument factories — the original passed already-created
    # coroutine objects, which raised "'coroutine' object is not callable"
    # and left 20 never-awaited coroutines behind. The delay is bound as a
    # default to avoid the late-binding closure pitfall.
    operations = [lambda d=i * 0.1: operation(d) for i in range(1, 21)]

    await monitor.monitor_batch_operations(operations, batch_size=5)

# asyncio.run(performance_monitor_example())

资源管理和连接池优化

import asyncio
import aiohttp
from typing import Optional
import weakref

class OptimizedAsyncClient:
    """Async HTTP client with a tuned aiohttp connection pool.

    Intended usage: ``async with OptimizedAsyncClient() as client: ...``.
    NOTE(review): the TCPConnector is created in __init__ but closed along
    with the session in __aexit__, so an instance supports only one
    async-with lifecycle — confirm re-entry is not expected.
    """
    
    def __init__(self, 
                 max_concurrent: int = 100,
                 timeout: int = 30,
                 keep_alive: bool = True):
        # keep_alive is stored but not consulted by any method below
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.keep_alive = keep_alive
        
        # Connection-pool configuration
        self.connector = aiohttp.TCPConnector(
            limit=max_concurrent,
            limit_per_host=30,  # max simultaneous connections per host
            ttl_dns_cache=300,   # DNS cache TTL in seconds
            use_dns_cache=True,
            ssl=False  # SSL disabled here; configure as required
        )
        
        self.session: Optional[aiohttp.ClientSession] = None
        
    async def __aenter__(self):
        # Create the session lazily on first entry
        if not self.session:
            self.session = aiohttp.ClientSession(
                timeout=self.timeout,
                connector=self.connector,
                headers={
                    'User-Agent': 'Python-Async-Client/1.0',
                    'Accept': 'application/json'
                }
            )
        return self
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Closing the session also closes the connector it owns
        if self.session:
            await self.session.close()
            
    async def fetch(self, url: str, method: str = 'GET', **kwargs) -> aiohttp.ClientResponse:
        """Issue one request and return the raw ClientResponse.

        NOTE(review): the response is returned *unreleased* — the caller
        must read the body or release it, otherwise the pooled connection
        leaks. Raises RuntimeError when used outside the context manager.
        """
        if not self.session:
            raise RuntimeError("Client not initialized")
            
        try:
            response = await self.session.request(
                method=method,
                url=url,
                **kwargs
            )
            return response
        except Exception as e:
            # NOTE(review): assumes a module-level `logger` is defined
            # elsewhere in this file — confirm it is in scope here.
            logger.error(f"Request failed for {url}: {e}")
            raise
            
    async def fetch_batch(self, urls: list, method: str = 'GET', **kwargs) -> list:
        """Fetch many URLs concurrently.

        Because of return_exceptions=True, the result list may contain
        exception objects alongside (unreleased) responses — see fetch().
        """
        if not self.session:
            raise RuntimeError("Client not initialized")
            
        tasks = [
            self.fetch(url, method=method, **kwargs)
            for url in urls
        ]
        
        try:
            responses = await asyncio.gather(*tasks, return_exceptions=True)
            return responses
        except Exception as e:
            logger.error(f"Batch fetch failed: {e}")
            raise
            
    async def fetch_with_retry(self, url: str, max_retries: int = 3, **kwargs) -> dict:
        """Fetch with up to `max_retries` retries and exponential backoff.

        Returns a dict with 'url', 'status', 'content' and the 1-based
        'attempt' that succeeded; re-raises the last error when all
        attempts fail.
        """
        for attempt in range(max_retries + 1):
            try:
                response = await self.fetch(url, **kwargs)
                content = await response.text()
                
                return {
                    'url': url,
                    'status': response.status,
                    'content': content,
                    'attempt': attempt + 1
                }
                
            except Exception as e:
                if attempt < max_retries:
                    wait_time = 2 ** attempt  # exponential backoff
                    logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}. Retrying in {wait_time}s")
                    await asyncio.sleep(wait_time)
                else:
                    logger.error(f"All attempts failed for {url}: {e}")
                    raise

# 使用示例
async def optimized_client_example():
    """Demo: single, batch, and retrying fetches with OptimizedAsyncClient."""
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/status/200',
        'https://httpbin.org/status/404'
    ]

    async with OptimizedAsyncClient(max_concurrent=20) as client:
        # One plain request
        response = await client.fetch(urls[0])
        print(f"Single request status: {response.status}")

        # Fan out over every URL at once
        responses = await client.fetch_batch(urls)
        print(f"Batch requests completed: {len(responses)}")

        # Retry on failure with backoff
        result = await client.fetch_with_retry(urls[1], max_retries=2)
        print(f"Retry request result: {result['status']}")

# asyncio.run(optimized_client_example())

高级异步编程模式

异步上下文管理器

import asyncio
import aiohttp
from contextlib import asynccontextmanager

class AsyncDatabaseConnection:
    """Simulated async database connection usable as an async context manager."""

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.connection = None  # set only while the context is active

    async def __aenter__(self):
        # Simulate the latency of establishing an async DB connection
        await asyncio.sleep(0.1)
        self.connection = f"Connected to {self.connection_string}"
        print(f"Database connected: {self.connection}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Simulate async teardown of the connection
        await asyncio.sleep(0.1)
        print(f"Database disconnected: {self.connection}")
        self.connection = None

    async def execute_query(self, query: str) -> dict:
        """Run `query` against the fake backend and return a result record."""
        # FIX: this example's import block never imported `time`, so the
        # timestamp below raised NameError at query time; imported locally
        # to keep the snippet self-contained.
        import time
        await asyncio.sleep(0.05)  # simulated query latency
        return {
            'query': query,
            'result': f"Result for '{query}'",
            'timestamp': time.time()
        }

@asynccontextmanager
async def async_database_manager(connection_string: str):
    """异步数据库管理器上下文"""
    connection = AsyncDatabaseConnection(connection_string)
    try:
        async with connection as conn:
            yield conn
相关推荐
广告位招租

相似文章

    评论 (0)

    0/2000