Stream[src]#

穩定性:2 - 穩定

原始碼: lib/stream.js

串流是 Node.js 中用於處理串流資料的抽象介面。node:stream 模組提供了一個用於實作串流介面的 API。

Node.js 提供了許多串流物件。例如,HTTP 伺服器要求process.stdout 都是串流實例。

串流可以是可讀、可寫或兩者皆可。所有串流都是 EventEmitter 的實例。

存取 node:stream 模組

const stream = require('node:stream'); 

node:stream 模組對於建立新的串流實例類型很有用。通常不需要使用 node:stream 模組來使用串流。

本文組織#

本文包含兩個主要區段和一個筆記區段。第一個區段說明如何在應用程式中使用現有的串流。第二個區段說明如何建立新的串流類型。

串流類型#

Node.js 中有四種基本的串流類型

此外,此模組包含實用函式 stream.pipeline()stream.finished()stream.Readable.from()stream.addAbortSignal()

串流 Promises API#

stream/promises API 提供另一組非同步實用函式,這些函式會傳回 Promise 物件,而不是使用回呼。可透過 require('node:stream/promises')require('node:stream').promises 存取 API。

stream.pipeline(source[, ...transforms], destination[, options])#

stream.pipeline(streams[, options])#

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

await pipeline(
  createReadStream('archive.tar'),
  createGzip(),
  createWriteStream('archive.tar.gz'),
);
console.log('Pipeline succeeded.');

若要使用 AbortSignal,請將其傳遞至選項物件內,作為最後一個參數。當訊號中斷時,將呼叫 destroy 於基礎管線上,並附帶 AbortError

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');
const zlib = require('node:zlib');

async function run() {
  const ac = new AbortController();
  const signal = ac.signal;

  setImmediate(() => ac.abort());
  await pipeline(
    fs.createReadStream('archive.tar'),
    zlib.createGzip(),
    fs.createWriteStream('archive.tar.gz'),
    { signal },
  );
}

run().catch(console.error); // AbortErrorimport { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';

const ac = new AbortController();
const { signal } = ac;
setImmediate(() => ac.abort());
try {
  await pipeline(
    createReadStream('archive.tar'),
    createGzip(),
    createWriteStream('archive.tar.gz'),
    { signal },
  );
} catch (err) {
  console.error(err); // AbortError
}

