ChatGPT API集成实战:从零搭建智能问答系统与对话管理架构

Steve263
Steve263 2026-01-28T03:07:01+08:00
0 0 1

引言

在人工智能技术快速发展的今天,大型语言模型如ChatGPT已经成为了企业智能化转型的重要工具。将ChatGPT API集成到企业应用中,能够显著提升客户服务效率,降低运营成本,创造更好的用户体验。

本文将深入探讨如何从零开始搭建一个完整的智能问答系统,涵盖API调用封装、对话状态管理、用户会话跟踪等核心技术点,并提供完整的项目架构设计和代码实现方案。

一、技术背景与应用场景

1.1 ChatGPT API概述

ChatGPT是OpenAI开发的大型语言模型,通过其API接口,开发者可以将强大的自然语言处理能力集成到自己的应用中。API支持多种功能:

  • 文本生成和回答
  • 对话管理
  • 内容创作
  • 代码生成
  • 多语言支持

1.2 应用场景分析

智能问答系统主要适用于以下场景:

  • 企业客服系统:自动处理常见问题,减少人工客服压力
  • 知识库问答:为员工或客户提供精准的知识查询服务
  • 产品推荐系统:基于用户对话内容提供个性化推荐
  • 教育培训平台:辅助教学和学习过程

二、系统架构设计

2.1 整体架构概览

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   用户前端      │    │   API网关       │    │   应用服务层    │
│                 │    │                 │    │                 │
│  Web/移动应用   │───▶│  身份认证       │───▶│  业务逻辑处理   │
│                 │    │  请求路由       │    │  对话管理       │
└─────────────────┘    │  限流熔断       │    │  状态存储       │
                       │                 │    │  API调用封装   │
                       └─────────────────┘    └─────────────────┘
                                              │                 │
                                              │  数据库         │
                                              │  缓存系统       │
                                              └─────────────────┘

2.2 核心组件设计

2.2.1 API调用封装层

负责与ChatGPT API的交互,包括请求构建、响应处理、错误处理等。

2.2.2 对话管理模块

维护用户对话状态,实现上下文理解、会话跟踪等功能。

2.2.3 用户会话管理

管理用户的会话信息,确保对话的连续性和一致性。

三、API调用封装实现

3.1 基础配置与初始化

import openai
import os
from typing import Dict, Any, Optional
import logging

class ChatGPTClient:
    def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
        """
        初始化ChatGPT客户端
        
        Args:
            api_key (str): OpenAI API密钥
            model (str): 使用的模型名称
        """
        self.api_key = api_key
        self.model = model
        openai.api_key = api_key
        self.logger = logging.getLogger(__name__)
        
    def _validate_response(self, response: Dict[str, Any]) -> bool:
        """验证API响应是否有效"""
        return response and 'choices' in response and len(response['choices']) > 0
    
    def _handle_error(self, error: Exception) -> Dict[str, Any]:
        """统一错误处理"""
        self.logger.error(f"API调用失败: {str(error)}")
        return {
            "error": str(error),
            "success": False
        }

3.2 核心API调用方法

