Web Streams API#

穩定性:2 - 穩定

實作 WHATWG Streams 標準

概觀#

WHATWG Streams 標準(或「web streams」)定義用於處理串流資料的 API。它類似於 Node.js Streams API,但稍後才出現,並已成為跨多個 JavaScript 環境串流資料的「標準」API。

有三個主要類型的物件

  • ReadableStream - 表示串流資料的來源。
  • WritableStream - 表示串流資料的目的地。
  • TransformStream - 表示轉換串流資料的演算法。

範例 ReadableStream#

此範例建立一個簡單的 ReadableStream,會永遠每秒推播一次目前的 performance.now() 時間戳記。使用非同步可迭代來從串流中讀取資料。

import {
  ReadableStream,
} from 'node:stream/web';

import {
  setInterval as every,
} from 'node:timers/promises';

import {
  performance,
} from 'node:perf_hooks';

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

for await (const value of stream)
  console.log(value);const {
  ReadableStream,
} = require('node:stream/web');

const {
  setInterval: every,
} = require('node:timers/promises');

const {
  performance,
} = require('node:perf_hooks');

const SECOND = 1000;

const stream = new ReadableStream({
  async start(controller) {
    for await (const _ of every(SECOND))
      controller.enqueue(performance.now());
  },
});

(async () => {
  for await (const value of stream)
    console.log(value);
})();

API#

類別:ReadableStream#

new ReadableStream([underlyingSource [, strategy]])#
  • underlyingSource <Object>
    • start <Function> 在建立 ReadableStream 時立即呼叫的使用者定義函式。
    • pull <Function>ReadableStream 內部佇列未滿時重複呼叫的使用者定義函式。操作可能是同步或非同步。如果是非同步,則在先前傳回的承諾完成之前,不會再次呼叫此函式。
    • cancel <Function> 在取消 ReadableStream 時呼叫的使用者定義函式。
      • reason <any>
      • 傳回:已完成 undefined 的承諾。
    • type <string> 必須是 'bytes'undefined
    • autoAllocateChunkSize <數字> 僅在 type 等於 'bytes' 時使用。當設定為非零值時,檢視緩衝區會自動配置給 ReadableByteStreamController.byobRequest。若未設定,則必須使用串流的內部佇列透過預設讀取器 ReadableStreamDefaultReader 傳輸資料。
  • strategy <物件>
    • highWaterMark <數字> 在套用反壓力之前的最大內部佇列大小。
    • size <函式> 使用者定義的函式,用於識別每個資料區塊的大小。
readableStream.locked#

readableStream.locked 屬性預設為 false,當有活躍的讀取器使用串流的資料時,會切換為 true

readableStream.cancel([reason])#
  • reason <any>
  • 傳回:一旦取消完成,承諾會以 undefined 履行。
readableStream.getReader([options])#
import { ReadableStream } from 'node:stream/web';

const stream = new ReadableStream();

const reader = stream.getReader();

console.log(await reader.read());const { ReadableStream } = require('node:stream/web');

const stream = new ReadableStream();

const reader = stream.getReader();

reader.read().then(console.log);

導致 readableStream.lockedtrue

readableStream.pipeThrough(transform[, options])#
  • transform <Object>
    • readable <ReadableStream> transform.writable 將會將從此 ReadableStream 收到的潛在修改資料推入的 ReadableStream
    • writable <WritableStream>ReadableStream 的資料將會寫入的 WritableStream
  • options <Object>
    • preventAbort <boolean> 當為 true 時,此 ReadableStream 中的錯誤不會導致 transform.writable 中止。
    • preventCancel <boolean> 當為 true 時,目的地 transform.writable 中的錯誤不會導致此 ReadableStream 取消。
    • preventClose <boolean> 當為 true 時,關閉此 ReadableStream 也不會導致 transform.writable 關閉。
    • signal <AbortSignal> 允許使用 <AbortController> 取消資料傳輸。
  • 傳回: <ReadableStream>transform.readable

將此 <ReadableStream> 連接到在 transform 參數中提供的 <ReadableStream><WritableStream> 成對,以便將此 <ReadableStream> 的資料寫入 transform.writable,可能已轉換,然後推送到 transform.readable。組態完管線後,會傳回 transform.readable

