NodeJS 速習チュートリアル

Node.js Cluster

1. Cluster モジュールとは?

Cluster モジュールは、同じサーバーポートを共有する複数のワーカープロセスを作成する手段を提供します。

Node.js はデフォルトでシングルスレッドで動作しますが、Cluster モジュールを使用することで、アプリケーションが複数の CPU コアを利用できるようになり、マルチコアシステム上でのパフォーマンスを大幅に向上させることができます。

各ワーカーは、独自のイベントループとメモリ空間を持つ独立したプロセスとして動作しますが、サーバーポートはすべて共通のものを共有します。マスタープロセスは、ワーカーの作成と、入ってくる接続のワーカー間での分散を担当します。

2. Cluster モジュールのインポート

Cluster モジュールは Node.js に標準で組み込まれています。スクリプト内で require することで使用可能です。

const cluster = require('cluster');
const os = require('os');

// これがマスタープロセスかどうかを確認
if (cluster.isMaster) {
  console.log(`マスタープロセス ${process.pid} が実行中です`);
} else {
  console.log(`ワーカープロセス ${process.pid} が起動しました`);
}

3. クラスタリングの仕組み

Cluster モジュールは、複数のワーカープロセスを生成(spawn)するマスタープロセスを作成することで動作します。

  • マスタープロセスはアプリケーションコードを実行せず、ワーカーの管理を行います。
  • 各ワーカープロセスは、アプリケーションコードを独立して実行する新しい Node.js インスタンスです。

       注意: 内部的には、Cluster モジュールは Child Process モジュールの fork() メソッドを使用して新しいワーカーを作成しています。

プロセスタイプ責務
マスター (Master)ワーカープロセスの作成と管理
ワーカーのヘルスチェック(監視)
クラッシュしたワーカーの再起動
負荷分散(接続の分散)
ワーカー (Worker)実際のアプリケーションコードの実行
リクエストの処理
データの処理
ビジネスロジックの実行

4. 基本的なクラスターの作成

以下は、各 CPU コアに対してワーカープロセスを作成するシンプルな例です。

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  // これはマスタープロセスです

  console.log(`マスター ${process.pid} が実行中です`);

  // 各 CPU コアに対してワーカーをフォークする
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // ワーカーの終了を監視
  cluster.on('exit', (worker, code, signal) => {
    console.log(`ワーカー ${worker.process.pid} が終了しました`);
    // 終了したワーカーの代わりに新しいワーカーをフォークすることも可能です
    console.log('新しいワーカーをフォークしています...');
    cluster.fork();
  });
} else {
  // これはワーカープロセスです

  // HTTP サーバーを作成
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`ワーカー ${process.pid} からのレスポンスです\n`);

    // CPU 負荷をシミュレート
    let i = 1e7;
    while (i > 0) { i--; }

  }).listen(8000);

  console.log(`ワーカー ${process.pid} が起動しました`);
}

この例のポイント:

  • マスタープロセスが CPU コア数を検出します。
  • CPU ごとに 1 つのワーカーをフォークします。
  • 各ワーカーは同じポート(8000)で HTTP サーバーを作成します。
  • Cluster モジュールが自動的に入ってくる接続を負荷分散(ロードバランシング)します。
  • ワーカーがクラッシュした場合、マスターが新しいワーカーを作成します。

5. ワーカー間の通信

