康心伴WellAlly
Backend Development

A Complete Guide to Building a RAG Nutrition Chatbot (FastAPI + LangChain)



Retrieval-Augmented Generation (RAG) is transforming the health-tech space. This guide walks through building a professional nutrition-consultation chatbot from scratch.
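Before diving into the full system, the core retrieve-then-generate loop can be sketched in a few lines. This is a toy illustration only: keyword overlap stands in for vector search, prompt assembly stands in for the LLM call, and the documents and function names are invented for the example.

```python
# Toy RAG loop: keyword overlap stands in for vector search,
# and prompt assembly stands in for the LLM call.
DOCS = [
    "Adults need roughly 0.8 g of protein per kg of body weight daily.",
    "Vitamin C is abundant in citrus fruits and bell peppers.",
]

def retrieve(query: str, top_k: int = 1) -> list[str]:
    # Rank documents by how many query words each one contains.
    q = set(query.lower().split())
    ranked = sorted(DOCS, key=lambda d: len(q & set(d.lower().split())), reverse=True)
    return ranked[:top_k]

def build_prompt(query: str, context: list[str]) -> str:
    # A real system sends this prompt to an LLM; here we just assemble it.
    return f"Context: {' '.join(context)}\nQuestion: {query}"

prompt = build_prompt("how much protein do adults need",
                      retrieve("how much protein do adults need"))
```

The sections below replace each toy piece with a production component: Pinecone for retrieval, GPT-4 for generation, FastAPI for serving.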

System Architecture

┌──────────────┐
│  User Query  │
└──────┬───────┘
       │
       ▼
┌─────────────────────────────────┐
│       FastAPI App Layer         │
│  - Request validation           │
│  - Session management           │
│  - Response formatting          │
└──────┬──────────────────────────┘
       │
       ▼
┌─────────────────────────────────┐
│        RAG Service Layer        │
│  ┌───────────┐   ┌───────────┐  │
│  │ Retriever │──▶│ Generator │  │
│  └─────┬─────┘   └─────┬─────┘  │
│        │               │        │
│        ▼               ▼        │
│  ┌───────────┐   ┌───────────┐  │
│  │ Vector DB │   │    LLM    │  │
│  │(Pinecone) │   │  (GPT-4)  │  │
│  └───────────┘   └───────────┘  │
└─────────────────────────────────┘

Complete Project Structure

nutrition-rag-bot/
├── app/
│   ├── __init__.py
│   ├── main.py                          # FastAPI application
│   ├── config/
│   │   ├── __init__.py
│   │   └── settings.py                  # Configuration management
│   ├── api/
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── chat.py                  # Chat API
│   │   │   ├── documents.py             # Document management API
│   │   │   └── health.py                # Health check
│   ├── core/
│   │   ├── __init__.py
│   │   ├── rag.py                       # Core RAG logic
│   │   ├── retriever.py                 # Retriever
│   │   ├── generator.py                 # Generator
│   │   └── memory.py                    # Conversation memory
│   ├── vectorstore/
│   │   ├── __init__.py
│   │   ├── pinecone_store.py            # Pinecone implementation
│   │   └── embeddings.py                # Embedding model
│   ├── processors/
│   │   ├── __init__.py
│   │   ├── document_loader.py           # Document loading
│   │   ├── text_splitter.py             # Text splitting
│   │   └── metadata_extractor.py        # Metadata extraction
│   ├── models/
│   │   ├── __init__.py
│   │   ├── chat.py                      # Chat models
│   │   ├── document.py                  # Document models
│   │   └── response.py                  # Response models
│   ├── prompts/
│   │   ├── __init__.py
│   │   ├── chat_prompts.py              # Chat prompts
│   │   └── system_prompts.py            # System prompts
│   └── utils/
│       ├── __init__.py
│       ├── logger.py                    # Logging utilities
│       └── validators.py                # Validation utilities
├── knowledge/
│   ├── guidelines/                      # Nutrition guidelines
│   ├── foods/                           # Food data
│   ├── research/                        # Research papers
│   └── faq/                             # Frequently asked questions
├── tests/
│   ├── unit/                            # Unit tests
│   ├── integration/                     # Integration tests
│   └── e2e/                             # End-to-end tests
├── scripts/
│   ├── ingest_documents.py              # Document ingestion script
│   └── init_vectorstore.py              # Vector store initialization
├── requirements.txt
├── .env.example
├── docker-compose.yml
└── README.md

1. Core Configuration

app/config/settings.py

from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    """Application settings."""

    # Application basics
    APP_NAME: str = "Nutrition RAG Chatbot"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = False

    # API settings
    API_PREFIX: str = "/api/v1"
    MAX_CONTEXT_LENGTH: int = 4000
    MAX_HISTORY_LENGTH: int = 10

    # OpenAI settings (no default for the key: fail fast if it is missing)
    OPENAI_API_KEY: str
    OPENAI_MODEL: str = "gpt-4-turbo-preview"
    OPENAI_TEMPERATURE: float = 0.7
    EMBEDDING_MODEL: str = "text-embedding-3-large"
    EMBEDDING_DIMENSIONS: int = 3072

    # Pinecone settings
    PINECONE_API_KEY: str
    PINECONE_ENVIRONMENT: str = "gcp-starter"
    PINECONE_INDEX_NAME: str = "nutrition-rag"
    PINECONE_NAMESPACE: str = "nutrition"

    # Document processing
    CHUNK_SIZE: int = 1000
    CHUNK_OVERLAP: int = 200
    MAX_DOCUMENT_SIZE_MB: int = 10

    # Retrieval settings
    TOP_K_RESULTS: int = 5
    SIMILARITY_THRESHOLD: float = 0.7
    RERANK_TOP_K: int = 3

    # Caching
    ENABLE_CACHE: bool = True
    CACHE_TTL_SECONDS: int = 3600

    # Logging
    LOG_LEVEL: str = "INFO"
    LOG_FORMAT: str = "json"

    # pydantic-settings v2 style configuration
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=True)

settings = Settings()
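The two secrets (OPENAI_API_KEY, PINECONE_API_KEY) have no defaults, so Settings() raises at startup if they are absent. The project tree lists a .env.example; its contents are not shown in this guide, but a plausible version (placeholder values, not real keys) might be:

```shell
# .env.example — copy to .env and fill in real values
OPENAI_API_KEY=sk-your-key-here
PINECONE_API_KEY=your-pinecone-key

# Optional overrides (defaults live in app/config/settings.py)
DEBUG=false
LOG_LEVEL=INFO
```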

2. Core RAG Implementation

app/core/rag.py

from typing import List, Dict, Any, Optional, AsyncIterator
from langchain_openai import ChatOpenAI
from langchain_core.messages import BaseMessage, HumanMessage, SystemMessage

from app.core.retriever import Retriever
from app.core.generator import Generator
from app.core.memory import ConversationMemory
from app.models.chat import ChatRequest, ChatResponse, SourceDocument
from app.prompts.system_prompts import NUTRITION_SYSTEM_PROMPT
from app.config import settings
from app.utils.logger import get_logger

logger = get_logger(__name__)

class RAGPipeline:
    """RAG pipeline - the full retrieval-augmented generation flow."""

    def __init__(self):
        self.retriever = Retriever()
        self.generator = Generator()
        self.memory = ConversationMemory()
        self.llm = ChatOpenAI(
            model=settings.OPENAI_MODEL,
            temperature=settings.OPENAI_TEMPERATURE,
            streaming=True
        )

    async def process(
        self,
        request: ChatRequest
    ) -> AsyncIterator[Dict[str, Any]]:
        """
        Handle a chat request (streaming).

        Args:
            request: The chat request.

        Yields:
            Streamed response fragments.
        """
        try:
            # 1. Load conversation history
            history = await self.memory.get_history(request.session_id)

            # 2. Retrieve relevant documents
            logger.info(f"Retrieving documents for: {request.question}")
            sources = await self.retriever.retrieve(
                query=request.question,
                top_k=settings.TOP_K_RESULTS,
                filter=self._build_filter(request)
            )

            if not sources:
                yield {
                    "type": "error",
                    "content": "Sorry, I could not find relevant information. Please try another question or consult a registered dietitian."
                }
                return

            # 3. Build the prompt
            prompt = await self._build_prompt(
                question=request.question,
                sources=sources,
                history=history
            )

            # 4. Stream the generated answer
            logger.info("Generating answer")
            full_response = ""

            async for chunk in self._stream_generate(prompt):
                full_response += chunk
                yield {
                    "type": "token",
                    "content": chunk
                }

            # 5. Persist the conversation history
            await self.memory.add_message(
                session_id=request.session_id,
                role="user",
                content=request.question
            )
            await self.memory.add_message(
                session_id=request.session_id,
                role="assistant",
                content=full_response
            )

            # 6. Emit the complete response
            yield {
                "type": "complete",
                "response": ChatResponse(
                    answer=full_response,
                    sources=[SourceDocument(**s) for s in sources],
                    session_id=request.session_id
                )
            }

        except Exception as e:
            logger.error(f"RAG pipeline error: {e}")
            yield {
                "type": "error",
                "content": f"An error occurred while processing the request: {str(e)}"
            }

    async def _build_prompt(
        self,
        question: str,
        sources: List[Dict[str, Any]],
        history: List[Dict[str, str]]
    ) -> List[BaseMessage]:
        """Build the LLM prompt."""
        # Format the retrieved context
        context = self._format_context(sources)

        # Format the conversation history
        history_text = self._format_history(history)

        # Assemble the message list
        messages = [
            SystemMessage(content=NUTRITION_SYSTEM_PROMPT),
            SystemMessage(content=f"""Reference material:
{context}

Guidelines:
1. The material above comes from authoritative nutrition references and scientific research
2. Cite the source number when quoting
3. If the material is insufficient, say so explicitly
4. For medical questions, advise the user to consult a physician"""),
        ]

        # Append the conversation history
        if history_text:
            messages.append(
                SystemMessage(content=f"Conversation history:\n{history_text}")
            )

        # Append the current question
        messages.append(HumanMessage(content=question))

        return messages

    async def _stream_generate(
        self,
        messages: List[BaseMessage]
    ) -> AsyncIterator[str]:
        """Stream generated text."""
        async for chunk in self.llm.astream(messages):
            if hasattr(chunk, 'content'):
                yield chunk.content
            elif isinstance(chunk, str):
                yield chunk

    def _format_context(self, sources: List[Dict[str, Any]]) -> str:
        """Format the retrieved context."""
        parts = []
        for i, source in enumerate(sources, 1):
            metadata = source.get('metadata', {})
            # Build the optional page fragment separately: nesting same-type
            # quotes inside an f-string is a syntax error before Python 3.12.
            page = metadata.get('page')
            page_part = f" | page {page}" if page else ""
            parts.append(
                f"[Source {i}] {source['content']}\n"
                f"From: {metadata.get('title', 'unknown')}"
                f" | {metadata.get('source', '')}"
                f"{page_part}"
                f" | relevance: {source['score']:.2f}"
            )
        return "\n\n".join(parts)

    def _format_history(self, history: List[Dict[str, str]]) -> str:
        """Format the conversation history."""
        if not history:
            return ""
        lines = []
        for msg in history[-settings.MAX_HISTORY_LENGTH:]:
            role = "User" if msg['role'] == 'user' else "Assistant"
            lines.append(f"{role}: {msg['content']}")
        return "\n".join(lines)

    def _build_filter(self, request: ChatRequest) -> Optional[Dict[str, Any]]:
        """Build retrieval filter conditions."""
        filters = {}
        # Add filters based on the request here,
        # e.g. a document type or date range.
        return filters if filters else None
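`process` yields a small event protocol: `token`, `error`, and `complete` dicts. A caller, such as an SSE endpoint in `app/api/v1/chat.py`, consumes it roughly as follows. `fake_pipeline` below is a hypothetical stand-in for `RAGPipeline.process(request)` so the sketch stays self-contained:

```python
import asyncio
from typing import Any, AsyncIterator, Dict

async def fake_pipeline() -> AsyncIterator[Dict[str, Any]]:
    # Hypothetical stand-in for RAGPipeline.process(request):
    # emits token events followed by a completion event.
    for token in ["Protein ", "needs ", "vary."]:
        yield {"type": "token", "content": token}
    yield {"type": "complete", "response": {"answer": "Protein needs vary."}}

async def collect_answer(events: AsyncIterator[Dict[str, Any]]) -> str:
    # Accumulate token events into the full answer; surface error events.
    parts = []
    async for event in events:
        if event["type"] == "token":
            parts.append(event["content"])
        elif event["type"] == "error":
            raise RuntimeError(event["content"])
    return "".join(parts)

answer = asyncio.run(collect_answer(fake_pipeline()))
```

In the real endpoint each `token` event would be flushed to the client immediately rather than collected, which is what makes the chat feel responsive.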

3. Retriever Implementation

app/core/retriever.py

from typing import List, Dict, Any, Optional

from app.vectorstore.pinecone_store import PineconeVectorStore
from app.vectorstore.embeddings import EmbeddingService
from app.config import settings
from app.utils.logger import get_logger

logger = get_logger(__name__)

class Retriever:
    """Document retriever."""

    def __init__(self):
        self.vector_store = PineconeVectorStore()
        self.embedding_service = EmbeddingService()

    async def retrieve(
        self,
        query: str,
        top_k: int = 5,
        filter: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, Any]]:
        """
        Retrieve relevant documents.

        Args:
            query: Query text.
            top_k: Number of results to return.
            filter: Filter conditions.

        Returns:
            List of retrieval results.
        """
        try:
            # Embed the query
            logger.info(f"Embedding query: {query[:50]}...")
            query_embedding = await self.embedding_service.embed_query(query)

            # Vector search
            logger.info("Running vector search...")
            results = await self.vector_store.similarity_search(
                vector=query_embedding,
                top_k=top_k,
                filter=filter,
                namespace=settings.PINECONE_NAMESPACE
            )

            # Drop low-similarity results
            filtered_results = [
                r for r in results
                if r['score'] >= settings.SIMILARITY_THRESHOLD
            ]

            logger.info(f"Retrieval finished: {len(filtered_results)} results")
            return filtered_results

        except Exception as e:
            logger.error(f"Retrieval failed: {e}")
            return []

    async def retrieve_with_rerank(
        self,
        query: str,
        top_k: int = 10,
        rerank_top_k: int = 3
    ) -> List[Dict[str, Any]]:
        """
        Retrieve and rerank (for higher precision).

        Args:
            query: Query text.
            top_k: Initial retrieval size.
            rerank_top_k: Number of results to keep after reranking.

        Returns:
            Reranked retrieval results.
        """
        # Initial retrieval
        results = await self.retrieve(query, top_k=top_k)

        if not results:
            return []

        # Rerank the candidates
        reranked_results = await self._rerank(query, results)

        return reranked_results[:rerank_top_k]

    async def _rerank(
        self,
        query: str,
        results: List[Dict[str, Any]]
    ) -> List[Dict[str, Any]]:
        """
        Rerank results.

        A production system would use a dedicated cross-encoder rerank model;
        this is a simplified keyword-overlap heuristic.
        """
        query_words = set(query.lower().split())
        if not query_words:
            # Guard against an empty/whitespace query (division by zero below)
            return results
        for result in results:
            content_words = set(result['content'].lower().split())
            overlap = len(query_words & content_words)
            result['rerank_score'] = result['score'] * 0.7 + (overlap / len(query_words)) * 0.3

        return sorted(results, key=lambda x: x['rerank_score'], reverse=True)

    async def hybrid_search(
        self,
        query: str,
        top_k: int = 5
    ) -> List[Dict[str, Any]]:
        """
        Hybrid search (vector + keyword).

        Combines the strengths of semantic and keyword retrieval.
        """
        # Vector search
        vector_results = await self.retrieve(query, top_k=top_k)

        # Keyword search (a production system could use Elasticsearch)
        keyword_results = await self._keyword_search(query, top_k=top_k)

        # Merge and rerank
        combined = self._merge_results(vector_results, keyword_results)

        return combined[:top_k]

    async def _keyword_search(
        self,
        query: str,
        top_k: int
    ) -> List[Dict[str, Any]]:
        """Keyword search (simplified placeholder)."""
        # A production system should use a dedicated keyword search engine;
        # this returns an empty list as a placeholder.
        return []

    def _merge_results(
        self,
        vector_results: List[Dict],
        keyword_results: List[Dict]
    ) -> List[Dict]:
        """Merge retrieval results."""
        # Simplified merge: weight vector and keyword scores 0.7/0.3.
        all_results = {}

        for result in vector_results:
            doc_id = result.get('id', '')
            all_results[doc_id] = {**result, 'vector_score': result['score']}

        for result in keyword_results:
            doc_id = result.get('id', '')
            if doc_id in all_results:
                all_results[doc_id]['keyword_score'] = result['score']
                all_results[doc_id]['combined_score'] = (
                    all_results[doc_id]['vector_score'] * 0.7 +
                    result['score'] * 0.3
                )
            else:
                all_results[doc_id] = {
                    **result,
                    'combined_score': result['score']
                }

        return sorted(
            all_results.values(),
            key=lambda x: x.get('combined_score', x.get('score', 0)),
            reverse=True
        )
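The weighting in `_rerank` (70% vector score, 30% keyword overlap) is easy to check in isolation. The helper below reproduces that formula on plain inputs; the empty-query guard is an addition for safety, not part of the class above:

```python
def rerank_score(vector_score: float, query: str, content: str) -> float:
    # Same weighting as Retriever._rerank: 0.7 * vector score + 0.3 * word overlap.
    query_words = set(query.lower().split())
    if not query_words:  # guard added here: avoid dividing by zero
        return vector_score
    content_words = set(content.lower().split())
    overlap = len(query_words & content_words) / len(query_words)
    return vector_score * 0.7 + overlap * 0.3

# A document containing every query word gets the maximum 0.3 overlap bonus.
score = rerank_score(0.8, "protein intake", "daily protein intake guidelines")
```

Note that whitespace tokenization is a rough fit for Chinese queries; a production system would segment text (or rely on the cross-encoder) instead.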

4. Document Ingestion Script

scripts/ingest_documents.py

#!/usr/bin/env python3
"""
Document ingestion script - load nutrition material into the vector database.
"""
import asyncio
import sys
from pathlib import Path

# Add the project root to the import path
sys.path.insert(0, str(Path(__file__).parent.parent))

from app.processors.document_loader import DocumentLoader
from app.processors.text_splitter import TextSplitter
from app.vectorstore.embeddings import EmbeddingService
from app.vectorstore.pinecone_store import PineconeVectorStore
from app.config import settings
from app.utils.logger import get_logger

logger = get_logger(__name__)

async def ingest_documents(knowledge_dir: str):
    """
    Ingest documents into the vector database.

    Args:
        knowledge_dir: Path to the knowledge base directory.
    """
    logger.info(f"Starting ingestion: {knowledge_dir}")

    # 1. Load documents
    loader = DocumentLoader()
    documents = loader.load_directory(knowledge_dir)
    logger.info(f"Loaded {len(documents)} documents")

    if not documents:
        logger.warning("No documents found")
        return

    # 2. Split text
    splitter = TextSplitter(
        chunk_size=settings.CHUNK_SIZE,
        chunk_overlap=settings.CHUNK_OVERLAP
    )
    chunks = splitter.split_documents(documents)
    logger.info(f"Split into {len(chunks)} chunks")

    # 3. Generate embeddings
    embedding_service = EmbeddingService()
    logger.info("Generating embeddings...")

    embeddings = []
    for chunk in chunks:
        embedding = await embedding_service.embed_text(chunk['content'])
        embeddings.append(embedding)

    logger.info(f"Generated {len(embeddings)} embeddings")

    # 4. Upload to Pinecone
    vector_store = PineconeVectorStore()
    logger.info("Uploading to the vector database...")

    await vector_store.add_vectors(
        vectors=embeddings,
        metadata=chunks,
        namespace=settings.PINECONE_NAMESPACE
    )

    logger.info("Document ingestion complete!")

async def main():
    if len(sys.argv) < 2:
        print("Usage: python ingest_documents.py <knowledge_directory>")
        sys.exit(1)

    knowledge_dir = sys.argv[1]

    if not Path(knowledge_dir).exists():
        logger.error(f"Directory does not exist: {knowledge_dir}")
        sys.exit(1)

    await ingest_documents(knowledge_dir)

if __name__ == "__main__":
    asyncio.run(main())
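`TextSplitter` itself is not shown in this guide. A minimal fixed-size splitter matching the `CHUNK_SIZE`/`CHUNK_OVERLAP` semantics assumed here — each chunk starts `chunk_size - chunk_overlap` characters after the previous one — might look like this (a sketch, not the actual implementation):

```python
def split_text(text: str, chunk_size: int = 1000, chunk_overlap: int = 200) -> list[str]:
    # Fixed-size character chunks; consecutive chunks share chunk_overlap characters.
    if chunk_overlap >= chunk_size:
        raise ValueError("chunk_overlap must be smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break
    return chunks

# 2500 characters with the default settings yields chunks of 1000, 1000, and 900.
chunks = split_text("".join(str(i % 10) for i in range(2500)))
```

The overlap ensures a sentence cut at a chunk boundary still appears whole in the neighboring chunk, which keeps retrieval from losing context at the seams.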

API Endpoint Summary

Endpoint                    Method   Description
/api/v1/chat/ask            POST     Ask a question (streaming)
/api/v1/chat/ask-stream     POST     Ask a question (SSE streaming)
/api/v1/documents/ingest    POST     Ingest documents
/api/v1/documents/list      GET      List documents
/api/v1/health              GET      Health check

Deployment Recommendations

  1. Pinecone index configuration:

    • Use a production-tier environment
    • Configure an appropriate number of replicas
    • Enable data backups
  2. Performance optimization:

    • Cache frequent queries
    • Batch embedding requests
    • Keep prompts concise
  3. Monitoring and logging:

    • Track retrieval quality
    • Monitor LLM calls
    • Record user feedback
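The batch-embedding tip applies directly to the ingestion script above, which embeds one chunk per API call. A small batching helper cuts the number of round trips (a sketch; the batch size of 100 is an arbitrary choice, and the embedding call itself would need to accept a list of inputs):

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")

def batched(items: Sequence[T], batch_size: int = 100) -> Iterator[List[T]]:
    # Yield consecutive fixed-size batches; the final batch may be shorter.
    for start in range(0, len(items), batch_size):
        yield list(items[start:start + batch_size])

# 250 chunks become three requests instead of 250.
sizes = [len(batch) for batch in batched(list(range(250)), batch_size=100)]
```

Inside `ingest_documents`, the per-chunk loop would then iterate over `batched(chunks)` and embed each batch in one call.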

This guide has covered the core building blocks of a production-grade RAG nutrition chatbot. The system can serve health-consultation scenarios directly and can be extended to other specialist domains.


Tags

FastAPI
RAG
LangChain
Pinecone
AI Applications
Health Tech
