跳到主要内容

4 篇博文 含有标签「ai-agent」

查看所有标签

用抽象类统一多搜索 API,错误返回而非抛异常

· 阅读需 5 分钟

在为客户构建 AI Agent 平台时遇到此问题:需要支持多个搜索提供商(Tavily、Serper、Brave、Bing),同时确保工具调用失败时不会中断 Agent 对话流程。

TL;DR

  1. 定义 SearchProvider 抽象基类 + SearchResult 数据模型,统一接口和输出格式
  2. 每个提供商继承基类,实现 search() 方法,内部做响应字段映射
  3. 关键设计:错误时返回包含错误信息的 SearchResult 对象,而非抛异常

问题现象

直接调用不同搜索 API 的问题:

# Tavily: POST 请求,results[].url
response = await client.post("https://api.tavily.com/search", ...)

# Serper: POST 请求,organic[].link
response = await client.post("https://google.serper.dev/search", ...)

# Brave: GET 请求,web.results[].description
response = await client.get("https://api.search.brave.com/res/v1/web/search", ...)

# Bing: GET 请求,webPages.value[].snippet
response = await client.get("https://api.bing.microsoft.com/v7.0/search", ...)

问题

  1. 请求方式、认证头、响应结构各不相同
  2. 切换提供商需要改调用方代码
  3. raise Exception 会中断 AI Agent 的流式对话

根因

  1. 缺少抽象层:调用方直接依赖具体实现,违反依赖倒置原则
  2. 错误处理策略不统一:异常会沿调用栈向上传播,在流式场景下导致整个对话中断

对于 AI Agent 工具调用场景,Agent 需要根据错误信息决定是否重试、换用其他工具、或向用户说明情况——而不是直接崩溃。

解决方案

1. 定义抽象基类和数据模型

# base.py
from abc import ABC, abstractmethod
from typing import List
from pydantic import BaseModel


class SearchResult(BaseModel):
"""Unified search result."""
title: str
link: str
snippet: str


class SearchProvider(ABC):
"""Base class for search providers."""

def __init__(self, api_key: str):
self.api_key = api_key

@abstractmethod
async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
"""Execute search and return results."""
pass

2. 实现具体提供商

Tavily(AI 优化搜索,支持 rate limit / quota 错误码):

# tavily.py
import httpx
import logging
from typing import List
from .base import SearchProvider, SearchResult

logger = logging.getLogger(__name__)


class TavilySearch(SearchProvider):
"""Tavily Search API implementation."""

async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
try:
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(
"https://api.tavily.com/search",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"query": query,
"max_results": max_results,
"search_depth": "basic"
}
)

# 错误时返回 SearchResult,而非 raise
if response.status_code == 429:
return [SearchResult(
title="Rate Limited",
link="",
snippet="Search quota exceeded. Please try again later."
)]

if response.status_code == 401:
return [SearchResult(
title="Auth Error",
link="",
snippet="Search API key is invalid."
)]

if response.status_code == 402:
return [SearchResult(
title="Quota Exceeded",
link="",
snippet="Monthly search quota depleted."
)]

response.raise_for_status()
data = response.json()

# 字段映射:Tavily 的 url -> 统一的 link
results = []
for item in data.get("results", [])[:max_results]:
results.append(SearchResult(
title=item.get("title", ""),
link=item.get("url", ""),
snippet=item.get("content", "")
))
return results

except httpx.TimeoutException:
logger.warning(f"Tavily API timeout: {query[:50]}")
return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
except Exception as e:
logger.error(f"Tavily search error: {e}")
return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

Serper(Google Search API):

# serper.py
class SerperSearch(SearchProvider):
"""Serper (Google Search) API implementation."""

async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
try:
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(
"https://google.serper.dev/search",
headers={"X-API-KEY": self.api_key, "Content-Type": "application/json"},
json={"q": query, "num": max_results}
)

if response.status_code == 401:
return [SearchResult(title="Auth Error", link="", snippet="Serper API key is invalid.")]

response.raise_for_status()
data = response.json()

