叢集#

穩定性:2 - 穩定

原始碼: lib/cluster.js

Node.js 程序叢集可用於執行多個 Node.js 實例,這些實例可以在其應用程式執行緒之間分配工作負載。如果不需要程序隔離,請改用 worker_threads 模組,它允許在單一 Node.js 實例內執行多個應用程式執行緒。

cluster 模組允許輕鬆建立子程序,而這些子程序都共用伺服器埠。

import cluster from 'node:cluster';
import http from 'node:http';
import { availableParallelism } from 'node:os';
import process from 'node:process';

const numCPUs = availableParallelism();

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}const cluster = require('node:cluster');
const http = require('node:http');
const numCPUs = require('node:os').availableParallelism();
const process = require('node:process');

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`worker ${worker.process.pid} died`);
  });
} else {
  // Workers can share any TCP connection
  // In this case it is an HTTP server
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');
  }).listen(8000);

  console.log(`Worker ${process.pid} started`);
}

現在執行 Node.js 會在工作執行緒之間共用埠 8000

$ node server.js
Primary 3596 is running
Worker 4324 started
Worker 4520 started
Worker 6056 started
Worker 5644 started 

在 Windows 上,目前還無法在工作執行緒中設定命名管線伺服器。

運作方式#

工作執行緒是使用 child_process.fork() 方法產生,以便它們可以透過 IPC 與父執行緒通訊,並傳遞伺服器控制代碼來回傳遞。

cluster 模組支援兩種分配傳入連線的方法。

第一個方法(除了 Windows 以外,所有平台上的預設方法)是循環方法,其中主要程序會在一個連接埠上監聽,接受新的連線,並以循環的方式將它們分配給工作程序,並內建一些智慧功能以避免工作程序過載。

第二個方法是主要程序會建立監聽 socket,並將它傳送給有興趣的工作程序。然後,工作程序會直接接受傳入的連線。

理論上,第二個方法應該會提供最佳效能。然而,在實際上,由於作業系統排程器的變動,分配往往會非常不平衡。已經觀察到超過 70% 的所有連線都集中在八個程序中的兩個程序中。

由於 server.listen() 會將大部分工作交給主要程序,因此在一般 Node.js 程序和叢集工作程序之間的行為有三個不同的地方

  1. server.listen({fd: 7}) 因為訊息傳遞給主要程序,所以會監聽檔案描述符 7 在父項中,並將控制權傳遞給工作程序,而不是監聽工作程序對檔案描述符 7 參考的看法。
  2. server.listen(handle) 明確監聽控制權會導致工作程序使用提供的控制權,而不是與主要程序交談。
  3. server.listen(0) 通常,這會導致伺服器在隨機連接埠上監聽。然而,在叢集中,每個工作程序每次執行 listen(0) 時都會收到相同的「隨機」連接埠。基本上,連接埠在第一次是隨機的,但之後是可以預測的。若要監聽唯一的連接埠,請根據叢集工作程序 ID 產生連接埠號碼。

Node.js 沒有提供路由邏輯。因此,設計一個應用程式非常重要,這樣它就不會過度依賴記憶體中的資料物件,例如會話和登入。

由於工作程序都是獨立的程序,因此可以根據程式的需求將它們終止或重新產生,而不會影響其他工作程序。只要還有一些工作程序仍然存在,伺服器就會繼續接受連線。如果沒有工作程序存在,則現有的連線將會中斷,新的連線將會被拒絕。然而,Node.js 不會自動管理工作程序的數量。根據自己的需求管理工作程序池是應用程式的責任。

雖然 node:cluster 模組的主要用例是網路,但它也可以用於需要工作程序的其他用例。

類別:Worker#

Worker 物件包含所有工作程序的公開資訊和方法。在主程式中,可以使用 cluster.workers 取得。在工作程序中,可以使用 cluster.worker 取得。

事件:'disconnect'#

類似於 cluster.on('disconnect') 事件,但特定於此工作程序。

cluster.fork().on('disconnect', () => {
  // Worker has disconnected
}); 

事件:'error'#

此事件與 child_process.fork() 提供的事件相同。

在工作程序中,也可以使用 process.on('error')

事件:'exit'#

  • code <number> 正常結束時的結束代碼。
  • signal <string> 導致程序被終止的訊號名稱(例如 'SIGHUP')。

類似於 cluster.on('exit') 事件,但特定於此工作程序。

import cluster from 'node:cluster';