Child Process モジュールの IPC(プロセス間通信)と同様に、send() メソッドと message イベントを使用して、マスターとワーカープロセス間で通信を行うことができます。

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`マスター ${process.pid} が実行中です`);

  // 各ワーカーのリクエスト数を追跡
  const requestCounts = {};

  // ワーカーをフォーク
  for (let i = 0; i < numCPUs; i++) {
    const worker = cluster.fork();
    requestCounts[worker.id] = 0;

    // このワーカーからのメッセージをリッスン
    worker.on('message', (msg) => {
      if (msg.cmd === 'incrementRequestCount') {
        requestCounts[worker.id]++;
        console.log(`ワーカー ${worker.id} (pid ${worker.process.pid}) が ${requestCounts[worker.id]} 件のリクエストを処理しました`);
      }
    });
  }

  // 10秒ごとに各ワーカーへリクエスト数を送信
  setInterval(() => {
    for (const id in cluster.workers) {
      cluster.workers[id].send({
        cmd: 'requestCount',
        requestCount: requestCounts[id]
      });
    }
    console.log('総リクエスト集計:', requestCounts);
  }, 10000);

  // ワーカーの終了を処理
  cluster.on('exit', (worker, code, signal) => {
    console.log(`ワーカー ${worker.process.pid} が終了しました`);
    // 代わりの新しいワーカーをフォーク
    const newWorker = cluster.fork();
    requestCounts[newWorker.id] = 0;
  });
} else {
  // ワーカープロセス
  console.log(`ワーカー ${process.pid} が起動しました`);

  let localRequestCount = 0;

  // マスターからのメッセージを処理
  process.on('message', (msg) => {
    if (msg.cmd === 'requestCount') {
      console.log(`マスターによると、ワーカー ${process.pid} は ${msg.requestCount} 件のリクエストを処理しました`);
    }
  });

  // HTTP サーバーを作成
  http.createServer((req, res) => {
    // リクエストを処理したことをマスターに通知
    process.send({ cmd: 'incrementRequestCount' });

    // ローカルのカウントを増やす
    localRequestCount++;

    // レスポンスを送信
    res.writeHead(200);
    res.end(`ワーカー ${process.pid} からこんにちは。ローカルで ${localRequestCount} 件のリクエストを処理しました\n`);
  }).listen(8000);
}

6. ゼロダウンタイム・リスタート

クラスタリングの主な利点の一つは、ダウンタイムなしでワーカーを再起動できることです。これは、アプリケーションのアップデートをデプロイする際に非常に有用です。

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`マスター ${process.pid} が実行中です`);

  // ワーカーを保持
  const workers = [];

  // 初期ワーカーをフォーク
  for (let i = 0; i < numCPUs; i++) {
    workers.push(cluster.fork());
  }

  // ワーカーを一つずつ再起動する関数
  function restartWorkers() {
    console.log('ゼロダウンタイム・リスタートを開始します...');
    
    let i = 0;
    function restartWorker() {
      if (i >= workers.length) {
        console.log('すべてのワーカーが正常に再起動されました!');
        return;
      }

      const worker = workers[i++];
      console.log(`ワーカー ${worker.process.pid} を再起動中...`);

      // 新しいワーカーを作成
      const newWorker = cluster.fork();
      newWorker.on('listening', () => {
        // 新しいワーカーがリッスンを開始したら、古いワーカーを終了させる
        worker.disconnect();

        // 配列内の古いワーカーを入れ替える
        workers[workers.indexOf(worker)] = newWorker;

        // 次のワーカーへ進む
        setTimeout(restartWorker, 1000);
      });
    }

    // 再帰的なプロセスを開始
    restartWorker();
  }
  
  // 20秒後に再起動をシミュレート
  setTimeout(restartWorkers, 20000);

  // 通常のワーカー終了を処理
  cluster.on('exit', (worker, code, signal) => {
    if (worker.exitedAfterDisconnect !== true) {
      console.log(`ワーカー ${worker.process.pid} が予期せず終了しました。入れ替えます...`);
      const newWorker = cluster.fork();
      workers[workers.indexOf(worker)] = newWorker;
    }
  });
} else {
  // ワーカープロセス

  // HTTP サーバーを作成
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`ワーカー ${process.pid} が応答中。稼働時間: ${process.uptime().toFixed(2)} 秒\n`);
  }).listen(8000);

  console.log(`ワーカー ${process.pid} が起動しました`);
}

この例のデモンストレーション内容:

  • 初期ワーカーセットの作成。
  • 各ワーカーを一つずつ順番に入れ替え。
  • 古いワーカーを切断する前に、新しいワーカーがリッスン状態であることを確認。
  • 予期せぬワーカーの終了に対する適切な処理。

7. 負荷分散(ロードバランシング)

Cluster モジュールには、入ってくる接続をワーカープロセス間で分散するための負荷分散機能が組み込まれています。

主に 2 つの戦略があります:

7.1 ラウンドロビン(デフォルト)

Windows 以外のすべてのプラットフォームでは、Node.js はデフォルトでラウンドロビン方式を採用しています。マスタープロセスが接続を受け付け、順番にワーカーへ分散します。

       注意: Windows では、OS のポート処理の仕様により、負荷分散の挙動が異なります。Windows では、ワーカーが接続を奪い合う形で受け付けます。