# 字段映射:Serper 的 organic[].link -> 统一的 link
results = []
for item in data.get("organic", [])[:max_results]:
results.append(SearchResult(
title=item.get("title", ""),
link=item.get("link", ""),
snippet=item.get("snippet", "")
))
return results

except httpx.TimeoutException:
return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
except Exception as e:
return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

BraveBing 实现类似,区别在于请求方式和响应字段映射。

3. 调用方使用

# 使用时只需依赖抽象
async def execute_search(provider: SearchProvider, query: str) -> List[SearchResult]:
results = await provider.search(query)

# 检查是否有错误(通过 title 或 snippet 判断)
if results and not results[0].link:
error_msg = results[0].snippet
# Agent 可以根据错误信息决定下一步操作
return f"Search failed: {error_msg}"

return results


# 切换提供商只需换实例
provider = TavilySearch(api_key="xxx")
# provider = SerperSearch(api_key="xxx")
results = await execute_search(provider, "Python async best practices")

关键设计决策

决策原因
错误返回 SearchResult 而非 raiseAI Agent 对话是流式流程,异常会中断整个对话
用 Pydantic BaseModel 定义输出自动校验 + IDE 提示 + JSON 序列化
抽象类用 ABC 而非 Protocol需要共享 __init__ 逻辑(api_key 存储)
超时统一 15 秒搜索是用户体验关键路径,不能太慢

对类似需求感兴趣?联系合作

绕过 Supabase Auth 实现 Playwright E2E 测试免登录

· 阅读需 4 分钟

在为客户构建 AI Agent SaaS 平台时遇到此问题,记录根因与解法。

TL;DR

E2E 测试不应该依赖真实的 OAuth 登录流程。通过在 useAuth hook 中检测 localStorage 的测试标记,直接注入 mock 认证状态,跳过 Supabase 初始化。同时将 Zustand store 的 loading 默认值改为 false,避免 AuthGuard 卡在无限 spinner。

问题现象

使用 Playwright 测试 React SPA 时,页面被 AuthGuard 组件保护,需要 Supabase 认证才能访问。测试启动后,页面一直显示 loading spinner,无法进入业务流程。

// AuthGuard 组件 - 测试时卡在这里
export function AuthGuard({ children }: AuthGuardProps) {
const { isAuthenticated, loading } = useAuth()

if (loading) {
return <Spinner /> // 永远显示 spinner
}

if (!isAuthenticated) {
return <Navigate to="/login" />
}

return <>{children}</>
}

测试代码尝试模拟登录,但 Supabase Auth SDK 内部状态无法通过简单的 API mock 控制。

根因

1. Supabase Auth 是异步初始化的

useAuth hook 在 useEffect 中调用 supabase.auth.getSession(),这是异步操作。测试环境下网络请求可能失败或超时,导致状态永远停留在 loading: true

2. Zustand Store 的默认值问题

// authStore.ts - 问题代码
export const useAuthStore = create<AuthState>()(
persist(
(set) => ({
user: null,
token: null,
loading: true, // 👈 默认值是 true
// ...
}),
{ name: 'auth-storage' }
)
)

测试启动时,loading: true + 异步初始化失败 = 永远 loading。

3. OAuth 流程无法自动化

即使能 mock API,OAuth 的重定向流程需要真实浏览器交互,E2E 测试无法可靠模拟。

解决方案

步骤 1:在 useAuth hook 中添加测试模式检测