if (cluster.isPrimary) {
  const worker = cluster.fork();
  worker.on('exit', (code, signal) => {
    if (signal) {
      console.log(`worker was killed by signal: ${signal}`);
    } else if (code !== 0) {
      console.log(`worker exited with error code: ${code}`);
    } else {
      console.log('worker success!');
    }
  });
}const cluster = require('node:cluster');

if (cluster.isPrimary) {
  const worker = cluster.fork();
  worker.on('exit', (code, signal) => {
    if (signal) {
      console.log(`worker was killed by signal: ${signal}`);
    } else if (code !== 0) {
      console.log(`worker exited with error code: ${code}`);
    } else {
      console.log('worker success!');
    }
  });
}

事件:'listening'#

類似於 cluster.on('listening') 事件,但特定於此工作執行緒。

cluster.fork().on('listening', (address) => {
  // Worker is listening
});cluster.fork().on('listening', (address) => {
  // Worker is listening
});

它不會在工作執行緒中發出。

事件:'message'#

類似於 cluster'message' 事件,但特定於此工作執行緒。

在工作執行緒中,也可以使用 process.on('message')

請參閱 process 事件:'message'

以下是使用訊息系統的範例。它在主程序中計算工作執行緒收到的 HTTP 要求數量

import cluster from 'node:cluster';
import http from 'node:http';
import { availableParallelism } from 'node:os';
import process from 'node:process';