7.2 プライマリワーカーによる直接受付

cluster.schedulingPolicy を設定することで、各ワーカーが直接接続を受け付けるようにすることも可能です。

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

// スケジューリングポリシーを SCHED_NONE に設定(ワーカー自身に接続を受け付けさせる)
cluster.schedulingPolicy = cluster.SCHED_NONE;

if (cluster.isMaster) {
  console.log(`マスター ${process.pid} が実行中です`);

  // ワーカーをフォーク
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`ワーカー ${worker.process.pid} が終了しました`);
    cluster.fork();
  });
} else {
  // ワーカープロセス
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`ワーカー ${process.pid} からのレスポンスです\n`);
  }).listen(8000);

  console.log(`ワーカー ${process.pid} が起動しました`);
}

8. 状態の共有(Shared State)

各ワーカーは独自のメモリ空間を持つ独立したプロセスで動作するため、変数を介して直接状態を共有することはできません。その代わりに以下の方法をとります:

  • IPC メッセージングを使用する(通信の例で示した通り)。
  • Redis、MongoDB、ファイルシステムなどの外部ストレージを使用する。
  • セッション管理のためにスティッキーセッション(Sticky Sessions)を使用する。

9. スティッキーセッションの実装例

スティッキーセッションは、同じクライアントからのリクエストが常に同じワーカープロセスに送られるように保証します。

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`マスター ${process.pid} が実行中です`);

  // ワーカーをフォーク
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // ID ごとにワーカーの参照を保存
  const workers = {};
  for (const id in cluster.workers) {
    workers[id] = cluster.workers[id];
  }

  // 接続をワーカーにルーティングするサーバーを作成
  const server = http.createServer((req, res) => {
    // クライアントの IP を取得
    const clientIP = req.connection.remoteAddress || req.socket.remoteAddress;

    // どのワーカーを使用するか決定するためのシンプルなハッシュ関数
    const workerIndex = clientIP.split('.').reduce((a, b) => a + parseInt(b), 0) % numCPUs;
    const workerIds = Object.keys(workers);
    const workerId = workerIds[workerIndex];

    // 選択されたワーカーにリクエストを送信
    workers[workerId].send('sticky-session:connection', req.connection);

    res.end(`リクエストはワーカー ${workerId} にルーティングされました`);
  }).listen(8000);

  console.log('マスターサーバーがポート 8000 で待機中');

  // ワーカーの終了を処理
  cluster.on('exit', (worker, code, signal) => {
    console.log(`ワーカー ${worker.process.pid} が終了しました`);

    // 終了したワーカーを削除
    delete workers[worker.id];

    // 代わりのワーカーを作成
    const newWorker = cluster.fork();
    workers[newWorker.id] = newWorker;
  });
} else {
  // ワーカープロセス - コンセプトの実証のみ
  // 実際の実装では、より詳細なソケット処理が必要です

  process.on('message', (msg, socket) => {
    if (msg === 'sticky-session:connection' && socket) {
      console.log(`ワーカー ${process.pid} がスティッキー接続を受信しました`);
      
      // 実際の実装では、ここでソケットを処理します
      // socket.end(`ワーカー ${process.pid} によって処理されました\n`);
    }
  });

  // ワーカーも独自のサーバーを実行可能
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`ワーカー ${process.pid} への直接リクエスト\n`);
  }).listen(8001);

  console.log(`ワーカー ${process.pid} が起動しました`);
}

これは概念を示す簡略化された例です。本番環境では通常:

  • より高度なハッシュアルゴリズムを使用します。
  • IP アドレスの代わりにクッキーやその他のセッション識別子を使用します。
  • ソケット接続をより慎重に扱います。

10. ワーカーのライフサイクル

クラスターを適切に管理するためには、ワーカーのライフサイクルを理解することが重要です。

イベント説明
fork新しいワーカーがフォークされたときに発生
onlineワーカーが実行され、メッセージを受信する準備ができたときに発生
listeningワーカーが接続のリッスンを開始したときに発生
disconnectワーカーの IPC チャネルが切断されたときに発生
exitワーカープロセスが終了したときに発生
const cluster = require('cluster');
const http = require('http');