在管線作業期間,會將 readableStream.locked 設為 true

import {
  ReadableStream,
  TransformStream,
} from 'node:stream/web';

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

for await (const chunk of transformedStream)
  console.log(chunk);
  // Prints: Aconst {
  ReadableStream,
  TransformStream,
} = require('node:stream/web');

const stream = new ReadableStream({
  start(controller) {
    controller.enqueue('a');
  },
});

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

const transformedStream = stream.pipeThrough(transform);

(async () => {
  for await (const chunk of transformedStream)
    console.log(chunk);
    // Prints: A
})();
readableStream.pipeTo(destination[, options])#
  • destination <WritableStream> <WritableStream>,此 ReadableStream 的資料將寫入其中。
  • options <Object>
    • preventAbort <boolean> 當為 true 時,此 ReadableStream 中的錯誤不會導致 destination 中止。
    • preventCancel <boolean> 當為 true 時,destination 中的錯誤不會導致此 ReadableStream 取消。
    • preventClose <boolean> 當為 true 時,關閉此 ReadableStream 也不會導致 destination 關閉。
    • signal <AbortSignal> 允許使用 <AbortController> 取消資料傳輸。
  • 傳回:已完成且包含 undefined 的 Promise

在管線作業期間,會將 readableStream.locked 設為 true

readableStream.tee()#

傳回一對新的 <ReadableStream> 執行個體,此 ReadableStream 的資料將轉送至其中。每個執行個體都會收到相同的資料。

導致 readableStream.lockedtrue

readableStream.values([options])#

建立並傳回一個非同步迭代器,可用於使用這個 ReadableStream 的資料。

在非同步迭代器作用時,會將 readableStream.locked 設定為 true

import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream.values({ preventCancel: true }))
  console.log(Buffer.from(chunk).toString()); 
非同步迭代#

<ReadableStream> 物件使用 for await 語法支援非同步迭代器協定。

import { Buffer } from 'node:buffer';

const stream = new ReadableStream(getSomeSource());

for await (const chunk of stream)
  console.log(Buffer.from(chunk).toString()); 

非同步迭代器會使用 <ReadableStream>,直到它終止。

預設情況下,如果非同步迭代器提早結束(透過 breakreturnthrow),<ReadableStream> 會關閉。若要防止 <ReadableStream> 自動關閉,請使用 readableStream.values() 方法取得非同步迭代器,並將 preventCancel 選項設定為 true

<ReadableStream> 不得鎖定(也就是說,它不得有現有的作用中讀取器)。在非同步迭代期間,<ReadableStream> 會被鎖定。

使用 postMessage() 傳輸#

<ReadableStream> 執行個體可以使用 <MessagePort> 傳輸。

const stream = new ReadableStream(getReadableSourceSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getReader().read().then((chunk) => {
    console.log(chunk);
  });
};

port2.postMessage(stream, [stream]); 

ReadableStream.from(iterable)#

  • iterable <Iterable> 物件,實作 Symbol.asyncIteratorSymbol.iterator 迭代協定。

一個實用方法,可從一個迭代建立一個新的 <ReadableStream>

import { ReadableStream } from 'node:stream/web';

async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}

const stream = ReadableStream.from(asyncIterableGenerator());

for await (const chunk of stream)
  console.log(chunk); // Prints: 'a', 'b', 'c'const { ReadableStream } = require('node:stream/web');

async function* asyncIterableGenerator() {
  yield 'a';
  yield 'b';
  yield 'c';
}

(async () => {
  const stream = ReadableStream.from(asyncIterableGenerator());

  for await (const chunk of stream)
    console.log(chunk); // Prints: 'a', 'b', 'c'
})();

類別:ReadableStreamDefaultReader#

預設情況下,在沒有參數的情況下呼叫 readableStream.getReader() 會傳回 ReadableStreamDefaultReader 的執行個體。預設讀取器將串流中傳遞的資料區塊視為不透明值,這允許 <ReadableStream> 與任何 JavaScript 值搭配使用。

new ReadableStreamDefaultReader(stream)#

