引言
在人工智能和机器学习快速发展的今天,Python作为最受欢迎的编程语言之一,在AI领域扮演着至关重要的角色。本文将通过一个完整的机器学习项目实例,深入演示如何使用Python构建、训练和部署机器学习模型,涵盖从数据预处理到生产环境部署的全流程。
我们将使用TensorFlow和PyTorch等主流深度学习框架,结合实际的数据集,展示从原始数据到可部署模型的完整技术栈。通过本文的学习,读者将掌握现代AI项目开发的核心技能和最佳实践。
1. 项目概述与环境准备
1.1 项目背景
本次实战项目以房价预测为例,我们将构建一个能够根据房屋特征预测房价的机器学习模型。项目涉及数据清洗、特征工程、模型训练、评估验证以及最终的部署流程。
1.2 技术栈介绍
# Environment dependencies.
# FIX(review): `sys.version` was printed below without `sys` ever being
# imported, which raised NameError — import it here.
import sys

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import mean_squared_error, r2_score
import tensorflow as tf
from tensorflow import keras
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import joblib
import warnings

# Silence noisy library warnings for cleaner article output.
warnings.filterwarnings('ignore')

# Print version information for reproducibility.
print("Python版本:", sys.version)
print("Pandas版本:", pd.__version__)
print("TensorFlow版本:", tf.__version__)
print("PyTorch版本:", torch.__version__)
1.3 数据集介绍
我们将使用加州房价数据集(California Housing Dataset)——经典波士顿房价数据集的现代替代品——包含20640个样本,8个特征变量。主要特征包括:
- 街区收入中位数(MedInc)
- 房龄中位数(HouseAge)
- 平均房间数与平均卧室数(AveRooms、AveBedrms)
- 人口、平均入住率及经纬度等
2. 数据预处理与探索性分析
2.1 数据加载与初步检查
# Load the dataset.
from sklearn.datasets import fetch_california_housing
import pandas as pd

# Load the California housing dataset (the modern replacement for the
# deprecated Boston housing dataset).
housing = fetch_california_housing()
X = pd.DataFrame(housing.data, columns=housing.feature_names)
y = pd.Series(housing.target, name='target')

print("数据集形状:", X.shape)
print("\n数据集基本信息:")
print(X.info())
print("\n数据集描述统计:")
print(X.describe())

# Check for missing values.
print("\n缺失值检查:")
print(X.isnull().sum())
2.2 数据质量分析
# Visualise the distribution of each feature.
fig, axes = plt.subplots(3, 4, figsize=(15, 10))
axes = axes.ravel()
# NOTE(review): the grid has 12 axes but the dataset provides fewer
# columns, so trailing subplots stay empty — presumably intentional.
for i, column in enumerate(X.columns):
    axes[i].hist(X[column], bins=30, alpha=0.7)
    axes[i].set_title(f'{column} 分布')
    axes[i].set_xlabel(column)
    axes[i].set_ylabel('频次')
plt.tight_layout()
plt.show()

# Pairwise feature-correlation analysis.
correlation_matrix = X.corr()
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
plt.title('特征相关性热力图')
plt.show()
2.3 异常值检测与处理
# Outlier detection using the IQR (interquartile-range) rule.
def detect_outliers(df, columns):
    """Count IQR-rule outliers per column.

    A value is an outlier when it falls outside
    [Q1 - 1.5*IQR, Q3 + 1.5*IQR].

    Returns a dict mapping column name -> outlier count.
    """
    counts = {}
    for col in columns:
        q1, q3 = df[col].quantile(0.25), df[col].quantile(0.75)
        fence = 1.5 * (q3 - q1)
        mask = (df[col] < q1 - fence) | (df[col] > q3 + fence)
        counts[col] = int(mask.sum())
    return counts
# Count outliers per feature.
outliers = detect_outliers(X, X.columns)
print("各特征异常值数量:")
for feature, count in outliers.items():
    print(f"{feature}: {count}")

