Node.js v21.7.2 文件
- Node.js v21.7.2
-
► 目錄
- 非同步內容追蹤
- 簡介
- 類別:
AsyncLocalStorage
new AsyncLocalStorage()
- 靜態方法:
AsyncLocalStorage.bind(fn)
- 靜態方法:
AsyncLocalStorage.snapshot()
asyncLocalStorage.disable()
asyncLocalStorage.getStore()
asyncLocalStorage.enterWith(store)
asyncLocalStorage.run(store, callback[, ...args])
asyncLocalStorage.exit(callback[, ...args])
- 使用
async/await
- 疑難排解:內容遺失
- 類別:
AsyncResource
new AsyncResource(type[, options])
- 靜態方法:
AsyncResource.bind(fn[, type[, thisArg]])
asyncResource.bind(fn[, thisArg])
asyncResource.runInAsyncScope(fn[, thisArg, ...args])
asyncResource.emitDestroy()
asyncResource.asyncId()
asyncResource.triggerAsyncId()
- 在
Worker
執行緒池中使用AsyncResource
- 將
AsyncResource
與EventEmitter
整合
- 非同步內容追蹤
-
► 索引
- 斷言測試
- 非同步內容追蹤
- 非同步掛勾
- 緩衝區
- C++ 外掛程式
- 使用 Node-API 的 C/C++ 外掛程式
- C++ 內嵌程式 API
- 子程序
- 叢集
- 命令列選項
- 主控台
- Corepack
- 加密
- 偵錯程式
- 已棄用的 API
- 診斷頻道
- DNS
- 網域
- 錯誤
- 事件
- 檔案系統
- 全域變數
- HTTP
- HTTP/2
- HTTPS
- 檢查器
- 國際化
- 模組:CommonJS 模組
- 模組:ECMAScript 模組
- 模組:
node:module
API - 模組:套件
- 網路
- 作業系統
- 路徑
- 效能掛勾
- 權限
- 程序
- Punycode
- 查詢字串
- Readline
- REPL
- 報告
- 單一可執行應用程式
- 串流
- 字串解碼器
- 測試執行器
- 計時器
- TLS/SSL
- 追蹤事件
- TTY
- UDP/資料報
- URL
- 公用程式
- V8
- VM
- WASI
- Web Crypto API
- Web Streams API
- 工作執行緒
- Zlib
- ► 其他版本
- ► 選項
非同步內容追蹤#
原始程式碼: lib/async_hooks.js
簡介#
這些類別用於關聯狀態並將其傳播到回呼函式和 Promise 鏈中。它們允許在 Web 要求或任何其他非同步期間儲存資料。這類似於其他語言中的執行緒局部儲存。
AsyncLocalStorage
和 AsyncResource
類別是 node:async_hooks
模組的一部分
import { AsyncLocalStorage, AsyncResource } from 'node:async_hooks';
const { AsyncLocalStorage, AsyncResource } = require('node:async_hooks');
類別:AsyncLocalStorage
#
此類別會建立在非同步作業中保持一致性的儲存。
雖然您可以在 node:async_hooks
模組之上建立自己的實作,但建議優先使用 AsyncLocalStorage
,因為它是一種效能良好且記憶體安全的實作,包含許多不容易實作的重大最佳化。
以下範例使用 AsyncLocalStorage
建立一個簡單的記錄器,會將 ID 指定給傳入的 HTTP 要求,並將其包含在每個要求中記錄的訊息中。
import http from 'node:http';
import { AsyncLocalStorage } from 'node:async_hooks';
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}
let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// Imagine any chain of async operations here
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);
http.get('https://127.0.0.1:8080');
http.get('https://127.0.0.1:8080');
// Prints:
// 0: start
// 1: start
// 0: finish
// 1: finish
const http = require('node:http');
const { AsyncLocalStorage } = require('node:async_hooks');
const asyncLocalStorage = new AsyncLocalStorage();
function logWithId(msg) {
const id = asyncLocalStorage.getStore();
console.log(`${id !== undefined ? id : '-'}:`, msg);
}
let idSeq = 0;
http.createServer((req, res) => {
asyncLocalStorage.run(idSeq++, () => {
logWithId('start');
// Imagine any chain of async operations here
setImmediate(() => {
logWithId('finish');
res.end();
});
});
}).listen(8080);
http.get('https://127.0.0.1:8080');
http.get('https://127.0.0.1:8080');
// Prints:
// 0: start
// 1: start
// 0: finish
// 1: finish
AsyncLocalStorage
的每個執行個體都會維護一個獨立的儲存內容。多個執行個體可以同時安全地存在,而不會有互相干擾資料的風險。
new AsyncLocalStorage()
#
建立 AsyncLocalStorage
的新執行個體。儲存只會在 run()
呼叫中或 enterWith()
呼叫後提供。
靜態方法:AsyncLocalStorage.bind(fn)
#
fn
<Function> 要繫結到目前執行內容的函式。- 傳回:<Function> 一個會在擷取的執行內容中呼叫
fn
的新函式。
將給定的函式繫結到目前的執行環境。
靜態方法:AsyncLocalStorage.snapshot()
#
- 傳回:<Function>一個新的函式,其簽章為
(fn: (...args) : R, ...args) : R
。
擷取目前的執行環境,並傳回一個函式,該函式接受一個函式作為參數。每當呼叫傳回的函式時,它會在擷取的環境中呼叫傳遞給它的函式。
const asyncLocalStorage = new AsyncLocalStorage();
const runInAsyncScope = asyncLocalStorage.run(123, () => AsyncLocalStorage.snapshot());
const result = asyncLocalStorage.run(321, () => runInAsyncScope(() => asyncLocalStorage.getStore()));
console.log(result); // returns 123
AsyncLocalStorage.snapshot() 可以取代 AsyncResource 的使用,以進行簡單的非同步環境追蹤,例如
class Foo {
#runInAsyncScope = AsyncLocalStorage.snapshot();
get() { return this.#runInAsyncScope(() => asyncLocalStorage.getStore()); }
}
const foo = asyncLocalStorage.run(123, () => new Foo());
console.log(asyncLocalStorage.run(321, () => foo.get())); // returns 123
asyncLocalStorage.disable()
#
停用 AsyncLocalStorage
的執行個體。後續所有對 asyncLocalStorage.getStore()
的呼叫都會傳回 undefined
,直到再次呼叫 asyncLocalStorage.run()
或 asyncLocalStorage.enterWith()
為止。
呼叫 asyncLocalStorage.disable()
時,會退出所有連結到該執行個體的目前環境。
在 asyncLocalStorage
可以被垃圾回收之前,必須呼叫 asyncLocalStorage.disable()
。這不適用於 asyncLocalStorage
提供的儲存,因為這些物件會與對應的非同步資源一起被垃圾回收。
當 asyncLocalStorage
在目前的處理程序中不再使用時,請使用這個方法。
asyncLocalStorage.getStore()
#
- 傳回:<any>
傳回目前的儲存。如果在呼叫 asyncLocalStorage.run()
或 asyncLocalStorage.enterWith()
初始化的非同步環境之外呼叫,則會傳回 undefined
。
asyncLocalStorage.enterWith(store)
#
store
<any>
轉換為當前同步執行剩餘時間的內容,然後通過任何後續的非同步呼叫保留儲存。
範例
const store = { id: 1 };
// Replaces previous store with the given store object
asyncLocalStorage.enterWith(store);
asyncLocalStorage.getStore(); // Returns the store object
someAsyncOperation(() => {
asyncLocalStorage.getStore(); // Returns the same object
});
此轉換將持續整個同步執行。這表示,例如,如果在事件處理常式中輸入內容,後續的事件處理常式也會在該內容中執行,除非特別使用 AsyncResource
繫結到其他內容。這就是為什麼除非有充分的理由使用後者方法,否則應優先使用 run()
而不是 enterWith()
。
const store = { id: 1 };
emitter.on('my-event', () => {
asyncLocalStorage.enterWith(store);
});
emitter.on('my-event', () => {
asyncLocalStorage.getStore(); // Returns the same object
});
asyncLocalStorage.getStore(); // Returns undefined
emitter.emit('my-event');
asyncLocalStorage.getStore(); // Returns the same object
asyncLocalStorage.run(store, callback[, ...args])
#
store
<any>callback
<Function>...args
<any>
在內容中同步執行函式,並傳回其傳回值。無法在回呼函式外存取儲存。任何在回呼中建立的非同步作業都可以存取儲存。
可選的 args
會傳遞給回呼函式。
如果回呼函式擲回錯誤,run()
也會擲回錯誤。堆疊追蹤不受此呼叫影響,且內容會結束。
範例
const store = { id: 2 };
try {
asyncLocalStorage.run(store, () => {
asyncLocalStorage.getStore(); // Returns the store object
setTimeout(() => {
asyncLocalStorage.getStore(); // Returns the store object
}, 200);
throw new Error();
});
} catch (e) {
asyncLocalStorage.getStore(); // Returns undefined
// The error will be caught here
}
asyncLocalStorage.exit(callback[, ...args])
#
callback
<Function>...args
<any>
在一個語境外同步執行一個函式,並傳回其傳回值。儲存區在回呼函式或在回呼函式中建立的非同步操作中無法存取。在回呼函式中執行的任何 getStore()
呼叫都會傳回 undefined
。
可選的 args
會傳遞給回呼函式。
如果回呼函式擲出錯誤,錯誤也會由 exit()
擲出。堆疊追蹤不受此呼叫影響,而且會重新進入語境。
範例
// Within a call to run
try {
asyncLocalStorage.getStore(); // Returns the store object or value
asyncLocalStorage.exit(() => {
asyncLocalStorage.getStore(); // Returns undefined
throw new Error();
});
} catch (e) {
asyncLocalStorage.getStore(); // Returns the same object or value
// The error will be caught here
}
與 async/await
一起使用#
如果在非同步函式中,只有一個 await
呼叫在一個語境中執行,應該使用下列模式
async function fn() {
await asyncLocalStorage.run(new Map(), () => {
asyncLocalStorage.getStore().set('key', value);
return foo(); // The return value of foo will be awaited
});
}
在此範例中,儲存區只在回呼函式和 foo
呼叫的函式中可用。在 run
外部,呼叫 getStore
會傳回 undefined
。
疑難排解:語境遺失#
在大部分情況下,AsyncLocalStorage
都能正常運作。在少數情況下,目前儲存區會在非同步操作之一中遺失。
如果你的程式碼是基於回呼,使用 util.promisify()
將其轉為 Promise 就足夠,這樣它就能與原生 Promise 一起運作。
如果你需要使用基於回呼的 API,或者你的程式碼假設自訂 thenable 實作,請使用 AsyncResource
類別將非同步操作與正確的執行語境關聯。透過在懷疑會導致遺失的呼叫後記錄 asyncLocalStorage.getStore()
的內容,找出導致語境遺失的函式呼叫。當程式碼記錄 undefined
時,最後呼叫的回呼函式可能是導致語境遺失的原因。
類別:AsyncResource
#
AsyncResource
類別設計為由嵌入式非同步資源擴充。使用此類別,使用者可以輕鬆觸發其自身資源的生命週期事件。
當實例化 AsyncResource
時,init
鉤子會觸發。
以下是 AsyncResource
API 的概觀。
import { AsyncResource, executionAsyncId } from 'node:async_hooks';
// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);
// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();
// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();
// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();
const { AsyncResource, executionAsyncId } = require('node:async_hooks');
// AsyncResource() is meant to be extended. Instantiating a
// new AsyncResource() also triggers init. If triggerAsyncId is omitted then
// async_hook.executionAsyncId() is used.
const asyncResource = new AsyncResource(
type, { triggerAsyncId: executionAsyncId(), requireManualDestroy: false },
);
// Run a function in the execution context of the resource. This will
// * establish the context of the resource
// * trigger the AsyncHooks before callbacks
// * call the provided function `fn` with the supplied arguments
// * trigger the AsyncHooks after callbacks
// * restore the original execution context
asyncResource.runInAsyncScope(fn, thisArg, ...args);
// Call AsyncHooks destroy callbacks.
asyncResource.emitDestroy();
// Return the unique ID assigned to the AsyncResource instance.
asyncResource.asyncId();
// Return the trigger ID for the AsyncResource instance.
asyncResource.triggerAsyncId();
new AsyncResource(type[, options])
#
範例用法
class DBQuery extends AsyncResource {
constructor(db) {
super('DBQuery');
this.db = db;
}
getInfo(query, callback) {
this.db.get(query, (err, data) => {
this.runInAsyncScope(callback, null, err, data);
});
}
close() {
this.db = null;
this.emitDestroy();
}
}
靜態方法:AsyncResource.bind(fn[, type[, thisArg]])
#
fn
<Function> 要繫結到目前執行內容的函式。類型
<字串> 與基礎AsyncResource
關聯的可選名稱。thisArg
<任何>
將給定的函式繫結到目前的執行環境。
asyncResource.bind(fn[, thisArg])
#
將指定的函式繫結到此 AsyncResource
的範圍執行。
asyncResource.runInAsyncScope(fn[, thisArg, ...args])
#
在非同步資源的執行內容中,使用提供的引數呼叫提供的函式。這將建立內容、觸發非同步掛勾在回呼之前、呼叫函式、觸發非同步掛勾在回呼之後,然後還原原始執行內容。
asyncResource.emitDestroy()
#
- 傳回:<AsyncResource>
asyncResource
的參考。
呼叫所有 destroy
掛勾。這應該只呼叫一次。如果呼叫超過一次,將擲回錯誤。這必須手動呼叫。如果資源留待 GC 收集,則 destroy
掛勾將永遠不會被呼叫。
asyncResource.asyncId()
#
- 傳回:<數字> 指定給資源的唯一
asyncId
。
asyncResource.triggerAsyncId()
#
- 傳回:<number> 傳遞給
AsyncResource
建構函式的triggerAsyncId
。
將 AsyncResource
用於 Worker
執行緒池#
下列範例顯示如何使用 AsyncResource
類別,為 Worker
池正確提供非同步追蹤。其他資源池,例如資料庫連線池,可以遵循類似的模型。
假設任務是加兩個數字,使用一個名為 task_processor.js
的檔案,內容如下
import { parentPort } from 'node:worker_threads';
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
const { parentPort } = require('node:worker_threads');
parentPort.on('message', (task) => {
parentPort.postMessage(task.a + task.b);
});
它周圍的 Worker 池可以使用下列結構
import { AsyncResource } from 'node:async_hooks';
import { EventEmitter } from 'node:events';
import path from 'node:path';
import { Worker } from 'node:worker_threads';
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
export default class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(new URL('task_processor.js', import.meta.url));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}
const { AsyncResource } = require('node:async_hooks');
const { EventEmitter } = require('node:events');
const path = require('node:path');
const { Worker } = require('node:worker_threads');
const kTaskInfo = Symbol('kTaskInfo');
const kWorkerFreedEvent = Symbol('kWorkerFreedEvent');
class WorkerPoolTaskInfo extends AsyncResource {
constructor(callback) {
super('WorkerPoolTaskInfo');
this.callback = callback;
}
done(err, result) {
this.runInAsyncScope(this.callback, null, err, result);
this.emitDestroy(); // `TaskInfo`s are used only once.
}
}
class WorkerPool extends EventEmitter {
constructor(numThreads) {
super();
this.numThreads = numThreads;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
for (let i = 0; i < numThreads; i++)
this.addNewWorker();
// Any time the kWorkerFreedEvent is emitted, dispatch
// the next task pending in the queue, if any.
this.on(kWorkerFreedEvent, () => {
if (this.tasks.length > 0) {
const { task, callback } = this.tasks.shift();
this.runTask(task, callback);
}
});
}
addNewWorker() {
const worker = new Worker(path.resolve(__dirname, 'task_processor.js'));
worker.on('message', (result) => {
// In case of success: Call the callback that was passed to `runTask`,
// remove the `TaskInfo` associated with the Worker, and mark it as free
// again.
worker[kTaskInfo].done(null, result);
worker[kTaskInfo] = null;
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
});
worker.on('error', (err) => {
// In case of an uncaught exception: Call the callback that was passed to
// `runTask` with the error.
if (worker[kTaskInfo])
worker[kTaskInfo].done(err, null);
else
this.emit('error', err);
// Remove the worker from the list and start a new Worker to replace the
// current one.
this.workers.splice(this.workers.indexOf(worker), 1);
this.addNewWorker();
});
this.workers.push(worker);
this.freeWorkers.push(worker);
this.emit(kWorkerFreedEvent);
}
runTask(task, callback) {
if (this.freeWorkers.length === 0) {
// No free threads, wait until a worker thread becomes free.
this.tasks.push({ task, callback });
return;
}
const worker = this.freeWorkers.pop();
worker[kTaskInfo] = new WorkerPoolTaskInfo(callback);
worker.postMessage(task);
}
close() {
for (const worker of this.workers) worker.terminate();
}
}
module.exports = WorkerPool;
沒有 WorkerPoolTaskInfo
物件新增的明確追蹤,看起來回呼函式與個別 Worker
物件相關聯。不過,建立 Worker
與建立任務無關,也沒有提供關於何時排定任務的資訊。
這個池可以如下使用
import WorkerPool from './worker_pool.js';
import os from 'node:os';
const pool = new WorkerPool(os.availableParallelism());
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
const WorkerPool = require('./worker_pool.js');
const os = require('node:os');
const pool = new WorkerPool(os.availableParallelism());
let finished = 0;
for (let i = 0; i < 10; i++) {
pool.runTask({ a: 42, b: 100 }, (err, result) => {
console.log(i, err, result);
if (++finished === 10)
pool.close();
});
}
將 AsyncResource
與 EventEmitter
整合#
EventEmitter
觸發的事件監聽器可能會在與呼叫 eventEmitter.on()
時不同的執行緒環境中執行。
下列範例顯示如何使用 AsyncResource
類別,將事件監聽器正確與正確的執行緒環境關聯。相同的做法可以套用在 Stream
或類似的事件驅動類別。
import { createServer } from 'node:http';
import { AsyncResource, executionAsyncId } from 'node:async_hooks';
const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);
const { createServer } = require('node:http');
const { AsyncResource, executionAsyncId } = require('node:async_hooks');
const server = createServer((req, res) => {
req.on('close', AsyncResource.bind(() => {
// Execution context is bound to the current outer scope.
}));
req.on('close', () => {
// Execution context is bound to the scope that caused 'close' to emit.
});
res.end();
}).listen(3000);