// hooks/useAuth.ts
export function useAuth() {
const { user, token, loading, setUser, setToken, setLoading } = useAuthStore()

useEffect(() => {
const initAuth = async () => {
// 👇 优先检测测试模式
const testAuthUser = localStorage.getItem('test-auth-user')
const testAuthToken = localStorage.getItem('test-auth-token')

if (testAuthUser && testAuthToken) {
try {
const userData = JSON.parse(testAuthUser) as User
setUser(userData)
setToken(testAuthToken)
setLoading(false)
console.log('[useAuth] Using test mode auth')
return // 👈 直接返回,跳过 Supabase 初始化
} catch (e) {
console.error('Failed to parse test auth user:', e)
}
}

// 👇 正常模式:走 Supabase Auth
try {
const { data: { session } } = await supabase.auth.getSession()
if (session) {
setUser(session.user as User)
setToken(session.access_token)
}
} catch (error) {
console.error('Auth init failed:', error)
} finally {
setLoading(false)
}
}

initAuth()

// 👇 测试模式下跳过 auth state listener
if (localStorage.getItem('test-auth-user')) {
return
}

const { data: { subscription } } = supabase.auth.onAuthStateChange(
async (event, session) => {
// ... 正常的 auth state 处理
}
)

return () => subscription.unsubscribe()
}, [])
}

步骤 2:修改 Zustand Store 默认值

// stores/authStore.ts
export const useAuthStore = create<AuthState>()(
persist(
(set) => ({
user: null,
token: null,
loading: false, // 👈 改为 false,让 useAuth hook 控制状态
setUser: (user) => set({ user }),
setToken: (token) => set({ token }),
setLoading: (loading) => set({ loading }),
logout: () => set({ user: null, token: null, loading: false }),
}),
{
name: 'auth-storage',
partialize: (state) => ({ user: state.user, token: state.token }),
}
)
)

步骤 3:在 Playwright Fixture 中注入测试认证

// e2e/fixtures.ts
import { test as base } from '@playwright/test'

export const mockUser = {
id: 'test-user-id',
email: 'test@example.com',
created_at: '2024-01-01T00:00:00Z',
}

export const test = base.extend({
authenticatedPage: async ({ page }, use) => {
// 先访问页面以设置 localStorage 的 origin
await page.goto('/login')

// 👇 注入测试认证状态到 localStorage
await page.evaluate(
({ user, token }) => {
localStorage.setItem('test-auth-user', JSON.stringify(user))
localStorage.setItem('test-auth-token', token)
},
{ user: mockUser, token: 'mock-access-token' }
)

// 导航到受保护页面,useAuth 会检测到测试模式
await page.goto('/dashboard')

await use(page)
},
})

步骤 4:在测试中使用

// e2e/dashboard.spec.ts
import { test, expect } from './fixtures'

test('dashboard shows user agents', async ({ authenticatedPage }) => {
// authenticatedPage 已经通过认证,无需登录
await expect(authenticatedPage.getByText('Test Agent')).toBeVisible()
})

完整代码结构

agent-frontend/
├── e2e/
│ ├── fixtures.ts # Playwright fixture + mock 数据
│ ├── dashboard.spec.ts # 测试用例
│ └── ...
├── src/
│ ├── hooks/
│ │ └── useAuth.ts # 测试模式检测
│ └── stores/
│ └── authStore.ts # loading: false 默认值
└── playwright.config.ts

关键要点

  1. 测试模式 key 使用特殊前缀test-auth-* 不会在生产环境中出现
  2. 检测优先于初始化:先检查 localStorage,再走 Supabase Auth
  3. 跳过 auth listener:测试模式下不需要监听 auth state 变化
  4. loading 默认值改为 false:让 hook 显式控制 loading 状态

对类似需求感兴趣?联系合作

修复 FastAPI SSE 客户端断开时的 CancelledError

· 阅读需 2 分钟

在为客户构建 AI 客服自动化系统时遇到此问题,记录根因与解法。

TL;DR

FastAPI 的 StreamingResponse 在客户端断开连接时会取消生成器任务,导致 asyncio.CancelledError。正确做法是在生成器中捕获该异常并 re-raise,否则会导致异常日志污染和资源泄漏。

问题现象

使用 SSE(Server-Sent Events)实现流式对话时,客户端断开连接后,服务端日志出现大量异常:

ERROR:    Exception in ASGI application
...
asyncio.CancelledError

代码原本写法:

async def event_stream():
async for event in engine.execute(body.message):
yield event

return StreamingResponse(event_stream(), media_type="text/event-stream")

根因