class ChatGPTClient:
    # ... 前面的初始化代码
    
    def create_chat_completion(self, 
                             messages: list,
                             temperature: float = 0.7,
                             max_tokens: int = 1000,
                             top_p: float = 1.0,
                             frequency_penalty: float = 0.0,
                             presence_penalty: float = 0.0) -> Dict[str, Any]:
        """
        创建聊天完成
        
        Args:
            messages (list): 消息列表,包含system、user、assistant消息
            temperature (float): 温度参数,控制随机性
            max_tokens (int): 最大生成token数
            top_p (float): Top-p采样参数
            frequency_penalty (float): 频率惩罚
            presence_penalty (float): 存在惩罚
            
        Returns:
            Dict[str, Any]: API响应结果
        """
        try:
            response = openai.ChatCompletion.create(
                model=self.model,
                messages=messages,
                temperature=temperature,
                max_tokens=max_tokens,
                top_p=top_p,
                frequency_penalty=frequency_penalty,
                presence_penalty=presence_penalty
            )
            
            if self._validate_response(response):
                return {
                    "success": True,
                    "response": response,
                    "content": response['choices'][0]['message']['content'].strip()
                }
            else:
                return {
                    "success": False,
                    "error": "Invalid API response"
                }
                
        except openai.error.RateLimitError as e:
            return self._handle_error(f"API调用频率限制: {str(e)}")
        except openai.error.AuthenticationError as e:
            return self._handle_error(f"认证失败: {str(e)}")
        except openai.error.APIConnectionError as e:
            return self._handle_error(f"连接错误: {str(e)}")
        except Exception as e:
            return self._handle_error(f"未知错误: {str(e)}")
    
    def create_completion(self, 
                         prompt: str,
                         temperature: float = 0.7,
                         max_tokens: int = 1000,
                         top_p: float = 1.0,
                         frequency_penalty: float = 0.0,
                         presence_penalty: float = 0.0) -> Dict[str, Any]:
        """
        创建文本补全
        
        Args:
            prompt (str): 输入提示词
            temperature (float): 温度参数
            max_tokens (int): 最大生成token数
            top_p (float): Top-p采样参数
            frequency_penalty (float): 频率惩罚
            presence_penalty (float): 存在惩罚
            
        Returns:
            Dict[str, Any]: API响应结果
        """
        try:
            response = openai.Completion.create(
                model=self.model,
                prompt=prompt,
                temperature=temperature,
                max_tokens=max_tokens,
                top_p=top_p,
                frequency_penalty=frequency_penalty,
                presence_penalty=presence_penalty
            )
            
            if 'choices' in response and len(response['choices']) > 0:
                return {
                    "success": True,
                    "response": response,
                    "content": response['choices'][0]['text'].strip()
                }
            else:
                return {
                    "success": False,
                    "error": "Invalid API response"
                }
                
        except Exception as e:
            return self._handle_error(str(e))

3.3 高级功能封装

class ChatGPTClient:
    # ... 前面的代码
    
    def create_chat_with_context(self, 
                               user_message: str,
                               conversation_history: list = None,
                               system_prompt: str = "You are a helpful assistant.",
                               **kwargs) -> Dict[str, Any]:
        """
        创建带上下文的聊天
        
        Args:
            user_message (str): 用户消息
            conversation_history (list): 对话历史记录
            system_prompt (str): 系统提示词
            **kwargs: 其他API参数
            
        Returns:
            Dict[str, Any]: 响应结果
        """
        # 构建消息列表
        messages = []
        
        # 添加系统提示词
        if system_prompt:
            messages.append({
                "role": "system",
                "content": system_prompt
            })
        
        # 添加历史对话
        if conversation_history:
            messages.extend(conversation_history)
        
        # 添加用户消息
        messages.append({
            "role": "user",
            "content": user_message
        })
        
        return self.create_chat_completion(messages, **kwargs)
    
    def streaming_response(self, 
                          messages: list,
                          stream: bool = True) -> Dict[str, Any]:
        """
        流式响应处理
        
        Args:
            messages (list): 消息列表
            stream (bool): 是否启用流式响应
            
        Returns:
            Dict[str, Any]: 响应结果
        """
        try:
            response = openai.ChatCompletion.create(
                model=self.model,
                messages=messages,
                stream=stream
            )
            
            if stream:
                # 处理流式响应
                content = ""
                for chunk in response:
                    if 'choices' in chunk and len(chunk['choices']) > 0:
                        delta = chunk['choices'][0]['delta']
                        if 'content' in delta:
                            content += delta['content']
                            yield delta['content']
                
                return {
                    "success": True,
                    "content": content
                }
            else:
                # 处理非流式响应
                result = next(response)
                return {
                    "success": True,
                    "response": result,
                    "content": result['choices'][0]['message']['content'].strip()
                }
                
        except Exception as e:
            return self._handle_error(str(e))

