跳到主要内容

3 篇博文 含有标签「PostgreSQL」

查看所有标签

Airflow PostgresHook 多语句 SQL 静默丢结果?按分号切分逐条执行

· 阅读需 7 分钟

在 Airflow DAG 把 .sql 模板文件整段读出后传给 PostgresHook.get_pandas_df() 时,前置 SELECT 的结果被静默丢弃——DAG 报「SQL 查询无结果」,但把同一段 SQL 复制到 psql 又能正常返回数据。

在开发 AI运营 时遇到此问题——基于大语言模型的智能分析流水线,Airflow DAG 从 SQL 模板文件读取多查询报表模板并执行。

TL;DR

PostgresHook.get_pandas_df(sql) 内部走 pandas.read_sql(sql, conn)psycopg2 cursor.execute(sql)。当 sql 是含多条 ; 分隔 SELECT 的单字符串时,DBAPI 只暴露最后一个结果集的游标,前置查询结果被静默丢弃且不报错。修复方法:按顶层分号切分成 list[str],逐条 get_pandas_df 收集结果,或直接传 list 让 DbApiHook 顺序执行。

问题现象

DAG 任务执行 shop_monthly_overview.sql 报「SQL 查询无结果」:

sql_count = 1   ← 模板里明明写了 4 条查询
result = "❌ SQL 查询无结果"

但同一份 SQL 复制到 psql 直连同库同参数,4 条 SELECT 都有数据。

复现实验

在 Airflow 容器内直接验证 get_pandas_df 对多语句的行为:

from airflow.providers.postgres.hooks.postgres import PostgresHook

hook = PostgresHook(postgres_conn_id="postgres_default")

# 三条 SELECT 串成单字符串
sql = "SELECT 1 AS a; SELECT 2 AS b; SELECT 99 AS c WHERE 1=0;"

df = hook.get_pandas_df(sql)
print(df.columns.tolist()) # ['c'] ← 只拿到末条的列
print(df) # Empty ← 末条本身 0 行

预期应得到三条结果,实际只拿到末条(SELECT 99 ... WHERE 1=0,0 行),前两条完全消失,没有任何报错或警告。

根因

调用链是 PostgresHook.get_pandas_dfDbApiHook.get_pandas_dfpandas.io.sql.read_sqlpsycopg2 cursor.execute(sql)

DBAPI 协议(PEP 249)允许 execute 接受含多条 ; 分隔语句的字符串,PostgreSQL 服务端会依次执行全部语句,但游标只暴露最后一个结果集——这是 PostgreSQL wire protocol 的固有行为,不是 Airflow 或 pandas 的 bug。

┌────────────────────────────────────────────────────┐
│ SELECT 1; ← 执行,结果集 1 立即被丢弃 │
│ SELECT 2; ← 执行,结果集 2 立即被丢弃 │
│ SELECT 99 WHERE 1=0; ← 执行,结果集 3 暴露给游标 │
└────────────────────────────────────────────────────┘

pandas.read_sql 只 fetch 到结果集 3

源头是 task_execute_sql.sql 文件整段读出后当成一条字符串传进 get_pandas_df

# ❌ 问题代码
sql_text = open(sql_path).read() # 含 4 条 SELECT 的整段
df = pg_hook.get_pandas_df(sql_text) # 只拿到末条结果

为什么 psql 能正常返回?因为 psql 前端会主动遍历所有结果集并依次打印,而 DBAPI 游标不会。

解决方案

方案 A(推荐):按顶层分号切分后逐条执行

适合 .sql 模板文件场景——文件含注释、引号、多查询,需要稳健的切分。

def split_sql_statements(sql: str) -> list:
"""
按顶层分号切分 SQL,正确处理:
- 单引号字符串内的分号('a;b' 不切)
- SQL 标准 '' 转义('it''s' 不切)
- -- 行注释内的分号(-- note; not split 不切)
"""
statements = []
buf = []
i, n = 0, len(sql)
in_quote = False

while i < n:
ch = sql[i]

# 在单引号字符串内
if in_quote:
buf.append(ch)
if ch == "'":
# '' = 字面量单引号,不结束字符串
if i + 1 < n and sql[i + 1] == "'":
buf.append(sql[i + 1])
i += 2
continue
in_quote = False
i += 1
continue

