Applying and Optimizing Transformer-Based AI Models in E-Commerce Recommendation Systems

DryFire 2026-02-03T04:14:05+08:00

Introduction

With the rapid growth of e-commerce, personalized recommendation has become a core technique for improving user experience and conversion rates. Traditional algorithms such as collaborative filtering and matrix factorization meet these needs to a degree, but they show clear limitations in handling large-scale sparse data and in capturing complex patterns of user behavior.

In recent years the Transformer architecture, with its strong sequence-modeling capability and parallelism, has driven breakthroughs in natural language processing. Bringing it into e-commerce recommendation offers a new way to model complex user-item interactions. This article examines the practical application of Transformer-based AI models in e-commerce recommendation, covering the full pipeline from model design and training optimization to real-time inference and deployment.

Transformer Architecture Fundamentals

1.1 The Core Mechanism of the Transformer

The core innovation of the Transformer is the self-attention mechanism, which captures dependencies between any two positions in a sequence without processing it step by step as an RNN does. This parallelism greatly improves training efficiency.

import torch
import torch.nn as nn
import math

class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        
    def forward(self, Q, K, V, mask=None):
        batch_size = Q.size(0)
        
        # Linear projections
        Q = self.W_q(Q)  # [batch_size, seq_len, d_model]
        K = self.W_k(K)
        V = self.W_v(V)
        
        # Split into multiple heads
        Q = Q.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = K.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = V.view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        
        # Scaled dot-product attention scores
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
            
        attention = torch.softmax(scores, dim=-1)
        
        # Weighted sum over the values
        out = torch.matmul(attention, V)
        out = out.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)
        
        return self.W_o(out)

1.2 Adapting It to Recommendation Systems

In e-commerce recommendation, the vanilla Transformer needs adaptation to the characteristics of user behavior sequences. A user's history of views, clicks, and purchases can be treated as a time series, and the Transformer can effectively model the complex dependencies within it.
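One concrete adaptation worth noting: when the encoder is trained to predict the next item at every position, attention must not see future items, or the task leaks its own labels. A minimal causal-mask sketch (the function name is illustrative; `True` marks positions that must not be attended to, matching the boolean `attn_mask` convention of `nn.TransformerEncoder`):

```python
import torch

def causal_mask(seq_len: int) -> torch.Tensor:
    """Upper-triangular mask: True marks positions the model must NOT attend to,
    so position t only sees positions <= t."""
    return torch.triu(torch.ones(seq_len, seq_len, dtype=torch.bool), diagonal=1)

m = causal_mask(4)
print(m)  # position i may attend only to positions 0..i
```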

Model Design for E-Commerce Recommendation

2.1 Data and Feature Engineering

A recommendation system's success depends heavily on high-quality feature engineering. In a Transformer-based recommender we build multi-dimensional user and item features:

import pandas as pd
import numpy as np
from sklearn.preprocessing import LabelEncoder

class FeatureEngineer:
    def __init__(self):
        self.user_encoder = LabelEncoder()
        self.item_encoder = LabelEncoder()
        self.category_encoder = LabelEncoder()
        
    def create_user_features(self, user_data):
        """Build user-side features."""
        features = {}
        
        # Basic user profile
        features['user_id'] = self.user_encoder.fit_transform(user_data['user_id'])
        features['age_group'] = pd.cut(user_data['age'], 
                                     bins=[0, 18, 25, 35, 45, 100], 
                                     labels=['teen', 'young', 'adult', 'middle', 'senior'])
        
        # Aggregate behavior statistics
        features['total_clicks'] = user_data['click_count']
        features['total_purchases'] = user_data['purchase_count']
        features['avg_session_duration'] = user_data['session_duration']
        
        return pd.DataFrame(features)
    
    def create_item_features(self, item_data):
        """Build item-side features."""
        features = {}
        
        # Basic item attributes
        features['item_id'] = self.item_encoder.fit_transform(item_data['item_id'])
        features['category_id'] = self.category_encoder.fit_transform(item_data['category'])
        features['price_bucket'] = pd.cut(item_data['price'], 
                                        bins=[0, 50, 100, 200, 500, 1000, float('inf')], 
                                        labels=['low', 'medium_low', 'medium', 'medium_high', 'high', 'very_high'])
        
        # Item popularity features
        features['popularity_score'] = item_data['view_count'] / (item_data['total_views'] + 1)
        features['conversion_rate'] = item_data['purchase_count'] / (item_data['view_count'] + 1)
        
        return pd.DataFrame(features)

# Usage example
fe = FeatureEngineer()
user_features = fe.create_user_features(user_df)
item_features = fe.create_item_features(item_df)

2.2 Modeling User Behavior Sequences

In e-commerce, user behavior sequences are a key input to the recommender. Views, clicks, and purchases must be converted into a sequence format the Transformer can consume:

class UserBehaviorSequence:
    def __init__(self, max_seq_length=100):
        self.max_seq_length = max_seq_length
        self.user_id_encoder = LabelEncoder()
        self.item_id_encoder = LabelEncoder()
        
    def build_sequences(self, interactions_df):
        """Build per-user behavior sequences."""
        # Fit the item encoder once on the full item vocabulary; re-fitting it
        # per user (as fit_transform in a loop would) renumbers the ids each time.
        # Note: LabelEncoder starts at 0, which collides with the padding id;
        # in production, shift encoded ids by 1 to reserve 0 for padding.
        self.item_id_encoder.fit(interactions_df['item_id'])
        
        # Group by user and sort each user's interactions by timestamp
        user_sequences = interactions_df.groupby('user_id').apply(
            lambda x: x.sort_values('timestamp')['item_id'].tolist()
        ).reset_index(name='sequences')
        
        # Encode each sequence with the fitted encoder
        encoded_sequences = []
        for _, row in user_sequences.iterrows():
            user_id = row['user_id']
            sequence = row['sequences']
            
            # Encode item IDs
            encoded_seq = self.item_id_encoder.transform(sequence)
            
            # Truncate to the most recent items, or right-pad with 0
            if len(encoded_seq) > self.max_seq_length:
                encoded_seq = encoded_seq[-self.max_seq_length:]
            else:
                encoded_seq = np.pad(encoded_seq, 
                                   (0, self.max_seq_length - len(encoded_seq)), 
                                   mode='constant', constant_values=0)
            
            encoded_sequences.append({
                'user_id': user_id,
                'sequence': encoded_seq
            })
        
        return encoded_sequences

# Build the sequence data
behavior_seq = UserBehaviorSequence(max_seq_length=50)
sequences = behavior_seq.build_sequences(interactions_df)
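The padded sequences above still need training labels. For next-item prediction the target sequence is simply the input shifted one step to the left; a minimal sketch (assuming, as above, that 0 is the padding id):

```python
import numpy as np

def make_training_pair(seq: np.ndarray):
    """Split one padded behavior sequence into (input, target) for
    next-item prediction: target[t] is the item that follows input[t]."""
    inputs = seq[:-1]
    targets = seq[1:]  # shifted left by one step
    return inputs, targets

seq = np.array([12, 7, 33, 5, 0, 0])  # hypothetical encoded sequence, 0 = padding
x, y = make_training_pair(seq)
print(x)  # [12  7 33  5  0]
print(y)  # [ 7 33  5  0  0]
```

Positions whose target is the padding id can be excluded from the loss with `ignore_index=0` in the cross entropy.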

2.3 Transformer Recommendation Model Architecture

Building on the feature engineering above, we design a complete Transformer recommendation model:

import torch.nn.functional as F

class TransformerRecommendationModel(nn.Module):
    def __init__(self, vocab_size, d_model=256, num_heads=8, num_layers=4, 
                 max_seq_length=100, dropout=0.1, user_feature_dim=None):
        super(TransformerRecommendationModel, self).__init__()
        
        self.d_model = d_model
        self.max_seq_length = max_seq_length
        
        # Embedding layers
        self.item_embedding = nn.Embedding(vocab_size, d_model)
        self.position_embedding = nn.Embedding(max_seq_length, d_model)
        
        # Optional projection for dense user features
        self.user_feature_embedding = (
            nn.Linear(user_feature_dim, d_model) if user_feature_dim else None
        )
        
        # Transformer encoder stack
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=num_heads,
            dropout=dropout,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)
        
        # Output head
        self.output_projection = nn.Linear(d_model, vocab_size)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, item_seq, user_features=None, mask=None):
        """mask: bool tensor where True marks PADDING positions
        (the src_key_padding_mask convention of nn.TransformerEncoder)."""
        batch_size, seq_len = item_seq.size()
        
        # Item embedding, scaled as in the original Transformer
        item_emb = self.item_embedding(item_seq) * math.sqrt(self.d_model)
        
        # Learned positional embedding
        positions = torch.arange(seq_len, device=item_seq.device).unsqueeze(0)
        pos_emb = self.position_embedding(positions)
        item_emb = item_emb + pos_emb
        
        # Fuse user features (broadcast-add; attention or concatenation also work)
        if user_features is not None and self.user_feature_embedding is not None:
            user_emb = self.user_feature_embedding(user_features)
            item_emb = item_emb + user_emb.unsqueeze(1)
            
        # Transformer forward pass (src_key_padding_mask=None is a no-op)
        transformer_out = self.transformer_encoder(item_emb, src_key_padding_mask=mask)
            
        # Per-position next-item logits
        output = self.output_projection(transformer_out)
        
        return output

# Model instantiation
model = TransformerRecommendationModel(
    vocab_size=vocab_size,
    d_model=256,
    num_heads=8,
    num_layers=4,
    max_seq_length=100,
    dropout=0.1
)

Model Training and Optimization

3.1 Loss Function Design

Common recommendation losses include cross entropy and negative-sampling losses. For the Transformer model we use an extended loss that better fits the recommendation setting:

class RecommendationLoss(nn.Module):
    def __init__(self, margin=0.5, alpha=1.0):
        super(RecommendationLoss, self).__init__()
        self.margin = margin  # reserved for ranking-style variants
        self.alpha = alpha
        
    def forward(self, predictions, targets, user_features=None):
        """
        predictions: [batch_size, seq_len, vocab_size]
        targets: [batch_size, seq_len]
        """
        # Cross-entropy loss over every sequence position
        ce_loss = F.cross_entropy(
            predictions.view(-1, predictions.size(-1)), 
            targets.view(-1), 
            reduction='none'
        )
        
        # Optional extra regularization term
        if user_features is not None:
            # User-feature consistency loss
            consistency_loss = self.calculate_consistency_loss(predictions, user_features)
            total_loss = ce_loss.mean() + self.alpha * consistency_loss
        else:
            total_loss = ce_loss.mean()
            
        return total_loss
    
    def calculate_consistency_loss(self, predictions, user_features):
        # Simplified placeholder for a user-feature consistency loss;
        # adjust to the specific business requirements.
        return torch.tensor(0.0, device=predictions.device)

# Loss instantiation
criterion = RecommendationLoss(margin=0.5, alpha=1.0)
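The section mentions negative-sampling losses without showing one. A minimal BPR-style pairwise loss over sampled negatives, as a hedged sketch (the function and the score tensors are illustrative, not part of the article's pipeline):

```python
import torch
import torch.nn.functional as F

def bpr_loss(pos_scores: torch.Tensor, neg_scores: torch.Tensor) -> torch.Tensor:
    """Bayesian Personalized Ranking: push the score of each observed (positive)
    item above the score of a randomly sampled negative item."""
    return -F.logsigmoid(pos_scores - neg_scores).mean()

pos = torch.tensor([2.0, 1.5])  # scores of items the user interacted with
neg = torch.tensor([0.5, 1.0])  # scores of sampled negatives
print(bpr_loss(pos, neg))
```

Compared with the full softmax over the item vocabulary, a sampled pairwise loss scales much better when the catalog has millions of items.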

3.2 Training Strategy Optimization

class ModelTrainer:
    def __init__(self, model, criterion, optimizer, device):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.device = device
        
    def train_epoch(self, dataloader, epoch):
        self.model.train()
        total_loss = 0
        
        for batch_idx, (item_seq, user_features, targets) in enumerate(dataloader):
            item_seq = item_seq.to(self.device)
            user_features = user_features.to(self.device)
            targets = targets.to(self.device)
            
            # Padding mask: True marks padded positions (0 is the padding id),
            # matching the src_key_padding_mask convention
            mask = (item_seq == 0)
            
            self.optimizer.zero_grad()
            
            # Forward pass
            outputs = self.model(item_seq, user_features, mask)
            
            # Compute the loss
            loss = self.criterion(outputs, targets, user_features)
            
            # Backward pass
            loss.backward()
            
            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            
            self.optimizer.step()
            
            total_loss += loss.item()
            
            if batch_idx % 100 == 0:
                print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.6f}')
                
        return total_loss / len(dataloader)
    
    def validate(self, dataloader):
        self.model.eval()
        total_loss = 0
        
        with torch.no_grad():
            for item_seq, user_features, targets in dataloader:
                item_seq = item_seq.to(self.device)
                user_features = user_features.to(self.device)
                targets = targets.to(self.device)
                
                mask = (item_seq == 0)  # True marks padding positions
                outputs = self.model(item_seq, user_features, mask)
                loss = self.criterion(outputs, targets, user_features)
                
                total_loss += loss.item()
                
        return total_loss / len(dataloader)

# Training configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-5)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=10)

trainer = ModelTrainer(model, criterion, optimizer, device)

3.3 Learning-Rate Scheduling and Early Stopping

def train_with_early_stopping(model, train_loader, val_loader, num_epochs=50, patience=5):
    best_val_loss = float('inf')
    patience_counter = 0
    
    for epoch in range(num_epochs):
        # Train
        train_loss = trainer.train_epoch(train_loader, epoch)
        
        # Validate
        val_loss = trainer.validate(val_loader)
        
        print(f'Epoch {epoch+1}/{num_epochs}: Train Loss: {train_loss:.6f}, Val Loss: {val_loss:.6f}')
        
        # Learning-rate schedule
        scheduler.step()
        
        # Early stopping
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            # Save the best checkpoint
            torch.save(model.state_dict(), 'best_model.pth')
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f'Early stopping at epoch {epoch+1}')
                break
                
    return best_val_loss

# Start training
best_loss = train_with_early_stopping(model, train_loader, val_loader, num_epochs=50)
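The cosine schedule configured earlier starts at the full learning rate; Transformer training is often more stable with a brief warmup first. A minimal linear-warmup sketch using `LambdaLR` (the dummy parameter and the `warmup_steps` value are illustrative; in practice the warmup factor is multiplied into the main schedule):

```python
import torch

params = [torch.nn.Parameter(torch.zeros(1))]  # dummy parameter for illustration
optimizer = torch.optim.AdamW(params, lr=1e-4)

warmup_steps = 4
def lr_lambda(step: int) -> float:
    # Linearly ramp the LR from 0 to its base value over the first warmup_steps
    return min(1.0, (step + 1) / warmup_steps)

scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda)
lrs = []
for _ in range(6):
    optimizer.step()
    lrs.append(optimizer.param_groups[0]['lr'])
    scheduler.step()
print(lrs)  # ramps linearly to 1e-4, then holds (cosine decay could follow)
```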

Real-Time Inference Optimization

4.1 Model Compression and Quantization

To meet the latency requirements of an e-commerce recommender, the model needs compression and optimization:

import torch.quantization as quantization

class QuantizedTransformerModel(nn.Module):
    def __init__(self, model):
        super(QuantizedTransformerModel, self).__init__()
        self.model = model
        
    def forward(self, item_seq, user_features=None, mask=None):
        # Quantized inference
        with torch.no_grad():
            return self.model(item_seq, user_features, mask)
    
    def quantize_model(self):
        """Dynamic quantization: weights of Linear layers are stored as int8
        and dequantized on the fly, which suits Transformer inference on CPU
        (static prepare/convert would additionally require calibration data)."""
        self.model.eval()
        self.model = quantization.quantize_dynamic(
            self.model, {nn.Linear}, dtype=torch.qint8
        )
        return self.model

# Quantization example
quantized_model = QuantizedTransformerModel(model)
quantized_model.quantize_model()
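Dynamic quantization (int8 weights for Linear layers, dequantized on the fly) is a common choice for Transformer CPU inference; a standalone, runnable sketch of the PyTorch API on a small stand-in model (the layer sizes are arbitrary):

```python
import torch
import torch.nn as nn

# Small Linear stack standing in for the Transformer's projection layers
float_model = nn.Sequential(nn.Linear(64, 64), nn.ReLU(), nn.Linear(64, 8))
float_model.eval()

# Quantize only the Linear layers' weights to int8
quantized = torch.quantization.quantize_dynamic(
    float_model, {nn.Linear}, dtype=torch.qint8
)

x = torch.randn(1, 64)
print(quantized(x).shape)  # output shape is unchanged: torch.Size([1, 8])
```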

4.2 Inference Acceleration

class FastInferenceModel:
    def __init__(self, model_path, device='cpu'):
        self.device = torch.device(device)
        self.model = self.load_model(model_path)
        self.model.eval()
        
    def load_model(self, model_path):
        """Load the optimized model."""
        model = TransformerRecommendationModel(vocab_size=10000, d_model=256)
        model.load_state_dict(torch.load(model_path, map_location=self.device))
        return model
    
    def predict_next_item(self, user_sequence, user_features=None, top_k=10):
        """Predict the top-k most likely next items."""
        with torch.no_grad():
            # Convert inputs to tensors
            sequence_tensor = torch.tensor([user_sequence], dtype=torch.long).to(self.device)
            
            if user_features is not None:
                user_features_tensor = torch.tensor([user_features], dtype=torch.float32).to(self.device)
            else:
                user_features_tensor = None
                
            # Padding mask: True marks padded positions
            mask = (sequence_tensor == 0)
            
            # 前向推理
            outputs = self.model(sequence_tensor, user_features_tensor, mask)
            
            # Take the prediction at the last time step
            last_output = outputs[0, -1, :]
            
            # Convert logits to a probability distribution
            probabilities = F.softmax(last_output, dim=-1)
            
            # Top-k recommendations
            top_k_probs, top_k_indices = torch.topk(probabilities, k=top_k, sorted=True)
            
            return [(idx.item(), prob.item()) for idx, prob in zip(top_k_indices, top_k_probs)]
    
    def batch_predict(self, user_sequences, user_features=None, top_k=10):
        """Batch prediction."""
        if user_features is None:
            user_features = [None] * len(user_sequences)  # avoid zipping with None
        predictions = []
        
        for seq, features in zip(user_sequences, user_features):
            pred = self.predict_next_item(seq, features, top_k)
            predictions.append(pred)
            
        return predictions

# Usage example
inference_model = FastInferenceModel('best_model.pth', device='cuda')
recommendations = inference_model.predict_next_item([1, 2, 3, 4], [0.5, 0.3, 0.2], top_k=5)

4.3 Caching Strategy

import redis
import json
import hashlib
from functools import lru_cache

class RecommendationCache:
    def __init__(self, redis_host='localhost', redis_port=6379, expire_time=3600):
        self.redis_client = redis.Redis(host=redis_host, port=redis_port, decode_responses=True)
        self.expire_time = expire_time
        
    @staticmethod
    def _context_key(context):
        """Stable digest of the context. The built-in hash() is randomized
        per process, so it must not be used for cross-process cache keys."""
        return hashlib.md5(json.dumps(context, sort_keys=True).encode()).hexdigest()
        
    def get_recommendations(self, user_id, context=None):
        """Fetch recommendations from the cache."""
        cache_key = f"recommendation:{user_id}"
        
        if context:
            cache_key += f":{self._context_key(context)}"
            
        cached_result = self.redis_client.get(cache_key)
        
        return json.loads(cached_result) if cached_result else None
            
    def set_recommendations(self, user_id, recommendations, context=None):
        """Write recommendations to the cache."""
        cache_key = f"recommendation:{user_id}"
        
        if context:
            cache_key += f":{self._context_key(context)}"
            
        self.redis_client.setex(
            cache_key, 
            self.expire_time, 
            json.dumps(recommendations)
        )
        
    def invalidate_user_cache(self, user_id):
        """Invalidate all cached entries for a user (base and context-specific)."""
        keys = self.redis_client.keys(f"recommendation:{user_id}*")
        if keys:
            self.redis_client.delete(*keys)

# Cache usage example
cache = RecommendationCache()
user_id = "user_12345"

# Check the cache first
recommendations = cache.get_recommendations(user_id, {"time": "morning"})

if recommendations is None:
    # Fall back to the model
    recommendations = inference_model.predict_next_item([1, 2, 3, 4], [0.5, 0.3, 0.2], top_k=10)
    
    # Store the result in the cache
    cache.set_recommendations(user_id, recommendations, {"time": "morning"})
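For hot users, an in-process layer in front of Redis saves a network round trip per hit; the `functools.lru_cache` imported above fits this pattern. A self-contained sketch (the stub function and call counter are illustrative only):

```python
from functools import lru_cache

# Counter to show how often the expensive path actually runs
calls = {"n": 0}

@lru_cache(maxsize=1024)
def local_recommendations(user_id: str, context_key: str):
    """Hypothetical expensive lookup, standing in for the Redis + model path."""
    calls["n"] += 1
    return (f"items-for-{user_id}", context_key)

local_recommendations("user_12345", "morning")
local_recommendations("user_12345", "morning")  # served from the in-process cache
print(calls["n"])  # the expensive path ran only once
```

Note that arguments must be hashable, and an in-process cache must be invalidated (e.g. `local_recommendations.cache_clear()`) or kept short-lived to avoid serving stale results.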

Performance Evaluation and Monitoring

5.1 Recommendation Quality Metrics

class RecommendationEvaluator:
    def __init__(self):
        pass
        
    def precision_at_k(self, predictions, ground_truth, k=10):
        """Compute Precision@K."""
        precision_scores = []
        
        for pred_list, truth_list in zip(predictions, ground_truth):
            # Take the top-k predictions
            top_k_pred = pred_list[:k]
            
            # Intersection with the relevant set
            relevant_items = set(truth_list)
            predicted_items = set(top_k_pred)
            
            if len(predicted_items) == 0:
                precision_scores.append(0.0)
            else:
                precision = len(relevant_items.intersection(predicted_items)) / len(predicted_items)
                precision_scores.append(precision)
                
        return sum(precision_scores) / len(precision_scores)
    
    def recall_at_k(self, predictions, ground_truth, k=10):
        """Compute Recall@K."""
        recall_scores = []
        
        for pred_list, truth_list in zip(predictions, ground_truth):
            top_k_pred = pred_list[:k]
            
            relevant_items = set(truth_list)
            predicted_items = set(top_k_pred)
            
            if len(relevant_items) == 0:
                recall_scores.append(0.0)
            else:
                recall = len(relevant_items.intersection(predicted_items)) / len(relevant_items)
                recall_scores.append(recall)
                
        return sum(recall_scores) / len(recall_scores)
    
    def ndcg_at_k(self, predictions, ground_truth, k=10):
        """Compute NDCG@K."""
        ndcg_scores = []
        
        for pred_list, truth_list in zip(predictions, ground_truth):
            top_k_pred = pred_list[:k]
            
            # Binary relevance per ranked position
            relevance_scores = []
            for i, item in enumerate(top_k_pred):
                if item in truth_list:
                    relevance_scores.append(1.0)
                else:
                    relevance_scores.append(0.0)
                    
            # DCG
            dcg = 0.0
            for i, score in enumerate(relevance_scores):
                if score > 0:
                    dcg += score / math.log2(i + 2)
                    
            # IDCG (ideal ranking)
            idcg = 0.0
            ideal_relevance = min(len(truth_list), k)
            for i in range(ideal_relevance):
                idcg += 1.0 / math.log2(i + 2)
                
            if idcg > 0:
                ndcg = dcg / idcg
            else:
                ndcg = 0.0
                
            ndcg_scores.append(ndcg)
            
        return sum(ndcg_scores) / len(ndcg_scores)

# Evaluation example
evaluator = RecommendationEvaluator()
predictions = [[1, 2, 3, 4, 5], [2, 3, 4, 5, 6]]
ground_truth = [[1, 3, 5], [2, 4, 6]]

precision = evaluator.precision_at_k(predictions, ground_truth, k=5)
recall = evaluator.recall_at_k(predictions, ground_truth, k=5)
ndcg = evaluator.ndcg_at_k(predictions, ground_truth, k=5)

print(f"Precision@5: {precision:.4f}")
print(f"Recall@5: {recall:.4f}")
print(f"NDCG@5: {ndcg:.4f}")

5.2 Real-Time Monitoring

import time
import logging
from collections import defaultdict

class RecommendationMonitor:
    def __init__(self):
        self.metrics = defaultdict(list)
        self.logger = logging.getLogger('recommendation_monitor')
        
    def log_prediction(self, user_id, input_seq, prediction, execution_time):
        """Log a single prediction."""
        timestamp = time.time()
        
        metric_data = {
            'timestamp': timestamp,
            'user_id': user_id,
            'input_sequence_length': len(input_seq),
            'prediction_time': execution_time,
            'top_prediction': prediction[0] if prediction else None
        }
        
        self.metrics['predictions'].append(metric_data)
        
    def log_performance(self, model_metrics):
        """Log model performance metrics."""
        timestamp = time.time()
        
        performance_data = {
            'timestamp': timestamp,
            'model_accuracy': model_metrics.get('accuracy', 0),
            'model_latency': model_metrics.get('latency', 0),
            'throughput': model_metrics.get('throughput', 0)
        }
        
        self.metrics['performance'].append(performance_data)
        
    def get_metrics_summary(self):
        """Summarize the collected metrics."""
        if not self.metrics['predictions']:
            return {}
            
        predictions = self.metrics['predictions']
        performance = self.metrics['performance']
        
        # Average inference latency
        avg_execution_time = sum(p['prediction_time'] for p in predictions) / len(predictions)
        
        # Average throughput across logged performance samples
        if performance:
            avg_throughput = sum(p['throughput'] for p in performance) / len(performance)
        else:
            avg_throughput = 0
            
        return {
            'total_predictions': len(predictions),
            'avg_execution_time': avg_execution_time,
            'avg_throughput': avg_throughput
        }

# Monitoring usage example
monitor = RecommendationMonitor()

# Simulate one prediction
start_time = time.time()
recommendations = inference_model.predict_next_item([1, 2, 3, 4], [0.5, 0.3, 0.2], top_k=5)
end_time = time.time()

execution_time = end_time - start_time
monitor.log_prediction("user_12345", [1, 2, 3, 4], recommendations, execution_time)

# Fetch the monitoring summary
summary = monitor.get_metrics_summary()
print(f"Monitoring summary: {summary}")

Engineering Practice and Deployment

6.1 Model Serving Architecture

from flask import Flask, request, jsonify
import torch
import time
import json

class RecommendationService:
    def __init__(self, model_path, device='cpu'):
        self.device = torch.device(device)
        self.model = self.load_model(model_path)
        self.cache = RecommendationCache()
        
    def load_model(self, model_path):
        """Load the model."""
        model = TransformerRecommendationModel(vocab_size=10000, d_model=256)
        model.load_state_dict(torch.load(model_path, map_location=self.device))
        model.eval()
        return model
        
    def predict(self, user_id, sequence, context=None, top_k=10):
        """Prediction endpoint logic."""
        try:
            # Check the cache first
            cached_result = self.cache.get_recommendations(user_id, context)
            if cached_result:
                return cached_result
            
            # Preprocess the input
            sequence_tensor = torch.tensor([sequence], dtype=torch.long).to(self.device)
            
            # Padding mask: True marks padded positions
            mask = (sequence_tensor == 0)
            
            # Inference
            start_time = time.time()
            with torch.no_grad():
                outputs = self.model(sequence_tensor, None, mask)
                
            execution_time = time.time() - start_time
            
            # Process the output: top-k over the last position's distribution
            last_output = outputs[0, -1, :]
            probabilities = F.softmax(last_output, dim=-1)
            top_k_probs, top_k_indices = torch.topk(probabilities, k=top_k)
            recommendations = [(idx.item(), prob.item()) 
                               for idx, prob in zip(top_k_indices, top_k_probs)]
            
            # Store in the cache and return
            self.cache.set_recommendations(user_id, recommendations, context)
            return {'recommendations': recommendations, 
                    'execution_time': execution_time}
        except Exception as e:
            return {'error': str(e)}
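To expose the service over HTTP with the Flask import above, the routes can be wired as follows; `_StubService` stands in for `RecommendationService` so the sketch runs on its own, and the route path and payload fields are illustrative assumptions:

```python
from flask import Flask, request, jsonify

app = Flask(__name__)

# In the real deployment this would be RecommendationService('best_model.pth');
# a stub stands in here so the sketch is self-contained.
class _StubService:
    def predict(self, user_id, sequence, context=None, top_k=10):
        return [(1, 0.9), (2, 0.1)][:top_k]

service = _StubService()

@app.route('/recommend', methods=['POST'])
def recommend():
    payload = request.get_json(force=True)
    recs = service.predict(
        payload['user_id'],
        payload.get('sequence', []),
        context=payload.get('context'),
        top_k=int(payload.get('top_k', 10)),
    )
    return jsonify({'user_id': payload['user_id'], 'recommendations': recs})

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
```

In production, a WSGI server (e.g. gunicorn) would front this app rather than the built-in development server.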