FastAPI/Starlette 的 StreamingResponse 在客户端断开时,会取消正在执行的生成器任务。Python 的 async for 循环被取消时会抛出 asyncio.CancelledError

如果不处理这个异常,它会向上传播,被 ASGI 服务器捕获并记录为错误日志。更严重的是,生成器内的资源(如数据库连接、HTTP 客户端)可能无法正确释放。

解决方案

在生成器内部捕获 CancelledError,记录日志后 必须 re-raise

import asyncio
import logging

logger = logging.getLogger(__name__)

async def event_stream():
try:
async for event in engine.execute(body.message):
yield event
except asyncio.CancelledError:
# 客户端断开连接,正常行为
logger.info("Client disconnected")
raise # 必须 re-raise 以正确终止生成器

return StreamingResponse(event_stream(), media_type="text/event-stream")

为什么必须 re-raise?

CancelledError 是 Python 取消协程的标准机制。捕获后如果不 re-raise:

  1. 生成器不会正确终止
  2. StreamingResponse 认为响应正常完成
  3. 可能导致资源泄漏

FAQ

Q: FastAPI SSE 客户端断开后为什么报 CancelledError?

A: 这是 Python asyncio 的设计行为。客户端断开时,Starlette 取消生成器任务,触发 CancelledError。正确处理方式是捕获并 re-raise。

Q: 捕获 CancelledError 后不 re-raise 会怎样?

A: 生成器无法正确终止,可能导致数据库连接、HTTP 客户端等资源泄漏。同时 StreamingResponse 会误认为响应正常完成。

Q: 如何区分正常断开和异常断开?

A: CancelledError 本身就是正常断开的信号。如果需要在断开时执行清理逻辑(如更新状态),在 except 块中处理后再 re-raise。

集成 Supabase Auth 到 FastAPI 的三个坑

· 阅读需 4 分钟

在为客户构建 SaaS 认证系统时遇到此问题,记录根因与解法。

TL;DR

Supabase Auth + FastAPI 集成有三个常见坑:JWKS 路径不是标准路径、ES256 签名需转换为 DER 格式、用户首次登录时本地数据库无记录。本文提供完整解决方案。

问题现象

坑 1:JWKS 路径 404

GET https://xxx.supabase.co/.well-known/jwks.json
# 404 Not Found

所有 JWT 验证请求返回 401 Invalid Token。

坑 2:ES256 签名验证失败

from jose import jwt
payload = jwt.decode(token, key, algorithms=["ES256"])
# JWTError: Signature verification failed

明明公钥是对的,但签名验证总是失败。

坑 3:用户首次登录无本地记录

# 创建 Agent 时
agent = Agent(user_id=current_user["user_id"], ...)
db.add(agent)
# ForeignKeyViolation: user_id 不存在

Supabase Auth 用户通过了 JWT 验证,但本地 agent_users 表没有该用户记录。

根因

坑 1:Supabase 非标准 JWKS 路径

标准 OAuth/OIDC 服务器 JWKS 在 /.well-known/jwks.json,但 Supabase 把认证服务放在 /auth/v1/ 子路径下:

标准路径Supabase 路径
/.well-known/jwks.json/auth/v1/.well-known/jwks.json

坑 2:ES256 原始签名 vs DER 格式

Supabase JWT 使用 ES256(P-256 曲线)签名。JWT 中的签名是 raw 格式r || s 拼接,64 字节),但 Python cryptography 库的 verify() 方法需要 DER-encoded ASN.1 格式

Raw:     r (32 bytes) || s (32 bytes) = 64 bytes
DER: 0x30 <len> 0x02 <r_len> <r> 0x02 <s_len> <s>

python-josejwt.decode() 在处理 ES256 时有兼容性问题,需要手动验证签名。

坑 3:认证与数据分离

Supabase Auth 是独立服务,用户注册/登录后只存在于 Supabase 的 auth.users 表。本地数据库的 agent_users 表需要手动同步。

解决方案

1. 正确的 JWKS URL