四、对话状态管理

4.1 对话状态模型设计

from dataclasses import dataclass, field
from typing import List, Dict, Any
from datetime import datetime
import json

@dataclass
class Message:
    """消息对象"""
    role: str  # 'user', 'assistant', 'system'
    content: str
    timestamp: datetime = field(default_factory=datetime.now)
    
    def to_dict(self):
        return {
            "role": self.role,
            "content": self.content,
            "timestamp": self.timestamp.isoformat()
        }
    
    @classmethod
    def from_dict(cls, data):
        return cls(
            role=data["role"],
            content=data["content"],
            timestamp=datetime.fromisoformat(data["timestamp"])
        )

@dataclass
class Conversation:
    """对话对象"""
    conversation_id: str
    user_id: str
    messages: List[Message] = field(default_factory=list)
    created_at: datetime = field(default_factory=datetime.now)
    updated_at: datetime = field(default_factory=datetime.now)
    is_active: bool = True
    metadata: Dict[str, Any] = field(default_factory=dict)
    
    def add_message(self, message: Message):
        """添加消息"""
        self.messages.append(message)
        self.updated_at = datetime.now()
    
    def get_messages(self, limit: int = None) -> List[Message]:
        """获取消息列表"""
        if limit:
            return self.messages[-limit:]
        return self.messages
    
    def to_dict(self):
        return {
            "conversation_id": self.conversation_id,
            "user_id": self.user_id,
            "messages": [msg.to_dict() for msg in self.messages],
            "created_at": self.created_at.isoformat(),
            "updated_at": self.updated_at.isoformat(),
            "is_active": self.is_active,
            "metadata": self.metadata
        }
    
    @classmethod
    def from_dict(cls, data):
        messages = [Message.from_dict(msg_data) for msg_data in data["messages"]]
        conversation = cls(
            conversation_id=data["conversation_id"],
            user_id=data["user_id"],
            messages=messages,
            created_at=datetime.fromisoformat(data["created_at"]),
            updated_at=datetime.fromisoformat(data["updated_at"]),
            is_active=data["is_active"],
            metadata=data.get("metadata", {})
        )
        return conversation

4.2 对话管理服务实现

import redis
import pickle
from typing import Optional, List
import uuid
import time

