跳到主要内容

5 篇博文 含有标签「Python」

查看所有标签

用抽象类统一多搜索 API,错误返回而非抛异常

· 阅读需 5 分钟

在为客户构建 AI Agent 平台时遇到此问题:需要支持多个搜索提供商(Tavily、Serper、Brave、Bing),同时确保工具调用失败时不会中断 Agent 对话流程。

TL;DR

  1. 定义 SearchProvider 抽象基类 + SearchResult 数据模型,统一接口和输出格式
  2. 每个提供商继承基类,实现 search() 方法,内部做响应字段映射
  3. 关键设计:错误时返回包含错误信息的 SearchResult 对象,而非抛异常

问题现象

直接调用不同搜索 API 的问题:

# Tavily: POST 请求,results[].url
response = await client.post("https://api.tavily.com/search", ...)

# Serper: POST 请求,organic[].link
response = await client.post("https://google.serper.dev/search", ...)

# Brave: GET 请求,web.results[].description
response = await client.get("https://api.search.brave.com/res/v1/web/search", ...)

# Bing: GET 请求,webPages.value[].snippet
response = await client.get("https://api.bing.microsoft.com/v7.0/search", ...)

问题

  1. 请求方式、认证头、响应结构各不相同
  2. 切换提供商需要改调用方代码
  3. raise Exception 会中断 AI Agent 的流式对话

根因

  1. 缺少抽象层:调用方直接依赖具体实现,违反依赖倒置原则
  2. 错误处理策略不统一:异常会沿调用栈向上传播,在流式场景下导致整个对话中断

对于 AI Agent 工具调用场景,Agent 需要根据错误信息决定是否重试、换用其他工具、或向用户说明情况——而不是直接崩溃。

解决方案

1. 定义抽象基类和数据模型

# base.py
from abc import ABC, abstractmethod
from typing import List
from pydantic import BaseModel


class SearchResult(BaseModel):
"""Unified search result."""
title: str
link: str
snippet: str


class SearchProvider(ABC):
"""Base class for search providers."""

def __init__(self, api_key: str):
self.api_key = api_key

@abstractmethod
async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
"""Execute search and return results."""
pass

2. 实现具体提供商

Tavily(AI 优化搜索,支持 rate limit / quota 错误码):

# tavily.py
import httpx
import logging
from typing import List
from .base import SearchProvider, SearchResult

logger = logging.getLogger(__name__)


class TavilySearch(SearchProvider):
"""Tavily Search API implementation."""

async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
try:
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(
"https://api.tavily.com/search",
headers={"Authorization": f"Bearer {self.api_key}"},
json={
"query": query,
"max_results": max_results,
"search_depth": "basic"
}
)

# 错误时返回 SearchResult,而非 raise
if response.status_code == 429:
return [SearchResult(
title="Rate Limited",
link="",
snippet="Search quota exceeded. Please try again later."
)]

if response.status_code == 401:
return [SearchResult(
title="Auth Error",
link="",
snippet="Search API key is invalid."
)]

if response.status_code == 402:
return [SearchResult(
title="Quota Exceeded",
link="",
snippet="Monthly search quota depleted."
)]

response.raise_for_status()
data = response.json()

# 字段映射:Tavily 的 url -> 统一的 link
results = []
for item in data.get("results", [])[:max_results]:
results.append(SearchResult(
title=item.get("title", ""),
link=item.get("url", ""),
snippet=item.get("content", "")
))
return results

except httpx.TimeoutException:
logger.warning(f"Tavily API timeout: {query[:50]}")
return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
except Exception as e:
logger.error(f"Tavily search error: {e}")
return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

Serper(Google Search API):

# serper.py
class SerperSearch(SearchProvider):
"""Serper (Google Search) API implementation."""

async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
try:
async with httpx.AsyncClient(timeout=15.0) as client:
response = await client.post(
"https://google.serper.dev/search",
headers={"X-API-KEY": self.api_key, "Content-Type": "application/json"},
json={"q": query, "num": max_results}
)