建立新的 <ReadableStreamDefaultReader>,鎖定至指定的 <ReadableStream>

readableStreamDefaultReader.cancel([reason])#
  • reason <any>
  • 傳回:已完成 undefined 的承諾。

取消 <ReadableStream>,並傳回一個承諾,當底層串流已取消時會達成。

readableStreamDefaultReader.closed#
  • 類型:<Promise> 當關聯的 <ReadableStream> 關閉時,以 undefined 達成;如果串流發生錯誤或讀取器的鎖定在串流完成關閉前釋放,則會拒絕。
readableStreamDefaultReader.read()#

要求底層 <ReadableStream> 的下一個資料區塊,並傳回一個承諾,一旦資料可用就會履行。

readableStreamDefaultReader.releaseLock()#

釋放此讀取器對底層 <ReadableStream> 的鎖定。

類別:ReadableStreamBYOBReader#

ReadableStreamBYOBReader 是位元組導向 <ReadableStream>(在建立 ReadableStream 時,其 underlyingSource.type 設定為等於 'bytes')的替代使用者。

BYOB 是「自己帶緩衝區」的縮寫。這是一種模式,允許更有效率地讀取位元組導向資料,避免不必要的複製。

import {
  open,
} from 'node:fs/promises';

import {
  ReadableStream,
} from 'node:stream/web';

import { Buffer } from 'node:buffer';

class Source {
  type = 'bytes';
  autoAllocateChunkSize = 1024;

  async start(controller) {
    this.file = await open(new URL(import.meta.url));
    this.controller = controller;
  }

  async pull(controller) {
    const view = controller.byobRequest?.view;
    const {
      bytesRead,
    } = await this.file.read({
      buffer: view,
      offset: view.byteOffset,
      length: view.byteLength,
    });

    if (bytesRead === 0) {
      await this.file.close();
      this.controller.close();
    }
    controller.byobRequest.respond(bytesRead);
  }
}

const stream = new ReadableStream(new Source());

async function read(stream) {
  const reader = stream.getReader({ mode: 'byob' });

  const chunks = [];
  let result;
  do {
    result = await reader.read(Buffer.alloc(100));
    if (result.value !== undefined)
      chunks.push(Buffer.from(result.value));
  } while (!result.done);

  return Buffer.concat(chunks);
}

const data = await read(stream);
console.log(Buffer.from(data).toString()); 
new ReadableStreamBYOBReader(stream)#

建立新的 ReadableStreamBYOBReader,鎖定至指定的 <ReadableStream>

readableStreamBYOBReader.cancel([reason])#
  • reason <any>
  • 傳回:已完成 undefined 的承諾。

取消 <ReadableStream>,並傳回一個承諾,當底層串流已取消時會達成。

readableStreamBYOBReader.closed#
  • 類型:<Promise> 當關聯的 <ReadableStream> 關閉時,以 undefined 達成;如果串流發生錯誤或讀取器的鎖定在串流完成關閉前釋放,則會拒絕。
readableStreamBYOBReader.read(view[, options])#

要求底層 <ReadableStream> 的下一個資料區塊,並傳回一個承諾,一旦資料可用就會履行。

請勿將共用 <Buffer> 物件實例傳遞給此方法。共用 Buffer 物件是使用 Buffer.allocUnsafe()Buffer.from() 建立的,或通常由各種 node:fs 模組回呼傳回。這些類型的 Buffer 使用共用底層 <ArrayBuffer> 物件,其中包含所有共用 Buffer 實例的所有資料。當 Buffer<TypedArray><DataView> 傳遞給 readableStreamBYOBReader.read() 時,檢視的底層 ArrayBuffer分離,使所有可能存在於該 ArrayBuffer 上的現有檢視失效。這可能會對您的應用程式造成災難性的後果。

readableStreamBYOBReader.releaseLock()#

釋放此讀取器對底層 <ReadableStream> 的鎖定。

類別:ReadableStreamDefaultController#

每個 <ReadableStream> 都有一個控制器,負責串流佇列的內部狀態和管理。ReadableStreamDefaultController 是非位元組導向 ReadableStream 的預設控制器實作。

readableStreamDefaultController.close()#

