AI Customer Service System Backend API Development
Score: 59 · 2026-06-04 12:46Z · #sub-c6a112350351
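AI Customer Service System Backend API Development: Delivery Package
1. Delivery Goals
This package specifies the backend API for an intelligent customer service system. It supports real-time messaging, AI dialogue, user management, conversation history storage, access control, and scaling for high concurrency. Recommended technology stack: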
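| Module | Technology |
|---|---|
| API service | Python FastAPI |
| Real-time messaging | WebSocket |
| Authentication and authorization | JWT + RBAC |
| Primary database | MySQL 8 |
| Cache / sessions / rate limiting | Redis |
| AI dialogue | LLM Provider Adapter |
| Documentation | OpenAPI/Swagger |
| Testing | pytest + httpx |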
2. System Architecture
flowchart LR
    Client["Web / App client"] --> Gateway["FastAPI Gateway"]
    Client --> WS["WebSocket service"]
    Gateway --> Auth["JWT/RBAC authentication"]
    Gateway --> Chat["Chat API"]
    WS --> Chat
    Chat --> Redis["Redis: session cache / rate limiting / message queue"]
    Chat --> LLM["LLM adapter"]
    Chat --> MySQL["MySQL: users / sessions / messages / audit"]
    Gateway --> Admin["User and role management"]
    Admin --> MySQL
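Core design:
- The HTTP API handles users, sessions, message queries, and administrative operations.
- WebSocket handles real-time customer service message push and streamed AI responses.
- JWT handles identity authentication; RBAC controls permissions for the customer, agent, and admin roles.
- MySQL stores long-term data; Redis stores short-lived sessions, presence, rate-limit counters, and messages pending push.
- The AI layer reaches different large language models through an adapter, which makes later provider changes easier.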
3. Database Design
CREATE TABLE users (
    id            BIGINT PRIMARY KEY AUTO_INCREMENT,
    email         VARCHAR(128) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    display_name  VARCHAR(64) NOT NULL,
    role          ENUM('customer','agent','admin') NOT NULL DEFAULT 'customer',
    status        ENUM('active','disabled') NOT NULL DEFAULT 'active',
    created_at    DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    updated_at    DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
CREATE TABLE chat_sessions (
    id                BIGINT PRIMARY KEY AUTO_INCREMENT,
    customer_id       BIGINT NOT NULL,
    assigned_agent_id BIGINT NULL,
    status            ENUM('open','pending','closed') NOT NULL DEFAULT 'open',
    topic             VARCHAR(255) NULL,
    created_at        DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    closed_at         DATETIME NULL,
    INDEX idx_customer_status (customer_id, status),
    FOREIGN KEY (customer_id) REFERENCES users(id),
    FOREIGN KEY (assigned_agent_id) REFERENCES users(id)
);
CREATE TABLE chat_messages (
    id          BIGINT PRIMARY KEY AUTO_INCREMENT,
    session_id  BIGINT NOT NULL,
    sender_id   BIGINT NULL,
    sender_type ENUM('customer','agent','ai','system') NOT NULL,
    content     TEXT NOT NULL,
    metadata    JSON NULL,
    created_at  DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_session_created (session_id, created_at),
    FOREIGN KEY (session_id) REFERENCES chat_sessions(id)
);
CREATE TABLE audit_logs (
    id          BIGINT PRIMARY KEY AUTO_INCREMENT,
    actor_id    BIGINT NULL,
    action      VARCHAR(64) NOT NULL,
    target_type VARCHAR(64) NOT NULL,
    target_id   BIGINT NULL,
    ip_address  VARCHAR(64) NULL,
    user_agent  VARCHAR(255) NULL,
    created_at  DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
    INDEX idx_actor_time (actor_id, created_at)
);
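The message table is the one most likely to grow large, so history reads are usually paged. The following is a minimal sketch of keyset (cursor) pagination over a table shaped like chat_messages. It uses the standard-library sqlite3 module as a stand-in for MySQL so that it runs on its own; the function name `fetch_page`, the choice of the auto-increment `id` as cursor, and the reduced column set are all assumptions for illustration. In MySQL a supporting index on the cursor column may also be needed.

```python
import sqlite3


def fetch_page(conn, session_id, after_id=0, limit=50):
    """Return (rows, next_cursor) for messages of one session with id > after_id."""
    rows = conn.execute(
        "SELECT id, sender_type, content FROM chat_messages "
        "WHERE session_id = ? AND id > ? ORDER BY id LIMIT ?",
        (session_id, after_id, limit),
    ).fetchall()
    # A full page suggests more rows may follow; a short page ends the scan.
    next_cursor = rows[-1][0] if len(rows) == limit else None
    return rows, next_cursor


# Demo data in an in-memory database (hypothetical, reduced schema).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE chat_messages ("
    "id INTEGER PRIMARY KEY, session_id INTEGER, sender_type TEXT, content TEXT)"
)
conn.executemany(
    "INSERT INTO chat_messages (session_id, sender_type, content) VALUES (?, ?, ?)",
    [(1, "customer", f"msg {i}") for i in range(1, 6)],
)

page1, cursor = fetch_page(conn, 1, limit=2)
page2, cursor2 = fetch_page(conn, 1, after_id=cursor, limit=2)
```

Compared with OFFSET-based paging, a cursor keeps each query bounded regardless of how deep into the history the client has scrolled.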
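4. API Documentation
Authentication
| Method | Path | Permission | Description |
|---|---|---|---|
| POST | /api/v1/auth/register | public | Register a customer account |
| POST | /api/v1/auth/login | public | Log in and return an access token |
| GET | /api/v1/auth/me | user | Get the current user's information |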
Example login response:
{
  "access_token": "jwt-token",
  "token_type": "bearer",
  "expires_in": 3600,
  "user": {
    "id": 1,
    "email": "user@example.com",
    "role": "customer"
  }
}
Sessions and Messages
| Method | Path | Permission | Description |
|---|---|---|---|
| POST | /api/v1/sessions | customer | Create a customer service session |
| GET | /api/v1/sessions | user | List sessions |
| GET | /api/v1/sessions/{id} | participant/admin | Get session details |
| POST | /api/v1/sessions/{id}/messages | participant | Send a message |
| GET | /api/v1/sessions/{id}/messages | participant | Query message history |
| POST | /api/v1/sessions/{id}/close | agent/admin | Close the session |
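The permission column above can be read as three small predicates. The sketch below is one possible interpretation, not the project's actual `session_service.can_access`: the `User` and `ChatSession` dataclasses and the function names are hypothetical, and "participant" is assumed to mean the session's customer or its assigned agent. Whether an agent must be assigned to a session before closing it is not stated in the table, so the literal "agent/admin" reading is used.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class User:
    id: int
    role: str  # 'customer', 'agent', or 'admin', as in the users table


@dataclass
class ChatSession:
    id: int
    customer_id: int
    assigned_agent_id: Optional[int] = None


def is_participant(session: ChatSession, user: User) -> bool:
    # Assumption: participants are the customer and the assigned agent.
    return user.id in (session.customer_id, session.assigned_agent_id)


def can_view(session: ChatSession, user: User) -> bool:
    # "participant/admin" rows: session details.
    return user.role == "admin" or is_participant(session, user)


def can_post(session: ChatSession, user: User) -> bool:
    # "participant" rows: sending messages and reading history.
    return is_participant(session, user)


def can_close(session: ChatSession, user: User) -> bool:
    # "agent/admin" row, read literally: any agent or admin may close.
    return user.role in ("agent", "admin")
```

Keeping these checks as pure functions makes them easy to unit-test separately from the HTTP and WebSocket layers.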
WebSocket
Connection URL:
wss://api.example.com/ws/v1/chat/{session_id}?token=<jwt>
Message format:
{
  "type": "message",
  "content": "When will my order ship?",
  "client_message_id": "msg-001"
}
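Frames arriving from clients are untrusted input, so a server would typically validate them before saving anything. The following is a minimal sketch under stated assumptions: the function name `parse_client_event`, the `MAX_CONTENT_LENGTH` limit, and the rule that `client_message_id` is required are illustrative choices, not part of the specification above.

```python
import json

MAX_CONTENT_LENGTH = 4000  # assumed limit; the spec does not define one


def parse_client_event(raw: str) -> dict:
    """Parse and validate one client frame; raise ValueError on bad input."""
    try:
        event = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError("frame is not valid JSON") from exc
    if not isinstance(event, dict):
        raise ValueError("frame must be a JSON object")
    if event.get("type") != "message":
        raise ValueError("unsupported event type")
    content = event.get("content")
    if not isinstance(content, str) or not content.strip():
        raise ValueError("content must be a non-empty string")
    if len(content) > MAX_CONTENT_LENGTH:
        raise ValueError("content is too long")
    client_id = event.get("client_message_id")
    if not isinstance(client_id, str) or not client_id:
        raise ValueError("client_message_id must be a non-empty string")
    # Return only the known fields so unexpected keys are dropped.
    return {"type": "message", "content": content, "client_message_id": client_id}
```

A server could answer a `ValueError` with an error frame instead of closing the socket; that choice is left open here.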
AI streaming response:
{"type":"ai_delta","content":"Hello, "}
{"type":"ai_delta","content":"I am checking your order status."}
{"type":"ai_done","message_id":12345}
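On the receiving side, a client concatenates the `ai_delta` chunks until `ai_done` arrives. A minimal sketch of that assembly step, assuming each frame is a JSON string shaped like the examples above (the helper name `assemble_reply` is hypothetical):

```python
import json


def assemble_reply(frames):
    """Concatenate ai_delta contents until ai_done; return (text, message_id)."""
    parts = []
    for raw in frames:
        event = json.loads(raw)
        if event["type"] == "ai_delta":
            parts.append(event["content"])
        elif event["type"] == "ai_done":
            return "".join(parts), event.get("message_id")
    raise ValueError("stream ended without ai_done")


frames = [
    '{"type":"ai_delta","content":"Hello, "}',
    '{"type":"ai_delta","content":"I am checking your order status."}',
    '{"type":"ai_done","message_id":12345}',
]
text, message_id = assemble_reply(frames)
```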
5. Core Code Skeleton
JWT Authentication
import os
from datetime import datetime, timedelta, timezone

import jwt  # PyJWT
from passlib.context import CryptContext

SECRET_KEY = os.environ["JWT_SECRET"]  # read from the environment (section 8), never hard-coded
ALGORITHM = "HS256"
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(password: str) -> str:
    return pwd_context.hash(password)


def verify_password(password: str, password_hash: str) -> bool:
    return pwd_context.verify(password, password_hash)


def create_access_token(user_id: int, role: str, expires_minutes: int = 60) -> str:
    now = datetime.now(timezone.utc)
    payload = {
        "sub": str(user_id),
        "role": role,
        "iat": int(now.timestamp()),
        "exp": int((now + timedelta(minutes=expires_minutes)).timestamp()),
    }
    return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)


def decode_access_token(token: str) -> dict:
    # Raises jwt.ExpiredSignatureError or another jwt.InvalidTokenError on failure.
    return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
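To make the token format less opaque, the sketch below reproduces HS256 signing and verification with the standard library only. It is an illustration of the mechanism, not a replacement for PyJWT: the function names are hypothetical, the header's `alg` field is not checked, and only the `exp` claim is validated.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(data: bytes) -> str:
    # JWT uses URL-safe base64 without padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: bytes) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_hs256(token: str, secret: bytes, now=None) -> dict:
    try:
        header_b64, payload_b64, signature_b64 = token.split(".")
    except ValueError as exc:
        raise ValueError("malformed token") from exc
    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    expected = hmac.new(secret, signing_input, hashlib.sha256).digest()
    # Constant-time comparison avoids leaking how many bytes matched.
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    if payload.get("exp", 0) <= (time.time() if now is None else now):
        raise ValueError("token expired")
    return payload


token = sign_hs256({"sub": "1", "role": "customer", "exp": 2000}, b"demo-secret")
claims = verify_hs256(token, b"demo-secret", now=1000)
```

The `now` parameter exists only so the expiry check can be exercised deterministically.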
FastAPI Routes
from fastapi import APIRouter, Depends, HTTPException

router = APIRouter(prefix="/api/v1")  # request models, dependencies, and services are assumed to be defined elsewhere


@router.post("/sessions")
async def create_session(payload: CreateSessionRequest, user=Depends(require_role("customer"))):
    session = await session_service.create_session(
        customer_id=user.id,
        topic=payload.topic,
    )
    await audit_service.record(user.id, "session.create", "chat_session", session.id)
    return session


@router.post("/sessions/{session_id}/messages")
async def send_message(session_id: int, payload: SendMessageRequest, user=Depends(require_user)):
    # Only session participants may post (see the permission table in section 4).
    if not await session_service.can_access(session_id, user):
        raise HTTPException(status_code=403, detail="No permission for this session")
    message = await chat_service.save_user_message(session_id, user.id, payload.content)
    # Fan the saved message out to subscribers of this session's channel.
    await redis_bus.publish(f"session:{session_id}", message.model_dump())
    ai_reply = await chat_service.generate_ai_reply(session_id, payload.content)
    return {"message": message, "ai_preview": ai_reply.preview}
WebSocket Service
from fastapi import WebSocket, WebSocketDisconnect


@app.websocket("/ws/v1/chat/{session_id}")
async def chat_socket(websocket: WebSocket, session_id: int, token: str):
    # `app` and the *_service objects are assumed to be defined elsewhere.
    # Authenticate and authorize before accepting the connection.
    user = await auth_service.user_from_token(token)
    if not await session_service.can_access(session_id, user):
        await websocket.close(code=4403)
        return
    await websocket.accept()
    await presence_service.mark_online(user.id, session_id)
    try:
        while True:
            event = await websocket.receive_json()
            if event.get("type") == "message":
                saved = await chat_service.save_user_message(session_id, user.id, event["content"])
                await websocket.send_json({"type": "message_saved", "message_id": saved.id})
                async for delta in ai_service.stream_reply(session_id, event["content"]):
                    await websocket.send_json({"type": "ai_delta", "content": delta})
                # Persisting the AI reply and returning its message_id in ai_done
                # (as in the protocol example) is not shown in this skeleton.
                await websocket.send_json({"type": "ai_done"})
    except WebSocketDisconnect:
        pass
    finally:
        # Clear presence on every exit path, not only on a clean disconnect.
        await presence_service.mark_offline(user.id, session_id)
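The handler above only answers the socket that sent the message, while the architecture routes messages to every participant through Redis. The sketch below shows the fan-out idea with a single-process stand-in built on `asyncio.Queue`; the class `InMemoryBus` and its methods are hypothetical and only approximate what a Redis Pub/Sub channel per session would provide across instances.

```python
import asyncio
from collections import defaultdict


class InMemoryBus:
    """Single-process stand-in for one Pub/Sub channel per chat session."""

    def __init__(self):
        self._subscribers = defaultdict(set)

    def subscribe(self, channel: str) -> asyncio.Queue:
        # Each connected socket would own one queue and drain it in a task.
        queue = asyncio.Queue()
        self._subscribers[channel].add(queue)
        return queue

    def unsubscribe(self, channel: str, queue: asyncio.Queue) -> None:
        self._subscribers[channel].discard(queue)

    async def publish(self, channel: str, message: dict) -> int:
        """Deliver message to every subscriber; return the receiver count."""
        queues = list(self._subscribers[channel])
        for queue in queues:
            await queue.put(message)
        return len(queues)


async def demo():
    bus = InMemoryBus()
    customer = bus.subscribe("session:1")
    agent = bus.subscribe("session:1")
    other = bus.subscribe("session:2")
    delivered = await bus.publish("session:1", {"type": "message", "content": "hi"})
    return delivered, customer.get_nowait(), agent.get_nowait(), other.empty()


result = asyncio.run(demo())
```

Both subscribers of `session:1` receive the message and the `session:2` subscriber receives nothing, which is the isolation property a per-session channel is meant to give.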
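Redis Usage
| Key | Purpose | TTL |
|---|---|---|
| presence:user:{id} | Online presence | 60 seconds |
| rate:user:{id} | Per-user rate limiting | 60 seconds |
| session:{id}:context | Conversation context cache | 30 minutes |
| ws:queue:{session_id} | Messages pending push | 5 minutes |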
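Keeping key names and TTLs in one place helps avoid drift between services. A minimal sketch of such a registry follows; the names `KEY_SPECS` and `redis_key` are hypothetical, and the templates use a generic `{id}` placeholder for both user and session identifiers.

```python
from datetime import timedelta

# Key templates and TTLs taken from the table above.
KEY_SPECS = {
    "presence": ("presence:user:{id}", timedelta(seconds=60)),
    "rate": ("rate:user:{id}", timedelta(seconds=60)),
    "context": ("session:{id}:context", timedelta(minutes=30)),
    "ws_queue": ("ws:queue:{id}", timedelta(minutes=5)),
}


def redis_key(kind: str, entity_id: int) -> tuple[str, int]:
    """Return (key, ttl_seconds) for one kind of entry."""
    template, ttl = KEY_SPECS[kind]
    return template.format(id=entity_id), int(ttl.total_seconds())


key, ttl_seconds = redis_key("context", 42)
```

With redis-py's asyncio client, the pair could then be passed to something like `await client.set(key, value, ex=ttl_seconds)`; the client object and the stored value are outside this sketch.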
AI Adapter
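class LLMProvider:
    """Base interface: subclasses yield reply text chunks."""

    async def stream_reply(self, messages: list[dict]):
        raise NotImplementedError
        yield  # unreachable; makes this an async generator like the subclasses


class OpenAICompatibleProvider(LLMProvider):
    def __init__(self, client, model: str):
        self.client = client  # an async OpenAI-compatible client
        self.model = model

    async def stream_reply(self, messages: list[dict]):
        stream = await self.client.chat.completions.create(
            model=self.model,
            messages=messages,
            stream=True,
        )
        async for event in stream:
            # Some providers send chunks with no choices or an empty delta; skip them.
            if not event.choices:
                continue
            delta = event.choices[0].delta.content
            if delta:
                yield delta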
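Section 6 recommends timeouts and retries around AI calls. The sketch below shows one way to wrap a provider call with a per-attempt timeout and exponential backoff using only asyncio. The helper name `call_with_retry`, its parameters, and the set of retried exceptions are assumptions; it covers a single awaitable call rather than a whole stream, and circuit breaking is not shown.

```python
import asyncio


async def call_with_retry(make_call, *, timeout=10.0, attempts=3, base_delay=0.5):
    """Await make_call() with a per-attempt timeout and exponential backoff."""
    last_error = None
    for attempt in range(attempts):
        try:
            return await asyncio.wait_for(make_call(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
            if attempt < attempts - 1:
                # Back off 1x, 2x, 4x ... base_delay between attempts.
                await asyncio.sleep(base_delay * 2 ** attempt)
    raise RuntimeError(f"LLM call failed after {attempts} attempts") from last_error


async def demo():
    calls = {"n": 0}

    async def flaky():
        # Hypothetical upstream that fails twice, then succeeds.
        calls["n"] += 1
        if calls["n"] < 3:
            raise ConnectionError("upstream reset")
        return "ok"

    result = await call_with_retry(flaky, timeout=1.0, attempts=3, base_delay=0.01)
    return result, calls["n"]


outcome = asyncio.run(demo())
```

`make_call` is a zero-argument callable so that each attempt creates a fresh coroutine; a coroutine object cannot be awaited twice.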
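6. High-Concurrency Design
Key points for supporting 1000+ concurrent users:
- Deploy the API service statelessly and scale horizontally across multiple instances.
- Deploy the WebSocket service separately and distribute messages through Redis Pub/Sub or Streams.
- For MySQL, use connection pooling, indexes, read/write splitting, and time-based archiving of the message table.
- Use Redis for presence, rate limiting, short-term context, and message broadcast.
- Put queueing, timeouts, retries, and circuit breaking around AI calls so that a slow model API cannot drag down the main service.
- Paginate queries over historical messages to avoid loading large numbers of records at once.
- Apply token-bucket rate limiting to user requests to prevent message flooding.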
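The token-bucket idea in the last point can be sketched in a few lines. This version keeps state in process memory and takes an injectable clock so it can be tested deterministically; the class name and parameters are illustrative. With multiple API instances the bucket state would have to live in shared storage such as Redis, which this sketch does not attempt.

```python
import time


class TokenBucket:
    """Allow bursts up to `capacity`, refilled at `refill_per_second`."""

    def __init__(self, capacity: int, refill_per_second: float, clock=time.monotonic):
        self.capacity = capacity
        self.refill_per_second = refill_per_second
        self._clock = clock
        self._tokens = float(capacity)  # start full
        self._updated = clock()

    def allow(self, cost: float = 1.0) -> bool:
        now = self._clock()
        elapsed = now - self._updated
        self._updated = now
        # Refill for the elapsed time, never beyond capacity.
        self._tokens = min(self.capacity, self._tokens + elapsed * self.refill_per_second)
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False


# Usage with a fake clock: two messages pass, the third is rejected,
# and one more is allowed after one second of refill.
fake_time = [0.0]
bucket = TokenBucket(capacity=2, refill_per_second=1.0, clock=lambda: fake_time[0])
first_three = [bucket.allow(), bucket.allow(), bucket.allow()]
fake_time[0] = 1.0
after_refill = [bucket.allow(), bucket.allow()]
```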
7. Unit Tests
import pytest
from httpx import ASGITransport, AsyncClient  # AsyncClient(app=...) was removed in httpx 0.28


def make_client(app) -> AsyncClient:
    """Build a test client; the `app` and `*_token` fixtures are assumed to come from conftest.py."""
    return AsyncClient(transport=ASGITransport(app=app), base_url="http://test")


@pytest.mark.asyncio
async def test_register_login_and_me(app):
    async with make_client(app) as client:
        register = await client.post("/api/v1/auth/register", json={
            "email": "u1@example.com",
            "password": "StrongPass123",
            "display_name": "User One",
        })
        assert register.status_code == 201
        login = await client.post("/api/v1/auth/login", json={
            "email": "u1@example.com",
            "password": "StrongPass123",
        })
        assert login.status_code == 200
        token = login.json()["access_token"]
        me = await client.get("/api/v1/auth/me", headers={"Authorization": f"Bearer {token}"})
        assert me.status_code == 200
        assert me.json()["email"] == "u1@example.com"


@pytest.mark.asyncio
async def test_customer_can_create_session(app, customer_token):
    async with make_client(app) as client:
        res = await client.post(
            "/api/v1/sessions",
            headers={"Authorization": f"Bearer {customer_token}"},
            json={"topic": "Order inquiry"},
        )
        assert res.status_code == 200
        assert res.json()["status"] == "open"


@pytest.mark.asyncio
async def test_message_requires_session_permission(app, other_customer_token):
    async with make_client(app) as client:
        res = await client.post(
            "/api/v1/sessions/1/messages",
            headers={"Authorization": f"Bearer {other_customer_token}"},
            json={"content": "Unauthorized access to another user's session"},
        )
        assert res.status_code == 403


@pytest.mark.asyncio
async def test_agent_can_close_session(app, agent_token):
    async with make_client(app) as client:
        res = await client.post(
            "/api/v1/sessions/1/close",
            headers={"Authorization": f"Bearer {agent_token}"},
        )
        assert res.status_code == 200
        assert res.json()["status"] == "closed"
8. Deployment and Security
Environment variables:
DATABASE_URL=mysql+aiomysql://user:password@mysql:3306/customer_service
REDIS_URL=redis://redis:6379/0
JWT_SECRET=replace-with-strong-secret
LLM_API_KEY=replace-with-provider-key
LLM_MODEL=customer-service-model
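Security policy:
- Store passwords only as bcrypt hashes.
- Read the JWT secret from an environment variable; never hard-code it in the repository.
- Admin endpoints must enforce RBAC.
- Write all user messages to the audit log.
- Validate the token and the session permission during the WebSocket handshake.
- Before the AI replies, filtering for sensitive words, private information, and privilege-escalation requests can be added.
- Enable HTTPS/WSS in production.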
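As an illustration of the private-information filtering mentioned above, the sketch below masks e-mail addresses and phone numbers before text is forwarded to a model. The patterns are deliberately simple assumptions: the phone pattern only covers 11-digit mainland China mobile numbers, and a real deployment would need a broader and better-tested rule set.

```python
import re

# Simplified patterns for illustration only.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
PHONE_RE = re.compile(r"(?<!\d)1[3-9]\d{9}(?!\d)")  # assumed: mainland China mobile format


def mask_pii(text: str) -> str:
    """Replace e-mail addresses and phone numbers with placeholders."""
    text = EMAIL_RE.sub("[email]", text)
    return PHONE_RE.sub("[phone]", text)


masked = mask_pii("Contact me at user@example.com or 13812345678")
```

The digit lookarounds keep longer numeric strings, such as order numbers, from being partially masked.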
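9. Requirements Coverage Check
| Requirement | Coverage |
|---|---|
| WebSocket real-time message push | Connection URL, event format, and service code skeleton provided |
| JWT user authentication | Token creation, validation, and login endpoint provided |
| MySQL database | User, session, message, and audit tables provided |
| Redis cache | Presence, rate limiting, context, and message queue design provided |
| LLM API integration | LLM Provider Adapter provided |
| Support for 1000+ concurrent users | Horizontal scaling, connection pooling, rate limiting, queueing, and circuit-breaking plan provided |
| Complete API documentation | Authentication, session, message, and WebSocket documentation provided |
| Unit tests | pytest test samples provided |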