引言
在当今的数据驱动时代,机器学习技术正在成为企业决策和产品开发的核心工具。Python作为机器学习领域最主流的编程语言,凭借其丰富的生态库和易用性,成为了开发者构建AI应用的首选。本文将通过一个完整的机器学习项目实战,从数据预处理到模型部署的全流程进行详细讲解,帮助读者掌握机器学习项目的完整技术栈和实践方法。
项目概述
我们将以一个经典的房价预测问题为例,构建一个完整的机器学习项目。该项目将涵盖数据获取、数据清洗、特征工程、模型训练、评估验证以及生产环境部署等所有关键环节。通过这个实战项目,读者可以深入了解机器学习项目的完整开发流程,并掌握相关的最佳实践。
项目目标
- 预测房价,构建一个准确的回归模型
- 掌握机器学习项目开发的完整流程
- 学习数据科学项目中的关键技术和工具
环境准备与依赖安装
在开始项目之前,我们需要搭建合适的开发环境。以下是项目所需的Python库和版本信息:
# 创建虚拟环境
python -m venv ml_project_env
source ml_project_env/bin/activate # Linux/Mac
# 或 ml_project_env\Scripts\activate # Windows
# 安装必要的库
pip install pandas numpy scikit-learn matplotlib seaborn jupyter
pip install xgboost lightgbm catboost
pip install flask gunicorn
pip install joblib
# 验证安装的库版本
import pandas as pd
import numpy as np
import sklearn
import matplotlib.pyplot as plt
import seaborn as sns
print(f"pandas version: {pd.__version__}")
print(f"numpy version: {np.__version__}")
print(f"scikit-learn version: {sklearn.__version__}")
数据获取与探索性数据分析
1. 数据集介绍
我们将使用著名的波士顿房价数据集(Boston Housing Dataset)作为示例。这是一个经典的回归问题数据集,包含506个样本和13个特征。需要注意的是,`load_boston` 自 scikit-learn 1.0 起已被弃用,并在 1.2 版本中被移除;若使用新版 scikit-learn,需将该库降级到 1.2 以下,或改用其他数据源获取该数据集。
# 导入必要的库
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.datasets import load_boston
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error, r2_score
import warnings
warnings.filterwarnings('ignore')
# 设置图形显示样式
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
# Load the dataset.
# NOTE(review): sklearn.datasets.load_boston was deprecated in scikit-learn 1.0
# and REMOVED in 1.2, so this call fails on the version this article itself pins
# later (scikit-learn==1.3.0). Pin scikit-learn<1.2 or switch data sources.
boston = load_boston()
# Wrap the raw ndarrays in labelled pandas structures (13 feature columns,
# target renamed to PRICE).
X = pd.DataFrame(boston.data, columns=boston.feature_names)
y = pd.Series(boston.target, name='PRICE')
# Build one DataFrame holding features plus target for joint exploration.
df = pd.concat([X, y], axis=1)
print("数据集基本信息:")
print(f"数据形状: {df.shape}")
print(f"列名: {list(df.columns)}")
print("\n前5行数据:")
print(df.head())
2. 数据基本统计信息
# 基本统计信息
print("数据集描述性统计:")
print(df.describe())
# 检查缺失值
print("\n缺失值检查:")
print(df.isnull().sum())
# 检查数据类型
print("\n数据类型:")
print(df.dtypes)
3. 数据可视化分析
# 目标变量分布
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.hist(df['PRICE'], bins=30, edgecolor='black')
plt.title('房价分布')
plt.xlabel('房价 (千美元)')
plt.ylabel('频次')
plt.subplot(1, 2, 2)
plt.boxplot(df['PRICE'])
plt.title('房价箱线图')
plt.ylabel('房价 (千美元)')
plt.tight_layout()
plt.show()
# 特征相关性分析
plt.figure(figsize=(12, 10))
correlation_matrix = df.corr()
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0,
square=True, linewidths=0.5)
plt.title('特征相关性热力图')
plt.show()
# 与目标变量相关性最高的特征
price_corr = correlation_matrix['PRICE'].abs().sort_values(ascending=False)
print("与房价相关性排序:")
print(price_corr)
数据预处理
1. 数据清洗
# 检查异常值
def detect_outliers(df, columns, factor=1.5):
    """Detect outliers in the given columns using Tukey's IQR rule.

    A value is flagged as an outlier when it falls outside
    [Q1 - factor*IQR, Q3 + factor*IQR], where IQR = Q3 - Q1.

    Args:
        df: pandas DataFrame to inspect.
        columns: iterable of column names to check (must be numeric).
        factor: IQR multiplier for the fences; 1.5 is the conventional
            Tukey default (kept as default for backward compatibility).

    Returns:
        dict mapping each column name to the DataFrame of rows that are
        outliers with respect to that column.
    """
    outliers = {}
    for col in columns:
        q1 = df[col].quantile(0.25)
        q3 = df[col].quantile(0.75)
        iqr = q3 - q1
        lower_bound = q1 - factor * iqr
        upper_bound = q3 + factor * iqr
        outliers[col] = df[(df[col] < lower_bound) | (df[col] > upper_bound)]
    return outliers