class ConversationManager:
    """对话管理服务"""
    
    def __init__(self, redis_host: str = "localhost", redis_port: int = 6379, 
                 redis_db: int = 0, expire_time: int = 3600):
        """
        初始化对话管理器
        
        Args:
            redis_host (str): Redis主机地址
            redis_port (int): Redis端口
            redis_db (int): Redis数据库编号
            expire_time (int): 对话过期时间(秒)
        """
        self.redis_client = redis.Redis(
            host=redis_host,
            port=redis_port,
            db=redis_db,
            decode_responses=False  # 使用二进制存储以提高性能
        )
        self.expire_time = expire_time
        self.logger = logging.getLogger(__name__)
    
    def create_conversation(self, user_id: str, 
                          initial_message: Optional[str] = None,
                          system_prompt: str = "You are a helpful assistant.") -> Conversation:
        """
        创建新的对话
        
        Args:
            user_id (str): 用户ID
            initial_message (str): 初始消息
            system_prompt (str): 系统提示词
            
        Returns:
            Conversation: 新创建的对话对象
        """
        conversation_id = str(uuid.uuid4())
        
        # 构建初始对话
        conversation = Conversation(
            conversation_id=conversation_id,
            user_id=user_id,
            metadata={
                "system_prompt": system_prompt,
                "initial_message": initial_message
            }
        )
        
        # 添加系统消息
        if system_prompt:
            system_msg = Message("system", system_prompt)
            conversation.add_message(system_msg)
        
        # 添加初始用户消息
        if initial_message:
            user_msg = Message("user", initial_message)
            conversation.add_message(user_msg)
        
        # 存储到Redis
        self._save_conversation(conversation)
        
        return conversation
    
    def get_conversation(self, conversation_id: str) -> Optional[Conversation]:
        """
        获取对话
        
        Args:
            conversation_id (str): 对话ID
            
        Returns:
            Conversation: 对话对象,如果不存在返回None
        """
        try:
            data = self.redis_client.get(f"conversation:{conversation_id}")
            if data:
                conversation_data = pickle.loads(data)
                return Conversation.from_dict(conversation_data)
            return None
        except Exception as e:
            self.logger.error(f"获取对话失败: {str(e)}")
            return None
    
    def update_conversation(self, conversation: Conversation) -> bool:
        """
        更新对话
        
        Args:
            conversation (Conversation): 对话对象
            
        Returns:
            bool: 是否更新成功
        """
        try:
            self._save_conversation(conversation)
            return True
        except Exception as e:
            self.logger.error(f"更新对话失败: {str(e)}")
            return False
    
    def _save_conversation(self, conversation: Conversation) -> None:
        """保存对话到Redis"""
        key = f"conversation:{conversation.conversation_id}"
        data = pickle.dumps(conversation.to_dict())
        self.redis_client.setex(key, self.expire_time, data)
    
    def delete_conversation(self, conversation_id: str) -> bool:
        """
        删除对话
        
        Args:
            conversation_id (str): 对话ID
            
        Returns:
            bool: 是否删除成功
        """
        try:
            key = f"conversation:{conversation_id}"
            deleted = self.redis_client.delete(key)
            return deleted > 0
        except Exception as e:
            self.logger.error(f"删除对话失败: {str(e)}")
            return False
    
    def get_user_conversations(self, user_id: str, limit: int = 10) -> List[Conversation]:
        """
        获取用户的所有对话
        
        Args:
            user_id (str): 用户ID
            limit (int): 最大返回数量
            
        Returns:
            List[Conversation]: 对话列表
        """
        try:
            # 这里可以实现更复杂的查询逻辑
            # 为了简化,返回最近创建的对话
            conversations = []
            # 实际实现中需要从数据库或Redis中筛选用户相关的对话
            return conversations[:limit]
        except Exception as e:
            self.logger.error(f"获取用户对话失败: {str(e)}")
            return []
    
    def clear_expired_conversations(self) -> int:
        """
        清理过期对话
        
        Returns:
            int: 清理的对话数量
        """
        # Redis自动过期机制已经处理了大部分情况
        # 这里可以添加额外的清理逻辑
        return 0

五、用户会话跟踪与管理

5.1 用户会话管理器

