NodeJS 速習チュートリアル

Node.js Worker Threads モジュール

1. Worker Threads(ワーカー・スレッド)とは?

Worker Threads は、Node.js(v10.5.0 で試験的導入、v12 で安定版)で導入された機能で、JavaScript コードを複数の CPU コアで並列(Parallel)に実行できるようにするものです。

別々の Node.js プロセスを作成する child_processcluster モジュールとは異なり、Worker Threads はメモリを共有し、真の並列 JavaScript 実行を可能にします。

Node.js は非同期 I/O 操作には非常に優れていますが、シングルスレッドという性質上、重い計算などの CPU インテンシブなタスクが発生するとメインスレッドをブロックし、アプリケーション全体のパフォーマンスを低下させてしまうという弱点がありました。Worker Threads はこの課題を解決するために設計されています。

       注意: Worker Threads はブラウザの Web Workers と概念は似ていますが、Node.js 実行環境専用に最適化されています。

1.1 Worker Threads を使用すべきタイミング

Worker Threads は以下のようなケースで真価を発揮します:

  • CPU インテンシブな操作(大規模な計算、データ処理)
  • データの並列処理
  • メインスレッドをブロックしてしまう可能性のある重い操作

逆に、以下のようなケースでは不要です:

  • I/O バウンドな操作(ファイルシステム、ネットワーク)
  • すでに非同期 API が提供されている操作
  • すぐに完了する単純なタスク

2. Worker Threads モジュールのインポート

Worker Threads モジュールは Node.js に標準搭載されています。スクリプト内で以下のように require して使用します。

const {
  Worker,
  isMainThread,
  parentPort,
  workerData
} = require('worker_threads');

2.1 主要なコンポーネント

コンポーネント説明
Worker新しいワーカー・スレッドを作成するためのクラス
isMainThreadメインスレッドで実行中なら true、ワーカー内なら false を返す真偽値
parentPortワーカー側で使用し、親スレッドとの通信を可能にする MessagePort
workerDataワーカー作成時に渡される初期データ
MessageChannel通信チャネル(接続された MessagePort のペア)を作成
MessagePortスレッド間でメッセージを送信するためのインターフェース
threadId現在のスレッドの一意の識別子

3. 初めての Worker Thread 作成

メインスレッドからワーカーを作成し、CPU 負荷の高いタスクを実行させる簡単な例を見てみましょう。

// main.js
const { Worker } = require('worker_threads');

// ワーカーを実行する関数
function runWorker(workerData) {
  return new Promise((resolve, reject) => {
    // 新しいワーカーを作成
    const worker = new Worker('./worker.js', { workerData });
    
    // ワーカーからのメッセージをリッスン
    worker.on('message', resolve);
    
    // エラーのハンドリング
    worker.on('error', reject);
    
    // ワーカーの終了を監視
    worker.on('exit', (code) => {
      if (code !== 0) {
        reject(new Error(`ワーカーが終了コード ${code} で停止しました`));
      }
    });
  });
}

// 実行用関数
async function run() {
  try {
    // ワーカーにデータを送り、結果を取得
    const result = await runWorker('メインスレッドからの挨拶!');
    console.log('ワーカーの結果:', result);
  } catch (err) {
    console.error('ワーカーエラー:', err);
  }
}

run().catch(err => console.error(err));
// worker.js
const { parentPort, workerData } = require('worker_threads');

// メインスレッドから受信したデータの表示
console.log('ワーカーが受信:', workerData);

// CPU 負荷の高いタスクをシミュレート
function performCPUIntensiveTask() {
  // 例:大きな数値まで合計を計算
  let result = 0;
  for (let i = 0; i < 1_000_000; i++) {
    result += i;
  }
  return result;
}

// タスクの実行
const result = performCPUIntensiveTask();

// 結果をメインスレッドに返送
parentPort.postMessage({
  receivedData: workerData,
  calculatedSum: result
});

3.1 サンプルの主要コンセプト

  • Worker コンストラクタには、ワーカー・スクリプトのパスとオプションオブジェクトを渡します。
  • workerData オプションを使用して、初期データをワーカーに渡せます。
  • ワーカーは parentPort.postMessage() を使ってメインスレッドにデータを送り返します。
  • イベントハンドラ(messageerrorexit)を使用してワーカーのライフサイクルを管理します。

4. スレッド間の通信

Worker Threads はメッセージパッシングによって通信します。通信は双方向であり、メインスレッドとワーカーの双方がメッセージの送受信を行えます。

4.1 メインスレッドからワーカーへ

// main.js
const { Worker } = require('worker_threads');

// ワーカーの作成
const worker = new Worker('./message_worker.js');

