串流中的背壓

在數據處理中常見的一個一般性問題稱為backpressure,描述了在數據傳輸期間緩衝區後面的數據積累的情況。當傳輸的接收端進行複雜操作或因某種原因速度較慢時,來源端的數據往往會累積,就像一個阻塞。

為了解決這個問題,必須建立一個委派系統,以確保從一個源頭到另一個源頭的數據流暢。不同的社群已經獨特地解決了這個問題,Unix管道和TCP套接字是其中的良好示例,通常被稱為流量控制。在Node.js中,流已成為被採用的解決方案。

本指南的目的是進一步詳細說明什麼是backpressure,以及Node.js源代碼中流是如何解決這個問題的。指南的第二部分將介紹建議的最佳實踐,以確保在實現流時,應用程序代碼是安全且優化的。

我們假設您對backpressureBufferEventEmitters在Node.js中的一般定義有一定的熟悉程度,以及對Stream的一些經驗。如果您還沒有閱讀這些文檔,先查看API文檔不失為一個壞主意,因為它將幫助擴展您對本指南的理解。

數據處理的問題

在計算機系統中,數據通過管道、套接字和信號從一個進程傳輸到另一個進程。在Node.js中,我們發現了一個類似的機制,稱為Stream。流很棒!它們為Node.js做了很多事情,幾乎每個內部代碼庫的部分都使用了該模塊。作為開發人員,您絕對鼓勵使用它們!

const readline = require('node:readline');

// process.stdin and process.stdout are both instances of Streams.
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

rl.question('Why should you use streams? ', answer => {
  console.log(`Maybe it's ${answer}, maybe it's because they are awesome! :)`);

  rl.close();
});

透過串流實現的背壓機制是一個很好的優化範例,可以通過比較 Node.js 的內部系統工具中實現的串流來進行演示。

在一個場景中,我們將使用熟悉的 zip(1) 工具對一個大文件(大約 ~9 GB)進行壓縮。

zip The.Matrix.1080p.mkv

雖然這將需要幾分鐘的時間來完成,但在另一個 shell 中,我們可能運行一個腳本,該腳本使用 Node.js 的模組 zlib,該模組包裝了另一個壓縮工具 gzip(1)。

const gzip = require('node:zlib').createGzip();
const fs = require('node:fs');

const inp = fs.createReadStream('The.Matrix.1080p.mkv');
const out = fs.createWriteStream('The.Matrix.1080p.mkv.gz');

inp.pipe(gzip).pipe(out);

為了測試結果,請嘗試打開每個壓縮文件。由 zip(1) 工具壓縮的文件將通知您文件損壞,而由 Stream 完成的壓縮將無錯誤地解壓縮。

在這個例子中,我們使用 .pipe() 將資料源從一端傳送到另一端。然而,請注意沒有適當的錯誤處理程序附加。如果一個資料塊無法正確接收,Readable 資料源或 gzip 流就不會被銷毀。pump 是一個實用工具,如果其中一個流失敗或關閉,它將正確銷毀管道中的所有流,這在這種情況下是必需的!

pump 只對 Node.js 8.x 或更早的版本必要,對於 Node.js 10.x 或更新版本,pipeline 被引入以取代 pump。這是一個模塊方法,用於在流之間進行管道傳輸,轉發錯誤,正確清理並在管道完成時提供回調。

這是使用 pipeline 的例子

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.
// A pipeline to gzip a potentially huge video file efficiently:

pipeline(
  fs.createReadStream('The.Matrix.1080p.mkv'),
  zlib.createGzip(),
  fs.createWriteStream('The.Matrix.1080p.mkv.gz'),
  err => {
    if (err) {
      console.error('Pipeline failed', err);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);

您也可以對 pipeline 呼叫 promisify 以在 async / await 中使用它

const stream = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');
const util = require('node:util');

const pipeline = util.promisify(stream.pipeline);

async function run() {
  try {
    await pipeline(
      fs.createReadStream('The.Matrix.1080p.mkv'),
      zlib.createGzip(),
      fs.createWriteStream('The.Matrix.1080p.mkv.gz')
    );
    console.log('Pipeline succeeded');
  } catch (err) {
    console.error('Pipeline failed', err);
  }
}

資料過多,速度過快

有時候,Readable 流可能會將資料傳送給 Writable 太快 —— 遠超消費者所能處理的速度!

當發生這種情況時,消費者將開始將所有資料塊排隊等待稍後消耗。寫入佇列將變得越來越長,因此必須在整個過程完成之前將更多資料保存在記憶體中。

寫入磁碟比從磁碟讀取要慢得多,因此,當我們嘗試壓縮文件並將其寫入硬碟時,會出現背壓,因為寫入磁碟無法跟上讀取的速度。

// Secretly the stream is saying: "whoa, whoa! hang on, this is way too much!"
// Data will begin to build up on the read side of the data buffer as
// `write` tries to keep up with the incoming data flow.
inp.pipe(gzip).pipe(outputFile);

這就是為什麼背壓機制很重要的原因。如果沒有背壓系統,該過程將使用掉系統的記憶體,有效地減慢其他進程的速度,並佔據系統的大部分資源直到完成。

這導致了一些問題

  • 減緩所有其他當前進程的速度
  • 非常過度使用的垃圾收集器
  • 記憶體耗盡

在以下的例子中,我們將從.write()函數中取出返回值,並將其更改為true,這樣會有效地禁用 Node.js 核心中的背壓支援。在提到 '修改後' 二進位檔案時,我們指的是執行不帶return ret;行的node二進位檔案,而是替換為return true;

對垃圾收集的過度負擔

讓我們快速看一下一個基準測試。使用上面相同的例子,我們進行了一些時間測試,以獲得兩個二進位檔案的中位數時間。

   trial (#)  | `node` binary (ms) | modified `node` binary (ms)
=================================================================
      1       |      56924         |           55011
      2       |      52686         |           55869
      3       |      59479         |           54043
      4       |      54473         |           55229
      5       |      52933         |           59723
=================================================================
average time: |      55299         |           55975

兩者運行時間大約都是一分鐘左右,因此基本上沒有什麼差異,但讓我們仔細檢查以確認我們的猜測是否正確。我們使用 Linux 工具dtrace來評估 V8 垃圾收集器的情況。

GC(垃圾收集器)測量的時間指示垃圾收集器完成單個掃描週期的間隔時間

approx. time (ms) | GC (ms) | modified GC (ms)
=================================================
          0       |    0    |      0
          1       |    0    |      0
         40       |    0    |      2
        170       |    3    |      1
        300       |    3    |      1

         *             *           *
         *             *           *
         *             *           *

      39000       |    6    |     26
      42000       |    6    |     21
      47000       |    5    |     32
      50000       |    8    |     28
      54000       |    6    |     35

儘管兩個過程開始相同並且看起來以相同的速率運行垃圾收集(GC),但明顯地,在正確運作的反壓制系統下,幾秒後,它會將GC負載分散到持續4-8毫秒的一致間隔,直到數據傳輸結束。

然而,當沒有反壓制系統時,V8垃圾收集開始拖延。正常的二進制稱為GC大約每分鐘觸發75次,而修改過的二進制僅觸發36次。

這是由於增加的記憶體使用量所積累的緩慢債務。隨著數據的傳輸,如果沒有反壓制系統,則每個塊傳輸使用的記憶體越來越多。

分配的記憶體越多,GC就需要在一次掃描中處理更多內容。掃描更大的記憶體空間以尋找分離的指針將消耗更多的計算資源。

記憶體枯竭

為了確定每個二進制檔案的記憶體消耗,我們分別使用/usr/bin/time -lp sudo ./node ./backpressure-example/zlib.js計時每個過程。

這是正常二進制的輸出

Respecting the return value of .write()
=============================================
real        58.88
user        56.79
sys          8.79
  87810048  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
     19427  page reclaims
      3134  page faults
         0  swaps
         5  block input operations
       194  block output operations
         0  messages sent
         0  messages received
         1  signals received
        12  voluntary context switches
    666037  involuntary context switches

虛擬記憶體所佔用的最大位元組大小約為87.81 MB。

現在改變回傳值.write()函式,我們得到

Without respecting the return value of .write():
==================================================
real        54.48
user        53.15
sys          7.43
1524965376  maximum resident set size
         0  average shared memory size
         0  average unshared data size
         0  average unshared stack size
    373617  page reclaims
      3139  page faults
         0  swaps
        18  block input operations
       199  block output operations
         0  messages sent
         0  messages received
         1  signals received
        25  voluntary context switches
    629566  involuntary context switches

虛擬記憶體所佔用的最大位元組大小約為1.52 GB。

如果沒有流來委派反壓制,則分配的記憶體空間將增加一個數量級 - 相同過程之間的巨大差距!

這個實驗顯示了Node.js反壓制機制在計算系統中的優化和成本效益。現在,讓我們來看看它是如何工作的!

反壓制如何解決這些問題?

從一個進程傳輸數據到另一個進程有不同的功能。在 Node.js 中,有一個內置的名為 .pipe() 的函數。還有 其他的套件 也可以使用!不過,從這個過程的基本層面來看,我們有兩個獨立的組件:數據的來源消費者

當從來源調用 .pipe() 時,它向消費者發出有數據需要傳輸的信號。該管道函數有助於為事件觸發設置適當的背壓閉包。

在 Node.js 中,來源是一個 Readable 流,而消費者是 Writable 流(這兩者可以與一個 Duplex 流或一個 Transform 流互換,但這超出了本指南的範圍)。

觸發背壓的時刻可以準確地縮小到 Writable.write() 函數的返回值。當然,這個返回值是由幾個條件決定的。

在任何數據緩衝區超過 highWaterMark 或寫入隊列當前繁忙的情況下,.write() 會返回 false

當回傳一個false值時,後壓系統會啟動。它將暫停傳入的Readable串流發送任何資料,並等待消費者再次準備好。一旦資料緩衝區被清空,將會發出一個'drain'事件,並恢復傳入的資料流。

一旦佇列完成,後壓將允許再次發送資料。被使用的記憶體空間將釋放自身並準備下一批資料。

這有效地允許在任何給定時間使用固定量的記憶體來進行.pipe()功能。不會有記憶體洩漏,也不會有無限緩衝,並且垃圾回收器只需處理一個記憶體區域!

那麼,如果後壓這麼重要,為什麼你(可能)沒聽說過它呢?嗯,答案很簡單:Node.js會自動為您處理所有這些。

這太棒了!但當我們試圖理解如何實現自己的自定義串流時,這也不是那麼棒。

在大多數機器上,有一個字節大小來確定緩衝區何時已滿(這將在不同機器上有所不同)。Node.js允許您設置自定義的highWaterMark,但通常默認設置為16kb(16384,或者對於objectMode串流為16)。在可能想要提高該值的情況下,請勇敢嘗試,但請謹慎行事!

.pipe()的生命周期

為了更好地理解背壓,這裡有一個流程圖,闡述了一個可讀流(Readable stream)被導入一個可寫流(Writable stream)的生命週期,請參考Readable被導入Writable流。

                                                     +===================+
                         x-->  Piping functions   +-->   src.pipe(dest)  |
                         x     are set up during     |===================|
                         x     the .pipe method.     |  Event callbacks  |
  +===============+      x                           |-------------------|
  |   Your Data   |      x     They exist outside    | .on('close', cb)  |
  +=======+=======+      x     the data flow, but    | .on('data', cb)   |
          |              x     importantly attach    | .on('drain', cb)  |
          |              x     events, and their     | .on('unpipe', cb) |
+---------v---------+    x     respective callbacks. | .on('error', cb)  |
|  Readable Stream  +----+                           | .on('finish', cb) |
+-^-------^-------^-+    |                           | .on('end', cb)    |
  ^       |       ^      |                           +-------------------+
  |       |       |      |
  |       ^       |      |
  ^       ^       ^      |    +-------------------+         +=================+
  ^       |       ^      +---->  Writable Stream  +--------->  .write(chunk)  |
  |       |       |           +-------------------+         +=======+=========+
  |       |       |                                                 |
  |       ^       |                              +------------------v---------+
  ^       |       +-> if (!chunk)                |    Is this chunk too big?  |
  ^       |       |     emit .end();             |    Is the queue busy?      |
  |       |       +-> else                       +-------+----------------+---+
  |       ^       |     emit .write();                   |                |
  |       ^       ^                                   +--v---+        +---v---+
  |       |       ^-----------------------------------<  No  |        |  Yes  |
  ^       |                                           +------+        +---v---+
  ^       |                                                               |
  |       ^               emit .pause();          +=================+     |
  |       ^---------------^-----------------------+  return false;  <-----+---+
  |                                               +=================+         |
  |                                                                           |
  ^            when queue is empty     +============+                         |
  ^------------^-----------------------<  Buffering |                         |
               |                       |============|                         |
               +> emit .drain();       |  ^Buffer^  |                         |
               +> emit .resume();      +------------+                         |
                                       |  ^Buffer^  |                         |
                                       +------------+   add chunk to queue    |
                                       |            <---^---------------------<
                                       +============+

如果您正在設置一個管道來串聯幾個流以操縱您的數據,您很可能會實現Transform流。

在這種情況下,您的Readable流的輸出將進入Transform,然後將被導入Writable流。

Readable.pipe(Transformable).pipe(Writable);

背壓將自動應用,但請注意,Transform流的入站和出站highWaterMark可能會被操作,並將影響背壓系統。

背壓指南

自從Node.js v0.10以來,Stream類別提供了使用下劃線版本這些相應函數(._read()._write())來修改.read().write()行為的能力。

有文檔記錄了實現可讀流實現可寫流的指南。我們假設您已經閱讀了這些內容,下一節將更深入一些。

實施自訂串流時應遵守的規則

串流的黃金法則是 始終尊重背壓。什麼構成最佳實踐是不矛盾的實踐。只要小心避免與內部背壓支援衝突的行為,你就可以確保自己遵循了良好的實踐。

一般來說,

  1. 如果沒有被要求,永遠不要 .push()
  2. 如果 .write() 返回 false 後不要呼叫,而是等待 'drain' 事件。
  3. 串流在不同的 Node.js 版本和你所使用的庫之間可能會有變化。要小心並測試事物。

關於第 3 點,用於構建瀏覽器串流的一個非常有用的套件是 readable-stream。Rodd Vagg 寫了一篇 很棒的博客文章 描述了這個庫的用途。簡而言之,它為 Readable 串流提供了一種自動優雅降級的類型,並支援舊版瀏覽器和 Node.js。

特定於可讀串流的規則

到目前為止,我們已經看過 .write() 如何影響背壓,並且大部分都專注於 Writable 串流。由於 Node.js 的功能,技術上數據從 ReadableWritable 是向下流動的。然而,就像在任何數據、物質或能量的傳輸中一樣,來源和目的地一樣重要,而 Readable 串流對於如何處理背壓至關重要。

這兩個過程彼此依賴,以有效溝通。如果可讀流 (Readable) 忽略了可寫流 (Writable) 要求停止發送數據時,這可能與 .write() 返回值不正確時一樣成為問題。

因此,除了尊重 .write() 返回值外,我們還必須尊重 .push()._read() 方法中使用的返回值。如果 .push() 返回 false 值,則流將停止從源頭讀取。否則,它將繼續進行而不間斷。

這是一個使用 .push() 的不良實踐示例。

// This is problematic as it completely ignores the return value from the push
// which may be a signal for backpressure from the destination stream!
class MyReadable extends Readable {
  _read(size) {
    let chunk;
    while (null !== (chunk = getNextChunk())) {
      this.push(chunk);
    }
  }
}

此外,在自定義流外部忽略背壓也有風險。在此良好實踐的反例中,應用程式代碼會在數據可用時(由 'data' 事件 發出信號)強制透過數據。

// This ignores the backpressure mechanisms Node.js has set in place,
// and unconditionally pushes through data, regardless if the
// destination stream is ready for it or not.
readable.on('data', data => writable.write(data));

這是使用可讀流的 .push() 的示例。

const { Readable } = require('node:stream');

// Create a custom Readable stream
const myReadableStream = new Readable({
  objectMode: true,
  read(size) {
    // Push some data onto the stream
    this.push({ message: 'Hello, world!' });
    this.push(null); // Mark the end of the stream
  },
});

// Consume the stream
myReadableStream.on('data', chunk => {
  console.log(chunk);
});

// Output:
// { message: 'Hello, world!' }

在這個範例中,我們創建了一個自定義的 Readable 流,使用 .push() 將一個對象推送到流上。當流準備好消耗數據時,將調用 ._read() 方法,在這種情況下,我們立即將一些數據推送到流上,並通過推送 null 標記流的結尾。

然後,我們通過監聽 'data' 事件來消耗流,並記錄推送到流上的每個數據塊。在這種情況下,我們只將一個數據塊推送到流上,所以我們只看到一條日誌消息。

特定於 Writable 流的規則

記住,.write() 可能會根據一些條件返回 true 或 false。對於我們來說,幸運的是,在構建我們自己的 Writable 流時,流狀態機 將處理我們的回調並決定何時處理背壓並為我們優化數據流動。

然而,當我們想直接使用 Writable 時,我們必須尊重 .write() 的返回值並密切注意這些條件。

  • 如果寫入隊列繁忙,.write() 將返回 false。
  • 如果數據塊太大,.write() 將返回 false(限制由變量 highWaterMark 指示)。
// This writable is invalid because of the async nature of JavaScript callbacks.
// Without a return statement for each callback prior to the last,
// there is a great chance multiple callbacks will be called.
class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) callback();
    else if (chunk.toString().indexOf('b') >= 0) callback();
    callback();
  }
}

