
2 posts tagged with "httpx"


Unify Multiple Search APIs with Abstract Class, Return Errors Instead of Raising

· 4 min read

Encountered this issue while building an AI Agent platform for a client: needed to support multiple search providers (Tavily, Serper, Brave, Bing) while ensuring tool call failures don't interrupt the Agent's conversation flow.

TL;DR

  1. Define SearchProvider abstract base class + SearchResult data model for unified interface and output
  2. Each provider inherits the base class, implements search() method with field mapping
  3. Key design: Return SearchResult with error info on failure, never raise exceptions

The Problem

Direct calls to different search APIs look like this:

# Tavily: POST request, results[].url
response = await client.post("https://api.tavily.com/search", ...)

# Serper: POST request, organic[].link
response = await client.post("https://google.serper.dev/search", ...)

# Brave: GET request, web.results[].description
response = await client.get("https://api.search.brave.com/res/v1/web/search", ...)

# Bing: GET request, webPages.value[].snippet
response = await client.get("https://api.bing.microsoft.com/v7.0/search", ...)

Issues:

  1. Request methods, auth headers, and response structures vary
  2. Switching providers requires changing caller code
  3. raise Exception interrupts AI Agent's streaming conversation

Root Cause

  1. Missing abstraction layer: Caller directly depends on concrete implementations, violating dependency inversion
  2. Inconsistent error handling: Exceptions propagate up the call stack, crashing the entire streaming flow

For AI Agent tool calls, the Agent needs to decide whether to retry, use another tool, or explain to the user—not just crash.
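That decision logic can be sketched at the tool-calling layer. The `ToolOutcome` wrapper and `classify_result` rules below are hypothetical, just to show the shape: error results arrive as data, and the Agent loop maps them to a next action instead of crashing.

```python
from dataclasses import dataclass


@dataclass
class ToolOutcome:
    """Hypothetical wrapper: tells the Agent loop what to do after a tool call."""
    action: str   # "use", "retry", or "switch_provider"
    message: str


def classify_result(title: str, snippet: str) -> ToolOutcome:
    # Error results from the providers below carry an empty link and a
    # human-readable snippet; map them to an Agent action instead of raising.
    if title == "Timeout":
        return ToolOutcome(action="retry", message="Search timed out; retrying once.")
    if title in ("Rate Limited", "Quota Exceeded"):
        return ToolOutcome(action="switch_provider", message=snippet)
    return ToolOutcome(action="use", message=snippet)


outcome = classify_result("Rate Limited", "Search quota exceeded. Please try again later.")
```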

Solution

1. Define Abstract Base Class and Data Model

# base.py
from abc import ABC, abstractmethod
from typing import List
from pydantic import BaseModel


class SearchResult(BaseModel):
    """Unified search result."""
    title: str
    link: str
    snippet: str


class SearchProvider(ABC):
    """Base class for search providers."""

    def __init__(self, api_key: str):
        self.api_key = api_key

    @abstractmethod
    async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
        """Execute search and return results."""
        pass
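A side benefit of depending only on the abstract interface: unit tests need no network at all. A minimal sketch of a stub provider (the `StubSearch` name is mine; `SearchResult` is re-declared as a plain dataclass here so the sketch has no third-party dependency):

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List


# Inline stand-ins for base.py's models (dataclass instead of Pydantic
# so this sketch runs without extra dependencies)
@dataclass
class SearchResult:
    title: str
    link: str
    snippet: str


class SearchProvider(ABC):
    def __init__(self, api_key: str):
        self.api_key = api_key

    @abstractmethod
    async def search(self, query: str, max_results: int = 5) -> List[SearchResult]: ...


class StubSearch(SearchProvider):
    """Test double: returns canned results without any network I/O."""

    async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
        return [SearchResult(title="stub", link="https://example.com", snippet=query)]


results = asyncio.run(StubSearch(api_key="test").search("hello"))
```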

2. Implement Concrete Providers

Tavily (AI-optimized search, supports rate limit / quota error codes):

# tavily.py
import httpx
import logging
from typing import List
from .base import SearchProvider, SearchResult

logger = logging.getLogger(__name__)


class TavilySearch(SearchProvider):
    """Tavily Search API implementation."""

    async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                response = await client.post(
                    "https://api.tavily.com/search",
                    headers={"Authorization": f"Bearer {self.api_key}"},
                    json={
                        "query": query,
                        "max_results": max_results,
                        "search_depth": "basic"
                    }
                )

                # Return SearchResult on error, never raise
                if response.status_code == 429:
                    return [SearchResult(
                        title="Rate Limited",
                        link="",
                        snippet="Search quota exceeded. Please try again later."
                    )]

                if response.status_code == 401:
                    return [SearchResult(
                        title="Auth Error",
                        link="",
                        snippet="Search API key is invalid."
                    )]

                if response.status_code == 402:
                    return [SearchResult(
                        title="Quota Exceeded",
                        link="",
                        snippet="Monthly search quota depleted."
                    )]

                response.raise_for_status()
                data = response.json()

                # Field mapping: Tavily's url -> unified link
                results = []
                for item in data.get("results", [])[:max_results]:
                    results.append(SearchResult(
                        title=item.get("title", ""),
                        link=item.get("url", ""),
                        snippet=item.get("content", "")
                    ))
                return results

        except httpx.TimeoutException:
            logger.warning(f"Tavily API timeout: {query[:50]}")
            return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
        except Exception as e:
            logger.error(f"Tavily search error: {e}")
            return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

Serper (Google Search API):

# serper.py
import httpx
from typing import List
from .base import SearchProvider, SearchResult


class SerperSearch(SearchProvider):
    """Serper (Google Search) API implementation."""

    async def search(self, query: str, max_results: int = 5) -> List[SearchResult]:
        try:
            async with httpx.AsyncClient(timeout=15.0) as client:
                response = await client.post(
                    "https://google.serper.dev/search",
                    headers={"X-API-KEY": self.api_key, "Content-Type": "application/json"},
                    json={"q": query, "num": max_results}
                )

                if response.status_code == 401:
                    return [SearchResult(title="Auth Error", link="", snippet="Serper API key is invalid.")]

                response.raise_for_status()
                data = response.json()

                # Field mapping: Serper's organic[].link -> unified link
                results = []
                for item in data.get("organic", [])[:max_results]:
                    results.append(SearchResult(
                        title=item.get("title", ""),
                        link=item.get("link", ""),
                        snippet=item.get("snippet", "")
                    ))
                return results

        except httpx.TimeoutException:
            return [SearchResult(title="Timeout", link="", snippet="Search timed out.")]
        except Exception as e:
            return [SearchResult(title="Error", link="", snippet=f"Search failed: {str(e)}")]

Brave and Bing implementations are similar, differing in request method and response field mapping.
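For Brave, the only parts that change are the GET call and the field mapping. The mapping itself can be sketched as a pure function, using the `web.results[].description` shape shown earlier (it returns plain dicts here so the sketch has no dependency on `base.py`; in the real provider these would be `SearchResult` instances):

```python
from typing import Any, Dict, List


def map_brave_response(data: Dict[str, Any], max_results: int = 5) -> List[dict]:
    """Map Brave's web.results[] items onto the unified title/link/snippet shape."""
    return [
        {
            "title": item.get("title", ""),
            "link": item.get("url", ""),
            "snippet": item.get("description", ""),
        }
        for item in data.get("web", {}).get("results", [])[:max_results]
    ]


# Example payload in Brave's response shape (abbreviated)
sample = {"web": {"results": [
    {"title": "Python asyncio", "url": "https://docs.python.org", "description": "Async I/O docs"},
]}}
mapped = map_brave_response(sample)
```

Keeping the mapping pure makes it trivially unit-testable, independent of any HTTP plumbing.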

3. Caller Usage

# Depend on abstraction only
async def execute_search(provider: SearchProvider, query: str) -> str | List[SearchResult]:
    results = await provider.search(query)

    # Check for errors (error results carry an empty link)
    if results and not results[0].link:
        error_msg = results[0].snippet
        # Agent can decide next action based on error info
        return f"Search failed: {error_msg}"

    return results


# Switch providers by changing instance only
provider = TavilySearch(api_key="xxx")
# provider = SerperSearch(api_key="xxx")
results = await execute_search(provider, "Python async best practices")
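Provider selection can also be driven by configuration instead of code edits. A minimal registry sketch (the `PROVIDERS` dict and `make_provider` are mine; the provider classes are stubbed inline so the sketch runs on its own):

```python
from typing import Dict, Type


# Stand-ins for the real classes from the sections above
class SearchProvider:
    def __init__(self, api_key: str):
        self.api_key = api_key


class TavilySearch(SearchProvider): ...
class SerperSearch(SearchProvider): ...


PROVIDERS: Dict[str, Type[SearchProvider]] = {
    "tavily": TavilySearch,
    "serper": SerperSearch,
}


def make_provider(name: str, api_key: str) -> SearchProvider:
    """Look up a provider by config name; unknown names fail fast at startup."""
    try:
        return PROVIDERS[name](api_key=api_key)
    except KeyError:
        raise ValueError(f"Unknown search provider: {name!r}") from None


provider = make_provider("tavily", api_key="xxx")
```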

Key Design Decisions

| Decision | Reason |
| --- | --- |
| Return `SearchResult` on error instead of `raise` | AI Agent conversations are streaming flows; exceptions interrupt everything |
| Use Pydantic `BaseModel` for output | Auto-validation + IDE hints + JSON serialization |
| Use `ABC` instead of `Protocol` | Need shared `__init__` logic (`api_key` storage) |
| Unified 15-second timeout | Search is UX-critical; can't be too slow |


Fix the Hidden Pitfall of httpx async with client.post()

· 2 min read

Encountered this issue while building a multi-service SaaS system. Documenting the root cause and solution.

TL;DR

Don't use the async with client.post() pattern with httpx.AsyncClient. Create the client first, then await the request method: response = await client.post().

Problem Symptoms

import httpx

async def call_api():
    async with httpx.AsyncClient() as client:
        async with client.post(url, json=data) as response:  # Problem code
            return response.json()

This code sometimes works, sometimes errors:

httpx.RemoteProtocolError: cannot write to closing transport
RuntimeError: Session is closed

Root Cause

The async with client.post() Trap

client.post() is a coroutine that resolves to a Response object; the Response is not an async context manager (httpx reserves the context-manager form for client.stream()). Wrapping the call with async with causes:

  1. Premature connection closure: The connection closes immediately when the async with block ends, but the response may still be reading
  2. Resource contention: With concurrent requests, connection pool state becomes chaotic

Understanding httpx Context Managers Correctly

# ✅ Correct: client is the context manager
async with httpx.AsyncClient() as client:
    response = await client.post(url, json=data)
    return response.json()

# ❌ Wrong: treating response as context manager
async with client.post(url) as response:
    ...

Solution

Option 1: Single Request (Simple Scenarios)

async def call_api(url: str, data: dict) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=data)
        response.raise_for_status()
        return response.json()

Option 2: Reuse Client (High-Frequency Requests)

# Global or dependency injection
_client = httpx.AsyncClient(timeout=30.0)

async def call_api(url: str, data: dict) -> dict:
    response = await _client.post(url, json=data)
    response.raise_for_status()
    return response.json()

# On app shutdown
async def shutdown():
    await _client.aclose()

Option 3: FastAPI Dependency Injection

from typing import AsyncIterator

from fastapi import Depends
from httpx import AsyncClient

async def get_http_client() -> AsyncIterator[AsyncClient]:
    async with AsyncClient(timeout=30.0) as client:
        yield client

@router.post("/proxy")
async def proxy(
    data: dict,
    client: AsyncClient = Depends(get_http_client)
):
    response = await client.post("https://external.api/endpoint", json=data)
    return response.json()
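Note that the Depends(get_http_client) dependency above opens a fresh client per request; for a truly shared connection pool, hold one client for the app's whole lifetime via a lifespan handler. A dependency-free sketch of that pattern (FakeClient stands in for httpx.AsyncClient so this runs without httpx or FastAPI installed; in real code you'd attach the client to app.state in a FastAPI lifespan function):

```python
import asyncio
from contextlib import asynccontextmanager


class FakeClient:
    """Stand-in for httpx.AsyncClient so the sketch is dependency-free."""
    def __init__(self):
        self.closed = False

    async def aclose(self):
        self.closed = True


@asynccontextmanager
async def lifespan(state: dict):
    state["client"] = FakeClient()  # real code: httpx.AsyncClient(timeout=30.0)
    try:
        yield
    finally:
        await state["client"].aclose()  # closed exactly once, at shutdown


async def demo() -> bool:
    state: dict = {}
    async with lifespan(state):
        assert not state["client"].closed  # usable while the app is running
    return state["client"].closed


closed_after_shutdown = asyncio.run(demo())
```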

FAQ

Q: How should httpx async with be used correctly?

A: async with is only for managing AsyncClient lifecycle, not wrapping individual requests. Correct pattern: async with AsyncClient() as client: response = await client.post(...).

Q: Why does async with client.post() sometimes work?

A: It may work by chance in single-threaded, low-concurrency scenarios, but will fail under high concurrency or network latency. This is a hidden bug—don't rely on it.

Q: How to configure httpx timeout?

A: AsyncClient(timeout=30.0) for a single overall timeout, or AsyncClient(timeout=httpx.Timeout(10.0, connect=5.0, read=30.0)) for per-phase control. Note that httpx.Timeout requires either a default value or all four phases (connect, read, write, pool) set explicitly, so httpx.Timeout(connect=5.0, read=30.0) alone would raise an error.