// ワーカーにメッセージを送信
worker.postMessage('ハロー、ワーカー!');
worker.postMessage({ type: 'task', data: [1, 2, 3, 4, 5] });

// ワーカーからのメッセージを受信
worker.on('message', (message) => {
  console.log('メインスレッドが受信:', message);
});

// ワーカーの完了をハンドリング
worker.on('exit', (code) => {
  console.log(`ワーカーが終了コード ${code} で終了しました`);
});
// message_worker.js
const { parentPort } = require('worker_threads');

// メインスレッドからのメッセージを受信
parentPort.on('message', (message) => {
  console.log('ワーカーが受信:', message);
  
  // メッセージタイプに応じた処理
  if (typeof message === 'object' && message.type === 'task') {
    const result = processTask(message.data);
    parentPort.postMessage({ type: 'result', data: result });
  } else {
    // メッセージをそのままエコー(返送)
    parentPort.postMessage(`ワーカーからのエコー: ${message}`);
  }
});

// タスク処理の例
function processTask(data) {
  if (Array.isArray(data)) {
    return data.map(x => x * 2);
  }
  return null;
}

       注意: スレッド間で渡されるメッセージは、参照共有ではなく、値のコピー(シリアライズ)です。一方のスレッドでオブジェクトを変更しても、もう一方のコピーには影響しません。

5. CPU 負荷の高いタスクの例

実用的な例として、フィボナッチ数列の計算をシングルスレッドとマルチスレッドで比較してみましょう。

// fibonacci.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');

// 再帰的なフィボナッチ関数(意図的に CPU 負荷を高めるための非効率な実装)
function fibonacci(n) {
  if (n <= 1) return n;
  return fibonacci(n - 1) + fibonacci(n - 2);
}

if (isMainThread) {
  // メインスレッドで実行されるコード
  
  function runFibonacciWorker(n) {
    return new Promise((resolve, reject) => {
      const worker = new Worker(__filename, { workerData: n });
      worker.on('message', resolve);
      worker.on('error', reject);
      worker.on('exit', (code) => {
        if (code !== 0) {
          reject(new Error(`ワーカーが終了コード ${code} で停止しました`));
        }
      });
    });
  }
  
  async function run() {
    const numbers = [40, 41, 42, 43];
    
    // シングルスレッド(ブロッキング)
    console.time('Single thread');
    for (const n of numbers) {
      console.log(`Fibonacci(${n}) = ${fibonacci(n)}`);
    }
    console.timeEnd('Single thread');
    
    // Worker Threads(並列処理)
    console.time('Worker threads');
    const results = await Promise.all(
      numbers.map(n => runFibonacciWorker(n))
    );
    for (let i = 0; i < numbers.length; i++) {
      console.log(`Fibonacci(${numbers[i]}) = ${results[i]}`);
    }
    console.timeEnd('Worker threads');
  }
  
  run().catch(err => console.error(err));
} else {
  // ワーカー・スレッドで実行されるコード
  const result = fibonacci(workerData);
  parentPort.postMessage(result);
}

マルチコア CPU においては、Worker Threads 版の方が圧倒的に速くなるはずです。複数の CPU コアを活用して計算を並列実行できるからです。

       警告: パフォーマンスは向上しますが、スレッドの作成と通信にはオーバーヘッドがあります。非常に小さなタスクの場合、そのオーバーヘッドがメリットを上回ってしまう可能性があります。

6. Worker Threads とのデータ共有

スレッド間でデータを共有する方法はいくつかあります:

  1. コピーの受け渡し: postMessage() のデフォルト挙動。
  2. 所有権の転送: postMessage()transferList パラメータを使用。
  3. メモリの共有: SharedArrayBuffer を使用。

6.1 ArrayBuffer の転送

ArrayBuffer を転送すると、データのコピーを行わずに所有権のみを移動させます。大容量データの扱いに非常に効率的です。

// transfer_main.js
const { Worker } = require('worker_threads');

// 巨大なバッファを作成 (100MB)
const buffer = new ArrayBuffer(100 * 1024 * 1024);
const view = new Uint8Array(buffer);

// データの充填
for (let i = 0; i < view.length; i++) {
  view[i] = i % 256;
}

console.log('メインスレッドでバッファ作成完了');
console.log('転送前のバッファ byteLength:', buffer.byteLength);

const worker = new Worker('./transfer_worker.js');
worker.on('message', (message) => {
  console.log('ワーカーからのメッセージ:', message);
  // 転送後、メインスレッドではバッファが使用不可になる
  console.log('転送後のバッファ byteLength:', buffer.byteLength);
});

// バッファの所有権をワーカーに転送
worker.postMessage({ buffer }, [buffer]);
// transfer_worker.js
const { parentPort } = require('worker_threads');

