串流中的背壓
在數據處理中常見的一個一般性問題稱為backpressure
,描述了在數據傳輸期間緩衝區後面的數據積累的情況。當傳輸的接收端進行複雜操作或因某種原因速度較慢時,來源端的數據往往會累積,就像一個阻塞。
為了解決這個問題,必須建立一個委派系統,以確保從一個源頭到另一個源頭的數據流暢。不同的社群已經獨特地解決了這個問題,Unix管道和TCP套接字是其中的良好示例,通常被稱為流量控制。在Node.js中,流已成為被採用的解決方案。
本指南的目的是進一步詳細說明什麼是backpressure,以及Node.js源代碼中流是如何解決這個問題的。指南的第二部分將介紹建議的最佳實踐,以確保在實現流時,應用程序代碼是安全且優化的。
我們假設您對backpressure
、Buffer
和EventEmitters
在Node.js中的一般定義有一定的熟悉程度,以及對Stream
的一些經驗。如果您還沒有閱讀這些文檔,先查看API文檔不失為一個壞主意,因為它將幫助擴展您對本指南的理解。
數據處理的問題
在計算機系統中,數據通過管道、套接字和信號從一個進程傳輸到另一個進程。在Node.js中,我們發現了一個類似的機制,稱為Stream
。流很棒!它們為Node.js做了很多事情,幾乎每個內部代碼庫的部分都使用了該模塊。作為開發人員,您絕對鼓勵使用它們!
const readline = require('node:readline');
// process.stdin and process.stdout are both instances of Streams.
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
rl.question('Why should you use streams? ', answer => {
console.log(`Maybe it's ${answer}, maybe it's because they are awesome! :)`);
rl.close();
});
透過串流實現的背壓機制是一個很好的優化範例,可以通過比較 Node.js 的內部系統工具中實現的串流來進行演示。
在一個場景中,我們將使用熟悉的 zip(1) 工具對一個大文件(大約 ~9 GB)進行壓縮。
zip The.Matrix.1080p.mkv
雖然這將需要幾分鐘的時間來完成,但在另一個 shell 中,我們可能運行一個腳本,該腳本使用 Node.js 的模組 zlib,該模組包裝了另一個壓縮工具 gzip(1)。
const gzip = require('node:zlib').createGzip();
const fs = require('node:fs');
const inp = fs.createReadStream('The.Matrix.1080p.mkv');
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz');
inp.pipe(gzip).pipe(out);
為了測試結果,請嘗試打開每個壓縮文件。由 zip(1) 工具壓縮的文件將通知您文件損壞,而由 Stream 完成的壓縮將無錯誤地解壓縮。
在這個例子中,我們使用
.pipe()
將資料源從一端傳送到另一端。然而,請注意沒有適當的錯誤處理程序附加。如果一個資料塊無法正確接收,Readable
資料源或gzip
流就不會被銷毀。pump
是一個實用工具,如果其中一個流失敗或關閉,它將正確銷毀管道中的所有流,這在這種情況下是必需的!
pump
只對 Node.js 8.x 或更早的版本必要,對於 Node.js 10.x 或更新版本,pipeline
被引入以取代 pump
。這是一個模塊方法,用於在流之間進行管道傳輸,轉發錯誤,正確清理並在管道完成時提供回調。
這是使用 pipeline 的例子
const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');
// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:
pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
err => {
if (err) {
console.error('Pipeline failed', err);
} else {
console.log('Pipeline succeeded');
}
}
);
您也可以對 pipeline 呼叫 promisify
以在 async
/ await
中使用它
const stream = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
const util = require('node:util');
const pipeline = util.promisify(stream.pipeline);
async function run() {
try {
await pipeline(
fs.createReadStream('The.Matrix.1080p.mkv'),
zlib.createGzip(),
fs.createWriteStream('The.Matrix.1080p.mkv.gz')
);
console.log('Pipeline succeeded');
} catch (err) {
console.error('Pipeline failed', err);
}
}
資料過多,速度過快
有時候,Readable
流可能會將資料傳送給 Writable
太快 —— 遠超消費者所能處理的速度!
當發生這種情況時,消費者將開始將所有資料塊排隊等待稍後消耗。寫入佇列將變得越來越長,因此必須在整個過程完成之前將更多資料保存在記憶體中。
寫入磁碟比從磁碟讀取要慢得多,因此,當我們嘗試壓縮文件並將其寫入硬碟時,會出現背壓,因為寫入磁碟無法跟上讀取的速度。
// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!"
// Data will begin to build up on the read side of the data buffer as
// `write` tries to keep up with the incoming data flow.
inp.pipe(gzip).pipe(outputFile);
這就是為什麼背壓機制很重要的原因。如果沒有背壓系統,該過程將使用掉系統的記憶體,有效地減慢其他進程的速度,並佔據系統的大部分資源直到完成。
這導致了一些問題
- 減緩所有其他當前進程的速度
- 非常過度使用的垃圾收集器
- 記憶體耗盡
在以下的例子中,我們將從.write()
函數中取出返回值,並將其更改為true
,這樣會有效地禁用 Node.js 核心中的背壓支援。在提到 '修改後' 二進位檔案時,我們指的是執行不帶return ret;
行的node
二進位檔案,而是替換為return true;
。
對垃圾收集的過度負擔
讓我們快速看一下一個基準測試。使用上面相同的例子,我們進行了一些時間測試,以獲得兩個二進位檔案的中位數時間。
trial (#) | `node` binary (ms) | modified `node` binary (ms)
=================================================================
1 | 56924 | 55011
2 | 52686 | 55869
3 | 59479 | 54043
4 | 54473 | 55229
5 | 52933 | 59723
=================================================================
average time: | 55299 | 55975
兩者運行時間大約都是一分鐘左右,因此基本上沒有什麼差異,但讓我們仔細檢查以確認我們的猜測是否正確。我們使用 Linux 工具dtrace
來評估 V8 垃圾收集器的情況。
GC(垃圾收集器)測量的時間指示垃圾收集器完成單個掃描週期的間隔時間
approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
0 | 0 | 0
1 | 0 | 0
40 | 0 | 2
170 | 3 | 1
300 | 3 | 1
* * *
* * *
* * *
39000 | 6 | 26
42000 | 6 | 21
47000 | 5 | 32
50000 | 8 | 28
54000 | 6 | 35
儘管兩個過程開始相同並且看起來以相同的速率運行垃圾收集(GC),但明顯地,在正確運作的反壓制系統下,幾秒後,它會將GC負載分散到持續4-8毫秒的一致間隔,直到數據傳輸結束。
然而,當沒有反壓制系統時,V8垃圾收集開始拖延。正常的二進制稱為GC大約每分鐘觸發75次,而修改過的二進制僅觸發36次。
這是由於增加的記憶體使用量所積累的緩慢債務。隨著數據的傳輸,如果沒有反壓制系統,則每個塊傳輸使用的記憶體越來越多。
分配的記憶體越多,GC就需要在一次掃描中處理更多內容。掃描更大的記憶體空間以尋找分離的指針將消耗更多的計算資源。
記憶體枯竭
為了確定每個二進制檔案的記憶體消耗,我們分別使用/usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js
計時每個過程。
這是正常二進制的輸出
Respecting the return value of .write()
=============================================
real 58.88
user 56.79
sys 8.79
87810048 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
19427 page reclaims
3134 page faults
0 swaps
5 block input operations
194 block output operations
0 messages sent
0 messages received
1 signals received
12 voluntary context switches
666037 involuntary context switches
虛擬記憶體所佔用的最大位元組大小約為87.81 MB。
Without respecting the return value of .write():
==================================================
real 54.48
user 53.15
sys 7.43
1524965376 maximum resident set size
0 average shared memory size
0 average unshared data size
0 average unshared stack size
373617 page reclaims
3139 page faults
0 swaps
18 block input operations
199 block output operations
0 messages sent
0 messages received
1 signals received
25 voluntary context switches
629566 involuntary context switches
虛擬記憶體所佔用的最大位元組大小約為1.52 GB。
如果沒有流來委派反壓制,則分配的記憶體空間將增加一個數量級 - 相同過程之間的巨大差距!
這個實驗顯示了Node.js反壓制機制在計算系統中的優化和成本效益。現在,讓我們來看看它是如何工作的!
反壓制如何解決這些問題?
從一個進程傳輸數據到另一個進程有不同的功能。在 Node.js 中,有一個內置的名為 .pipe()
的函數。還有 其他的套件 也可以使用!不過,從這個過程的基本層面來看,我們有兩個獨立的組件:數據的來源和消費者。
當從來源調用 .pipe()
時,它向消費者發出有數據需要傳輸的信號。該管道函數有助於為事件觸發設置適當的背壓閉包。
在 Node.js 中,來源是一個 Readable
流,而消費者是 Writable
流(這兩者可以與一個 Duplex
流或一個 Transform
流互換,但這超出了本指南的範圍)。
觸發背壓的時刻可以準確地縮小到 Writable
的 .write()
函數的返回值。當然,這個返回值是由幾個條件決定的。
在任何數據緩衝區超過 highWaterMark
或寫入隊列當前繁忙的情況下,.write()
會返回 false
。
當回傳一個false
值時,後壓系統會啟動。它將暫停傳入的Readable
串流發送任何資料,並等待消費者再次準備好。一旦資料緩衝區被清空,將會發出一個'drain'
事件,並恢復傳入的資料流。
一旦佇列完成,後壓將允許再次發送資料。被使用的記憶體空間將釋放自身並準備下一批資料。
這有效地允許在任何給定時間使用固定量的記憶體來進行.pipe()
功能。不會有記憶體洩漏,也不會有無限緩衝,並且垃圾回收器只需處理一個記憶體區域!
那麼,如果後壓這麼重要,為什麼你(可能)沒聽說過它呢?嗯,答案很簡單:Node.js會自動為您處理所有這些。
這太棒了!但當我們試圖理解如何實現自己的自定義串流時,這也不是那麼棒。
在大多數機器上,有一個字節大小來確定緩衝區何時已滿(這將在不同機器上有所不同)。Node.js允許您設置自定義的
highWaterMark
,但通常默認設置為16kb(16384,或者對於objectMode串流為16)。在可能想要提高該值的情況下,請勇敢嘗試,但請謹慎行事!
.pipe()
的生命周期
為了更好地理解背壓,這裡有一個流程圖,闡述了一個可讀流(Readable stream)被導入一個可寫流(Writable stream)的生命週期,請參考Readable
流被導入到Writable
流。
+===================+
x--> Piping functions +--> src.pipe(dest) |
x are set up during |===================|
x the .pipe method. | Event callbacks |
+===============+ x |-------------------|
| Your Data | x They exist outside | .on('close', cb) |
+=======+=======+ x the data flow, but | .on('data', cb) |
| x importantly attach | .on('drain', cb) |
| x events, and their | .on('unpipe', cb) |
+---------v---------+ x respective callbacks. | .on('error', cb) |
| Readable Stream +----+ | .on('finish', cb) |
+-^-------^-------^-+ | | .on('end', cb) |
^ | ^ | +-------------------+
| | | |
| ^ | |
^ ^ ^ | +-------------------+ +=================+
^ | ^ +----> Writable Stream +---------> .write(chunk) |
| | | +-------------------+ +=======+=========+
| | | |
| ^ | +------------------v---------+
^ | +-> if (!chunk) | Is this chunk too big? |
^ | | emit .end(); | Is the queue busy? |
| | +-> else +-------+----------------+---+
| ^ | emit .write(); | |
| ^ ^ +--v---+ +---v---+
| | ^-----------------------------------< No | | Yes |
^ | +------+ +---v---+
^ | |
| ^ emit .pause(); +=================+ |
| ^---------------^-----------------------+ return false; <-----+---+
| +=================+ |
| |
^ when queue is empty +============+ |
^------------^-----------------------< Buffering | |
| |============| |
+> emit .drain(); | ^Buffer^ | |
+> emit .resume(); +------------+ |
| ^Buffer^ | |
+------------+ add chunk to queue |
| <---^---------------------<
+============+
如果您正在設置一個管道來串聯幾個流以操縱您的數據,您很可能會實現
Transform
流。
在這種情況下,您的Readable
流的輸出將進入Transform
,然後將被導入Writable
流。
Readable.pipe(Transformable).pipe(Writable);
背壓將自動應用,但請注意,Transform
流的入站和出站highWaterMark
可能會被操作,並將影響背壓系統。
背壓指南
自從Node.js v0.10以來,Stream
類別提供了使用下劃線版本這些相應函數(._read()
和._write()
)來修改.read()
或.write()
行為的能力。
有文檔記錄了實現可讀流和實現可寫流的指南。我們假設您已經閱讀了這些內容,下一節將更深入一些。
實施自訂串流時應遵守的規則
串流的黃金法則是 始終尊重背壓。什麼構成最佳實踐是不矛盾的實踐。只要小心避免與內部背壓支援衝突的行為,你就可以確保自己遵循了良好的實踐。
一般來說,
- 如果沒有被要求,永遠不要
.push()
。 - 如果
.write()
返回 false 後不要呼叫,而是等待 'drain' 事件。 - 串流在不同的 Node.js 版本和你所使用的庫之間可能會有變化。要小心並測試事物。
關於第 3 點,用於構建瀏覽器串流的一個非常有用的套件是
readable-stream
。Rodd Vagg 寫了一篇 很棒的博客文章 描述了這個庫的用途。簡而言之,它為Readable
串流提供了一種自動優雅降級的類型,並支援舊版瀏覽器和 Node.js。
特定於可讀串流的規則
到目前為止,我們已經看過 .write()
如何影響背壓,並且大部分都專注於 Writable
串流。由於 Node.js 的功能,技術上數據從 Readable
到 Writable
是向下流動的。然而,就像在任何數據、物質或能量的傳輸中一樣,來源和目的地一樣重要,而 Readable
串流對於如何處理背壓至關重要。
這兩個過程彼此依賴,以有效溝通。如果可讀流 (Readable
) 忽略了可寫流 (Writable
) 要求停止發送數據時,這可能與 .write()
返回值不正確時一樣成為問題。
因此,除了尊重 .write()
返回值外,我們還必須尊重 .push()
在 ._read()
方法中使用的返回值。如果 .push()
返回 false
值,則流將停止從源頭讀取。否則,它將繼續進行而不間斷。
這是一個使用 .push()
的不良實踐示例。
// This is problematic as it completely ignores the return value from the push
// which may be a signal for backpressure from the destination stream!
class MyReadable extends Readable {
_read(size) {
let chunk;
while (null !== (chunk = getNextChunk())) {
this.push(chunk);
}
}
}
此外,在自定義流外部忽略背壓也有風險。在此良好實踐的反例中,應用程式代碼會在數據可用時(由 'data'
事件 發出信號)強制透過數據。
// This ignores the backpressure mechanisms Node.js has set in place,
// and unconditionally pushes through data, regardless if the
// destination stream is ready for it or not.
readable.on('data', data => writable.write(data));
這是使用可讀流的 .push()
的示例。
const { Readable } = require('node:stream');
// Create a custom Readable stream
const myReadableStream = new Readable({
objectMode: true,
read(size) {
// Push some data onto the stream
this.push({ message: 'Hello, world!' });
this.push(null); // Mark the end of the stream
},
});
// Consume the stream
myReadableStream.on('data', chunk => {
console.log(chunk);
});
// Output:
// { message: 'Hello, world!' }
在這個範例中,我們創建了一個自定義的 Readable 流,使用 .push()
將一個對象推送到流上。當流準備好消耗數據時,將調用 ._read()
方法,在這種情況下,我們立即將一些數據推送到流上,並通過推送 null 標記流的結尾。
然後,我們通過監聽 'data' 事件來消耗流,並記錄推送到流上的每個數據塊。在這種情況下,我們只將一個數據塊推送到流上,所以我們只看到一條日誌消息。
特定於 Writable 流的規則
記住,.write()
可能會根據一些條件返回 true 或 false。對於我們來說,幸運的是,在構建我們自己的 Writable
流時,流狀態機
將處理我們的回調並決定何時處理背壓並為我們優化數據流動。
然而,當我們想直接使用 Writable
時,我們必須尊重 .write()
的返回值並密切注意這些條件。
- 如果寫入隊列繁忙,
.write()
將返回 false。 - 如果數據塊太大,
.write()
將返回 false(限制由變量highWaterMark
指示)。
// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
class MyWritable extends Writable {
_write(chunk, encoding, callback) {
if (chunk.toString().indexOf('a') >= 0) callback();
else if (chunk.toString().indexOf('b') >= 0) callback();
callback();
}
}
// The proper way to write this would be:
if (chunk.contains('a')) return callback();
if (chunk.contains('b')) return callback();
callback();
在實現 ._writev()
時還有一些要注意的事項。這個函數與 .cork()
耦合在一起,但在寫作時常見的錯誤
// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();
ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();
// The correct way to write this is to utilize process.nextTick(), which fires
// on the next event loop.
ws.cork();
ws.write('hello ');
ws.write('world ');
process.nextTick(doUncork, ws);
ws.cork();
ws.write('from ');
ws.write('Matteo');
process.nextTick(doUncork, ws);
// As a global function.
function doUncork(stream) {
stream.uncork();
}
.cork()
可以被呼叫多次,我們只需要小心呼叫相同次數的 .uncork()
以使其重新流動。
結論
在 Node.js 中,串流是一個經常被使用的模組。它們對於內部結構至關重要,對於開發人員來說,它們可以擴展並連接整個 Node.js 模組生態系統。
希望現在您能夠以背壓為考量來進行故障排除和安全編碼您自己的 Writable
和 Readable
串流,並與同事和朋友分享您的知識。
請務必閱讀更多關於 Stream
的其他 API 函數,以幫助改善並發揮您在使用 Node.js 構建應用程序時的串流能力。