if response.status_code == 401:
return [SearchResult(title="Auth Error", link="", snippet="Serper API key is invalid.")]

response.raise_for_status()
data = response.json()

# 字段映射:Serper 的 organic[].link -> 统一的 link
results = []
for item in data.get("organic", [])[:max_results]:
results.append(SearchResult(
title=item.get("title", ""),
link=item.get("link", ""),
snippet=item.get("snippet", "")
))
return results

except httpx.TimeoutException:
return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
except Exception as e:
return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

BraveBing 实现类似,区别在于请求方式和响应字段映射。

3. 调用方使用

# 使用时只需依赖抽象
async def execute_search(provider: SearchProvider, query: str) -> List[SearchResult]:
results = await provider.search(query)

# 检查是否有错误(通过 title 或 snippet 判断)
if results and not results[0].link:
error_msg = results[0].snippet
# Agent 可以根据错误信息决定下一步操作
return f"Search failed: {error_msg}"

return results


# 切换提供商只需换实例
provider = TavilySearch(api_key="xxx")
# provider = SerperSearch(api_key="xxx")
results = await execute_search(provider, "Python async best practices")

关键设计决策

决策原因
错误返回 SearchResult 而非 raiseAI Agent 对话是流式流程,异常会中断整个对话
用 Pydantic BaseModel 定义输出自动校验 + IDE 提示 + JSON 序列化
抽象类用 ABC 而非 Protocol需要共享 __init__ 逻辑(api_key 存储)
超时统一 15 秒搜索是用户体验关键路径,不能太慢

对类似需求感兴趣?联系合作

修复 httpx async with client.post() 的隐藏坑

· 阅读需 2 分钟

在构建多服务协作的 SaaS 系统时遇到此问题,记录根因与解法。

TL;DR

httpx.AsyncClient 不要用 async with client.post() 模式,应该先创建 client 再调用方法:response = await client.post()

问题现象

import httpx

async def call_api():
async with httpx.AsyncClient() as client:
async with client.post(url, json=data) as response: # 问题代码
return response.json()

这段代码有时正常,有时报错:

httpx.RemoteProtocolError: cannot write to closing transport
RuntimeError: Session is closed

根因

async with client.post() 的陷阱

client.post() 返回的是 Response 对象,不是上下文管理器。用 async with 包装会导致:

  1. 连接过早关闭async with 块结束时立即关闭连接,但响应可能还在读取
  2. 资源竞争:多个并发请求时,连接池状态混乱

正确理解 httpx 上下文管理器

# ✅ 正确:client 是上下文管理器
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data)
return response.json()

# ❌ 错误:把 response 当上下文管理器
async with client.post(url) as response:
...

解决方案

方案 1:单次请求(推荐简单场景)

async def call_api(url: str, data: dict) -> dict:
async with httpx.AsyncClient() as client:
response = await client.post(url, json=data)
response.raise_for_status()
return response.json()

方案 2:复用 client(推荐高频请求)

# 全局或依赖注入
_client = httpx.AsyncClient(timeout=30.0)

async def call_api(url: str, data: dict) -> dict:
response = await _client.post(url, json=data)
response.raise_for_status()
return response.json()

# 应用关闭时
async def shutdown():
await _client.aclose()

方案 3:FastAPI 依赖注入

from fastapi import Depends
from httpx import AsyncClient

async def get_http_client() -> AsyncClient:
async with AsyncClient(timeout=30.0) as client:
yield client

@router.post("/proxy")
async def proxy(
data: dict,
client: AsyncClient = Depends(get_http_client)
):
response = await client.post("https://external.api/endpoint", json=data)
return response.json()

FAQ

Q: httpx async with 怎么用才对?

A: async with 只用于管理 AsyncClient 生命周期,不是包装单个请求。正确模式:async with AsyncClient() as client: response = await client.post(...)