關閉與此控制器關聯的 <ReadableStream>

readableStreamDefaultController.desiredSize#

傳回填滿 <ReadableStream> 佇列所需的剩餘資料量。

readableStreamDefaultController.enqueue([chunk])#

將新的資料區塊附加到 <ReadableStream> 的佇列。

readableStreamDefaultController.error([error])#

發出一個錯誤訊號,導致 <ReadableStream> 出錯並關閉。

類別:ReadableByteStreamController#

每個 <ReadableStream> 都有一個控制器,負責串流佇列的內部狀態和管理。ReadableByteStreamController 是針對以位元組為導向的 ReadableStream

readableByteStreamController.byobRequest#
readableByteStreamController.close()#

關閉與此控制器關聯的 <ReadableStream>

readableByteStreamController.desiredSize#

傳回填滿 <ReadableStream> 佇列所需的剩餘資料量。

readableByteStreamController.enqueue(chunk)#

將新的資料區塊附加到 <ReadableStream> 的佇列。

readableByteStreamController.error([error])#

發出一個錯誤訊號,導致 <ReadableStream> 出錯並關閉。

類別:ReadableStreamBYOBRequest#

在位元導向串流中使用 ReadableByteStreamController,以及使用 ReadableStreamBYOBReader 時,readableByteStreamController.byobRequest 屬性提供對 ReadableStreamBYOBRequest 執行個體的存取,該執行個體表示目前的讀取要求。此物件用於存取已提供給讀取要求以填入的 ArrayBuffer/TypedArray,並提供方法來表示已提供資料。

readableStreamBYOBRequest.respond(bytesWritten)#

表示已寫入 bytesWritten 數量的位元組至 readableStreamBYOBRequest.view

readableStreamBYOBRequest.respondWithNewView(view)#

表示請求已完成,並將位元組寫入新的 BufferTypedArrayDataView

readableStreamBYOBRequest.view#

類別:WritableStream#

WritableStream 是傳送串流資料的目的地。

import {
  WritableStream,
} from 'node:stream/web';

const stream = new WritableStream({
  write(chunk) {
    console.log(chunk);
  },
});

await stream.getWriter().write('Hello World'); 
new WritableStream([underlyingSink[, strategy]])#
  • underlyingSink <物件>
    • start <函式> 當建立 WritableStream 時立即呼叫的使用者定義函式。
    • write <函式> 當資料區塊已寫入 WritableStream 時呼叫的使用者定義函式。
    • close <Function>WritableStream 關閉時呼叫的使用者定義函式。
      • 傳回:已完成 undefined 的承諾。
    • abort <Function> 使用者定義的函式,用於突然關閉 WritableStream
      • reason <any>
      • 傳回:已完成 undefined 的承諾。
    • type <any> type 選項保留給未來使用,必須未定義。
  • strategy <物件>
    • highWaterMark <數字> 在套用反壓力之前的最大內部佇列大小。
    • size <函式> 使用者定義的函式,用於識別每個資料區塊的大小。
writableStream.abort([reason])#
  • reason <any>
  • 傳回:已完成 undefined 的承諾。

突然終止 WritableStream。所有排隊的寫入都會取消,其相關承諾也會被拒絕。

writableStream.close()#
  • 傳回:已完成 undefined 的承諾。

當沒有預期其他寫入時,關閉 WritableStream

writableStream.getWriter()#

建立並傳回新的寫入器執行個體,可用於將資料寫入 WritableStream

writableStream.locked#

writableStream.locked 屬性預設為 false,當有作用中的寫入器附加到此 WritableStream 時,會切換為 true

使用 postMessage() 傳輸#

可以使用 <MessagePort> 傳輸 <WritableStream> 執行個體。

const stream = new WritableStream(getWritableSinkSomehow());

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  data.getWriter().write('hello');
};

port2.postMessage(stream, [stream]); 

類別:WritableStreamDefaultWriter#

new WritableStreamDefaultWriter(stream)#

建立新的 WritableStreamDefaultWriter,鎖定至指定的 WritableStream

writableStreamDefaultWriter.abort([reason])#
  • reason <any>
  • 傳回:已完成 undefined 的承諾。