if (cluster.isPrimary) {

  // Keep track of http requests
  let numReqs = 0;
  setInterval(() => {
    console.log(`numReqs = ${numReqs}`);
  }, 1000);

  // Count requests
  function messageHandler(msg) {
    if (msg.cmd && msg.cmd === 'notifyRequest') {
      numReqs += 1;
    }
  }

  // Start workers and listen for messages containing notifyRequest
  const numCPUs = availableParallelism();
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  for (const id in cluster.workers) {
    cluster.workers[id].on('message', messageHandler);
  }

} else {

  // Worker processes have a http server.
  http.Server((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');

    // Notify primary about the request
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}const cluster = require('node:cluster');
const http = require('node:http');
const process = require('node:process');

if (cluster.isPrimary) {

  // Keep track of http requests
  let numReqs = 0;
  setInterval(() => {
    console.log(`numReqs = ${numReqs}`);
  }, 1000);

  // Count requests
  function messageHandler(msg) {
    if (msg.cmd && msg.cmd === 'notifyRequest') {
      numReqs += 1;
    }
  }

  // Start workers and listen for messages containing notifyRequest
  const numCPUs = require('node:os').availableParallelism();
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  for (const id in cluster.workers) {
    cluster.workers[id].on('message', messageHandler);
  }

} else {

  // Worker processes have a http server.
  http.Server((req, res) => {
    res.writeHead(200);
    res.end('hello world\n');

    // Notify primary about the request
    process.send({ cmd: 'notifyRequest' });
  }).listen(8000);
}

事件:'online'#

類似於 cluster.on('online') 事件,但特定於此工作執行緒。

cluster.fork().on('online', () => {
  // Worker is online
}); 

它不會在工作執行緒中發出。

worker.disconnect()#

在工作執行緒中,此函式將關閉所有伺服器,等待這些伺服器上的 'close' 事件,然後中斷 IPC 通道。

在主程式中,會傳送內部訊息給工作執行緒,導致其自行呼叫 .disconnect()

導致設定 .exitedAfterDisconnect

伺服器關閉後,它將不再接受新連線,但其他任何正在聆聽的工作執行緒都可能會接受連線。現有的連線將被允許照常關閉。當沒有更多連線存在時,請參閱 server.close(),與工作執行緒的 IPC 通道將關閉,讓它能正常結束。

上述內容適用於伺服器連線,工作程序不會自動關閉用戶端連線,而且斷線不會在退出前等待它們關閉。

在工作程序中,process.disconnect 存在,但它不是這個函式;它是 disconnect()

由於長駐伺服器連線可能會阻擋工作程序斷線,因此傳送訊息可能很有用,這樣應用程式特定的動作就可以採取措施將它們關閉。實作逾時,在一段時間後如果尚未發出 'disconnect' 事件,則終止工作程序也可能很有用。

if (cluster.isPrimary) {
  const worker = cluster.fork();
  let timeout;

  worker.on('listening', (address) => {
    worker.send('shutdown');
    worker.disconnect();
    timeout = setTimeout(() => {
      worker.kill();
    }, 2000);
  });

  worker.on('disconnect', () => {
    clearTimeout(timeout);
  });

} else if (cluster.isWorker) {
  const net = require('node:net');
  const server = net.createServer((socket) => {
    // Connections never end
  });

  server.listen(8000);

  process.on('message', (msg) => {
    if (msg === 'shutdown') {
      // Initiate graceful close of any connections to server
    }
  });
} 

worker.exitedAfterDisconnect#

如果工作程序因為 .disconnect() 而退出,則此屬性為 true。如果工作程序以任何其他方式退出,則為 false。如果工作程序尚未退出,則為 undefined

布林值 worker.exitedAfterDisconnect 允許區分自願退出和意外退出,主要程序可以選擇根據此值不重生工作程序。

cluster.on('exit', (worker, code, signal) => {
  if (worker.exitedAfterDisconnect === true) {
    console.log('Oh, it was just voluntary – no need to worry');
  }
});

// kill worker
worker.kill(); 

worker.id#

每個新工作程序都會獲得自己的唯一 ID,此 ID 會儲存在 id 中。

當工作程序處於運作狀態時,此為在 cluster.workers 中索引它的鍵。

worker.isConnected()#

此函數會傳回 true,如果工作執行緒透過其 IPC 通道連線至其主要執行緒,否則傳回 false。工作執行緒會在建立後連線至其主要執行緒。在發出 'disconnect' 事件後,會中斷連線。

worker.isDead()#

此函數會傳回 true,如果工作執行緒的處理程序已終止(因為結束或被發出訊號)。否則,會傳回 false

import cluster from 'node:cluster';
import http from 'node:http';
import { availableParallelism } from 'node:os';
import process from 'node:process';

const numCPUs = availableParallelism();

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('fork', (worker) => {
    console.log('worker is dead:', worker.isDead());
  });

  cluster.on('exit', (worker, code, signal) => {
    console.log('worker is dead:', worker.isDead());
  });
} else {
  // Workers can share any TCP connection. In this case, it is an HTTP server.
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Current process\n ${process.pid}`);
    process.kill(process.pid);
  }).listen(8000);
}const cluster = require('node:cluster');
const http = require('node:http');
const numCPUs = require('node:os').availableParallelism();
const process = require('node:process');

if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);

  // Fork workers.
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('fork', (worker) => {
    console.log('worker is dead:', worker.isDead());
  });

  cluster.on('exit', (worker, code, signal) => {
    console.log('worker is dead:', worker.isDead());
  });
} else {
  // Workers can share any TCP connection. In this case, it is an HTTP server.
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`Current process\n ${process.pid}`);
    process.kill(process.pid);
  }).listen(8000);
}

worker.kill([signal])#

  • signal <string> 要傳送至工作執行緒處理程序的終止訊號名稱。預設:'SIGTERM'

此函數會終止工作執行緒。在主要工作執行緒中,會透過中斷 worker.process 來執行此動作,中斷後,再使用 signal 終止。在工作執行緒中,會透過使用 signal 終止處理程序來執行此動作。

kill() 函數會終止工作執行緒處理程序,而不等待正常中斷,其行為與 worker.process.kill() 相同。

此方法別名為 worker.destroy(),以維持向後相容性。

在工作執行緒中,process.kill() 存在,但並非此函數;而是 kill()

worker.process#

所有工作執行緒都是使用 child_process.fork() 建立的,此函數傳回的物件會儲存在 .process 中。在工作執行緒中,會儲存全域 process

請參閱:子程序模組

如果 process 上發生 'disconnect' 事件且 .exitedAfterDisconnect 不為 true,工作執行緒會呼叫 process.exit(0)。這可以防止意外中斷連線。

worker.send(message[, sendHandle[, options]][, callback])#

  • message <Object>
  • sendHandle <Handle>
  • options <Object> 如果存在 options 引數,則它是一個用於參數化傳送特定類型處理的物件。options 支援下列屬性
    • keepOpen <boolean> 傳遞 net.Socket 執行個體時可以使用的一個值。如果為 true,則在傳送程序中保持 socket 開啟。預設值:false
  • callback <Function>
  • 傳回:<boolean>

傳送訊息給工作執行緒或主程式,並可選擇附帶處理。

在主程式中,這會傳送訊息給特定工作執行緒。它與 ChildProcess.send() 相同。

在工作執行緒中,這會傳送訊息給主程式。它與 process.send() 相同。

這個範例會回傳主程式傳送來的所有訊息

if (cluster.isPrimary) {
  const worker = cluster.fork();
  worker.send('hi there');

} else if (cluster.isWorker) {
  process.on('message', (msg) => {
    process.send(msg);
  });
} 

事件:'disconnect'#

工作執行緒 IPC 通道斷開連線後發出。這可能會在工作執行緒正常結束、被終止或手動斷開連線(例如使用 worker.disconnect())時發生。

'disconnect''exit' 事件之間可能會有些延遲。這些事件可用於偵測處理程序是否卡在清理作業中,或是有哪些長駐連線。

cluster.on('disconnect', (worker) => {
  console.log(`The worker #${worker.id} has disconnected`);
}); 