# 检查数值型特征的异常值
numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist()
outliers = detect_outliers(df, numeric_columns)
print("各特征异常值数量:")
for col, outlier_data in outliers.items():
if len(outlier_data) > 0:
print(f"{col}: {len(outlier_data)}个异常值")
# 处理缺失值
print(f"\n缺失值总数: {df.isnull().sum().sum()}")
# 波士顿数据集没有缺失值,但实际项目中需要处理
2. 特征工程
# 创建新特征
def create_features(df):
    """Derive additional predictive features from the raw Boston columns.

    Only predictor columns are combined. Bug fix: the original version also
    created PRICE_PER_ROOM = PRICE / RM, which leaks the target (PRICE) into
    the training features — the model would partially "see the answer", and
    the feature cannot be computed at prediction time anyway (the serving
    payload at the end of the article never supplies it). That feature has
    been removed.

    Args:
        df: DataFrame containing at least RM, LSTAT, AGE, DIS and NOX.

    Returns:
        A copy of df with the engineered feature columns appended.
    """
    df_new = df.copy()
    # Room-count interaction ratios; the epsilon guards against division by zero.
    df_new['RM_LSTAT_RATIO'] = df_new['RM'] / (df_new['LSTAT'] + 1e-8)
    df_new['RM_AGE_RATIO'] = df_new['RM'] / (df_new['AGE'] + 1e-8)
    # Environment / accessibility interaction terms.
    df_new['DIS_LSTAT'] = df_new['DIS'] * df_new['LSTAT']
    df_new['NOX_DIS'] = df_new['NOX'] * df_new['DIS']
    return df_new
# 应用特征工程
df_processed = create_features(df)
print("处理后的数据形状:", df_processed.shape)
print("新增特征列:", [col for col in df_processed.columns if col not in df.columns])
3. 数据标准化
# 分离特征和目标变量
X = df_processed.drop('PRICE', axis=1)
y = df_processed['PRICE']
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
print(f"训练集大小: {X_train.shape}")
print(f"测试集大小: {X_test.shape}")
# 特征标准化
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# 转换回DataFrame便于后续操作
X_train_scaled_df = pd.DataFrame(X_train_scaled, columns=X.columns)
X_test_scaled_df = pd.DataFrame(X_test_scaled, columns=X.columns)
print("标准化后的数据统计:")
print(X_train_scaled_df.describe())
模型选择与训练
1. 多模型对比
# Baseline models: one linear and one tree ensemble for a first comparison.
models = {
    'Linear Regression': LinearRegression(),
    'Random Forest': RandomForestRegressor(n_estimators=100, random_state=42),
}
# Train each model and collect its held-out test-set metrics.
model_results = {}
for name, model in models.items():
    print(f"\n训练 {name} 模型...")
    # The linear model is fitted on standardized features; the random forest
    # is scale-invariant, so it uses the raw (unscaled) features.
    if name == 'Linear Regression':
        model.fit(X_train_scaled, y_train)
        y_pred = model.predict(X_test_scaled)
    else:
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
    # Test-set evaluation metrics (lower MSE/RMSE, higher R² is better).
    mse = mean_squared_error(y_test, y_pred)
    rmse = np.sqrt(mse)
    r2 = r2_score(y_test, y_pred)
    # The fitted model object is kept alongside the metrics for reuse below.
    model_results[name] = {
        'MSE': mse,
        'RMSE': rmse,
        'R2': r2,
        'model': model
    }
    print(f"MSE: {mse:.4f}")
    print(f"RMSE: {rmse:.4f}")
    print(f"R²: {r2:.4f}")
