A Complete Guide to Implementing a RAG Nutrition Chatbot
Retrieval-Augmented Generation (RAG) is reshaping the health-tech space. This guide walks through building a professional nutrition-consultation chatbot from scratch.
System Architecture
```text
┌─────────────┐
│ User Query  │
└──────┬──────┘
       │
       ▼
┌─────────────────────────────────┐
│        FastAPI App Layer        │
│  - Request validation           │
│  - Session management           │
│  - Response formatting          │
└──────┬──────────────────────────┘
       │
       ▼
┌─────────────────────────────────┐
│        RAG Service Layer        │
│  ┌───────────┐   ┌───────────┐  │
│  │ Retriever │──▶│ Generator │  │
│  └─────┬─────┘   └─────┬─────┘  │
│        │               │        │
│        ▼               ▼        │
│  ┌───────────┐   ┌───────────┐  │
│  │ Vector DB │   │    LLM    │  │
│  │(Pinecone) │   │  (GPT-4)  │  │
│  └───────────┘   └───────────┘  │
└─────────────────────────────────┘
```
Project Structure
```text
nutrition-rag-bot/
├── app/
│   ├── __init__.py
│   ├── main.py                     # FastAPI application
│   ├── config/
│   │   ├── __init__.py
│   │   └── settings.py             # Configuration management
│   ├── api/
│   │   ├── __init__.py
│   │   └── v1/
│   │       ├── __init__.py
│   │       ├── chat.py             # Chat API
│   │       ├── documents.py        # Document management API
│   │       └── health.py           # Health check
│   ├── core/
│   │   ├── __init__.py
│   │   ├── rag.py                  # Core RAG logic
│   │   ├── retriever.py            # Retriever
│   │   ├── generator.py            # Generator
│   │   └── memory.py               # Conversation memory
│   ├── vectorstore/
│   │   ├── __init__.py
│   │   ├── pinecone_store.py       # Pinecone implementation
│   │   └── embeddings.py           # Embedding model
│   ├── processors/
│   │   ├── __init__.py
│   │   ├── document_loader.py      # Document loading
│   │   ├── text_splitter.py        # Text splitting
│   │   └── metadata_extractor.py   # Metadata extraction
│   ├── models/
│   │   ├── __init__.py
│   │   ├── chat.py                 # Chat models
│   │   ├── document.py             # Document models
│   │   └── response.py             # Response models
│   ├── prompts/
│   │   ├── __init__.py
│   │   ├── chat_prompts.py         # Chat prompts
│   │   └── system_prompts.py       # System prompts
│   └── utils/
│       ├── __init__.py
│       ├── logger.py               # Logging utilities
│       └── validators.py           # Validation utilities
├── knowledge/
│   ├── guidelines/                 # Nutrition guidelines
│   ├── foods/                      # Food data
│   ├── research/                   # Research papers
│   └── faq/                        # Frequently asked questions
├── tests/
│   ├── unit/                       # Unit tests
│   ├── integration/                # Integration tests
│   └── e2e/                        # End-to-end tests
├── scripts/
│   ├── ingest_documents.py         # Document ingestion script
│   └── init_vectorstore.py         # Vector store initialization
├── requirements.txt
├── .env.example
├── docker-compose.yml
└── README.md
```
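The tree above lists a requirements.txt. A minimal sketch covering the libraries imported throughout this guide might look like the following; it is unpinned here, and you should pin exact versions for production:

```text
fastapi
uvicorn[standard]
pydantic-settings
langchain-core
langchain-openai
openai
pinecone-client   # newer releases ship as the "pinecone" package
```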
1. Core Configuration
app/config/settings.py
```python
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    """Application configuration."""

    # Application basics
    APP_NAME: str = "Nutrition RAG Chatbot"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False

    # API configuration
    API_PREFIX: str = "/api/v1"
    MAX_CONTEXT_LENGTH: int = 4000
    MAX_HISTORY_LENGTH: int = 10

    # OpenAI configuration
    OPENAI_API_KEY: str
    OPENAI_MODEL: str = "gpt-4-turbo-preview"
    OPENAI_TEMPERATURE: float = 0.7
    EMBEDDING_MODEL: str = "text-embedding-3-large"
    EMBEDDING_DIMENSIONS: int = 3072

    # Pinecone configuration
    PINECONE_API_KEY: str
    PINECONE_ENVIRONMENT: str = "gcp-starter"
    PINECONE_INDEX_NAME: str = "nutrition-rag"
    PINECONE_NAMESPACE: str = "nutrition"

    # Document processing
    CHUNK_SIZE: int = 1000
    CHUNK_OVERLAP: int = 200
    MAX_DOCUMENT_SIZE_MB: int = 10

    # Retrieval
    TOP_K_RESULTS: int = 5
    SIMILARITY_THRESHOLD: float = 0.7
    RERANK_TOP_K: int = 3

    # Caching
    ENABLE_CACHE: bool = True
    CACHE_TTL_SECONDS: int = 3600

    # Logging
    LOG_LEVEL: str = "INFO"
    LOG_FORMAT: str = "json"

    class Config:
        env_file = ".env"
        case_sensitive = True


settings = Settings()
```
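Because OPENAI_API_KEY and PINECONE_API_KEY have no defaults, Settings() raises a validation error unless both are provided. A matching .env.example sketch, with placeholder values only, could be:

```text
# Required: no defaults in Settings
OPENAI_API_KEY=sk-your-key-here
PINECONE_API_KEY=your-pinecone-key

# Optional overrides (defaults shown in settings.py)
OPENAI_MODEL=gpt-4-turbo-preview
PINECONE_INDEX_NAME=nutrition-rag
DEBUG=false
```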
2. Core RAG Implementation
app/core/rag.py
```python
from typing import Any, AsyncIterator, Dict, List, Optional

from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage

from app.core.retriever import Retriever
from app.core.generator import Generator
from app.core.memory import ConversationMemory
from app.models.chat import ChatRequest, ChatResponse, SourceDocument
from app.prompts.system_prompts import NUTRITION_SYSTEM_PROMPT
from app.config import settings
from app.utils.logger import get_logger

logger = get_logger(__name__)


class RAGPipeline:
    """RAG pipeline: the full retrieval-augmented generation flow."""

    def __init__(self):
        self.retriever = Retriever()
        self.generator = Generator()
        self.memory = ConversationMemory()
        self.llm = ChatOpenAI(
            model=settings.OPENAI_MODEL,
            temperature=settings.OPENAI_TEMPERATURE,
            streaming=True,
        )

    async def process(
        self,
        request: ChatRequest
    ) -> AsyncIterator[Dict[str, Any]]:
        """
        Handle a chat request (streaming).

        Args:
            request: The chat request.

        Yields:
            Streaming response fragments.
        """
        try:
            # 1. Fetch conversation history
            history = await self.memory.get_history(request.session_id)

            # 2. Retrieve relevant documents
            logger.info(f"Retrieving documents for: {request.question}")
            sources = await self.retriever.retrieve(
                query=request.question,
                top_k=settings.TOP_K_RESULTS,
                filter=self._build_filter(request)
            )
            if not sources:
                yield {
                    "type": "error",
                    "content": (
                        "Sorry, I could not find relevant information. "
                        "Please try another question or consult a registered dietitian."
                    )
                }
                return

            # 3. Build the prompt
            prompt = await self._build_prompt(
                question=request.question,
                sources=sources,
                history=history
            )

            # 4. Stream the generated answer
            logger.info("Generating answer")
            full_response = ""
            async for chunk in self._stream_generate(prompt):
                full_response += chunk
                yield {
                    "type": "token",
                    "content": chunk
                }

            # 5. Persist the exchange to conversation history
            await self.memory.add_message(
                session_id=request.session_id,
                role="user",
                content=request.question
            )
            await self.memory.add_message(
                session_id=request.session_id,
                role="assistant",
                content=full_response
            )

            # 6. Emit the complete response
            yield {
                "type": "complete",
                "response": ChatResponse(
                    answer=full_response,
                    sources=[SourceDocument(**s) for s in sources],
                    session_id=request.session_id
                )
            }
        except Exception as e:
            logger.error(f"RAG pipeline error: {e}")
            yield {
                "type": "error",
                "content": f"An error occurred while processing the request: {e}"
            }

    async def _build_prompt(
        self,
        question: str,
        sources: List[Dict[str, Any]],
        history: List[Dict[str, str]]
    ) -> List[BaseMessage]:
        """Build the LLM prompt."""
        # Format the retrieved context
        context = self._format_context(sources)
        # Format the conversation history
        history_text = self._format_history(history)
        # Assemble the message list
        messages: List[BaseMessage] = [
            SystemMessage(content=NUTRITION_SYSTEM_PROMPT),
            SystemMessage(content=f"""Reference material:
{context}

Guidelines:
1. The material above comes from authoritative nutrition sources and research
2. Cite the source number when quoting
3. If the material is insufficient, say so explicitly
4. For medical questions, advise the user to consult a physician"""),
        ]
        # Append prior conversation
        if history_text:
            messages.append(
                SystemMessage(content=f"Conversation history:\n{history_text}")
            )
        # Append the current question
        messages.append(HumanMessage(content=question))
        return messages

    async def _stream_generate(
        self,
        messages: List[BaseMessage]
    ) -> AsyncIterator[str]:
        """Stream generated text chunk by chunk."""
        async for chunk in self.llm.astream(messages):
            if hasattr(chunk, 'content'):
                yield chunk.content
            elif isinstance(chunk, str):
                yield chunk

    def _format_context(self, sources: List[Dict[str, Any]]) -> str:
        """Format retrieved passages into a numbered context block."""
        parts = []
        for i, source in enumerate(sources, 1):
            metadata = source.get('metadata', {})
            # Extracted to a variable: nesting same-quote f-strings is a
            # syntax error before Python 3.12
            page = metadata.get('page')
            page_part = f" | page {page}" if page else ""
            parts.append(
                f"[Source {i}] {source['content']}\n"
                f"From: {metadata.get('title', 'unknown')}"
                f" | {metadata.get('source', '')}"
                f"{page_part}"
                f" | relevance: {source['score']:.2f}"
            )
        return "\n\n".join(parts)

    def _format_history(self, history: List[Dict[str, str]]) -> str:
        """Format conversation history as alternating speaker lines."""
        if not history:
            return ""
        lines = []
        for msg in history[-settings.MAX_HISTORY_LENGTH:]:
            role = "User" if msg['role'] == 'user' else "Assistant"
            lines.append(f"{role}: {msg['content']}")
        return "\n".join(lines)

    def _build_filter(self, request: ChatRequest) -> Optional[Dict[str, Any]]:
        """Build retrieval filter conditions."""
        filters = {}
        # Add filters based on the request here,
        # e.g. document type or date range
        return filters if filters else None
```
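The pipeline leans on ConversationMemory (app/core/memory.py), whose implementation is not shown in this guide. A minimal in-process sketch matching the get_history / add_message calls above, assuming a plain in-memory store, could be:

```python
# app/core/memory.py -- minimal sketch; the real module may differ.
from collections import defaultdict
from typing import Dict, List


class ConversationMemory:
    """In-memory conversation history keyed by session_id.

    State lives in the process, so this only works for a single worker.
    Production deployments would back this with Redis or a database.
    """

    def __init__(self, max_messages: int = 50):
        self._store: Dict[str, List[Dict[str, str]]] = defaultdict(list)
        self.max_messages = max_messages

    async def get_history(self, session_id: str) -> List[Dict[str, str]]:
        """Return the stored messages for a session, oldest first."""
        return list(self._store.get(session_id, []))

    async def add_message(self, session_id: str, role: str, content: str) -> None:
        """Append a message and trim the history to max_messages."""
        messages = self._store[session_id]
        messages.append({"role": role, "content": content})
        if len(messages) > self.max_messages:
            del messages[: len(messages) - self.max_messages]
```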
3. Retriever Implementation
app/core/retriever.py
```python
from typing import Any, Dict, List, Optional

from app.vectorstore.pinecone_store import PineconeVectorStore
from app.vectorstore.embeddings import EmbeddingService
from app.config import settings
from app.utils.logger import get_logger

logger = get_logger(__name__)


class Retriever:
    """Document retriever."""

    def __init__(self):
        self.vector_store = PineconeVectorStore()
        self.embedding_service = EmbeddingService()

    async def retrieve(
        self,
        query: str,
        top_k: int = 5,
        filter: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Retrieve relevant documents.

        Args:
            query: The query text.
            top_k: Number of results to return.
            filter: Filter conditions.

        Returns:
            A list of retrieval results.
        """
        try:
            # Embed the query
            logger.info(f"Embedding query: {query[:50]}...")
            query_embedding = await self.embedding_service.embed_query(query)

            # Vector search
            logger.info("Running vector search...")
            results = await self.vector_store.similarity_search(
                vector=query_embedding,
                top_k=top_k,
                filter=filter,
                namespace=settings.PINECONE_NAMESPACE
            )

            # Drop low-similarity results
            filtered_results = [
                r for r in results
                if r['score'] >= settings.SIMILARITY_THRESHOLD
            ]
            logger.info(f"Retrieval done: {len(filtered_results)} results")
            return filtered_results
        except Exception as e:
            logger.error(f"Retrieval failed: {e}")
            return []

    async def retrieve_with_rerank(
        self,
        query: str,
        top_k: int = 10,
        rerank_top_k: int = 3
    ) -> List[Dict[str, Any]]:
        """
        Retrieve and rerank (for higher precision).

        Args:
            query: The query text.
            top_k: Initial retrieval size.
            rerank_top_k: Number of results to keep after reranking.

        Returns:
            The reranked retrieval results.
        """
        # Initial retrieval
        results = await self.retrieve(query, top_k=top_k)
        if not results:
            return []
        # Rerank the candidates
        reranked_results = await self._rerank(query, results)
        return reranked_results[:rerank_top_k]

    async def _rerank(
        self,
        query: str,
        results: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Rerank results. A production system should use a dedicated
        cross-encoder rerank model; this simplified version blends the
        vector score with keyword overlap.
        """
        query_words = set(query.lower().split())
        if not query_words:
            return results  # guard against division by zero
        for result in results:
            content_words = set(result['content'].lower().split())
            overlap = len(query_words & content_words)
            result['rerank_score'] = (
                result['score'] * 0.7 + (overlap / len(query_words)) * 0.3
            )
        return sorted(results, key=lambda x: x['rerank_score'], reverse=True)

    async def hybrid_search(
        self,
        query: str,
        top_k: int = 5
    ) -> List[Dict[str, Any]]:
        """
        Hybrid search (vector + keyword), combining the strengths of
        semantic and keyword retrieval.
        """
        # Vector search
        vector_results = await self.retrieve(query, top_k=top_k)
        # Keyword search (use Elasticsearch or similar in production)
        keyword_results = await self._keyword_search(query, top_k=top_k)
        # Merge and re-sort
        combined = self._merge_results(vector_results, keyword_results)
        return combined[:top_k]

    async def _keyword_search(
        self,
        query: str,
        top_k: int
    ) -> List[Dict[str, Any]]:
        """Keyword search (stub)."""
        # A real project should use a dedicated keyword search engine;
        # this returns an empty list as a placeholder.
        return []

    def _merge_results(
        self,
        vector_results: List[Dict],
        keyword_results: List[Dict]
    ) -> List[Dict]:
        """Merge retrieval results from both strategies."""
        # Simplified weighted merge keyed by document id
        all_results = {}
        for result in vector_results:
            doc_id = result.get('id', '')
            all_results[doc_id] = {**result, 'vector_score': result['score']}
        for result in keyword_results:
            doc_id = result.get('id', '')
            if doc_id in all_results:
                all_results[doc_id]['keyword_score'] = result['score']
                all_results[doc_id]['combined_score'] = (
                    all_results[doc_id]['vector_score'] * 0.7 +
                    result['score'] * 0.3
                )
            else:
                all_results[doc_id] = {
                    **result,
                    'combined_score': result['score']
                }
        return sorted(
            all_results.values(),
            key=lambda x: x.get('combined_score', x.get('score', 0)),
            reverse=True
        )
```
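The _rerank method above approximates reranking with keyword overlap. If you want an actual cross-encoder, a sketch using the sentence-transformers library (an assumption; this project could equally call a hosted rerank API) looks like this. Note that predict() is synchronous, so call it via a thread executor from async code:

```python
# Cross-encoder reranking sketch; assumes `pip install sentence-transformers`.
from typing import Any, Dict, List

from sentence_transformers import CrossEncoder

# ms-marco MiniLM is a common lightweight relevance model; swap as needed.
_cross_encoder = CrossEncoder("cross-encoder/ms-marco-MiniLM-L-6-v2")


def rerank_with_cross_encoder(
    query: str,
    results: List[Dict[str, Any]],
    top_k: int = 3,
) -> List[Dict[str, Any]]:
    """Score each (query, passage) pair and keep the top_k passages."""
    pairs = [(query, r["content"]) for r in results]
    scores = _cross_encoder.predict(pairs)  # higher = more relevant
    for result, score in zip(results, scores):
        result["rerank_score"] = float(score)
    return sorted(results, key=lambda r: r["rerank_score"], reverse=True)[:top_k]
```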
4. Document Ingestion Script
scripts/ingest_documents.py
```python
#!/usr/bin/env python3
"""
Document ingestion script: load nutrition material into the vector database.
"""
import asyncio
import sys
from pathlib import Path

# Make the project root importable
sys.path.insert(0, str(Path(__file__).parent.parent))

from app.processors.document_loader import DocumentLoader
from app.processors.text_splitter import TextSplitter
from app.vectorstore.embeddings import EmbeddingService
from app.vectorstore.pinecone_store import PineconeVectorStore
from app.config import settings
from app.utils.logger import get_logger

logger = get_logger(__name__)


async def ingest_documents(knowledge_dir: str):
    """
    Ingest documents into the vector database.

    Args:
        knowledge_dir: Path to the knowledge-base directory.
    """
    logger.info(f"Starting ingestion: {knowledge_dir}")

    # 1. Load documents
    loader = DocumentLoader()
    documents = loader.load_directory(knowledge_dir)
    logger.info(f"Loaded {len(documents)} documents")
    if not documents:
        logger.warning("No documents found")
        return

    # 2. Split the text
    splitter = TextSplitter(
        chunk_size=settings.CHUNK_SIZE,
        chunk_overlap=settings.CHUNK_OVERLAP
    )
    chunks = splitter.split_documents(documents)
    logger.info(f"Split into {len(chunks)} chunks")

    # 3. Generate embeddings (one request per chunk; see the batched
    # variant after this listing)
    embedding_service = EmbeddingService()
    logger.info("Generating embeddings...")
    embeddings = []
    for chunk in chunks:
        embedding = await embedding_service.embed_text(chunk['content'])
        embeddings.append(embedding)
    logger.info(f"Generated {len(embeddings)} embeddings")

    # 4. Upsert into Pinecone
    vector_store = PineconeVectorStore()
    logger.info("Uploading to the vector database...")
    await vector_store.add_vectors(
        vectors=embeddings,
        metadata=chunks,
        namespace=settings.PINECONE_NAMESPACE
    )
    logger.info("Ingestion complete!")


async def main():
    if len(sys.argv) < 2:
        print("Usage: python ingest_documents.py <knowledge_directory>")
        sys.exit(1)
    knowledge_dir = sys.argv[1]
    if not Path(knowledge_dir).exists():
        logger.error(f"Directory does not exist: {knowledge_dir}")
        sys.exit(1)
    await ingest_documents(knowledge_dir)


if __name__ == "__main__":
    asyncio.run(main())
```
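The ingestion loop embeds one chunk per API call. Since the OpenAI embeddings endpoint accepts a list of inputs per request, a batched helper (a sketch assuming the openai>=1.x async client; the internals of EmbeddingService are not shown in this guide) reduces round trips substantially:

```python
# Batched embedding sketch using the openai>=1.x async client.
from typing import List

from openai import AsyncOpenAI

from app.config import settings

client = AsyncOpenAI(api_key=settings.OPENAI_API_KEY)


async def embed_texts(texts: List[str], batch_size: int = 100) -> List[List[float]]:
    """Embed texts in batches; the API accepts a list of inputs per call."""
    embeddings: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        batch = texts[start : start + batch_size]
        response = await client.embeddings.create(
            model=settings.EMBEDDING_MODEL,
            input=batch,
        )
        # Results come back in input order.
        embeddings.extend(item.embedding for item in response.data)
    return embeddings
```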
API Endpoint Summary
| Endpoint | Method | Description |
|---|---|---|
| /api/v1/chat/ask | POST | Ask a question (streaming) |
| /api/v1/chat/ask-stream | POST | Ask a question (SSE streaming) |
| /api/v1/documents/ingest | POST | Ingest documents |
| /api/v1/documents/list | GET | List documents |
| /api/v1/health | GET | Health check |
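The chat endpoints themselves are not listed in full above. As a sketch of how /api/v1/chat/ask-stream could forward RAGPipeline.process() events as Server-Sent Events (the router wiring and event framing here are assumptions based on the project layout):

```python
# Sketch of app/api/v1/chat.py: an SSE endpoint over RAGPipeline.process().
import json

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

from app.core.rag import RAGPipeline
from app.models.chat import ChatRequest

router = APIRouter(prefix="/chat", tags=["chat"])
pipeline = RAGPipeline()


@router.post("/ask-stream")
async def ask_stream(request: ChatRequest) -> StreamingResponse:
    """Stream pipeline events to the client as Server-Sent Events."""

    async def event_stream():
        async for event in pipeline.process(request):
            if event["type"] == "complete":
                # Pydantic models need explicit serialization.
                payload = event["response"].model_dump_json()
                yield f"event: complete\ndata: {payload}\n\n"
            else:
                yield f"data: {json.dumps(event, ensure_ascii=False)}\n\n"

    return StreamingResponse(event_stream(), media_type="text/event-stream")
```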
Deployment Recommendations
1. Pinecone index configuration:
   - Use a production-tier environment
   - Set an appropriate replica count
   - Enable data backups
2. Performance optimization:
   - Cache query results (see the sketch after this list)
   - Batch embedding requests
   - Keep prompt length in check
3. Monitoring and logging:
   - Track retrieval quality
   - Monitor LLM calls
   - Record user feedback
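For the query-cache item above, here is a minimal in-process sketch wired to the ENABLE_CACHE and CACHE_TTL_SECONDS settings defined earlier (single-worker only; shared deployments would use Redis):

```python
# Minimal in-process TTL cache for retrieval results -- a sketch.
import time
from typing import Any, Dict, List, Optional, Tuple

from app.config import settings

_cache: Dict[str, Tuple[float, List[Dict[str, Any]]]] = {}


def get_cached(query: str) -> Optional[List[Dict[str, Any]]]:
    """Return cached retrieval results if present and not expired."""
    if not settings.ENABLE_CACHE:
        return None
    entry = _cache.get(query)
    if entry and entry[0] > time.monotonic():
        return entry[1]
    _cache.pop(query, None)  # drop stale entries lazily
    return None


def set_cached(query: str, results: List[Dict[str, Any]]) -> None:
    """Store retrieval results with a TTL from settings."""
    if settings.ENABLE_CACHE:
        _cache[query] = (time.monotonic() + settings.CACHE_TTL_SECONDS, results)
```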
The pieces above (configuration, retrieval, generation, ingestion, and the API surface) form the full skeleton of a production-grade RAG nutrition chatbot, and the same architecture extends readily to other specialized domains.