事件:'exit'#

當任何工作執行緒結束時,叢集模組會發出 'exit' 事件。

這可以用於再次呼叫 .fork() 來重新啟動工作執行緒。

cluster.on('exit', (worker, code, signal) => {
  console.log('worker %d died (%s). restarting...',
              worker.process.pid, signal || code);
  cluster.fork();
}); 

請參閱 child_process 事件:'exit'

事件:'fork'#

當新的工作執行緒分岔時,叢集模組會發出 'fork' 事件。這可用於記錄工作執行緒活動,並建立自訂逾時。

const timeouts = [];
function errorMsg() {
  console.error('Something must be wrong with the connection ...');
}

cluster.on('fork', (worker) => {
  timeouts[worker.id] = setTimeout(errorMsg, 2000);
});
cluster.on('listening', (worker, address) => {
  clearTimeout(timeouts[worker.id]);
});
cluster.on('exit', (worker, code, signal) => {
  clearTimeout(timeouts[worker.id]);
  errorMsg();
}); 

事件:'listening'#

從工作執行緒呼叫 listen() 之後,當伺服器上發出 'listening' 事件時,主程式中的 cluster 上也會發出 'listening' 事件。

事件處理常式會以兩個引數執行,worker 包含工作執行緒物件,而 address 物件包含下列連線屬性:addressportaddressType。如果工作執行緒在多個位址上監聽,這會非常有用。

cluster.on('listening', (worker, address) => {
  console.log(
    `A worker is now connected to ${address.address}:${address.port}`);
}); 

addressType 是下列其中一個:

  • 4 (TCPv4)
  • 6 (TCPv6)
  • -1 (Unix 域套接字)
  • 'udp4''udp6' (UDPv4 或 UDPv6)

事件:'message'#

當叢集主節點從任何工作節點收到訊息時發出。

請參閱 child_process 事件:'message'

事件:'online'#

在分派新工作節點後,工作節點應回應線上訊息。當主節點收到線上訊息時,它會發出此事件。'fork''online' 的差異在於,fork 是在主節點分派工作節點時發出,而 'online' 是在工作節點執行時發出。

cluster.on('online', (worker) => {
  console.log('Yay, the worker responded after it was forked');
}); 

事件:'setup'#

每次呼叫 .setupPrimary() 時發出。

settings 物件是在呼叫 .setupPrimary() 時的 cluster.settings 物件,而且僅供參考,因為可以在單一滴答中呼叫 .setupPrimary() 多次。

如果準確性很重要,請使用 cluster.settings

cluster.disconnect([callback])#

  • callback <Function> 在所有工作節點斷線且已關閉所有控制代碼時呼叫。

cluster.workers 中的每個工作節點上呼叫 .disconnect()

當它們斷線時,所有內部控制代碼都將關閉,允許主程序在沒有其他事件等待的情況下正常結束。

此方法採用一個可選的回呼引數,該引數將於完成時呼叫。

只能從主要程序呼叫此方法。

cluster.fork([env])#

產生新的工作程序。

只能從主要程序呼叫此方法。

cluster.isMaster#

穩定性:0 - 已標示為不建議使用

已標示為不建議使用的 cluster.isPrimary 別名。

cluster.isPrimary#

如果程序為主要程序,則為 True。這是由 process.env.NODE_UNIQUE_ID 決定。如果 process.env.NODE_UNIQUE_ID 未定義,則 isPrimarytrue

cluster.isWorker#

如果程序不是主要程序,則為 True(為 cluster.isPrimary 的否定值)。

cluster.schedulingPolicy#

排程政策,cluster.SCHED_RR 為循環,或 cluster.SCHED_NONE 交由作業系統決定。這是全域設定,且在第一個工作執行緒產生或呼叫 .setupPrimary() 時,會立即凍結,以先發生的為準。