if (cluster.isMaster) {
  console.log(`マスター ${process.pid} が実行中です`);

  // ワーカーをフォーク
  const worker = cluster.fork();

  // すべてのワーカーライフサイクルイベントをリッスン
  worker.on('fork', () => {
    console.log(`ワーカー ${worker.process.pid} がフォークされています`);
  });

  worker.on('online', () => {
    console.log(`ワーカー ${worker.process.pid} がオンラインです`);
  });

  worker.on('listening', (address) => {
    console.log(`ワーカー ${worker.process.pid} がポート ${address.port} でリッスン中`);
  });

  worker.on('disconnect', () => {
    console.log(`ワーカー ${worker.process.pid} が切断されました`);
  });

  worker.on('exit', (code, signal) => {
    console.log(`ワーカー ${worker.process.pid} が終了コード ${code}、シグナル ${signal} で終了しました`);

     if (signal) {
      console.log(`ワーカーはシグナル ${signal} によって強制終了されました`);
    } else if (code !== 0) {
      console.log(`ワーカーはエラーコード ${code} で終了しました`);
    } else {
      console.log('ワーカーは正常に終了しました');
    }
  });

  // 10秒後、ワーカーを穏やかに切断
  setTimeout(() => {
    console.log('ワーカーを穏やかに切断しています...');
    worker.disconnect();
  }, 10000);

} else {
  // ワーカープロセス
  console.log(`ワーカー ${process.pid} が起動しました`);

  // HTTP サーバーを作成
  http.createServer((req, res) => {
    res.writeHead(200);
    res.end(`ワーカー ${process.pid} からのレスポンスです\n`);
  }).listen(8000);

  // ワーカーが切断された場合、サーバーを閉じる
  process.on('disconnect', () => {
    console.log(`ワーカー ${process.pid} が切断されました。サーバーを閉じています...`);
    // 本番環境では、すべての接続を閉じ、リソースをクリーンアップする必要があります
    process.exit(0);
  });
}

11. グレースフル・シャットダウン

グレースフル・シャットダウン(穏やかな停止)は、ワーカープロセスが終了する前に、既存のリクエストの処理を完了させるために重要です。

const cluster = require('cluster');
const http = require('http');
const numCPUs = require('os').cpus().length;

if (cluster.isMaster) {
  console.log(`マスター ${process.pid} が実行中です`);

  // ワーカーをフォーク
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  // 終了シグナルを処理
  process.on('SIGTERM', () => {
    console.log('マスターが SIGTERM を受信しました。グレースフル・シャットダウンを開始します...');

    // すべてのワーカーに仕事を終えて終了するよう通知
    Object.values(cluster.workers).forEach(worker => {
      console.log(`ワーカー ${worker.process.pid} に SIGTERM を送信中`);
      worker.send('shutdown');
    });

    // ワーカーが穏やかに終了しない場合に備え、強制終了のタイムアウトを設定
    setTimeout(() => {
      console.log('一部のワーカーが穏やかに終了しませんでした。強制終了します...');
      Object.values(cluster.workers).forEach(worker => {
        if (!worker.isDead()) {
          console.log(`ワーカー ${worker.process.pid} をキルしています`);
          worker.process.kill('SIGKILL');
        }
    });

    // マスターを終了
    console.log('すべてのワーカーが終了しました。マスターを終了します...');
    process.exit(0);
  }, 5000);
  });

  // ワーカーの終了を処理
  cluster.on('exit', (worker, code, signal) => {
    console.log(`ワーカー ${worker.process.pid} が終了しました (${signal || code})`);

    // すべてのワーカーが終了したら、マスターも終了
    if (Object.keys(cluster.workers).length === 0) {
      console.log('すべてのワーカーが終了しました。マスターをシャットダウンします...');
      process.exit(0);
    }
  });

  console.log(`マスター ${process.pid} は ${Object.keys(cluster.workers).length} 個のワーカーとともに準備完了です`);
  console.log('グレースフル・シャットダウンを開始するには、マスタープロセスに SIGTERM を送信してください');

} else {
  // ワーカープロセス
  console.log(`ワーカー ${process.pid} が起動しました`);

  // シャットダウン中かどうかのフラグ
  let isShuttingDown = false;
  let activeConnections = 0;

  // HTTP サーバーを作成
  const server = http.createServer((req, res) => {
     // アクティブな接続を追跡
     activeConnections++;

    // 遅いレスポンスをシミュレート
    setTimeout(() => {
      res.writeHead(200);
      res.end(`ワーカー ${process.pid} からこんにちは\n`);

      // 接続完了
      activeConnections--;

      // シャットダウン中かつアクティブな接続がゼロの場合、サーバーを閉じる
      if (isShuttingDown && activeConnections === 0) {
        console.log(`ワーカー ${process.pid} にアクティブな接続がありません。サーバーを閉じています...`);
        server.close(() => {
          console.log(`ワーカー ${process.pid} がサーバーを閉じました。終了します...`);
          process.exit(0);
        });
      }
    }, 2000);
  });

  // サーバー開始
  server.listen(8000);

  // マスターからのシャットダウンメッセージを処理
  process.on('message', (msg) => {
     if (msg === 'shutdown') {
      console.log(`ワーカー ${process.pid} がシャットダウンメッセージを受信しました。新規接続を停止します...`);

      // シャットダウンフラグを設定
      isShuttingDown = true;

      // 新規接続の受け付けを停止
      server.close(() => {
        console.log(`ワーカー ${process.pid} がサーバーを閉じました`);

      // アクティブな接続がない場合は即座に終了
      if (activeConnections === 0) {
        console.log(`ワーカー ${process.pid} にアクティブな接続がありません。終了します...`);
        process.exit(0);
      } else {
        console.log(`ワーカー ${process.pid} は ${activeConnections} 件の接続完了を待機しています...`);
      }
    });
  }
  });

  // 直接の終了シグナルも処理
  process.on('SIGTERM', () => {
    console.log(`ワーカー ${process.pid} が直接 SIGTERM を受信しました`);
    // 同じシャットダウンロジックを使用
    isShuttingDown = true;
    server.close(() => process.exit(0));
  });
}

