跳到主要内容

4 篇博文 含有标签「FastAPI」

查看所有标签

用抽象类统一多搜索 API,错误返回而非抛异常

· 阅读需 5 分钟

在为客户构建 AI Agent 平台时遇到此问题:需要支持多个搜索提供商(Tavily、Serper、Brave、Bing),同时确保工具调用失败时不会中断 Agent 对话流程。

TL;DR

  1. 定义 SearchProvider 抽象基类 + SearchResult 数据模型,统一接口和输出格式
  2. 每个提供商继承基类,实现 search() 方法,内部做响应字段映射
  3. 关键设计:错误时返回包含错误信息的 SearchResult 对象,而非抛异常

问题现象

直接调用不同搜索 API 的问题:

# Tavily: POST 请求,results[].url
response = await client.post("https://api.tavily.com/search", ...)

# Serper: POST 请求,organic[].link
response = await client.post("https://google.serper.dev/search", ...)

# Brave: GET 请求,web.results[].description
response = await client.get("https://api.search.brave.com/res/v1/web/search", ...)

# Bing: GET 请求,webPages.value[].snippet
response = await client.get("https://api.bing.microsoft.com/v7.0/search", ...)

问题

  1. 请求方式、认证头、响应结构各不相同
  2. 切换提供商需要改调用方代码
  3. raise Exception 会中断 AI Agent 的流式对话

根因

  1. 缺少抽象层:调用方直接依赖具体实现,违反依赖倒置原则
  2. 错误处理策略不统一:异常会沿调用栈向上传播,在流式场景下导致整个对话中断

对于 AI Agent 工具调用场景,Agent 需要根据错误信息决定是否重试、换用其他工具、或向用户说明情况——而不是直接崩溃。

解决方案

1. 定义抽象基类和数据模型

# base.py
from abc import ABC, abstractmethod
from typing import List
from pydantic import BaseModel


class SearchResult(BaseModel):
"""Unified search result."""
title: str
link: str
snippet: str


class SearchProvider(ABC):
"""Base class for search providers."""

def __init__(self, api_key: str):
self.api_key = api_key

@abstractmethod
async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
"""Execute search and return results."""
pass

2. 实现具体提供商

Tavily(AI 优化搜索,支持 rate limit / quota 错误码):

# tavily.py
import httpx
import logging
from typing import List
from .base import SearchProvider, SearchResult

logger = logging.getLogger(__name__)


class TavilySearch(SearchProvider):
"""Tavily Search API implementation."""

async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
try:
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(
"https://api.tavily.com/search",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"query": query,
"max_results": max_results,
"search_depth": "basic"
}
)

# 错误时返回 SearchResult,而非 raise
if response.status_code == 429:
return [SearchResult(
title="Rate Limited",
link="",
snippet="Search quota exceeded. Please try again later."
)]

if response.status_code == 401:
return [SearchResult(
title="Auth Error",
link="",
snippet="Search API key is invalid."
)]

if response.status_code == 402:
return [SearchResult(
title="Quota Exceeded",
link="",
snippet="Monthly search quota depleted."
)]

response.raise_for_status()
data = response.json()

# 字段映射:Tavily 的 url -> 统一的 link
results = []
for item in data.get("results", [])[:max_results]:
results.append(SearchResult(
title=item.get("title", ""),
link=item.get("url", ""),
snippet=item.get("content", "")
))
return results

except httpx.TimeoutException:
logger.warning(f"Tavily API timeout: {query[:50]}")
return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
except Exception as e:
logger.error(f"Tavily search error: {e}")
return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

Serper(Google Search API):

# serper.py
class SerperSearch(SearchProvider):
"""Serper (Google Search) API implementation."""

async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
try:
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(
"https://google.serper.dev/search",
headers={"X-API-KEY": self.api_key, "Content-Type": "application/json"},
json={"q": query, "num": max_results}
)

if response.status_code == 401:
return [SearchResult(title="Auth Error", link="", snippet="Serper API key is invalid.")]

response.raise_for_status()
data = response.json()

# 字段映射:Serper 的 organic[].link -> 统一的 link
results = []
for item in data.get("organic", [])[:max_results]:
results.append(SearchResult(
title=item.get("title", ""),
link=item.get("link", ""),
snippet=item.get("snippet", "")
))
return results