Q: 为什么有时 async with client.post() 也能跑?

A: 单线程、低并发时可能碰巧正常,但高并发或网络延迟时会暴露问题。这是隐藏 bug,不要侥幸。

Q: httpx 超时怎么配置?

A: AsyncClient(timeout=30.0)AsyncClient(timeout=httpx.Timeout(connect=5.0, read=30.0))

解决 Pydantic v2 ORM mode 报错 model_config 被覆盖

· 阅读需 2 分钟

TL;DR

Pydantic v2 不再支持 class Config,需要用 model_config = ConfigDict(from_attributes=True)。如果你的模型有 model_config 字段,必须重命名避免与保留字冲突。

问题现象

报错 1:class Config 不生效

from pydantic import BaseModel

class AgentResponse(BaseModel):
id: str
name: str

class Config:
orm_mode = True # v1 写法
PydanticUserError: `orm_mode` is not a valid config option. Did you mean `from_attributes`?

报错 2:model_config 字段冲突

class Agent(BaseModel):
id: str
model_config: dict # 业务字段,存储 LLM 配置

model_config = ConfigDict(from_attributes=True)
# TypeError: 'dict' object is not callable

模型中有个业务字段叫 model_config(存储 LLM 配置),与 Pydantic v2 保留字冲突。

根因

1. Pydantic v2 配置语法变化

Pydantic v2 使用 model_config 作为配置属性名,不再支持嵌套的 class Config

Pydantic v1Pydantic v2
class Config: orm_mode = Truemodel_config = ConfigDict(from_attributes=True)
class Config: schema_extra = {...}model_config = ConfigDict(json_schema_extra={...})

2. model_config 是保留字

model_config 在 Pydantic v2 中是特殊属性,不能同时作为业务字段名使用。

解决方案

1. 更新 ORM mode 配置

from pydantic import BaseModel, ConfigDict

class AgentResponse(BaseModel):
model_config = ConfigDict(from_attributes=True) # 新写法

id: str
name: str

2. 重命名冲突字段

将业务字段 model_config 改为 llm_config(或任意非保留名):

# models/agent.py
class Agent(BaseModel):
__tablename__ = "agent_agents"

id: str
llm_config: dict # 改名,避免冲突

# schemas/agent.py
class AgentResponse(BaseModel):
model_config = ConfigDict(from_attributes=True)

agent_id: str
llm_config: LlmConfig # 与模型保持一致

3. 数据库迁移(如需要)

如果数据库字段也要改:

# alembic/versions/xxx_rename_model_config.py
def upgrade():
op.alter_column('agent_agents', 'model_config', new_column_name='llm_config')

def downgrade():
op.alter_column('agent_agents', 'llm_config', new_column_name='model_config')

FAQ

Q: Pydantic v2 的 orm_mode 改成什么了?

A: 改为 from_attributes=True,配置方式从 class Config 变成 model_config = ConfigDict(...)

Q: 为什么 model_config 字段报错?

A: model_config 是 Pydantic v2 的保留属性名,用于配置模型行为。如果业务代码中有同名字段,需要重命名。

Q: ConfigDict 还有哪些常用选项?

A: from_attributes (ORM mode)、json_schema_extra (schema 扩展)、str_strip_whitespace (自动去空格)、validate_assignment (赋值时验证)。

修复 RAG 查询返回的 sources 缺少 similarity 字段

· 阅读需 3 分钟

在 RAG 知识库项目中调试查询结果返回格式问题,以下是完整排查过程。

TL;DR

RAG /query 接口返回的 sources 字段只包含 metadata,没有每条来源的 similarity 分数。解决方案:在组装响应时,将 metadatasdistances 合并,计算 similarity = 1 - distance

问题现象

调用 RAG 查询接口,返回的 sources 缺少相似度信息:

{
"answer": "根据文档...",
"sources": [
{"doc_id": "doc_001", "title": "API 文档", "source": "github"},
{"doc_id": "doc_002", "title": "开发指南", "source": "github"}
],
"similarity": 0.85
}

问题:

  • sources 数组中的每个对象没有 similarity 字段
  • 只有顶层的 similarity(最高相似度),无法知道每条来源的相关性
  • 前端无法按相似度排序或高亮显示

根因

原始代码直接返回 metadata,忽略了 distances 信息:

# 问题代码
result = {
"answer": answer,
"sources": search_results.get("metadatas", [[]])[0], # 只有 metadata
"collection": collection,
"similarity": max_similarity # 只有最高分
}

向量数据库(如 Milvus、Chroma)的检索结果通常包含三个数组:

  • documents: 文本内容
  • metadatas: 元数据
  • distances: 距离分数(越小越相似)

疏漏:只传递了 metadata,没有把 distance 转换为 similarity 并合并到 sources 中。

解决方案

合并 metadatasdistances,计算每条来源的相似度:

# 修复代码
metadatas = search_results.get("metadatas", [[]])[0]
distances = search_results.get("distances", [[]])[0]

sources = [
{**meta, "similarity": round(1 - dist, 3)}
for meta, dist in zip(metadatas, distances)
]

result = {
"answer": answer,
"sources": sources, # 现在包含 similarity
"collection": collection,
"similarity": max_similarity
}

修复后返回:

{
"answer": "根据文档...",
"sources": [
{"doc_id": "doc_001", "title": "API 文档", "similarity": 0.85},
{"doc_id": "doc_002", "title": "开发指南", "similarity": 0.72}
],
"similarity": 0.85
}

完整代码示例

async def query_handler(request):
# 1. 执行向量检索
search_results = await milvus_service.query(
collection_name=collection,
query_embeddings=[query_embedding],
n_results=5
)

# 2. 生成答案
answer = await llm.generate(context, question)

# 3. 组装 sources(合并 metadata 和 similarity)
metadatas = search_results.get("metadatas", [[]])[0]
distances = search_results.get("distances", [[]])[0]

sources = [
{**meta, "similarity": round(1 - dist, 3)}
for meta, dist in zip(metadatas, distances)
]

# 4. 计算最高相似度
max_similarity = max(s["similarity"] for s in sources) if sources else 0

return {
"answer": answer,
"sources": sources,
"similarity": max_similarity
}

FAQ

Q: 为什么 similarity = 1 - distance?

A: 向量数据库通常返回距离(distance)而非相似度(similarity)。对于余弦距离,cosine_distance = 1 - cosine_similarity,所以 similarity = 1 - distance。对于欧氏距离,需要用 similarity = 1 / (1 + distance) 等公式转换。

Q: 顶层 similarity 和 sources 中的 similarity 有什么区别?

A: 顶层 similarity 是最高相似度(最相关的那条来源),用于判断整体回答质量。sources 中每条记录的 similarity 表示该来源的相关性,用于排序、高亮或过滤。

Q: 如果 distance 不是余弦距离怎么办?

A: 需要根据距离类型调整公式:

  • 余弦距离:similarity = 1 - distance
  • 欧氏距离:similarity = 1 / (1 + distance)
  • 内积:similarity = distance(已经是相似度)

检查你的向量数据库配置,确认使用的是哪种距离度量。

修复 Milvus 混合搜索的四个常见坑

· 阅读需 3 分钟

在 RAG 知识库项目中调试混合检索评分问题,以下是完整排查过程。

TL;DR

Milvus 混合搜索(Dense + Sparse)有四个常见坑:空稀疏向量报错、Collection 未加载、sparse 格式错误、阈值过高。本文给出每个问题的最小修复代码。

问题现象

坑 1:空稀疏向量插入失败

MilvusException: (code=65535, message=empty sparse float vector row)

坑 2:Collection 未加载

MilvusException: (code=101, message=failed to search: collection not loaded[collection=xxx])