# Visualise outliers with box plots.
plt.figure(figsize=(12, 8))
X.boxplot()
plt.title('特征箱线图(异常值检测)')
plt.xticks(rotation=45)
plt.show()
2.4 数据清洗与预处理
# Handle missing values and outliers.
def preprocess_data(df):
    """Fill missing values with the median and clip IQR outliers.

    NaNs are replaced by each column's median, then every column is
    winsorised to [Q1 - 1.5*IQR, Q3 + 1.5*IQR]. The (possibly rebound)
    frame is returned.
    """
    # Median imputation, only when at least one NaN is present.
    if df.isnull().sum().sum() > 0:
        print("发现缺失值,使用中位数填充")
        df = df.fillna(df.median())
    # Winsorise each column to the IQR fences instead of dropping rows.
    for col in df.columns:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        fence = 1.5 * (q3 - q1)
        df[col] = df[col].clip(lower=q1 - fence, upper=q3 + fence)
    return df
# Apply preprocessing on a copy so the raw frame stays intact.
X_processed = preprocess_data(X.copy())
print("预处理后数据形状:", X_processed.shape)
3. 特征工程与数据分割
3.1 特征选择与构造
# Feature-importance analysis.
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectKBest, f_regression
# NOTE(review): SelectKBest / f_regression are imported but never used below.

# Fit a random forest purely to rank feature importances.
rf_model = RandomForestRegressor(n_estimators=100, random_state=42)
rf_model.fit(X_processed, y)

# Collect importances into a table sorted from most to least important.
feature_importance = pd.DataFrame({
    'feature': X_processed.columns,
    'importance': rf_model.feature_importances_
}).sort_values('importance', ascending=False)
print("特征重要性排序:")
print(feature_importance)

# Visualise the ranking.
plt.figure(figsize=(10, 6))
sns.barplot(data=feature_importance, x='importance', y='feature')
plt.title('特征重要性分析')
plt.xlabel('重要性得分')
plt.show()
# Construct new engineered features.
def create_features(df):
    """Return a copy of *df* with interaction and ratio features added.

    Adds 'MedInc_AveRooms' (income x rooms interaction) and
    'Room_Bedroom_Ratio' (rooms per bedroom, epsilon-guarded) when the
    source columns are present; otherwise the copy is returned unchanged.
    """
    out = df.copy()
    cols = out.columns
    # Interaction feature: median income x average rooms.
    if 'MedInc' in cols and 'AveRooms' in cols:
        out['MedInc_AveRooms'] = out['MedInc'] * out['AveRooms']
    # Ratio feature; the epsilon avoids division by zero.
    if 'AveRooms' in cols and 'AveBedrms' in cols:
        out['Room_Bedroom_Ratio'] = out['AveRooms'] / (out['AveBedrms'] + 1e-8)
    return out
# Apply the feature engineering step.
X_engineered = create_features(X_processed)
print("工程化后特征数量:", X_engineered.shape[1])
3.2 数据分割与标准化
# Split into training and test sets.
X_train, X_test, y_train, y_test = train_test_split(
    X_engineered, y, test_size=0.2, random_state=42
)
print(f"训练集形状: {X_train.shape}")
print(f"测试集形状: {X_test.shape}")

# Standardise the features (fit on the training split only, so no
# test-set statistics leak into the scaler).
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

# Persist the fitted scaler for the production pipeline.
# FIX(review): the 'models' directory was only created much later in the
# article (section 7.1), so this dump failed with FileNotFoundError —
# ensure the directory exists first.
import os
os.makedirs('models', exist_ok=True)
joblib.dump(scaler, 'models/scaler.pkl')
print("标准化器已保存")

# Cast to float32 numpy arrays for the deep-learning frameworks.
X_train_array = X_train_scaled.astype(np.float32)
X_test_array = X_test_scaled.astype(np.float32)
y_train_array = y_train.values.astype(np.float32)
y_test_array = y_test.values.astype(np.float32)
4. 模型构建与训练
4.1 TensorFlow/Keras深度学习模型
# Build the feed-forward regression network.
def create_neural_network(input_dim):
    """Create and compile a Keras MLP for regression.

    Architecture: 128 -> 64 -> 32 ReLU layers with dropout
    (0.3 / 0.3 / 0.2), then a single linear output unit. Compiled with
    Adam (lr=0.001), MSE loss and MAE as an auxiliary metric.
    """
    model = keras.Sequential()
    # Input + first hidden layer.
    model.add(keras.layers.Dense(128, activation='relu', input_shape=(input_dim,)))
    model.add(keras.layers.Dropout(0.3))
    # Remaining hidden layers.
    model.add(keras.layers.Dense(64, activation='relu'))
    model.add(keras.layers.Dropout(0.3))
    model.add(keras.layers.Dense(32, activation='relu'))
    model.add(keras.layers.Dropout(0.2))
    # Linear output for the regression target.
    model.add(keras.layers.Dense(1, activation='linear'))

    # Compile with the same optimiser/loss/metric as the original.
    model.compile(
        optimizer=keras.optimizers.Adam(learning_rate=0.001),
        loss='mean_squared_error',
        metrics=['mean_absolute_error']
    )
    return model