突然終止 WritableStream。所有排隊的寫入都會取消,其相關承諾也會被拒絕。

writableStreamDefaultWriter.close()#
  • 傳回:已完成 undefined 的承諾。

當沒有預期其他寫入時,關閉 WritableStream

writableStreamDefaultWriter.closed#
  • 類型:<Promise>當關閉相關聯的 <WritableStream> 時,以 undefined 完成;如果串流錯誤或在串流完成關閉之前釋放寫入器的鎖定,則會拒絕。
writableStreamDefaultWriter.desiredSize#

填滿 <WritableStream> 佇列所需資料量。

writableStreamDefaultWriter.ready#
  • 類型:<Promise>當寫入器準備好使用時,以 undefined 完成。
writableStreamDefaultWriter.releaseLock()#

釋放此寫入器對底層 <ReadableStream> 的鎖定。

writableStreamDefaultWriter.write([chunk])#
  • chunk: <any>
  • 傳回:已完成 undefined 的承諾。

將新的資料區塊附加至 <WritableStream> 的佇列。

類別:WritableStreamDefaultController#

WritableStreamDefaultController 管理 <WritableStream> 的內部狀態。

writableStreamDefaultController.error([error])#

由使用者程式碼呼叫,以表示在處理 WritableStream 資料時發生錯誤。呼叫時,<WritableStream> 會中止,且目前擱置的寫入將會取消。

writableStreamDefaultController.signal#

類別:TransformStream#

TransformStream 包含 <ReadableStream><WritableStream>,這些串流會連接起來,以便將寫入至 WritableStream 的資料接收並可能轉換,然後再推入 ReadableStream 的佇列。

import {
  TransformStream,
} from 'node:stream/web';

