Python Asynchronous Programming Best Practices: asyncio, Concurrency Control, and High-Performance Web Scrapers

Alice744 · 2026-02-01T10:13:15+08:00

Introduction

In modern web development, handling large numbers of concurrent requests and I/O-bound workloads is a common challenge. The traditional synchronous programming model tends to struggle under high concurrency, while Python's asynchronous programming facilities offer a more efficient and elegant alternative. This article explores the core concepts of asynchronous programming in Python, focuses on how to use the asyncio library, and walks through practical examples of building high-performance asynchronous web scrapers and API services.

What Is Asynchronous Programming?

Basic Concepts of Asynchronous Programming

Asynchronous programming is a paradigm that lets a program keep doing other work while it waits for certain operations to complete, instead of blocking. It is particularly well suited to I/O-bound tasks such as network requests and file reads and writes.

In traditional synchronous code, a function that waits for a network response blocks its entire thread until the response arrives. In asynchronous code, the program can issue the request, immediately move on to other work, and handle the result once the response comes back.
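
To make the difference concrete, here is a minimal sketch using only the standard library: three simulated one-second I/O waits run first sequentially and then concurrently with asyncio.gather, and the concurrent version finishes in roughly the time of a single wait.

import asyncio
import time

async def fake_io(delay: float) -> float:
    # Stand-in for an I/O-bound operation such as a network call
    await asyncio.sleep(delay)
    return delay

async def run_sequentially():
    start = time.perf_counter()
    for d in (1, 1, 1):
        await fake_io(d)  # each wait blocks the next one
    print(f"Sequential: {time.perf_counter() - start:.2f}s")  # ~3 seconds

async def run_concurrently():
    start = time.perf_counter()
    await asyncio.gather(*(fake_io(d) for d in (1, 1, 1)))  # the waits overlap
    print(f"Concurrent: {time.perf_counter() - start:.2f}s")  # ~1 second

# asyncio.run(run_sequentially())
# asyncio.run(run_concurrently())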

Advantages of Asynchronous Programming

  1. High concurrency: handle a large number of simultaneous requests
  2. Resource efficiency: less overhead from thread creation and context switching
  3. Responsiveness: the application does not block other work while waiting on I/O
  4. Scalability: easier to build large-scale network applications

The asyncio Library in Detail

asyncio Fundamentals

asyncio is the core module in the Python standard library for writing asynchronous I/O programs. It is built around an event loop and provides a complete set of building blocks for asynchronous programming.

import asyncio
import time

# A basic async function definition
async def hello_world():
    print("Hello")
    await asyncio.sleep(1)  # simulate an asynchronous wait
    print("World")

# Run the async function
asyncio.run(hello_world())

The Event Loop

The event loop is the heart of asyncio: it schedules and runs asynchronous tasks. Each thread runs at most one event loop at a time; asyncio.run() creates a loop, runs the given coroutine to completion, and then closes the loop.

import asyncio

async def main():
    # Inside a coroutine, use get_running_loop() to access the running event loop
    loop = asyncio.get_running_loop()
    print(f"Loop time at start: {loop.time():.2f}")
    
    # Create tasks so both coroutines run concurrently
    task1 = asyncio.create_task(fetch_data("url1"))
    task2 = asyncio.create_task(fetch_data("url2"))
    
    # Wait for both tasks to finish
    results = await asyncio.gather(task1, task2)
    return results

async def fetch_data(url):
    print(f"Fetching data from {url}")
    await asyncio.sleep(1)  # simulate a network request
    return f"Data from {url}"

# Run the main coroutine
asyncio.run(main())

Asynchronous Task Management

asyncio offers several ways to manage asynchronous tasks:

import asyncio

async def task_with_delay(name, delay):
    print(f"Task {name} started")
    await asyncio.sleep(delay)
    print(f"Task {name} completed")
    return f"Result from {name}"

async def manage_tasks():
    # Create several tasks
    tasks = [
        asyncio.create_task(task_with_delay("A", 2)),
        asyncio.create_task(task_with_delay("B", 1)),
        asyncio.create_task(task_with_delay("C", 3))
    ]
    
    # Wait for all tasks to finish and collect their results in order
    results = await asyncio.gather(*tasks)
    print("All tasks completed:", results)
    
    # Alternatively, asyncio.wait() returns (done, pending) sets; here the
    # tasks have already finished, so it returns immediately with no pending tasks
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    print(f"Done: {len(done)}, Pending: {len(pending)}")

# Run the task-management example
asyncio.run(manage_tasks())
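
Beyond gather() and wait(), results can also be consumed as soon as each task finishes. A small sketch using asyncio.as_completed with the task_with_delay coroutine defined above:

async def handle_as_completed():
    tasks = [
        asyncio.create_task(task_with_delay("A", 2)),
        asyncio.create_task(task_with_delay("B", 1)),
        asyncio.create_task(task_with_delay("C", 3))
    ]
    
    # Iterate in completion order rather than submission order
    for finished in asyncio.as_completed(tasks):
        result = await finished
        print("Got:", result)

# asyncio.run(handle_as_completed())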

Concurrency Control and Resource Management

Limiting Concurrency

When issuing a large number of requests concurrently, the degree of concurrency has to be bounded to avoid exhausting resources such as sockets and memory. Python offers several ways to do this; a semaphore is the most common.

import asyncio
import aiohttp
from typing import List

class ConcurrencyController:
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def fetch_with_limit(self, session: aiohttp.ClientSession, url: str) -> dict:
        # Acquire the semaphore before making the request
        async with self.semaphore:
            try:
                async with session.get(url) as response:
                    data = await response.json()
                    return {
                        'url': url,
                        'status': response.status,
                        'data': data
                    }
            except Exception as e:
                return {
                    'url': url,
                    'error': str(e)
                }

async def fetch_multiple_urls(urls: List[str], max_concurrent: int = 5):
    controller = ConcurrencyController(max_concurrent)
    
    async with aiohttp.ClientSession() as session:
        tasks = [controller.fetch_with_limit(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

# Example usage
async def main():
    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://jsonplaceholder.typicode.com/posts/2',
        'https://jsonplaceholder.typicode.com/posts/3',
        # ... more URLs
    ]
    
    results = await fetch_multiple_urls(urls, max_concurrent=3)
    for result in results:
        print(result)

# asyncio.run(main())
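
As an alternative (or complement) to a semaphore, concurrency can also be capped at the connection level. The sketch below shows one way this might look with aiohttp's TCPConnector; the helper name and limit value are illustrative, not part of the example above.

import asyncio
import aiohttp

async def fetch_all_with_connector_limit(urls, limit: int = 5):
    # The connector bounds the number of simultaneously open connections,
    # which indirectly limits how many requests are in flight
    connector = aiohttp.TCPConnector(limit=limit)
    async with aiohttp.ClientSession(connector=connector) as session:
        async def fetch(url):
            async with session.get(url) as response:
                return url, response.status
        return await asyncio.gather(*(fetch(u) for u in urls), return_exceptions=True)

# asyncio.run(fetch_all_with_connector_limit(['https://httpbin.org/get'] * 10))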

Task Timeout Control

Sensible timeouts prevent resources from being tied up by requests that never return:

import asyncio
import aiohttp

async def fetch_with_timeout(session: aiohttp.ClientSession, url: str, timeout: int = 10) -> dict:
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=timeout)) as response:
            data = await response.json()
            return {
                'url': url,
                'status': response.status,
                'data': data
            }
    except asyncio.TimeoutError:
        return {
            'url': url,
            'error': 'Timeout'
        }
    except Exception as e:
        return {
            'url': url,
            'error': str(e)
        }

async def fetch_with_timeout_control():
    urls = [
        'https://jsonplaceholder.typicode.com/posts/1',
        'https://httpbin.org/delay/2',  # this one will exceed the timeout
    ]
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_timeout(session, url, timeout=1) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

# asyncio.run(fetch_with_timeout_control())
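
Timeouts can also be enforced at the asyncio level, independent of aiohttp. A minimal sketch with asyncio.wait_for, which cancels the wrapped coroutine and raises asyncio.TimeoutError once the deadline passes:

import asyncio

async def slow_operation():
    await asyncio.sleep(5)
    return "done"

async def run_with_deadline():
    try:
        # Cancel slow_operation if it takes longer than 2 seconds
        result = await asyncio.wait_for(slow_operation(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Operation timed out and was cancelled")

# asyncio.run(run_with_deadline())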

Optimizing Asynchronous Database Operations

Asynchronous Database Connection Pools

In asynchronous applications, using a database connection pool properly can significantly improve throughput:

import asyncio
import asyncpg
from datetime import date
from typing import List, Dict

class AsyncDatabaseManager:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None
    
    async def create_pool(self, min_size: int = 10, max_size: int = 20):
        """Create the connection pool."""
        self.pool = await asyncpg.create_pool(
            self.connection_string,
            min_size=min_size,
            max_size=max_size
        )
    
    async def fetch_users(self, limit: int = 100) -> List[Dict]:
        """Fetch user rows asynchronously."""
        if not self.pool:
            raise Exception("Database pool not initialized")
        
        async with self.pool.acquire() as connection:
            rows = await connection.fetch('SELECT * FROM users LIMIT $1', limit)
            return [dict(row) for row in rows]
    
    async def insert_users(self, users_data: List[Dict]) -> int:
        """Bulk-insert user rows."""
        if not self.pool:
            raise Exception("Database pool not initialized")
        
        async with self.pool.acquire() as connection:
            # Use executemany for an efficient batch insert
            query = """
                INSERT INTO users (name, email, created_at) 
                VALUES ($1, $2, $3)
            """
            
            # Prepare the parameter tuples
            data_tuples = [
                (user['name'], user['email'], user['created_at'])
                for user in users_data
            ]
            
            # Execute the batch insert (asyncpg's executemany does not return a row count)
            await connection.executemany(query, data_tuples)
            return len(data_tuples)

# Example usage
async def database_example():
    db_manager = AsyncDatabaseManager('postgresql://user:password@localhost/db')
    await db_manager.create_pool()
    
    # Fetch rows
    users = await db_manager.fetch_users(10)
    print(f"Fetched {len(users)} users")
    
    # Insert rows; asyncpg expects date/datetime objects (not strings) for timestamp columns
    new_users = [
        {'name': f'User{i}', 'email': f'user{i}@example.com', 'created_at': date(2023, 1, 1)}
        for i in range(5)
    ]
    
    inserted_count = await db_manager.insert_users(new_users)
    print(f"Inserted {inserted_count} users")

# asyncio.run(database_example())
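
One of the main benefits of a pool is that independent queries can run concurrently, each on its own connection. A small sketch built on the AsyncDatabaseManager above; the table names are illustrative assumptions.

async def concurrent_queries_example():
    db_manager = AsyncDatabaseManager('postgresql://user:password@localhost/db')
    await db_manager.create_pool()
    
    async def count_rows(table: str) -> int:
        # Each call checks out its own connection from the pool,
        # so the three counts below run in parallel
        async with db_manager.pool.acquire() as connection:
            # table comes from the trusted literals below, never from user input
            return await connection.fetchval(f'SELECT COUNT(*) FROM {table}')
    
    counts = await asyncio.gather(
        count_rows('users'), count_rows('orders'), count_rows('products')
    )
    print("Row counts:", counts)

# asyncio.run(concurrent_queries_example())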

Asynchronous Transaction Handling

Handling transactions correctly is just as important in an asynchronous environment:

import asyncio
import asyncpg

class AsyncTransactionManager:
    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        self.pool = None
    
    async def create_pool(self):
        self.pool = await asyncpg.create_pool(self.connection_string)
    
    async def transfer_money(self, from_account: int, to_account: int, amount: float) -> bool:
        """Transfer money between two accounts within a single transaction."""
        async with self.pool.acquire() as connection:
            try:
                # Start a transaction: commits on success, rolls back on any exception
                async with connection.transaction():
                    # Check the source account balance
                    balance = await connection.fetchval(
                        'SELECT balance FROM accounts WHERE id = $1',
                        from_account
                    )
                    
                    if balance < amount:
                        raise ValueError("Insufficient funds")
                    
                    # Debit the source account
                    await connection.execute(
                        'UPDATE accounts SET balance = balance - $1 WHERE id = $2',
                        amount, from_account
                    )
                    
                    # Credit the destination account
                    await connection.execute(
                        'UPDATE accounts SET balance = balance + $1 WHERE id = $2',
                        amount, to_account
                    )
                
                return True
            except Exception as e:
                print(f"Transaction failed: {e}")
                return False
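
Below is a minimal usage sketch for the transaction manager; the connection string and account ids are placeholders.

async def transfer_money_example():
    manager = AsyncTransactionManager('postgresql://user:password@localhost/db')
    await manager.create_pool()
    
    # Transfer 100.0 from account 1 to account 2; transfer_money returns False
    # if the balance check fails or any statement raises, in which case the
    # enclosing transaction is rolled back automatically
    success = await manager.transfer_money(from_account=1, to_account=2, amount=100.0)
    print("Transfer succeeded:", success)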

# asyncio.run(transfer_money_example())

Building a High-Performance Web Scraper

A Basic Asynchronous Scraper

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin, urlparse
import logging

class AsyncWebScraper:
    def __init__(self, max_concurrent: int = 10, timeout: int = 30):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
        
        # Configure logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    
    async def __aenter__(self):
        """Async context manager entry: create the shared HTTP session."""
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            headers={
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
            }
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit: close the session."""
        if self.session:
            await self.session.close()
    
    async def fetch_page(self, url: str) -> dict:
        """Fetch a single page."""
        async with self.semaphore:
            try:
                start_time = time.time()
                async with self.session.get(url) as response:
                    content = await response.text()
                    elapsed_time = time.time() - start_time
                    
                    return {
                        'url': url,
                        'status': response.status,
                        'content': content,
                        'elapsed_time': elapsed_time,
                        'timestamp': time.time()
                    }
            except Exception as e:
                self.logger.error(f"Error fetching {url}: {e}")
                return {
                    'url': url,
                    'error': str(e),
                    'timestamp': time.time()
                }
    
    async def parse_page(self, page_data: dict) -> dict:
        """Parse a fetched page."""
        if 'error' in page_data:
            return page_data
        
        try:
            soup = BeautifulSoup(page_data['content'], 'html.parser')
            
            # Extract the title
            title = soup.find('title')
            title_text = title.get_text().strip() if title else ''
            
            # Extract all links
            links = []
            for link in soup.find_all('a', href=True):
                absolute_url = urljoin(page_data['url'], link['href'])
                links.append({
                    'text': link.get_text().strip(),
                    'url': absolute_url
                })
            
            return {
                **page_data,
                'title': title_text,
                'links_count': len(links),
                'links': links[:10]  # keep only the first 10 links
            }
        except Exception as e:
            self.logger.error(f"Error parsing {page_data['url']}: {e}")
            return {
                **page_data,
                'error': f'Parse error: {e}'
            }

# Example usage
async def basic_crawler_example():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1',
    ]
    
    async with AsyncWebScraper(max_concurrent=3) as scraper:
        # Fetch the pages concurrently
        fetch_tasks = [scraper.fetch_page(url) for url in urls]
        page_data_list = await asyncio.gather(*fetch_tasks)
        
        # Parse the pages
        parse_tasks = [scraper.parse_page(data) for data in page_data_list]
        parsed_results = await asyncio.gather(*parse_tasks)
        
        for result in parsed_results:
            print(f"URL: {result['url']}")
            print(f"Title: {result.get('title', 'N/A')}")
            print(f"Links: {result.get('links_count', 0)}")
            print("-" * 50)

# asyncio.run(basic_crawler_example())
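
In the example above, parsing only begins after every fetch has completed. A small variation, reusing the AsyncWebScraper defined above, parses each page as soon as its own fetch finishes:

async def pipelined_crawler_example():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/2',
        'https://httpbin.org/delay/1',
    ]
    
    async with AsyncWebScraper(max_concurrent=3) as scraper:
        async def fetch_and_parse(url):
            # Parse each page immediately after its fetch completes
            return await scraper.parse_page(await scraper.fetch_page(url))
        
        results = await asyncio.gather(*(fetch_and_parse(url) for url in urls))
        for result in results:
            print(result['url'], result.get('title', 'N/A'))

# asyncio.run(pipelined_crawler_example())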

Advanced Scraper Features

import asyncio
import aiohttp
from bs4 import BeautifulSoup
import time
from urllib.parse import urljoin, urlparse
from dataclasses import dataclass
from typing import List, Set, Optional
import json

@dataclass
class CrawlResult:
    url: str
    title: str
    content: str
    links: List[str]
    status_code: int
    crawl_time: float
    error: Optional[str] = None

class AdvancedAsyncScraper:
    def __init__(self, 
                 max_concurrent: int = 10,
                 timeout: int = 30,
                 delay: float = 0.1,
                 respect_robots: bool = True):
        self.max_concurrent = max_concurrent
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.delay = delay
        self.respect_robots = respect_robots
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
        self.visited_urls: Set[str] = set()
        self.crawled_count = 0
        
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(
            timeout=self.timeout,
            headers={
                'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                            'AppleWebKit/537.36 (KHTML, like Gecko) '
                            'Chrome/91.0.4472.124 Safari/537.36')
            }
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def fetch_page(self, url: str) -> CrawlResult:
        """Fetch and parse a single page."""
        async with self.semaphore:
            try:
                # Skip URLs that were already visited
                if url in self.visited_urls:
                    return CrawlResult(
                        url=url,
                        title='',
                        content='',
                        links=[],
                        status_code=200,
                        crawl_time=0,
                        error='Already visited'
                    )
                
                self.visited_urls.add(url)
                start_time = time.time()
                
                await asyncio.sleep(self.delay)  # throttle so requests are not sent too frequently
                
                async with self.session.get(url) as response:
                    content = await response.text()
                    elapsed_time = time.time() - start_time
                    
                    # Parse the HTML
                    soup = BeautifulSoup(content, 'html.parser')
                    
                    # Extract the title
                    title = soup.find('title')
                    title_text = title.get_text().strip() if title else ''
                    
                    # Extract the paragraph text
                    text_content = ' '.join([
                        p.get_text().strip() 
                        for p in soup.find_all('p')
                    ])
                    
                    # Extract links
                    links = []
                    for link in soup.find_all('a', href=True):
                        absolute_url = urljoin(url, link['href'])
                        links.append(absolute_url)
                    
                    self.crawled_count += 1
                    
                    return CrawlResult(
                        url=url,
                        title=title_text,
                        content=text_content[:500],  # truncate the stored content
                        links=links,
                        status_code=response.status,
                        crawl_time=elapsed_time
                    )
                    
            except Exception as e:
                return CrawlResult(
                    url=url,
                    title='',
                    content='',
                    links=[],
                    status_code=0,
                    crawl_time=0,
                    error=str(e)
                )
    
    async def crawl_urls(self, urls: List[str], max_depth: int = 1) -> List[CrawlResult]:
        """Crawl a list of URLs, optionally following links one level deeper."""
        results = []
        
        # First pass over the seed URLs
        tasks = [self.fetch_page(url) for url in urls]
        batch_results = await asyncio.gather(*tasks)
        results.extend(batch_results)
        
        # For deeper crawls, follow the links extracted from the first pass
        if max_depth > 1:
            all_links = set()
            for result in batch_results:
                if not result.error and result.links:
                    all_links.update(result.links)
            
            # Filter out links that have already been visited
            new_urls = [url for url in all_links if url not in self.visited_urls]
            if new_urls:
                print(f"Found {len(new_urls)} new URLs to crawl")
                depth_tasks = [self.fetch_page(url) for url in new_urls[:10]]  # cap the follow-up batch
                depth_results = await asyncio.gather(*depth_tasks)
                results.extend(depth_results)
        
        return results
    
    def save_results(self, results: List[CrawlResult], filename: str):
        """Save crawl results to a JSON file."""
        data = []
        for result in results:
            data.append({
                'url': result.url,
                'title': result.title,
                'content': result.content,
                'links_count': len(result.links),
                'status_code': result.status_code,
                'crawl_time': result.crawl_time,
                'error': result.error
            })
        
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        
        print(f"Saved {len(results)} results to {filename}")

# Example usage
async def advanced_crawler_example():
    urls = [
        'https://httpbin.org/html',
        'https://httpbin.org/links/10/0',
    ]
    
    async with AdvancedAsyncScraper(max_concurrent=5, delay=0.5) as scraper:
        results = await scraper.crawl_urls(urls, max_depth=1)
        
        # Show the results
        for result in results:
            if not result.error:
                print(f"✓ {result.url}")
                print(f"  Title: {result.title[:50]}...")
                print(f"  Links: {len(result.links)}")
                print(f"  Time: {result.crawl_time:.2f}s")
            else:
                print(f"✗ {result.url} - Error: {result.error}")
            print()
        
        # Save the results
        scraper.save_results(results, 'crawl_results.json')

# asyncio.run(advanced_crawler_example())
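
The respect_robots flag above is accepted but not yet wired in. One possible way to honor it, sketched with the standard library's urllib.robotparser and run in a worker thread so the blocking fetch does not stall the event loop (the helper name and integration point are assumptions):

import asyncio
import urllib.robotparser
from urllib.parse import urlparse

async def is_allowed(url: str, user_agent: str = '*') -> bool:
    """Check robots.txt for the URL's host; treat an unreachable file as allowed."""
    parsed = urlparse(url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    
    parser = urllib.robotparser.RobotFileParser(robots_url)
    try:
        # RobotFileParser.read() performs blocking I/O, so run it in a thread
        await asyncio.to_thread(parser.read)
    except OSError:
        return True
    return parser.can_fetch(user_agent, url)

# Inside AdvancedAsyncScraper.fetch_page one might then guard with:
#     if self.respect_robots and not await is_allowed(url):
#         return CrawlResult(url=url, title='', content='', links=[],
#                            status_code=0, crawl_time=0, error='Blocked by robots.txt')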

Performance Optimization Techniques

Connection Pool Tuning

import asyncio
import time
import aiohttp
from typing import Optional

class OptimizedAsyncClient:
    def __init__(self, 
                 max_concurrent: int = 100,
                 timeout: int = 30,
                 connection_timeout: int = 5):
        self.session: Optional[aiohttp.ClientSession] = None
        self.max_concurrent = max_concurrent
        self.timeout = timeout
        self.connection_timeout = connection_timeout
        
    async def __aenter__(self):
        """Create a tuned client session."""
        connector = aiohttp.TCPConnector(
            limit=self.max_concurrent,  # total size of the connection pool
            limit_per_host=30,          # maximum connections per host
            ttl_dns_cache=300,          # DNS cache lifetime in seconds
            use_dns_cache=True,         # enable DNS caching
            # keep-alive is left enabled (the default) so pooled connections are reused;
            # force_close=True here would defeat the purpose of connection pooling
        )
        
        self.session = aiohttp.ClientSession(
            connector=connector,
            timeout=aiohttp.ClientTimeout(
                total=self.timeout,
                connect=self.connection_timeout
            ),
            headers={
                'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                            'AppleWebKit/537.36 (KHTML, like Gecko) '
                            'Chrome/91.0.4472.124 Safari/537.36')
            }
        )
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the session and release its connections."""
        if self.session:
            await self.session.close()
    
    async def fetch(self, url: str) -> dict:
        """Perform a single request over the shared session."""
        try:
            async with self.session.get(url) as response:
                content = await response.text()
                return {
                    'url': url,
                    'status': response.status,
                    'content': content,
                    'headers': dict(response.headers)
                }
        except Exception as e:
            return {
                'url': url,
                'error': str(e),
                'status': 0
            }

# Performance test example
async def performance_test():
    urls = [
        f'https://httpbin.org/delay/{i%3+1}' 
        for i in range(50)
    ]
    
    start_time = time.time()
    
    async with OptimizedAsyncClient(max_concurrent=20) as client:
        tasks = [client.fetch(url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    
    end_time = time.time()
    print(f"Processed {len(results)} requests in {end_time - start_time:.2f} seconds")

Implementing a Caching Layer

import asyncio
import aiohttp
import hashlib
import time
from typing import Dict, Optional, Any

class CachedAsyncClient:
    def __init__(self, 
                 cache_ttl: int = 300,  # cache entries for five minutes
                 max_concurrent: int = 10):
        self.cache_ttl = cache_ttl
        self.cache: Dict[str, tuple] = {}
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    def _get_cache_key(self, url: str, params: Optional[Dict] = None) -> str:
        """Build a cache key from the URL and optional query parameters."""
        key_string = f"{url}:{params}" if params else url
        return hashlib.md5(key_string.encode()).hexdigest()
    
    def _is_expired(self, timestamp: float) -> bool:
        """Return True if a cache entry is older than the TTL."""
        return time.time() - timestamp > self.cache_ttl
    
    async def fetch(self, url: str, params: Optional[Dict] = None) -> dict:
        """Perform a request, serving the response from cache when possible."""
        cache_key = self._get_cache_key(url, params)
        
        # Check the cache first
        if cache_key in self.cache:
            cached_data, timestamp = self.cache[cache_key]
            if not self._is_expired(timestamp):
                print(f"Cache hit for {url}")
                return cached_data
        
        # Cache miss: perform the request
        async with self.semaphore:
            try:
                print(f"Fetching {url}")
                async with self.session.get(url, params=params) as response:
                    content = await response.text()
                    result = {
                        'url': url,
                        'status': response.status,
                        'content': content,
                        'cached': False
                    }
                    
                    # Cache the result with the current timestamp
                    self.cache[cache_key] = (result, time.time())
                    return result
                    
            except Exception as e:
                error_result = {
                    'url': url,
                    'error': str(e),
                    'cached': False
                }
                return error_result
    
    def clear_cache(self):
        """Clear the entire cache."""
        self.cache.clear()

# Example usage
async def cached_crawler_example():
    urls = [
        'https://httpbin.org/delay/1',
        'https://httpbin.org/delay/1',  # same URL, should be served from the cache
    ]
    
    async with CachedAsyncClient(cache_ttl=60) as client:
        results = []
        for url in urls:
            result = await client.fetch(url)
            results.append(result)
        
        print("Results:", len(results))
        for result in results:
            print(f"URL: {result['url']}, Status: {result.get('status', 'Error')}")
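
# asyncio.run(cached_crawler_example())

The in-memory cache above grows without bound, since expired entries are skipped but never removed. A small method that could be added to CachedAsyncClient to purge stale entries (a sketch, not part of the class above):

    def purge_expired(self) -> int:
        """Drop expired cache entries and return how many were removed."""
        expired_keys = [
            key for key, (_, timestamp) in self.cache.items()
            if self._is_expired(timestamp)
        ]
        for key in expired_keys:
            del self.cache[key]
        return len(expired_keys)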

Error Handling and Monitoring

A Robust Error-Handling Mechanism

import asyncio
import aiohttp
import logging
from typing import List, Dict, Any
import time

class RobustAsyncClient:
    def __init__(self, 
                 max_retries: int = 3,
                 backoff_factor: float = 1.0,
                 timeout: int = 30):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.session = None
        self.logger = logging.getLogger(__name__)
    
    async def __aenter__(self):
        self.session = aiohttp.ClientSession(timeout=self.timeout)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.session:
            await self.session.close()
    
    async def fetch_with_retry(self,
                               url: str,
                               method: str = 'GET',
                               **kwargs) -> Dict[str, Any]:
        """Perform an asynchronous request with retries and exponential backoff."""
        last_exception = None
        
        for attempt in range(1, self.max_retries + 1):
            try:
                async with self.session.request(method, url, **kwargs) as response:
                    content = await response.text()
                    return {
                        'url': url,
                        'status': response.status,
                        'content': content,
                        'attempts': attempt
                    }
            except Exception as e:
                last_exception = e
                wait_time = self.backoff_factor * (2 ** (attempt - 1))
                self.logger.warning(
                    f"Attempt {attempt}/{self.max_retries} for {url} failed: {e}; "
                    f"retrying in {wait_time:.1f}s"
                )
                if attempt < self.max_retries:
                    await asyncio.sleep(wait_time)
        
        # Every attempt failed: report the last exception instead of raising
        return {
            'url': url,
            'error': str(last_exception),
            'attempts': self.max_retries
        }
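
A minimal usage sketch for the retry client; the URLs are placeholders, and the reserved .invalid hostname is used only to exercise the failure path:

async def robust_client_example():
    logging.basicConfig(level=logging.INFO)
    
    async with RobustAsyncClient(max_retries=3, backoff_factor=0.5) as client:
        ok = await client.fetch_with_retry('https://httpbin.org/get')
        failed = await client.fetch_with_retry('https://nonexistent.invalid/')  # exhausts all retries
        
        print("First request status:", ok.get('status'))
        print("Second request error:", failed.get('error'))

# asyncio.run(robust_client_example())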
        
       