Building an AI Companion Chatbot with FastAPI, Celery, and SpaCy: A Complete Tutorial

Overview

An AI companion chatbot is a core feature of health applications. It can provide:
- Emotional support: recognize the user's emotional state
- Health reminders: offer personalized suggestions based on the conversation
- Behavior tracking: monitor trends in the user's mental health
- Crisis intervention: detect risk signals and offer help

This tutorial builds a complete AI companion system on three components:
- FastAPI: a high-performance asynchronous web framework
- Celery: a distributed task queue
- SpaCy: an industrial-strength NLP library
System Architecture

```
┌─────────────────────────────────────────────────────────┐
│                 Frontend (React Native)                 │
│  ┌──────────────┐ ┌──────────────┐ ┌──────────────┐     │
│  │ Chat UI      │ │ Mood journal │ │ Health dash  │     │
│  └──────────────┘ └──────────────┘ └──────────────┘     │
└────────────────────┬────────────────────────────────────┘
                     │ WebSocket + HTTP
┌────────────────────▼────────────────────────────────────┐
│                     FastAPI backend                     │
│  ┌──────────────┐ ┌──────────────┐ ┌──────────────┐     │
│  │ WebSocket    │ │ REST API     │ │ Auth         │     │
│  └──────────────┘ └──────────────┘ └──────────────┘     │
└────────────────────┬────────────────────────────────────┘
                     │
┌────────────────────▼────────────────────────────────────┐
│                    Celery task queue                    │
│  ┌──────────────┐ ┌──────────────┐ ┌──────────────┐     │
│  │ NLP analysis │ │ Emotion      │ │ Reminders    │     │
│  │ (SpaCy)      │ │ (ML model)   │ │ (Scheduler)  │     │
│  └──────────────┘ └──────────────┘ └──────────────┘     │
└────────────────────┬────────────────────────────────────┘
                     │
┌────────────────────▼────────────────────────────────────┐
│                   Redis + PostgreSQL                    │
└─────────────────────────────────────────────────────────┘
```
Environment Setup

Project structure
```
ai-companion/
├── backend/
│   ├── app/
│   │   ├── main.py                  # FastAPI application
│   │   ├── config.py                # Settings
│   │   ├── models.py                # Data models
│   │   ├── api/
│   │   │   ├── chat.py              # Chat API
│   │   │   └── health.py            # Health-data API
│   │   ├── services/
│   │   │   ├── nlp_service.py       # NLP service
│   │   │   └── emotion_service.py   # Emotion-analysis service
│   │   └── tasks/
│   │       ├── celery_app.py        # Celery configuration
│   │       └── chat_tasks.py        # Async chat tasks
├── frontend/                        # React Native app
├── requirements.txt
└── docker-compose.yml
```
Installing dependencies
```
# requirements.txt
fastapi==0.104.0
uvicorn[standard]==0.24.0
celery==5.3.4
redis==5.0.1
spacy==3.7.2
transformers==4.35.0
torch==2.1.0
sqlalchemy==2.0.23
psycopg2-binary==2.9.9
pydantic-settings==2.0.3   # needed for BaseSettings in config.py
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
python-multipart==0.0.6
websockets==12.0
aiofiles==23.2.1

# Install the packages
pip install -r requirements.txt

# Download the SpaCy language models
python -m spacy download zh_core_web_sm
python -m spacy download en_core_web_sm
```
FastAPI Application

Main application setup
```python
# backend/app/main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy.orm import Session
import redis
import json

from .config import settings
from .models import get_db
from .api import chat, health

app = FastAPI(
    title="AI Companion API",
    description="Intelligent health companion chatbot",
    version="1.0.0"
)

# CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # restrict origins in production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Redis connection
redis_client = redis.Redis(
    host=settings.REDIS_HOST,
    port=settings.REDIS_PORT,
    db=0,
    decode_responses=True
)

# Mount the routers
app.include_router(chat.router, prefix="/api/chat", tags=["chat"])
app.include_router(health.router, prefix="/api/health", tags=["health"])

@app.on_event("startup")
async def startup_event():
    """Run once when the application starts."""
    # Load the NLP models
    from .services.nlp_service import NLPService
    await NLPService.initialize()

@app.on_event("shutdown")
async def shutdown_event():
    """Run once when the application shuts down."""
    redis_client.close()

@app.get("/")
async def root():
    return {
        "message": "AI Companion API",
        "version": "1.0.0",
        "status": "running"
    }

@app.get("/health")
async def health_check():
    return {"status": "healthy"}
```
Configuration file

```python
# backend/app/config.py
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    # Database
    DATABASE_URL: str = "postgresql://user:password@localhost/ai_companion"

    # Redis
    REDIS_HOST: str = "localhost"
    REDIS_PORT: int = 6379

    # JWT
    SECRET_KEY: str = "your-secret-key-here"  # override via .env in production
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30

    # SpaCy models
    SPACY_MODEL_ZH: str = "zh_core_web_sm"
    SPACY_MODEL_EN: str = "en_core_web_sm"

    # Celery
    CELERY_BROKER_URL: str = "redis://localhost:6379/0"
    CELERY_RESULT_BACKEND: str = "redis://localhost:6379/0"

    # AI models
    EMOTION_MODEL_PATH: str = "./models/emotion_classifier"

    class Config:
        env_file = ".env"

settings = Settings()
```
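Because `Settings` reads from `.env`, any field can be overridden without code changes. A minimal example file (all values here are placeholders, not real credentials):

```shell
# .env (example values — replace with your own)
DATABASE_URL=postgresql://postgres:password@localhost/ai_companion
REDIS_HOST=localhost
REDIS_PORT=6379
SECRET_KEY=change-me-to-a-long-random-string
```

Keep `.env` out of version control; pydantic-settings also reads the same names from environment variables, which is how the docker-compose file below injects them.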
WebSocket Chat Interface

Chat endpoint
```python
# backend/app/api/chat.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
from sqlalchemy.orm import Session
from datetime import datetime
import json

from ..models import get_db, User, Message
from ..services.nlp_service import NLPService
from ..tasks.chat_tasks import process_message_async

router = APIRouter()

class ConnectionManager:
    """Tracks active WebSocket connections, one per user."""

    def __init__(self):
        self.active_connections: dict[str, WebSocket] = {}

    async def connect(self, user_id: str, websocket: WebSocket):
        await websocket.accept()
        self.active_connections[user_id] = websocket

    def disconnect(self, user_id: str):
        if user_id in self.active_connections:
            del self.active_connections[user_id]

    async def send_message(self, user_id: str, message: dict):
        if user_id in self.active_connections:
            await self.active_connections[user_id].send_json(message)

manager = ConnectionManager()

@router.websocket("/ws/{user_id}")
async def websocket_endpoint(websocket: WebSocket, user_id: str):
    await manager.connect(user_id, websocket)
    try:
        while True:
            # Receive the user's message
            data = await websocket.receive_text()
            message_data = json.loads(data)

            # Acknowledge receipt immediately
            await manager.send_message(user_id, {
                "type": "ack",
                "message_id": message_data.get("message_id"),
                "timestamp": datetime.now().isoformat()
            })

            # Hand the message off for asynchronous processing
            process_message_async.delay(
                user_id=user_id,
                message=message_data["message"],
                message_id=message_data.get("message_id"),
                language=message_data.get("language", "zh")
            )
    except WebSocketDisconnect:
        manager.disconnect(user_id)
```
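One subtlety: the Celery worker runs in a separate process, so it cannot call `manager.send_message` directly — the WebSocket objects only exist in the FastAPI process. One way to bridge the gap is to have the worker publish replies to a Redis pub/sub channel and run a listener in the API process. A sketch (the `ws:<user_id>` channel naming and `decode_ws_payload` helper are assumptions, not part of the code above; `relay_replies` assumes an async Redis client such as `redis.asyncio.Redis`):

```python
import asyncio
import json

def decode_ws_payload(channel: str, raw: str) -> tuple[str, dict]:
    """Split a pub/sub message into (user_id, payload).

    Channels are assumed to be named "ws:<user_id>"; payloads are JSON.
    """
    user_id = channel.split(":", 1)[1]
    return user_id, json.loads(raw)

async def relay_replies(redis_client, manager):
    """Forward worker replies from Redis pub/sub to WebSocket clients.

    Intended to be started from FastAPI's startup hook, e.g.
    asyncio.create_task(relay_replies(aioredis_client, manager)).
    """
    pubsub = redis_client.pubsub()
    await pubsub.psubscribe("ws:*")
    async for msg in pubsub.listen():
        if msg["type"] != "pmessage":
            continue  # skip subscription confirmations
        user_id, payload = decode_ws_payload(msg["channel"], msg["data"])
        await manager.send_message(user_id, payload)
```

If the user is connected to this API instance, the reply goes out immediately; otherwise it is simply dropped (a production system might fall back to a push notification).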
NLP Service
```python
# backend/app/services/nlp_service.py
import re
from typing import Dict, List, Optional

import spacy

class NLPService:
    """NLP processing service."""

    nlp_zh = None
    nlp_en = None

    @classmethod
    async def initialize(cls):
        """Load the SpaCy pipelines once at startup."""
        cls.nlp_zh = spacy.load("zh_core_web_sm")
        cls.nlp_en = spacy.load("en_core_web_sm")

    @classmethod
    def process_message(cls, text: str, language: str = "zh") -> Dict:
        """
        Analyze a user message.

        Returns:
            {
                "intent": str,          # detected intent
                "entities": List[Dict], # named entities
                "sentiment": str,       # sentiment polarity
                "keywords": List[str]   # keywords
            }
        """
        nlp = cls.nlp_zh if language == "zh" else cls.nlp_en
        doc = nlp(text)

        # Extract named entities
        entities = []
        for ent in doc.ents:
            entities.append({
                "text": ent.text,
                "label": ent.label_,
                "start": ent.start_char,
                "end": ent.end_char
            })

        # Extract keywords (drop stop words and punctuation)
        keywords = [
            token.text for token in doc
            if not token.is_stop
            and not token.is_punct
            and len(token.text) > 1
        ]

        # Simple lexicon-based sentiment analysis
        sentiment = cls._analyze_sentiment(doc, language)

        # Intent classification
        intent = cls._classify_intent(text, language)

        return {
            "intent": intent,
            "entities": entities,
            "sentiment": sentiment,
            "keywords": keywords[:10]  # at most 10 keywords
        }

    @classmethod
    def _analyze_sentiment(cls, doc, language: str) -> str:
        """Classify sentiment polarity with a small word lexicon."""
        positive_words = {
            "zh": ["开心", "高兴", "快乐", "喜欢", "爱", "好"],
            "en": ["happy", "good", "great", "love", "enjoy"]
        }
        negative_words = {
            "zh": ["难过", "伤心", "痛苦", "讨厌", "不好", "抑郁"],
            "en": ["sad", "bad", "hate", "depressed", "anxious"]
        }
        tokens = [token.text.lower() for token in doc]
        positive_count = sum(1 for t in tokens if t in positive_words[language])
        negative_count = sum(1 for t in tokens if t in negative_words[language])
        if positive_count > negative_count:
            return "positive"
        elif negative_count > positive_count:
            return "negative"
        else:
            return "neutral"

    @classmethod
    def _classify_intent(cls, text: str, language: str) -> str:
        """Classify the user's intent with regex patterns."""
        intent_patterns = {
            "greeting": {
                "zh": [r"你好", r"嗨", r"早上好", r"晚上好"],
                "en": [r"hello", r"hi", r"hey", r"good morning"]
            },
            "help_request": {
                "zh": [r"帮助", r"怎么", r"如何", r"怎么办"],
                "en": [r"help", r"how to", r"what should"]
            },
            "emotion_sharing": {
                "zh": [r"我觉得", r"我感觉", r"我今天"],
                "en": [r"i feel", r"i'm feeling", r"today i"]
            },
            "crisis": {
                "zh": [r"想死", r"自杀", r"结束生命"],
                "en": [r"want to die", r"suicide", r"end it all"]
            }
        }
        text_lower = text.lower()
        for intent, patterns in intent_patterns.items():
            for pattern in patterns[language]:
                if re.search(pattern, text_lower):
                    return intent
        return "general"
```
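The regex-based intent approach is easy to exercise in isolation, without loading any SpaCy model. A minimal standalone version of the same idea (pattern set trimmed for illustration; word-boundary anchors added to avoid `hi` matching inside other words):

```python
import re

# A trimmed copy of the English patterns from _classify_intent
INTENT_PATTERNS = {
    "greeting": [r"hello", r"\bhi\b", r"good morning"],
    "help_request": [r"\bhelp\b", r"how to"],
    "crisis": [r"want to die", r"suicide"],
}

def classify_intent(text: str) -> str:
    """Return the first intent whose pattern matches, else 'general'."""
    text_lower = text.lower()
    for intent, patterns in INTENT_PATTERNS.items():
        for pattern in patterns:
            if re.search(pattern, text_lower):
                return intent
    return "general"

print(classify_intent("Hello there"))            # greeting
print(classify_intent("How to sleep better?"))   # help_request
print(classify_intent("The weather is nice"))    # general
```

First match wins, so pattern order matters; a production system would replace this with a trained text classifier, but the regex version is a useful baseline and fallback.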
Celery Async Tasks

Celery configuration
```python
# backend/app/tasks/celery_app.py
from celery import Celery
from celery.schedules import crontab

from ..config import settings

celery_app = Celery(
    "ai_companion",
    broker=settings.CELERY_BROKER_URL,
    backend=settings.CELERY_RESULT_BACKEND
)

celery_app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Asia/Shanghai',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30 * 60,       # hard limit: 30 minutes
    task_soft_time_limit=25 * 60,  # soft limit: 25 minutes
    worker_prefetch_multiplier=1,
    worker_max_tasks_per_child=1000,
)

# Periodic tasks (run by the beat scheduler)
celery_app.conf.beat_schedule = {
    'daily-checkin': {
        'task': 'app.tasks.chat_tasks.send_daily_checkin',
        'schedule': crontab(hour=9, minute=0),  # every day at 09:00
    },
    'weekly-summary': {
        'task': 'app.tasks.chat_tasks.generate_weekly_summary',
        'schedule': crontab(hour=20, minute=0, day_of_week=5),  # Friday at 20:00
    },
}
```
Chat processing tasks
```python
# backend/app/tasks/chat_tasks.py
import json
import uuid
from datetime import datetime
from typing import Dict, List, Optional

import redis
from celery import shared_task

from ..config import settings
from ..services.nlp_service import NLPService
from ..services.emotion_service import EmotionService
from ..models import SessionLocal, Message, User
from .celery_app import celery_app

@shared_task(bind=True)
def process_message_async(self, user_id: str, message: str,
                          message_id: Optional[str], language: str = "zh"):
    """
    Process a user message asynchronously.

    Steps:
    1. NLP analysis (intent, entities, sentiment)
    2. Look up conversation history
    3. Generate a reply
    4. Detect crisis signals
    5. Persist the messages
    """
    db = SessionLocal()
    try:
        # 1. NLP analysis
        nlp_result = NLPService.process_message(message, language)

        # 2. Save the user's message
        user_msg = Message(
            id=message_id or str(uuid.uuid4()),
            user_id=user_id,
            content=message,
            is_from_user=True,
            intent=nlp_result["intent"],
            sentiment=nlp_result["sentiment"],
            timestamp=datetime.now()
        )
        db.add(user_msg)

        # 3. Crisis detection
        if nlp_result["intent"] == "crisis":
            response = _handle_crisis(user_id, db)
        # 4. Normal reply generation
        else:
            response = _generate_response(
                user_id,
                nlp_result,
                db
            )

        # 5. Save the reply
        bot_msg = Message(
            id=str(uuid.uuid4()),
            user_id=user_id,
            content=response["text"],
            is_from_user=False,
            intent=response.get("intent"),
            timestamp=datetime.now()
        )
        db.add(bot_msg)
        db.commit()

        # 6. Deliver the reply over WebSocket. The connection objects live
        # in the FastAPI process, not in this worker, so publish to a Redis
        # channel that the API process subscribes to and relays from.
        redis_client = redis.Redis(
            host=settings.REDIS_HOST,
            port=settings.REDIS_PORT,
            decode_responses=True
        )
        redis_client.publish(f"ws:{user_id}", json.dumps({
            "type": "message",
            "content": response["text"],
            "suggestions": response.get("suggestions", []),
            "timestamp": datetime.now().isoformat()
        }))

        return {"status": "success", "response": response}
    except Exception as e:
        db.rollback()
        raise self.retry(exc=e, countdown=60, max_retries=3)
    finally:
        db.close()

def _generate_response(user_id: str, nlp_result: Dict, db) -> Dict:
    """Generate a reply from the NLP analysis."""
    intent = nlp_result["intent"]
    sentiment = nlp_result["sentiment"]

    # Pick a reply based on intent and sentiment.
    # (The language is hardcoded to Chinese here; thread the user's
    # language through for full multilingual support.)
    if intent == "greeting":
        responses = {
            "zh": ["你好!今天感觉怎么样?", "嗨!很高兴见到你。"],
            "en": ["Hello! How are you feeling today?", "Hi! Great to see you."]
        }
        text = responses["zh"][0]
    elif intent == "emotion_sharing":
        if sentiment == "negative":
            text = "我理解你现在的感受。愿意多告诉我一些吗?"  # "I understand how you feel. Would you like to tell me more?"
        elif sentiment == "positive":
            text = "听起来你今天心情不错!是什么让你开心呢?"  # "Sounds like you're in a good mood! What made you happy?"
        else:
            text = "谢谢你的分享。"  # "Thank you for sharing."
    elif intent == "help_request":
        text = "我可以帮你记录情绪、提供健康建议。你需要什么帮助?"  # "I can log your moods and offer health tips. How can I help?"
    else:
        # Fetch recent conversation history
        history = db.query(Message).filter_by(
            user_id=user_id
        ).order_by(Message.timestamp.desc()).limit(5).all()
        # Generate a reply from context (implementation not shown here)
        text = _contextual_response(history, nlp_result)

    return {
        "text": text,
        "intent": intent,
        "suggestions": _generate_suggestions(intent, sentiment)
    }

def _handle_crisis(user_id: str, db) -> Dict:
    """Handle a detected crisis."""
    # 1. Log the crisis event
    # 2. Immediately send hotline information
    # The Chinese message below says: "I hear your pain. You matter, and
    # people want to help you." followed by the crisis hotline number and
    # in-app counseling info.
    response = {
        "text": """我听到你现在的痛苦。你很重要,有人愿意帮助你。
危机干预热线:400-161-9995
心理咨询:可在应用内预约专业咨询师
请一定给自己一个机会,寻求帮助是勇敢的选择。""",
        "intent": "crisis_response",
        "urgent": True
    }
    return response

def _generate_suggestions(intent: str, sentiment: str) -> List[str]:
    """Generate quick-reply suggestions."""
    if sentiment == "negative":
        return [
            "记录情绪日记",     # log a mood journal entry
            "进行呼吸练习",     # do a breathing exercise
            "查看本周情绪趋势"  # view this week's mood trend
        ]
    else:
        return [
            "分享今天的开心事",  # share something good about today
            "查看健康建议",      # view health tips
            "设置提醒"           # set a reminder
        ]

@shared_task
def send_daily_checkin():
    """Send the daily check-in prompt."""
    # Fetch all active users
    db = SessionLocal()
    users = db.query(User).filter(User.is_active == True).all()
    for user in users:
        # Send the check-in reminder
        pass  # push-notification logic goes here
    db.close()

@shared_task
def generate_weekly_summary():
    """Generate the weekly report."""
    # Build each user's weekly mood summary
    pass
```
Data Models

SQLAlchemy models
```python
# backend/app/models.py
from datetime import datetime

from sqlalchemy import (Column, String, DateTime, Boolean, Integer, Text,
                        ForeignKey, create_engine)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker

from .config import settings

Base = declarative_base()

class User(Base):
    __tablename__ = "users"

    id = Column(String, primary_key=True)
    email = Column(String, unique=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    name = Column(String)
    language = Column(String, default="zh")
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.now)

    messages = relationship("Message", back_populates="user")
    emotion_logs = relationship("EmotionLog", back_populates="user")

class Message(Base):
    __tablename__ = "messages"

    id = Column(String, primary_key=True)
    user_id = Column(String, ForeignKey("users.id"), nullable=False)
    content = Column(Text, nullable=False)
    is_from_user = Column(Boolean, nullable=False)
    intent = Column(String)
    sentiment = Column(String)
    timestamp = Column(DateTime, default=datetime.now)

    user = relationship("User", back_populates="messages")

class EmotionLog(Base):
    __tablename__ = "emotion_logs"

    id = Column(Integer, primary_key=True, autoincrement=True)
    user_id = Column(String, ForeignKey("users.id"), nullable=False)
    emotion = Column(String, nullable=False)
    intensity = Column(Integer)  # scale of 1-10
    notes = Column(Text)
    timestamp = Column(DateTime, default=datetime.now)

    user = relationship("User", back_populates="emotion_logs")

# Database connection
engine = create_engine(settings.DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
```
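The models can be exercised against an in-memory SQLite database before wiring up PostgreSQL. A self-contained sketch (the inline `User` class here is a trimmed mirror of the one above rather than an import, so the example runs on its own):

```python
from datetime import datetime

from sqlalchemy import Column, String, Boolean, DateTime, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(String, primary_key=True)
    email = Column(String, unique=True, nullable=False)
    hashed_password = Column(String, nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime, default=datetime.now)

# In-memory SQLite keeps the demo self-contained
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)

with Session() as session:
    session.add(User(id="u1", email="a@example.com", hashed_password="x"))
    session.commit()
    user = session.query(User).filter_by(id="u1").one()
    print(user.email)      # a@example.com
    print(user.is_active)  # True
```

The `default=True` on `is_active` is applied at flush time, which is why the queried row already has it set.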
Docker Deployment

docker-compose.yml
```yaml
version: '3.8'
services:
  web:
    build: ./backend
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    volumes:
      - ./backend:/app
    ports:
      - "8000:8000"
    depends_on:
      - redis
      - db
    environment:
      - DATABASE_URL=postgresql://postgres:password@db/ai_companion
      - REDIS_HOST=redis

  celery:
    build: ./backend
    command: celery -A app.tasks.celery_app worker --loglevel=info
    volumes:
      - ./backend:/app
    depends_on:
      - redis
      - db
    environment:
      - DATABASE_URL=postgresql://postgres:password@db/ai_companion
      - REDIS_HOST=redis

  celery-beat:
    build: ./backend
    command: celery -A app.tasks.celery_app beat --loglevel=info
    volumes:
      - ./backend:/app
    depends_on:
      - redis
      - db
    environment:
      - DATABASE_URL=postgresql://postgres:password@db/ai_companion
      - REDIS_HOST=redis

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  db:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=ai_companion
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

volumes:
  postgres_data:
```
Key Takeaways
- FastAPI provides the high-performance async API layer, including real-time WebSocket communication
- Celery handles long-running work: NLP analysis and reply generation
- SpaCy powers the NLP: intent recognition and entity extraction
- A crisis-intervention path detects risk signals and responds with help resources
- Redis ties it together as both the Celery broker and the channel for instant message delivery
FAQ

Celery tasks not executing?
Check that:
- the Redis connection works
- a worker process is running
- task routing is configured correctly

SpaCy models slow to load?
Use lazy loading plus a model cache:
```python
from functools import lru_cache

import spacy

@lru_cache(maxsize=None)  # unbounded, so both the zh and en pipelines stay cached
def get_nlp_model(model_name):
    # Load each model once; later calls reuse the cached instance
    return spacy.load(model_name)
```
How to add more languages?
Add the corresponding SpaCy model and intent patterns:
```python
# Multi-language support
language_support = {
    "zh": {"model": "zh_core_web_sm", "patterns": {...}},
    "en": {"model": "en_core_web_sm", "patterns": {...}},
    "es": {"model": "es_core_news_sm", "patterns": {...}}
}
```
References
- FastAPI official documentation
- Celery documentation
- SpaCy documentation
- The WebSocket protocol specification (RFC 6455)
- Mental-health crisis intervention guidelines
Published: March 8, 2026. Last updated: March 8, 2026.