except httpx.TimeoutException:
return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
except Exception as e:
return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

BraveBing 实现类似,区别在于请求方式和响应字段映射。

3. 调用方使用

# 使用时只需依赖抽象
async def execute_search(provider: SearchProvider, query: str) -> List[SearchResult]:
results = await provider.search(query)

# 检查是否有错误(通过 title 或 snippet 判断)
if results and not results[0].link:
error_msg = results[0].snippet
# Agent 可以根据错误信息决定下一步操作
return f"Search failed: {error_msg}"

return results


# 切换提供商只需换实例
provider = TavilySearch(api_key="xxx")
# provider = SerperSearch(api_key="xxx")
results = await execute_search(provider, "Python async best practices")

关键设计决策

决策原因
错误返回 SearchResult 而非 raiseAI Agent 对话是流式流程,异常会中断整个对话
用 Pydantic BaseModel 定义输出自动校验 + IDE 提示 + JSON 序列化
抽象类用 ABC 而非 Protocol需要共享 __init__ 逻辑(api_key 存储)
超时统一 15 秒搜索是用户体验关键路径,不能太慢

对类似需求感兴趣?联系合作

修复 FastAPI SSE 客户端断开时的 CancelledError

· 阅读需 2 分钟

在为客户构建 AI 客服自动化系统时遇到此问题,记录根因与解法。

TL;DR

FastAPI 的 StreamingResponse 在客户端断开连接时会取消生成器任务,导致 asyncio.CancelledError。正确做法是在生成器中捕获该异常并 re-raise,否则会导致异常日志污染和资源泄漏。

问题现象

使用 SSE(Server-Sent Events)实现流式对话时,客户端断开连接后,服务端日志出现大量异常:

ERROR:    Exception in ASGI application
...
asyncio.CancelledError

代码原本写法:

async def event_stream():
async for event in engine.execute(body.message):
yield event

return StreamingResponse(event_stream(), media_type="text/event-stream")

根因

FastAPI/Starlette 的 StreamingResponse 在客户端断开时,会取消正在执行的生成器任务。Python 的 async for 循环被取消时会抛出 asyncio.CancelledError

如果不处理这个异常,它会向上传播,被 ASGI 服务器捕获并记录为错误日志。更严重的是,生成器内的资源(如数据库连接、HTTP 客户端)可能无法正确释放。

解决方案

在生成器内部捕获 CancelledError,记录日志后 必须 re-raise

import asyncio
import logging

logger = logging.getLogger(__name__)

async def event_stream():
try:
async for event in engine.execute(body.message):
yield event
except asyncio.CancelledError:
# 客户端断开连接,正常行为
logger.info("Client disconnected")
raise # 必须 re-raise 以正确终止生成器

return StreamingResponse(event_stream(), media_type="text/event-stream")

为什么必须 re-raise?

CancelledError 是 Python 取消协程的标准机制。捕获后如果不 re-raise:

  1. 生成器不会正确终止
  2. StreamingResponse 认为响应正常完成
  3. 可能导致资源泄漏

FAQ

Q: FastAPI SSE 客户端断开后为什么报 CancelledError?

A: 这是 Python asyncio 的设计行为。客户端断开时,Starlette 取消生成器任务,触发 CancelledError。正确处理方式是捕获并 re-raise。

Q: 捕获 CancelledError 后不 re-raise 会怎样?

A: 生成器无法正确终止,可能导致数据库连接、HTTP 客户端等资源泄漏。同时 StreamingResponse 会误认为响应正常完成。

Q: 如何区分正常断开和异常断开?

A: CancelledError 本身就是正常断开的信号。如果需要在断开时执行清理逻辑(如更新状态),在 except 块中处理后再 re-raise。

解决 Pydantic v2 ORM mode 报错 model_config 被覆盖

· 阅读需 2 分钟

TL;DR

Pydantic v2 不再支持 class Config,需要用 model_config = ConfigDict(from_attributes=True)。如果你的模型有 model_config 字段,必须重命名避免与保留字冲突。

问题现象

报错 1:class Config 不生效

from pydantic import BaseModel

class AgentResponse(BaseModel):
id: str
name: str

class Config:
orm_mode = True # v1 写法
PydanticUserError: `orm_mode` is not a valid config option. Did you mean `from_attributes`?