12. ベストプラクティス

  • ワーカー数: ほとんどの場合、CPU コアあたり 1 つのワーカーを作成するのが最適です。
  • ステートレス設計: クラスターで効果的に動作させるため、アプリケーションはステートレス(状態を持たない)に設計してください。
  • グレースフル・シャットダウン: 接続の切断を避けるため、適切なシャットダウン処理を実装してください。
  • ワーカーの監視: クラッシュしたワーカーを監視し、速やかに再作成してください。
  • データベース接続: 各ワーカーが独自の接続プールを持つため、データベース接続数を適切に設定してください。
  • 共有リソース: ワーカー間で共有されるリソース(ファイルロックなど)の扱いに注意してください。
  • ワーカーの軽量化: ワーカープロセスで不必要なメモリ使用を避けてください。

       警告: 複数のワーカーを使用する場合、ファイルベースのロックやその他の共有リソースには注意が必要です。シングルプロセス環境では安全だった操作も、マルチワーカー環境ではレースコンディション(競合状態)を引き起こす可能性があります。

13. Cluster モジュールの代替案

Cluster モジュールは強力ですが、複数のコアで Node.js アプリケーションを実行するための代替手段も存在します。

アプローチ説明ユースケース
PM2負荷分散とクラスタリング機能を内蔵した Node.js 用プロセス管理ツール堅牢なプロセス管理が必要な本番環境のアプリケーション
ロードバランサーNginx などのロードバランサーの背後で複数の Node.js インスタンスを実行複数のサーバーやコンテナに負荷を分散する場合
Worker ThreadsCPU 負荷の高いタスクのための軽量なスレッド (Node.js >= 10.5.0)単一プロセス内での CPU 集中的な操作
コンテナDocker や Kubernetes を使用して複数のコンテナインスタンスを実行モダンなクラウド環境におけるスケーラブルな分散アプリケーション

14. 高度な負荷分散戦略

デフォルトのラウンドロビンが多くの用途に適していますが、特定のユースケースではより洗練された戦略が必要になる場合があります。

14.1 重み付きラウンドロビン (Weighted Round-Robin)

