Python AI机器学习项目实战:从数据预处理到模型部署全流程

PoorEthan
PoorEthan 2026-02-02T19:15:09+08:00
0 0 2

引言

在人工智能和机器学习快速发展的今天,Python作为最受欢迎的编程语言之一,在AI领域扮演着至关重要的角色。本文将通过一个完整的机器学习项目实例,深入演示如何使用Python构建、训练和部署机器学习模型,涵盖从数据预处理到生产环境部署的全流程。

我们将使用TensorFlow和PyTorch等主流深度学习框架,结合实际的数据集,展示从原始数据到可部署模型的完整技术栈。通过本文的学习,读者将掌握现代AI项目开发的核心技能和最佳实践。

1. 项目概述与环境准备

1.1 项目背景

本次实战项目以房价预测为例,我们将构建一个能够根据房屋特征预测房价的机器学习模型。项目涉及数据清洗、特征工程、模型训练、评估验证以及最终的部署流程。

1.2 技术栈介绍

# Environment dependencies
import sys  # bug fix: sys.version was used below without importing sys

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import mean_squared_error, r2_score
import tensorflow as tf
from tensorflow import keras
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import joblib
import warnings
warnings.filterwarnings('ignore')

# Print version info for reproducibility
print("Python版本:", sys.version)
print("Pandas版本:", pd.__version__)
print("TensorFlow版本:", tf.__version__)
print("PyTorch版本:", torch.__version__)

1.3 数据集介绍

我们将使用加州房价数据集(California Housing Dataset)——经典波士顿房价数据集的现代替代品(波士顿数据集因伦理问题已从新版scikit-learn中移除),包含20640个样本,8个特征变量。主要特征包括:

  • 街区收入中位数(MedInc)
  • 房屋平均房间数(AveRooms)
  • 房龄中位数(HouseAge)
  • 地理位置(纬度/经度)等

2. 数据预处理与探索性分析

2.1 数据加载与初步检查

# Load the dataset
import pandas as pd
from sklearn.datasets import fetch_california_housing

# California housing data — the modern stand-in for the deprecated Boston set
_bunch = fetch_california_housing()
X = pd.DataFrame(_bunch.data, columns=_bunch.feature_names)
y = pd.Series(_bunch.target, name='target')

print("数据集形状:", X.shape)
print("\n数据集基本信息:")
print(X.info())
print("\n数据集描述统计:")
print(X.describe())

# Missing-value audit before any preprocessing
print("\n缺失值检查:")
print(X.isnull().sum())

2.2 数据质量分析

# Visualize each feature's distribution
fig, axes = plt.subplots(3, 4, figsize=(15, 10))
axes = axes.ravel()

for i, column in enumerate(X.columns):
    axes[i].hist(X[column], bins=30, alpha=0.7)
    axes[i].set_title(f'{column} 分布')
    axes[i].set_xlabel(column)
    axes[i].set_ylabel('频次')

# Bug fix: the 3x4 grid has 12 axes but the dataset has fewer features,
# so the trailing axes rendered as empty frames — hide them.
for j in range(len(X.columns), len(axes)):
    axes[j].set_visible(False)

plt.tight_layout()
plt.show()

# Pairwise feature correlations
correlation_matrix = X.corr()
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('特征相关性热力图')
plt.show()

2.3 异常值检测与处理

# Outlier detection (IQR method)
def detect_outliers(df, columns):
    """Count IQR-based outliers for each requested column.

    A value is flagged when it falls outside
    [Q1 - 1.5*IQR, Q3 + 1.5*IQR].
    Returns a dict mapping column name -> outlier count.
    """
    def _count(series):
        # Per-column fences derived from the interquartile range
        q1, q3 = series.quantile(0.25), series.quantile(0.75)
        spread = 1.5 * (q3 - q1)
        lo, hi = q1 - spread, q3 + spread
        return int(((series < lo) | (series > hi)).sum())

    return {col: _count(df[col]) for col in columns}

# Run the IQR outlier scan over every feature
outliers = detect_outliers(X, X.columns)
print("各特征异常值数量:")
for feature, count in outliers.items():
    print(feature, count, sep=': ')

# Box plots make the flagged outliers visible at a glance
plt.figure(figsize=(12, 8))
X.boxplot()
plt.title('特征箱线图(异常值检测)')
plt.xticks(rotation=45)
plt.show()