SCHED_RR 是所有作業系統的預設值,Windows 除外。一旦 libuv 能有效分配 IOCP 句柄,且不會造成效能大幅下降,Windows 將會變更為 SCHED_RR

cluster.schedulingPolicy 也可以透過 NODE_CLUSTER_SCHED_POLICY 環境變數設定。有效值為 'rr''none'

cluster.settings#

  • <Object>
    • execArgv <string[]> 傳遞給 Node.js 可執行檔的字串引數清單。預設值:process.execArgv
    • exec <string> 工作執行緒檔案的檔案路徑。預設值:process.argv[1]
    • args <string[]> 傳遞給工作執行緒的字串引數。預設值:process.argv.slice(2)
    • cwd <string> 工作執行緒程序的目前工作目錄。預設值:undefined(繼承自父程序)。
    • serialization <字串> 指定用於在程序之間傳送訊息的序列化類型。可能的數值為 'json''advanced'。有關更多詳細資訊,請參閱 child_process 的進階序列化預設值: false
    • silent <布林> 是否將輸出傳送至父項的 stdio。預設值: false
    • stdio <陣列> 設定已分岔程序的 stdio。由於叢集模組仰賴 IPC 才能運作,因此此設定必須包含 'ipc' 項目。提供此選項時,會覆寫 silent。請參閱 child_process.spawn()stdio
    • uid <數字> 設定程序的使用者身分。(請參閱 setuid(2)。)
    • gid <數字> 設定程序的群組身分。(請參閱 setgid(2)。)
    • inspectPort <數字> | <函式> 設定工作人員的檢查程式埠。這可以是數字,或是函式,該函式不帶任何引數,並傳回數字。預設情況下,每個工作人員都會取得自己的埠,從主項目的 process.debugPort 開始遞增。
    • windowsHide <布林> 隱藏通常會在 Windows 系統上建立的分岔程序主控台視窗。預設值: false

呼叫 .setupPrimary()(或 .fork())後,此設定物件將包含設定,包括預設值。

此物件不應手動變更或設定。

cluster.setupMaster([settings])#

穩定性:0 - 已標示為不建議使用

.setupPrimary() 的已棄用別名。

cluster.setupPrimary([settings])#

setupPrimary 用於變更預設的「fork」行為。呼叫後,設定將出現在 cluster.settings 中。

任何設定變更只會影響對 .fork() 的後續呼叫,且不會影響已執行的工作人員。

無法透過 .setupPrimary() 設定的工作人員唯一屬性是傳遞給 .fork()env

上述預設值僅適用於第一次呼叫;後續呼叫的預設值是呼叫 cluster.setupPrimary() 當時的目前值。

import cluster from 'node:cluster';

cluster.setupPrimary({
  exec: 'worker.js',
  args: ['--use', 'https'],
  silent: true,
});
cluster.fork(); // https worker
cluster.setupPrimary({
  exec: 'worker.js',
  args: ['--use', 'http'],
});
cluster.fork(); // http workerconst cluster = require('node:cluster');

cluster.setupPrimary({
  exec: 'worker.js',
  args: ['--use', 'https'],
  silent: true,
});
cluster.fork(); // https worker
cluster.setupPrimary({
  exec: 'worker.js',
  args: ['--use', 'http'],
});
cluster.fork(); // http worker

只能從主要程序呼叫此方法。

cluster.worker#

目前工作人員物件的參考。主程序中無法使用。

import cluster from 'node:cluster';

if (cluster.isPrimary) {
  console.log('I am primary');
  cluster.fork();
  cluster.fork();
} else if (cluster.isWorker) {
  console.log(`I am worker #${cluster.worker.id}`);
}const cluster = require('node:cluster');

if (cluster.isPrimary) {
  console.log('I am primary');
  cluster.fork();
  cluster.fork();
} else if (cluster.isWorker) {
  console.log(`I am worker #${cluster.worker.id}`);
}

cluster.workers#

一個儲存活動工作者物件的雜湊,以 id 欄位為鍵。這使得輕鬆迴圈所有工作者變得容易。它僅在主要程序中可用。

工作者在斷開連線退出後會從 cluster.workers 中移除。這兩個事件之間的順序無法預先確定。但是,可以保證從 cluster.workers 清單中移除會在發出最後一個 'disconnect''exit' 事件之前發生。

import cluster from 'node:cluster';

for (const worker of Object.values(cluster.workers)) {
  worker.send('big announcement to all workers');
}const cluster = require('node:cluster');

for (const worker of Object.values(cluster.workers)) {
  worker.send('big announcement to all workers');
}