跳到主要内容

2 篇博文 含有标签「AsyncLocalStorage」

查看所有标签

Node.js AsyncLocalStorage 在回调里读不到值?EventEmitter 越界丢失上下文

· 阅读需 5 分钟

请求日志中间件在 res.on('finish') 回调里读 AsyncLocalStorage 的 traceId,getStore() 返回 undefined,每条响应日志的 traceId 都是空的。

在为客户开发 电商数据采集工具 时遇到此问题——服务端用 ALS 把每条请求的 traceId 贯穿整条处理链路,但响应日志死活关联不上,排查发现是「晚回调」丢了上下文。

TL;DR

res.on('finish') 这类 EventEmitter 回调,触发时已经脱离了注册它时的 async context,als.getStore() 自然拿不到请求的 store。最稳的解法是在同步段把值取到闭包变量,回调里直接用闭包值;需要完整 store 时则在回调内 als.run(store, fn) 重建上下文。

问题现象

一个看起来毫无问题的请求日志中间件:

// middleware/requestLog.js
import { als } from '../utils/als.js';

app.use((req, res, next) => {
res.on('finish', () => {
const store = als.getStore();
logger.info({
traceId: store?.traceId, // 响应日志里这里永远是 undefined
statusCode: res.statusCode,
}, 'request');
});
next();
});

中间件顺序没问题,traceId 在请求处理链路里(路由、业务函数)都读得到,唯独 res.on('finish') 里读不到。更迷惑的是:把 als.getStore() 挪到 next() 之前的同步段,它就有值。

根因

AsyncLocalStorage 靠 Node 的 async_hooks 把 store 绑定到当前激活的 async context 上,顺着异步调用链往下传。als.run(store, fn) 的语义是:在 fn 执行期间(及其派生的异步任务里),getStore() 都能拿到这个 store。

问题出在 EventEmitter。res.on('finish', cb) 做的事是cb 注册成监听器,等响应发送完毕后由 EventEmitter 的事件循环触发。触发 cb 的那个 async context,是 EventEmitter 派发事件时所在的上下文——不是注册它时的请求上下文。而且响应发送通常发生在请求处理链路之后,请求对应的 als.run 作用域可能已经退出。

所以 cbals.getStore() 拿到的是「当前激活上下文」的 store,而那个上下文根本不属于这次请求,结果就是 undefined(或更糟,串到别的上下文)。

凡是「注册时一个上下文、触发时另一个上下文」的回调都有这个坑:res.on('finish')once、某些 setTimeout/setIntervalchrome.alarms 监听器等等。

解决方案

按场景给两个模式,按需选。

模式 A(推荐):同步段闭包捕获

如果你的回调只需要 store 里的某几个值(最常见就是 traceId),最简单也最可靠——在同步段(store 一定存活的时刻)把值取出来存进闭包,回调里直接用闭包变量,彻底不依赖 ALS:

app.use((req, res, next) => {
// 同步段:此时一定在 als.run 作用域内,getStore() 必有值
const traceId = als.getStore()?.traceId;
const start = Date.now();

res.on('finish', () => {
// 回调里用闭包里的 traceId,不再碰 ALS
logger.info({
traceId, // 稳定拿到
statusCode: res.statusCode,
durationMs: Date.now() - start,
}, 'request');
});

next();
});

这一步把「异步上下文是否还活着」这个不确定性,换成了一个确定的闭包引用。回调何时触发都不影响——值已经在闭包里了。

模式 B:als.run 重建上下文

当回调里要调用一坨内部都依赖 getStore() 的代码(比如 logger 的 mixin、Sentry 的 scope 注入),逐个改成闭包不现实,就在回调入口重建上下文:

res.on('finish', () => {
const traceId = capturedTraceId; // 同步段捕获的值
if (traceId) {
// 在回调内重新建立 ALS 上下文,后续 record() 内部 getStore() 能正常拿到
als.run({ traceId }, () => record(res, start));
} else {
record(res, start);
}
});