pipeline API 也支援非同步產生器

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
  await pipeline(
    fs.createReadStream('lowercase.txt'),
    async function* (source, { signal }) {
      source.setEncoding('utf8');  // Work with strings rather than `Buffer`s.
      for await (const chunk of source) {
        yield await processChunk(chunk, { signal });
      }
    },
    fs.createWriteStream('uppercase.txt'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';

await pipeline(
  createReadStream('lowercase.txt'),
  async function* (source, { signal }) {
    source.setEncoding('utf8');  // Work with strings rather than `Buffer`s.
    for await (const chunk of source) {
      yield await processChunk(chunk, { signal });
    }
  },
  createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');

記得處理傳遞到非同步產生器的 signal 參數。特別是在非同步產生器是管線來源 (即第一個參數) 或管線永遠不會完成的情況下。

const { pipeline } = require('node:stream/promises');
const fs = require('node:fs');

async function run() {
  await pipeline(
    async function* ({ signal }) {
      await someLongRunningfn({ signal });
      yield 'asd';
    },
    fs.createWriteStream('uppercase.txt'),
  );
  console.log('Pipeline succeeded.');
}

run().catch(console.error);import { pipeline } from 'node:stream/promises';
import fs from 'node:fs';
await pipeline(
  async function* ({ signal }) {
    await someLongRunningfn({ signal });
    yield 'asd';
  },
  fs.createWriteStream('uppercase.txt'),
);
console.log('Pipeline succeeded.');

pipeline API 提供 回呼版本

stream.finished(stream[, options])#

const { finished } = require('node:stream/promises');
const fs = require('node:fs');

const rs = fs.createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.import { finished } from 'node:stream/promises';
import { createReadStream } from 'node:fs';

const rs = createReadStream('archive.tar');

async function run() {
  await finished(rs);
  console.log('Stream is done reading.');
}

run().catch(console.error);
rs.resume(); // Drain the stream.

finished API 也提供 回呼版本

物件模式#

Node.js API 建立的所有串流都只在字串和 Buffer (或 Uint8Array) 物件上操作。不過,串流實作有可能使用其他類型的 JavaScript 值 (除了 null,它在串流中具有特殊用途)。此類串流被視為在「物件模式」中操作。

串流實例在串流建立時使用 objectMode 選項切換成物件模式。嘗試將現有的串流切換成物件模式並不安全。

緩衝#

WritableReadable 串流都會在內部緩衝區儲存資料。

潛在緩衝的資料量取決於傳遞給串流建構函式的 highWaterMark 選項。對於一般串流,highWaterMark 選項指定 總位元組數。對於在物件模式下運作的串流,highWaterMark 指定總物件數。

當實作呼叫 stream.push(chunk) 時,資料會緩衝在 Readable 串流中。如果串流的使用者沒有呼叫 stream.read(),資料會停留在內部佇列中,直到被使用完。

一旦內部讀取緩衝區的總大小達到 highWaterMark 指定的臨界值,串流會暫時停止從底層資源讀取資料,直到目前緩衝的資料可以使用為止(也就是說,串流會停止呼叫用於填滿讀取緩衝區的內部 readable._read() 方法)。

當重複呼叫 writable.write(chunk) 方法時,資料會緩衝在 Writable 串流中。當內部寫入緩衝區的總大小低於 highWaterMark 設定的臨界值時,呼叫 writable.write() 會傳回 true。一旦內部緩衝區的大小達到或超過 highWaterMark,就會傳回 false

stream API 的主要目標,特別是 stream.pipe() 方法,是要將資料緩衝限制在可接受的層級,以避免速度不同的來源和目的地淹沒可用的記憶體。

highWaterMark 選項是一個臨界值,而不是限制:它決定串流緩衝多少資料,然後才會停止要求更多資料。它通常不會強制執行嚴格的記憶體限制。特定的串流實作可能會選擇強制執行更嚴格的限制,但這不是強制性的。

由於 DuplexTransform 串流都是 ReadableWritable,因此每個串流都維護兩個用於讀取和寫入的獨立內部緩衝區,允許每一側獨立於另一側操作,同時維護適當且有效的資料流。例如,net.Socket 實例是 Duplex 串流,其 Readable 側允許使用從 socket 接收 的資料,而其 Writable 側允許將資料寫入 socket。由於寫入 socket 的資料速度可能比接收資料的速度快或慢,因此每一側都應該獨立於另一側操作(和緩衝)。

內部緩衝的機制是內部實作的詳細資訊,可能會隨時變更。但是,對於某些進階實作,可以使用 writable.writableBufferreadable.readableBuffer 擷取內部緩衝區。不建議使用這些未記載的屬性。

串流使用者的 API#

幾乎所有 Node.js 應用程式,無論多麼簡單,都會以某種方式使用串流。以下是 Node.js 應用程式中使用串流的範例,該應用程式實作 HTTP 伺服器

const http = require('node:http');

const server = http.createServer((req, res) => {
  // `req` is an http.IncomingMessage, which is a readable stream.
  // `res` is an http.ServerResponse, which is a writable stream.

  let body = '';
  // Get the data as utf8 strings.
  // If an encoding is not set, Buffer objects will be received.
  req.setEncoding('utf8');

  // Readable streams emit 'data' events once a listener is added.
  req.on('data', (chunk) => {
    body += chunk;
  });

  // The 'end' event indicates that the entire body has been received.
  req.on('end', () => {
    try {
      const data = JSON.parse(body);
      // Write back something interesting to the user:
      res.write(typeof data);
      res.end();
    } catch (er) {
      // uh oh! bad json!
      res.statusCode = 400;
      return res.end(`error: ${er.message}`);
    }
  });
});

server.listen(1337);

// $ curl localhost:1337 -d "{}"
// object
// $ curl localhost:1337 -d "\"foo\""
// string
// $ curl localhost:1337 -d "not json"
// error: Unexpected token 'o', "not json" is not valid JSON 

Writable 串流(例如範例中的 res)公開方法,例如 write()end(),用於將資料寫入串流。

Readable 串流使用 EventEmitter API,在串流有可讀取資料時通知應用程式碼。可透過多種方式從串流讀取可用的資料。

WritableReadable 串流都以各種方式使用 EventEmitter API 來傳達串流的目前狀態。

DuplexTransform 串流同時是 WritableReadable

將資料寫入串流或從串流使用資料的應用程式不需要直接實作串流介面,而且通常不需要呼叫 require('node:stream')

想要實作新類型的串流的開發人員應參閱 串流實作者的 API 部分。

可寫入串流#

可寫入串流是將資料寫入的目的地的抽象概念。

Writable 串流的範例包括

其中一些範例實際上是實作 Writable 介面的 Duplex 串流。

所有 Writable 串流都實作由 stream.Writable 類別定義的介面。

雖然 Writable 串流的特定實例可能在各種方式上有所不同,但所有 Writable 串流都遵循以下範例中說明的基本使用模式

const myStream = getWritableStreamSomehow();
myStream.write('some data');
myStream.write('some more data');
myStream.end('done writing data'); 
類別:stream.Writable#
事件:'close'#

當串流及其任何底層資源(例如檔案描述符)已關閉時,會發出 'close' 事件。此事件表示不會再發出任何事件,也不會再進行任何運算。

如果 Writable 串流是以 emitClose 選項建立的,它將永遠發出 'close' 事件。

事件:'drain'#

如果呼叫 stream.write(chunk) 傳回 false,則會在適當時機發出 'drain' 事件,以繼續將資料寫入串流。

// Write the data to the supplied writable stream one million times.
// Be attentive to back-pressure.
function writeOneMillionTimes(writer, data, encoding, callback) {
  let i = 1000000;
  write();
  function write() {
    let ok = true;
    do {
      i--;
      if (i === 0) {
        // Last time!
        writer.write(data, encoding, callback);
      } else {
        // See if we should continue, or wait.
        // Don't pass the callback, because we're not done yet.
        ok = writer.write(data, encoding);
      }
    } while (i > 0 && ok);
    if (i > 0) {
      // Had to stop early!
      // Write some more once it drains.
      writer.once('drain', write);
    }
  }
} 
事件:'error'#

如果在寫入或管道傳輸資料時發生錯誤,則會發出 'error' 事件。呼叫時,會將單一 Error 參數傳遞給監聽器回呼函式。

在發出 'error' 事件時,會關閉串流,除非在建立串流時將 autoDestroy 選項設定為 false

'error' 之後,不應再發出 'close' 以外的其他事件(包括 'error' 事件)。

事件:'finish'#

在呼叫 stream.end() 方法後,且所有資料都已沖刷至底層系統後,會發出 'finish' 事件。

const writer = getWritableStreamSomehow();
for (let i = 0; i < 100; i++) {
  writer.write(`hello, #${i}!\n`);
}
writer.on('finish', () => {
  console.log('All writes are now complete.');
});
writer.end('This is the end\n'); 
事件:'pipe'#

在可讀取串流上呼叫 stream.pipe() 方法,將此可寫入串流新增至其目的地集合時,會發出 'pipe' 事件。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('pipe', (src) => {
  console.log('Something is piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer); 
事件:'unpipe'#

Readable 串流上呼叫 stream.unpipe() 方法,將此 Writable 從其目的地集合中移除時,會發出 'unpipe' 事件。

如果此 Writable 串流在 Readable 串流導向其中時發出錯誤,也會發出此事件。

const writer = getWritableStreamSomehow();
const reader = getReadableStreamSomehow();
writer.on('unpipe', (src) => {
  console.log('Something has stopped piping into the writer.');
  assert.equal(src, reader);
});
reader.pipe(writer);
reader.unpipe(writer); 
writable.cork()#

writable.cork() 方法強制所有已寫入資料緩衝在記憶體中。在呼叫 stream.uncork()stream.end() 方法時,會沖刷緩衝資料。

writable.cork() 的主要目的是適應一種情況,即多個小區塊會快速連續寫入串流。writable.cork() 緩衝所有區塊,直到呼叫 writable.uncork(),然後會將所有區塊傳遞給 writable._writev()(如果存在)。這可以防止資料在等待處理第一個小區塊時緩衝的排頭封鎖情況。不過,在未實作 writable._writev() 的情況下使用 writable.cork() 可能會對傳輸量產生負面影響。

另請參閱:writable.uncork()writable._writev()

writable.destroy([error])#
  • error <Error> 選擇性,使用 'error' 事件發出的錯誤。
  • 傳回:<this>

毀損串流。選擇性發出 'error' 事件,並發出 'close' 事件(除非將 emitClose 設為 false)。呼叫此函式後,可寫入串流已結束,後續呼叫 write()end() 會導致 ERR_STREAM_DESTROYED 錯誤。這是毀損串流的破壞性立即方式。先前呼叫的 write() 可能尚未耗盡,並可能觸發 ERR_STREAM_DESTROYED 錯誤。如果資料應在關閉前清除,請使用 end() 取代 destroy,或在毀損串流前等待 'drain' 事件。

const { Writable } = require('node:stream');

const myStream = new Writable();

const fooErr = new Error('foo error');
myStream.destroy(fooErr);
myStream.on('error', (fooErr) => console.error(fooErr.message)); // foo error 
const { Writable } = require('node:stream');

const myStream = new Writable();

myStream.destroy();
myStream.on('error', function wontHappen() {}); 
const { Writable } = require('node:stream');

const myStream = new Writable();
myStream.destroy();

myStream.write('foo', (error) => console.error(error.code));
// ERR_STREAM_DESTROYED 

呼叫 destroy() 後,任何進一步的呼叫都會成為無操作,且除了來自 _destroy() 的錯誤外,不會再發出任何錯誤作為 'error'

實作人員不應覆寫此方法,而應實作 writable._destroy()

writable.closed#

在發出 'close' 後為 true

writable.destroyed#

在呼叫 writable.destroy() 之後為 true

const { Writable } = require('node:stream');

const myStream = new Writable();

console.log(myStream.destroyed); // false
myStream.destroy();
console.log(myStream.destroyed); // true 
writable.end([chunk[, encoding]][, callback])#
  • chunk <string> | <Buffer> | <Uint8Array> | <any> 選擇性要寫入的資料。對於未在物件模式中運作的串流,chunk 必須是字串、BufferUint8Array。對於物件模式串流,chunk 可以是 null 以外的任何 JavaScript 值。
  • encoding <string> 如果 chunk 是字串,則為編碼
  • callback <Function> 串流完成時的回呼。
  • 傳回:<this>

呼叫 writable.end() 方法表示不會再寫入更多資料到 Writable。選擇性的 chunkencoding 參數允許在關閉串流之前立即寫入最後一個額外的資料區塊。

在呼叫 stream.end() 之後呼叫 stream.write() 方法會引發錯誤。

// Write 'hello, ' and then end with 'world!'.
const fs = require('node:fs');
const file = fs.createWriteStream('example.txt');
file.write('hello, ');
file.end('world!');
// Writing more now is not allowed! 
writable.setDefaultEncoding(encoding)#

可寫入。setDefaultEncoding() 方法設定 可寫入 串流的預設 編碼

可寫入。uncork()#

可寫入。uncork() 方法清除自 串流。cork() 被呼叫後的所有緩衝資料。

在使用 可寫入。cork()可寫入。uncork() 來管理寫入串流的緩衝時,請使用 process.nextTick() 延後呼叫 可寫入。uncork()。這樣做可以批次處理在給定的 Node.js 事件迴圈階段中發生的所有 可寫入。write() 呼叫。

stream.cork();
stream.write('some ');
stream.write('data ');
process.nextTick(() => stream.uncork()); 

如果 可寫入。cork() 方法在串流上被呼叫多次,則必須呼叫等數量的 可寫入。uncork() 來清除緩衝資料。

stream.cork();
stream.write('some ');
stream.cork();
stream.write('data ');
process.nextTick(() => {
  stream.uncork();
  // The data will not be flushed until uncork() is called a second time.
  stream.uncork();
}); 

另請參閱:可寫入。cork()

可寫入。可寫入#

如果呼叫 可寫入。write() 是安全的,則為 true,這表示串流尚未被銷毀、發生錯誤或結束。

可寫入。可寫入中止#

穩定性:1 - 實驗性

傳回串流是否在發出 'finish' 之前被銷毀或發生錯誤。

可寫入。可寫入已結束#

可寫入。end() 被呼叫後為 true。此屬性不會指出資料是否已清除,請改用 可寫入。可寫入已完成

可寫入。可寫入已塞住#

需要呼叫 writable.uncork() 的次數,才能完全解除串流的封鎖。

writable.errored#

如果串流已因錯誤而毀損,則傳回錯誤。

writable.writableFinished#

在發出 'finish' 事件之前,立即設定為 true

writable.writableHighWaterMark#

傳回建立此 Writable 時傳遞的 highWaterMark 值。

writable.writableLength#

此屬性包含準備寫入佇列中的位元組 (或物件) 數量。此值提供有關 highWaterMark 狀態的內省資料。

writable.writableNeedDrain#

如果串流的緩衝區已滿,且串流會發出 'drain',則為 true

writable.writableObjectMode#

給定 Writable 串流的 objectMode 屬性的取得器。

writable.write(chunk[, encoding][, callback])#
  • chunk <string> | <Buffer> | <Uint8Array> | <any> 選擇性要寫入的資料。對於未在物件模式中運作的串流,chunk 必須是字串、BufferUint8Array。對於物件模式串流,chunk 可以是 null 以外的任何 JavaScript 值。
  • encoding <string> | <null> 編碼,如果 chunk 是字串。預設值:'utf8'
  • callback <Function> 當這段資料被沖刷時要執行的回呼函式。
  • 傳回:<boolean> false 如果串流希望呼叫程式碼等待 'drain' 事件發出後才繼續寫入其他資料;否則為 true

writable.write() 方法將一些資料寫入串流中,並在資料完全處理完畢後呼叫提供的 callback。如果發生錯誤,callback 將會以錯誤作為第一個引數被呼叫。callback 是非同步呼叫的,且在 'error' 發出之前。

如果在接納 chunk 後,內部緩衝區小於串流建立時設定的 highWaterMark,則傳回值為 true。如果傳回 false,則應停止進一步嘗試將資料寫入串流,直到 'drain' 事件發出。

串流未排空時,呼叫 write() 會緩衝 區塊,並傳回 false。一旦所有目前緩衝的區塊都已排空(由作業系統接受傳遞),就會發出 'drain' 事件。一旦 write() 傳回 false,請勿再寫入更多區塊,直到發出 'drain' 事件。雖然允許在未排空的串流上呼叫 write(),但 Node.js 會緩衝所有已寫入的區塊,直到達到最大記憶體使用量,此時它會無條件中止。即使在中止之前,高記憶體使用量也會導致垃圾收集器效能不佳和 RSS 過高(通常不會在不再需要記憶體後釋放回系統)。由於 TCP socket 可能永遠不會排空(如果遠端對等端未讀取資料),寫入未排空的 socket 可能會導致遠端可利用的漏洞。

在串流未排空時寫入資料對 Transform 來說特別有問題,因為 Transform 串流在預設情況下會暫停,直到它們被串接或加入 'data''readable' 事件處理常式。

如果要寫入的資料可以依需求產生或擷取,建議將邏輯封裝到 Readable 中,並使用 stream.pipe()。但是,如果偏好呼叫 write(),可以使用 'drain' 事件來尊重反壓並避免記憶體問題

function write(data, cb) {
  if (!stream.write(data)) {
    stream.once('drain', cb);
  } else {
    process.nextTick(cb);
  }
}

// Wait for cb to be called before doing any other write.
write('hello', () => {
  console.log('Write completed, do more writes now.');
}); 

物件模式中的 Writable 串流將永遠忽略 編碼 參數。

可讀取串流#

可讀取串流是從中使用資料的來源的抽象。

Readable 串流的範例包括

所有 Readable 串流都實作由 stream.Readable 類別定義的介面。

兩種讀取模式#

Readable 串流實際上以兩種模式之一運作:流動和暫停。這些模式與 物件模式 分開。Readable 串流可以是物件模式,也可以不是,無論它是在流動模式還是暫停模式中。

  • 在流動模式中,資料會自動從底層系統讀取,並透過 EventEmitter 介面的事件盡可能快地提供給應用程式。

  • 在暫停模式中,必須明確呼叫 stream.read() 方法才能從串流中讀取資料區塊。

所有 Readable 串流都從暫停模式開始,但可以使用下列其中一種方式切換到流動模式

Readable 可以使用下列其中一種方式切換回暫停模式

  • 如果沒有管線目的地,呼叫 stream.pause() 方法。
  • 如果存在管線目的地,移除所有管線目的地。可以透過呼叫 stream.unpipe() 方法移除多個管線目的地。

要記住的重要觀念是,Readable 在提供消耗或忽略資料的機制之前不會產生資料。如果消耗機制被停用或移除,Readable嘗試停止產生資料。

基於向後相容性考量,移除 'data' 事件處理常式不會自動暫停串流。此外,如果存在管線目的地,則呼叫 stream.pause() 無法保證在這些目的地耗盡並要求更多資料後,串流將保持暫停狀態。

如果 Readable 切換至流動模式,且沒有消費者可處理資料,則該資料將會遺失。例如,當呼叫 readable.resume() 方法,但未附加監聽器至 'data' 事件,或從串流中移除 'data' 事件處理常式時,就會發生此情況。

新增 'readable' 事件處理常式會自動停止串流流動,且資料必須透過 readable.read() 來使用。如果移除 'readable' 事件處理常式,則如果存在 'data' 事件處理常式,串流將重新開始流動。

三種狀態#

Readable 串流的「兩種模式」操作是對 Readable 串流實作中發生的更複雜內部狀態管理進行簡化的抽象。

特別地,在任何特定時間點,每個 Readable 都處於三種可能狀態之一

  • readable.readableFlowing === null
  • readable.readableFlowing === false
  • readable.readableFlowing === true

readable.readableFlowingnull 時,不會提供使用串流資料的機制。因此,串流不會產生資料。在此狀態下,附加 'data' 事件的監聽器、呼叫 readable.pipe() 方法或呼叫 readable.resume() 方法會將 readable.readableFlowing 切換為 true,導致 Readable 開始在產生資料時主動發出事件。

呼叫 readable.pause()readable.unpipe() 或接收反壓力會導致將 readable.readableFlowing 設為 false,暫時停止事件流動,但不會停止產生資料。在此狀態下,附加 'data' 事件的監聽器不會將 readable.readableFlowing 切換為 true

const { PassThrough, Writable } = require('node:stream');
const pass = new PassThrough();
const writable = new Writable();

pass.pipe(writable);
pass.unpipe(writable);
// readableFlowing is now false.

pass.on('data', (chunk) => { console.log(chunk.toString()); });
// readableFlowing is still false.
pass.write('ok');  // Will not emit 'data'.
pass.resume();     // Must be called to make stream emit 'data'.
// readableFlowing is now true. 

readable.readableFlowingfalse 時,資料可能會累積在串流的內部緩衝區中。

選擇一種 API 樣式#

Readable 串流 API 在多個 Node.js 版本中演進,並提供多種使用串流資料的方法。一般來說,開發人員應選擇一種使用資料的方法,絕不應使用多種方法來使用單一串流的資料。具體來說,使用 on('data')on('readable')pipe() 或非同步反覆運算器的組合可能會導致非直覺的行為。

類別:stream.Readable#
事件:'close'#

當串流及其任何底層資源(例如檔案描述符)已關閉時,會發出 'close' 事件。此事件表示不會再發出任何事件,也不會再進行任何運算。

如果 Readable 串流使用 emitClose 選項建立,它將永遠發出 'close' 事件。

事件:'data'#
  • chunk <Buffer> | <string> | <any> 資料區塊。對於未在物件模式中運作的串流,區塊將會是字串或 Buffer。對於在物件模式中的串流,區塊可以是任何 JavaScript 值,除了 null

每當串流放棄資料區塊的所有權給使用者時,就會發出 'data' 事件。這可能會在串流透過呼叫 readable.pipe()readable.resume() 或附加監聽器回呼給 'data' 事件而切換到流動模式時發生。每當呼叫 readable.read() 方法且有資料區塊可供傳回時,也會發出 'data' 事件。

'data' 事件監聽器附加到未明確暫停的串流會將串流切換到流動模式。然後,資料將在可用的情況下立即傳遞。

如果使用 readable.setEncoding() 方法為串流指定預設編碼,則會將資料區塊作為字串傳遞給偵聽器回呼;否則,資料將作為 Buffer 傳遞。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
}); 
事件:'end'#

當串流中沒有更多資料可供使用時,會發出 'end' 事件。

除非資料已完全使用,否則 不會發出 'end' 事件。這可透過將串流切換至流動模式,或重複呼叫 stream.read(),直到所有資料都已使用完畢,來達成。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
});
readable.on('end', () => {
  console.log('There will be no more data.');
}); 
事件:'error'#

Readable 實作可能會隨時發出 'error' 事件。這通常會發生在基礎串流無法產生資料,因為基礎內部發生故障,或當串流實作嘗試推送無效資料區塊時。

偵聽器回呼會傳遞一個單一的 Error 物件。

事件:'pause'#

當呼叫 stream.pause(),且 readableFlowing 不為 false 時,會發出 'pause' 事件。

事件:'readable'#

當串流中有資料可供讀取,或已到達串流尾端時,會發出 'readable' 事件。實際上,'readable' 事件表示串流有新資訊。如果資料可用,stream.read() 會傳回該資料。

const readable = getReadableStreamSomehow();
readable.on('readable', function() {
  // There is some data to read now.
  let data;

  while ((data = this.read()) !== null) {
    console.log(data);
  }
}); 

如果串流的結尾已經到達,呼叫 stream.read() 會傳回 null 並觸發 'end' 事件。如果從來沒有資料可以讀取,這也是正確的。例如,在以下範例中,foo.txt 是個空檔案

const fs = require('node:fs');
const rr = fs.createReadStream('foo.txt');
rr.on('readable', () => {
  console.log(`readable: ${rr.read()}`);
});
rr.on('end', () => {
  console.log('end');
}); 

執行這個指令碼的輸出是

$ node test.js
readable: null
end 

在某些情況下,附加 'readable' 事件的監聽器會導致一些資料讀取到內部緩衝區。

一般來說,readable.pipe()'data' 事件機制比 'readable' 事件更容易理解。不過,處理 'readable' 可能會提高處理量。

如果 'readable''data' 同時使用,'readable' 優先控制流程,亦即只有在呼叫 stream.read() 時才會發出 'data'readableFlowing 屬性會變成 false。如果在移除 'readable' 時有 'data' 監聽器,串流會開始流動,亦即會發出 'data' 事件,而不會呼叫 .resume()

事件:'resume'#

當呼叫 stream.resume()readableFlowing 不是 true 時,會發出 'resume' 事件。

readable.destroy([error])#
  • error <Error> 錯誤,會在 'error' 事件中作為酬載傳遞
  • 傳回:<this>

銷毀串流。選擇性地發出 'error' 事件,並發出 'close' 事件(除非 emitClose 設為 false)。在此呼叫後,可讀取串流將釋放任何內部資源,而後續對 push() 的呼叫將被忽略。

呼叫 destroy() 後,任何進一步的呼叫都會成為無操作,且除了來自 _destroy() 的錯誤外,不會再發出任何錯誤作為 'error'

實作人員不應覆寫此方法,而應實作 readable._destroy()

readable.closed#

在發出 'close' 後為 true

readable.destroyed#

在呼叫 readable.destroy() 後為 true

readable.isPaused()#

readable.isPaused() 方法傳回 Readable 的目前作業狀態。這主要由 readable.pipe() 方法底層的機制使用。在大部分典型案例中,沒有理由直接使用此方法。

const readable = new stream.Readable();

readable.isPaused(); // === false
readable.pause();
readable.isPaused(); // === true
readable.resume();
readable.isPaused(); // === false 
readable.pause()#

readable.pause() 方法將使流動模式中的串流停止發出 'data' 事件,並切換出流動模式。任何可用的資料都將留在內部緩衝區中。

const readable = getReadableStreamSomehow();
readable.on('data', (chunk) => {
  console.log(`Received ${chunk.length} bytes of data.`);
  readable.pause();
  console.log('There will be no additional data for 1 second.');
  setTimeout(() => {
    console.log('Now data will start flowing again.');
    readable.resume();
  }, 1000);
}); 

如果有一個 'readable' 事件監聽器,readable.pause() 方法將不會有作用。

readable.pipe(destination[, options])#

readable.pipe() 方法將 Writable 串流附加到 readable,導致它自動切換到流動模式,並將所有資料推送到附加的 Writable。資料流動將自動管理,以便目的地 Writable 串流不會被更快的 Readable 串流壓垮。

以下範例將所有資料從 readable 管接到名為 file.txt 的檔案中

const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt'.
readable.pipe(writable); 

可以將多個 Writable 串流附加到單一 Readable 串流。

readable.pipe() 方法傳回對 目的地 串流的參考,使串接管接串流成為可能

const fs = require('node:fs');
const zlib = require('node:zlib');
const r = fs.createReadStream('file.txt');
const z = zlib.createGzip();
const w = fs.createWriteStream('file.txt.gz');
r.pipe(z).pipe(w); 

預設情況下,當來源 Readable 串流發出 'end' 時,會在目的地 Writable 串流上呼叫 stream.end(),以便目的地不再可寫入。若要停用此預設行為,可以將 end 選項傳遞為 false,導致目的地串流保持開啟

reader.pipe(writer, { end: false });
reader.on('end', () => {
  writer.end('Goodbye\n');
}); 

一個重要的注意事項是,如果 Readable 串流在處理期間發出錯誤,則 Writable 目的地不會自動關閉。如果發生錯誤,則需要手動關閉每個串流,以防止記憶體外洩。

Writable 串流 process.stderrprocess.stdout 永遠不會關閉,直到 Node.js 程序結束,無論指定的選項為何。

readable.read([size])#

readable.read() 方法會讀取內部緩衝區的資料並傳回。如果沒有可讀取的資料,則會傳回 null。預設情況下,資料會傳回為 Buffer 物件,除非已使用 readable.setEncoding() 方法指定編碼,或串流處於物件模式中。

可選的 size 參數會指定要讀取的特定位元組數。如果沒有足夠的 size 位元組可供讀取,則會傳回 null除非串流已結束,這種情況下會傳回內部緩衝區中所有剩餘的資料)。

如果未指定 size 參數,則會傳回內部緩衝區中包含的所有資料。

size 參數必須小於或等於 1 GiB。

readable.read() 方法只能在處於暫停模式的 Readable 串流上呼叫。在流動模式中,readable.read() 會自動呼叫,直到內部緩衝區完全清空。

const readable = getReadableStreamSomehow();

// 'readable' may be triggered multiple times as data is buffered in
readable.on('readable', () => {
  let chunk;
  console.log('Stream is readable (new data received in buffer)');
  // Use a loop to make sure we read all currently available data
  while (null !== (chunk = readable.read())) {
    console.log(`Read ${chunk.length} bytes of data...`);
  }
});

// 'end' will be triggered once when there is no more data available
readable.on('end', () => {
  console.log('Reached end of stream.');
}); 

每次呼叫 readable.read() 會傳回一個資料區塊,或 null。這些區塊不會串接。必須使用 while 迴圈才能使用目前緩衝區中的所有資料。在讀取大型檔案時,.read() 可能會傳回 null,表示已使用完目前所有緩衝內容,但仍有更多資料尚未緩衝。在這種情況下,當緩衝區中有更多資料時,會發出新的 'readable' 事件。最後,當沒有更多資料時,會發出 'end' 事件。

因此,要從 readable 讀取檔案的全部內容,必須在多個 'readable' 事件中收集區塊

const chunks = [];

readable.on('readable', () => {
  let chunk;
  while (null !== (chunk = readable.read())) {
    chunks.push(chunk);
  }
});

readable.on('end', () => {
  const content = chunks.join('');
}); 

物件模式中的 Readable 串流會永遠從呼叫 readable.read(size) 傳回單一項目,而不論 size 參數的值為何。

如果 readable.read() 方法傳回資料區塊,也會發出 'data' 事件。

在發出 'end' 事件後呼叫 stream.read([size]) 會傳回 null。不會引發執行時期錯誤。

readable.readable#

如果可以安全呼叫 readable.read(),則為 true,表示串流尚未毀損或發出 'error''end'

readable.readableAborted#

穩定性:1 - 實驗性

傳回串流是否在發出 'end' 之前毀損或發生錯誤。

readable.readableDidRead#

穩定性:1 - 實驗性

傳回是否已發出 'data'

readable.readableEncoding#

給定 Readable 串流的屬性 encoding 的 getter。可以使用 readable.setEncoding() 方法設定 encoding 屬性。

readable.readableEnded#

'end' 事件發出時,變為 true

readable.errored#

如果串流已因錯誤而毀損,則傳回錯誤。

readable.readableFlowing#

這個屬性反映了 三種狀態 區段中所述的 Readable 串流的目前狀態。

readable.readableHighWaterMark#

傳回建立這個 Readable 時傳遞的 highWaterMark 值。

readable.readableLength#

這個屬性包含準備好要讀取的佇列中的位元組 (或物件) 數量。這個值提供關於 highWaterMark 狀態的內省資料。

readable.readableObjectMode#

給定 Readable 串流的屬性 objectMode 的 getter。

readable.resume()#

readable.resume() 方法會導致一個明確暫停的 Readable 串流繼續發出 'data' 事件,將串流切換到流動模式。

readable.resume() 方法可以用來完全使用來自串流的資料,而不需要實際處理任何資料

getReadableStreamSomehow()
  .resume()
  .on('end', () => {
    console.log('Reached the end, but did not read anything.');
  }); 

如果有一個 'readable' 事件監聽器,則 readable.resume() 方法沒有作用。

readable.setEncoding(encoding)#

readable.setEncoding() 方法設定從 Readable 串流讀取資料的字元編碼。

預設情況下,不會指定編碼,串流資料將傳回為 Buffer 物件。設定編碼會導致串流資料傳回為指定編碼的字串,而不是 Buffer 物件。例如,呼叫 readable.setEncoding('utf8') 會導致輸出資料被解釋為 UTF-8 資料,並傳遞為字串。呼叫 readable.setEncoding('hex') 會導致資料以十六進位字串格式編碼。

Readable 串流會適當處理透過串流傳送的多位元組字元,否則如果僅從串流中提取為 Buffer 物件,這些字元會被不當解碼。

const readable = getReadableStreamSomehow();
readable.setEncoding('utf8');
readable.on('data', (chunk) => {
  assert.equal(typeof chunk, 'string');
  console.log('Got %d characters of string data:', chunk.length);
}); 
readable.unpipe([destination])#

readable.unpipe() 方法會分離先前使用 stream.pipe() 方法附加的 Writable 串流。

如果未指定 destination,則會分離所有連接。

如果指定 destination,但未設定任何連接,則此方法不會執行任何動作。

const fs = require('node:fs');
const readable = getReadableStreamSomehow();
const writable = fs.createWriteStream('file.txt');
// All the data from readable goes into 'file.txt',
// but only for the first second.
readable.pipe(writable);
setTimeout(() => {
  console.log('Stop writing to file.txt.');
  readable.unpipe(writable);
  console.log('Manually close the file stream.');
  writable.end();
}, 1000); 
readable.unshift(chunk[, encoding])#
  • chunk <Buffer> | <Uint8Array> | <string> | <null> | <any> 要在讀取佇列中取消的資料區塊。對於不以物件模式運作的串流,chunk 必須是字串、BufferUint8Arraynull。對於物件模式串流,chunk 可以是任何 JavaScript 值。
  • encoding <string> 字串區塊的編碼。必須是有效的 Buffer 編碼,例如 'utf8''ascii'

chunk 傳遞為 null 表示串流的結尾 (EOF),其行為與 readable.push(null) 相同,之後無法再寫入更多資料。EOF 訊號會放在緩衝區的結尾,而任何緩衝資料仍會被清除。

readable.unshift() 方法會將資料區塊推回內部緩衝區。這在某些情況下很有用,例如串流正被需要「取消使用」部分已從來源樂觀提取的資料的程式碼使用,以便將資料傳遞給其他方。

在發出 'end' 事件後,無法呼叫 stream.unshift(chunk) 方法,否則會擲回執行時期錯誤。

使用 stream.unshift() 的開發人員通常應考慮改用 Transform 串流。請參閱 串流實作者的 API 區段以取得更多資訊。

// Pull off a header delimited by \n\n.
// Use unshift() if we get too much.
// Call the callback with (error, header, stream).
const { StringDecoder } = require('node:string_decoder');
function parseHeader(stream, callback) {
  stream.on('error', callback);
  stream.on('readable', onReadable);
  const decoder = new StringDecoder('utf8');
  let header = '';
  function onReadable() {
    let chunk;
    while (null !== (chunk = stream.read())) {
      const str = decoder.write(chunk);
      if (str.includes('\n\n')) {
        // Found the header boundary.
        const split = str.split(/\n\n/);
        header += split.shift();
        const remaining = split.join('\n\n');
        const buf = Buffer.from(remaining, 'utf8');
        stream.removeListener('error', callback);
        // Remove the 'readable' listener before unshifting.
        stream.removeListener('readable', onReadable);
        if (buf.length)
          stream.unshift(buf);
        // Now the body of the message can be read from the stream.
        callback(null, header, stream);
        return;
      }
      // Still reading the header.
      header += str;
    }
  }
} 

stream.push(chunk) 不同,stream.unshift(chunk) 透過重設串流的內部讀取狀態,不會結束讀取程序。如果在讀取期間呼叫 readable.unshift()(即在自訂串流中呼叫 stream._read() 實作內),可能會造成意外的結果。在呼叫 readable.unshift() 之後,立即呼叫 stream.push('') 將適當地重設讀取狀態,但最好在執行讀取程序時,避免呼叫 readable.unshift()

readable.wrap(stream)#

在 Node.js 0.10 之前,串流並未實作目前定義的完整 node:stream 模組 API。(請參閱 相容性 以取得更多資訊。)

在使用會發出 'data' 事件且具有僅為建議性質的 stream.pause() 方法的舊版 Node.js 函式庫時,readable.wrap() 方法可用於建立 Readable 串流,並使用舊串流作為其資料來源。

很少需要使用 readable.wrap(),但已提供此方法,以方便與舊版 Node.js 應用程式和函式庫進行互動。

const { OldReader } = require('./old-api-module.js');
const { Readable } = require('node:stream');
const oreader = new OldReader();
const myReader = new Readable().wrap(oreader);

myReader.on('readable', () => {
  myReader.read(); // etc.
}); 
readable[Symbol.asyncIterator]()#
const fs = require('node:fs');

async function print(readable) {
  readable.setEncoding('utf8');
  let data = '';
  for await (const chunk of readable) {
    data += chunk;
  }
  console.log(data);
}

print(fs.createReadStream('file')).catch(console.error); 

如果迴圈以 breakreturnthrow 終止,串流將會被銷毀。換句話說,重複串流會完全消耗串流。串流將以等於 highWaterMark 選項大小的區塊讀取。在上述程式碼範例中,如果檔案的資料少於 64 KiB,資料將會在單一區塊中,因為沒有提供 highWaterMark 選項給 fs.createReadStream()

readable[Symbol.asyncDispose]()#

穩定性:1 - 實驗性

呼叫 readable.destroy(),並帶有 AbortError,並傳回一個承諾,在串流完成時兌現。

readable.compose(stream[, options])#

穩定性:1 - 實驗性

import { Readable } from 'node:stream';

async function* splitToWords(source) {
  for await (const chunk of source) {
    const words = String(chunk).split(' ');

    for (const word of words) {
      yield word;
    }
  }
}

const wordsStream = Readable.from(['this is', 'compose as operator']).compose(splitToWords);
const words = await wordsStream.toArray();

console.log(words); // prints ['this', 'is', 'compose', 'as', 'operator'] 

請參閱 stream.compose 以取得更多資訊。

readable.iterator([options])#

穩定性:1 - 實驗性

  • options <Object>
    • destroyOnReturn <布林值> 設定為 false 時,呼叫非同步反覆運算器的 return,或使用 breakreturnthrow 離開 for await...of 反覆運算,不會摧毀串流。預設:true
  • 傳回:<AsyncIterator> 來使用串流。

此方法建立的反覆運算器讓使用者有選項取消摧毀串流,如果 for await...of 迴圈是由 returnbreakthrow 離開,或如果串流在反覆運算期間發出錯誤,反覆運算器應摧毀串流。

const { Readable } = require('node:stream');

async function printIterator(readable) {
  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk); // 1
    break;
  }

  console.log(readable.destroyed); // false

  for await (const chunk of readable.iterator({ destroyOnReturn: false })) {
    console.log(chunk); // Will print 2 and then 3
  }

  console.log(readable.destroyed); // True, stream was totally consumed
}