class UserSessionManager:
    """用户会话管理器"""
    
    def __init__(self, redis_host: str = "localhost", redis_port: int = 6379, 
                 session_timeout: int = 1800):  # 30分钟
        """
        初始化用户会话管理器
        
        Args:
            redis_host (str): Redis主机地址
            redis_port (int): Redis端口
            session_timeout (int): 会话超时时间(秒)
        """
        self.redis_client = redis.Redis(
            host=redis_host,
            port=redis_port,
            decode_responses=False
        )
        self.session_timeout = session_timeout
        self.logger = logging.getLogger(__name__)
    
    def create_session(self, user_id: str, session_data: Dict[str, Any] = None) -> str:
        """
        创建用户会话
        
        Args:
            user_id (str): 用户ID
            session_data (Dict[str, Any]): 会话数据
            
        Returns:
            str: 会话ID
        """
        session_id = str(uuid.uuid4())
        
        session_info = {
            "user_id": user_id,
            "created_at": datetime.now().isoformat(),
            "updated_at": datetime.now().isoformat(),
            "data": session_data or {}
        }
        
        # 存储会话信息
        key = f"user_session:{session_id}"
        self.redis_client.setex(key, self.session_timeout, 
                               pickle.dumps(session_info))
        
        # 建立用户与会话的关联
        user_key = f"user_sessions:{user_id}"
        self.redis_client.sadd(user_key, session_id)
        self.redis_client.expire(user_key, self.session_timeout)
        
        return session_id
    
    def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
        """
        获取用户会话信息
        
        Args:
            session_id (str): 会话ID
            
        Returns:
            Dict[str, Any]: 会话信息,如果不存在返回None
        """
        try:
            key = f"user_session:{session_id}"
            data = self.redis_client.get(key)
            if data:
                session_info = pickle.loads(data)
                # 更新最后访问时间
                session_info["updated_at"] = datetime.now().isoformat()
                self.redis_client.setex(key, self.session_timeout, 
                                       pickle.dumps(session_info))
                return session_info
            return None
        except Exception as e:
            self.logger.error(f"获取会话信息失败: {str(e)}")
            return None
    
    def update_session(self, session_id: str, session_data: Dict[str, Any]) -> bool:
        """
        更新用户会话信息
        
        Args:
            session_id (str): 会话ID
            session_data (Dict[str, Any]): 会话数据
            
        Returns:
            bool: 是否更新成功
        """
        try:
            key = f"user_session:{session_id}"
            data = self.redis_client.get(key)
            if data:
                session_info = pickle.loads(data)
                session_info["data"].update(session_data)
                session_info["updated_at"] = datetime.now().isoformat()
                self.redis_client.setex(key, self.session_timeout, 
                                       pickle.dumps(session_info))
                return True
            return False
        except Exception as e:
            self.logger.error(f"更新会话信息失败: {str(e)}")
            return False
    
    def destroy_session(self, session_id: str) -> bool:
        """
        销毁用户会话
        
        Args:
            session_id (str): 会话ID
            
        Returns:
            bool: 是否销毁成功
        """
        try:
            key = f"user_session:{session_id}"
            # 删除会话信息
            deleted = self.redis_client.delete(key)
            
            # 从用户关联列表中移除
            user_key = f"user_sessions:{self.get_user_id_from_session(session_id)}"
            if user_key:
                self.redis_client.srem(user_key, session_id)
            
            return deleted > 0
        except Exception as e:
            self.logger.error(f"销毁会话失败: {str(e)}")
            return False
    
    def get_user_id_from_session(self, session_id: str) -> Optional[str]:
        """
        从会话ID获取用户ID
        
        Args:
            session_id (str): 会话ID
            
        Returns:
            str: 用户ID,如果不存在返回None
        """
        try:
            key = f"user_session:{session_id}"
            data = self.redis_client.get(key)
            if data:
                session_info = pickle.loads(data)
                return session_info["user_id"]
            return None
        except Exception:
            return None
    
    def get_user_active_sessions(self, user_id: str) -> List[str]:
        """
        获取用户活跃会话
        
        Args:
            user_id (str): 用户ID
            
        Returns:
            List[str]: 会话ID列表
        """
        try:
            user_key = f"user_sessions:{user_id}"
            session_ids = self.redis_client.smembers(user_key)
            return list(session_ids)
        except Exception as e:
            self.logger.error(f"获取用户会话失败: {str(e)}")
            return []

5.2 会话上下文管理

class SessionContextManager:
    """会话上下文管理器"""
    
    def __init__(self, conversation_manager: ConversationManager, 
                 session_manager: UserSessionManager):
        self.conversation_manager = conversation_manager
        self.session_manager = session_manager
        self.logger = logging.getLogger(__name__)
    
    def get_context_for_user(self, user_id: str, session_id: str) -> Dict[str, Any]:
        """
        获取用户上下文
        
        Args:
            user_id (str): 用户ID
            session_id (str): 会话ID
            
        Returns:
            Dict[str, Any]: 上下文信息
        """
        context = {
            "user_id": user_id,
            "session_id": session_id,
            "conversation_id": None,
            "conversation_history": [],
            "session_data": {}
        }
        
        # 获取会话数据
        session_info = self.session_manager.get_session(session_id)
        if session_info:
            context["session_data"] = session_info["data"]
        
        # 尝试获取用户的当前对话
        # 这里可以实现更复杂的逻辑,比如根据会话状态获取最近的对话
        
        return context
    
    def update_user_context(self, user_id: str, session_id: str, 
                          conversation_id: str = None, **kwargs) -> bool:
        """
        更新用户上下文
        
        Args:
            user_id (str): 用户ID
            session_id (str): 会话ID
            conversation_id (str): 对话ID
            **kwargs: 其他上下文数据
            
        Returns:
            bool: 是否更新成功
        """
        try:
            # 更新会话数据
            session_data = kwargs.copy()
            if conversation_id:
                session_data["current_conversation_id"] = conversation_id
            
            return self.session_manager.update_session(session_id, session_data)
        except Exception as e:
            self.logger.error(f"更新用户上下文失败: {str(e)}")
            return False