# Create the model and inspect its architecture.
model = create_neural_network(X_train_array.shape[1])
print(model.summary())

# Training callbacks: stop early when validation loss stalls, and decay
# the learning rate on plateaus.
callbacks = [
    keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=20,
        restore_best_weights=True
    ),
    keras.callbacks.ReduceLROnPlateau(
        monitor='val_loss',
        factor=0.5,
        patience=10,
        min_lr=1e-7
    )
]

# Train; 20% of the training data is held out for validation.
history = model.fit(
    X_train_array, y_train_array,
    batch_size=32,
    epochs=100,
    validation_split=0.2,
    callbacks=callbacks,
    verbose=1
)
4.2 PyTorch深度学习模型
# PyTorch model definition.
class HousingPriceModel(nn.Module):
    """MLP regressor mirroring the Keras architecture above.

    Layers: input -> 128 -> 64 -> 32 -> 1 with ReLU activations and
    dropout (0.3, 0.3, 0.2) after the hidden layers.
    """

    def __init__(self, input_size):
        super(HousingPriceModel, self).__init__()
        layers = [
            nn.Linear(input_size, 128), nn.ReLU(), nn.Dropout(0.3),
            nn.Linear(128, 64), nn.ReLU(), nn.Dropout(0.3),
            nn.Linear(64, 32), nn.ReLU(), nn.Dropout(0.2),
            nn.Linear(32, 1),
        ]
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        # Single sequential pass; output shape is (batch, 1).
        return self.network(x)
# Instantiate the model.
torch_model = HousingPriceModel(X_train_array.shape[1])

# Loss function and optimiser.
criterion = nn.MSELoss()
optimizer = optim.Adam(torch_model.parameters(), lr=0.001)

# Convert the numpy arrays to PyTorch tensors; targets are reshaped to
# (n, 1) to match the model's output shape.
X_train_tensor = torch.FloatTensor(X_train_array)
y_train_tensor = torch.FloatTensor(y_train_array).reshape(-1, 1)
X_test_tensor = torch.FloatTensor(X_test_array)
y_test_tensor = torch.FloatTensor(y_test_array).reshape(-1, 1)
# Train the PyTorch model (full-batch gradient descent).
def train_pytorch_model(model, criterion, optimizer, X_train, y_train, epochs=50):
    """Run *epochs* full-batch optimisation steps; return the loss history.

    Prints progress every 10 epochs. Returns one loss value (float)
    per epoch.
    """
    model.train()
    history = []
    for step in range(epochs):
        optimizer.zero_grad()
        batch_loss = criterion(model(X_train), y_train)
        batch_loss.backward()
        optimizer.step()
        history.append(batch_loss.item())
        # Periodic progress report.
        if (step + 1) % 10 == 0:
            print(f'Epoch [{step+1}/{epochs}], Loss: {batch_loss.item():.4f}')
    return history
# 训练模型
torch_losses = train_pytorch_model(torch_model, criterion, optimizer,
X_train_tensor, y_train_tensor, epochs=50)
4.3 传统机器学习模型对比
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error

# Candidate traditional models to benchmark against the neural networks.
models = {
    'Linear Regression': LinearRegression(),
    'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
    'Gradient Boosting': GradientBoostingRegressor(random_state=42),
    'SVR': SVR(kernel='rbf')
}

# Train and evaluate every model on the same standardised data.
# FIX(review): the original branched between X_train_scaled and
# X_train_array per model, but X_train_array is just X_train_scaled cast
# to float32 — the branching suggested a difference that did not exist,
# so all models now use the same array.
model_results = {}
for name, model in models.items():
    model.fit(X_train_array, y_train_array)
    y_pred = model.predict(X_test_array)

    # Metrics on the untouched test split.
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)
    model_results[name] = {
        'MSE': mse,
        'RMSE': rmse,
        'MAE': mae,
        'R²': r2
    }
    print(f"{name} 结果:")
    print(f" RMSE: {rmse:.4f}")
    print(f" MAE: {mae:.4f}")
    print(f" R²: {r2:.4f}")
    print()

