4 posts tagged with "ai-agent"

Unify Multiple Search APIs with Abstract Class, Return Errors Instead of Raising

· 4 min read

I ran into this while building an AI Agent platform for a client: the platform had to support multiple search providers (Tavily, Serper, Brave, Bing), and a failed tool call must not interrupt the Agent's conversation flow.

TL;DR

  1. Define SearchProvider abstract base class + SearchResult data model for unified interface and output
  2. Each provider inherits the base class, implements search() method with field mapping
  3. Key design: Return SearchResult with error info on failure, never raise exceptions

The Problem

Direct calls to different search APIs look like this:

# Tavily: POST request, results[].url
response = await client.post("https://api.tavily.com/search", ...)

# Serper: POST request, organic[].link
response = await client.post("https://google.serper.dev/search", ...)

# Brave: GET request, web.results[].description
response = await client.get("https://api.search.brave.com/res/v1/web/search", ...)

# Bing: GET request, webPages.value[].snippet
response = await client.get("https://api.bing.microsoft.com/v7.0/search", ...)

Issues:

  1. Request methods, auth headers, and response structures vary
  2. Switching providers requires changing caller code
  3. An uncaught exception from a search call interrupts the AI Agent's streaming conversation

Root Cause

  1. Missing abstraction layer: Caller directly depends on concrete implementations, violating dependency inversion
  2. Inconsistent error handling: Exceptions propagate up the call stack, crashing the entire streaming flow

For AI Agent tool calls, the Agent needs to decide whether to retry, use another tool, or explain to the user—not just crash.

Solution

1. Define Abstract Base Class and Data Model

# base.py
from abc import ABC, abstractmethod
from typing import List
from pydantic import BaseModel


class SearchResult(BaseModel):
    """Unified search result."""
    title: str
    link: str
    snippet: str


class SearchProvider(ABC):
    """Base class for search providers."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    @abstractmethod
    async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
        """Execute search and return results."""
        pass
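
To illustrate what the abstract base class enforces, here is a minimal standard-library sketch. The `Result` dataclass stands in for the Pydantic `SearchResult`, and `Provider`, `IncompleteProvider`, `EchoProvider`, and `can_instantiate` are hypothetical names used only for this example.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List


@dataclass
class Result:
    """Stand-in for the Pydantic SearchResult model (assumption for this sketch)."""
    title: str
    link: str
    snippet: str


class Provider(ABC):
    """Same shape as SearchProvider: shared __init__, abstract search()."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    @abstractmethod
    async def search(self, query: str, max_results: int = 5) -> List[Result]:
        ...


class IncompleteProvider(Provider):
    """Hypothetical subclass that forgets to implement search()."""


class EchoProvider(Provider):
    """Hypothetical subclass that returns one canned result."""

    async def search(self, query: str, max_results: int = 5) -> List[Result]:
        return [Result(title=query, link="https://example.com", snippet="canned")]


def can_instantiate(cls) -> bool:
    # ABC refuses to instantiate a class with unimplemented abstract methods
    try:
        cls(api_key="xxx")
        return True
    except TypeError:
        return False


echo_results = asyncio.run(EchoProvider(api_key="xxx").search("hello"))
```

A provider that forgets `search()` fails at construction time rather than at the first tool call, which is the main practical benefit over duck typing here.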

2. Implement Concrete Providers

Tavily (AI-optimized search, supports rate limit / quota error codes):

# tavily.py
import httpx
import logging
from typing import List
from .base import SearchProvider, SearchResult

logger = logging.getLogger(__name__)


class TavilySearch(SearchProvider):
    """Tavily Search API implementation."""

    async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                response = await client.post(
                    "https://api.tavily.com/search",
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    json={
                        "query": query,
                        "max_results": max_results,
                        "search_depth": "basic"
                    }
                )

            # Return SearchResult on error, never raise
            if response.status_code == 429:
                return [SearchResult(
                    title="Rate Limited",
                    link="",
                    snippet="Search quota exceeded. Please try again later."
                )]

            if response.status_code == 401:
                return [SearchResult(
                    title="Auth Error",
                    link="",
                    snippet="Search API key is invalid."
                )]

            if response.status_code == 402:
                return [SearchResult(
                    title="Quota Exceeded",
                    link="",
                    snippet="Monthly search quota depleted."
                )]

            response.raise_for_status()
            data = response.json()

            # Field mapping: Tavily's url -> unified link
            results = []
            for item in data.get("results", [])[:max_results]:
                results.append(SearchResult(
                    title=item.get("title", ""),
                    link=item.get("url", ""),
                    snippet=item.get("content", "")
                ))
            return results

        except httpx.TimeoutException:
            logger.warning(f"Tavily API timeout: {query[:50]}")
            return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
        except Exception as e:
            logger.error(f"Tavily search error: {e}")
            return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

Serper (Google Search API):

# serper.py
import httpx
from typing import List
from .base import SearchProvider, SearchResult


class SerperSearch(SearchProvider):
    """Serper (Google Search) API implementation."""

    async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                response = await client.post(
                    "https://google.serper.dev/search",
                    headers={"X-API-KEY": self.api_key, "Content-Type": "application/json"},
                    json={"q": query, "num": max_results}
                )

            if response.status_code == 401:
                return [SearchResult(title="Auth Error", link="", snippet="Serper API key is invalid.")]

            response.raise_for_status()
            data = response.json()

            # Field mapping: Serper's organic[].link -> unified link
            results = []
            for item in data.get("organic", [])[:max_results]:
                results.append(SearchResult(
                    title=item.get("title", ""),
                    link=item.get("link", ""),
                    snippet=item.get("snippet", "")
                ))
            return results

        except httpx.TimeoutException:
            return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
        except Exception as e:
            return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

Brave and Bing implementations are similar, differing in request method and response field mapping.
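
As a rough illustration of that field mapping, the sketch below converts already-parsed Brave and Bing response bodies into the unified shape. It covers only the mapping step, not the HTTP call. `Result` is a dataclass stand-in for `SearchResult`, `map_brave` and `map_bing` are hypothetical helper names, and the `url` and `name` field names are assumptions to check against each provider's API reference.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class Result:
    """Stand-in for the Pydantic SearchResult model (assumption for this sketch)."""
    title: str
    link: str
    snippet: str


def map_brave(data: Dict[str, Any], max_results: int = 5) -> List[Result]:
    # Brave nests hits under web.results[]; description -> unified snippet
    items = (data.get("web") or {}).get("results", [])
    return [
        Result(
            title=item.get("title", ""),
            link=item.get("url", ""),
            snippet=item.get("description", ""),
        )
        for item in items[:max_results]
    ]


def map_bing(data: Dict[str, Any], max_results: int = 5) -> List[Result]:
    # Bing nests hits under webPages.value[]; name -> unified title
    items = (data.get("webPages") or {}).get("value", [])
    return [
        Result(
            title=item.get("name", ""),
            link=item.get("url", ""),
            snippet=item.get("snippet", ""),
        )
        for item in items[:max_results]
    ]


# Hand-written sample payloads, not real API output
brave_payload = {"web": {"results": [
    {"title": "A", "url": "https://a.example", "description": "about a"},
    {"title": "B", "url": "https://b.example", "description": "about b"},
]}}
bing_payload = {"webPages": {"value": [
    {"name": "C", "url": "https://c.example", "snippet": "about c"},
]}}

brave_results = map_brave(brave_payload, max_results=1)
bing_results = map_bing(bing_payload)
```

Keeping the mapping in a pure function like this also makes each provider testable without network access.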

3. Caller Usage

# Depend on abstraction only
import asyncio
from typing import List, Union


async def execute_search(provider: SearchProvider, query: str) -> Union[List[SearchResult], str]:
    results = await provider.search(query)

    # Error results carry an empty link; the snippet holds the error message
    if results and not results[0].link:
        error_msg = results[0].snippet
        # Agent can decide next action based on error info
        return f"Search failed: {error_msg}"

    return results


async def main() -> None:
    # Switch providers by changing instance only
    provider = TavilySearch(api_key="xxx")
    # provider = SerperSearch(api_key="xxx")
    results = await execute_search(provider, "Python async best practices")
    print(results)


asyncio.run(main())
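
Because failures come back as data, the caller can also fall back to another provider instead of giving up. The sketch below is one possible way to do that, reusing the empty-link convention from `execute_search`. `Result` is a dataclass stand-in for `SearchResult`, and `FailingProvider`, `WorkingProvider`, `is_error`, and `search_with_fallback` are hypothetical names for this example.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Sequence


@dataclass
class Result:
    """Stand-in for the Pydantic SearchResult model (assumption for this sketch)."""
    title: str
    link: str
    snippet: str


class FailingProvider:
    """Hypothetical provider that always reports a rate limit."""

    async def search(self, query: str, max_results: int = 5) -> List[Result]:
        return [Result("Rate Limited", "", "Search quota exceeded.")]


class WorkingProvider:
    """Hypothetical provider that returns one canned hit."""

    async def search(self, query: str, max_results: int = 5) -> List[Result]:
        return [Result(f"Result for {query}", "https://example.com/1", "ok")]


def is_error(results: List[Result]) -> bool:
    # Same convention as execute_search: error results carry an empty link
    return bool(results) and not results[0].link


async def search_with_fallback(providers: Sequence, query: str) -> List[Result]:
    last: List[Result] = []
    for provider in providers:
        last = await provider.search(query)
        if not is_error(last):
            return last  # first provider that did not report an error wins
    return last  # every provider failed: surface the last error result


results = asyncio.run(
    search_with_fallback([FailingProvider(), WorkingProvider()], "python")
)
all_failed = asyncio.run(search_with_fallback([FailingProvider()], "python"))
```

No `try`/`except` is needed in the loop, since each provider is expected to have already converted its own exceptions into an error result.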

Key Design Decisions

Decision | Reason
Return SearchResult on error instead of raise | AI Agent conversations are streaming flows; exceptions interrupt everything
Use Pydantic BaseModel for output | Auto-validation + IDE hints + JSON serialization
Use ABC instead of Protocol | Need shared __init__ logic (api_key storage)
Unified 15-second timeout | Search is UX-critical; can't be too slow

Bypass Supabase Auth for Playwright E2E Testing Without Login

· 4 min read

Encountered this issue while building an AI Agent SaaS platform for a client. Here's the root cause and solution.

TL;DR

E2E tests shouldn't depend on real OAuth login flows. By detecting localStorage test markers in the useAuth hook, you can inject mock auth state directly and skip Supabase initialization. Also change the Zustand store's loading default to false to prevent AuthGuard from showing an infinite spinner.

Problem

When testing a React SPA with Playwright, pages are protected by AuthGuard and require Supabase authentication. After the test starts, the page shows a loading spinner indefinitely and never reaches the business logic.

// AuthGuard component - tests get stuck here
export function AuthGuard({ children }: AuthGuardProps) {
  const { isAuthenticated, loading } = useAuth()

  if (loading) {
    return <Spinner /> // Forever showing spinner
  }

  if (!isAuthenticated) {
    return <Navigate to="/login" />
  }

  return <>{children}</>
}

Test code tries to simulate login, but Supabase Auth SDK's internal state can't be controlled by simple API mocking.

Root Cause

1. Supabase Auth Initializes Asynchronously

The useAuth hook calls supabase.auth.getSession() in useEffect, which is async. In test environments, network requests may fail or time out, leaving state stuck at loading: true.

2. Zustand Store Default Value Problem

// authStore.ts - problematic code
export const useAuthStore = create<AuthState>()(
  persist(
    (set) => ({
      user: null,
      token: null,
      loading: true, // 👈 Default is true
      // ...
    }),
    { name: 'auth-storage' }
  )
)

At test startup: loading: true + async init failure = forever loading.

3. OAuth Flow Can't Be Automated

Even if APIs can be mocked, OAuth redirect flows require real browser interaction that E2E tests can't reliably simulate.

Solution

Step 1: Add Test Mode Detection in useAuth Hook

// hooks/useAuth.ts
import { useEffect } from 'react'

export function useAuth() {
  const { user, token, loading, setUser, setToken, setLoading } = useAuthStore()

  useEffect(() => {
    const initAuth = async () => {
      // 👇 Check test mode first
      const testAuthUser = localStorage.getItem('test-auth-user')
      const testAuthToken = localStorage.getItem('test-auth-token')

      if (testAuthUser && testAuthToken) {
        try {
          const userData = JSON.parse(testAuthUser) as User
          setUser(userData)
          setToken(testAuthToken)
          setLoading(false)
          console.log('[useAuth] Using test mode auth')
          return // 👈 Return early, skip Supabase init
        } catch (e) {
          console.error('Failed to parse test auth user:', e)
        }
      }

      // 👇 Normal mode: proceed with Supabase Auth
      try {
        const { data: { session } } = await supabase.auth.getSession()
        if (session) {
          setUser(session.user as User)
          setToken(session.access_token)
        }
      } catch (error) {
        console.error('Auth init failed:', error)
      } finally {
        setLoading(false)
      }
    }

    initAuth()

    // 👇 Skip auth state listener in test mode
    if (localStorage.getItem('test-auth-user')) {
      return
    }

    const { data: { subscription } } = supabase.auth.onAuthStateChange(
      async (event, session) => {
        // ... normal auth state handling
      }
    )

    return () => subscription.unsubscribe()
  }, [])

  // Assumed return shape: AuthGuard reads isAuthenticated and loading
  return { user, token, loading, isAuthenticated: !!user }
}

Step 2: Modify Zustand Store Default Value

// stores/authStore.ts
export const useAuthStore = create<AuthState>()(
  persist(
    (set) => ({
      user: null,
      token: null,
      loading: false, // 👈 Change to false, let useAuth hook control state
      setUser: (user) => set({ user }),
      setToken: (token) => set({ token }),
      setLoading: (loading) => set({ loading }),
      logout: () => set({ user: null, token: null, loading: false }),
    }),
    {
      name: 'auth-storage',
      partialize: (state) => ({ user: state.user, token: state.token }),
    }
  )
)

Step 3: Inject Test Auth in Playwright Fixture

// e2e/fixtures.ts
import { test as base, type Page } from '@playwright/test'

export const mockUser = {
  id: 'test-user-id',
  email: 'test@example.com',
  created_at: '2024-01-01T00:00:00Z',
}

// Declare the fixture type so TypeScript knows about authenticatedPage
export const test = base.extend<{ authenticatedPage: Page }>({
  authenticatedPage: async ({ page }, use) => {
    // Visit page first to set localStorage origin
    await page.goto('/login')

    // 👇 Inject test auth state into localStorage
    await page.evaluate(
      ({ user, token }) => {
        localStorage.setItem('test-auth-user', JSON.stringify(user))
        localStorage.setItem('test-auth-token', token)
      },
      { user: mockUser, token: 'mock-access-token' }
    )

    // Navigate to protected page, useAuth will detect test mode
    await page.goto('/dashboard')

    await use(page)
  },
})

export { expect } from '@playwright/test'

Step 4: Use in Tests

// e2e/dashboard.spec.ts
import { test, expect } from './fixtures'

test('dashboard shows user agents', async ({ authenticatedPage }) => {
  // authenticatedPage is already authenticated, no login needed
  await expect(authenticatedPage.getByText('Test Agent')).toBeVisible()
})

Complete Code Structure

agent-frontend/
├── e2e/
│   ├── fixtures.ts        # Playwright fixture + mock data
│   ├── dashboard.spec.ts  # Test cases
│   └── ...
├── src/
│   ├── hooks/
│   │   └── useAuth.ts     # Test mode detection
│   └── stores/
│       └── authStore.ts   # loading: false default
└── playwright.config.ts

Key Takeaways

  1. Use a dedicated prefix for test mode keys: the app never writes test-auth-* itself, so the keys exist only when a test injects them
  2. Check before initialize: Check localStorage first, then proceed with Supabase Auth
  3. Skip auth listener: No need to listen for auth state changes in test mode
  4. Change loading default to false: Let the hook explicitly control loading state

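Fixing CancelledError When a FastAPI SSE Client Disconnects

· 2 min read

I ran into this issue while building an AI customer-service automation system for a client. Here's the root cause and solution.

TL;DR

When a client disconnects, FastAPI's StreamingResponse cancels the generator task, which raises asyncio.CancelledError. The correct approach is to catch that exception inside the generator and re-raise it; otherwise you get noisy exception logs and possible resource leaks.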

Problem

When using SSE (Server-Sent Events) for streaming conversations, the server log fills with exceptions after a client disconnects:

ERROR:    Exception in ASGI application
...
asyncio.CancelledError

The original code:

async def event_stream():
    async for event in engine.execute(body.message):
        yield event

return StreamingResponse(event_stream(), media_type="text/event-stream")

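Root Cause

When the client disconnects, FastAPI/Starlette's StreamingResponse cancels the generator task that is currently running. The cancelled async for loop then raises asyncio.CancelledError.

If the exception is not handled, it propagates upward, where the ASGI server catches it and logs it as an error. More seriously, resources held inside the generator (such as database connections or HTTP clients) may not be released correctly.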

Solution

Catch CancelledError inside the generator, log it, and then re-raise it. The re-raise is mandatory.

import asyncio
import logging

logger = logging.getLogger(__name__)

async def event_stream():
    try:
        async for event in engine.execute(body.message):
            yield event
    except asyncio.CancelledError:
        # Client disconnected; this is normal behavior
        logger.info("Client disconnected")
        raise  # Must re-raise so the generator terminates correctly

return StreamingResponse(event_stream(), media_type="text/event-stream")

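Why Is the Re-raise Mandatory?

CancelledError is Python's standard mechanism for cancelling a coroutine. If you catch it and do not re-raise:

  1. The generator does not terminate correctly
  2. StreamingResponse treats the response as having completed normally
  3. Resources may leak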
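
The cancellation path can be reproduced without FastAPI. The sketch below uses plain asyncio: a consumer task stands in for the response, and `task.cancel()` stands in for the client disconnect. The names `log`, `consume`, and `main` are hypothetical, and the `finally` block is an addition that shows where cleanup code would run.

```python
import asyncio
from typing import List

log: List[str] = []


async def event_stream():
    try:
        for i in range(1000):
            yield f"data: {i}\n\n"
            await asyncio.sleep(0.01)  # cancellation is delivered at this await
    except asyncio.CancelledError:
        log.append("client disconnected")
        raise  # re-raise so the task is actually marked as cancelled
    finally:
        log.append("cleanup")  # release connections, clients, etc. here


async def consume(gen) -> None:
    # Stands in for the framework code that iterates the response body
    async for _ in gen:
        pass


async def main() -> bool:
    task = asyncio.create_task(consume(event_stream()))
    await asyncio.sleep(0.05)  # let a few events flow
    task.cancel()              # simulate the client going away
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled()


was_cancelled = asyncio.run(main())
```

With the re-raise in place the task ends in the cancelled state and the `finally` block still runs, which is the behavior the list above describes.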

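FAQ

Q: Why does FastAPI SSE report CancelledError after the client disconnects?

A: This is how Python asyncio is designed to behave. When the client disconnects, Starlette cancels the generator task, which triggers CancelledError. The correct way to handle it is to catch and re-raise.

Q: What happens if I catch CancelledError and don't re-raise?

A: The generator cannot terminate correctly, which may leak resources such as database connections and HTTP clients. StreamingResponse will also wrongly assume the response completed normally.

Q: How do I distinguish a normal disconnect from an abnormal one?

A: CancelledError is itself the signal for a normal disconnect. If you need to run cleanup logic on disconnect (such as updating status), do it in the except block and then re-raise.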

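Three Pitfalls When Integrating Supabase Auth with FastAPI

· 4 min read

I ran into these issues while building a SaaS authentication system for a client. Here are the root causes and solutions.

TL;DR

Integrating Supabase Auth with FastAPI has three common pitfalls: the JWKS path is not the standard one, the ES256 signature must be converted to DER format, and the local database has no record for a user on first login. This post gives a complete solution.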

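Problem

Pitfall 1: JWKS Path Returns 404

GET https://xxx.supabase.co/.well-known/jwks.json
# 404 Not Found

Every JWT verification request returns 401 Invalid Token.

Pitfall 2: ES256 Signature Verification Fails

from jose import jwt
payload = jwt.decode(token, key, algorithms=["ES256"])
# JWTError: Signature verification failed

The public key is correct, yet signature verification always fails.

Pitfall 3: No Local Record on First Login

# When creating an Agent
agent = Agent(user_id=current_user["user_id"], ...)
db.add(agent)
# ForeignKeyViolation: user_id does not exist

The Supabase Auth user passed JWT verification, but the local agent_users table has no record for that user.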

Root Cause

Pitfall 1: Supabase's Non-standard JWKS Path

Standard OAuth/OIDC servers expose JWKS at /.well-known/jwks.json, but Supabase serves its auth service under the /auth/v1/ sub-path:

Standard path | Supabase path
/.well-known/jwks.json | /auth/v1/.well-known/jwks.json

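Pitfall 2: Raw ES256 Signature vs DER Format

Supabase JWTs are signed with ES256 (P-256 curve). The signature in the JWT is in raw format (r || s concatenated, 64 bytes), but the verify() method of Python's cryptography library expects DER-encoded ASN.1 format.

Raw:     r (32 bytes) || s (32 bytes) = 64 bytes
DER: 0x30 <len> 0x02 <r_len> <r> 0x02 <s_len> <s>

python-jose's jwt.decode() ran into compatibility problems with ES256 in this setup, so the signature has to be verified manually.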

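Pitfall 3: Authentication and Data Live Separately

Supabase Auth is a standalone service. After a user registers or logs in, the user exists only in Supabase's auth.users table. The agent_users table in the local database has to be synced manually.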

Solution

1. Use the Correct JWKS URL

# config.py
class Settings(BaseSettings):
    supabase_url: str = "https://xxx.supabase.co"

    @property
    def jwks_url(self) -> str:
        # Key point: the /auth/v1/ prefix
        return f"{self.supabase_url}/auth/v1/.well-known/jwks.json"
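
Once the JWKS document has been fetched from that URL, the verification key still has to be selected. A common approach, sketched below, is to match the `kid` in the JWT header against the entries under `keys`. This assumes the tokens carry a `kid` header. `b64url_decode`, `jwt_header`, and `find_jwk` are hypothetical helper names, and the token and key set are hand-built samples.

```python
import base64
import json
from typing import Any, Dict


def b64url_decode(data: str) -> bytes:
    # Restore the padding that base64url encoding strips
    return base64.urlsafe_b64decode(data + "=" * (-len(data) % 4))


def jwt_header(token: str) -> Dict[str, Any]:
    """Decode the (unverified) JWT header to read alg and kid."""
    header_b64 = token.split(".")[0]
    return json.loads(b64url_decode(header_b64))


def find_jwk(jwks: Dict[str, Any], kid: str) -> Dict[str, Any]:
    """Return the key whose kid matches, or raise KeyError."""
    for key in jwks.get("keys", []):
        if key.get("kid") == kid:
            return key
    raise KeyError(f"No JWK with kid={kid!r}")


# Hand-built sample data (not a real token or key set)
_header = {"alg": "ES256", "typ": "JWT", "kid": "key-2"}
_header_b64 = base64.urlsafe_b64encode(json.dumps(_header).encode()).rstrip(b"=").decode()
sample_token = f"{_header_b64}.e30.c2ln"
sample_jwks = {"keys": [
    {"kid": "key-1", "kty": "EC", "crv": "P-256"},
    {"kid": "key-2", "kty": "EC", "crv": "P-256"},
]}

header = jwt_header(sample_token)
selected = find_jwk(sample_jwks, header["kid"])
```

The header is read before the signature is checked, so it should only be used to choose a key, never trusted for anything else.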

2. ES256 Signature Verification (Full Code)

import json
import base64
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature

def _base64url_decode(data: str) -> bytes:
    """Base64url decode, adding padding automatically."""
    rem = len(data) % 4
    if rem > 0:
        data += "=" * (4 - rem)
    return base64.urlsafe_b64decode(data)

def _raw_to_der_signature(raw_sig: bytes) -> bytes:
    """Convert a raw ECDSA signature (r||s) to DER format."""
    # P-256: r and s are 32 bytes each
    if len(raw_sig) != 64:
        raise ValueError("Invalid ES256 signature length")
    r = int.from_bytes(raw_sig[:32], "big")
    s = int.from_bytes(raw_sig[32:], "big")
    return encode_dss_signature(r, s)

def verify_es256_signature(token: str, public_key_jwk: dict) -> dict:
    """Verify an ES256 JWT signature and return the payload."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("Invalid JWT format")

    header_b64, payload_b64, signature_b64 = parts

    # 1. Build the EC public key
    x = _base64url_decode(public_key_jwk["x"])
    y = _base64url_decode(public_key_jwk["y"])
    x_int = int.from_bytes(x, "big")
    y_int = int.from_bytes(y, "big")

    public_key = ec.EllipticCurvePublicNumbers(
        x_int, y_int, ec.SECP256R1()
    ).public_key(default_backend())

    # 2. Verify the signature (raises InvalidSignature on mismatch)
    message = f"{header_b64}.{payload_b64}".encode()
    raw_signature = _base64url_decode(signature_b64)
    der_signature = _raw_to_der_signature(raw_signature)

    public_key.verify(
        der_signature,
        message,
        ec.ECDSA(hashes.SHA256())
    )

    # 3. Return the payload
    return json.loads(_base64url_decode(payload_b64))
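
The function above checks only the signature, so the returned payload still needs its claims validated before it is trusted. The sketch below is one way to do that with the standard library. The `authenticated` audience and the `/auth/v1` issuer are assumptions about a typical Supabase project and should be checked against real tokens. `validate_claims`, `is_valid`, and `ISSUER` are hypothetical names.

```python
import time
from typing import Any, Dict, Optional

# Assumed issuer for a project hosted at https://xxx.supabase.co
ISSUER = "https://xxx.supabase.co/auth/v1"


def validate_claims(
    payload: Dict[str, Any],
    *,
    issuer: str,
    audience: str = "authenticated",  # assumed default audience
    now: Optional[float] = None,
    leeway: float = 0,
) -> None:
    """Raise ValueError if exp, iss, or aud do not check out."""
    current = time.time() if now is None else now

    exp = payload.get("exp")
    if not isinstance(exp, (int, float)):
        raise ValueError("Missing exp claim")
    if current > exp + leeway:
        raise ValueError("Token expired")

    if payload.get("iss") != issuer:
        raise ValueError("Unexpected issuer")

    aud = payload.get("aud")
    aud_list = aud if isinstance(aud, list) else [aud]
    if audience not in aud_list:
        raise ValueError("Unexpected audience")


def is_valid(payload: Dict[str, Any], now: float) -> bool:
    try:
        validate_claims(payload, issuer=ISSUER, now=now)
        return True
    except ValueError:
        return False


# Hand-built sample payload with a fixed expiry for demonstration
sample = {"sub": "user-uuid", "aud": "authenticated", "iss": ISSUER, "exp": 2_000}
```

Passing `now` explicitly keeps the check deterministic in tests; in production it would default to the current time.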

3. User Sync Service

# app/services/user_service.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import AgentUser

async def ensure_user_exists(
    db: AsyncSession,
    user_id: str,
    email: str,
    plan: str = "free"
) -> AgentUser:
    """Ensure the user exists in the local database (synced from Supabase Auth)."""
    # Check whether the user already exists
    result = await db.execute(
        select(AgentUser).where(AgentUser.user_id == user_id)
    )
    user = result.scalar_one_or_none()

    if user:
        return user

    # Create a new user
    user = AgentUser(
        user_id=user_id,
        email=email,
        plan=plan,
        role="user"
    )
    db.add(user)
    await db.commit()
    await db.refresh(user)
    return user
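
One thing a check-then-insert flow leaves open is two concurrent first requests from the same user: both can miss the SELECT, and the second INSERT then fails on the primary key. The sketch below shows one way to make the operation idempotent. It uses the standard library's `sqlite3` as a stand-in for the real database, with an assumed `agent_users` schema; with SQLAlchemy and PostgreSQL the equivalent would be an insert with an on-conflict-do-nothing clause.

```python
import sqlite3
from typing import Tuple


def ensure_user_exists(
    conn: sqlite3.Connection, user_id: str, email: str, plan: str = "free"
) -> Tuple[str, str, str, str]:
    """Insert the user if missing, then return the stored row."""
    # INSERT OR IGNORE is a no-op when the primary key already exists,
    # so repeated or concurrent calls cannot raise a uniqueness error
    conn.execute(
        "INSERT OR IGNORE INTO agent_users (user_id, email, plan, role) "
        "VALUES (?, ?, ?, 'user')",
        (user_id, email, plan),
    )
    conn.commit()
    return conn.execute(
        "SELECT user_id, email, plan, role FROM agent_users WHERE user_id = ?",
        (user_id,),
    ).fetchone()


# In-memory database with an assumed schema for the demo
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE agent_users ("
    "user_id TEXT PRIMARY KEY, email TEXT NOT NULL, "
    "plan TEXT NOT NULL, role TEXT NOT NULL)"
)

first = ensure_user_exists(conn, "u-1", "a@example.com")
second = ensure_user_exists(conn, "u-1", "a@example.com", plan="pro")
row_count = conn.execute("SELECT COUNT(*) FROM agent_users").fetchone()[0]
```

The second call leaves the existing row untouched, so this pattern syncs only on first sight; propagating later plan changes would need a separate update path.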

4. Call It Before Creating Resources

# app/routers/agents.py
@router.post("/")
async def create_agent(
    input: CreateAgentInput,
    db: AsyncSession = Depends(get_db),
    current_user: dict = Depends(get_current_user)
):
    # Key point: make sure the user exists
    user = await ensure_user_exists(
        db,
        user_id=current_user["user_id"],
        email=current_user["email"],
        plan=current_user["plan"]
    )

    # Now the Agent can be created safely
    agent = Agent(
        user_id=user.user_id,
        name=input.name,
        llm_config=input.llm_config.model_dump()
    )
    ...

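FAQ

Q: What should I do when Supabase JWT verification returns 404?

A: Supabase's JWKS path is /auth/v1/.well-known/jwks.json, not the standard /.well-known/jwks.json. Check your JWKS URL configuration.

Q: How do I fix ES256 signature verification failures with python-jose?

A: python-jose's ES256 support did not work reliably here. Verify manually with the cryptography library, converting the JWT's raw signature (r||s, 64 bytes) to DER format.

Q: How does FastAPI sync Supabase Auth users to the local database?

A: Call ensure_user_exists() at the entry point of any API that needs a user record (such as resource creation). It takes the user info extracted from the JWT and syncs it to the local table.

Q: Which field of the Supabase JWT holds the user_id?

A: The sub field contains the user UUID, the email field contains the email address, and app_metadata.plan contains the subscription plan (a custom field).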
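
As a small illustration of that claim mapping, the sketch below turns a verified payload into the `current_user` dict the router expects. `current_user_from_payload` is a hypothetical helper name, and falling back to the `free` plan when the custom field is missing is an assumption that mirrors the default in `ensure_user_exists`.

```python
from typing import Any, Dict


def current_user_from_payload(payload: Dict[str, Any]) -> Dict[str, str]:
    """Map verified JWT claims onto the keys used by the router."""
    # app_metadata may be absent or null on tokens without custom fields
    app_metadata = payload.get("app_metadata") or {}
    return {
        "user_id": payload["sub"],                # user UUID
        "email": payload.get("email", ""),
        "plan": app_metadata.get("plan", "free"), # custom field; assumed default
    }


# Hand-built sample payloads
with_plan = current_user_from_payload({
    "sub": "11111111-2222-3333-4444-555555555555",
    "email": "test@example.com",
    "app_metadata": {"plan": "pro"},
})
without_plan = current_user_from_payload({
    "sub": "11111111-2222-3333-4444-555555555555",
    "email": "test@example.com",
})
```

A missing `sub` raises `KeyError` on purpose, since a token without a subject cannot be tied to a local user.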