2.4 数据清洗与预处理

# Handle missing values and outliers
def preprocess_data(df):
    """Impute missing values and winsorize IQR outliers.

    Returns a new DataFrame; the input is left untouched.
    (Bug fix: the original mutated its argument in place via
    column assignment, surprising callers who reuse the frame.)
    """
    df = df.copy()

    # Median imputation for any missing values
    if df.isnull().sum().sum() > 0:
        print("发现缺失值,使用中位数填充")
        df = df.fillna(df.median())

    # Clip each column to the [Q1 - 1.5*IQR, Q3 + 1.5*IQR] band
    for column in df.columns:
        Q1 = df[column].quantile(0.25)
        Q3 = df[column].quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        df[column] = df[column].clip(lower=lower_bound, upper=upper_bound)

    return df

# Run the cleaning pipeline on a copy so the raw X stays intact
X_processed = preprocess_data(X.copy())
print("预处理后数据形状:", X_processed.shape)

3. 特征工程与数据分割

3.1 特征选择与构造

# Feature-importance analysis
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectKBest, f_regression

# Fit a random forest purely to rank the features
rf_model = RandomForestRegressor(n_estimators=100, random_state=42)
rf_model.fit(X_processed, y)

# Table of features sorted by impurity-based importance
feature_importance = (
    pd.DataFrame({
        'feature': X_processed.columns,
        'importance': rf_model.feature_importances_,
    })
    .sort_values('importance', ascending=False)
)

print("特征重要性排序:")
print(feature_importance)

# Bar chart of the ranking
plt.figure(figsize=(10, 6))
sns.barplot(data=feature_importance, x='importance', y='feature')
plt.title('特征重要性分析')
plt.xlabel('重要性得分')
plt.show()

# Derived features
def create_features(df):
    """Add interaction and ratio features derived from the raw columns.

    Returns a copy of *df*; the input frame is not modified.
    """
    out = df.copy()
    cols = set(out.columns)

    # Income x room-count interaction term
    if {'MedInc', 'AveRooms'} <= cols:
        out['MedInc_AveRooms'] = out['MedInc'] * out['AveRooms']

    # Rooms-per-bedroom ratio; the epsilon guards against divide-by-zero
    if {'AveRooms', 'AveBedrms'} <= cols:
        out['Room_Bedroom_Ratio'] = out['AveRooms'] / (out['AveBedrms'] + 1e-8)

    return out

# Apply feature engineering on top of the cleaned data
X_engineered = create_features(X_processed)
print("工程化后特征数量:", X_engineered.shape[1])

3.2 数据分割与标准化

import os

# Split into train/test partitions
X_train, X_test, y_train, y_test = train_test_split(
    X_engineered, y, test_size=0.2, random_state=42
)

print(f"训练集形状: {X_train.shape}")
print(f"测试集形状: {X_test.shape}")

# Standardize features (fit on the train split only to avoid leakage)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Persist the scaler for production inference.
# Bug fix: the 'models' directory was only created much later in the
# original script, so this dump failed on a fresh run.
os.makedirs('models', exist_ok=True)
joblib.dump(scaler, 'models/scaler.pkl')
print("标准化器已保存")

# float32 arrays for the deep-learning frameworks
X_train_array = X_train_scaled.astype(np.float32)
X_test_array = X_test_scaled.astype(np.float32)
y_train_array = y_train.values.astype(np.float32)
y_test_array = y_test.values.astype(np.float32)

4. 模型构建与训练

4.1 TensorFlow/Keras深度学习模型

# Build the neural network
def create_neural_network(input_dim):
    """Build and compile a 3-hidden-layer MLP regressor (128/64/32 units)."""
    layers = keras.layers
    spec = [(128, 0.3), (64, 0.3), (32, 0.2)]  # (units, dropout rate)

    model = keras.Sequential()
    for idx, (units, rate) in enumerate(spec):
        if idx == 0:
            # First layer declares the input shape
            model.add(layers.Dense(units, activation='relu',
                                   input_shape=(input_dim,)))
        else:
            model.add(layers.Dense(units, activation='relu'))
        model.add(layers.Dropout(rate))

    # Scalar regression head
    model.add(layers.Dense(1, activation='linear'))

    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='mean_squared_error',
        metrics=['mean_absolute_error'],
    )
    return model