坑 3:Sparse 向量格式错误

ParamError: (code=1, message=`search_data` value [{0: {81705: 1.3486}}] is illegal)

坑 4:搜索无结果(分数被过滤)

{"answer": "抱歉,知识库中没有相关内容", "similarity": 0.0}

根因

坑 1:Milvus 的 SPARSE_FLOAT_VECTOR 类型不接受空字典 {},必须有至少一个键值对。

坑 2:Milvus 2.4+ 要求搜索前显式调用 load_collection(),否则报 collection not loaded。

坑 3:DashScope API 返回的 sparse 格式是 {text_index: sparse_vec},搜索时需要提取 sparse_vec 本身,而非整个嵌套结构。

坑 4:混合搜索的分数是加权组合(如 0.7 * dense_score + 0.3 * sparse_score),通常在 0.3-0.5 之间。如果阈值设为 0.7,所有结果都会被过滤。

解决方案

坑 1:为空稀疏向量添加占位符

# 获取稀疏向量,如果为空则使用最小占位符
sparse_vec = sparse_vectors.get(chunk_idx, {})
if not sparse_vec:
sparse_vec = {0: 0.0} # Milvus 不接受空 sparse vector

data = {
"dense_vector": dense_embeddings[chunk_idx],
"sparse_vector": sparse_vec, # 保证非空
"text": chunk,
"doc_id": doc_id,
"metadata": metadata
}

坑 2:搜索前加载 Collection

async def hybrid_search(self, collection_name: str, ...):
self.get_or_create_collection(collection_name)

# Milvus 2.4+ 要求:搜索前必须加载
self.client.load_collection(collection_name=collection_name)

dense_results = self.client.search(...)
sparse_results = self.client.search(...)

坑 3:正确提取 Sparse 向量

async def embed_query(self, text: str) -> dict:
result = await self._embed_batch([text], text_type="query", use_instruct=True)
# _embed_batch 返回 {"sparse": {0: sparse_vec}}
# 需要提取 index 0 的向量本身
return {
"dense": result["dense"][0],
"sparse": result["sparse"].get(0, {}) # 提取 sparse_vec
}

坑 4:调整混合搜索阈值

# config.py 或环境变量
rag_min_similarity: float = 0.3 # 过滤阈值(原 0.7 过高)
rag_refuse_similarity: float = 0.3 # 拒答阈值(原 0.5 过高)

混合搜索分数计算公式:

# 典型分数范围:0.3 - 0.5
score = dense_similarity * 0.7 + sparse_similarity * 0.3

FAQ

Q: Milvus 为什么不接受空的稀疏向量?

A: Milvus 的 SPARSE_FLOAT_VECTOR 类型要求每行至少有一个非零元素。空字典 {} 无法确定向量维度,会触发 empty sparse float vector row 错误。使用 {0: 0.0} 作为占位符即可绕过。

Q: Milvus 2.4 搜索前必须调用 load_collection 吗?

A: 是的。Milvus 2.4+ 默认不自动加载 Collection 到内存,必须显式调用 client.load_collection(collection_name) 后才能搜索。这是性能优化设计,避免不用的 Collection 占用内存。

Q: 混合搜索的分数为什么通常只有 0.3-0.5?

A: 混合搜索分数是加权组合(如 0.7 * dense + 0.3 * sparse)。即使两个检索都完美匹配(1.0),加权后最高也只有 1.0。实际场景中 dense 和 sparse 分数很少同时为 1.0,所以典型分数在 0.3-0.5。阈值应设为 0.3 左右,而非 0.7。

Q: DashScope sparse embedding 返回什么格式?

A: DashScope 返回 {"embeddings": [{"sparse_embedding": [{"index": 123, "value": 0.5}, ...]}]}。批量调用时,转换后格式为 {text_index: {dim_index: value}}。搜索时需要用 .get(0, {}) 提取第一条的 sparse 向量。