# 顶层
if ch == "'":
in_quote = True
buf.append(ch)
elif ch == '-' and i + 1 < n and sql[i + 1] == '-':
# 行注释,原样吞到行尾(注释里的 ; 不切分)
while i < n and sql[i] != '\n':
buf.append(sql[i])
i += 1
continue
elif ch == ';':
stmt = ''.join(buf).strip()
if stmt:
statements.append(stmt)
buf = []
i += 1
continue
else:
buf.append(ch)
i += 1

# 末尾无分号的残留块
stmt = ''.join(buf).strip()
if stmt:
statements.append(stmt)

return statements


# 调用方
sql_text = open(sql_path).read()
statements = split_sql_statements(sql_text)

# 逐条执行,收集所有结果
all_results = []
for idx, stmt in enumerate(statements, start=1):
df = pg_hook.get_pandas_df(stmt)
if not df.empty:
all_results.append({
"sql_index": idx,
"sql": stmt,
"data": df.to_dict("records"),
"columns": df.columns.tolist(),
"row_count": len(df),
})

方案 B:直接传 list 给 DbApiHook

Airflow DbApiHook.runget_records 接受 list[str] 参数会按顺序执行——但 get_pandas_df 在 list 模式下的返回行为各 provider 实现不一致,生产环境建议用方案 A 自己控制。

为什么不用 sqlparse.split

社区答案常推荐 sqlparse.split(sqlparse.format(sql, strip_comments=True)),但 strip_comments=True丢掉注释,如果你的下游 processor 依赖注释中的元信息(如 -- dimension: shop),就会丢失上下文。手写切分器保留注释原文,行为可控。

注意事项

注意事项

  • 不要用 sql.split(';') 简单切分——会误切 WHERE name = 'a;b' 这类引号内的分号,以及 -- 注释; 行注释里的分号
  • split_sql_statements 只处理单引号字符串和 -- 行注释;如果你的 SQL 用 /* 块注释 */ 或 dollar-quoted string($$...$$),需要扩展切分器
  • 修复后下游 processor 的 sql_index 语义会变(1-based 顺序索引),同步检查所有 df.iloc[sql_index] 类用法
  • 如果你的 SQL 是程序生成而非文件读取,更安全的做法是生成时就用 list,避免后续切分
  • 顺带提一个相邻的坑:如果你在 Drizzle ORM 里也遇到过 SQL 表达式被静默参数化的问题,可以看 Drizzle sql 模板混用参数化值与 SQL 表达式——同样是「框架替你做了你没预期到的转换」类陷阱

常见问题

Airflow PostgresHook 怎么执行多条 SQL 语句?

list[str] 而不是单条字符串。DbApiHook.get_pandas_dfrun 接受 sql 参数为 list 时按顺序逐条执行;单字符串含多条分号分隔语句时 psycopg2 只返回末条结果集。生产环境推荐自己切分后逐条调用,方便控制结果聚合和 sql_index 索引。

为什么 get_pandas_df 多语句 SQL 只返回最后一条结果?

pandas.io.sql.read_sqlpsycopg2 cursor.execute 执行整段字符串,DBAPI 协议对多语句只暴露最后一个结果集的游标,前置 SELECT 结果被服务端立即丢弃,不报错也不警告。psql 能正常返回是因为 psql 前端会主动遍历所有结果集,DBAPI 游标不会。

怎么安全地按分号切分含注释和引号的 SQL?

逐字符扫描,仅在「非单引号内、非 -- 行注释内」的顶层分号处切分。单引号字面量用 SQL 标准 '' 转义;不要用 str.split(';'),会误切注释和字符串里的分号。如果用 sqlparse.split,注意 strip_comments=True 会丢掉注释原文。


CCLEE

独立开发者,24年电商行业实战经验,专注将AI能力落地于真实商业场景。

合作咨询

UPSERT 写入全零?Drizzle sql 模板混用参数化值与 SQL 表达式的坑

· 阅读需 4 分钟

在为客户构建电商数据分析平台时遇到此问题,记录根因与解法。

TL;DR

Drizzle ORM 的 sql 模板标签中,sql.join(values.map(v => sql(v))) 会把所有值参数化传递。如果 values 数组里混入了 SQL 表达式(如 date_trunc('week', '2026-05-17'::date)::date),PostgreSQL 会把它当成普通字符串解析,报 invalid input syntax for type date 错误。SQL 表达式必须用 sql.raw() 或单独写在模板外部

问题现象