# Build the network and show its layer summary
model = create_neural_network(X_train_array.shape[1])
print(model.summary())

# Callbacks: stop early on a stalled validation loss, and decay the LR
early_stop = keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=20,
    restore_best_weights=True,
)
lr_schedule = keras.callbacks.ReduceLROnPlateau(
    monitor='val_loss',
    factor=0.5,
    patience=10,
    min_lr=1e-7,
)
callbacks = [early_stop, lr_schedule]

# Fit with an internal 20% validation split
history = model.fit(
    X_train_array, y_train_array,
    batch_size=32,
    epochs=100,
    validation_split=0.2,
    callbacks=callbacks,
    verbose=1,
)

4.2 PyTorch深度学习模型

# PyTorch model definition
class HousingPriceModel(nn.Module):
    """Feed-forward regression net: three ReLU+Dropout blocks, scalar output.

    The layer sequence (and hence the state_dict key layout) matches the
    original hand-written Sequential exactly.
    """

    def __init__(self, input_size):
        super().__init__()
        blocks = []
        # (fan_in, fan_out, dropout rate) per hidden block
        for fan_in, fan_out, drop in [(input_size, 128, 0.3),
                                      (128, 64, 0.3),
                                      (64, 32, 0.2)]:
            blocks += [nn.Linear(fan_in, fan_out), nn.ReLU(), nn.Dropout(drop)]
        blocks.append(nn.Linear(32, 1))
        self.network = nn.Sequential(*blocks)

    def forward(self, x):
        return self.network(x)

# Instantiate the PyTorch model, with MSE loss and the Adam optimizer
torch_model = HousingPriceModel(X_train_array.shape[1])
criterion = nn.MSELoss()
optimizer = optim.Adam(torch_model.parameters(), lr=0.001)

# Wrap the numpy arrays as tensors; targets become column vectors
X_train_tensor = torch.FloatTensor(X_train_array)
y_train_tensor = torch.FloatTensor(y_train_array).reshape(-1, 1)
X_test_tensor = torch.FloatTensor(X_test_array)
y_test_tensor = torch.FloatTensor(y_test_array).reshape(-1, 1)

# PyTorch training loop
def train_pytorch_model(model, criterion, optimizer, X_train, y_train, epochs=50):
    """Full-batch training loop; returns the per-epoch loss history."""
    model.train()
    history = []

    for step in range(1, epochs + 1):
        # Standard step: zero grads, forward, backward, update
        optimizer.zero_grad()
        loss = criterion(model(X_train), y_train)
        loss.backward()
        optimizer.step()

        history.append(loss.item())
        # Progress report every 10 epochs
        if step % 10 == 0:
            print(f'Epoch [{step}/{epochs}], Loss: {history[-1]:.4f}')

    return history

# Run the training loop for 50 full-batch epochs
torch_losses = train_pytorch_model(
    torch_model, criterion, optimizer, X_train_tensor, y_train_tensor, epochs=50)

4.3 传统机器学习模型对比

from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error

# Candidate classical models to benchmark against the neural nets
models = {
    'Linear Regression': LinearRegression(),
    'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingRegressor(random_state=42),
    'SVR': SVR(kernel='rbf')
}

model_results = {}

for name, model in models.items():
    # Linear regression uses the float64 scaled matrix; the rest use the
    # float32 copy (same features, only the dtype differs) — kept to
    # preserve the original numeric behavior.
    if name == 'Linear Regression':
        train_X, train_y, eval_X = X_train_scaled, y_train, X_test_scaled
    else:
        train_X, train_y, eval_X = X_train_array, y_train_array, X_test_array

    model.fit(train_X, train_y)
    y_pred = model.predict(eval_X)

    # Standard regression metrics on the held-out split
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    model_results[name] = {'MSE': mse, 'RMSE': rmse, 'MAE': mae, 'R²': r2}

    print(f"{name} 结果:")
    print(f"  RMSE: {rmse:.4f}")
    print(f"  MAE: {mae:.4f}")
    print(f"  R²: {r2:.4f}")
    print()