# Tabulate the comparison (metrics columns only; the 'model' column is omitted).
results_df = pd.DataFrame(model_results).T
print("\n模型性能对比:")
print(results_df[['RMSE', 'R2']])
2. 高级模型训练
from sklearn.ensemble import GradientBoostingRegressor
from xgboost import XGBRegressor
from lightgbm import LGBMRegressor
# 使用集成学习方法
advanced_models = {
'Gradient Boosting': GradientBoostingRegressor(n_estimators=100, random_state=42),
'XGBoost': XGBRegressor(random_state=42),
'LightGBM': LGBMRegressor(random_state=42)
}
# 训练高级模型
advanced_results = {}
for name, model in advanced_models.items():
print(f"\n训练 {name} 模型...")
if name in ['XGBoost', 'LightGBM']:
# 这些模型不需要标准化
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
else:
model.fit(X_train_scaled, y_train)
y_pred = model.predict(X_test_scaled)
# 计算评估指标
mse = mean_squared_error(y_test, y_pred)
rmse = np.sqrt(mse)
r2 = r2_score(y_test, y_pred)
advanced_results[name] = {
'MSE': mse,
'RMSE': rmse,
'R2': r2,
'model': model
}
print(f"MSE: {mse:.4f}")
print(f"RMSE: {rmse:.4f}")
print(f"R²: {r2:.4f}")
# 合并结果
all_results = {**model_results, **advanced_results}
best_model_name = min(advanced_results.keys(), key=lambda x: advanced_results[x]['RMSE'])
print(f"\n最佳模型: {best_model_name}")
模型评估与优化
1. 交叉验证
from sklearn.model_selection import cross_val_score, GridSearchCV
# 使用交叉验证评估模型稳定性
def evaluate_with_cv(model, X, y, cv=5):
    """Estimate model stability with k-fold cross-validation.

    Scores the model with negated MSE over `cv` folds, converts the fold
    scores to RMSE, prints a per-fold and summary report, and returns the
    per-fold RMSE array.
    """
    neg_mse = cross_val_score(model, X, y, cv=cv, scoring='neg_mean_squared_error')
    # cross_val_score returns *negated* MSE, so negate before the sqrt.
    rmse_scores = np.sqrt(-neg_mse)
    print(f"交叉验证 RMSE: {rmse_scores}")
    print(f"平均 RMSE: {rmse_scores.mean():.4f} (+/- {rmse_scores.std() * 2:.4f})")
    return rmse_scores
# 对最佳模型进行交叉验证
best_model = advanced_results[best_model_name]['model']
if best_model_name in ['XGBoost', 'LightGBM']:
cv_scores = evaluate_with_cv(best_model, X_train, y_train)
else:
cv_scores = evaluate_with_cv(best_model, X_train_scaled, y_train)
2. 超参数调优
# 为XGBoost进行超参数调优
def optimize_xgboost(X_train, y_train):
    """Grid-search XGBoost hyper-parameters and return the best estimator.

    Searches n_estimators / max_depth / learning_rate / subsample with
    3-fold cross-validation on negated MSE, prints the winning configuration
    and its CV score, then returns the refitted best model.
    """
    search_space = {
        'n_estimators': [50, 100, 200],
        'max_depth': [3, 5, 7, 9],
        'learning_rate': [0.01, 0.1, 0.2],
        'subsample': [0.8, 1.0],
    }
    searcher = GridSearchCV(
        estimator=XGBRegressor(random_state=42),
        param_grid=search_space,
        cv=3,
        scoring='neg_mean_squared_error',
        n_jobs=-1,  # use all cores for the 72-combination search
        verbose=1,
    )
    print("开始超参数调优...")
    searcher.fit(X_train, y_train)
    print(f"最佳参数: {searcher.best_params_}")
    print(f"最佳交叉验证分数: {-searcher.best_score_:.4f}")
    return searcher.best_estimator_
# 如果使用XGBoost进行优化
if best_model_name == 'XGBoost':
optimized_model = optimize_xgboost(X_train, y_train)
else:
optimized_model = advanced_results[best_model_name]['model']
3. 模型性能可视化
# 预测结果可视化
def plot_predictions(y_true, y_pred, model_name):
    """Plot predicted-vs-actual scatter and the residual scatter side by side."""
    plt.figure(figsize=(12, 5))

    # Left panel: predictions against ground truth, with the ideal y=x line.
    plt.subplot(1, 2, 1)
    plt.scatter(y_true, y_pred, alpha=0.6)
    lo, hi = y_true.min(), y_true.max()
    plt.plot([lo, hi], [lo, hi], 'r--', lw=2)
    plt.xlabel('真实值')
    plt.ylabel('预测值')
    plt.title(f'{model_name} - 预测值 vs 真实值')

    # Right panel: residuals vs predictions; a good fit hugs the zero line.
    plt.subplot(1, 2, 2)
    residuals = y_true - y_pred
    plt.scatter(y_pred, residuals, alpha=0.6)
    plt.axhline(y=0, color='r', linestyle='--')
    plt.xlabel('预测值')
    plt.ylabel('残差')
    plt.title(f'{model_name} - 残差图')

    plt.tight_layout()
    plt.show()