const cluster = require('cluster');
const http = require('http');
const os = require('os');
if (cluster.isMaster) {
  console.log(`マスター ${process.pid} が実行中です`);

  // 異なる重みを持つワーカーを作成
  const workerWeights = [3, 2, 1]; // 最初のワーカーは最後のワーカーの3倍の負荷を受ける
  const workers = [];

  // 重みに基づいてワーカーを作成
  workerWeights.forEach((weight, index) => {
    for (let i = 0; i < weight; i++) {
      const worker = cluster.fork({ WORKER_WEIGHT: weight });
      worker.weight = weight;
      workers.push(worker);
    }
  });

  // 次に使用するワーカーを追跡
  let workerIndex = 0;

  // ロードバランサーサーバーを作成
  http.createServer((req, res) => {
    // 重みを考慮したシンプルなラウンドロビン
    const worker = workers[workerIndex++ % workers.length];
    worker.send('handle-request', req.socket);
  }).listen(8000);

} else {   // ワーカーコード
  process.on('message', (message, socket) => {
    if (message === 'handle-request' && socket) {
      // リクエストを処理
      socket.end(`ワーカー ${process.pid} によって処理されました\n`);
    }
  });
}

14.2 最小接続数 (Least Connections)

const cluster = require('cluster');
const http = require('http');
if (cluster.isMaster) {
  console.log(`マスター ${process.pid} が実行中です`);

  // ワーカーを作成し、接続数を追跡
  const workers = [];
  const numCPUs = require('os').cpus().length;

  for (let i = 0; i < numCPUs; i++) {
    const worker = cluster.fork();
    worker.connectionCount = 0;
    workers.push(worker);

    // ワーカーの接続状態を監視
    worker.on('message', (msg) => {
       if (msg.type === 'connection') {
         worker.connectionCount = msg.count;
       }
     });
  }
  // ロードバランサーを作成
  http.createServer((req, res) => {
    // 最も接続数が少ないワーカーを探す
    let minConnections = Infinity;
    let selectedWorker = null;

    for (const worker of workers) {
      if (worker.connectionCount < minConnections) {
        minConnections = worker.connectionCount;
        selectedWorker = worker;
      }
    }

    if (selectedWorker) {
      selectedWorker.send('handle-request', req.socket);
    }
  }).listen(8000);
}

15. パフォーマンスモニタリングとメトリクス

クラスターのパフォーマンスを監視することは、アプリケーションの健全性を維持するために不可欠です。基本的なメトリクス収集の実装例を以下に示します。

const cluster = require('cluster');
const os = require('os');
const promClient = require('prom-client');
if (cluster.isMaster) {
  // メトリクスレジストリを作成
  const register = new promClient.Registry();
  promClient.collectDefaultMetrics({ register });

  // カスタムメトリクス
  const workerRequests = new promClient.Counter({
    name: 'worker_requests_total',
    help: 'ワーカーが処理した総リクエスト数',
    labelNames: ['worker_pid']
  });
register.registerMetric(workerRequests);

  // ワーカーをフォーク
  for (let i = 0; i < os.cpus().length; i++) {
    const worker = cluster.fork();
    worker.on('message', (msg) => {
      if (msg.type === 'request_processed') {
        workerRequests.inc({ worker_pid: worker.process.pid });
      }
    });
  }

  // メトリクスエンドポイントを公開
  require('http').createServer(async (req, res) => {
    if (req.url === '/metrics') {
      res.setHeader('Content-Type', register.contentType);
      res.end(await register.metrics());
    }
  }).listen(9090);
} else {
  // ワーカーコード
  let requestCount = 0;

  require('http').createServer((req, res) => {
    requestCount++;
    process.send({ type: 'request_processed' });
    res.end(`リクエスト ${requestCount} 件目がワーカー ${process.pid} によって処理されました\n`);
  }).listen(8000);
}

15.1 監視すべき主要メトリクス

  • リクエストレート: ワーカーごとの秒間リクエスト数
  • エラーレート: 秒間エラーレスポンス数
  • レスポンスタイム: P50, P90, P99 のレスポンス時間
  • CPU 使用率: ワーカーごとの CPU 利用率
  • メモリ使用量: ワーカーごとのヒープおよび RSS メモリ
  • イベントループの遅延 (Lag): イベントループ内での遅延時間

16. コンテナ環境(Docker / Kubernetes)との統合

Docker や Kubernetes などのコンテナ環境で実行する場合は、以下のベストプラクティスを考慮してください。

16.1 プロセス管理