async function printSymbolAsyncIterator(readable) {
  for await (const chunk of readable) {
    console.log(chunk); // 1
    break;
  }

  console.log(readable.destroyed); // true
}

async function showBoth() {
  await printIterator(Readable.from([1, 2, 3]));
  await printSymbolAsyncIterator(Readable.from([1, 2, 3]));
}

showBoth(); 
readable.map(fn[, options])#

穩定性:1 - 實驗性

  • fn <函式> | <非同步函式> 一個函式,用於對串流中的每個區塊進行對應。
    • data <任何> 來自串流的一個資料區塊。
    • options <Object>
      • signal <中止訊號> 如果串流被摧毀,則中止,允許提早中止 fn 呼叫。
  • options <Object>
    • concurrency <數字> 在串流上同時呼叫 fn 的最大並行呼叫次數。預設:1
    • highWaterMark <數字> 在等待使用者使用對應項目時,要緩衝多少項目。預設:concurrency * 2 - 1
    • signal <AbortSignal> 允許在訊號中斷時銷毀串流。
  • 傳回:<可讀取> 使用函式 fn 對應的串流。

此方法允許對串流進行對應。fn 函式會在串流中的每個區塊呼叫。如果 fn 函式傳回一個承諾,在傳遞給結果串流之前,會 await 該承諾。

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).map((x) => x * 2)) {
  console.log(chunk); // 2, 4, 6, 8
}
// With an asynchronous mapper, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map((domain) => resolver.resolve4(domain), { concurrency: 2 });
for await (const result of dnsResults) {
  console.log(result); // Logs the DNS result of resolver.resolve4.
} 
readable.filter(fn[, options])#