# Side-by-side bar charts of the metrics
results_df = pd.DataFrame(model_results).T
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.ravel()

# The two branches of the original loop were identical except for the bar
# color, so they are folded into a lookup; R² (higher is better) is green.
metrics = ['RMSE', 'MAE', 'R²']
for i, metric in enumerate(metrics):
    color = 'green' if metric == 'R²' else 'blue'
    results_df[metric].plot(kind='bar', ax=axes[i], color=color)
    axes[i].set_title(f'{metric} 对比')
    axes[i].set_ylabel(metric)

# Bug fix: only three metrics are plotted in a 2x2 grid — hide the
# fourth axes instead of leaving an empty frame.
axes[len(metrics)].set_visible(False)

plt.tight_layout()
plt.show()

5. 模型评估与验证

5.1 深度学习模型评估

# TensorFlow/Keras model evaluation
def evaluate_model(model, X_test, y_test):
    """Score a fitted Keras model on held-out data, print RMSE/MAE/R².

    Returns the raw predictions so callers can plot them.
    """
    y_pred = model.predict(X_test)

    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    print("TensorFlow模型评估结果:")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAE: {mae:.4f}")
    print(f"R²: {r2:.4f}")

    return y_pred

# Score the Keras model on the test split
tf_predictions = evaluate_model(model, X_test_array, y_test_array)

# Predicted vs. actual scatter with the ideal y = x reference line
plt.figure(figsize=(10, 6))
plt.scatter(y_test, tf_predictions, alpha=0.5)
lo, hi = y_test.min(), y_test.max()
plt.plot([lo, hi], [lo, hi], 'r--', lw=2)
plt.xlabel('真实值')
plt.ylabel('预测值')
plt.title('TensorFlow模型预测结果对比')
plt.show()

5.2 学习曲线分析

# Plot the training history
def plot_training_history(history):
    """Plot loss and MAE curves for the training and validation splits.

    NOTE(review): metric key names vary across Keras versions
    ('mae' vs 'mean_absolute_error') — confirm against
    history.history.keys() if this raises KeyError.
    """
    fig, (loss_ax, mae_ax) = plt.subplots(1, 2, figsize=(12, 4))

    # Loss curves
    loss_ax.plot(history.history['loss'], label='训练损失')
    loss_ax.plot(history.history['val_loss'], label='验证损失')
    loss_ax.set_title('模型损失')
    loss_ax.set_xlabel('Epoch')
    loss_ax.set_ylabel('Loss')
    loss_ax.legend()

    # MAE curves
    mae_ax.plot(history.history['mean_absolute_error'], label='训练MAE')
    mae_ax.plot(history.history['val_mean_absolute_error'], label='验证MAE')
    mae_ax.set_title('模型MAE')
    mae_ax.set_xlabel('Epoch')
    mae_ax.set_ylabel('MAE')
    mae_ax.legend()

    plt.tight_layout()
    plt.show()

plot_training_history(history)

5.3 交叉验证

from sklearn.model_selection import cross_val_score, KFold

# 5-fold cross-validation of the best classical model
kfold = KFold(n_splits=5, shuffle=True, random_state=42)

rf_model_cv = RandomForestRegressor(n_estimators=100, random_state=42)
cv_scores = cross_val_score(
    rf_model_cv, X_train_array, y_train_array, cv=kfold, scoring='r2')

print("交叉验证结果:")
print(f"各折R²分数: {cv_scores}")
print(f"平均R²分数: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")

6. 模型优化与调参

6.1 超参数调优

from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestRegressor

# Hyperparameter grid for the random forest
param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4]
}

# Exhaustive grid search: 3-fold CV, R² scoring, all CPU cores
rf_grid = RandomForestRegressor(random_state=42)
grid_search = GridSearchCV(rf_grid, param_grid, cv=3, scoring='r2',
                           n_jobs=-1, verbose=1)

grid_search.fit(X_train_array, y_train_array)
print("最佳参数:", grid_search.best_params_)
print("最佳交叉验证分数:", grid_search.best_score_)

# Keep the refit best estimator for deployment
best_model = grid_search.best_estimator_

6.2 模型集成

from sklearn.ensemble import VotingRegressor
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR

# Average the predictions of three heterogeneous regressors
ensemble_model = VotingRegressor([
    ('rf', RandomForestRegressor(n_estimators=100, random_state=42)),
    ('lr', LinearRegression()),
    ('svr', SVR(kernel='rbf'))
])

ensemble_model.fit(X_train_array, y_train_array)

# Held-out evaluation of the voting ensemble
ensemble_pred = ensemble_model.predict(X_test_array)
ensemble_rmse = np.sqrt(mean_squared_error(y_test, ensemble_pred))
ensemble_r2 = r2_score(y_test, ensemble_pred)

print("集成模型结果:")
print(f"RMSE: {ensemble_rmse:.4f}")
print(f"R²: {ensemble_r2:.4f}")

7. 模型部署准备

7.1 模型保存与版本控制

import os
import joblib
from datetime import datetime

# Directory layout for the serialized artifacts
for path in ('models', 'models/tensorflow_model', 'models/pytorch_model'):
    os.makedirs(path, exist_ok=True)

# Keras model (HDF5 format)
model.save('models/tensorflow_model/best_model.h5')
print("TensorFlow模型已保存")

# PyTorch weights only (state_dict, not a pickled module)
torch.save(torch_model.state_dict(), 'models/pytorch_model/best_model.pth')
print("PyTorch模型已保存")

# Tuned random forest from the grid search
joblib.dump(best_model, 'models/best_model.pkl')
print("最佳模型已保存")

# Metadata alongside the artifacts for versioning / auditability
model_metadata = {
    'created_at': datetime.now().isoformat(),
    'model_type': 'RandomForest',
    'features_count': X_train_array.shape[1],
    'training_samples': len(X_train_array),
    'validation_score': best_model.score(X_test_array, y_test_array)
}

joblib.dump(model_metadata, 'models/model_metadata.pkl')
print("模型元数据已保存")

7.2 模型推理封装

class HousingPricePredictor:
    """Unified inference wrapper over the sklearn / Keras / PyTorch models.

    The backend is inferred from the model file path: a 'tensorflow' or
    'pytorch' substring selects that framework; anything else is treated
    as a joblib/sklearn pickle.
    """

    def __init__(self, model_path=None, scaler_path=None):
        self.model = None
        self.scaler = None
        self.model_type = None

        if model_path and scaler_path:
            self.load_model(model_path, scaler_path)

    def load_model(self, model_path, scaler_path):
        """Load the fitted scaler and the model, dispatching on the path."""
        self.scaler = joblib.load(scaler_path)

        if 'tensorflow' in model_path.lower():
            self.model = tf.keras.models.load_model(model_path)
            self.model_type = 'tensorflow'
        elif 'pytorch' in model_path.lower():
            # Bug fix: the input size used to come from the global
            # X_train_array, which breaks when the predictor runs outside
            # the training script. The fitted scaler already knows the
            # feature count.
            self.model = HousingPriceModel(self.scaler.n_features_in_)
            self.model.load_state_dict(torch.load(model_path))
            self.model.eval()
            self.model_type = 'pytorch'
        else:
            self.model = joblib.load(model_path)
            self.model_type = 'sklearn'

    def predict(self, features):
        """Predict a single price from a flat feature list (or 2-D array)."""
        if isinstance(features, list):
            features = np.array(features).reshape(1, -1)

        # Apply the same scaling used at training time
        features_scaled = self.scaler.transform(features)

        if self.model_type == 'tensorflow':
            prediction = self.model.predict(features_scaled)
            return float(prediction[0][0])
        elif self.model_type == 'pytorch':
            with torch.no_grad():
                features_tensor = torch.FloatTensor(features_scaled)
                return float(self.model(features_tensor).item())
        else:
            prediction = self.model.predict(features_scaled)
            return float(prediction[0])

    def predict_batch(self, features_list):
        """Predict a price for every feature row in the iterable."""
        return [self.predict(features) for features in features_list]

# Instantiate the predictor against the saved sklearn artifacts
predictor = HousingPricePredictor(
    model_path='models/best_model.pkl',
    scaler_path='models/scaler.pkl'
)

# Smoke-test the prediction path on a single held-out row
test_features = X_test.iloc[0].values.tolist()
prediction = predictor.predict(test_features)
print(f"预测房价: ${prediction:.2f}")

8. 生产环境部署方案

8.1 Flask Web API部署