# 绘制最佳模型的预测结果
if best_model_name in ['XGBoost', 'LightGBM']:
y_pred_best = optimized_model.predict(X_test)
else:
if best_model_name == 'Linear Regression':
y_pred_best = best_model.predict(X_test_scaled)
else:
y_pred_best = best_model.predict(X_test)
plot_predictions(y_test, y_pred_best, best_model_name)
模型部署准备
1. 模型保存与加载
import joblib
# 保存最佳模型和预处理器
def save_model(model, scaler, model_name):
    """Persist the trained model (and its scaler, when one was used) to disk.

    Artifacts are written as '{model_name}_model.pkl' and, when a scaler is
    supplied, '{model_name}_scaler.pkl' in the current working directory.
    """
    joblib.dump(model, f'{model_name}_model.pkl')
    # Models trained on unscaled data pass scaler=None and skip this branch.
    if scaler is not None:
        joblib.dump(scaler, f'{model_name}_scaler.pkl')
    print(f"模型已保存为 {model_name}_model.pkl")
    if scaler is not None:
        print(f"标准化器已保存为 {model_name}_scaler.pkl")
# 保存最佳模型
if best_model_name in ['XGBoost', 'LightGBM']:
save_model(optimized_model, None, best_model_name)
else:
save_model(best_model, scaler, best_model_name)
2. 特征重要性分析
# 分析特征重要性
def analyze_feature_importance(model, feature_names, model_name):
    """Visualise which input features drive the model's predictions.

    Tree ensembles expose `feature_importances_`; linear models expose
    `coef_`. Either way the features are charted as bars sorted from most
    to least influential. A model exposing neither attribute is silently
    skipped.
    """
    if hasattr(model, 'feature_importances_'):
        # Tree-based model: use the built-in importances directly.
        scores = model.feature_importances_
        order = np.argsort(scores)[::-1]
        plt.figure(figsize=(10, 6))
        plt.title(f'{model_name} - 特征重要性')
        plt.bar(range(len(scores)), scores[order])
        plt.xticks(range(len(scores)), [feature_names[i] for i in order], rotation=45)
        plt.tight_layout()
        plt.show()
        print("前10个重要特征:")
        for rank in range(min(10, len(order))):
            print(f"{rank+1}. {feature_names[order[rank]]}: {scores[order[rank]]:.4f}")
    elif hasattr(model, 'coef_'):
        # Linear model: coefficient magnitude stands in for importance.
        scores = np.abs(model.coef_)
        order = np.argsort(scores)[::-1]
        plt.figure(figsize=(10, 6))
        plt.title(f'{model_name} - 特征系数绝对值')
        plt.bar(range(len(scores)), scores[order])
        plt.xticks(range(len(scores)), [feature_names[i] for i in order], rotation=45)
        plt.tight_layout()
        plt.show()
# 分析最佳模型的特征重要性
if best_model_name in ['XGBoost', 'LightGBM']:
analyze_feature_importance(optimized_model, X.columns, best_model_name)
else:
analyze_feature_importance(best_model, X.columns, best_model_name)
Flask Web应用部署
1. 创建Flask应用
from flask import Flask, request, jsonify, render_template_string
import numpy as np
import joblib
import json
# 创建Flask应用
app = Flask(__name__)
# 加载模型和预处理器
def load_model_and_scaler(model_name):
    """Load a persisted model and, if present, its fitted scaler.

    Args:
        model_name: basename used when the artifacts were saved, i.e. the
            files '{model_name}_model.pkl' and '{model_name}_scaler.pkl'.

    Returns:
        (model, scaler) on success, where scaler is None when no scaler
        file exists (models trained on unscaled features never saved one);
        (None, None) when the model itself cannot be loaded.
    """
    try:
        model = joblib.load(f'{model_name}_model.pkl')
    except Exception as e:
        print(f"加载模型失败: {e}")
        return None, None
    # The scaler file is optional, so a missing file is expected. Bug fix:
    # the original used a bare `except:`, which also swallowed unrelated
    # errors (including KeyboardInterrupt/SystemExit); catch only the
    # expected missing-file case.
    try:
        scaler = joblib.load(f'{model_name}_scaler.pkl')
    except FileNotFoundError:
        scaler = None
    return model, scaler
