WellAlly 康心伴 Health

5 min read

Building an AI Companion Chatbot with FastAPI, Celery, and SpaCy: A Complete Tutorial

Overview

An AI companion chatbot is a core feature of a health app. It can provide:

  • Emotional support: recognizing the user's emotional state
  • Health reminders: offering personalized suggestions based on the conversation
  • Behavior tracking: monitoring trends in the user's mental health
  • Crisis intervention: detecting risk signals and offering help

This tutorial builds a complete AI companion system using:

  • FastAPI: a high-performance asynchronous web framework
  • Celery: a distributed task queue
  • SpaCy: an industrial-strength NLP library

System Architecture

```
┌─────────────────────────────────────────────────────────┐
│                 Frontend (React Native)                 │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │  Chat UI     │  │ Mood Journal │  │  Dashboard   │   │
│  └──────────────┘  └──────────────┘  └──────────────┘   │
└────────────────────┬────────────────────────────────────┘
                     │ WebSocket + HTTP
┌────────────────────▼────────────────────────────────────┐
│                     FastAPI Backend                     │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │  WebSocket   │  │   REST API   │  │  Auth layer  │   │
│  └──────────────┘  └──────────────┘  └──────────────┘   │
└────────────────────┬────────────────────────────────────┘
                     │
┌────────────────────▼────────────────────────────────────┐
│                    Celery Task Queue                    │
│  ┌──────────────┐  ┌──────────────┐  ┌──────────────┐   │
│  │ NLP analysis │  │  Sentiment   │  │  Reminders   │   │
│  │  (SpaCy)     │  │  (ML model)  │  │ (Scheduler)  │   │
│  └──────────────┘  └──────────────┘  └──────────────┘   │
└─────────────────────────────────────────────────────────┘
                     │
┌────────────────────▼────────────────────────────────────┐
│                   Redis + PostgreSQL                    │
└─────────────────────────────────────────────────────────┘
```

Environment Setup

Project Structure

```
ai-companion/
├── backend/
│   ├── app/
│   │   ├── main.py              # FastAPI application
│   │   ├── config.py            # Settings
│   │   ├── models.py            # Data models
│   │   ├── api/
│   │   │   ├── chat.py          # Chat API
│   │   │   └── health.py        # Health-data API
│   │   ├── services/
│   │   │   ├── nlp_service.py   # NLP service
│   │   │   └── emotion_service.py # Emotion-analysis service
│   │   └── tasks/
│   │       ├── celery_app.py    # Celery configuration
│   │       └── chat_tasks.py    # Async chat tasks
├── frontend/                    # React Native app
├── requirements.txt
└── docker-compose.yml
```

Installing Dependencies

```
# requirements.txt
fastapi==0.104.0
uvicorn[standard]==0.24.0
celery==5.3.4
redis==5.0.1
spacy==3.7.2
transformers==4.35.0
torch==2.1.0
sqlalchemy==2.0.23
psycopg2-binary==2.9.9
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.6
websockets==12.0
aiofiles==23.2.1

# Install
pip install -r requirements.txt

# Download the SpaCy models
python -m spacy download zh_core_web_sm
python -m spacy download en_core_web_sm
```

FastAPI Application

Main Application Setup

```python
# backend/app/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import redis

from .config import settings
from .api import chat, health

app = FastAPI(
    title="AI Companion API",
    description="Intelligent health companion chatbot",
    version="1.0.0"
)

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # restrict origins in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Redis connection
redis_client = redis.Redis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=0,
    decode_responses=True
)

# Register routers
app.include_router(chat.router, prefix="/api/chat", tags=["chat"])
app.include_router(health.router, prefix="/api/health", tags=["health"])

@app.on_event("startup")
async def startup_event():
    """Runs once when the application starts."""
    # Load the NLP models
    from .services.nlp_service import NLPService
    await NLPService.initialize()

@app.on_event("shutdown")
async def shutdown_event():
    """Runs when the application shuts down."""
    redis_client.close()

@app.get("/")
async def root():
    return {
        "message": "AI Companion API",
        "version": "1.0.0",
        "status": "running"
    }

@app.get("/health")
async def health_check():
    return {"status": "healthy"}
```

Configuration

```python
# backend/app/config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # Database
    DATABASE_URL: str = "postgresql://user:password@localhost/ai_companion"

    # Redis
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379

    # JWT
    SECRET_KEY: str = "your-secret-key-here"
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

    # SpaCy models
    SPACY_MODEL_ZH: str = "zh_core_web_sm"
    SPACY_MODEL_EN: str = "en_core_web_sm"

    # Celery
    CELERY_BROKER_URL: str = "redis://localhost:6379/0"
    CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0"

    # AI model
    EMOTION_MODEL_PATH: str = "./models/emotion_classifier"

    class Config:
        env_file = ".env"

settings = Settings()
```
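Because `Settings` sets `env_file = ".env"`, every field above can be overridden from a `.env` file in the working directory. A sample matching those fields (all values here are placeholders, not real credentials):

```ini
# .env — overrides for the Settings fields above
DATABASE_URL=postgresql://postgres:password@db:5432/ai_companion
REDIS_HOST=redis
SECRET_KEY=change-me-in-production
CELERY_BROKER_URL=redis://redis:6379/0
CELERY_RESULT_BACKEND=redis://redis:6379/0
```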

WebSocket Chat Interface

Chat Endpoint

```python
# backend/app/api/chat.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from datetime import datetime
import json

from ..tasks.chat_tasks import process_message_async

router = APIRouter()

class ConnectionManager:
    """Tracks the active WebSocket connection for each user."""
    def __init__(self):
        self.active_connections: dict[str, WebSocket] = {}

    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        self.active_connections[user_id] = websocket

    def disconnect(self, user_id: str):
        if user_id in self.active_connections:
            del self.active_connections[user_id]

    async def send_message(self, user_id: str, message: dict):
        if user_id in self.active_connections:
            await self.active_connections[user_id].send_json(message)

manager = ConnectionManager()

@router.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await manager.connect(user_id, websocket)

    try:
        while True:
            # Receive the user's message
            data = await websocket.receive_text()
            message_data = json.loads(data)

            # Acknowledge receipt immediately
            await manager.send_message(user_id, {
                "type": "ack",
                "message_id": message_data.get("message_id"),
                "timestamp": datetime.now().isoformat()
            })

            # Hand the message off for asynchronous processing
            process_message_async.delay(
                user_id=user_id,
                message=message_data["message"],
                message_id=message_data.get("message_id"),
                language=message_data.get("language", "zh")
            )

    except WebSocketDisconnect:
        manager.disconnect(user_id)
```
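The endpoint above exchanges small JSON frames. A minimal sketch of the wire format — the field names are taken from the handler, while the concrete values are purely illustrative:

```python
import json

# Client -> server frame, matching what websocket_endpoint reads
# from receive_text(): "message" is required, the rest optional.
outgoing = json.dumps({
    "message_id": "m-001",
    "message": "我今天有点难过",
    "language": "zh",
})

# The server answers with an "ack" frame right away, before the
# Celery task produces the actual reply.
frame = json.loads(outgoing)
ack = {
    "type": "ack",
    "message_id": frame.get("message_id"),
}
print(ack["type"], ack["message_id"])
```

The immediate `ack` lets the client mark the message as delivered while the slower NLP pipeline runs in the background.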

NLP Service

```python
# backend/app/services/nlp_service.py
import re
from typing import Dict, List

import spacy

class NLPService:
    """Rule-based NLP processing built on spaCy pipelines."""
    nlp_zh = None
    nlp_en = None

    @classmethod
    async def initialize(cls):
        """Load the NLP models once at startup."""
        cls.nlp_zh = spacy.load("zh_core_web_sm")
        cls.nlp_en = spacy.load("en_core_web_sm")

    @classmethod
    def process_message(cls, text: str, language: str = "zh") -> Dict:
        """
        Process a user message.

        Returns:
            {
                "intent": str,          # classified intent
                "entities": List[Dict], # named entities
                "sentiment": str,       # sentiment polarity
                "keywords": List[str]   # extracted keywords
            }
        """
        nlp = cls.nlp_zh if language == "zh" else cls.nlp_en
        doc = nlp(text)

        # Extract named entities
        entities = []
        for ent in doc.ents:
            entities.append({
                "text": ent.text,
                "label": ent.label_,
                "start": ent.start_char,
                "end": ent.end_char
            })

        # Extract keywords (drop stop words and punctuation)
        keywords = [
            token.text for token in doc
            if not token.is_stop
            and not token.is_punct
            and len(token.text) > 1
        ]

        # Simple lexicon-based sentiment analysis
        sentiment = cls._analyze_sentiment(doc, language)

        # Intent classification
        intent = cls._classify_intent(text, language)

        return {
            "intent": intent,
            "entities": entities,
            "sentiment": sentiment,
            "keywords": keywords[:10]  # at most 10 keywords
        }

    @classmethod
    def _analyze_sentiment(cls, doc, language: str) -> str:
        """Classify sentiment polarity by counting lexicon hits."""
        positive_words = {
            "zh": ["开心", "高兴", "快乐", "喜欢", "爱", "好"],
            "en": ["happy", "good", "great", "love", "enjoy"]
        }

        negative_words = {
            "zh": ["难过", "伤心", "痛苦", "讨厌", "不好", "抑郁"],
            "en": ["sad", "bad", "hate", "depressed", "anxious"]
        }

        tokens = [token.text.lower() for token in doc]

        positive_count = sum(1 for t in tokens if t in positive_words.get(language, []))
        negative_count = sum(1 for t in tokens if t in negative_words.get(language, []))

        if positive_count > negative_count:
            return "positive"
        elif negative_count > positive_count:
            return "negative"
        else:
            return "neutral"

    @classmethod
    def _classify_intent(cls, text: str, language: str) -> str:
        """Classify the user's intent against regex patterns."""
        intent_patterns = {
            "greeting": {
                "zh": [r"你好", r"嗨", r"早上好", r"晚上好"],
                "en": [r"hello", r"hi", r"hey", r"good morning"]
            },
            "help_request": {
                "zh": [r"帮助", r"怎么", r"如何", r"怎么办"],
                "en": [r"help", r"how to", r"what should"]
            },
            "emotion_sharing": {
                "zh": [r"我觉得", r"我感觉", r"我今天"],
                "en": [r"i feel", r"i'm feeling", r"today i"]
            },
            "crisis": {
                "zh": [r"想死", r"自杀", r"结束生命"],
                "en": [r"want to die", r"suicide", r"end it all"]
            }
        }

        text_lower = text.lower()

        for intent, patterns in intent_patterns.items():
            for pattern in patterns.get(language, []):
                if re.search(pattern, text_lower):
                    return intent

        return "general"
```
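The sentiment and intent rules above don't actually depend on spaCy — only the tokenization does. A self-contained sketch of the same keyword logic, with plain token lists standing in for a spaCy `Doc`, so the rules can be tried without downloading any model (the shortened word lists are illustrative):

```python
import re

POSITIVE = {"zh": ["开心", "高兴", "快乐"], "en": ["happy", "good", "great"]}
NEGATIVE = {"zh": ["难过", "伤心", "抑郁"], "en": ["sad", "bad", "depressed"]}
CRISIS = {"zh": [r"想死", r"自杀"], "en": [r"want to die", r"suicide"]}

def rule_sentiment(tokens, language="en"):
    # Same counting rule as NLPService._analyze_sentiment
    pos = sum(1 for t in tokens if t in POSITIVE.get(language, []))
    neg = sum(1 for t in tokens if t in NEGATIVE.get(language, []))
    if pos > neg:
        return "positive"
    if neg > pos:
        return "negative"
    return "neutral"

def is_crisis(text, language="en"):
    # Same regex check as the "crisis" bucket in _classify_intent
    return any(re.search(p, text.lower()) for p in CRISIS.get(language, []))

print(rule_sentiment(["i", "feel", "sad"], "en"))   # negative
print(is_crisis("I want to die", "en"))             # True
```

Because the rules are this simple, unit-testing them in isolation is cheap — the spaCy pipeline only needs to be involved when testing tokenization and entity extraction.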

Celery Async Tasks

Celery Configuration

```python
# backend/app/tasks/celery_app.py
from celery import Celery
from celery.schedules import crontab  # needed for beat_schedule below
from ..config import settings

celery_app = Celery(
    "ai_companion",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60,  # 30-minute hard limit
    task_soft_time_limit=25 * 60,
    worker_prefetch_multiplier=1,
    worker_max_tasks_per_child=1000,
)

# Periodic task schedule
celery_app.conf.beat_schedule = {
    'daily-checkin': {
        'task': 'app.tasks.chat_tasks.send_daily_checkin',
        'schedule': crontab(hour=9, minute=0),  # every day at 09:00
    },
    'weekly-summary': {
        'task': 'app.tasks.chat_tasks.generate_weekly_summary',
        'schedule': crontab(hour=20, minute=0, day_of_week=5),  # Fridays at 20:00
    },
}
```
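Celery's `crontab` follows standard cron field semantics: an entry fires whenever the wall-clock fields line up, and `day_of_week` is numbered with Sunday as 0 (so Friday is 5). A crude stdlib stand-in for that matching rule — `matches` is a hypothetical helper, not Celery's implementation — just to make the two schedules above concrete:

```python
from datetime import datetime

def matches(dt, hour, minute, day_of_week=None):
    # Fire when the wall-clock fields line up, like celery.schedules.crontab.
    # Celery numbers days with Sunday = 0, so Friday = 5;
    # isoweekday() % 7 converts Python's Monday=1..Sunday=7 to that scheme.
    if day_of_week is not None and (dt.isoweekday() % 7) != day_of_week:
        return False
    return dt.hour == hour and dt.minute == minute

checkin = datetime(2026, 3, 9, 9, 0)    # a Monday, 09:00
summary = datetime(2026, 3, 13, 20, 0)  # a Friday, 20:00
print(matches(checkin, hour=9, minute=0))                  # True
print(matches(summary, hour=20, minute=0, day_of_week=5))  # True
```

Note that with `timezone='Asia/Shanghai'` set above, Celery interprets these crontab times in Shanghai local time.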

Chat Processing Tasks

```python
# backend/app/tasks/chat_tasks.py
from celery import shared_task
from datetime import datetime
from typing import Dict, List, Optional
import uuid

from ..services.nlp_service import NLPService
from ..services.emotion_service import EmotionService
from ..models import SessionLocal, Message, User
from .celery_app import celery_app

@shared_task(bind=True)
def process_message_async(self, user_id: str, message: str,
                         message_id: Optional[str], language: str = "zh"):
    """
    Process a user message asynchronously.

    Steps:
    1. Run NLP analysis (intent, entities, sentiment)
    2. Persist the user message
    3. Check for crisis signals
    4. Generate a reply (with conversation history as context)
    5. Persist the reply and push it over WebSocket
    """
    db = SessionLocal()

    try:
        # 1. NLP analysis
        nlp_result = NLPService.process_message(message, language)

        # 2. Persist the user message
        user_msg = Message(
            id=message_id or str(uuid.uuid4()),
            user_id=user_id,
            content=message,
            is_from_user=True,
            intent=nlp_result["intent"],
            sentiment=nlp_result["sentiment"],
            timestamp=datetime.now()
        )
        db.add(user_msg)

        # 3. Crisis detection
        if nlp_result["intent"] == "crisis":
            response = _handle_crisis(user_id, db)

        # 4. Generate a reply
        else:
            response = _generate_response(
                user_id,
                nlp_result,
                db
            )

        # 5. Persist the reply
        bot_msg = Message(
            id=str(uuid.uuid4()),
            user_id=user_id,
            content=response["text"],
            is_from_user=False,
            intent=response.get("intent"),
            timestamp=datetime.now()
        )
        db.add(bot_msg)
        db.commit()

        # 6. Push the reply over WebSocket.
        # NOTE: this only works when the worker shares a process with the
        # FastAPI app. In a normal multi-process deployment, publish the
        # reply to a Redis channel that the FastAPI process subscribes to.
        from ..api.chat import manager
        import asyncio
        asyncio.run(manager.send_message(user_id, {
            "type": "message",
            "content": response["text"],
            "suggestions": response.get("suggestions", []),
            "timestamp": datetime.now().isoformat()
        }))

        return {"status": "success", "response": response}

    except Exception as e:
        db.rollback()
        raise self.retry(exc=e, countdown=60, max_retries=3)

    finally:
        db.close()

def _generate_response(user_id: str, nlp_result: Dict, db) -> Dict:
    """Generate a reply from the classified intent and sentiment."""
    intent = nlp_result["intent"]
    sentiment = nlp_result["sentiment"]

    # Pick a reply based on intent and sentiment
    if intent == "greeting":
        responses = {
            "zh": ["你好!今天感觉怎么样?", "嗨!很高兴见到你。"],
            "en": ["Hello! How are you feeling today?", "Hi! Great to see you."]
        }
        text = responses["zh"][0]  # defaults to Chinese; thread the user's language through in production

    elif intent == "emotion_sharing":
        if sentiment == "negative":
            text = "我理解你现在的感受。愿意多告诉我一些吗?"
        elif sentiment == "positive":
            text = "听起来你今天心情不错!是什么让你开心呢?"
        else:
            text = "谢谢你的分享。"

    elif intent == "help_request":
        text = "我可以帮你记录情绪、提供健康建议。你需要什么帮助?"

    else:
        # Fetch recent conversation history
        history = db.query(Message).filter_by(
            user_id=user_id
        ).order_by(Message.timestamp.desc()).limit(5).all()

        # Use the context to generate a reply
        text = _contextual_response(history, nlp_result)

    return {
        "text": text,
        "intent": intent,
        "suggestions": _generate_suggestions(intent, sentiment)
    }

def _contextual_response(history: List, nlp_result: Dict) -> str:
    """Fallback reply when no intent rule matches.

    A production build would feed the history into a generative model;
    this stub simply reflects back the extracted keywords.
    """
    keywords = nlp_result.get("keywords", [])
    if keywords:
        return f"你提到了{keywords[0]},能再多说说吗?"
    return "我在听,请继续说。"

def _handle_crisis(user_id: str, db) -> Dict:
    """Handle a detected crisis."""
    # 1. Log the crisis event
    # 2. Immediately surface hotline information
    response = {
        "text": """我听到你现在的痛苦。你很重要,有人愿意帮助你。

危机干预热线:400-161-9995
心理咨询:可在应用内预约专业咨询师

请一定给自己一个机会,寻求帮助是勇敢的选择。""",
        "intent": "crisis_response",
        "urgent": True
    }

    return response

def _generate_suggestions(intent: str, sentiment: str) -> List[str]:
    """Generate quick-reply suggestions."""
    if sentiment == "negative":
        return [
            "记录情绪日记",
            "进行呼吸练习",
            "查看本周情绪趋势"
        ]
    else:
        return [
            "分享今天的开心事",
            "查看健康建议",
            "设置提醒"
        ]

@shared_task
def send_daily_checkin():
    """Send the daily check-in prompt."""
    # Fetch all active users
    db = SessionLocal()
    users = db.query(User).filter(User.is_active == True).all()

    for user in users:
        # Send a check-in reminder
        pass  # push-notification logic goes here

    db.close()

@shared_task
def generate_weekly_summary():
    """Generate each user's weekly mood summary."""
    pass
```
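One subtlety: in a real deployment the Celery worker runs in a separate process from the FastAPI app, so it cannot call the in-memory `ConnectionManager` directly. A common bridge is Redis pub/sub — the worker publishes the reply, and a listener inside the FastAPI process forwards it to the right WebSocket. A process-local sketch of that pattern, with a plain queue standing in for the Redis channel (the function names here are illustrative):

```python
import json
import queue

channel = queue.Queue()  # stands in for a Redis pub/sub channel

def worker_publish(user_id, text):
    # Celery-worker side: serialize the reply onto the shared channel
    channel.put(json.dumps({"user_id": user_id, "content": text}))

def api_forward(connections):
    # FastAPI side: drain the channel and route each reply to the
    # user's open socket, skipping users who have disconnected
    delivered = []
    while not channel.empty():
        msg = json.loads(channel.get())
        if msg["user_id"] in connections:
            delivered.append((msg["user_id"], msg["content"]))
    return delivered

worker_publish("u1", "我在听,请继续说。")
print(api_forward({"u1": object()}))
```

With real Redis, the FastAPI process would run a background task that subscribes to the channel and calls `manager.send_message` for each incoming reply.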

Data Models

SQLAlchemy Models

```python
# backend/app/models.py
from sqlalchemy import (Column, String, DateTime, Boolean, Integer, Text,
                        ForeignKey, create_engine)
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from datetime import datetime
from .config import settings

Base = declarative_base()

class User(Base):
    __tablename__ = "users"

    id = Column(String, primary_key=True)
    email = Column(String, unique=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    name = Column(String)
    language = Column(String, default="zh")
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.now)

    messages = relationship("Message", back_populates="user")
    emotion_logs = relationship("EmotionLog", back_populates="user")

class Message(Base):
    __tablename__ = "messages"

    id = Column(String, primary_key=True)
    user_id = Column(String, ForeignKey("users.id"), nullable=False)
    content = Column(Text, nullable=False)
    is_from_user = Column(Boolean, nullable=False)
    intent = Column(String)
    sentiment = Column(String)
    timestamp = Column(DateTime, default=datetime.now)

    user = relationship("User", back_populates="messages")

class EmotionLog(Base):
    __tablename__ = "emotion_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(String, ForeignKey("users.id"), nullable=False)
    emotion = Column(String, nullable=False)
    intensity = Column(Integer)  # 1-10
    notes = Column(Text)
    timestamp = Column(DateTime, default=datetime.now)

    user = relationship("User", back_populates="emotion_logs")

# Database engine and session factory
engine = create_engine(settings.DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def get_db():
    """FastAPI dependency: yields a session and always closes it."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
```
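`get_db` relies on FastAPI finalizing the generator after the response is sent, which guarantees the `finally` block closes the session even when the endpoint raises. The same yield/finally pattern can be seen in isolation with any resource — `FakeSession` below is a hypothetical stand-in, not SQLAlchemy:

```python
class FakeSession:
    """Stand-in for a SQLAlchemy session that records close()."""
    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True

def get_resource():
    res = FakeSession()
    try:
        yield res
    finally:
        res.close()  # always runs, even if the consumer raises

gen = get_resource()
session = next(gen)    # FastAPI injects this value into the endpoint
gen.close()            # after the response, FastAPI finalizes the generator
print(session.closed)  # True
```

Finalizing the generator (`gen.close()`) raises `GeneratorExit` at the `yield`, so the `finally` block runs and the resource is released deterministically.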

Docker Deployment

docker-compose.yml

```yaml
version: '3.8'

services:
  web:
    build: ./backend
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    volumes:
      - ./backend:/app
    ports:
      - "8000:8000"
    depends_on:
      - redis
      - db
    environment:
      - DATABASE_URL=postgresql://postgres:password@db/ai_companion
      - REDIS_HOST=redis

  celery:
    build: ./backend
    command: celery -A app.tasks.celery_app worker --loglevel=info
    volumes:
      - ./backend:/app
    depends_on:
      - redis
      - db
    environment:
      - DATABASE_URL=postgresql://postgres:password@db/ai_companion
      - REDIS_HOST=redis

  celery-beat:
    build: ./backend
    command: celery -A app.tasks.celery_app beat --loglevel=info
    volumes:
      - ./backend:/app
    depends_on:
      - redis
      - db
    environment:
      - DATABASE_URL=postgresql://postgres:password@db/ai_companion
      - REDIS_HOST=redis

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  db:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=ai_companion
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

volumes:
  postgres_data:
```

Key Takeaways

  1. FastAPI provides the high-performance async API and WebSocket real-time messaging
  2. Celery offloads slow work: NLP analysis, reply generation, and scheduled jobs
  3. SpaCy handles the NLP: intent classification and entity extraction
  4. A crisis-intervention path detects risk signals and surfaces help immediately

FAQ

Celery tasks not executing?

Check that:

  1. Redis is reachable
  2. A worker process is running
  3. Task routing is configured correctly

SpaCy models slow to load?

Use lazy loading with a model cache:

```python
import spacy
from functools import lru_cache

@lru_cache(maxsize=1)
def get_nlp_model(model_name):
    return spacy.load(model_name)
```
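`lru_cache(maxsize=1)` means the expensive load runs once per model name; repeated calls with the same name return the cached object. The caching behavior can be verified without spaCy by wrapping a counted stand-in loader (`fake_load` is hypothetical):

```python
from functools import lru_cache

calls = {"n": 0}

@lru_cache(maxsize=1)
def fake_load(model_name):
    # Stand-in for spacy.load; counts how often the slow path runs
    calls["n"] += 1
    return f"<model {model_name}>"

a = fake_load("zh_core_web_sm")
b = fake_load("zh_core_web_sm")
print(a is b, calls["n"])  # True 1
```

Note that with `maxsize=1`, requesting a second model name evicts the first; if the app keeps both the Chinese and English pipelines warm, use `maxsize=2` or `maxsize=None`.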

How do I add more languages?

Add the corresponding SpaCy model and intent patterns for each language:

```python
# Multi-language support
language_support = {
    "zh": {"model": "zh_core_web_sm", "patterns": {...}},
    "en": {"model": "en_core_web_sm", "patterns": {...}},
    "es": {"model": "es_core_news_sm", "patterns": {...}}
}
```

Further Reading

  • FastAPI official documentation
  • Celery documentation
  • SpaCy documentation
  • The WebSocket protocol specification
  • Mental-health crisis intervention guidelines

Published: March 8, 2026 · Last updated: March 8, 2026

Disclaimer: This content is for educational purposes only and is not a substitute for professional medical advice. Consult a physician for personalized diagnosis and treatment.


Tags

FastAPI
Celery
SpaCy
Chatbot
AI Companion
Async Tasks