# Visualise model performance side by side.
results_df = pd.DataFrame(model_results).T
fig, axes = plt.subplots(2, 2, figsize=(12, 10))
axes = axes.ravel()
metrics = ['RMSE', 'MAE', 'R²']
# FIX(review): the original if/else branches were identical except for
# the bar colour — collapsed into a single path.
for i, metric in enumerate(metrics):
    ax = axes[i]
    # R² is the only higher-is-better metric, so it gets its own colour.
    color = 'green' if metric == 'R²' else 'blue'
    results_df[metric].plot(kind='bar', ax=ax, color=color)
    ax.set_title(f'{metric} 对比')
    ax.set_ylabel(metric)
plt.tight_layout()
plt.show()
5. 模型评估与验证
5.1 深度学习模型评估
# Evaluate the TensorFlow/Keras model on held-out data.
def evaluate_model(model, X_test, y_test):
    """Predict on *X_test*, print RMSE/MAE/R² against *y_test*, and
    return the raw predictions.
    """
    y_pred = model.predict(X_test)

    # Regression metrics.
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    print("TensorFlow模型评估结果:")
    print(f"RMSE: {rmse:.4f}")
    print(f"MAE: {mae:.4f}")
    print(f"R²: {r2:.4f}")
    return y_pred
# Evaluate the deep-learning model.
tf_predictions = evaluate_model(model, X_test_array, y_test_array)

# Scatter predictions against the ground truth; the red dashed line
# marks perfect prediction (y = x).
plt.figure(figsize=(10, 6))
plt.scatter(y_test, tf_predictions, alpha=0.5)
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', lw=2)
plt.xlabel('真实值')
plt.ylabel('预测值')
plt.title('TensorFlow模型预测结果对比')
plt.show()
5.2 学习曲线分析
# Plot the Keras training history.
def plot_training_history(history):
    """Plot loss and MAE curves (training vs validation) side by side.

    Expects a Keras History object whose `history` dict contains
    'loss', 'val_loss', 'mean_absolute_error' and
    'val_mean_absolute_error' — the keys produced by the compile/fit
    configuration above.
    """
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 4))
    # Loss curves.
    ax1.plot(history.history['loss'], label='训练损失')
    ax1.plot(history.history['val_loss'], label='验证损失')
    ax1.set_title('模型损失')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.legend()
    # MAE curves.
    ax2.plot(history.history['mean_absolute_error'], label='训练MAE')
    ax2.plot(history.history['val_mean_absolute_error'], label='验证MAE')
    ax2.set_title('模型MAE')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('MAE')
    ax2.legend()
    plt.tight_layout()
    plt.show()

plot_training_history(history)
5.3 交叉验证
from sklearn.model_selection import cross_val_score, KFold

# 5-fold cross-validation of the best traditional model.
kfold = KFold(n_splits=5, shuffle=True, random_state=42)

# Cross-validate a fresh random forest on the training split only.
rf_model_cv = RandomForestRegressor(n_estimators=100, random_state=42)
cv_scores = cross_val_score(rf_model_cv, X_train_array, y_train_array,
                            cv=kfold, scoring='r2')
print("交叉验证结果:")
print(f"各折R²分数: {cv_scores}")
print(f"平均R²分数: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")
6. 模型优化与调参
6.1 超参数调优
from sklearn.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestRegressor

# Random-forest hyper-parameter grid (3 * 4 * 3 * 3 = 108 combinations).
param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4]
}

# Exhaustive grid search with 3-fold CV, parallelised across all cores.
rf_grid = RandomForestRegressor(random_state=42)
grid_search = GridSearchCV(
    rf_grid, param_grid, cv=3, scoring='r2',
    n_jobs=-1, verbose=1
)
grid_search.fit(X_train_array, y_train_array)
print("最佳参数:", grid_search.best_params_)
print("最佳交叉验证分数:", grid_search.best_score_)

# Keep the refitted best estimator for deployment.
best_model = grid_search.best_estimator_
6.2 模型集成
from sklearn.ensemble import VotingRegressor
from sklearn.linear_model import LinearRegression
from sklearn.svm import SVR

# Ensemble: average the predictions of three heterogeneous regressors.
ensemble_model = VotingRegressor([
    ('rf', RandomForestRegressor(n_estimators=100, random_state=42)),
    ('lr', LinearRegression()),
    ('svr', SVR(kernel='rbf'))
])

# Train the ensemble.
ensemble_model.fit(X_train_array, y_train_array)