六、完整的应用服务层实现

6.1 智能问答服务

class SmartQAService:
    """智能问答服务"""
    
    def __init__(self, chatgpt_client: ChatGPTClient, 
                 conversation_manager: ConversationManager,
                 session_manager: UserSessionManager):
        self.chatgpt_client = chatgpt_client
        self.conversation_manager = conversation_manager
        self.session_manager = session_manager
        self.logger = logging.getLogger(__name__)
    
    def process_user_query(self, user_id: str, session_id: str, 
                          question: str, system_prompt: str = None) -> Dict[str, Any]:
        """
        处理用户查询
        
        Args:
            user_id (str): 用户ID
            session_id (str): 会话ID
            question (str): 用户问题
            system_prompt (str): 系统提示词
            
        Returns:
            Dict[str, Any]: 响应结果
        """
        try:
            # 获取当前会话信息
            session_info = self.session_manager.get_session(session_id)
            if not session_info:
                return {
                    "success": False,
                    "error": "Invalid session"
                }
            
            # 获取或创建对话
            conversation_id = session_info["data"].get("current_conversation_id")
            
            if not conversation_id:
                # 创建新的对话
                conversation = self.conversation_manager.create_conversation(
                    user_id=user_id,
                    initial_message=question,
                    system_prompt=system_prompt or "You are a helpful assistant."
                )
                conversation_id = conversation.conversation_id
                
                # 更新会话中的当前对话ID
                self.session_manager.update_session(session_id, {
                    "current_conversation_id": conversation_id
                })
            else:
                # 获取现有对话
                conversation = self.conversation_manager.get_conversation(conversation_id)
                if not conversation:
                    # 如果对话不存在,创建新的
                    conversation = self.conversation_manager.create_conversation(
                        user_id=user_id,
                        initial_message=question,
                        system_prompt=system_prompt or "You are a helpful assistant."
                    )
                    conversation_id = conversation.conversation_id
                    
                    # 更新会话中的当前对话ID
                    self.session_manager.update_session(session_id, {
                        "current_conversation_id": conversation_id
                    })
            
            # 构建对话历史
            conversation_history = []
            messages = conversation.get_messages()
            
            # 限制历史消息数量,避免超出token限制
            recent_messages = messages[-10:] if len(messages) > 10 else messages
            
            for msg in recent_messages:
                conversation_history.append({
                    "role": msg.role,
                    "content": msg.content
                })
            
            # 调用ChatGPT API
            response = self.chatgpt_client.create_chat_with_context(
                user_message=question,
                conversation_history=conversation_history,
                system_prompt=system_prompt or "You are a helpful assistant."
            )
            
            if response["success"]:
                # 添加用户问题到对话历史
                user_message = Message("user", question)
                conversation.add_message(user_message)
                
                # 添加AI回答到对话历史
                ai_message = Message("assistant", response["content"])
                conversation.add_message(ai_message)
                
                # 更新对话状态
                self.conversation_manager.update_conversation(conversation)
                
                return {
                    "success": True,
                    "answer": response["content"],
                    "conversation_id": conversation_id,
                    "session_id": session_id
                }
            else:
                return {
                    "success": False,
                    "error": response.get("error", "API call failed")
               
相关推荐
广告位招租

相似文章

    评论 (0)

    0/2000