报错 2:model_config 字段冲突

class Agent(BaseModel):
id: str
model_config: dict # 业务字段,存储 LLM 配置

model_config = ConfigDict(from_attributes=True)
# TypeError: 'dict' object is not callable

模型中有个业务字段叫 model_config(存储 LLM 配置),与 Pydantic v2 保留字冲突。

根因

1. Pydantic v2 配置语法变化

Pydantic v2 使用 model_config 作为配置属性名,不再支持嵌套的 class Config

Pydantic v1Pydantic v2
class Config: orm_mode = Truemodel_config = ConfigDict(from_attributes=True)
class Config: schema_extra = {...}model_config = ConfigDict(json_schema_extra={...})

2. model_config 是保留字

model_config 在 Pydantic v2 中是特殊属性,不能同时作为业务字段名使用。

解决方案

1. 更新 ORM mode 配置

from pydantic import BaseModel, ConfigDict

class AgentResponse(BaseModel):
model_config = ConfigDict(from_attributes=True) # 新写法

id: str
name: str

2. 重命名冲突字段

将业务字段 model_config 改为 llm_config(或任意非保留名):

# models/agent.py
class Agent(BaseModel):
__tablename__ = "agent_agents"

id: str
llm_config: dict # 改名,避免冲突

# schemas/agent.py
class AgentResponse(BaseModel):
model_config = ConfigDict(from_attributes=True)

agent_id: str
llm_config: LlmConfig # 与模型保持一致

3. 数据库迁移(如需要)

如果数据库字段也要改:

# alembic/versions/xxx_rename_model_config.py
def upgrade():
op.alter_column('agent_agents', 'model_config', new_column_name='llm_config')

def downgrade():
op.alter_column('agent_agents', 'llm_config', new_column_name='model_config')

FAQ

Q: Pydantic v2 的 orm_mode 改成什么了?

A: 改为 from_attributes=True,配置方式从 class Config 变成 model_config = ConfigDict(...)

Q: 为什么 model_config 字段报错?

A: model_config 是 Pydantic v2 的保留属性名,用于配置模型行为。如果业务代码中有同名字段,需要重命名。

Q: ConfigDict 还有哪些常用选项?

A: from_attributes (ORM mode)、json_schema_extra (schema 扩展)、str_strip_whitespace (自动去空格)、validate_assignment (赋值时验证)。

集成 Supabase Auth 到 FastAPI 的三个坑

· 阅读需 4 分钟

在为客户构建 SaaS 认证系统时遇到此问题,记录根因与解法。

TL;DR

Supabase Auth + FastAPI 集成有三个常见坑:JWKS 路径不是标准路径、ES256 签名需转换为 DER 格式、用户首次登录时本地数据库无记录。本文提供完整解决方案。

问题现象

坑 1:JWKS 路径 404

GET https://xxx.supabase.co/.well-known/jwks.json
# 404 Not Found

所有 JWT 验证请求返回 401 Invalid Token。

坑 2:ES256 签名验证失败

from jose import jwt
payload = jwt.decode(token, key, algorithms=["ES256"])
# JWTError: Signature verification failed

明明公钥是对的,但签名验证总是失败。

坑 3:用户首次登录无本地记录

# 创建 Agent 时
agent = Agent(user_id=current_user["user_id"], ...)
db.add(agent)
# ForeignKeyViolation: user_id 不存在

Supabase Auth 用户通过了 JWT 验证,但本地 agent_users 表没有该用户记录。

根因

坑 1:Supabase 非标准 JWKS 路径

标准 OAuth/OIDC 服务器 JWKS 在 /.well-known/jwks.json,但 Supabase 把认证服务放在 /auth/v1/ 子路径下:

标准路径Supabase 路径
/.well-known/jwks.json/auth/v1/.well-known/jwks.json

坑 2:ES256 原始签名 vs DER 格式

Supabase JWT 使用 ES256(P-256 曲线)签名。JWT 中的签名是 raw 格式r || s 拼接,64 字节),但 Python cryptography 库的 verify() 方法需要 DER-encoded ASN.1 格式

Raw:     r (32 bytes) || s (32 bytes) = 64 bytes
DER: 0x30 <len> 0x02 <r_len> <r> 0x02 <s_len> <s>

python-josejwt.decode() 在处理 ES256 时有兼容性问题,需要手动验证签名。