# Evaluate it on the test split.
ensemble_pred = ensemble_model.predict(X_test_array)
ensemble_rmse = np.sqrt(mean_squared_error(y_test, ensemble_pred))
ensemble_r2 = r2_score(y_test, ensemble_pred)
print("集成模型结果:")
print(f"RMSE: {ensemble_rmse:.4f}")
print(f"R²: {ensemble_r2:.4f}")
7. 模型部署准备
7.1 模型保存与版本控制
import os
import joblib
from datetime import datetime

# Create the model directory layout.
os.makedirs('models', exist_ok=True)
os.makedirs('models/tensorflow_model', exist_ok=True)
os.makedirs('models/pytorch_model', exist_ok=True)

# Save the TensorFlow model (HDF5 format).
model.save('models/tensorflow_model/best_model.h5')
print("TensorFlow模型已保存")

# Save the PyTorch weights (state dict only — the class definition must
# be available at load time).
torch.save(torch_model.state_dict(), 'models/pytorch_model/best_model.pth')
print("PyTorch模型已保存")

# Save the tuned random forest (the overall best model).
joblib.dump(best_model, 'models/best_model.pkl')
print("最佳模型已保存")

# Persist metadata alongside the artefacts for traceability.
model_metadata = {
    'created_at': datetime.now().isoformat(),
    'model_type': 'RandomForest',
    'features_count': X_train_array.shape[1],
    'training_samples': len(X_train_array),
    'validation_score': best_model.score(X_test_array, y_test_array)
}
joblib.dump(model_metadata, 'models/model_metadata.pkl')
print("模型元数据已保存")
7.2 模型推理封装
class HousingPricePredictor:
    """Unified inference wrapper for the TensorFlow / PyTorch / sklearn
    housing-price models.

    The framework is inferred from the model path; inputs are
    standardised with the persisted scaler before prediction.
    """

    def __init__(self, model_path=None, scaler_path=None):
        self.model = None
        self.scaler = None
        self.model_type = None
        if model_path and scaler_path:
            self.load_model(model_path, scaler_path)

    def load_model(self, model_path, scaler_path):
        """Load the scaler and the model, inferring the framework from the path."""
        self.scaler = joblib.load(scaler_path)
        if 'tensorflow' in model_path.lower():
            self.model = tf.keras.models.load_model(model_path)
            self.model_type = 'tensorflow'
        elif 'pytorch' in model_path.lower():
            # FIX(review): the input size previously came from the global
            # X_train_array, which does not exist in a fresh serving
            # process — derive it from the fitted scaler instead.
            self.model = HousingPriceModel(self.scaler.n_features_in_)
            self.model.load_state_dict(torch.load(model_path))
            self.model.eval()
            self.model_type = 'pytorch'
        else:
            self.model = joblib.load(model_path)
            self.model_type = 'sklearn'

    def predict(self, features):
        """Return the predicted price (float) for a single sample.

        *features* may be a flat list, a 1-D array, or a (1, n) array.
        """
        # FIX(review): 1-D numpy arrays previously slipped through
        # unreshaped and broke scaler.transform — normalise every input
        # to a 2-D (1, n) array.
        features = np.asarray(features, dtype=np.float64)
        if features.ndim == 1:
            features = features.reshape(1, -1)
        features_scaled = self.scaler.transform(features)

        if self.model_type == 'tensorflow':
            prediction = self.model.predict(features_scaled)
            return float(prediction[0][0])
        elif self.model_type == 'pytorch':
            # Inference only — no gradients needed.
            with torch.no_grad():
                features_tensor = torch.FloatTensor(features_scaled)
                prediction = self.model(features_tensor)
            return float(prediction.item())
        else:
            prediction = self.model.predict(features_scaled)
            return float(prediction[0])

    def predict_batch(self, features_list):
        """Predict a list of samples one by one."""
        return [self.predict(features) for features in features_list]
# Build a predictor from the persisted artefacts.
predictor = HousingPricePredictor(
    model_path='models/best_model.pkl',
    scaler_path='models/scaler.pkl'
)

# Smoke-test the prediction pipeline on one test sample.
# NOTE(review): California-housing targets are presumably in units of
# $100,000, so the plain "$" label understates the price — verify.
test_features = X_test.iloc[0].values.tolist()
prediction = predictor.predict(test_features)
print(f"预测房价: ${prediction:.2f}")
8. 生产环境部署方案
8.1 Flask Web API部署
from flask import Flask, request, jsonify
import numpy as np