parentPort.on('message', ({ buffer }) => {
  const view = new Uint8Array(buffer);
  
  let sum = 0;
  for (let i = 0; i < view.length; i++) {
    sum += view[i];
  }
  
  console.log('ワーカーでバッファを受信');
  console.log('ワーカー内のバッファ byteLength:', buffer.byteLength);
  console.log('全値の合計:', sum);
  
  parentPort.postMessage('バッファの処理が正常に完了しました');
});

       注意: ArrayBuffer を転送した後、元のスレッドのバッファは使用不能になり、byteLength は 0 になります。

6.2 SharedArrayBuffer によるメモリ共有

コピーも転送もせずに、複数のスレッドから同じメモリにアクセスしたい場合は、SharedArrayBuffer を使用します。

// shared_main.js
const { Worker } = require('worker_threads');

// 共有バッファを作成(Int32 を 10 個分)
const sharedBuffer = new SharedArrayBuffer(4 * 10);
const sharedArray = new Int32Array(sharedBuffer);

for (let i = 0; i < sharedArray.length; i++) {
  sharedArray[i] = i;
}

console.log('メインスレッドの初期状態:', [...sharedArray]);

const worker = new Worker('./shared_worker.js', {
  workerData: { sharedBuffer }
});

worker.on('message', (message) => {
  console.log('ワーカーからのメッセージ:', message);
  console.log('メインスレッドで更新を確認:', [...sharedArray]);
  // 同じメモリを参照しているため、ワーカー側の変更が即座に見える
});
// shared_worker.js
const { parentPort, workerData } = require('worker_threads');
const { sharedBuffer } = workerData;

const sharedArray = new Int32Array(sharedBuffer);
console.log('ワーカーでの初期状態:', [...sharedArray]);

// 共有メモリを書き換え
for (let i = 0; i < sharedArray.length; i++) {
  sharedArray[i] = sharedArray[i] * 2;
}

console.log('ワーカーで更新完了:', [...sharedArray]);
parentPort.postMessage('共有メモリを更新しました');

7. Atomics によるアクセス同期

複数のスレッドが同時に共有メモリへアクセスする場合、レースコンディション(競合状態)を防ぐ同期手段が必要です。Atomics オブジェクトは、共有メモリ操作をアトミック(不可分)に行うためのメソッドを提供します。

// atomics_main.js
const { Worker } = require('worker_threads');

const sharedBuffer = new SharedArrayBuffer(4 * 10);
const sharedArray = new Int32Array(sharedBuffer);

sharedArray[0] = 0; // フラグ: 0 = メインスレッドのターン, 1 = ワーカーのターン
sharedArray[1] = 0; // インクリメントするデータ

const workerCount = 4;
const workerIterations = 10;
const workers = [];

console.log(`${workerCount} 個のワーカーを作成し、各 ${workerIterations} 回実行します`);

for (let i = 0; i < workerCount; i++) {
  const worker = new Worker('./atomics_worker.js', {
    workerData: { sharedBuffer, id: i, iterations: workerIterations }
  });
  
  workers.push(worker);
  
  worker.on('exit', () => {
    console.log(`ワーカー ${i} が終了しました`);
    if (workers.every(w => w.threadId === -1)) {
      console.log(`最終結果: ${sharedArray[1]}`);
      console.log(`期待される結果: ${workerCount * workerIterations}`);
    }
  });
}

// 最初のワーカーに開始シグナルを送る
Atomics.store(sharedArray, 0, 1);
Atomics.notify(sharedArray, 0);
// atomics_worker.js
const { parentPort, workerData } = require('worker_threads');
const { sharedBuffer, id, iterations } = workerData;

const sharedArray = new Int32Array(sharedBuffer);

for (let i = 0; i < iterations; i++) {
  // 自分のターンが来るまで待機
  while (Atomics.load(sharedArray, 0) !== id + 1) {
    Atomics.wait(sharedArray, 0, Atomics.load(sharedArray, 0));
  }
  
  // カウンタをアトミックに加算
  const currentValue = Atomics.add(sharedArray, 1, 1);
  console.log(`ワーカー ${id}: カウンタを ${currentValue + 1} に増やしました`);
  
  // 次のワーカーへ通知
  const nextWorkerId = (id + 1) % (iterations === 0 ? 1 : iterations);
  Atomics.store(sharedArray, 0, nextWorkerId + 1);
  Atomics.notify(sharedArray, 0);
}

parentPort.close();

8. ワーカープールの作成

現実的なアプリケーションでは、タスクごとにスレッドを生成・破棄するのではなく、あらかじめ作成しておいたスレッドを再利用する「ワーカープール」を使用するのが一般的です。

// worker_pool.js
const { Worker } = require('worker_threads');
const os = require('os');
const path = require('path');