穩定性:1 - 實驗性

  • fn <Function> | <AsyncFunction> 用於從串流中篩選區塊的函式。
    • data <任何> 來自串流的一個資料區塊。
    • options <Object>
      • signal <中止訊號> 如果串流被摧毀,則中止,允許提早中止 fn 呼叫。
  • options <Object>
    • concurrency <數字> 在串流上同時呼叫 fn 的最大並行呼叫次數。預設:1
    • highWaterMark <number> 在等待使用者使用已篩選項目時要緩衝多少項目。預設:concurrency * 2 - 1
    • signal <AbortSignal> 允許在訊號中斷時銷毀串流。
  • 傳回:<Readable> 已使用謂詞 fn 篩選的串流。

此方法允許篩選串流。對於串流中的每個區塊,將呼叫 fn 函式,如果它傳回真值,則區塊將傳遞至結果串流。如果 fn 函式傳回承諾,則將 await 該承諾。

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).filter(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address.ttl > 60;
}, { concurrency: 2 });
for await (const result of dnsResults) {
  // Logs domains with more than 60 seconds on the resolved dns record.
  console.log(result);
} 
readable.forEach(fn[, options])#

穩定性:1 - 實驗性

此方法允許迭代串流。對於串流中的每個區塊,將呼叫 fn 函式。如果 fn 函式傳回承諾,則將 await 該承諾。

此方法與 for await...of 迴圈不同,在於它可以選擇同時處理區塊。此外,只有傳遞 signal 選項並中止相關的 AbortController,才能停止 forEach 迭代,而 for await...of 可以使用 breakreturn 停止。在任何一種情況下,串流都會被銷毀。

此方法與聆聽 'data' 事件不同,在於它使用底層機制中的 readable 事件,並且可以限制同時發出 fn 呼叫的數量。

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

// With a synchronous predicate.
for await (const chunk of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
  console.log(chunk); // 3, 4
}
// With an asynchronous predicate, making at most 2 queries at a time.
const resolver = new Resolver();
const dnsResults = Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 });
await dnsResults.forEach((result) => {
  // Logs result, similar to `for await (const result of dnsResults)`
  console.log(result);
});
console.log('done'); // Stream has finished 
readable.toArray([options])#

穩定性:1 - 實驗性

此方法允許輕鬆取得串流的內容。

由於此方法會將整個串流讀取到記憶體中,因此它否定了串流的優點。它是為了互通性和便利性而設計,而不是作為使用串流的主要方式。

import { Readable } from 'node:stream';
import { Resolver } from 'node:dns/promises';

await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]

// Make dns queries concurrently using .map and collect
// the results into an array using toArray
const dnsResults = await Readable.from([
  'nodejs.org',
  'openjsf.org',
  'www.linuxfoundation.org',
]).map(async (domain) => {
  const { address } = await resolver.resolve4(domain, { ttl: true });
  return address;
}, { concurrency: 2 }).toArray(); 
readable.some(fn[, options])#

穩定性:1 - 實驗性

  • fn <Function> | <AsyncFunction> 要在串流的每個區塊上呼叫的函式。
    • data <任何> 來自串流的一個資料區塊。
    • options <Object>
      • signal <中止訊號> 如果串流被摧毀,則中止,允許提早中止 fn 呼叫。
  • options <Object>
    • concurrency <數字> 在串流上同時呼叫 fn 的最大並行呼叫次數。預設:1
    • signal <AbortSignal> 允許在訊號中斷時銷毀串流。
  • 傳回:<Promise> 如果 fn 至少對其中一個區塊傳回真值,則評估為 true 的 Promise。

此方法類似於 Array.prototype.some,並對串流中的每個區塊呼叫 fn,直到等待的傳回值為 true(或任何真值)。一旦區塊上的 fn 呼叫等待傳回值為真值,串流就會被銷毀,而 Promise 則會以 true 滿足。如果區塊上的 fn 呼叫都沒有傳回真值,則 Promise 會以 false 滿足。

import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false

// With an asynchronous predicate, making at most 2 file checks at a time.
const anyBigFile = await Readable.from([
  'file1',
  'file2',
  'file3',
]).some(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished 
readable.find(fn[, options])#

穩定性:1 - 實驗性

  • fn <Function> | <AsyncFunction> 要在串流的每個區塊上呼叫的函式。
    • data <任何> 來自串流的一個資料區塊。
    • options <Object>
      • signal <中止訊號> 如果串流被摧毀,則中止,允許提早中止 fn 呼叫。
  • options <Object>
    • concurrency <數字> 在串流上同時呼叫 fn 的最大並行呼叫次數。預設:1
    • signal <AbortSignal> 允許在訊號中斷時銷毀串流。
  • 傳回:<Promise> 一個 Promise,評估為第一個區塊,其 fn 評估為真值,或如果找不到元素,則為 undefined

這個方法類似於 Array.prototype.find,並在串流中的每個區塊上呼叫 fn,以找到 fn 為真值的區塊。一旦 fn 呼叫的等待回傳值為真值,串流就會被銷毀,且 Promise 會以 fn 回傳真值的數值來達成。如果區塊上的所有 fn 呼叫都回傳假值,則 Promise 會以 undefined 來達成。

import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).find((x) => x > 2); // 3
await Readable.from([1, 2, 3, 4]).find((x) => x > 0); // 1
await Readable.from([1, 2, 3, 4]).find((x) => x > 10); // undefined

// With an asynchronous predicate, making at most 2 file checks at a time.
const foundBigFile = await Readable.from([
  'file1',
  'file2',
  'file3',
]).find(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
console.log(foundBigFile); // File name of large file, if any file in the list is bigger than 1MB
console.log('done'); // Stream has finished 
readable.every(fn[, options])#

穩定性:1 - 實驗性

  • fn <Function> | <AsyncFunction> 要在串流的每個區塊上呼叫的函式。
    • data <任何> 來自串流的一個資料區塊。
    • options <Object>
      • signal <中止訊號> 如果串流被摧毀,則中止,允許提早中止 fn 呼叫。
  • options <Object>
    • concurrency <數字> 在串流上同時呼叫 fn 的最大並行呼叫次數。預設:1
    • signal <AbortSignal> 允許在訊號中斷時銷毀串流。
  • 傳回:<Promise> 一個 Promise,如果 fn 對所有區塊都回傳真值,則評估為 true

這個方法類似於 Array.prototype.every,並在串流中的每個區塊上呼叫 fn,以檢查所有等待回傳值是否為 fn 的真值。一旦區塊上的 fn 呼叫的等待回傳值為假值,串流就會被銷毀,且 Promise 會以 false 來達成。如果區塊上的所有 fn 呼叫都回傳真值,則 Promise 會以 true 來達成。

import { Readable } from 'node:stream';
import { stat } from 'node:fs/promises';

// With a synchronous predicate.
await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true

// With an asynchronous predicate, making at most 2 file checks at a time.
const allBigFiles = await Readable.from([
  'file1',
  'file2',
  'file3',
]).every(async (fileName) => {
  const stats = await stat(fileName);
  return stats.size > 1024 * 1024;
}, { concurrency: 2 });
// `true` if all files in the list are bigger than 1MiB
console.log(allBigFiles);
console.log('done'); // Stream has finished 
readable.flatMap(fn[, options])#

穩定性:1 - 實驗性

此方法透過將給定的回呼函式套用至串流的每個區塊,然後壓平結果,來傳回新的串流。

可以從 fn 傳回串流或其他可迭代或非同步可迭代,而結果串流會合併 (壓平) 至傳回的串流。

import { Readable } from 'node:stream';
import { createReadStream } from 'node:fs';

// With a synchronous mapper.
for await (const chunk of Readable.from([1, 2, 3, 4]).flatMap((x) => [x, x])) {
  console.log(chunk); // 1, 1, 2, 2, 3, 3, 4, 4
}
// With an asynchronous mapper, combine the contents of 4 files
const concatResult = Readable.from([
  './1.mjs',
  './2.mjs',
  './3.mjs',
  './4.mjs',
]).flatMap((fileName) => createReadStream(fileName));
for await (const result of concatResult) {
  // This will contain the contents (all chunks) of all 4 files
  console.log(result);
} 
readable.drop(limit[, options])#

穩定性:1 - 實驗性

  • limit <number> 要從可讀取串流中刪除的區塊數。
  • options <Object>
  • 傳回:<Readable> 已刪除 limit 個區塊的串流。

此方法傳回新的串流,其中已刪除前 limit 個區塊。

import { Readable } from 'node:stream';

await Readable.from([1, 2, 3, 4]).drop(2).toArray(); // [3, 4] 
readable.take(limit[, options])#

穩定性:1 - 實驗性

  • limit <number> 要從可讀取串流中擷取的區塊數。
  • options <Object>
  • 傳回:<Readable> 已擷取 limit 個區塊的串流。

此方法傳回新的串流,其中包含前 limit 個區塊。

import { Readable } from 'node:stream';

await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2] 
readable.reduce(fn[, initial[, options]])#