als.run(store, fn) 会为 fn 建立一个新的、独立的 async context 并把 store 绑上去,fn 内部及它派生的异步调用都能读到。这比 als.enterWith 更安全——后者改写的是「当前共享上下文」,在并发场景下会串值,那是另一个坑,见 AsyncLocalStorage 并发读到错误的值?enterWith 改用 run 隔离上下文

注意事项

  • 判断某回调是否会丢上下文,看它是不是「注册和触发分离」。res.on('finish')once、跨 tick 的 setTimeout 都要警惕;而 awaitfetch().then() 这类顺着 async chain 走的则天然继承,不用处理。
  • 模式 A 优先。它把问题降维成一个普通闭包,可读性最好,也不会引入「重建上下文」的隐式行为;只有回调内部有大量依赖 getStore() 的既有代码时,才上模式 B。
  • 别用 als.enterWith 在回调里补救——它在并发下会改写共享父上下文导致串扰,是比「丢上下文」更难查的 bug。

常见问题

为什么 res.on('finish') 回调里读不到 AsyncLocalStorage 的值?

res.on('finish', cb)cb 注册为 EventEmitter 监听器,响应发送完毕后才触发。触发时的 async context 是事件派发所在的上下文,不是注册它的请求上下文,请求的 als.run 作用域可能已退出,因此 getStore() 返回 undefined。

怎么让 EventEmitter 回调重新拿到 AsyncLocalStorage 上下文?

最简单的是在同步段把需要的值取到闭包变量,回调里直接用闭包值;如果回调内部有大量依赖 getStore() 的代码,则在回调入口用 als.run(store, fn) 重建上下文。前者优先,后者用于改造既有逻辑。

CCLEE

独立开发者,24年电商行业实战经验,专注将AI能力落地于真实商业场景。

合作咨询

Node.js AsyncLocalStorage 并发读到错误的值?enterWith 改用 run 隔离上下文

· 阅读需 6 分钟

BullMQ worker 设了 concurrency: 3,上线后发现并发的几个 job 日志和 Sentry 上报全串了——A 任务的错误堆栈挂在了 B 任务的 traceId 下,排查时对着错误的链路看了半天。

在构建 AI运营 数据分析平台时遇到此问题——为电商运营智能分析市场趋势、用户行为与销售数据,后台用 BullMQ 并发跑分析任务,每个任务都靠 AsyncLocalStorage 打 traceId 做日志关联,并发一上来 traceId 就开始错乱。

TL;DR

als.enterWith(store) 改写的是当前激活的共享父上下文,并发任务在 await 交错时会互相覆盖,最后写的那个值「赢」,所有交错的任务都读到同一个错误的值。解法是改用 als.run(store, fn) 把整个处理器包起来——它为每次调用建立独立的新上下文,退出后自动恢复,并发再高也互不干扰。

问题现象

每个 job 进来时往 ALS 里塞自己的 traceId,处理器内部(含 await)读这个 traceId 打日志、上报 Sentry:

// worker.js —— 串扰写法
new Worker('analytics', (job) => {
als.enterWith({ traceId: job.data.executionId }); // 进来就写
return processJob(job); // 内部多处 await + logger.info({ traceId: als.getStore().traceId })
}, { concurrency: 3 });

单跑没问题,concurrency: 3 一开就出诡异现象:

# job A (executionId: aaa) 与 job B (executionId: bbb) 几乎同时进入
[worker] job A start traceId=aaa
[worker] job B start traceId=bbb
# A 内部 await 让出,B 调了 enterWith({bbb}),A 恢复后:
[worker] job A step2 traceId=bbb ← 串到 B 了
[worker] job A error traceId=bbb ← Sentry 上报到 B 的链路下

不是偶发,是只要并发就稳定复现,而且 traceId 永远等于「最近一次 enterWith 写入的值」。

根因