const transform = new TransformStream({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

await Promise.all([
  transform.writable.getWriter().write('A'),
  transform.readable.getReader().read(),
]); 
new TransformStream([transformer[, writableStrategy[, readableStrategy]]])#
  • transformer <Object>
    • start <Function>TransformStream 建立時立即呼叫的使用者自訂函式。
    • transform <Function> 使用者自訂函式,用於接收並潛在修改寫入至 transformStream.writable 的資料區塊,然後再將其轉發至 transformStream.readable
    • flush <Function>TransformStream 的可寫入側關閉之前立即呼叫的使用者自訂函式,表示轉換程序結束。
    • readableType <any> readableType 選項保留供未來使用,必須undefined
    • writableType <any> writableType 選項保留供未來使用,必須undefined
  • writableStrategy <Object>
    • highWaterMark <數字> 在套用反壓力之前的最大內部佇列大小。
    • size <函式> 使用者定義的函式,用於識別每個資料區塊的大小。
  • readableStrategy <Object>
    • highWaterMark <數字> 在套用反壓力之前的最大內部佇列大小。
    • size <函式> 使用者定義的函式,用於識別每個資料區塊的大小。
transformStream.readable#
transformStream.writable#
透過 postMessage() 傳輸#

可以使用 <MessagePort> 傳輸 <TransformStream> 實例。

const stream = new TransformStream();

const { port1, port2 } = new MessageChannel();

port1.onmessage = ({ data }) => {
  const { writable, readable } = data;
  // ...
};

port2.postMessage(stream, [stream]); 

類別:TransformStreamDefaultController#

TransformStreamDefaultController 管理 TransformStream 的內部狀態。

transformStreamDefaultController.desiredSize#

填滿可讀取側佇列所需資料的數量。

transformStreamDefaultController.enqueue([chunk])#

將資料區塊附加到可讀取側的佇列。

transformStreamDefaultController.error([reason])#

向可讀取側和可寫入側發出訊號,表示在處理轉換資料時發生錯誤,導致兩側突然關閉。

transformStreamDefaultController.terminate()#

關閉傳輸的可讀取端,並導致可寫入端異常關閉。

類別:ByteLengthQueuingStrategy#

new ByteLengthQueuingStrategy(init)#
byteLengthQueuingStrategy.highWaterMark#
byteLengthQueuingStrategy.size#

類別:CountQueuingStrategy#

new CountQueuingStrategy(init)#
countQueuingStrategy.highWaterMark#
countQueuingStrategy.size#

類別:TextEncoderStream#

new TextEncoderStream()#

建立新的 TextEncoderStream 實例。

textEncoderStream.encoding#

TextEncoderStream 實例支援的編碼。

textEncoderStream.readable#
textEncoderStream.writable#

類別:TextDecoderStream#

new TextDecoderStream([encoding[, options]])#
  • encoding <string> 識別此 TextDecoder 實例支援的 encoding預設值:'utf-8'
  • options <Object>
    • fatal <boolean> 如果解碼失敗為致命錯誤,則為 true
    • ignoreBOM <boolean>true 時,TextDecoderStream 會在解碼結果中包含位元組順序標記。為 false 時,會從輸出中移除位元組順序標記。此選項僅在 encoding'utf-8''utf-16be''utf-16le' 時使用。預設值:false

建立新的 TextDecoderStream 實例。

textDecoderStream.encoding#

TextDecoderStream 實例支援的編碼。

textDecoderStream.fatal#

如果解碼錯誤導致引發 TypeError,則值為 true

textDecoderStream.ignoreBOM#

如果解碼結果將包含位元組順序標記,則值為 true

textDecoderStream.readable#
textDecoderStream.writable#

類別:CompressionStream#

new CompressionStream(format)#
  • format <string> 'deflate''deflate-raw''gzip' 之一。
compressionStream.readable#
compressionStream.writable#

類別:DecompressionStream#

new DecompressionStream(format)#
  • format <string> 'deflate''deflate-raw''gzip' 之一。
decompressionStream.readable#
decompressionStream.writable#

實用程式使用者#

實用程式使用者函式提供使用串流的常見選項。

它們使用以下方式存取

import {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} from 'node:stream/consumers';const {
  arrayBuffer,
  blob,
  buffer,
  json,
  text,
} = require('node:stream/consumers');
streamConsumers.arrayBuffer(stream)#
import { arrayBuffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { TextEncoder } from 'node:util';

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');

const readable = Readable.from(dataArray);
const data = await arrayBuffer(readable);
console.log(`from readable: ${data.byteLength}`);
// Prints: from readable: 76const { arrayBuffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { TextEncoder } = require('node:util');

const encoder = new TextEncoder();
const dataArray = encoder.encode('hello world from consumers!');
const readable = Readable.from(dataArray);
arrayBuffer(readable).then((data) => {
  console.log(`from readable: ${data.byteLength}`);
  // Prints: from readable: 76
});
streamConsumers.blob(stream)#
import { blob } from 'node:stream/consumers';

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
const data = await blob(readable);
console.log(`from readable: ${data.size}`);
// Prints: from readable: 27const { blob } = require('node:stream/consumers');

const dataBlob = new Blob(['hello world from consumers!']);

const readable = dataBlob.stream();
blob(readable).then((data) => {
  console.log(`from readable: ${data.size}`);
  // Prints: from readable: 27
});
streamConsumers.buffer(stream)#
import { buffer } from 'node:stream/consumers';
import { Readable } from 'node:stream';
import { Buffer } from 'node:buffer';

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
const data = await buffer(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27const { buffer } = require('node:stream/consumers');
const { Readable } = require('node:stream');
const { Buffer } = require('node:buffer');

const dataBuffer = Buffer.from('hello world from consumers!');

const readable = Readable.from(dataBuffer);
buffer(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});
streamConsumers.json(stream)#
import { json } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
const data = await json(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 100const { json } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const items = Array.from(
  {
    length: 100,
  },
  () => ({
    message: 'hello world from consumers!',
  }),
);

const readable = Readable.from(JSON.stringify(items));
json(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 100
});
streamConsumers.text(stream)#
import { text } from 'node:stream/consumers';
import { Readable } from 'node:stream';

const readable = Readable.from('Hello world from consumers!');
const data = await text(readable);
console.log(`from readable: ${data.length}`);
// Prints: from readable: 27const { text } = require('node:stream/consumers');
const { Readable } = require('node:stream');

const readable = Readable.from('Hello world from consumers!');
text(readable).then((data) => {
  console.log(`from readable: ${data.length}`);
  // Prints: from readable: 27
});