from flask import Flask, request, jsonify
import numpy as np

app = Flask(__name__)

# Model + scaler load once at startup and are shared across requests
predictor = HousingPricePredictor(
    model_path='models/best_model.pkl',
    scaler_path='models/scaler.pkl'
)

@app.route('/predict', methods=['POST'])
def predict():
    """POST {"features": [...]} -> {"prediction": <float>, "status": ...}."""
    try:
        data = request.get_json()

        # Reject requests missing the mandatory payload key
        for field in ('features',):
            if field not in data:
                return jsonify({'error': f'Missing required field: {field}'}), 400

        prediction = predictor.predict(data['features'])

        return jsonify({
            'prediction': float(prediction),
            'status': 'success'
        })

    except Exception as e:
        # Surface any failure as a 500 with the message attached
        return jsonify({'error': str(e)}), 500

@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe endpoint."""
    return jsonify({'status': 'healthy'})

if __name__ == '__main__':
    # NOTE(review): debug=True is unsafe in production — the Werkzeug
    # debugger allows arbitrary code execution; serve with gunicorn instead.
    app.run(host='0.0.0.0', port=5000, debug=True)

8.2 Docker容器化部署

# Dockerfile
# Slim Python base keeps the image small; the Flask app listens on 5000.
FROM python:3.8-slim

WORKDIR /app

# Copy and install dependencies first so this layer is cached
# across application-code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Application code and serialized models
COPY . .

EXPOSE 5000

# NOTE(review): for production, prefer gunicorn (pinned below) over
# the Flask development server started by app.py
CMD ["python", "app.py"]
# requirements.txt
# Pinned versions for reproducible builds
flask==2.0.1
numpy==1.21.0
pandas==1.3.0
scikit-learn==1.0.0
tensorflow==2.8.0
torch==1.9.0
joblib==1.1.0
gunicorn==20.1.0

8.3 Kubernetes部署配置

# deployment.yaml
# Runs three replicas of the predictor container behind a LoadBalancer.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: housing-price-predictor
spec:
  replicas: 3
  selector:
    matchLabels:
      app: housing-price-predictor
  template:
    metadata:
      labels:
        app: housing-price-predictor
    spec:
      containers:
      - name: predictor
        image: housing-price-predictor:latest
        ports:
        - containerPort: 5000
        # Requests/limits keep scheduling predictable and cap memory use
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"

---
# Service: exposes port 80 externally, forwarding to the Flask port 5000
apiVersion: v1
kind: Service
metadata:
  name: housing-price-predictor-service
spec:
  selector:
    app: housing-price-predictor
  ports:
  - port: 80
    targetPort: 5000
  type: LoadBalancer

9. 性能监控与模型更新

9.1 模型性能监控

import logging
from datetime import datetime

# Send model-performance logs both to a file and to the console
_handlers = [
    logging.FileHandler('model_performance.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_handlers,
)

logger = logging.getLogger(__name__)

class ModelMonitor:
    """In-memory tracker for predictions and rolling performance metrics."""

    def __init__(self):
        # Raw prediction records and computed metric snapshots
        self.predictions = []
        self.performance_history = []

    def log_prediction(self, features, prediction, actual=None):
        """Append one prediction record (the actual label is optional)."""
        self.predictions.append({
            'timestamp': datetime.now().isoformat(),
            'features': features,
            'prediction': prediction,
            'actual': actual,
        })
        logger.info(f"Prediction recorded: {prediction}")

    def calculate_metrics(self, predictions, actuals):
        """Compute RMSE/MAE/R² for a batch and archive the snapshot."""
        rmse = np.sqrt(mean_squared_error(actuals, predictions))
        mae = mean_absolute_error(actuals, predictions)
        r2 = r2_score(actuals, predictions)

        metrics = {
            'rmse': rmse,
            'mae': mae,
            'r2': r2,
            'timestamp': datetime.now().isoformat(),
        }
        self.performance_history.append(metrics)
        logger.info(f"Performance metrics: RMSE={rmse:.4f}, R²={r2:.4f}")

        return metrics

# Module-level monitor shared by the serving code
monitor = ModelMonitor()

9.2 模型版本管理

import shutil
import os
from
相关推荐
广告位招租

相似文章

    评论 (0)

    0/2000