A Hands-On Python Machine Learning Project: The Complete Workflow from Data Preprocessing to Model Deployment

SadSnow · 2026-02-02T14:03:04+08:00

In today's data-driven world, machine learning has become an essential tool for solving complex problems. This article walks through the full development workflow of a Python machine learning project with a complete, hands-on case study, covering every stage from data preprocessing to model deployment. By working through it, readers can pick up the core skills and best practices of machine learning project development.

1. Project Overview and Goals

1.1 Background

This project uses house price prediction as a worked example of the complete machine learning workflow. Price prediction is an important application in real estate: a property's market value is estimated from its measurable characteristics. The project shows how to start from raw data, work through a series of data processing and model building steps, and finally deploy the result to a production environment.

1.2 Goals

  • Data cleaning and preprocessing
  • Feature engineering and feature selection
  • Model training and tuning
  • Model evaluation and validation
  • Model deployment to a production environment

2. Environment Setup and Data Acquisition

2.1 Environment Configuration

Before starting the project, we need to set up the required Python environment. The libraries we will use are imported below:

# Import the required libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.impute import SimpleImputer
import warnings
warnings.filterwarnings('ignore')

# Set a CJK-capable font and the plot style
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
sns.set_style("whitegrid")

2.2 The Dataset

We will use the Boston Housing dataset, a classic regression benchmark containing 506 samples and 13 features:

# Load the dataset.
# Note: load_boston was deprecated in scikit-learn 1.0 and removed in 1.2,
# so the import itself can fail and must sit inside the try block.
BOSTON_FEATURES = ['CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE',
                   'DIS', 'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT']

try:
    from sklearn.datasets import load_boston
    boston = load_boston()
    X = pd.DataFrame(boston.data, columns=boston.feature_names)
    y = pd.Series(boston.target, name='PRICE')
except Exception:
    # Fall back to a synthetic dataset of the same shape for demonstration.
    # Wrapping it in a DataFrame keeps the column-based code below working.
    from sklearn.datasets import make_regression
    X, y = make_regression(n_samples=506, n_features=13, noise=10, random_state=42)
    X = pd.DataFrame(X, columns=BOSTON_FEATURES)
    y = pd.Series(y, name='PRICE')

print("Dataset shape:", X.shape)
print("Target shape:", y.shape)
print("\nFeature names:")
print(X.columns.tolist())

3. Data Exploration and Analysis

3.1 Basic Data Inspection

# Inspect basic information about the dataset
print("Dataset info:")
X.info()  # info() prints directly and returns None, so no print() wrapper
print("\nSummary statistics:")
print(X.describe())

# Check for missing values
print("\nMissing value counts:")
missing_values = X.isnull().sum()
print(missing_values[missing_values > 0])

# Check the target variable
print("\nTarget variable statistics:")
print(y.describe())

3.2 Visual Analysis

# Visualize the data
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

# Distribution of the target variable
axes[0, 0].hist(y, bins=30, alpha=0.7, color='skyblue')
axes[0, 0].set_title('House Price Distribution')
axes[0, 0].set_xlabel('Price')

# Feature correlation heatmap.
# The target must be joined to X first, otherwise 'PRICE' is not a column
# of the correlation matrix and the lookup below raises a KeyError.
correlation_matrix = pd.concat([X, y], axis=1).corr()
sns.heatmap(correlation_matrix, annot=False, cmap='coolwarm', ax=axes[0, 1])
axes[0, 1].set_title('Feature Correlation Matrix')

# Scatter plots of the two features most correlated with the target
important_features = correlation_matrix['PRICE'].abs().sort_values(ascending=False)[1:3]
for i, (idx, value) in enumerate(important_features.items()):
    axes[1, i].scatter(X[idx], y, alpha=0.5)
    axes[1, i].set_xlabel(idx)
    axes[1, i].set_ylabel('Price')
    axes[1, i].set_title(f'{idx} vs Price')

plt.tight_layout()
plt.show()

4. Data Preprocessing

4.1 Handling Missing Values

# Check for and handle missing values
def handle_missing_values(df):
    """Handle missing values in the dataset."""
    missing_info = df.isnull().sum()
    print("Missing values:")
    print(missing_info[missing_info > 0])
    
    # Fill numeric features with the column median
    numeric_columns = df.select_dtypes(include=[np.number]).columns
    for col in numeric_columns:
        if df[col].isnull().sum() > 0:
            median_value = df[col].median()
            df[col] = df[col].fillna(median_value)
            print(f"Filled missing values in {col} with median {median_value}")
    
    return df

# Handle missing values on a copy so the original data stays untouched
X_processed = handle_missing_values(X.copy())

4.2 Outlier Detection and Treatment

def detect_outliers(df, columns):
    """Detect outliers with the IQR method."""
    outliers_info = {}
    for col in columns:
        Q1 = df[col].quantile(0.25)
        Q3 = df[col].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        
        outliers = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
        outliers_info[col] = len(outliers)
    
    return outliers_info

# Detect outliers in all numeric features
numeric_features = X_processed.select_dtypes(include=[np.number]).columns
outliers_info = detect_outliers(X_processed, numeric_features)
print("Outlier count per feature:")
for feature, count in outliers_info.items():
    print(f"{feature}: {count} outliers")

# Visualize the detection results
X_processed.boxplot(figsize=(12, 8))
plt.xticks(rotation=45)
plt.title('Feature Box Plots (Outlier Detection)')
plt.tight_layout()
plt.show()
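
The code above only counts outliers; for the treatment step, one common option is to clip values to the IQR bounds (winsorizing) rather than drop rows, which keeps the sample count stable. A minimal sketch of that choice (illustrative, not the only valid treatment):

def clip_outliers(df, columns):
    """Clip each column to its IQR bounds so no rows are removed."""
    df = df.copy()
    for col in columns:
        Q1, Q3 = df[col].quantile(0.25), df[col].quantile(0.75)
        IQR = Q3 - Q1
        df[col] = df[col].clip(Q1 - 1.5 * IQR, Q3 + 1.5 * IQR)
    return df

# One possible treatment (kept in a separate variable so the original
# data is preserved for the rest of the walkthrough)
X_clipped = clip_outliers(X_processed, numeric_features)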

4.3 Data Standardization

# Standardize the numeric features
def standardize_features(X_train, X_test):
    """Standardize the training and test sets.

    The scaler is fit on the training set only and then applied to the
    test set, which avoids leaking test-set statistics into training.
    """
    scaler = StandardScaler()
    
    # Only standardize the numeric features
    numeric_columns = X_train.select_dtypes(include=[np.number]).columns
    
    X_train_scaled = X_train.copy()
    X_test_scaled = X_test.copy()
    
    X_train_scaled[numeric_columns] = scaler.fit_transform(X_train[numeric_columns])
    X_test_scaled[numeric_columns] = scaler.transform(X_test[numeric_columns])
    
    return X_train_scaled, X_test_scaled, scaler

# Split the data
X_train, X_test, y_train, y_test = train_test_split(
    X_processed, y, test_size=0.2, random_state=42
)

print(f"Training set size: {X_train.shape}")
print(f"Test set size: {X_test.shape}")

5. Feature Engineering

5.1 Feature Selection

from sklearn.feature_selection import SelectKBest, f_regression, RFE
from sklearn.ensemble import RandomForestRegressor

def feature_selection(X_train, y_train, X_test):
    """Compare two feature selection methods."""
    
    # Method 1: univariate statistical selection
    selector_kbest = SelectKBest(score_func=f_regression, k=10)
    X_train_kbest = selector_kbest.fit_transform(X_train, y_train)
    X_test_kbest = selector_kbest.transform(X_test)
    
    # Names of the selected features
    selected_features = X_train.columns[selector_kbest.get_support()].tolist()
    print("Features selected by SelectKBest:")
    print(selected_features)
    
    # Method 2: model-based selection (recursive feature elimination)
    rf_model = RandomForestRegressor(n_estimators=100, random_state=42)
    rfe_selector = RFE(estimator=rf_model, n_features_to_select=10)
    X_train_rfe = rfe_selector.fit_transform(X_train, y_train)
    X_test_rfe = rfe_selector.transform(X_test)
    
    selected_features_rfe = X_train.columns[rfe_selector.support_].tolist()
    print("\nFeatures selected by RFE:")
    print(selected_features_rfe)
    
    # The RFE results are shown for comparison only; the SelectKBest
    # output is what the rest of the pipeline uses.
    return X_train_kbest, X_test_kbest, selected_features

# Run feature selection
X_train_selected, X_test_selected, selected_features = feature_selection(X_train, y_train, X_test)
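
A third perspective, not in the original flow, is to rank features by a fitted random forest's impurity-based importances; it is cheap once a model is trained and often agrees with RFE. A minimal sketch:

# Rank features by impurity-based importance (illustrative; complements
# the two selection methods above)
rf = RandomForestRegressor(n_estimators=100, random_state=42)
rf.fit(X_train, y_train)
importances = pd.Series(rf.feature_importances_, index=X_train.columns)
print(importances.sort_values(ascending=False).head(10))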

5.2 Feature Construction

def create_features(X):
    """Create new features."""
    X_new = X.copy()
    
    # Interaction feature
    if 'RM' in X.columns and 'LSTAT' in X.columns:
        X_new['RM_LSTAT_INTERACTION'] = X['RM'] * X['LSTAT']
    
    # Polynomial feature
    if 'RM' in X.columns:
        X_new['RM_SQUARED'] = X['RM'] ** 2
    
    # Ratio feature
    if 'CRIM' in X.columns and 'ZN' in X.columns:
        X_new['CRIM_ZN_RATIO'] = X['CRIM'] / (X['ZN'] + 1e-8)  # avoid division by zero
    
    return X_new

# Apply feature construction
X_train_features = create_features(X_train)
X_test_features = create_features(X_test)

print("Data shape after adding features:")
print(f"Training set: {X_train_features.shape}")
print(f"Test set: {X_test_features.shape}")

# Re-run feature selection on the augmented feature set
X_train_selected_final, X_test_selected_final, selected_features_final = feature_selection(
    X_train_features, y_train, X_test_features
)

6. Model Training and Tuning

6.1 Comparing Multiple Models

from sklearn.model_selection import cross_val_score
import time

def train_and_evaluate_models(X_train, y_train):
    """Train and evaluate several candidate models."""
    
    # Candidate models
    models = {
        'Linear Regression': LinearRegression(),
        'Ridge Regression': Ridge(alpha=1.0),
        'Lasso Regression': Lasso(alpha=0.1),
        'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42)
    }
    
    results = {}
    
    for name, model in models.items():
        print(f"\nTraining {name}...")
        
        # Train the model and time the fit
        start_time = time.time()
        model.fit(X_train, y_train)
        training_time = time.time() - start_time
        
        # Cross-validation
        cv_scores = cross_val_score(model, X_train, y_train, cv=5, scoring='r2')
        
        results[name] = {
            'model': model,
            'cv_mean': cv_scores.mean(),
            'cv_std': cv_scores.std(),
            'training_time': training_time
        }
        
        print(f"{name} - CV R²: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
        print(f"Training time: {training_time:.4f} s")
    
    return results

# Train the candidate models
model_results = train_and_evaluate_models(X_train_selected_final, y_train)
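
A quick chart of the results dictionary makes the comparison easier to read than the printed log. A small sketch:

# Plot mean CV R² per model, with one standard deviation as error bars
names = list(model_results.keys())
means = [model_results[n]['cv_mean'] for n in names]
stds = [model_results[n]['cv_std'] for n in names]

plt.figure(figsize=(8, 4))
plt.bar(names, means, yerr=stds, capsize=4, color='skyblue')
plt.ylabel('CV R²')
plt.title('Cross-Validated R² by Model')
plt.xticks(rotation=20)
plt.tight_layout()
plt.show()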

6.2 Hyperparameter Tuning

def hyperparameter_tuning(X_train, y_train):
    """Hyperparameter tuning for the random forest."""
    
    # Random forest search grid
    rf_params = {
        'n_estimators': [50, 100, 200],
        'max_depth': [None, 10, 20, 30],
        'min_samples_split': [2, 5, 10],
        'min_samples_leaf': [1, 2, 4]
    }
    
    rf_model = RandomForestRegressor(random_state=42)
    
    print("Starting random forest grid search...")
    grid_search = GridSearchCV(
        rf_model, 
        rf_params, 
        cv=5, 
        scoring='r2', 
        n_jobs=-1,
        verbose=1
    )
    
    grid_search.fit(X_train, y_train)
    
    print("Best parameters:", grid_search.best_params_)
    print("Best cross-validation score:", grid_search.best_score_)
    
    return grid_search.best_estimator_

# Run the hyperparameter tuning
best_model = hyperparameter_tuning(X_train_selected_final, y_train)
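
The grid above evaluates 3 × 4 × 3 × 3 = 108 parameter combinations, each with 5-fold cross-validation. When the grid grows, RandomizedSearchCV samples a fixed budget of combinations instead; a sketch over the same search space:

from sklearn.model_selection import RandomizedSearchCV

# Same search space as the grid search above
rf_param_dist = {
    'n_estimators': [50, 100, 200],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4]
}

# Evaluate 20 sampled combinations (20 x 5 CV fits instead of 108 x 5),
# trading exhaustiveness for speed
random_search = RandomizedSearchCV(
    RandomForestRegressor(random_state=42),
    param_distributions=rf_param_dist,
    n_iter=20, cv=5, scoring='r2', n_jobs=-1, random_state=42
)
random_search.fit(X_train_selected_final, y_train)
print("Best parameters:", random_search.best_params_)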

7. Model Evaluation and Validation

7.1 Test-Set Evaluation

def evaluate_model(model, X_train, X_test, y_train, y_test):
    """Evaluate a model on the train and test sets."""
    
    # Train the model
    model.fit(X_train, y_train)
    
    # Predict
    y_train_pred = model.predict(X_train)
    y_test_pred = model.predict(X_test)
    
    # Compute the evaluation metrics
    train_mse = mean_squared_error(y_train, y_train_pred)
    test_mse = mean_squared_error(y_test, y_test_pred)
    train_rmse = np.sqrt(train_mse)
    test_rmse = np.sqrt(test_mse)
    train_r2 = r2_score(y_train, y_train_pred)
    test_r2 = r2_score(y_test, y_test_pred)
    train_mae = mean_absolute_error(y_train, y_train_pred)
    test_mae = mean_absolute_error(y_test, y_test_pred)
    
    print("Model evaluation results:")
    print("-" * 50)
    print(f"Train RMSE: {train_rmse:.4f}")
    print(f"Test RMSE: {test_rmse:.4f}")
    print(f"Train R²: {train_r2:.4f}")
    print(f"Test R²: {test_r2:.4f}")
    print(f"Train MAE: {train_mae:.4f}")
    print(f"Test MAE: {test_mae:.4f}")
    
    return {
        'model': model,
        'y_train_pred': y_train_pred,
        'y_test_pred': y_test_pred,
        'metrics': {
            'train_rmse': train_rmse,
            'test_rmse': test_rmse,
            'train_r2': train_r2,
            'test_r2': test_r2,
            'train_mae': train_mae,
            'test_mae': test_mae
        }
    }

# Evaluate the best model
evaluation_results = evaluate_model(best_model, X_train_selected_final, X_test_selected_final, y_train, y_test)
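
As a complement to the numeric metrics, a predicted-versus-actual scatter plot shows at a glance where the model over- or under-shoots. A small sketch:

# Points on the dashed diagonal correspond to perfect predictions
y_pred = evaluation_results['y_test_pred']
plt.figure(figsize=(6, 6))
plt.scatter(y_test, y_pred, alpha=0.6)
lims = [min(y_test.min(), y_pred.min()), max(y_test.max(), y_pred.max())]
plt.plot(lims, lims, 'r--')
plt.xlabel('Actual price')
plt.ylabel('Predicted price')
plt.title('Predicted vs. Actual')
plt.tight_layout()
plt.show()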

7.2 Residual Analysis

def residual_analysis(y_true, y_pred, model_name="Model"):
    """Residual analysis."""
    from scipy import stats  # used for the Q-Q plot and skewness below
    
    residuals = y_true - y_pred
    
    fig, axes = plt.subplots(1, 3, figsize=(15, 5))
    
    # Residuals vs. predicted values
    axes[0].scatter(y_pred, residuals, alpha=0.6)
    axes[0].axhline(y=0, color='red', linestyle='--')
    axes[0].set_xlabel('Predicted value')
    axes[0].set_ylabel('Residual')
    axes[0].set_title(f'{model_name} - Residuals vs. Predicted')
    
    # Residual histogram
    axes[1].hist(residuals, bins=30, alpha=0.7, color='skyblue')
    axes[1].set_xlabel('Residual')
    axes[1].set_ylabel('Frequency')
    axes[1].set_title(f'{model_name} - Residual Distribution')
    
    # Q-Q plot
    stats.probplot(residuals, dist="norm", plot=axes[2])
    axes[2].set_title(f'{model_name} - Q-Q Plot')
    
    plt.tight_layout()
    plt.show()
    
    # Summary statistics
    print(f"\n{model_name} residual statistics:")
    print(f"Mean: {np.mean(residuals):.4f}")
    print(f"Std dev: {np.std(residuals):.4f}")
    print(f"Skewness: {stats.skew(residuals):.4f}")

# Run the residual analysis
residual_analysis(y_test, evaluation_results['y_test_pred'], "Best Model")

8. Preparing for Deployment

8.1 Saving and Loading the Model

import joblib
from sklearn.pipeline import Pipeline

def save_model(model, scaler, feature_names, model_path='model.pkl'):
    """Save the trained model together with its preprocessor."""
    
    # Bundle the preprocessor, model, and feature names into one object
    model_pipeline = {
        'scaler': scaler,
        'model': model,
        'feature_names': feature_names
    }
    
    # Persist to disk
    joblib.dump(model_pipeline, model_path)
    print(f"Model saved to {model_path}")

def load_model(model_path='model.pkl'):
    """Load a saved model."""
    
    model_pipeline = joblib.load(model_path)
    return model_pipeline

# Fit the scaler on the training data and refit the model on the scaled
# features so that training and inference share the same preprocessing.
# (An unfitted StandardScaler() would raise NotFittedError as soon as
# predict_house_price calls scaler.transform.)
scaler = StandardScaler()
X_train_scaled_final = scaler.fit_transform(X_train_selected_final)
best_model.fit(X_train_scaled_final, y_train)

# Save the best model together with its fitted scaler
save_model(best_model, scaler, selected_features_final)

print("Model saved!")

8.2 Creating a Prediction Function

def predict_house_price(model_pipeline, features_dict):
    """Predict a house price with the trained model."""
    
    # Unpack the preprocessor and model
    scaler = model_pipeline['scaler']
    model = model_pipeline['model']
    feature_names = model_pipeline['feature_names']
    
    # Build the feature vector in training order
    features = []
    for feature in feature_names:
        if feature in features_dict:
            features.append(features_dict[feature])
        else:
            # Constructed features (e.g. RM_SQUARED) may be absent from raw
            # input; defaulting to 0 keeps the demo running but hurts accuracy
            print(f"Warning: missing feature {feature}")
            features.append(0)
    
    # Convert to an array and standardize with the fitted scaler
    features_array = np.array(features).reshape(1, -1)
    features_scaled = scaler.transform(features_array)
    
    # Predict
    prediction = model.predict(features_scaled)
    
    return prediction[0]

# Test the prediction function
test_features = {
    'CRIM': 0.1,
    'ZN': 20.0,
    'INDUS': 7.0,
    'CHAS': 0,
    'NOX': 0.5,
    'RM': 6.5,
    'AGE': 40.0,
    'DIS': 5.0,
    'RAD': 5,
    'TAX': 300,
    'PTRATIO': 15.0,
    'B': 390.0,
    'LSTAT': 10.0
}

# Load the saved model and run a prediction (prices are in $1000s)
try:
    loaded_model = load_model()
    predicted_price = predict_house_price(loaded_model, test_features)
    print(f"Predicted price: ${predicted_price:.2f}k")
except Exception as e:
    print(f"Error loading model: {e}")

9. Production Deployment

9.1 Building a Flask API Service

from flask import Flask, request, jsonify
import numpy as np
import joblib
import pandas as pd

# Note: this service assumes predict_house_price from section 8.2 is
# defined in (or imported into) this module.

app = Flask(__name__)

# Load the model at startup
try:
    model_pipeline = joblib.load('model.pkl')
    print("Model loaded successfully")
except Exception as e:
    print(f"Failed to load model: {e}")
    model_pipeline = None

@app.route('/predict', methods=['POST'])
def predict():
    """House price prediction endpoint."""
    
    if model_pipeline is None:
        return jsonify({'error': 'Model not loaded'}), 500
    
    try:
        # Parse the JSON request body
        data = request.get_json()
        
        # Build the feature dict, rejecting requests with missing features
        features_dict = {}
        feature_names = model_pipeline['feature_names']
        
        for feature in feature_names:
            if feature in data:
                features_dict[feature] = float(data[feature])
            else:
                return jsonify({'error': f'Missing required feature: {feature}'}), 400
        
        # Run the prediction
        prediction = predict_house_price(model_pipeline, features_dict)
        
        return jsonify({
            'prediction': float(prediction),
            'status': 'success'
        })
        
    except Exception as e:
        return jsonify({'error': str(e)}), 400

@app.route('/health', methods=['GET'])
def health_check():
    """Health check endpoint."""
    return jsonify({'status': 'healthy'})

if __name__ == '__main__':
    # debug=True is for local development only
    app.run(debug=True, host='0.0.0.0', port=5000)

9.2 Containerizing with Docker

# Dockerfile
FROM python:3.8-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

EXPOSE 5000

CMD ["python", "app.py"]

The accompanying requirements.txt pins the dependencies:

# requirements.txt
flask==2.0.1
scikit-learn==1.0.1
pandas==1.3.3
numpy==1.21.2
joblib==1.1.0
matplotlib==3.4.3
seaborn==0.11.2

9.3 Deployment Script

#!/bin/bash
# deploy.sh

echo "Deploying the machine learning model service..."

# Build the Docker image
docker build -t house-price-predictor:latest .

# Run the container
docker run -d \
  --name house-price-api \
  -p 5000:5000 \
  house-price-predictor:latest

echo "Service started; visit http://localhost:5000"

10. Performance Monitoring and Maintenance

10.1 Monitoring Model Performance

import logging
from datetime import datetime

# Configure logging to both a file and the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('model_performance.log'),
        logging.StreamHandler()
    ]
)

def monitor_model_performance(y_true, y_pred, model_name="Model"):
    """Log model performance metrics."""
    
    # Compute the metrics
    mse = mean_squared_error(y_true, y_pred)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_true, y_pred)
    
    # Log the report
    logging.info(f"Performance report - {model_name}")
    logging.info(f"Time: {datetime.now()}")
    logging.info(f"RMSE: {rmse:.4f}")
    logging.info(f"R²: {r2:.4f}")
    logging.info("-" * 50)
    
    return {
        'rmse': rmse,
        'r2': r2,
        'timestamp': datetime.now()
    }

# Run the performance monitoring
performance = monitor_model_performance(y_test, evaluation_results['y_test_pred'])
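
Logging metrics is only useful if something reacts to them. A minimal sketch of an alert when RMSE drifts past a tolerance above a stored baseline (the 20% threshold is an illustrative choice, not a recommendation):

def check_degradation(current_rmse, baseline_rmse, tolerance=0.20):
    """Warn when RMSE exceeds the baseline by more than `tolerance`
    (relative); in production this could trigger retraining or an alert."""
    if current_rmse > baseline_rmse * (1 + tolerance):
        logging.warning(
            f"RMSE {current_rmse:.4f} exceeds baseline "
            f"{baseline_rmse:.4f} by more than {tolerance:.0%}"
        )
        return True
    return False

# Compare the latest run against the RMSE recorded at evaluation time
baseline_rmse = evaluation_results['metrics']['test_rmse']
check_degradation(performance['rmse'], baseline_rmse)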

10.2 Model Version Management

import os
import json
import shutil
from datetime import datetime

def version_model(model_path, version_tag):
    """Simple model version management."""
    
    # Create a directory for this version
    version_dir = f"models/version_{version_tag}"
    os.makedirs(version_dir, exist_ok=True)
    
    # Copy the model file into it
    shutil.copy(model_path, f"{version_dir}/model.pkl")
    
    # Record the version metadata
    version_info = {
        'version': version_tag,
        'created_at': datetime.now().isoformat(),
        'model_path': model_path
    }
    
    with open(f"{version_dir}/version_info.json", 'w') as f:
        json.dump(version_info, f, indent=2)
    
    print(f"Model version {version_tag} created")
    return version_dir

# Create a model version
version_dir = version_model('model.pkl', 'v1.0')

11. Best Practices Summary

11.1 Data Processing Best Practices

class DataProcessor:
    """Reusable data preprocessing utility."""
    
    def __init__(self):
        self.scaler = StandardScaler()
        self.imputer = SimpleImputer(strategy='median')
        
    def preprocess_data(self, X_train, X_test):
        """Run the full preprocessing flow."""
        
        # 1. Impute missing values (fit on the training set only)
        X_train_processed = pd.DataFrame(
            self.imputer.fit_transform(X_train),
            columns=X_train.columns,
            index=X_train.index
        )
        
        X_test_processed = pd.DataFrame(
            self.imputer.transform(X_test),
            columns=X_test.columns,
            index=X_test.index
        )
        
        # 2. Standardize (fit on the training set only)
        X_train_scaled = pd.DataFrame(
            self.scaler.fit_transform(X_train_processed),
            columns=X_train_processed.columns,
            index=X_train_processed.index
        )
        
        X_test_scaled = pd.DataFrame(
            self.scaler.transform(X_test_processed),
            columns=X_test_processed.columns,
            index=X_test_processed.index
        )
        
        return X_train_scaled, X_test_scaled
    
    def save_preprocessor(self, filepath):
        """Save the fitted preprocessors."""
        joblib.dump({
            'scaler': self.scaler,
            'imputer': self.imputer
        }, filepath)
    
    def load_preprocessor(self, filepath):
        """Load saved preprocessors."""
        preprocessor = joblib.load(filepath)
        self.scaler = preprocessor['scaler']
        self.imputer = preprocessor['imputer']

# Usage example
processor = DataProcessor()
X_train_final, X_test_final = processor.preprocess_data(X_train, X_test)

11.2 Model Development Recommendations

  1. Data quality first: always inspect data quality and handle missing values and outliers before modeling
  2. Feature engineering matters: good features are often worth more than a more complex model
  3. Always cross-validate: cross-validation gives a more reliable estimate of performance
  4. Mind interpretability: choose a model whose explainability fits the business requirements
  5. Monitor continuously: keep tracking model performance after deployment

12. Summary and Outlook

This complete, hands-on machine learning project covered the entire workflow from data preprocessing to model deployment. Along the way it demonstrated:

  • The importance of data exploration and visualization
  • A full data cleaning and preprocessing pipeline
  • Comparing and selecting among several models
  • Hyperparameter tuning techniques
  • Model evaluation and residual analysis
  • Saving, serving, and monitoring a model in production