app = Flask(__name__)

# Load the model once at start-up, not per request.
predictor = HousingPricePredictor(
    model_path='models/best_model.pkl',
    scaler_path='models/scaler.pkl'
)

@app.route('/predict', methods=['POST'])
def predict():
    """POST {"features": [...]} -> {"prediction": <float>, "status": "success"}."""
    try:
        data = request.get_json()
        # Validate the payload before touching the model.
        # FIX(review): get_json() returns None for non-JSON bodies, which
        # previously crashed into the 500 handler — answer 400 instead.
        if data is None:
            return jsonify({'error': 'Request body must be JSON'}), 400
        required_fields = ['features']
        for field in required_fields:
            if field not in data:
                return jsonify({'error': f'Missing required field: {field}'}), 400
        features = data['features']

        prediction = predictor.predict(features)
        return jsonify({
            'prediction': float(prediction),
            'status': 'success'
        })
    except Exception as e:
        # Surface the failure as a 500 without leaking a stack trace.
        return jsonify({'error': str(e)}), 500

@app.route('/health', methods=['GET'])
def health_check():
    """Liveness probe used by the container orchestrator."""
    return jsonify({'status': 'healthy'})

if __name__ == '__main__':
    # FIX(review): debug=True enables the Werkzeug interactive debugger
    # (remote-code-execution risk) and must never ship to production.
    app.run(host='0.0.0.0', port=5000, debug=False)
8.2 Docker容器化部署
# Dockerfile
FROM python:3.8-slim

WORKDIR /app

# Install dependencies first so Docker layer caching stays effective.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code.
COPY . .

EXPOSE 5000

# FIX(review): run under gunicorn (already pinned in requirements.txt)
# instead of the single-threaded Flask development server.
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]
# requirements.txt
# Pinned runtime dependencies for the prediction service.
flask==2.0.1
numpy==1.21.0
pandas==1.3.0
scikit-learn==1.0.0
tensorflow==2.8.0
torch==1.9.0
joblib==1.1.0
# WSGI server for production deployments.
gunicorn==20.1.0
8.3 Kubernetes部署配置
# deployment.yaml — 3-replica Deployment plus a LoadBalancer Service.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: housing-price-predictor
spec:
  replicas: 3
  selector:
    matchLabels:
      app: housing-price-predictor
  template:
    metadata:
      labels:
        app: housing-price-predictor
    spec:
      containers:
        - name: predictor
          image: housing-price-predictor:latest
          ports:
            - containerPort: 5000
          # Resource requests/limits so the scheduler can place and
          # bound the pods.
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
---
# External entry point: service port 80 -> container port 5000.
apiVersion: v1
kind: Service
metadata:
  name: housing-price-predictor-service
spec:
  selector:
    app: housing-price-predictor
  ports:
    - port: 80
      targetPort: 5000
  type: LoadBalancer
9. 性能监控与模型更新
9.1 模型性能监控
import logging
from datetime import datetime

# Log both to a file and to stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('model_performance.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
class ModelMonitor:
    """In-memory tracker for predictions and rolling performance metrics."""

    def __init__(self):
        # Raw prediction records (timestamp/features/prediction/actual).
        self.predictions = []
        # One metrics snapshot per calculate_metrics() call.
        self.performance_history = []

    def log_prediction(self, features, prediction, actual=None):
        """Append one prediction record and log it."""
        self.predictions.append({
            'timestamp': datetime.now().isoformat(),
            'features': features,
            'prediction': prediction,
            'actual': actual,
        })
        logger.info(f"Prediction recorded: {prediction}")

    def calculate_metrics(self, predictions, actuals):
        """Compute RMSE/MAE/R², store a timestamped snapshot, return it."""
        rmse = np.sqrt(mean_squared_error(actuals, predictions))
        mae = mean_absolute_error(actuals, predictions)
        r2 = r2_score(actuals, predictions)
        snapshot = {
            'rmse': rmse,
            'mae': mae,
            'r2': r2,
            'timestamp': datetime.now().isoformat(),
        }
        self.performance_history.append(snapshot)
        logger.info(f"Performance metrics: RMSE={rmse:.4f}, R²={r2:.4f}")
        return snapshot
# Instantiate the global monitor.
monitor = ModelMonitor()
9.2 模型版本管理
import shutil
import os
from
评论 (0)