穩定性:1 - 實驗性

  • fn <Function> | <AsyncFunction> 縮減函式,用於呼叫串流中的每個區塊。
    • previous <any>fn 的最後一次呼叫取得的值,或指定的 initial 值,否則為串流的第一個區塊。
    • data <任何> 來自串流的一個資料區塊。
    • options <Object>
      • signal <中止訊號> 如果串流被摧毀,則中止,允許提早中止 fn 呼叫。
  • initial <any> 在縮減中使用的初始值。
  • options <Object>
  • 傳回:<Promise> 縮減最終值的承諾。

此方法依序對串流的每個區塊呼叫 fn,傳遞給它前一個元素計算的結果。它傳回縮減最終值的承諾。

如果沒有提供 initial 值,串流的第一個區塊會用作初始值。如果串流是空的,承諾會以 TypeError 拒絕,其 ERR_INVALID_ARGS 程式碼屬性。

import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';

const directoryPath = './src';
const filesInDir = await readdir(directoryPath);

const folderSize = await Readable.from(filesInDir)
  .reduce(async (totalSize, file) => {
    const { size } = await stat(join(directoryPath, file));
    return totalSize + size;
  }, 0);

console.log(folderSize); 

縮減器函式逐一迭代串流元素,表示沒有 concurrency 參數或並行性。若要同時執行 reduce,您可以將非同步函式萃取到 readable.map 方法。

import { Readable } from 'node:stream';
import { readdir, stat } from 'node:fs/promises';
import { join } from 'node:path';

const directoryPath = './src';
const filesInDir = await readdir(directoryPath);

const folderSize = await Readable.from(filesInDir)
  .map((file) => stat(join(directoryPath, file)), { concurrency: 2 })
  .reduce((totalSize, { size }) => totalSize + size, 0);

console.log(folderSize); 

雙工和轉換串流#

類別:stream.Duplex#

雙工串流是同時實作 ReadableWritable 介面的串流。

Duplex 串流的範例包括

duplex.allowHalfOpen#

如果為 false,則串流會在可讀取側結束時自動結束可寫入側。最初由 allowHalfOpen 建構函式選項設定,預設為 true

這可以手動變更以變更現有 Duplex 串流執行個體的半開行為,但必須在發出 'end' 事件之前變更。

類別:stream.Transform#

轉換串流是 Duplex 串流,其中輸出在某種程度上與輸入相關。與所有 Duplex 串流一樣,Transform 串流實作 ReadableWritable 介面。

Transform 串流的範例包括

transform.destroy([error])#

銷毀串流,並選擇性地發出 'error' 事件。呼叫此函式後,轉換串流會釋放任何內部資源。實作人員不應覆寫此方法,而應實作 readable._destroy()Transform_destroy() 預設實作也會發出 'close',除非 emitClose 設為 false。

呼叫 destroy() 後,任何進一步的呼叫都將成為空操作,且不會發出 'error',除非來自 _destroy()

stream.finished(stream[, options], callback)#

  • stream <Stream> | <ReadableStream> | <WritableStream> 可讀和/或可寫串流/網路串流。
  • options <Object>
    • error <boolean> 如果設為 false,則呼叫 emit('error', err) 不會被視為已完成。預設:true
    • readable <boolean> 當設為 false 時,即使串流可能仍可讀取,也會在串流結束時呼叫回呼函式。預設:true
    • writable <boolean> 當設為 false 時,即使串流可能仍可寫入,也會在串流結束時呼叫回呼函式。預設:true
    • signal <AbortSignal> 允許中斷等待串流完成。如果訊號中斷,底層串流不會中斷。回呼函式將會使用 AbortError 呼叫。此函式新增的所有已註冊監聽器也會被移除。
    • cleanup <boolean> 移除所有已註冊的串流監聽器。預設:false
  • callback <Function> 回呼函式,會接收一個選用的錯誤引數。
  • 傳回:<Function> 清除函式,會移除所有已註冊的監聽器。

當串流不再可讀、可寫或發生錯誤或過早關閉事件時,會收到通知的函式。

const { finished } = require('node:stream');
const fs = require('node:fs');

const rs = fs.createReadStream('archive.tar');

finished(rs, (err) => {
  if (err) {
    console.error('Stream failed.', err);
  } else {
    console.log('Stream is done reading.');
  }
});

rs.resume(); // Drain the stream. 

特別適用於串流過早中斷(例如中止的 HTTP 要求)的錯誤處理場景,而且不會發出 'end''finish'

finished API 提供 promise 版本

stream.finished() 在呼叫 callback 之後,會留下未完成的事件監聽器(特別是 'error''end''finish''close')。這樣做的原因是,避免意外的 'error' 事件(由於串流實作不正確)導致意外的崩潰。如果這是你不想要的行為,則需要在 callback 中呼叫傳回的清除函式

const cleanup = finished(rs, (err) => {
  cleanup();
  // ...
}); 

stream.pipeline(source[, ...transforms], destination, callback)#

stream.pipeline(streams, callback)#

一個模組方法,用於在串流和產生器之間建立管道,轉送錯誤、適當清理,並在管道完成時提供一個回呼。

const { pipeline } = require('node:stream');
const fs = require('node:fs');
const zlib = require('node:zlib');

// Use the pipeline API to easily pipe a series of streams
// together and get notified when the pipeline is fully done.

// A pipeline to gzip a potentially huge tar file efficiently:

pipeline(
  fs.createReadStream('archive.tar'),
  zlib.createGzip(),
  fs.createWriteStream('archive.tar.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  },
); 

pipeline API 提供了 promise 版本

stream.pipeline() 會對所有串流呼叫 stream.destroy(err),但下列例外:

  • 已發出 'end''close'Readable 串流。
  • 已發出 'finish''close'Writable 串流。

在呼叫 callback 之後,stream.pipeline() 會在串流上留下懸空的事件監聽器。如果在失敗後重新使用串流,這可能會導致事件監聽器外洩和錯誤被吞掉。如果最後一個串流是可讀取的,則會移除懸空的事件監聽器,以便稍後可以消耗最後一個串流。

當發生錯誤時,stream.pipeline() 會關閉所有串流。將 IncomingRequestpipeline 搭配使用可能會導致意外行為,因為它會在不傳送預期回應的情況下關閉 socket。請參閱以下範例

const fs = require('node:fs');
const http = require('node:http');
const { pipeline } = require('node:stream');

const server = http.createServer((req, res) => {
  const fileStream = fs.createReadStream('./fileNotExist.txt');
  pipeline(fileStream, res, (err) => {
    if (err) {
      console.log(err); // No such file
      // this message can't be sent once `pipeline` already destroyed the socket
      return res.end('error!!!');
    }
  });
}); 

stream.compose(...streams)#

穩定性:1 - stream.compose 為實驗性質。

將兩個或多個串流結合為一個 Duplex 串流,寫入第一個串流並從最後一個串流讀取。每個提供的串流都會透過 stream.pipeline 管道傳輸到下一個串流。如果任何串流發生錯誤,則所有串流都會被銷毀,包括外部的 Duplex 串流。

由於 stream.compose 會傳回一個新的串流,而這個串流又可以(而且應該)被管道傳輸到其他串流,因此它支援組合。相較之下,當將串流傳遞給 stream.pipeline 時,通常第一個串流是可讀取串流,最後一個串流是可寫入串流,形成一個封閉電路。

如果傳遞的是 Function,則它必須是一個採用 source Iterable 的工廠方法。

import { compose, Transform } from 'node:stream';

const removeSpaces = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, String(chunk).replace(' ', ''));
  },
});

async function* toUpper(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
}

let res = '';
for await (const buf of compose(removeSpaces, toUpper).end('hello world')) {
  res += buf;
}

console.log(res); // prints 'HELLOWORLD' 

stream.compose 可用於將非同步可迭代、產生器和函式轉換為串流。

  • AsyncIterable 轉換為可讀取的 Duplex。不能產生 null
  • AsyncGeneratorFunction 轉換為可讀取/可寫入的轉換 Duplex。必須將來源 AsyncIterable 作為第一個參數。不能產生 null
  • AsyncFunction 轉換為可寫入的 Duplex。必須傳回 nullundefined
import { compose } from 'node:stream';
import { finished } from 'node:stream/promises';

// Convert AsyncIterable into readable Duplex.
const s1 = compose(async function*() {
  yield 'Hello';
  yield 'World';
}());

// Convert AsyncGenerator into transform Duplex.
const s2 = compose(async function*(source) {
  for await (const chunk of source) {
    yield String(chunk).toUpperCase();
  }
});

let res = '';

// Convert AsyncFunction into writable Duplex.
const s3 = compose(async function(source) {
  for await (const chunk of source) {
    res += chunk;
  }
});

await finished(compose(s1, s2, s3));

console.log(res); // prints 'HELLOWORLD' 

請參閱 readable.compose(stream) 以了解作為運算子的 stream.compose

stream.Readable.from(iterable[, options])#

  • iterable <Iterable> 實作 `Symbol.asyncIterator` 或 `Symbol.iterator` iterable 協定的物件。如果傳入 null 值,會發出一個「error」事件。
  • options <Object> 提供給 new stream.Readable([options]) 的選項。預設情況下,Readable.from() 會將 options.objectMode 設為 true,除非明確選擇退出,方法是將 options.objectMode 設為 false
  • 傳回:<stream.Readable>

一個用於從迭代器建立可讀串流的實用方法。

const { Readable } = require('node:stream');

async function * generate() {
  yield 'hello';
  yield 'streams';
}

const readable = Readable.from(generate());

readable.on('data', (chunk) => {
  console.log(chunk);
}); 

呼叫 Readable.from(string)Readable.from(buffer) 時,出於效能考量,不會對字串或緩衝區進行迭代以符合其他串流語意。

如果傳入包含承諾的 Iterable 物件作為引數,可能會導致未處理的拒絕。

const { Readable } = require('node:stream');

Readable.from([
  new Promise((resolve) => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]); 

stream.Readable.fromWeb(readableStream[, options])#

穩定性:1 - 實驗性

stream.Readable.isDisturbed(stream)#

穩定性:1 - 實驗性

傳回串流是否已讀取或已取消。

stream.isErrored(stream)#

穩定性:1 - 實驗性

傳回串流是否遭遇錯誤。

stream.isReadable(stream)#

穩定性:1 - 實驗性

傳回串流是否可讀取。

stream.Readable.toWeb(streamReadable[, options])#

穩定性:1 - 實驗性

  • streamReadable <stream.Readable>
  • options <Object>
    • strategy <Object>
      • highWaterMark <number> 在從給定的 stream.Readable 讀取時,應用反壓之前 (所建立的 ReadableStream) 的最大內部佇列大小。如果未提供值,它將從給定的 stream.Readable 取得。
      • size <Function> 給定資料區塊大小的函式。如果未提供值,所有區塊的大小都將為 1
  • 傳回:<ReadableStream>

stream.Writable.fromWeb(writableStream[, options])#

穩定性:1 - 實驗性

stream.Writable.toWeb(streamWritable)#

穩定性:1 - 實驗性

stream.Duplex.from(src)#

建立雙工串流的實用程式方法。

  • Stream 會將可寫入串流轉換成可寫入的 Duplex,並將可讀取串流轉換成 Duplex
  • Blob 會轉換成可讀取的 Duplex
  • 字串轉換成可讀取的雙工
  • 陣列緩衝區轉換成可讀取的雙工
  • AsyncIterable 轉換為可讀取的 Duplex。不能產生 null
  • AsyncGeneratorFunction 轉換為可讀取/可寫入的轉換 Duplex。必須將來源 AsyncIterable 作為第一個參數。不能產生 null
  • 非同步函式轉換成可寫入的雙工。必須傳回nullundefined
  • 物件 ({ 可寫入, 可讀取 })可讀取可寫入轉換成串流,然後將它們合併成雙工,其中雙工會寫入到可寫入並從可讀取中讀取。
  • 承諾轉換成可讀取的雙工。值null會被忽略。
  • 可讀取串流轉換成可讀取的雙工
  • 可寫入串流轉換成可寫入的雙工
  • 傳回:<stream.Duplex>

如果傳入包含承諾的 Iterable 物件作為引數,可能會導致未處理的拒絕。

const { Duplex } = require('node:stream');

Duplex.from([
  new Promise((resolve) => setTimeout(resolve('1'), 1500)),
  new Promise((_, reject) => setTimeout(reject(new Error('2')), 1000)), // Unhandled rejection
]); 

stream.Duplex.fromWeb(pair[, options])#

穩定性:1 - 實驗性

import { Duplex } from 'node:stream';
import {
  ReadableStream,
  WritableStream,
} from 'node:stream/web';

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world');
  },
});

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk);
  },
});