// The proper way to write this would be:
if (chunk.contains('a')) return callback();
if (chunk.contains('b')) return callback();
callback();

在實現 ._writev() 時還有一些要注意的事項。這個函數與 .cork() 耦合在一起,但在寫作時常見的錯誤

// Using .uncork() twice here makes two calls on the C++ layer, rendering the
// cork/uncork technique useless.
ws.cork();
ws.write('hello ');
ws.write('world ');
ws.uncork();

ws.cork();
ws.write('from ');
ws.write('Matteo');
ws.uncork();

// The correct way to write this is to utilize process.nextTick(), which fires
// on the next event loop.
ws.cork();
ws.write('hello ');
ws.write('world ');
process.nextTick(doUncork, ws);

ws.cork();
ws.write('from ');
ws.write('Matteo');
process.nextTick(doUncork, ws);

// As a global function.
function doUncork(stream) {
  stream.uncork();
}

.cork() 可以被呼叫多次,我們只需要小心呼叫相同次數的 .uncork() 以使其重新流動。

結論

在 Node.js 中,串流是一個經常被使用的模組。它們對於內部結構至關重要,對於開發人員來說,它們可以擴展並連接整個 Node.js 模組生態系統。

希望現在您能夠以背壓為考量來進行故障排除和安全編碼您自己的 WritableReadable 串流,並與同事和朋友分享您的知識。

請務必閱讀更多關於 Stream 的其他 API 函數,以幫助改善並發揮您在使用 Node.js 構建應用程序時的串流能力。