# 加载最佳模型
best_model, best_scaler = load_model_and_scaler(best_model_name)
@app.route('/')
def home():
    """Serve the single-page prediction UI at '/'.

    Returns a self-contained HTML page (inline CSS and JS). NOTE(review):
    the form's submit handler is a placeholder — it only shows an alert and
    does not call the /predict endpoint yet (the article says to fill in the
    input fields and fetch logic yourself).
    """
    html_template = """
<!DOCTYPE html>
<html>
<head>
<title>房价预测系统</title>
<style>
body { font-family: Arial, sans-serif; margin: 40px; }
.container { max-width: 800px; margin: 0 auto; }
input, select { width: 100%; padding: 10px; margin: 5px 0; }
button { background-color: #4CAF50; color: white; padding: 12px 20px; border: none; cursor: pointer; }
button:hover { background-color: #45a049; }
.result { margin-top: 20px; padding: 15px; background-color: #f0f0f0; }
</style>
</head>
<body>
<div class="container">
<h1>房价预测系统</h1>
<form id="predictionForm">
<!-- 这里可以添加具体的输入字段 -->
<button type="submit">预测房价</button>
</form>
<div id="result" class="result" style="display: none;"></div>
</div>
<script>
document.getElementById('predictionForm').addEventListener('submit', function(e) {
e.preventDefault();
// 这里添加实际的预测逻辑
alert('请在实际应用中实现预测逻辑');
});
</script>
</body>
</html>
"""
    return render_template_string(html_template)
@app.route('/predict', methods=['POST'])
def predict():
    """JSON prediction endpoint.

    Expects a JSON object of feature values. NOTE(review): the features are
    consumed in the JSON object's insertion order, so the client must send
    the keys in exactly the order the model was trained on — a correctly
    keyed but differently ordered payload silently yields a wrong
    prediction. Mapping values by feature name (as HousePricePredictor
    does later in the article) would be more robust.

    Returns:
        200 with {'success': True, 'prediction': float, 'message': ...} on
        success; 400 with {'success': False, 'error': ...} on failure.
    """
    try:
        data = request.get_json()
        # Flatten the values into the (1, n_features) shape sklearn expects.
        input_data = np.array(list(data.values())).reshape(1, -1)
        # Only the linear model was trained on standardized features.
        if best_scaler is not None and best_model_name == 'Linear Regression':
            input_data_scaled = best_scaler.transform(input_data)
            prediction = best_model.predict(input_data_scaled)[0]
        else:
            prediction = best_model.predict(input_data)[0]
        return jsonify({
            'success': True,
            'prediction': float(prediction),
            'message': '预测成功'
        })
    except Exception as e:
        # Bug fix: the original returned errors with HTTP 200, so API
        # clients could not detect failure from the status code alone.
        return jsonify({
            'success': False,
            'error': str(e)
        }), 400
@app.route('/health')
def health():
    """Liveness probe endpoint for load balancers / container orchestrators."""
    payload = {'status': 'healthy'}
    return jsonify(payload)
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0', port=5000)
2. 完整的预测API实现
# 创建更完整的预测API
import pandas as pd
class HousePricePredictor:
    """Thin inference wrapper around a persisted house-price model.

    Loads the model (and optional scaler) by name and exposes a predict()
    that accepts a plain {feature_name: value} mapping.
    """

    def __init__(self, model_name):
        self.model_name = model_name
        self.model, self.scaler = load_model_and_scaler(model_name)
        # Training-time column order; inference inputs must follow it exactly.
        self.feature_names = X.columns.tolist()

    def predict(self, features_dict):
        """Predict a price from a {feature_name: value} mapping.

        Features the caller omits are back-filled with 0 and columns are
        re-ordered to match training. Returns a dict carrying 'success'
        plus either 'prediction' (float) or 'error' (str).
        """
        try:
            frame = pd.DataFrame([features_dict])
            # Back-fill any omitted feature with a neutral 0, then restore
            # the exact training column order.
            for feature in self.feature_names:
                if feature not in frame.columns:
                    frame[feature] = 0
            ordered = frame[self.feature_names]
            # Only the linear model was fitted on standardized inputs.
            if self.scaler is not None and self.model_name == 'Linear Regression':
                prediction = self.model.predict(self.scaler.transform(ordered))[0]
            else:
                prediction = self.model.predict(ordered)[0]
            return {
                'prediction': float(prediction),
                'success': True
            }
        except Exception as e:
            return {
                'error': str(e),
                'success': False
            }