# Node.js クラスターアプリ用の Dockerfile 例
FROM node:16-slim

WORKDIR /app
COPY package*.json ./
RUN npm install --production

# アプリケーションコードをコピー
COPY . .

# 適切なシグナル処理のため、node プロセスを PID 1 として使用
CMD ["node", "cluster.js"]

# ヘルスチェック
HEALTHCHECK --interval=30s --timeout=3s \
CMD curl -f http://localhost:8080/health || exit 1

16.2 Kubernetes デプロイ設定

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: node-cluster-app
spec:
  replicas: 3 # Pod の数
  selector:
    matchLabels:
      app: node-cluster
  template:
    metadata:
      labels:
        app: node-cluster
    spec:
      containers:
      - name: node-app
        image: your-image:latest
        ports:
          - containerPort: 8000
        resources:
          requests:
            cpu: "500m"
            memory: "512Mi"
          limits:
            cpu: "1000m"
            memory: "1Gi"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8000
          initialDelaySeconds: 5
          periodSeconds: 10

17. よくある落とし穴と解決策

17.1 ワーカーのメモリリーク

問題: ワーカープロセスのメモリリークにより、メモリが徐々に増加する。
解決策: メモリ使用量に基づいたワーカーの再サイクル(入れ替え)を実装する。

// ワーカープロセス内
const MAX_MEMORY_MB = 500; // 再サイクル前の最大メモリ容量 (MB)

function checkMemory() {
  const memoryUsage = process.memoryUsage();
  const memoryMB = memoryUsage.heapUsed / 1024 / 1024;

  if (memoryMB > MAX_MEMORY_MB) {
    console.log(`ワーカー ${process.pid} のメモリ ${memoryMB.toFixed(2)}MB が制限を超えました。終了します...`);
    process.exit(1); // クラスターにワーカーを再起動させる
  }
}

// 30秒ごとにメモリを確認
setInterval(checkMemory, 30000);

17.2 サンダーリング・ハード問題 (Thundering Herd Problem)

問題: 再起動後にすべてのワーカーが一斉に接続を受け付けようとする負荷の集中。
解決策: 起動タイミングをずらす(Staggered startup)を実装する。

// マスタープロセス内
if (cluster.isMaster) {
  const numWorkers = require('os').cpus().length;

  function forkWorker(delay) {
    setTimeout(() => {
      const worker = cluster.fork();
      console.log(`ワーカー ${worker.process.pid} が ${delay}ms の遅延の後に起動しました`);
    }, delay);
  }

  // ワーカーの起動を1秒ずつずらす
  for (let i = 0; i < numWorkers; i++) {
    forkWorker(i * 1000);
  }
}

17.3 ワーカーの飢餓状態 (Worker Starvation)

問題: 一部のワーカーに負荷が偏り、他のワーカーが活用されない。
解決策: 適切な負荷分散アルゴリズムの実装と監視。

// リクエスト分散を追跡
const requestDistribution = new Map();

// マスタープロセス内
if (cluster.isMaster) {
  // ...

  // リクエスト分散を監視
  setInterval(() => {
    console.log('リクエスト分散状況:');
    requestDistribution.forEach((count, pid) => {
      console.log(` ワーカー ${pid}: ${count} リクエスト`);
    });
  }, 60000);

  // ワーカーごとのリクエストを追跡
  cluster.on('message', (worker, message) => {
    if (message.type === 'request_handled') {
      const count = requestDistribution.get(worker.process.pid) || 0;
      requestDistribution.set(worker.process.pid, count + 1);
    }
  });
}

18. まとめ

Node.js Cluster モジュールは、アプリケーションを複数の CPU コアにスケールさせるための効率的な方法を提供します:

  • 複数のワーカープロセスを管理するマスタープロセスを作成します。
  • ワーカーは同じサーバーポートを共有し、負荷分散を可能にします。
  • アプリケーションのパフォーマンスとレジリエンス(回復力)を向上させます。
  • ゼロダウンタイム・リスタートやグレースフル・シャットダウンを実現します。
  • マスターとワーカー間の通信には IPC を使用します。

クラスタリングを理解し適切に実装することで、利用可能なすべての CPU リソースを効率的に活用する、高性能で信頼性の高い Node.js アプリケーションを構築することができます。