class WorkerPool {
  constructor(workerScript, numWorkers = os.cpus().length) {
    this.workerScript = workerScript;
    this.numWorkers = numWorkers;
    this.workers = [];
    this.freeWorkers = [];
    this.tasks = [];
    
    this._initialize();
  }
  
  _initialize() {
    for (let i = 0; i < this.numWorkers; i++) {
      this._createWorker();
    }
  }
  
  _createWorker() {
    const worker = new Worker(this.workerScript);
    
    worker.on('message', (result) => {
      const { resolve } = this.tasks.shift();
      resolve(result);
      this.freeWorkers.push(worker);
      this._processQueue();
    });
    
    worker.on('error', (err) => {
      console.error(`ワーカーエラー: ${err}`);
      this._removeWorker(worker);
      this._createWorker();
      if (this.tasks.length > 0) {
        const { reject } = this.tasks.shift();
        reject(err);
        this._processQueue();
      }
    });
    
    worker.on('exit', (code) => {
      if (code !== 0) {
        console.error(`ワーカーが終了コード ${code} で終了しました`);
        this._removeWorker(worker);
        this._createWorker();
      }
    });
    
    this.workers.push(worker);
    this.freeWorkers.push(worker);
  }
  
  _removeWorker(worker) {
    this.workers = this.workers.filter(w => w !== worker);
    this.freeWorkers = this.freeWorkers.filter(w => w !== worker);
  }
  
  _processQueue() {
    if (this.tasks.length > 0 && this.freeWorkers.length > 0) {
      const { taskData } = this.tasks[0];
      const worker = this.freeWorkers.pop();
      worker.postMessage(taskData);
    }
  }
  
  runTask(taskData) {
    return new Promise((resolve, reject) => {
      this.tasks.push({ taskData, resolve, reject });
      this._processQueue();
    });
  }
  
  close() {
    for (const worker of this.workers) {
      worker.terminate();
    }
  }
}

module.exports = WorkerPool;

9. 実践的な応用:画像処理

画像処理は CPU インテンシブであり、並列化が非常に容易なため、Worker Threads の最適なユースケースです。

// image_worker.js
const { parentPort, workerData } = require('worker_threads');
const { imagePath, options } = workerData;

// 本来は sharp や jimp などのライブラリを使用
function processImage(imagePath, options) {
  console.log(`${imagePath} を処理中...`);
  let processingTime = 500;
  
  if (options.grayscale) processingTime += 200;
  if (options.resize) processingTime += 300;
  
  return new Promise(resolve => {
    setTimeout(() => {
      resolve({
        imagePath,
        outputPath: `processed_${imagePath}`,
        dimensions: options.resize || { width: 1024, height: 768 }
      });
    }, processingTime);
  });
}

processImage(imagePath, options)
  .then(result => parentPort.postMessage(result))
  .catch(err => { throw err; });

10. Worker Threads vs. Child Process vs. Cluster

適切な並行処理メカニズムを選択することが重要です。

特徴Worker ThreadsChild ProcessCluster
メモリ共有可能 (SharedArrayBuffer)不可 (IPC のみ)不可 (IPC のみ)
リソース消費低 (V8 インスタンス共有)高 (独立プロセス)高 (独立プロセス)
起動速度速い遅い遅い
分離レベル低 (イベントループ共有)高 (完全分離)高 (完全分離)
主な用途CPU 負荷タスク外部プログラム実行HTTP サーバーのスケーリング

11. ベストプラクティス

  1. スレッドを使いすぎない: メインスレッドをブロックするような重い CPU タスクにのみ使用します。
  2. オーバーヘッドを考慮する: スレッド作成にはコストがかかります。非常に短いタスクには向きません。
  3. ワーカープールの活用: スレッドを使い回して、生成・破棄のコストを抑えます。
  4. データ転送を最小限に: ArrayBuffer の転送や SharedArrayBuffer を活用します。
  5. 適切な同期: 共有メモリにアクセスする際は Atomics を使用します。
  6. ベンチマークの実施: スレッド導入によって実際にパフォーマンスが向上したか常に計測してください。

12. まとめ

Worker Threads モジュールは Node.js に真のマルチスレッド機能をもたらし、メインのイベントループを止めることなく CPU インテンシブなタスクを並列実行することを可能にします。

このチュートリアルでは以下の内容を学びました:

  • Worker Threads の基本概念と使い方
  • SharedArrayBufferAtomics を使ったデータ共有と同期
  • パフォーマンスを最大化するためのワーカープールの構築
  • 実践的なユースケースと他の並行処理モデルとの比較

適切な設計により、すべての CPU リソースを効率的に活用した、高スループットで信頼性の高い Node.js アプリケーションを構築できます。