const pair = {
  readable,
  writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });

duplex.write('hello');

for await (const chunk of duplex) {
  console.log('readable', chunk);
}const { Duplex } = require('node:stream');
const {
  ReadableStream,
  WritableStream,
} = require('node:stream/web');

const readable = new ReadableStream({
  start(controller) {
    controller.enqueue('world');
  },
});

const writable = new WritableStream({
  write(chunk) {
    console.log('writable', chunk);
  },
});

const pair = {
  readable,
  writable,
};
const duplex = Duplex.fromWeb(pair, { encoding: 'utf8', objectMode: true });

duplex.write('hello');
duplex.once('readable', () => console.log('readable', duplex.read()));

stream.Duplex.toWeb(streamDuplex)#

穩定性:1 - 實驗性

import { Duplex } from 'node:stream';

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');

const { value } = await readable.getReader().read();
console.log('readable', value);const { Duplex } = require('node:stream');

const duplex = Duplex({
  objectMode: true,
  read() {
    this.push('world');
    this.push(null);
  },
  write(chunk, encoding, callback) {
    console.log('writable', chunk);
    callback();
  },
});

const { readable, writable } = Duplex.toWeb(duplex);
writable.getWriter().write('hello');

readable.getReader().read().then((result) => {
  console.log('readable', result.value);
});

stream.addAbortSignal(signal, stream)#

將 AbortSignal 附加到可讀或可寫串流。這讓程式碼可以使用 AbortController 控制串流毀損。

呼叫傳遞的 AbortSignal 對應的 AbortController 上的 abort 會產生與呼叫串流上的 .destroy(new AbortError()) 相同的行為,而 controller.error(new AbortError()) 則用於 Web 串流。

const fs = require('node:fs');

const controller = new AbortController();
const read = addAbortSignal(
  controller.signal,
  fs.createReadStream(('object.json')),
);
// Later, abort the operation closing the stream
controller.abort(); 

或使用具有可讀串流的 AbortSignal 作為非同步可迭代物件

const controller = new AbortController();
setTimeout(() => controller.abort(), 10_000); // set a timeout
const stream = addAbortSignal(
  controller.signal,
  fs.createReadStream(('object.json')),
);
(async () => {
  try {
    for await (const chunk of stream) {
      await process(chunk);
    }
  } catch (e) {
    if (e.name === 'AbortError') {
      // The operation was cancelled
    } else {
      throw e;
    }
  }
})(); 

或使用具有 ReadableStream 的 AbortSignal

const controller = new AbortController();
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue('hello');
    controller.enqueue('world');
    controller.close();
  },
});

addAbortSignal(controller.signal, rs);

finished(rs, (err) => {
  if (err) {
    if (err.name === 'AbortError') {
      // The operation was cancelled
    }
  }
});

const reader = rs.getReader();

reader.read().then(({ value, done }) => {
  console.log(value); // hello
  console.log(done); // false
  controller.abort();
}); 

stream.getDefaultHighWaterMark(objectMode)#

傳回串流使用的預設 highWaterMark。預設為 16384 (16 KiB),或 objectMode16

stream.setDefaultHighWaterMark(objectMode, value)#

設定串流使用的預設 highWaterMark。

串流實作者的 API#

node:stream 模組 API 的設計目的是讓你可以輕鬆使用 JavaScript 的原型繼承模型來實作串流。

首先,串流開發人員會宣告一個新的 JavaScript 類別,延伸四個基本串流類別之一 (stream.Writablestream.Readablestream.Duplexstream.Transform),確保他們呼叫適當的父類別建構函式

const { Writable } = require('node:stream');

class MyWritable extends Writable {
  constructor({ highWaterMark, ...options }) {
    super({ highWaterMark });
    // ...
  }
} 

在延伸串流時,請記住使用者可以在將這些選項轉發到基礎建構函數之前提供和應該提供的選項。例如,如果實作對 autoDestroyemitClose 選項做出假設,請不要允許使用者覆寫這些選項。明確指出轉發的選項,而不是隱式轉發所有選項。

然後,新的串流類別必須實作一個或多個特定方法,具體取決於要建立的串流類型,如下表所述

使用案例類別要實作的方法
僅讀取Readable_read()
僅寫入Writable_write()_writev()_final()
讀取和寫入Duplex_read()_write()_writev()_final()
對寫入的資料進行操作,然後讀取結果Transform_transform()_flush()_final()

串流的實作程式碼絕不可呼叫串流的「公開」方法,這些方法供使用者使用(如 串流使用者的 API 部分所述)。這樣做可能會對使用串流的應用程式程式碼造成不良的副作用。

避免覆寫公開方法,例如 write()end()cork()uncork()read()destroy(),或透過 .emit() 發出內部事件,例如 'error''data''end''finish''close'。這樣做可能會破壞目前和未來的串流不變式,導致與其他串流、串流公用程式和使用者預期的行為和/或相容性問題。

簡化的建構#

對於許多簡單案例,可以建立串流而不需要依賴繼承。這可以透過直接建立 `stream.Writable`、`stream.Readable`、`stream.Duplex` 或 `stream.Transform` 物件的實例,並將適當的方法傳遞為建構函式選項來達成。

const { Writable } = require('node:stream');

const myWritable = new Writable({
  construct(callback) {
    // Initialize state and load resources...
  },
  write(chunk, encoding, callback) {
    // ...
  },
  destroy() {
    // Free resources...
  },
}); 

實作可寫入串流#

stream.Writable 類別延伸來實作 Writable 串流。

自訂的 Writable 串流必須呼叫 new stream.Writable([options]) 建構函式,並實作 writable._write() 和/或 writable._writev() 方法。

new stream.Writable([options])#
const { Writable } = require('node:stream');

class MyWritable extends Writable {
  constructor(options) {
    // Calls the stream.Writable() constructor.
    super(options);
    // ...
  }
} 

或者,在使用舊版 ES6 風格建構函式時

const { Writable } = require('node:stream');
const util = require('node:util');

function MyWritable(options) {
  if (!(this instanceof MyWritable))
    return new MyWritable(options);
  Writable.call(this, options);
}
util.inherits(MyWritable, Writable); 

或者,使用簡化的建構函式方法

const { Writable } = require('node:stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
}); 

對應傳遞的 AbortSignal 呼叫 AbortController 上的 abort,其行為與在可寫入串流上呼叫 .destroy(new AbortError()) 相同。

const { Writable } = require('node:stream');

const controller = new AbortController();
const myWritable = new Writable({
  write(chunk, encoding, callback) {
    // ...
  },
  writev(chunks, callback) {
    // ...
  },
  signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort(); 
writable._construct(callback)#
  • callback <Function> 當串流初始化完成時,呼叫此函式(可選擇傳遞錯誤引數)。

_construct() 方法不可直接呼叫。它可以由子類別實作,如果實作,則僅由內部 Writable 類別方法呼叫。

此選用函式會在串流建構函式傳回後的一個時間刻度中呼叫,延後任何 _write()_final()_destroy() 呼叫,直到呼叫 callback 為止。這有助於在串流可以使用之前初始化狀態或非同步初始化資源。

const { Writable } = require('node:stream');
const fs = require('node:fs');

class WriteStream extends Writable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null;
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err);
      } else {
        this.fd = fd;
        callback();
      }
    });
  }
  _write(chunk, encoding, callback) {
    fs.write(this.fd, chunk, callback);
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, (er) => callback(er || err));
    } else {
      callback(err);
    }
  }
} 
writable._write(chunk, encoding, callback)#
  • chunk <Buffer> | <string> | <any> 要寫入的 Buffer,從傳遞給 stream.write()string 轉換而來。如果串流的 decodeStrings 選項為 false 或串流以物件模式運作,則不會轉換區塊,且會傳遞給 stream.write() 的任何內容。
  • encoding <string> 如果區塊為字串,則 encoding 是該字串的字元編碼。如果區塊為 Buffer,或如果串流以物件模式運作,則可以略過 encoding
  • callback <Function> 當已完成提供區塊的處理時,呼叫此函式(可選擇傳遞錯誤引數)。

所有 Writable 串流實作都必須提供 writable._write() 和/或 writable._writev() 方法,才能將資料傳送至底層資源。

Transform 串流提供 writable._write() 的自訂實作。

此函式不得由應用程式程式碼直接呼叫。它應由子類別實作,並僅由內部 Writable 類別方法呼叫。

callback 函式必須在 writable._write() 內部同步呼叫,或非同步(即不同的時間)呼叫,以表示寫入已成功完成或因錯誤而失敗。傳遞給 callback 的第一個引數必須是呼叫失敗時的 Error 物件,或寫入成功時的 null

在呼叫 writable._write()callback 之間發生的所有對 writable.write() 的呼叫,都會導致寫入資料被暫存。當呼叫 callback 時,串流可能會發出 'drain' 事件。如果串流實作能夠一次處理多個資料區塊,則應實作 writable._writev() 方法。

如果在建構函式選項中將 decodeStrings 屬性明確設定為 false,則 chunk 將保持傳遞給 .write() 的相同物件,並且可能是字串而非 Buffer。這是為了支援對某些字串資料編碼具有最佳化處理的實作。在這種情況下,encoding 引數將表示字串的字元編碼。否則,可以安全地忽略 encoding 引數。

writable._write() 方法加上底線前綴,是因為它是定義它的類別的內部方法,且不應由使用者程式直接呼叫。

writable._writev(chunks, callback)#
  • chunks <Object[]> 要寫入的資料。值是一個 <Object> 陣列,每個陣列代表要寫入的離散資料區塊。這些物件的屬性為
    • chunk <Buffer> | <string> 包含要寫入資料的緩衝區執行個體或字串。如果 Writable 是使用將 decodeStrings 選項設定為 false 所建立,且字串傳遞給 write(),則 chunk 會是字串。
    • encoding <string> chunk 的字元編碼。如果 chunkBuffer,則 encoding 會是 'buffer'
  • callback <Function> 當已提供區塊處理完成時,要呼叫的回呼函式(選擇性地帶有錯誤引數)。

此函式不得由應用程式程式碼直接呼叫。它應由子類別實作,並僅由內部 Writable 類別方法呼叫。

writable._writev() 方法可以在串流實作中,除了 writable._write() 之外,或者作為替代方法實作,這些串流實作能夠一次處理多個資料區塊。如果已實作,且有來自先前寫入的緩衝資料,則會呼叫 _writev(),而不是 _write()

writable._writev() 方法之前加上底線,因為它是定義它的類別的內部方法,且不應由使用者程式直接呼叫。

writable._destroy(err, callback)#
  • err <Error> 可能的錯誤。
  • callback <Function> 回呼函式,會接收一個選用的錯誤引數。

_destroy() 方法由 writable.destroy() 呼叫。子類別可以覆寫它,但不得直接呼叫它。此外,一旦在承諾解決時執行,callback 不應與 async/await 混合使用。

writable._final(callback)#
  • callback <Function> 在完成寫入任何剩餘資料時呼叫此函式(選擇性地加上一個錯誤引數)。

不得直接呼叫 _final() 方法。子類別可能會實作它,如果是這樣,它將僅由內部 Writable 類別方法呼叫。

此選擇性函式將在串流關閉之前呼叫,延遲 'finish' 事件直到呼叫 callback。這對於在串流結束之前關閉資源或寫入緩衝資料很有用。

寫入時發生的錯誤#

在處理 writable._write()writable._writev()writable._final() 方法時發生的錯誤,必須透過呼叫 callback 並將錯誤傳遞為第一個引數來傳播。從這些方法中擲出 Error 或手動發出 'error' 事件會導致未定義的行為。

如果 Readable 串流在 Writable 發出錯誤時將其串接到 Writable 串流,則 Readable 串流將會解除串接。

