
5 posts tagged with "Python"


Unify Multiple Search APIs with Abstract Class, Return Errors Instead of Raising

· 4 min read

Encountered this issue while building an AI Agent platform for a client: needed to support multiple search providers (Tavily, Serper, Brave, Bing) while ensuring tool call failures don't interrupt the Agent's conversation flow.

TL;DR

  1. Define SearchProvider abstract base class + SearchResult data model for unified interface and output
  2. Each provider inherits the base class, implements search() method with field mapping
  3. Key design: Return SearchResult with error info on failure, never raise exceptions

The Problem

Direct calls to different search APIs look like this:

# Tavily: POST request, results[].url
response = await client.post("https://api.tavily.com/search", ...)

# Serper: POST request, organic[].link
response = await client.post("https://google.serper.dev/search", ...)

# Brave: GET request, web.results[].description
response = await client.get("https://api.search.brave.com/res/v1/web/search", ...)

# Bing: GET request, webPages.value[].snippet
response = await client.get("https://api.bing.microsoft.com/v7.0/search", ...)

Issues:

  1. Request methods, auth headers, and response structures vary
  2. Switching providers requires changing caller code
  3. raise Exception interrupts AI Agent's streaming conversation

Root Cause

  1. Missing abstraction layer: Caller directly depends on concrete implementations, violating dependency inversion
  2. Inconsistent error handling: Exceptions propagate up the call stack, crashing the entire streaming flow

For AI Agent tool calls, the Agent needs to decide whether to retry, use another tool, or explain to the user—not just crash.

Solution

1. Define Abstract Base Class and Data Model

# base.py
from abc import ABC, abstractmethod
from typing import List
from pydantic import BaseModel


class SearchResult(BaseModel):
    """Unified search result."""
    title: str
    link: str
    snippet: str


class SearchProvider(ABC):
    """Base class for search providers."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    @abstractmethod
    async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
        """Execute search and return results."""
        pass

2. Implement Concrete Providers

Tavily (AI-optimized search, supports rate limit / quota error codes):

# tavily.py
import httpx
import logging
from typing import List
from .base import SearchProvider, SearchResult

logger = logging.getLogger(__name__)


class TavilySearch(SearchProvider):
    """Tavily Search API implementation."""

    async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                response = await client.post(
                    "https://api.tavily.com/search",
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    json={
                        "query": query,
                        "max_results": max_results,
                        "search_depth": "basic"
                    }
                )

                # Return SearchResult on error, never raise
                if response.status_code == 429:
                    return [SearchResult(
                        title="Rate Limited",
                        link="",
                        snippet="Search quota exceeded. Please try again later."
                    )]

                if response.status_code == 401:
                    return [SearchResult(
                        title="Auth Error",
                        link="",
                        snippet="Search API key is invalid."
                    )]

                if response.status_code == 402:
                    return [SearchResult(
                        title="Quota Exceeded",
                        link="",
                        snippet="Monthly search quota depleted."
                    )]

                response.raise_for_status()
                data = response.json()

                # Field mapping: Tavily's url -> unified link
                results = []
                for item in data.get("results", [])[:max_results]:
                    results.append(SearchResult(
                        title=item.get("title", ""),
                        link=item.get("url", ""),
                        snippet=item.get("content", "")
                    ))
                return results

        except httpx.TimeoutException:
            logger.warning(f"Tavily API timeout: {query[:50]}")
            return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
        except Exception as e:
            logger.error(f"Tavily search error: {e}")
            return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

Serper (Google Search API):

# serper.py
import httpx
from typing import List
from .base import SearchProvider, SearchResult


class SerperSearch(SearchProvider):
    """Serper (Google Search) API implementation."""

    async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                response = await client.post(
                    "https://google.serper.dev/search",
                    headers={"X-API-KEY": self.api_key, "Content-Type": "application/json"},
                    json={"q": query, "num": max_results}
                )

                if response.status_code == 401:
                    return [SearchResult(title="Auth Error", link="", snippet="Serper API key is invalid.")]

                response.raise_for_status()
                data = response.json()

                # Field mapping: Serper's organic[].link -> unified link
                results = []
                for item in data.get("organic", [])[:max_results]:
                    results.append(SearchResult(
                        title=item.get("title", ""),
                        link=item.get("link", ""),
                        snippet=item.get("snippet", "")
                    ))
                return results

        except httpx.TimeoutException:
            return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
        except Exception as e:
            return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

Brave and Bing implementations are similar, differing in request method and response field mapping.

3. Caller Usage

# Depend on the abstraction only
async def execute_search(provider: SearchProvider, query: str) -> List[SearchResult] | str:
    results = await provider.search(query)

    # Check for errors (error results carry an empty link)
    if results and not results[0].link:
        error_msg = results[0].snippet
        # The Agent can decide its next action based on the error info
        return f"Search failed: {error_msg}"

    return results


# Switch providers by changing only the instance
provider = TavilySearch(api_key="xxx")
# provider = SerperSearch(api_key="xxx")
results = await execute_search(provider, "Python async best practices")

Key Design Decisions

  • Return SearchResult on error instead of raise: AI Agent conversations are streaming flows; exceptions interrupt everything
  • Use Pydantic BaseModel for output: auto-validation, IDE hints, and JSON serialization
  • Use ABC instead of Protocol: shared __init__ logic is needed (api_key storage)
  • Unified 15-second timeout: search is UX-critical and can't be too slow


Fix the Hidden Pitfall of httpx async with client.post()

· 2 min read

Encountered this issue while building a multi-service SaaS system. Documenting the root cause and solution.

TL;DR

Don't use async with client.post() pattern with httpx.AsyncClient. Create the client first, then call methods: response = await client.post().

Problem Symptoms

import httpx

async def call_api():
    async with httpx.AsyncClient() as client:
        async with client.post(url, json=data) as response:  # Problem code
            return response.json()

This code sometimes works, sometimes errors:

httpx.RemoteProtocolError: cannot write to closing transport
RuntimeError: Session is closed

Root Cause

The async with client.post() Trap

client.post() returns an awaitable that resolves to a Response, not an async context manager. Wrapping it with async with causes:

  1. Premature connection closure: The connection closes immediately when the async with block ends, but the response may still be reading
  2. Resource contention: With concurrent requests, connection pool state becomes chaotic

Understanding httpx Context Managers Correctly

# ✅ Correct: the client is the context manager
async with httpx.AsyncClient() as client:
    response = await client.post(url, json=data)
    return response.json()

# ❌ Wrong: treating the response as a context manager
async with client.post(url) as response:
    ...

Solution

Option 1: Single Request (Simple Scenarios)

async def call_api(url: str, data: dict) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=data)
        response.raise_for_status()
        return response.json()

Option 2: Reuse Client (High-Frequency Requests)

# Global or injected as a dependency
_client = httpx.AsyncClient(timeout=30.0)

async def call_api(url: str, data: dict) -> dict:
    response = await _client.post(url, json=data)
    response.raise_for_status()
    return response.json()

# On app shutdown
async def shutdown():
    await _client.aclose()

Option 3: FastAPI Dependency Injection

from fastapi import Depends
from httpx import AsyncClient

async def get_http_client() -> AsyncClient:
    async with AsyncClient(timeout=30.0) as client:
        yield client

@router.post("/proxy")
async def proxy(
    data: dict,
    client: AsyncClient = Depends(get_http_client)
):
    response = await client.post("https://external.api/endpoint", json=data)
    return response.json()

FAQ

Q: How should httpx async with be used correctly?

A: async with is only for managing AsyncClient lifecycle, not wrapping individual requests. Correct pattern: async with AsyncClient() as client: response = await client.post(...).

Q: Why does async with client.post() sometimes work?

A: It may work by chance in single-threaded, low-concurrency scenarios, but will fail under high concurrency or network latency. This is a hidden bug—don't rely on it.

Q: How to configure httpx timeout?

A: AsyncClient(timeout=30.0) or AsyncClient(timeout=httpx.Timeout(connect=5.0, read=30.0)).

Fix Pydantic v2 ORM Mode model_config Override Error

· 2 min read

TL;DR

Pydantic v2 no longer supports class Config. Use model_config = ConfigDict(from_attributes=True) instead. If your model has a field named model_config, you must rename it to avoid conflict with the reserved attribute.

Problem Symptoms

Error 1: class Config Not Working

from pydantic import BaseModel

class AgentResponse(BaseModel):
    id: str
    name: str

    class Config:
        orm_mode = True  # v1 style

PydanticUserError: `orm_mode` is not a valid config option. Did you mean `from_attributes`?

Error 2: model_config Field Conflict

class Agent(BaseModel):
    id: str
    model_config: dict  # Business field storing LLM config

    model_config = ConfigDict(from_attributes=True)
    # TypeError: 'dict' object is not callable

Your model has a business field called model_config (storing LLM configuration), which conflicts with Pydantic v2's reserved name.

Root Cause

1. Pydantic v2 Configuration Syntax Change

Pydantic v2 uses model_config as the configuration attribute name, no longer supporting nested class Config:

  • Pydantic v1: class Config: orm_mode = True → Pydantic v2: model_config = ConfigDict(from_attributes=True)
  • Pydantic v1: class Config: schema_extra = {...} → Pydantic v2: model_config = ConfigDict(json_schema_extra={...})

2. model_config is a Reserved Name

model_config is a special attribute in Pydantic v2 and cannot be used as a business field name simultaneously.

Solution

1. Update ORM Mode Configuration

from pydantic import BaseModel, ConfigDict

class AgentResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)  # New syntax

    id: str
    name: str

2. Rename Conflicting Field

Rename the business field model_config to llm_config (or any non-reserved name):

# models/agent.py
class Agent(BaseModel):
    __tablename__ = "agent_agents"

    id: str
    llm_config: dict  # Renamed to avoid conflict

# schemas/agent.py
class AgentResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    agent_id: str
    llm_config: LlmConfig  # Keep consistent with the model

3. Database Migration (If Needed)

If the database column also needs renaming:

# alembic/versions/xxx_rename_model_config.py
def upgrade():
    op.alter_column('agent_agents', 'model_config', new_column_name='llm_config')

def downgrade():
    op.alter_column('agent_agents', 'llm_config', new_column_name='model_config')

FAQ

Q: What did Pydantic v2 replace orm_mode with?

A: It's now from_attributes=True, and the configuration syntax changed from class Config to model_config = ConfigDict(...).

Q: Why is my model_config field causing errors?

A: model_config is a reserved attribute name in Pydantic v2 for configuring model behavior. If your business code has a field with the same name, you need to rename it.

Q: What other common ConfigDict options exist?

A: from_attributes (ORM mode), json_schema_extra (schema extension), str_strip_whitespace (auto strip whitespace), validate_assignment (validate on assignment).
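A sketch combining several of these options (UserOut and UserRow are made-up names for illustration):

```python
from pydantic import BaseModel, ConfigDict

class UserOut(BaseModel):
    model_config = ConfigDict(
        from_attributes=True,       # ORM mode: build from object attributes
        str_strip_whitespace=True,  # "  alice  " -> "alice"
        validate_assignment=True,   # re-validate fields on assignment
    )
    name: str

class UserRow:  # stand-in for an ORM row
    name = "  alice  "

user = UserOut.model_validate(UserRow())  # from_attributes in action
```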

Fix RAG Queries Returning sources Without a similarity Field

· 3 min read

Debugged a query response format issue in a RAG knowledge base project. Here is the full investigation.

TL;DR

The sources field returned by the RAG /query endpoint contained only metadata, without a per-source similarity score. Solution: when assembling the response, merge metadatas with distances and compute similarity = 1 - distance.

Problem Symptoms

Calling the RAG query endpoint returns sources without similarity information:

{
  "answer": "Based on the documents...",
  "sources": [
    {"doc_id": "doc_001", "title": "API Docs", "source": "github"},
    {"doc_id": "doc_002", "title": "Dev Guide", "source": "github"}
  ],
  "similarity": 0.85
}

Issues:

  • Each object in the sources array has no similarity field
  • Only the top-level similarity (the highest score) is present, so per-source relevance is unknown
  • The frontend cannot sort by similarity or highlight results

Root Cause

The original code returned metadata directly and discarded the distances information:

# Problem code
result = {
    "answer": answer,
    "sources": search_results.get("metadatas", [[]])[0],  # metadata only
    "collection": collection,
    "similarity": max_similarity  # only the top score
}

Search results from vector databases (e.g. Milvus, Chroma) usually contain three arrays:

  • documents: text content
  • metadatas: metadata
  • distances: distance scores (smaller means more similar)

The oversight: only the metadata was passed through; the distance was never converted into a similarity and merged into sources.

Solution

Merge metadatas with distances and compute a similarity for each source:

# Fixed code
metadatas = search_results.get("metadatas", [[]])[0]
distances = search_results.get("distances", [[]])[0]

sources = [
    {**meta, "similarity": round(1 - dist, 3)}
    for meta, dist in zip(metadatas, distances)
]

result = {
    "answer": answer,
    "sources": sources,  # now includes similarity
    "collection": collection,
    "similarity": max_similarity
}

Response after the fix:

{
  "answer": "Based on the documents...",
  "sources": [
    {"doc_id": "doc_001", "title": "API Docs", "similarity": 0.85},
    {"doc_id": "doc_002", "title": "Dev Guide", "similarity": 0.72}
  ],
  "similarity": 0.85
}

Complete Code Example

async def query_handler(request):
    # 1. Run the vector search
    search_results = await milvus_service.query(
        collection_name=collection,
        query_embeddings=[query_embedding],
        n_results=5
    )

    # 2. Generate the answer
    answer = await llm.generate(context, question)

    # 3. Assemble sources (merge metadata and similarity)
    metadatas = search_results.get("metadatas", [[]])[0]
    distances = search_results.get("distances", [[]])[0]

    sources = [
        {**meta, "similarity": round(1 - dist, 3)}
        for meta, dist in zip(metadatas, distances)
    ]

    # 4. Compute the highest similarity
    max_similarity = max(s["similarity"] for s in sources) if sources else 0

    return {
        "answer": answer,
        "sources": sources,
        "similarity": max_similarity
    }

FAQ

Q: Why is similarity = 1 - distance?

A: Vector databases usually return a distance rather than a similarity. For cosine distance, cosine_distance = 1 - cosine_similarity, so similarity = 1 - distance. For Euclidean distance, convert with a formula such as similarity = 1 / (1 + distance).

Q: What is the difference between the top-level similarity and the similarity inside sources?

A: The top-level similarity is the highest score (from the most relevant source) and indicates overall answer quality. The similarity on each record in sources expresses that source's relevance, for sorting, highlighting, or filtering.

Q: What if distance is not a cosine distance?

A: Adjust the formula to the distance type:

  • Cosine distance: similarity = 1 - distance
  • Euclidean distance: similarity = 1 / (1 + distance)
  • Inner product: similarity = distance (already a similarity)

Check your vector database configuration to confirm which distance metric it uses.
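These conversions fit in one small helper (to_similarity is a made-up name, and the metric strings here are illustrative rather than your database's actual metric_type constants):

```python
def to_similarity(distance: float, metric: str) -> float:
    """Convert a vector-DB distance score to a similarity, per metric."""
    if metric == "cosine":     # cosine_distance = 1 - cosine_similarity
        return 1.0 - distance
    if metric == "euclidean":  # monotone mapping into (0, 1]
        return 1.0 / (1.0 + distance)
    if metric == "ip":         # inner product is already a similarity
        return distance
    raise ValueError(f"unknown metric: {metric}")
```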

Fix Four Common Pitfalls in Milvus Hybrid Search

· 3 min read

Debugged hybrid retrieval scoring issues in a RAG knowledge base project. Here is the full investigation.

TL;DR

Milvus hybrid search (dense + sparse) has four common pitfalls: empty sparse vectors rejected on insert, collections not loaded, wrong sparse format, and thresholds set too high. Below is a minimal fix for each.

Problem Symptoms

Pitfall 1: Inserting an empty sparse vector fails

MilvusException: (code=65535, message=empty sparse float vector row)

Pitfall 2: Collection not loaded

MilvusException: (code=101, message=failed to search: collection not loaded[collection=xxx])

Pitfall 3: Wrong sparse vector format

ParamError: (code=1, message=`search_data` value [{0: {81705: 1.3486}}] is illegal)

Pitfall 4: Search returns nothing (scores filtered out)

{"answer": "Sorry, the knowledge base has no relevant content", "similarity": 0.0}

Root Cause

Pitfall 1: Milvus's SPARSE_FLOAT_VECTOR type does not accept an empty dict {}; each row must contain at least one key-value pair.

Pitfall 2: Milvus 2.4+ requires an explicit load_collection() before searching; otherwise it reports collection not loaded.

Pitfall 3: The DashScope API returns sparse vectors in the shape {text_index: sparse_vec}; at search time you must extract sparse_vec itself, not pass the whole nested structure.

Pitfall 4: Hybrid search scores are a weighted combination (e.g. 0.7 * dense_score + 0.3 * sparse_score) and usually land between 0.3 and 0.5. With the threshold set to 0.7, every result gets filtered out.

Solution

Pitfall 1: Add a placeholder for empty sparse vectors

# Get the sparse vector; fall back to a minimal placeholder if empty
sparse_vec = sparse_vectors.get(chunk_idx, {})
if not sparse_vec:
    sparse_vec = {0: 0.0}  # Milvus rejects empty sparse vectors

data = {
    "dense_vector": dense_embeddings[chunk_idx],
    "sparse_vector": sparse_vec,  # guaranteed non-empty
    "text": chunk,
    "doc_id": doc_id,
    "metadata": metadata
}

Pitfall 2: Load the collection before searching

async def hybrid_search(self, collection_name: str, ...):
    self.get_or_create_collection(collection_name)

    # Required in Milvus 2.4+: load before searching
    self.client.load_collection(collection_name=collection_name)

    dense_results = self.client.search(...)
    sparse_results = self.client.search(...)

Pitfall 3: Extract the sparse vector correctly

async def embed_query(self, text: str) -> dict:
    result = await self._embed_batch([text], text_type="query", use_instruct=True)
    # _embed_batch returns {"sparse": {0: sparse_vec}}
    # We need the vector stored at index 0, not the wrapper dict
    return {
        "dense": result["dense"][0],
        "sparse": result["sparse"].get(0, {})  # extract sparse_vec
    }

Pitfall 4: Adjust the hybrid search thresholds

# config.py or environment variables
rag_min_similarity: float = 0.3  # filter threshold (0.7 was too high)
rag_refuse_similarity: float = 0.3  # refusal threshold (0.5 was too high)

Hybrid search scoring formula:

# Typical score range: 0.3 - 0.5
score = dense_similarity * 0.7 + sparse_similarity * 0.3

FAQ

Q: Why doesn't Milvus accept empty sparse vectors?

A: Milvus's SPARSE_FLOAT_VECTOR type requires at least one element per row; an empty dict {} gives it nothing to work with and triggers the empty sparse float vector row error. Using {0: 0.0} as a placeholder works around this.

Q: Must load_collection be called before searching in Milvus 2.4?

A: Yes. Milvus 2.4+ does not automatically load collections into memory by default; you must call client.load_collection(collection_name) explicitly before searching. This is a performance optimization that keeps unused collections from occupying memory.

Q: Why are hybrid search scores usually only 0.3-0.5?

A: The hybrid score is a weighted combination (e.g. 0.7 * dense + 0.3 * sparse). Even when both retrievals match perfectly (1.0), the weighted maximum is still only 1.0; in practice the dense and sparse scores are rarely 1.0 at the same time, so typical scores land around 0.3-0.5. Set the threshold near 0.3, not 0.7.

Q: What format do DashScope sparse embeddings come in?

A: DashScope returns {"embeddings": [{"sparse_embedding": [{"index": 123, "value": 0.5}, ...]}]}. For batch calls, the converted format is {text_index: {dim_index: value}}. At search time, use .get(0, {}) to extract the first entry's sparse vector.
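Based purely on the response shape described above (not verified against the live API), that conversion could be sketched as follows; to_sparse_map is a made-up helper name:

```python
def to_sparse_map(response: dict) -> dict:
    """Convert DashScope-style sparse embeddings to {text_index: {dim: value}}."""
    return {
        i: {item["index"]: item["value"] for item in emb["sparse_embedding"]}
        for i, emb in enumerate(response["embeddings"])
    }
```

The result supports exactly the `.get(0, {})` lookup shown in the embed_query fix above.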