# 创建预测器实例
predictor = HousePricePredictor(best_model_name)
# 测试预测功能
test_features = {
'CRIM': 0.1,
'ZN': 20.0,
'INDUS': 7.0,
'CHAS': 0.0,
'NOX': 0.5,
'RM': 6.5,
'AGE': 40.0,
'DIS': 5.0,
'RAD': 4.0,
'TAX': 300.0,
'PTRATIO': 15.0,
'B': 390.0,
'LSTAT': 10.0
}
# 测试预测
result = predictor.predict(test_features)
print("预测结果:", result)
Docker容器化部署
1. 创建Dockerfile
# Slim Python base image keeps the final image small.
FROM python:3.9-slim
# All subsequent paths are relative to /app.
WORKDIR /app
# Copy the dependency manifest first so the pip layer is cached across
# code-only changes.
COPY requirements.txt .
# Install dependencies without keeping pip's download cache.
RUN pip install --no-cache-dir -r requirements.txt
# Copy the application code (and the saved .pkl model artifacts).
COPY . .
# gunicorn listens on 5000 (matches the docker run -p mapping below).
EXPOSE 5000
# Production entrypoint: gunicorn serving the Flask `app` object from app.py.
CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]
2. 创建requirements.txt
# Serving dependencies for the Docker image.
# NOTE(review): scikit-learn here must match the version used to train and
# pickle the model, or unpickling may fail. Also, 1.3.0 no longer contains
# load_boston (removed in scikit-learn 1.2), while the training code in this
# article relies on it — align the two versions before building.
Flask==2.3.3
scikit-learn==1.3.0
numpy==1.24.3
pandas==2.0.3
joblib==1.3.2
gunicorn==21.2.0
xgboost==1.7.5
lightgbm==3.3.5
3. 构建和运行Docker容器
# 构建Docker镜像
docker build -t house-price-predictor .
# 运行容器
docker run -p 5000:5000 house-price-predictor
# 在后台运行
docker run -d -p 5000:5000 house-price-predictor
性能监控与日志记录
1. 添加日志功能
import logging
from datetime import datetime
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('model_prediction.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class PredictiveModel:
    """Inference wrapper that also logs every prediction and its latency."""

    def __init__(self, model_name):
        self.model_name = model_name
        self.model, self.scaler = load_model_and_scaler(model_name)
        # Training-time column order; inference inputs must follow it.
        self.feature_names = X.columns.tolist()
        self.logger = logger

    def predict(self, features_dict):
        """Predict with structured logging of inputs, result and timing.

        Mirrors HousePricePredictor.predict but records the request, the
        outcome and the elapsed seconds via the module logger; the timing
        is also returned under 'execution_time'.
        """
        try:
            started = datetime.now()
            self.logger.info(f"开始预测,输入特征: {features_dict}")

            # Back-fill omitted features with 0 and restore training order.
            frame = pd.DataFrame([features_dict])
            for feature in self.feature_names:
                if feature not in frame.columns:
                    frame[feature] = 0
            ordered = frame[self.feature_names]

            # Only the linear model was fitted on standardized inputs.
            if self.scaler is not None and self.model_name == 'Linear Regression':
                prediction = self.model.predict(self.scaler.transform(ordered))[0]
            else:
                prediction = self.model.predict(ordered)[0]

            execution_time = (datetime.now() - started).total_seconds()
            self.logger.info(f"预测完成,结果: {prediction}, 耗时: {execution_time:.4f}秒")

            return {
                'prediction': float(prediction),
                'success': True,
                'execution_time': execution_time
            }
        except Exception as e:
            self.logger.error(f"预测失败: {str(e)}")
            return {
                'error': str(e),
                'success': False
            }
# 使用增强版模型
enhanced_predictor = PredictiveModel(best_model_name)
项目总结与最佳实践
1. 关键技术点回顾
通过这个完整的机器学习项目,我们掌握了以下关键技术点:
# 项目关键步骤总结
project_steps = [
"数据获取与探索性分析",
"数据清洗与预处理",
"特征工程与选择",
"模型训练与比较",
"超参数优化",
"模型评估与验证",
"模型保存与部署",
"容器化部署",
"监控与日志记录"
]
print("项目完整流程:")
for i, step
评论 (0)