坑 3:认证与数据分离

Supabase Auth 是独立服务,用户注册/登录后只存在于 Supabase 的 auth.users 表。本地数据库的 agent_users 表需要手动同步。

解决方案

1. 正确的 JWKS URL

# config.py
class Settings(BaseSettings):
supabase_url: str = "https://xxx.supabase.co"

@property
def jwks_url(self) -> str:
# 关键:/auth/v1/ 前缀
return f"{self.supabase_url}/auth/v1/.well-known/jwks.json"

2. ES256 签名验证(完整代码)

import json
import base64
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature

def _base64url_decode(data: str) -> bytes:
"""Base64url 解码,自动补 padding"""
rem = len(data) % 4
if rem > 0:
data += "=" * (4 - rem)
return base64.urlsafe_b64decode(data)

def _raw_to_der_signature(raw_sig: bytes) -> bytes:
"""将 raw ECDSA 签名 (r||s) 转为 DER 格式"""
# P-256: r 和 s 各 32 字节
r = int.from_bytes(raw_sig[:32], "big")
s = int.from_bytes(raw_sig[32:], "big")
return encode_dss_signature(r, s)

def verify_es256_signature(token: str, public_key_jwk: dict) -> dict:
"""验证 ES256 JWT 签名,返回 payload"""
parts = token.split(".")
if len(parts) != 3:
raise ValueError("Invalid JWT format")

header_b64, payload_b64, signature_b64 = parts

# 1. 构建 EC 公钥
x = _base64url_decode(public_key_jwk["x"])
y = _base64url_decode(public_key_jwk["y"])
x_int = int.from_bytes(x, "big")
y_int = int.from_bytes(y, "big")

public_key = ec.EllipticCurvePublicNumbers(
x_int, y_int, ec.SECP256R1()
).public_key(default_backend())

# 2. 验证签名
message = f"{header_b64}.{payload_b64}".encode()
raw_signature = _base64url_decode(signature_b64)
der_signature = _raw_to_der_signature(raw_signature)

public_key.verify(
der_signature,
message,
ec.ECDSA(hashes.SHA256())
)

# 3. 返回 payload
return json.loads(_base64url_decode(payload_b64))

3. 用户同步服务

# app/services/user_service.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import AgentUser

async def ensure_user_exists(
db: AsyncSession,
user_id: str,
email: str,
plan: str = "free"
) -> AgentUser:
"""确保用户存在于本地数据库(从 Supabase Auth 同步)"""
# 检查是否存在
result = await db.execute(
select(AgentUser).where(AgentUser.user_id == user_id)
)
user = result.scalar_one_or_none()

if user:
return user

# 创建新用户
user = AgentUser(
user_id=user_id,
email=email,
plan=plan,
role="user"
)
db.add(user)
await db.commit()
await db.refresh(user)
return user

4. 在创建资源前调用

# app/routers/agents.py
@router.post("/")
async def create_agent(
input: CreateAgentInput,
db: AsyncSession = Depends(get_db),
current_user: dict = Depends(get_current_user)
):
# 关键:确保用户存在
user = await ensure_user_exists(
db,
user_id=current_user["user_id"],
email=current_user["email"],
plan=current_user["plan"]
)

# 现在可以安全创建 Agent
agent = Agent(
user_id=user.user_id,
name=input.name,
llm_config=input.llm_config.model_dump()
)
...

FAQ

Q: Supabase JWT 验证返回 404 怎么办?

A: Supabase 的 JWKS 路径是 /auth/v1/.well-known/jwks.json,不是标准的 /.well-known/jwks.json。检查你的 JWKS URL 配置。

Q: python-jose 验证 ES256 签名失败怎么解决?

A: python-jose 对 ES256 支持不完善。使用 cryptography 库手动验证,需要将 JWT 的 raw 签名(r||s 64字节)转换为 DER 格式。

Q: FastAPI 如何同步 Supabase Auth 用户到本地数据库?

A: 在需要用户记录的 API(如创建资源)入口处调用 ensure_user_exists(),从 JWT 提取用户信息并同步到本地表。

Q: Supabase JWT 中的 user_id 在哪个字段?

A: sub 字段包含用户 UUID,email 字段包含邮箱,app_metadata.plan 包含订阅计划(自定义字段)。