关键在于 enterWith 改写的不是「这次调用专属」的上下文,而是当前激活的那个共享父上下文

AsyncLocalStorage 的上下文是树状的:一个 async context 可以被多个子任务共享。als.enterWith(store) 的语义是「把 store 写到我当前所处的这个 context 上」。当 worker 用 concurrency: 3 时,三个 job 的处理器共享同一个父上下文(worker 循环的上下文),于是:

  • job A 调 enterWith({aaa}) → 共享上下文被写成 aaa
  • job A await 让出执行权;
  • job B 调 enterWith({bbb}) → 同一个共享上下文被覆盖成 bbb
  • job A 恢复,读 getStore() → 拿到的是 bbb

这就是经典的「last-write-wins」串扰。await 点越多、并发越高,覆盖越频繁,错乱越严重。concurrency: 1 时看似正常,是因为根本没有交错——这也是它最坑的地方:开发时单线程调试永远发现不了。

Node 官方文档对此有明确告警:enterWith() 会产生预期外的副作用,推荐用 run() 替代

解决方案

enterWith 换成 run,并且用 run 包裹整个处理器(而不是某一段):

// worker.js —— 隔离写法
new Worker('analytics', (job) => {
return als.run(
{ traceId: job.data.executionId },
() => processJob(job), // 整个处理器都在独立上下文里跑
);
}, { concurrency: 3 });

als.run(store, fn) 的语义是:新建一个独立的 async context,把 store 绑定到它上面,在 fn 执行期间(及其派生的所有异步任务里)getStore() 都能拿到这个 store,fn 返回后上下文自动恢复到调用前的状态。

因为每次调用 run 建立的都是全新的、这次调用专属的上下文,并发任务之间天然隔离——job A 的 context 里永远是 aaa,job B 的里永远是 bbb,无论怎么在 await 处交错都不会互相覆盖。

这个改动的收益是直接的:

  • 每次调用独立快照:traceId 在进入 job 时绑定,整个处理链路(含所有 await、子函数、Sentry scope)读到的都是这个 job 自己的值;
  • 退出自动恢复:job 结束后上下文归位,不会泄漏到下一个 job 或 worker 主循环;
  • 并发安全concurrency 调到多少都不影响,行为和单线程一致。

如果处理器是抽出来的函数(比如 processWorkflowJobprocessAtomicJob),同样在 Worker 构造处包一层即可,不需要改处理器内部:

new Worker(queue, (job) => als.run({ traceId: job.data.id }, () => processWorkflowJob(job)), { concurrency });

注意事项

  • 只要存在并发(worker concurrency > 1、HTTP 并发请求、Promise.all 批处理),就别用 enterWith。它是为「单线程顺序设置一次」设计的,并发下必然串扰。
  • run 要包住整个处理器,不是只包入口的同步段——否则处理器内部的 await 之后又回到共享上下文,等于没改。
  • concurrency: 1 会掩盖这个 bug。开发时务必用目标并发数压测,否则上线才暴露。
  • 另一个 ALS 高频坑是回调里读不到值(上下文丢失),见 Node.js AsyncLocalStorage 在回调里读不到值?EventEmitter 越界丢失上下文

常见问题

als.enterWith 和 als.run 有什么区别?

enterWith 把 store 写到当前激活的共享父上下文上,后续并发的异步任务会互相覆盖;run 则为回调新建一个独立的新上下文并绑定 store,回调结束后自动恢复到之前的状态,每次调用互不干扰。Node 官方推荐用 run 替代 enterWith

为什么并发任务会读到错误的 traceId,串到别的请求?

并发任务在 await 处交错时,enterWith 写入的值会被最后一次调用覆盖,所有交错的任务读到的都是同一个错误的 traceId。改用 als.run 给每次调用建立独立上下文即可隔离,并发再高也不会串。

CCLEE

独立开发者,24年电商行业实战经验,专注将AI能力落地于真实商业场景。

合作咨询