Python机器学习项目实战:从数据预处理到模型部署的全流程详解

ThickSam
ThickSam 2026-01-31T21:01:15+08:00
0 0 2

引言

在当今的数据驱动时代,机器学习技术正在成为企业决策和产品开发的核心工具。Python作为机器学习领域最主流的编程语言,凭借其丰富的生态库和易用性,成为了开发者构建AI应用的首选。本文将通过一个完整的机器学习项目实战,从数据预处理到模型部署的全流程进行详细讲解,帮助读者掌握机器学习项目的完整技术栈和实践方法。

项目概述

我们将以一个经典的房价预测问题为例,构建一个完整的机器学习项目。该项目将涵盖数据获取、数据清洗、特征工程、模型训练、评估验证以及生产环境部署等所有关键环节。通过这个实战项目,读者可以深入了解机器学习项目的完整开发流程,并掌握相关的最佳实践。

项目目标

  • 预测房价,构建一个准确的回归模型
  • 掌握机器学习项目开发的完整流程
  • 学习数据科学项目中的关键技术和工具

环境准备与依赖安装

在开始项目之前,我们需要搭建合适的开发环境。以下是项目所需的Python库和版本信息:

# 创建虚拟环境
python -m venv ml_project_env
source ml_project_env/bin/activate  # Linux/Mac
# 或 ml_project_env\Scripts\activate  # Windows

# 安装必要的库
pip install pandas numpy scikit-learn matplotlib seaborn jupyter
pip install xgboost lightgbm catboost
pip install flask gunicorn
pip install joblib
# 验证安装的库版本
import pandas as pd
import numpy as np
import sklearn
import matplotlib.pyplot as plt
import seaborn as sns

print(f"pandas version: {pd.__version__}")
print(f"numpy version: {np.__version__}")
print(f"scikit-learn version: {sklearn.__version__}")

数据获取与探索性数据分析

1. 数据集介绍

我们将使用著名的波士顿房价数据集(Boston Housing Dataset)作为示例。这是一个经典的回归问题数据集,包含506个样本和13个特征。

# 导入必要的库
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
import warnings
warnings.filterwarnings('ignore')

# 设置图形显示样式
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

# 加载数据集
boston = load_boston()
X = pd.DataFrame(boston.data, columns=boston.feature_names)
y = pd.Series(boston.target, name='PRICE')

# 创建完整的数据框
df = pd.concat([X, y], axis=1)

print("数据集基本信息:")
print(f"数据形状: {df.shape}")
print(f"列名: {list(df.columns)}")
print("\n前5行数据:")
print(df.head())

2. 数据基本统计信息

# 基本统计信息
print("数据集描述性统计:")
print(df.describe())

# 检查缺失值
print("\n缺失值检查:")
print(df.isnull().sum())

# 检查数据类型
print("\n数据类型:")
print(df.dtypes)

3. 数据可视化分析

# 目标变量分布
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.hist(df['PRICE'], bins=30, edgecolor='black')
plt.title('房价分布')
plt.xlabel('房价 (千美元)')
plt.ylabel('频次')

plt.subplot(1, 2, 2)
plt.boxplot(df['PRICE'])
plt.title('房价箱线图')
plt.ylabel('房价 (千美元)')

plt.tight_layout()
plt.show()

# 特征相关性分析
plt.figure(figsize=(12, 10))
correlation_matrix = df.corr()
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0, 
            square=True, linewidths=0.5)
plt.title('特征相关性热力图')
plt.show()

# 与目标变量相关性最高的特征
price_corr = correlation_matrix['PRICE'].abs().sort_values(ascending=False)
print("与房价相关性排序:")
print(price_corr)

数据预处理

1. 数据清洗

# 检查异常值
def detect_outliers(df, columns):
    """检测异常值"""
    outliers = {}
    for col in columns:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        outliers[col] = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
    return outliers

# 检查数值型特征的异常值
numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist()
outliers = detect_outliers(df, numeric_columns)
print("各特征异常值数量:")
for col, outlier_data in outliers.items():
    if len(outlier_data) > 0:
        print(f"{col}: {len(outlier_data)}个异常值")