const { Writable } = require('node:stream');

const myWritable = new Writable({
  write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  },
}); 
範例可寫入串流#

以下說明一個相當簡化的(且有點無意義的)自訂 Writable 串流實作。雖然這個特定的 Writable 串流實例沒有任何真正的特定用途,但這個範例說明了自訂 Writable 串流實例的每個必要元素

const { Writable } = require('node:stream');

class MyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().indexOf('a') >= 0) {
      callback(new Error('chunk is invalid'));
    } else {
      callback();
    }
  }
} 
在可寫入串流中解碼緩衝區#

解碼緩衝區是一項常見的任務,例如,在使用輸入為字串的轉換器時。當使用多位元組字元編碼(例如 UTF-8)時,這不是一個簡單的過程。以下範例說明如何使用 StringDecoderWritable 解碼多位元組字串。

const { Writable } = require('node:stream');
const { StringDecoder } = require('node:string_decoder');

class StringWritable extends Writable {
  constructor(options) {
    super(options);
    this._decoder = new StringDecoder(options && options.defaultEncoding);
    this.data = '';
  }
  _write(chunk, encoding, callback) {
    if (encoding === 'buffer') {
      chunk = this._decoder.write(chunk);
    }
    this.data += chunk;
    callback();
  }
  _final(callback) {
    this.data += this._decoder.end();
    callback();
  }
}

const euro = [[0xE2, 0x82], [0xAC]].map(Buffer.from);
const w = new StringWritable();

w.write('currency: ');
w.write(euro[0]);
w.end(euro[1]);

console.log(w.data); // currency: € 

實作可讀取串流#

stream.Readable 類別會延伸以實作 Readable 串流。

自訂 Readable 串流必須呼叫 new stream.Readable([options]) 建構函式,並實作 readable._read() 方法。

new stream.Readable([options])#
  • options <Object>
    • highWaterMark <number> 在停止從底層資源讀取之前,儲存在內部緩衝區中的 位元組數 的最大值。預設:16384(16 KiB),或 16(針對 objectMode 串流)。
    • encoding <string> 如果已指定,則會使用指定的編碼將緩衝區解碼為字串。預設:null
    • objectMode <boolean> 這個串流是否應表現為物件串流。意思是 stream.read(n) 會傳回單一值,而不是大小為 nBuffer預設:false
    • emitClose <布林值> 串流在被銷毀後是否應發出 'close'預設值:true
    • read <Function> stream._read() 方法的實作。
    • destroy <函數> stream._destroy() 方法的實作。
    • construct <函數> stream._construct() 方法的實作。
    • autoDestroy <布林值> 這個串流是否應在結束後自動對自己呼叫 .destroy()預設值:true
    • signal <中斷訊號> 代表可能取消的訊號。
const { Readable } = require('node:stream');

class MyReadable extends Readable {
  constructor(options) {
    // Calls the stream.Readable(options) constructor.
    super(options);
    // ...
  }
} 

或者,在使用舊版 ES6 風格建構函式時

const { Readable } = require('node:stream');
const util = require('node:util');

function MyReadable(options) {
  if (!(this instanceof MyReadable))
    return new MyReadable(options);
  Readable.call(this, options);
}
util.inherits(MyReadable, Readable); 

或者,使用簡化的建構函式方法

const { Readable } = require('node:stream');

const myReadable = new Readable({
  read(size) {
    // ...
  },
}); 

對應傳入 AbortSignalAbortController 呼叫 abort 的行為,會與對建立的 readable 呼叫 .destroy(new AbortError()) 相同。

const { Readable } = require('node:stream');
const controller = new AbortController();
const read = new Readable({
  read(size) {
    // ...
  },
  signal: controller.signal,
});
// Later, abort the operation closing the stream
controller.abort(); 
readable._construct(callback)#
  • callback <Function> 當串流初始化完成時,呼叫此函式(可選擇傳遞錯誤引數)。

_construct() 方法不可直接呼叫。它可以由子類別實作,如果實作,則僅會由內部 Readable 類別方法呼叫。

此選用函數會由串流建構函式在下一刻度排程,延後所有 _read()_destroy() 呼叫,直到呼叫 callback 為止。這對於在串流可以使用之前初始化狀態或非同步初始化資源很有用。

const { Readable } = require('node:stream');
const fs = require('node:fs');

class ReadStream extends Readable {
  constructor(filename) {
    super();
    this.filename = filename;
    this.fd = null;
  }
  _construct(callback) {
    fs.open(this.filename, (err, fd) => {
      if (err) {
        callback(err);
      } else {
        this.fd = fd;
        callback();
      }
    });
  }
  _read(n) {
    const buf = Buffer.alloc(n);
    fs.read(this.fd, buf, 0, n, null, (err, bytesRead) => {
      if (err) {
        this.destroy(err);
      } else {
        this.push(bytesRead > 0 ? buf.slice(0, bytesRead) : null);
      }
    });
  }
  _destroy(err, callback) {
    if (this.fd) {
      fs.close(this.fd, (er) => callback(er || err));
    } else {
      callback(err);
    }
  }
} 
readable._read(size)#
  • size <數字> 非同步讀取的位元組數

此函數不可由應用程式碼直接呼叫。它應該由子類別實作,並且僅由內部 Readable 類別方法呼叫。

所有 Readable 串流實作都必須提供 readable._read() 方法的實作,才能從基礎資源擷取資料。

當呼叫 readable._read() 時,如果資源中有資料可用,實作應使用 this.push(dataChunk) 方法開始將資料推入讀取佇列。每次呼叫 this.push(dataChunk) 後,當串流準備接受更多資料時,會再次呼叫 _read()_read() 可以繼續從資源讀取並推入資料,直到 readable.push() 傳回 false 為止。只有在 _read() 在停止後再次呼叫時,它才會繼續將額外資料推入佇列。

一旦呼叫 readable._read() 方法,直到透過 readable.push() 方法傳入更多資料,才會再次呼叫該方法。空資料(例如空緩衝區和字串)不會導致呼叫 readable._read()

size 參數是建議值。對於「讀取」為傳回資料的單一操作的實作,可以使用 size 參數來決定要擷取多少資料。其他實作可能會忽略此參數,並在資料可用時提供資料。在呼叫 stream.push(chunk) 之前,不需要「等待」直到有 size 位元組可用。

readable._read() 方法加上底線開頭,表示它是定義它的類別內部的方法,且使用者程式不應直接呼叫。

readable._destroy(err, callback)#
  • err <Error> 可能的錯誤。
  • callback <Function> 回呼函式,會接收一個選用的錯誤引數。

_destroy() 方法由 readable.destroy() 呼叫。它可以被子類別覆寫,但不得直接呼叫。

readable.push(chunk[, encoding])#
  • chunk <Buffer> | <Uint8Array> | <string> | <null> | <any> 要推入讀取佇列的資料區塊。對於未以物件模式運作的串流,chunk 必須是字串、BufferUint8Array。對於物件模式串流,chunk 可以是任何 JavaScript 值。
  • encoding <string> 字串區塊的編碼。必須是有效的 Buffer 編碼,例如 'utf8''ascii'
  • 傳回:<boolean> 如果可以繼續推入其他資料區塊,則為 true;否則為 false

chunkBufferUint8Arraystring 時,資料的 chunk 將會加入到內部佇列,供串流使用者使用。將 chunk 傳遞為 null 表示串流結束 (EOF),之後無法再寫入更多資料。

Readable 以暫停模式運作時,使用 readable.push() 加入的資料可以在發出 'readable' 事件時,透過呼叫 readable.read() 方法讀取。

Readable 以流動模式運作時,使用 readable.push() 加入的資料將會透過發出 'data' 事件傳遞。

readable.push() 方法被設計成盡可能靈活。例如,當封裝提供某種暫停/繼續機制和資料回呼的較低層級來源時,低層級來源可以透過自訂 Readable 實例封裝

// `_source` is an object with readStop() and readStart() methods,
// and an `ondata` member that gets called when it has data, and
// an `onend` member that gets called when the data is over.

class SourceWrapper extends Readable {
  constructor(options) {
    super(options);

    this._source = getLowLevelSourceObject();

    // Every time there's data, push it into the internal buffer.
    this._source.ondata = (chunk) => {
      // If push() returns false, then stop reading from source.
      if (!this.push(chunk))
        this._source.readStop();
    };

    // When the source ends, push the EOF-signaling `null` chunk.
    this._source.onend = () => {
      this.push(null);
    };
  }
  // _read() will be called when the stream wants to pull more data in.
  // The advisory size argument is ignored in this case.
  _read(size) {
    this._source.readStart();
  }
} 

readable.push() 方法用於將內容推入內部緩衝區。它可以由 readable._read() 方法驅動。

對於未以物件模式運作的串流,如果 readable.push()chunk 參數為 undefined,它將被視為空字串或緩衝區。請參閱 readable.push('') 以取得更多資訊。

讀取時發生的錯誤#

處理 readable._read() 時發生的錯誤必須透過 readable.destroy(err) 方法傳播。從 readable._read() 內部擲出 Error 或手動發出 'error' 事件會導致未定義的行為。

const { Readable } = require('node:stream');

const myReadable = new Readable({
  read(size) {
    const err = checkSomeErrorCondition();
    if (err) {
      this.destroy(err);
    } else {
      // Do some work.
    }
  },
}); 
一個範例計數串流#

以下是一個基礎的 Readable 串流範例,會以遞增順序發出從 1 到 1,000,000 的數字,然後結束。

const { Readable } = require('node:stream');

class Counter extends Readable {
  constructor(opt) {
    super(opt);
    this._max = 1000000;
    this._index = 1;
  }

  _read() {
    const i = this._index++;
    if (i > this._max)
      this.push(null);
    else {
      const str = String(i);
      const buf = Buffer.from(str, 'ascii');
      this.push(buf);
    }
  }
} 

實作雙工串流#

一個 Duplex 串流同時實作了 ReadableWritable,例如 TCP socket 連線。

由於 JavaScript 不支援多重繼承,因此 stream.Duplex 類別會被擴充套件來實作 Duplex 串流(而不是擴充套件 stream.Readable stream.Writable 類別)。

stream.Duplex 類別會從 stream.Readable 原型繼承,並從 stream.Writable 寄生繼承,但由於在 stream.Writable 上覆寫 Symbol.hasInstance,因此 instanceof 會對兩個基礎類別都正確運作。

自訂的 Duplex 串流必須呼叫 new stream.Duplex([options]) 建構函式,並實作 readable._read()writable._write() 方法。

new stream.Duplex(options)#
  • options <Object> 傳遞給 WritableReadable 建構函式。還有以下欄位
    • allowHalfOpen <布林值> 如果設為 false,則當可讀取端結束時,串流會自動結束可寫入端。預設值:true
    • readable <布林值> 設定 Duplex 是否可讀取。預設值:true
    • writable <布林值> 設定 Duplex 是否可寫入。預設值:true
    • readableObjectMode <布林值> 設定串流可讀取端的 objectMode。如果 objectModetrue,則不會產生作用。預設值:false
    • writableObjectMode <布林值> 設定串流可寫入端的 objectMode。如果 objectModetrue,則不會產生作用。預設值:false
    • readableHighWaterMark <數字> 設定串流可讀取端的 highWaterMark。如果提供 highWaterMark,則不會產生作用。
    • writableHighWaterMark <數字> 設定串流可寫入端的 highWaterMark。如果提供 highWaterMark,則不會產生作用。
const { Duplex } = require('node:stream');

class MyDuplex extends Duplex {
  constructor(options) {
    super(options);
    // ...
  }
} 

或者,在使用舊版 ES6 風格建構函式時

const { Duplex } = require('node:stream');
const util = require('node:util');

function MyDuplex(options) {
  if (!(this instanceof MyDuplex))
    return new MyDuplex(options);
  Duplex.call(this, options);
}
util.inherits(MyDuplex, Duplex); 

或者,使用簡化的建構函式方法

const { Duplex } = require('node:stream');

const myDuplex = new Duplex({
  read(size) {
    // ...
  },
  write(chunk, encoding, callback) {
    // ...
  },
}); 

使用管道時

const { Transform, pipeline } = require('node:stream');
const fs = require('node:fs');

pipeline(
  fs.createReadStream('object.json')
    .setEncoding('utf8'),
  new Transform({
    decodeStrings: false, // Accept string input rather than Buffers
    construct(callback) {
      this.data = '';
      callback();
    },
    transform(chunk, encoding, callback) {
      this.data += chunk;
      callback();
    },
    flush(callback) {
      try {
        // Make sure is valid json.
        JSON.parse(this.data);
        this.push(this.data);
        callback();
      } catch (err) {
        callback(err);
      }
    },
  }),
  fs.createWriteStream('valid-object.json'),
  (err) => {
    if (err) {
      console.error('failed', err);
    } else {
      console.log('completed');
    }
  },
); 
雙工串流範例#

