引言
在人工智能技术快速发展的今天,大型语言模型如ChatGPT已经成为了企业智能化转型的重要工具。将ChatGPT API集成到企业应用中,能够显著提升客户服务效率,降低运营成本,创造更好的用户体验。
本文将深入探讨如何从零开始搭建一个完整的智能问答系统,涵盖API调用封装、对话状态管理、用户会话跟踪等核心技术点,并提供完整的项目架构设计和代码实现方案。
一、技术背景与应用场景
1.1 ChatGPT API概述
ChatGPT是OpenAI开发的大型语言模型,通过其API接口,开发者可以将强大的自然语言处理能力集成到自己的应用中。API支持多种功能:
- 文本生成和回答
- 对话管理
- 内容创作
- 代码生成
- 多语言支持
1.2 应用场景分析
智能问答系统主要适用于以下场景:
- 企业客服系统:自动处理常见问题,减少人工客服压力
- 知识库问答:为员工或客户提供精准的知识查询服务
- 产品推荐系统:基于用户对话内容提供个性化推荐
- 教育培训平台:辅助教学和学习过程
二、系统架构设计
2.1 整体架构概览
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ 用户前端 │ │ API网关 │ │ 应用服务层 │
│ │ │ │ │ │
│ Web/移动应用 │───▶│ 身份认证 │───▶│ 业务逻辑处理 │
│ │ │ 请求路由 │ │ 对话管理 │
└─────────────────┘ │ 限流熔断 │ │ 状态存储 │
│ │ │ API调用封装 │
└─────────────────┘ └─────────────────┘
│ │
│ 数据库 │
│ 缓存系统 │
└─────────────────┘
2.2 核心组件设计
2.2.1 API调用封装层
负责与ChatGPT API的交互,包括请求构建、响应处理、错误处理等。
2.2.2 对话管理模块
维护用户对话状态,实现上下文理解、会话跟踪等功能。
2.2.3 用户会话管理
管理用户的会话信息,确保对话的连续性和一致性。
三、API调用封装实现
3.1 基础配置与初始化
import openai
import os
from typing import Dict, Any, Optional
import logging
class ChatGPTClient:
def __init__(self, api_key: str, model: str = "gpt-3.5-turbo"):
"""
初始化ChatGPT客户端
Args:
api_key (str): OpenAI API密钥
model (str): 使用的模型名称
"""
self.api_key = api_key
self.model = model
openai.api_key = api_key
self.logger = logging.getLogger(__name__)
def _validate_response(self, response: Dict[str, Any]) -> bool:
"""验证API响应是否有效"""
return response and 'choices' in response and len(response['choices']) > 0
def _handle_error(self, error: Exception) -> Dict[str, Any]:
"""统一错误处理"""
self.logger.error(f"API调用失败: {str(error)}")
return {
"error": str(error),
"success": False
}
3.2 核心API调用方法
class ChatGPTClient:
# ... 前面的初始化代码
def create_chat_completion(self,
messages: list,
temperature: float = 0.7,
max_tokens: int = 1000,
top_p: float = 1.0,
frequency_penalty: float = 0.0,
presence_penalty: float = 0.0) -> Dict[str, Any]:
"""
创建聊天完成
Args:
messages (list): 消息列表,包含system、user、assistant消息
temperature (float): 温度参数,控制随机性
max_tokens (int): 最大生成token数
top_p (float): Top-p采样参数
frequency_penalty (float): 频率惩罚
presence_penalty (float): 存在惩罚
Returns:
Dict[str, Any]: API响应结果
"""
try:
response = openai.ChatCompletion.create(
model=self.model,
messages=messages,
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
frequency_penalty=frequency_penalty,
presence_penalty=presence_penalty
)
if self._validate_response(response):
return {
"success": True,
"response": response,
"content": response['choices'][0]['message']['content'].strip()
}
else:
return {
"success": False,
"error": "Invalid API response"
}
except openai.error.RateLimitError as e:
return self._handle_error(f"API调用频率限制: {str(e)}")
except openai.error.AuthenticationError as e:
return self._handle_error(f"认证失败: {str(e)}")
except openai.error.APIConnectionError as e:
return self._handle_error(f"连接错误: {str(e)}")
except Exception as e:
return self._handle_error(f"未知错误: {str(e)}")
def create_completion(self,
prompt: str,
temperature: float = 0.7,
max_tokens: int = 1000,
top_p: float = 1.0,
frequency_penalty: float = 0.0,
presence_penalty: float = 0.0) -> Dict[str, Any]:
"""
创建文本补全
Args:
prompt (str): 输入提示词
temperature (float): 温度参数
max_tokens (int): 最大生成token数
top_p (float): Top-p采样参数
frequency_penalty (float): 频率惩罚
presence_penalty (float): 存在惩罚
Returns:
Dict[str, Any]: API响应结果
"""
try:
response = openai.Completion.create(
model=self.model,
prompt=prompt,
temperature=temperature,
max_tokens=max_tokens,
top_p=top_p,
frequency_penalty=frequency_penalty,
presence_penalty=presence_penalty
)
if 'choices' in response and len(response['choices']) > 0:
return {
"success": True,
"response": response,
"content": response['choices'][0]['text'].strip()
}
else:
return {
"success": False,
"error": "Invalid API response"
}
except Exception as e:
return self._handle_error(str(e))
3.3 高级功能封装
class ChatGPTClient:
# ... 前面的代码
def create_chat_with_context(self,
user_message: str,
conversation_history: list = None,
system_prompt: str = "You are a helpful assistant.",
**kwargs) -> Dict[str, Any]:
"""
创建带上下文的聊天
Args:
user_message (str): 用户消息
conversation_history (list): 对话历史记录
system_prompt (str): 系统提示词
**kwargs: 其他API参数
Returns:
Dict[str, Any]: 响应结果
"""
# 构建消息列表
messages = []
# 添加系统提示词
if system_prompt:
messages.append({
"role": "system",
"content": system_prompt
})
# 添加历史对话
if conversation_history:
messages.extend(conversation_history)
# 添加用户消息
messages.append({
"role": "user",
"content": user_message
})
return self.create_chat_completion(messages, **kwargs)
def streaming_response(self,
messages: list,
stream: bool = True) -> Dict[str, Any]:
"""
流式响应处理
Args:
messages (list): 消息列表
stream (bool): 是否启用流式响应
Returns:
Dict[str, Any]: 响应结果
"""
try:
response = openai.ChatCompletion.create(
model=self.model,
messages=messages,
stream=stream
)
if stream:
# 处理流式响应
content = ""
for chunk in response:
if 'choices' in chunk and len(chunk['choices']) > 0:
delta = chunk['choices'][0]['delta']
if 'content' in delta:
content += delta['content']
yield delta['content']
return {
"success": True,
"content": content
}
else:
# 处理非流式响应
result = next(response)
return {
"success": True,
"response": result,
"content": result['choices'][0]['message']['content'].strip()
}
except Exception as e:
return self._handle_error(str(e))
四、对话状态管理
4.1 对话状态模型设计
from dataclasses import dataclass, field
from typing import List, Dict, Any
from datetime import datetime
import json
@dataclass
class Message:
"""消息对象"""
role: str # 'user', 'assistant', 'system'
content: str
timestamp: datetime = field(default_factory=datetime.now)
def to_dict(self):
return {
"role": self.role,
"content": self.content,
"timestamp": self.timestamp.isoformat()
}
@classmethod
def from_dict(cls, data):
return cls(
role=data["role"],
content=data["content"],
timestamp=datetime.fromisoformat(data["timestamp"])
)
@dataclass
class Conversation:
"""对话对象"""
conversation_id: str
user_id: str
messages: List[Message] = field(default_factory=list)
created_at: datetime = field(default_factory=datetime.now)
updated_at: datetime = field(default_factory=datetime.now)
is_active: bool = True
metadata: Dict[str, Any] = field(default_factory=dict)
def add_message(self, message: Message):
"""添加消息"""
self.messages.append(message)
self.updated_at = datetime.now()
def get_messages(self, limit: int = None) -> List[Message]:
"""获取消息列表"""
if limit:
return self.messages[-limit:]
return self.messages
def to_dict(self):
return {
"conversation_id": self.conversation_id,
"user_id": self.user_id,
"messages": [msg.to_dict() for msg in self.messages],
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat(),
"is_active": self.is_active,
"metadata": self.metadata
}
@classmethod
def from_dict(cls, data):
messages = [Message.from_dict(msg_data) for msg_data in data["messages"]]
conversation = cls(
conversation_id=data["conversation_id"],
user_id=data["user_id"],
messages=messages,
created_at=datetime.fromisoformat(data["created_at"]),
updated_at=datetime.fromisoformat(data["updated_at"]),
is_active=data["is_active"],
metadata=data.get("metadata", {})
)
return conversation
4.2 对话管理服务实现
import redis
import pickle
from typing import Optional, List
import uuid
import time
class ConversationManager:
"""对话管理服务"""
def __init__(self, redis_host: str = "localhost", redis_port: int = 6379,
redis_db: int = 0, expire_time: int = 3600):
"""
初始化对话管理器
Args:
redis_host (str): Redis主机地址
redis_port (int): Redis端口
redis_db (int): Redis数据库编号
expire_time (int): 对话过期时间(秒)
"""
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
db=redis_db,
decode_responses=False # 使用二进制存储以提高性能
)
self.expire_time = expire_time
self.logger = logging.getLogger(__name__)
def create_conversation(self, user_id: str,
initial_message: Optional[str] = None,
system_prompt: str = "You are a helpful assistant.") -> Conversation:
"""
创建新的对话
Args:
user_id (str): 用户ID
initial_message (str): 初始消息
system_prompt (str): 系统提示词
Returns:
Conversation: 新创建的对话对象
"""
conversation_id = str(uuid.uuid4())
# 构建初始对话
conversation = Conversation(
conversation_id=conversation_id,
user_id=user_id,
metadata={
"system_prompt": system_prompt,
"initial_message": initial_message
}
)
# 添加系统消息
if system_prompt:
system_msg = Message("system", system_prompt)
conversation.add_message(system_msg)
# 添加初始用户消息
if initial_message:
user_msg = Message("user", initial_message)
conversation.add_message(user_msg)
# 存储到Redis
self._save_conversation(conversation)
return conversation
def get_conversation(self, conversation_id: str) -> Optional[Conversation]:
"""
获取对话
Args:
conversation_id (str): 对话ID
Returns:
Conversation: 对话对象,如果不存在返回None
"""
try:
data = self.redis_client.get(f"conversation:{conversation_id}")
if data:
conversation_data = pickle.loads(data)
return Conversation.from_dict(conversation_data)
return None
except Exception as e:
self.logger.error(f"获取对话失败: {str(e)}")
return None
def update_conversation(self, conversation: Conversation) -> bool:
"""
更新对话
Args:
conversation (Conversation): 对话对象
Returns:
bool: 是否更新成功
"""
try:
self._save_conversation(conversation)
return True
except Exception as e:
self.logger.error(f"更新对话失败: {str(e)}")
return False
def _save_conversation(self, conversation: Conversation) -> None:
"""保存对话到Redis"""
key = f"conversation:{conversation.conversation_id}"
data = pickle.dumps(conversation.to_dict())
self.redis_client.setex(key, self.expire_time, data)
def delete_conversation(self, conversation_id: str) -> bool:
"""
删除对话
Args:
conversation_id (str): 对话ID
Returns:
bool: 是否删除成功
"""
try:
key = f"conversation:{conversation_id}"
deleted = self.redis_client.delete(key)
return deleted > 0
except Exception as e:
self.logger.error(f"删除对话失败: {str(e)}")
return False
def get_user_conversations(self, user_id: str, limit: int = 10) -> List[Conversation]:
"""
获取用户的所有对话
Args:
user_id (str): 用户ID
limit (int): 最大返回数量
Returns:
List[Conversation]: 对话列表
"""
try:
# 这里可以实现更复杂的查询逻辑
# 为了简化,返回最近创建的对话
conversations = []
# 实际实现中需要从数据库或Redis中筛选用户相关的对话
return conversations[:limit]
except Exception as e:
self.logger.error(f"获取用户对话失败: {str(e)}")
return []
def clear_expired_conversations(self) -> int:
"""
清理过期对话
Returns:
int: 清理的对话数量
"""
# Redis自动过期机制已经处理了大部分情况
# 这里可以添加额外的清理逻辑
return 0
五、用户会话跟踪与管理
5.1 用户会话管理器
class UserSessionManager:
"""用户会话管理器"""
def __init__(self, redis_host: str = "localhost", redis_port: int = 6379,
session_timeout: int = 1800): # 30分钟
"""
初始化用户会话管理器
Args:
redis_host (str): Redis主机地址
redis_port (int): Redis端口
session_timeout (int): 会话超时时间(秒)
"""
self.redis_client = redis.Redis(
host=redis_host,
port=redis_port,
decode_responses=False
)
self.session_timeout = session_timeout
self.logger = logging.getLogger(__name__)
def create_session(self, user_id: str, session_data: Dict[str, Any] = None) -> str:
"""
创建用户会话
Args:
user_id (str): 用户ID
session_data (Dict[str, Any]): 会话数据
Returns:
str: 会话ID
"""
session_id = str(uuid.uuid4())
session_info = {
"user_id": user_id,
"created_at": datetime.now().isoformat(),
"updated_at": datetime.now().isoformat(),
"data": session_data or {}
}
# 存储会话信息
key = f"user_session:{session_id}"
self.redis_client.setex(key, self.session_timeout,
pickle.dumps(session_info))
# 建立用户与会话的关联
user_key = f"user_sessions:{user_id}"
self.redis_client.sadd(user_key, session_id)
self.redis_client.expire(user_key, self.session_timeout)
return session_id
def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
"""
获取用户会话信息
Args:
session_id (str): 会话ID
Returns:
Dict[str, Any]: 会话信息,如果不存在返回None
"""
try:
key = f"user_session:{session_id}"
data = self.redis_client.get(key)
if data:
session_info = pickle.loads(data)
# 更新最后访问时间
session_info["updated_at"] = datetime.now().isoformat()
self.redis_client.setex(key, self.session_timeout,
pickle.dumps(session_info))
return session_info
return None
except Exception as e:
self.logger.error(f"获取会话信息失败: {str(e)}")
return None
def update_session(self, session_id: str, session_data: Dict[str, Any]) -> bool:
"""
更新用户会话信息
Args:
session_id (str): 会话ID
session_data (Dict[str, Any]): 会话数据
Returns:
bool: 是否更新成功
"""
try:
key = f"user_session:{session_id}"
data = self.redis_client.get(key)
if data:
session_info = pickle.loads(data)
session_info["data"].update(session_data)
session_info["updated_at"] = datetime.now().isoformat()
self.redis_client.setex(key, self.session_timeout,
pickle.dumps(session_info))
return True
return False
except Exception as e:
self.logger.error(f"更新会话信息失败: {str(e)}")
return False
def destroy_session(self, session_id: str) -> bool:
"""
销毁用户会话
Args:
session_id (str): 会话ID
Returns:
bool: 是否销毁成功
"""
try:
key = f"user_session:{session_id}"
# 删除会话信息
deleted = self.redis_client.delete(key)
# 从用户关联列表中移除
user_key = f"user_sessions:{self.get_user_id_from_session(session_id)}"
if user_key:
self.redis_client.srem(user_key, session_id)
return deleted > 0
except Exception as e:
self.logger.error(f"销毁会话失败: {str(e)}")
return False
def get_user_id_from_session(self, session_id: str) -> Optional[str]:
"""
从会话ID获取用户ID
Args:
session_id (str): 会话ID
Returns:
str: 用户ID,如果不存在返回None
"""
try:
key = f"user_session:{session_id}"
data = self.redis_client.get(key)
if data:
session_info = pickle.loads(data)
return session_info["user_id"]
return None
except Exception:
return None
def get_user_active_sessions(self, user_id: str) -> List[str]:
"""
获取用户活跃会话
Args:
user_id (str): 用户ID
Returns:
List[str]: 会话ID列表
"""
try:
user_key = f"user_sessions:{user_id}"
session_ids = self.redis_client.smembers(user_key)
return list(session_ids)
except Exception as e:
self.logger.error(f"获取用户会话失败: {str(e)}")
return []
5.2 会话上下文管理
class SessionContextManager:
"""会话上下文管理器"""
def __init__(self, conversation_manager: ConversationManager,
session_manager: UserSessionManager):
self.conversation_manager = conversation_manager
self.session_manager = session_manager
self.logger = logging.getLogger(__name__)
def get_context_for_user(self, user_id: str, session_id: str) -> Dict[str, Any]:
"""
获取用户上下文
Args:
user_id (str): 用户ID
session_id (str): 会话ID
Returns:
Dict[str, Any]: 上下文信息
"""
context = {
"user_id": user_id,
"session_id": session_id,
"conversation_id": None,
"conversation_history": [],
"session_data": {}
}
# 获取会话数据
session_info = self.session_manager.get_session(session_id)
if session_info:
context["session_data"] = session_info["data"]
# 尝试获取用户的当前对话
# 这里可以实现更复杂的逻辑,比如根据会话状态获取最近的对话
return context
def update_user_context(self, user_id: str, session_id: str,
conversation_id: str = None, **kwargs) -> bool:
"""
更新用户上下文
Args:
user_id (str): 用户ID
session_id (str): 会话ID
conversation_id (str): 对话ID
**kwargs: 其他上下文数据
Returns:
bool: 是否更新成功
"""
try:
# 更新会话数据
session_data = kwargs.copy()
if conversation_id:
session_data["current_conversation_id"] = conversation_id
return self.session_manager.update_session(session_id, session_data)
except Exception as e:
self.logger.error(f"更新用户上下文失败: {str(e)}")
return False
六、完整的应用服务层实现
6.1 智能问答服务
class SmartQAService:
"""智能问答服务"""
def __init__(self, chatgpt_client: ChatGPTClient,
conversation_manager: ConversationManager,
session_manager: UserSessionManager):
self.chatgpt_client = chatgpt_client
self.conversation_manager = conversation_manager
self.session_manager = session_manager
self.logger = logging.getLogger(__name__)
def process_user_query(self, user_id: str, session_id: str,
question: str, system_prompt: str = None) -> Dict[str, Any]:
"""
处理用户查询
Args:
user_id (str): 用户ID
session_id (str): 会话ID
question (str): 用户问题
system_prompt (str): 系统提示词
Returns:
Dict[str, Any]: 响应结果
"""
try:
# 获取当前会话信息
session_info = self.session_manager.get_session(session_id)
if not session_info:
return {
"success": False,
"error": "Invalid session"
}
# 获取或创建对话
conversation_id = session_info["data"].get("current_conversation_id")
if not conversation_id:
# 创建新的对话
conversation = self.conversation_manager.create_conversation(
user_id=user_id,
initial_message=question,
system_prompt=system_prompt or "You are a helpful assistant."
)
conversation_id = conversation.conversation_id
# 更新会话中的当前对话ID
self.session_manager.update_session(session_id, {
"current_conversation_id": conversation_id
})
else:
# 获取现有对话
conversation = self.conversation_manager.get_conversation(conversation_id)
if not conversation:
# 如果对话不存在,创建新的
conversation = self.conversation_manager.create_conversation(
user_id=user_id,
initial_message=question,
system_prompt=system_prompt or "You are a helpful assistant."
)
conversation_id = conversation.conversation_id
# 更新会话中的当前对话ID
self.session_manager.update_session(session_id, {
"current_conversation_id": conversation_id
})
# 构建对话历史
conversation_history = []
messages = conversation.get_messages()
# 限制历史消息数量,避免超出token限制
recent_messages = messages[-10:] if len(messages) > 10 else messages
for msg in recent_messages:
conversation_history.append({
"role": msg.role,
"content": msg.content
})
# 调用ChatGPT API
response = self.chatgpt_client.create_chat_with_context(
user_message=question,
conversation_history=conversation_history,
system_prompt=system_prompt or "You are a helpful assistant."
)
if response["success"]:
# 添加用户问题到对话历史
user_message = Message("user", question)
conversation.add_message(user_message)
# 添加AI回答到对话历史
ai_message = Message("assistant", response["content"])
conversation.add_message(ai_message)
# 更新对话状态
self.conversation_manager.update_conversation(conversation)
return {
"success": True,
"answer": response["content"],
"conversation_id": conversation_id,
"session_id": session_id
}
else:
return {
"success": False,
"error": response.get("error", "API call failed")

评论 (0)