# 处理缺失值
print(f"\n缺失值总数: {df.isnull().sum().sum()}")
# 波士顿数据集没有缺失值,但实际项目中需要处理

2. 特征工程

# 创建新特征
def create_features(df):
    """创建新的特征"""
    df_new = df.copy()
    
    # 房间数量相关特征
    df_new['RM_LSTAT_RATIO'] = df_new['RM'] / (df_new['LSTAT'] + 1e-8)
    df_new['RM_AGE_RATIO'] = df_new['RM'] / (df_new['AGE'] + 1e-8)
    
    # 房价密度特征
    df_new['PRICE_PER_ROOM'] = df_new['PRICE'] / df_new['RM']
    
    # 环境特征组合
    df_new['DIS_LSTAT'] = df_new['DIS'] * df_new['LSTAT']
    df_new['NOX_DIS'] = df_new['NOX'] * df_new['DIS']
    
    return df_new

# 应用特征工程
df_processed = create_features(df)
print("处理后的数据形状:", df_processed.shape)
print("新增特征列:", [col for col in df_processed.columns if col not in df.columns])

3. 数据标准化

# 分离特征和目标变量
X = df_processed.drop('PRICE', axis=1)
y = df_processed['PRICE']

# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

print(f"训练集大小: {X_train.shape}")
print(f"测试集大小: {X_test.shape}")

# 特征标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# 转换回DataFrame便于后续操作
X_train_scaled_df = pd.DataFrame(X_train_scaled, columns=X.columns)
X_test_scaled_df = pd.DataFrame(X_test_scaled, columns=X.columns)

print("标准化后的数据统计:")
print(X_train_scaled_df.describe())

模型选择与训练

1. 多模型对比

# 定义多个模型
models = {
    'Linear Regression': LinearRegression(),
    'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
}

# 训练和评估模型
model_results = {}