以下說明一個簡單的 Duplex 串流範例,它包覆一個假設的較低層級來源物件,資料可以寫入其中,也可以從中讀取資料,儘管它使用與 Node.js 串流不相容的 API。以下說明一個簡單的 Duplex 串流範例,它透過 Writable 介面緩衝寫入的資料,然後透過 Readable 介面讀回資料。

const { Duplex } = require('node:stream');
const kSource = Symbol('source');

class MyDuplex extends Duplex {
  constructor(source, options) {
    super(options);
    this[kSource] = source;
  }

  _write(chunk, encoding, callback) {
    // The underlying source only deals with strings.
    if (Buffer.isBuffer(chunk))
      chunk = chunk.toString();
    this[kSource].writeSomeData(chunk);
    callback();
  }

  _read(size) {
    this[kSource].fetchSomeData(size, (data, encoding) => {
      this.push(Buffer.from(data, encoding));
    });
  }
} 

Duplex 串流最重要的面向是,ReadableWritable 端面在單一物件實例中共存,但彼此獨立運作。

物件模式雙工串流#

對於 Duplex 串流,objectMode 可分別使用 readableObjectModewritableObjectMode 選項,獨自設定 ReadableWritable 端面。

例如,在以下範例中,建立一個新的 Transform 串流(一種 Duplex 串流類型),其 Writable 端面為物件模式,接受 JavaScript 數字,並在 Readable 端面轉換為十六進位字串。

const { Transform } = require('node:stream');

// All Transform streams are also Duplex Streams.
const myTransform = new Transform({
  writableObjectMode: true,

  transform(chunk, encoding, callback) {
    // Coerce the chunk to a number if necessary.
    chunk |= 0;

    // Transform the chunk into something else.
    const data = chunk.toString(16);

    // Push the data onto the readable queue.
    callback(null, '0'.repeat(data.length % 2) + data);
  },
});

myTransform.setEncoding('ascii');
myTransform.on('data', (chunk) => console.log(chunk));

myTransform.write(1);
// Prints: 01
myTransform.write(10);
// Prints: 0a
myTransform.write(100);
// Prints: 64 

實作轉換串流#

Transform 串流是一種 Duplex 串流,其輸出以某種方式從輸入計算得出。範例包括壓縮、加密或解密資料的 zlib 串流或 crypto 串流。

輸出不一定要與輸入大小相同、區塊數相同或同時抵達。例如,Hash 串流只會有一個輸出區塊,並在輸入結束時提供。zlib 串流會產生比輸入小或大的輸出。

stream.Transform 類別會延伸以實作 Transform 串流。

stream.Transform 類別會從 stream.Duplex 原型繼承,並實作其自己的 writable._write()readable._read() 方法版本。自訂 Transform 實作必須實作 transform._transform() 方法,也可能實作 transform._flush() 方法。

使用 Transform 串流時務必小心,因為寫入串流的資料可能會導致串流的 Writable 端暫停,如果 Readable 端的輸出未被使用。

new stream.Transform([options])#
const { Transform } = require('node:stream');

class MyTransform extends Transform {
  constructor(options) {
    super(options);
    // ...
  }
} 

或者,在使用舊版 ES6 風格建構函式時

const { Transform } = require('node:stream');
const util = require('node:util');

function MyTransform(options) {
  if (!(this instanceof MyTransform))
    return new MyTransform(options);
  Transform.call(this, options);
}
util.inherits(MyTransform, Transform); 

或者,使用簡化的建構函式方法

const { Transform } = require('node:stream');

const myTransform = new Transform({
  transform(chunk, encoding, callback) {
    // ...
  },
}); 
事件:'end'#

'end' 事件來自 stream.Readable 類別。'end' 事件會在所有資料都已輸出後發出,也就是在 transform._flush() 中的回呼函式被呼叫後。如果發生錯誤,則不應發出 'end'

事件:'finish'#

'finish' 事件來自 stream.Writable 類別。'finish' 事件會在呼叫 stream.end() 且所有區塊都已由 stream._transform() 處理完畢後發出。如果發生錯誤,則不應發出 'finish'

transform._flush(callback)#
  • callback <Function> 當剩餘資料已沖刷完畢時要呼叫的回呼函式(選擇性地帶有錯誤引數和資料)。

此函數不可由應用程式碼直接呼叫。它應該由子類別實作,並且僅由內部 Readable 類別方法呼叫。

在某些情況下,轉換作業可能需要在串流的結尾發出額外的資料位元。例如,zlib 壓縮串流會儲存用於最佳化壓縮輸出的內部狀態量。然而,當串流結束時,需要沖刷該額外資料,以使壓縮資料完整。

自訂 Transform 實作可能會實作 transform._flush() 方法。當沒有更多可供使用的寫入資料,但在 'end' 事件發出訊號表示 Readable 串流結束之前,就會呼叫它。

transform._flush() 實作中,可以適當地呼叫 transform.push() 方法 0 次或多次。當 flush 作業完成時,必須呼叫 callback 函式。

transform._flush() 方法前面加上底線,是因為它是定義它的類別的內部方法,而且使用者程式不應直接呼叫它。

transform._transform(chunk, encoding, callback)#
  • chunk <Buffer> | <string> | <any> 要轉換的 Buffer,從傳遞給 stream.write()string 轉換而來。如果串流的 decodeStrings 選項為 false 或串流以物件模式運作,則不會轉換 chunk,而且會傳遞給 stream.write() 的任何資料。
  • encoding <string> 如果 chunk 是字串,則這是編碼類型。如果 chunk 是 buffer,則這是特殊值 'buffer'。在這種情況下,請忽略它。
  • callback <Function> 在處理提供的 chunk 之後要呼叫的 callback 函式(選擇性地加上錯誤引數和資料)。

此函數不可由應用程式碼直接呼叫。它應該由子類別實作,並且僅由內部 Readable 類別方法呼叫。

所有 Transform 串流實作都必須提供一個 _transform() 方法來接受輸入並產生輸出。transform._transform() 實作會處理寫入的位元組,計算輸出,然後使用 transform.push() 方法將該輸出傳遞給可讀取的部分。

transform.push() 方法可以被呼叫 0 次或多次,以從單一輸入區塊產生輸出,具體取決於要作為區塊結果輸出的內容。

有可能不會從任何給定的輸入資料區塊產生輸出。

callback 函式必須僅在當前區塊完全使用完畢時呼叫。傳遞給 callback 的第一個引數必須是 Error 物件(如果在處理輸入時發生錯誤)或 null(否則)。如果將第二個引數傳遞給 callback,則它將被轉發到 transform.push() 方法,但僅當第一個引數為假值時。換句話說,下列內容是等效的

transform.prototype._transform = function(data, encoding, callback) {
  this.push(data);
  callback();
};

transform.prototype._transform = function(data, encoding, callback) {
  callback(null, data);
}; 

transform._transform() 方法前面加上底線,因為它是定義它的類別的內部方法,並且不應由使用者程式直接呼叫。

transform._transform() 絕不會並行呼叫;串流實作佇列機制,且為了接收下一個區塊,必須同步或非同步呼叫 callback

類別:stream.PassThrough#

stream.PassThrough 類別是一個 Transform 串流的簡單實作,它只是將輸入位元組傳遞到輸出。它的用途主要是用於範例和測試,但是有一些使用案例,其中 stream.PassThrough 可用作新穎串流類型建構區塊。

其他注意事項#

串流與非同步產生器和非同步反覆運算器的相容性#

在 JavaScript 中支援非同步產生器和迭代器後,非同步產生器在這個時間點上有效地成為一級語言層級串流建構。

以下提供使用非同步產生器和非同步迭代器與 Node.js 串流的一些常見互通案例。

使用非同步迭代器使用可讀取串流#
(async function() {
  for await (const chunk of readable) {
    console.log(chunk);
  }
})(); 

非同步迭代器在串流上註冊一個永久錯誤處理常式,以防止任何未處理的毀損後錯誤。

使用非同步產生器建立可讀取串流#

可以使用 Readable.from() 輔助方法,從非同步產生器建立 Node.js 可讀取串流

const { Readable } = require('node:stream');

const ac = new AbortController();
const signal = ac.signal;

async function * generate() {
  yield 'a';
  await someLongRunningFn({ signal });
  yield 'b';
  yield 'c';
}

const readable = Readable.from(generate());
readable.on('close', () => {
  ac.abort();
});

readable.on('data', (chunk) => {
  console.log(chunk);
}); 
從非同步迭代器傳輸到可寫入串流#

從非同步迭代器寫入可寫入串流時,請確保正確處理反壓和錯誤。 stream.pipeline() 抽象化反壓和反壓相關錯誤的處理

const fs = require('node:fs');
const { pipeline } = require('node:stream');
const { pipeline: pipelinePromise } = require('node:stream/promises');

const writable = fs.createWriteStream('./file');

const ac = new AbortController();
const signal = ac.signal;

const iterator = createIterator({ signal });

// Callback Pattern
pipeline(iterator, writable, (err, value) => {
  if (err) {
    console.error(err);
  } else {
    console.log(value, 'value returned');
  }
}).on('close', () => {
  ac.abort();
});

// Promise Pattern
pipelinePromise(iterator, writable)
  .then((value) => {
    console.log(value, 'value returned');
  })
  .catch((err) => {
    console.error(err);
    ac.abort();
  }); 

與舊版 Node.js 版本相容#

在 Node.js 0.10 之前,Readable 串流介面較為簡單,但功能較弱且較不實用。

  • 'data' 事件會立即開始發出,而不是等待呼叫 stream.read() 方法。需要執行一些工作來決定如何處理資料的應用程式,必須將讀取的資料儲存在緩衝區中,以免資料遺失。
  • stream.pause() 方法是建議性的,而不是有保證的。這表示仍有必要準備好接收 'data' 事件,即使串流處於暫停狀態

在 Node.js 0.10 中,新增了 Readable 類別。為了與較舊的 Node.js 程式相容,當新增 'data' 事件處理常式或呼叫 stream.resume() 方法時,Readable 串流會切換到「流動模式」。其結果是,即使不使用新的 stream.read() 方法和 'readable' 事件,也不必再擔心遺失 'data' 區塊。

雖然大多數應用程式仍會正常運作,但在下列情況下會產生臨界案例

  • 未新增 'data' 事件監聽器。
  • 從未呼叫 stream.resume() 方法。
  • 串流未導向任何可寫入目的地。

例如,請考慮下列程式碼

// WARNING!  BROKEN!
net.createServer((socket) => {

  // We add an 'end' listener, but never consume the data.
  socket.on('end', () => {
    // It will never get here.
    socket.end('The message was received but was not processed.\n');
  });

}).listen(1337); 

在 Node.js 0.10 之前,會直接捨棄傳入的訊息資料。然而,在 Node.js 0.10 及更新版本中,socket 會永遠保持暫停狀態。

這種情況的解決方法是呼叫 stream.resume() 方法以開始資料流

// Workaround.
net.createServer((socket) => {
  socket.on('end', () => {
    socket.end('The message was received but was not processed.\n');
  });

  // Start the flow of data, discarding it.
  socket.resume();
}).listen(1337); 

除了新的 Readable 串流會切換到流動模式外,也可以使用 readable.wrap() 方法將 0.10 之前的串流包裝在 Readable 類別中。

readable.read(0)#

有些情況需要觸發底層可讀取串流機制的重新整理,但實際上並未使用任何資料。在這種情況下,可以呼叫 readable.read(0),它將永遠傳回 null

如果內部讀取緩衝區低於 highWaterMark,且串流目前未讀取,則呼叫 stream.read(0) 會觸發低層級 stream._read() 呼叫。

雖然大多數應用程式幾乎不需要執行此操作,但 Node.js 中有某些情況需要執行此操作,特別是在 Readable 串流類別內部。

readable.push('')#

不建議使用 readable.push('')

將零位元組字串、BufferUint8Array 推送到非物件模式的串流會產生有趣的副作用。由於它readable.push() 的呼叫,因此呼叫會結束讀取程序。但是,由於參數是空字串,因此不會將資料新增到可讀取緩衝區,所以使用者沒有任何資料可以消耗。

呼叫 readable.setEncoding() 後的 highWaterMark 差異#

使用 readable.setEncoding() 會改變 highWaterMark 在非物件模式中運作的方式。

通常,會根據位元組將目前緩衝區的大小與 highWaterMark 進行比較。但是,在呼叫 setEncoding() 之後,比較函式會開始根據字元來測量緩衝區的大小。

在使用 latin1ascii 的常見情況下,這不是問題。但在處理可能包含多位元組字元的字串時,建議注意此行為。