# config.py
class Settings(BaseSettings):
supabase_url: str = "https://xxx.supabase.co"

@property
def jwks_url(self) -> str:
# 关键:/auth/v1/ 前缀
return f"{self.supabase_url}/auth/v1/.well-known/jwks.json"

2. ES256 签名验证(完整代码)

import json
import base64
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature

def _base64url_decode(data: str) -> bytes:
"""Base64url 解码,自动补 padding"""
rem = len(data) % 4
if rem > 0:
data += "=" * (4 - rem)
return base64.urlsafe_b64decode(data)

def _raw_to_der_signature(raw_sig: bytes) -> bytes:
"""将 raw ECDSA 签名 (r||s) 转为 DER 格式"""
# P-256: r 和 s 各 32 字节
r = int.from_bytes(raw_sig[:32], "big")
s = int.from_bytes(raw_sig[32:], "big")
return encode_dss_signature(r, s)

def verify_es256_signature(token: str, public_key_jwk: dict) -> dict:
"""验证 ES256 JWT 签名,返回 payload"""
parts = token.split(".")
if len(parts) != 3:
raise ValueError("Invalid JWT format")

header_b64, payload_b64, signature_b64 = parts

# 1. 构建 EC 公钥
x = _base64url_decode(public_key_jwk["x"])
y = _base64url_decode(public_key_jwk["y"])
x_int = int.from_bytes(x, "big")
y_int = int.from_bytes(y, "big")

public_key = ec.EllipticCurvePublicNumbers(
x_int, y_int, ec.SECP256R1()
).public_key(default_backend())

# 2. 验证签名
message = f"{header_b64}.{payload_b64}".encode()
raw_signature = _base64url_decode(signature_b64)
der_signature = _raw_to_der_signature(raw_signature)

public_key.verify(
der_signature,
message,
ec.ECDSA(hashes.SHA256())
)

# 3. 返回 payload
return json.loads(_base64url_decode(payload_b64))

3. 用户同步服务

# app/services/user_service.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import AgentUser

async def ensure_user_exists(
db: AsyncSession,
user_id: str,
email: str,
plan: str = "free"
) -> AgentUser:
"""确保用户存在于本地数据库(从 Supabase Auth 同步)"""
# 检查是否存在
result = await db.execute(
select(AgentUser).where(AgentUser.user_id == user_id)
)
user = result.scalar_one_or_none()

if user:
return user

# 创建新用户
user = AgentUser(
user_id=user_id,
email=email,
plan=plan,
role="user"
)
db.add(user)
await db.commit()
await db.refresh(user)
return user

4. 在创建资源前调用

# app/routers/agents.py
@router.post("/")
async def create_agent(
input: CreateAgentInput,
db: AsyncSession = Depends(get_db),
current_user: dict = Depends(get_current_user)
):
# 关键:确保用户存在
user = await ensure_user_exists(
db,
user_id=current_user["user_id"],
email=current_user["email"],
plan=current_user["plan"]
)

# 现在可以安全创建 Agent
agent = Agent(
user_id=user.user_id,
name=input.name,
llm_config=input.llm_config.model_dump()
)
...

FAQ

Q: Supabase JWT 验证返回 404 怎么办?

A: Supabase 的 JWKS 路径是 /auth/v1/.well-known/jwks.json,不是标准的 /.well-known/jwks.json。检查你的 JWKS URL 配置。

Q: python-jose 验证 ES256 签名失败怎么解决?

A: python-jose 对 ES256 支持不完善。使用 cryptography 库手动验证,需要将 JWT 的 raw 签名(r||s 64字节)转换为 DER 格式。

Q: FastAPI 如何同步 Supabase Auth 用户到本地数据库?

A: 在需要用户记录的 API(如创建资源)入口处调用 ensure_user_exists(),从 JWT 提取用户信息并同步到本地表。

Q: Supabase JWT 中的 user_id 在哪个字段?

A: sub 字段包含用户 UUID,email 字段包含邮箱,app_metadata.plan 包含订阅计划(自定义字段)。