for name, model in models.items():
    print(f"\n训练 {name} 模型...")
    
    # 训练模型
    if name == 'Linear Regression':
        model.fit(X_train_scaled, y_train)
        y_pred = model.predict(X_test_scaled)
    else:
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
    
    # 计算评估指标
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_test, y_pred)
    
    model_results[name] = {
        'MSE': mse,
        'RMSE': rmse,
        'R2': r2,
        'model': model
    }
    
    print(f"MSE: {mse:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"R²: {r2:.4f}")

# 显示模型对比结果
results_df = pd.DataFrame(model_results).T
print("\n模型性能对比:")
print(results_df[['RMSE', 'R2']])

2. 高级模型训练

from sklearn.ensemble import GradientBoostingRegressor
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor

# 使用集成学习方法
advanced_models = {
    'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=42),
    'XGBoost': XGBRegressor(random_state=42),
    'LightGBM': LGBMRegressor(random_state=42)
}

# 训练高级模型
advanced_results = {}

for name, model in advanced_models.items():
    print(f"\n训练 {name} 模型...")
    
    if name in ['XGBoost', 'LightGBM']:
        # 这些模型不需要标准化
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
    else:
        model.fit(X_train_scaled, y_train)
        y_pred = model.predict(X_test_scaled)
    
    # 计算评估指标
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_test, y_pred)
    
    advanced_results[name] = {
        'MSE': mse,
        'RMSE': rmse,
        'R2': r2,
        'model': model
    }
    
    print(f"MSE: {mse:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"R²: {r2:.4f}")

# 合并结果
all_results = {**model_results, **advanced_results}
best_model_name = min(advanced_results.keys(), key=lambda x: advanced_results[x]['RMSE'])
print(f"\n最佳模型: {best_model_name}")

模型评估与优化

1. 交叉验证

from sklearn.model_selection import cross_val_score, GridSearchCV

# 使用交叉验证评估模型稳定性
def evaluate_with_cv(model, X, y, cv=5):
    """使用交叉验证评估模型"""
    cv_scores = cross_val_score(model, X, y, cv=cv, scoring='neg_mean_squared_error')
    rmse_scores = np.sqrt(-cv_scores)
    
    print(f"交叉验证 RMSE: {rmse_scores}")
    print(f"平均 RMSE: {rmse_scores.mean():.4f} (+/- {rmse_scores.std() * 2:.4f})")
    
    return rmse_scores

# 对最佳模型进行交叉验证
best_model = advanced_results[best_model_name]['model']
if best_model_name in ['XGBoost', 'LightGBM']:
    cv_scores = evaluate_with_cv(best_model, X_train, y_train)
else:
    cv_scores = evaluate_with_cv(best_model, X_train_scaled, y_train)

2. 超参数调优

# 为XGBoost进行超参数调优
def optimize_xgboost(X_train, y_train):
    """优化XGBoost模型参数"""
    
    # 定义参数网格
    param_grid = {
        'n_estimators': [50, 100, 200],
        'max_depth': [3, 5, 7, 9],
        'learning_rate': [0.01, 0.1, 0.2],
        'subsample': [0.8, 1.0]
    }
    
    # 创建网格搜索对象
    xgb = XGBRegressor(random_state=42)
    grid_search = GridSearchCV(
        estimator=xgb,
        param_grid=param_grid,
        cv=3,
        scoring='neg_mean_squared_error',
        n_jobs=-1,
        verbose=1
    )
    
    # 执行网格搜索
    print("开始超参数调优...")
    grid_search.fit(X_train, y_train)
    
    print(f"最佳参数: {grid_search.best_params_}")
    print(f"最佳交叉验证分数: {-grid_search.best_score_:.4f}")
    
    return grid_search.best_estimator_

# 如果使用XGBoost进行优化
if best_model_name == 'XGBoost':
    optimized_model = optimize_xgboost(X_train, y_train)
else:
    optimized_model = advanced_results[best_model_name]['model']

3. 模型性能可视化

# 预测结果可视化
def plot_predictions(y_true, y_pred, model_name):
    """绘制预测结果对比图"""
    
    plt.figure(figsize=(12, 5))
    
    # 预测值 vs 真实值
    plt.subplot(1, 2, 1)
    plt.scatter(y_true, y_pred, alpha=0.6)
    plt.plot([y_true.min(), y_true.max()], [y_true.min(), y_true.max()], 'r--', lw=2)
    plt.xlabel('真实值')
    plt.ylabel('预测值')
    plt.title(f'{model_name} - 预测值 vs 真实值')
    
    # 残差图
    plt.subplot(1, 2, 2)
    residuals = y_true - y_pred
    plt.scatter(y_pred, residuals, alpha=0.6)
    plt.axhline(y=0, color='r', linestyle='--')
    plt.xlabel('预测值')
    plt.ylabel('残差')
    plt.title(f'{model_name} - 残差图')
    
    plt.tight_layout()
    plt.show()

# 绘制最佳模型的预测结果
if best_model_name in ['XGBoost', 'LightGBM']:
    y_pred_best = optimized_model.predict(X_test)
else:
    if best_model_name == 'Linear Regression':
        y_pred_best = best_model.predict(X_test_scaled)
    else:
        y_pred_best = best_model.predict(X_test)

plot_predictions(y_test, y_pred_best, best_model_name)

模型部署准备

1. 模型保存与加载

import joblib

# 保存最佳模型和预处理器
def save_model(model, scaler, model_name):
    """保存模型和预处理器"""
    
    # 保存模型
    joblib.dump(model, f'{model_name}_model.pkl')
    
    # 如果有标准化器,也保存它
    if scaler is not None:
        joblib.dump(scaler, f'{model_name}_scaler.pkl')
    
    print(f"模型已保存为 {model_name}_model.pkl")
    if scaler is not None:
        print(f"标准化器已保存为 {model_name}_scaler.pkl")

# 保存最佳模型
if best_model_name in ['XGBoost', 'LightGBM']:
    save_model(optimized_model, None, best_model_name)
else:
    save_model(best_model, scaler, best_model_name)

2. 特征重要性分析

# 分析特征重要性
def analyze_feature_importance(model, feature_names, model_name):
    """分析特征重要性"""
    
    if hasattr(model, 'feature_importances_'):
        # 对于树模型
        importances = model.feature_importances_
        indices = np.argsort(importances)[::-1]
        
        plt.figure(figsize=(10, 6))
        plt.title(f'{model_name} - 特征重要性')
        plt.bar(range(len(importances)), importances[indices])
        plt.xticks(range(len(importances)), [feature_names[i] for i in indices], rotation=45)
        plt.tight_layout()
        plt.show()
        
        # 显示前10个重要特征
        print("前10个重要特征:")
        for i in range(min(10, len(indices))):
            print(f"{i+1}. {feature_names[indices[i]]}: {importances[indices[i]]:.4f}")
            
    elif hasattr(model, 'coef_'):
        # 对于线性模型
        coefficients = np.abs(model.coef_)
        indices = np.argsort(coefficients)[::-1]
        
        plt.figure(figsize=(10, 6))
        plt.title(f'{model_name} - 特征系数绝对值')
        plt.bar(range(len(coefficients)), coefficients[indices])
        plt.xticks(range(len(coefficients)), [feature_names[i] for i in indices], rotation=45)
        plt.tight_layout()
        plt.show()

# 分析最佳模型的特征重要性
if best_model_name in ['XGBoost', 'LightGBM']:
    analyze_feature_importance(optimized_model, X.columns, best_model_name)
else:
    analyze_feature_importance(best_model, X.columns, best_model_name)

Flask Web应用部署

1. 创建Flask应用

from flask import Flask, request, jsonify, render_template_string
import numpy as np
import joblib
import json

# 创建Flask应用
app = Flask(__name__)

# 加载模型和预处理器
def load_model_and_scaler(model_name):
    """加载训练好的模型"""
    try:
        model = joblib.load(f'{model_name}_model.pkl')
        
        # 尝试加载标准化器(如果存在)
        try:
            scaler = joblib.load(f'{model_name}_scaler.pkl')
        except:
            scaler = None
            
        return model, scaler
    except Exception as e:
        print(f"加载模型失败: {e}")
        return None, None

# 加载最佳模型
best_model, best_scaler = load_model_and_scaler(best_model_name)

@app.route('/')
def home():
    """主页"""
    html_template = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>房价预测系统</title>
        <style>
            body { font-family: Arial, sans-serif; margin: 40px; }
            .container { max-width: 800px; margin: 0 auto; }
            input, select { width: 100%; padding: 10px; margin: 5px 0; }
            button { background-color: #4CAF50; color: white; padding: 12px 20px; border: none; cursor: pointer; }
            button:hover { background-color: #45a049; }
            .result { margin-top: 20px; padding: 15px; background-color: #f0f0f0; }
        </style>
    </head>
    <body>
        <div class="container">
            <h1>房价预测系统</h1>
            <form id="predictionForm">
                <!-- 这里可以添加具体的输入字段 -->
                <button type="submit">预测房价</button>
            </form>
            <div id="result" class="result" style="display: none;"></div>
        </div>
        
        <script>
            document.getElementById('predictionForm').addEventListener('submit', function(e) {
                e.preventDefault();
                
                // 这里添加实际的预测逻辑
                alert('请在实际应用中实现预测逻辑');
            });
        </script>
    </body>
    </html>
    """
    return render_template_string(html_template)

@app.route('/predict', methods=['POST'])
def predict():
    """预测接口"""
    try:
        # 获取输入数据
        data = request.get_json()
        
        # 预处理输入数据
        input_data = np.array(list(data.values())).reshape(1, -1)
        
        # 进行预测
        if best_scaler is not None and best_model_name == 'Linear Regression':
            input_data_scaled = best_scaler.transform(input_data)
            prediction = best_model.predict(input_data_scaled)[0]
        else:
            prediction = best_model.predict(input_data)[0]
        
        return jsonify({
            'success': True,
            'prediction': float(prediction),
            'message': '预测成功'
        })
        
    except Exception as e:
        return jsonify({
            'success': False,
            'error': str(e)
        })

@app.route('/health')
def health():
    """健康检查接口"""
    return jsonify({'status': 'healthy'})

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=5000)

2. 完整的预测API实现

# 创建更完整的预测API
import pandas as pd

class HousePricePredictor:
    """房价预测器类"""
    
    def __init__(self, model_name):
        self.model_name = model_name
        self.model, self.scaler = load_model_and_scaler(model_name)
        self.feature_names = X.columns.tolist()
        
    def predict(self, features_dict):
        """根据特征字典进行预测"""
        try:
            # 转换为DataFrame
            input_df = pd.DataFrame([features_dict])
            
            # 确保所有特征都存在
            for feature in self.feature_names:
                if feature not in input_df.columns:
                    input_df[feature] = 0
            
            # 重新排列列顺序
            input_data = input_df[self.feature_names]
            
            # 进行预测
            if self.scaler is not None and self.model_name == 'Linear Regression':
                input_data_scaled = self.scaler.transform(input_data)
                prediction = self.model.predict(input_data_scaled)[0]
            else:
                prediction = self.model.predict(input_data)[0]
                
            return {
                'prediction': float(prediction),
                'success': True
            }
            
        except Exception as e:
            return {
                'error': str(e),
                'success': False
            }

# 创建预测器实例
predictor = HousePricePredictor(best_model_name)

# 测试预测功能
test_features = {
    'CRIM': 0.1,
    'ZN': 20.0,
    'INDUS': 7.0,
    'CHAS': 0.0,
    'NOX': 0.5,
    'RM': 6.5,
    'AGE': 40.0,
    'DIS': 5.0,
    'RAD': 4.0,
    'TAX': 300.0,
    'PTRATIO': 15.0,
    'B': 390.0,
    'LSTAT': 10.0
}

# 测试预测
result = predictor.predict(test_features)
print("预测结果:", result)

Docker容器化部署

1. 创建Dockerfile

FROM python:3.9-slim

# 设置工作目录
WORKDIR /app

# 复制依赖文件
COPY requirements.txt .

# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 暴露端口
EXPOSE 5000

# 启动命令
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]

2. 创建requirements.txt

Flask==2.3.3
scikit-learn==1.3.0
numpy==1.24.3
pandas==2.0.3
joblib==1.3.2
gunicorn==21.2.0
xgboost==1.7.5
lightgbm==3.3.5

3. 构建和运行Docker容器

# 构建Docker镜像
docker build -t house-price-predictor .

# 运行容器
docker run -p 5000:5000 house-price-predictor

# 在后台运行
docker run -d -p 5000:5000 house-price-predictor

性能监控与日志记录

1. 添加日志功能

import logging
from datetime import datetime

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('model_prediction.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

class PredictiveModel:
    """增强版预测模型类"""
    
    def __init__(self, model_name):
        self.model_name = model_name
        self.model, self.scaler = load_model_and_scaler(model_name)
        self.feature_names = X.columns.tolist()
        self.logger = logger
        
    def predict(self, features_dict):
        """带日志记录的预测方法"""
        try:
            start_time = datetime.now()
            
            # 记录输入数据
            self.logger.info(f"开始预测,输入特征: {features_dict}")
            
            # 预处理和预测
            input_df = pd.DataFrame([features_dict])
            
            for feature in self.feature_names:
                if feature not in input_df.columns:
                    input_df[feature] = 0
            
            input_data = input_df[self.feature_names]
            
            if self.scaler is not None and self.model_name == 'Linear Regression':
                input_data_scaled = self.scaler.transform(input_data)
                prediction = self.model.predict(input_data_scaled)[0]
            else:
                prediction = self.model.predict(input_data)[0]
            
            end_time = datetime.now()
            execution_time = (end_time - start_time).total_seconds()
            
            # 记录预测结果
            self.logger.info(f"预测完成,结果: {prediction}, 耗时: {execution_time:.4f}秒")
            
            return {
                'prediction': float(prediction),
                'success': True,
                'execution_time': execution_time
            }
            
        except Exception as e:
            self.logger.error(f"预测失败: {str(e)}")
            return {
                'error': str(e),
                'success': False
            }

# 使用增强版模型
enhanced_predictor = PredictiveModel(best_model_name)

项目总结与最佳实践

1. 关键技术点回顾

通过这个完整的机器学习项目,我们掌握了以下关键技术点:

# 项目关键步骤总结
project_steps = [
    "数据获取与探索性分析",
    "数据清洗与预处理",
    "特征工程与选择",
    "模型训练与比较",
    "超参数优化",
    "模型评估与验证",
    "模型保存与部署",
    "容器化部署",
    "监控与日志记录"
]

print("项目完整流程:")
for i, step
相关推荐
广告位招租

相似文章

    评论 (0)

    0/2000