电商数据采集流程:Chrome 扩展采集 → CCLHub 转发 → Analytics 写库。现象:

  1. CCLHub 日志显示采集数据正常(uv: 403, payAmt: 19478.47
  2. Analytics 返回 200 成功
  3. 但数据库查询结果全是 0uv: 0, pay_amt: 0.00
-- 数据库实际数据
report_date | uv | pay_amt | reveal_cnt
-------------+-----+----------+------------
2026-05-12 | 392 | 7333.67 | 11879 -- 旧数据正常
2026-05-13 | 0 | 0.00 | 0 -- 新数据全零!

同时 Analytics 错误日志有:

PostgresError: invalid input syntax for type date:
"date_trunc('week', '2026-05-17'::date)::date"

根因

原始代码混用了参数化值和 SQL 表达式:

// ❌ 问题代码
const insertVals: (string | number | null)[] = [
String(shop_id),
String(platform_id),
reportDate,
tenant_id,
`date_trunc('week', '${reportDate}'::date)::date`, // ← SQL 表达式
];

// sql.join 会把所有值参数化,包括 date_trunc 表达式
await db.execute(sql`
INSERT INTO table (..., week_start_date)
VALUES (${sql.join(insertVals.map(v => sql`${v}`), sql`,`)})
...
`);

生成的 SQL:

-- PostgreSQL 收到的 $5 参数值是字面字符串
INSERT INTO table (..., week_start_date)
VALUES ($1, $2, $3, $4, $5, ...)
-- $5 = "date_trunc('week', '2026-05-17'::date)::date" ← 被当字符串!

PostgreSQL 尝试把 "date_trunc('week', '2026-05-17'::date)::date" 解析为 date 类型 → 报错。

为什么数据是 0 而不是报错? 因为同一张表有独立的询盘写入(PARTIAL UPSERT),询盘 INSERT 成功创建了行(看板列默认值 0),日报 UPSERT 失败但没有回滚已存在的行。

解决方案

把 SQL 表达式从参数化数组中分离出来,用 sql.raw() 或直接写在模板中:

// ✅ 修复:参数化值和 SQL 表达式分开
const insertCols = ['shop_id', 'platform_id', 'report_date', 'tenant_id'];
const insertVals: (string | number | null)[] = [
String(shop_id), String(platform_id), reportDate, tenant_id,
];

// 19 个数据列正常参数化
for (const [apiKey, dbCol] of Object.entries(DAILY_COLUMNS)) {
insertCols.push(dbCol);
insertVals.push(row[apiKey] != null ? String(row[apiKey]) : '0');
}

// week_start_date 用 SQL 表达式,不进参数化数组
await db.execute(sql`
INSERT INTO table (${sql.raw(insertCols.join(', '))}, week_start_date)
VALUES (
${sql.join(insertVals.map(v => sql`${v}`), sql`,`)},
date_trunc('week', ${reportDate}::date)::date -- ← 直接写在模板里
)
...
`);

关键区别:

写法Drizzle 处理方式PostgreSQL 收到
sql 模板插值参数化($N字符串字面量
sql.raw(expression)原样拼入 SQLSQL 表达式
直接写在 sql 模板中作为模板的一部分SQL 表达式

注意事项

注意事项

  • sql.raw() 存在 SQL 注入风险,不要用于用户输入。本例中 reportDate 来自内部 API,格式可控
  • Drizzle 的 sql 模板标签会自动参数化所有插值——这是安全特性,但 SQL 函数调用不该被参数化
  • 如果整条 SQL 都是动态构建的,考虑用 Drizzle 的 query builder API 代替 raw SQL
  • 数据库连接配置也容易踩坑——如果你遇到连接到了错误的 PostgreSQL 实例,可能是端口被 Docker 静默占用
  • 环境变量加载时序也是常见坑源,JWT 签名静默失败就是 dotenv 在 import 链之后才执行的典型例子

WSL2 + Docker 两个网络坑:端口被静默占用 & host 模式 localhost 不通

· 阅读需 5 分钟

TL;DR

WSL2 + Docker Desktop 有两个常见的网络坑:

  1. 端口被静默占用:Docker 容器映射 5432 后,SSH 隧道 localhost:5432 连到的是容器内的 PostgreSQL 而非远程服务器——密码没错,连的实例错了
  2. host 模式 localhost 不通network_mode: host 共享的是 Docker 工具 VM 网络,不是 WSL2 网络——curl localhost:8080 失败