Node.js Worker Threads Module
What are Worker Threads?
Worker Threads are a feature introduced in Node.js (initially in v10.5.0 as an experimental feature and stabilized in v12) that allows JavaScript code to run in parallel across multiple CPU cores.
Unlike the child_process
or cluster
modules, which create separate Node.js processes, Worker Threads can share memory and run true parallel JavaScript code.
The Node.js Worker Threads module addresses the limitations of Node.js's single-threaded nature for CPU-intensive tasks.
While Node.js excels at I/O-bound operations thanks to its asynchronous event loop, it can struggle with CPU-bound tasks that can block the main thread and affect application performance.
Note: Worker Threads are different from Web Workers in browsers, although they share similar concepts. Node.js Worker Threads are specifically designed for the Node.js runtime environment.
When to Use Worker Threads
Worker Threads are most useful for:
- CPU-intensive operations (large calculations, data processing)
- Parallel processing of data
- Operations that would otherwise block the main thread
They are not necessary for:
- I/O-bound operations (file system, network)
- Operations that already use asynchronous APIs
- Simple tasks that complete quickly
Importing the Worker Threads Module
The Worker Threads module is included in Node.js by default. You can use it by requiring it in your script:
const {
Worker,
isMainThread,
parentPort,
workerData
} = require('worker_threads');
Key Components
Component | Description |
---|---|
Worker |
Class for creating new worker threads |
isMainThread |
Boolean that is true if the code is running in the main thread, false if it's running in a worker |
parentPort |
If this thread is a worker, this is a MessagePort allowing communication with the parent thread |
workerData |
Data passed when creating the worker thread |
MessageChannel |
Creates a communication channel (pair of connected MessagePort objects) |
MessagePort |
Interface for sending messages between threads |
threadId |
Unique identifier for the current thread |
Creating Your First Worker Thread
Let's create a simple example where the main thread creates a worker to perform a CPU-intensive task:
// main.js
const { Worker } = require('worker_threads');
// Function to create a new worker
function runWorker(workerData) {
return new Promise((resolve, reject) => {
// Create a new worker
const worker = new Worker('./worker.js', { workerData });
// Listen for messages from the worker
worker.on('message', resolve);
// Listen for errors
worker.on('error', reject);
// Listen for worker exit
worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
}
// Run the worker
async function run() {
try {
// Send data to the worker and get the result
const result = await runWorker('Hello from main thread!');
console.log('Worker result:', result);
} catch (err) {
console.error('Worker error:', err);
}
}
run().catch(err => console.error(err));
// worker.js
const { parentPort, workerData } = require('worker_threads');
// Receive message from the main thread
console.log('Worker received:', workerData);
// Simulate CPU-intensive task
function performCPUIntensiveTask() {
// Simple example: Sum up to a large number
let result = 0;
for (let i = 0; i < 1_000_000; i++) {
result += i;
}
return result;
}
// Perform the task
const result = performCPUIntensiveTask();
// Send the result back to the main thread
parentPort.postMessage({
receivedData: workerData,
calculatedSum: result
});
In this example:
- The main thread creates a worker with some initial data
- The worker performs a CPU-intensive calculation
- The worker sends the result back to the main thread
- The main thread receives and processes the result
Key Concepts in the Example
- The
Worker
constructor takes the path to the worker script and an options object - The
workerData
option is used to pass initial data to the worker - The worker communicates back to the main thread using
parentPort.postMessage()
- Event handlers (
message
,error
,exit
) are used to manage the worker lifecycle
Communication Between Threads
Worker threads communicate by passing messages.
The communication is bidirectional, meaning both the main thread and workers can send and receive messages.
Main Thread to Worker
// main.js
const { Worker } = require('worker_threads');
// Create a worker
const worker = new Worker('./message_worker.js');
// Send messages to the worker
worker.postMessage('Hello worker!');
worker.postMessage({ type: 'task', data: [1, 2, 3, 4, 5] });
// Receive messages from the worker
worker.on('message', (message) => {
console.log('Main thread received:', message);
});
// Handle worker completion
worker.on('exit', (code) => {
console.log(`Worker exited with code ${code}`);
});
// message_worker.js
const { parentPort } = require('worker_threads');
// Receive messages from the main thread
parentPort.on('message', (message) => {
console.log('Worker received:', message);
// Process different message types
if (typeof message === 'object' && message.type === 'task') {
const result = processTask(message.data);
parentPort.postMessage({ type: 'result', data: result });
} else {
// Echo the message back
parentPort.postMessage(`Worker echoing: ${message}`);
}
});
// Example task processor
function processTask(data) {
if (Array.isArray(data)) {
return data.map(x => x * 2);
}
return null;
}
Note: Messages passed between threads are copied by value (serialized), not shared by reference.
This means that when you send an object from one thread to another, changes to the object in one thread will not affect the copy in the other thread.
CPU-Intensive Task Example
Here's a more practical example that demonstrates the advantage of using worker threads for CPU-intensive tasks:
// fibonacci.js
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads');
// Recursive Fibonacci function (deliberately inefficient to simulate CPU load)
function fibonacci(n) {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
if (isMainThread) {
// This code runs in the main thread
// Function to run a worker
function runFibonacciWorker(n) {
return new Promise((resolve, reject) => {
const worker = new Worker(__filename, { workerData: n });
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
}
// Measure execution time with and without workers
async function run() {
const numbers = [40, 41, 42, 43];
// Using a single thread (blocking)
console.time('Single thread');
for (const n of numbers) {
console.log(`Fibonacci(${n}) = ${fibonacci(n)}`);
}
console.timeEnd('Single thread');
// Using worker threads (parallel)
console.time('Worker threads');
const results = await Promise.all(
numbers.map(n => runFibonacciWorker(n))
);
for (let i = 0; i < numbers.length; i++) {
console.log(`Fibonacci(${numbers[i]}) = ${results[i]}`);
}
console.timeEnd('Worker threads');
}
run().catch(err => console.error(err));
} else {
// This code runs in worker threads
// Calculate Fibonacci number
const result = fibonacci(workerData);
// Send the result back to the main thread
parentPort.postMessage(result);
}
This example calculates Fibonacci numbers using both a single-threaded approach and a multi-threaded approach with worker threads.
On a multi-core CPU, the worker threads version should be significantly faster because it can utilize multiple CPU cores to calculate the Fibonacci numbers in parallel.
Warning: While worker threads can significantly improve performance for CPU-bound tasks, they do come with overhead for creation and communication. For very small tasks, this overhead might outweigh the benefits.
Sharing Data with Worker Threads
There are several ways to share data between threads:
- Passing copies: The default behavior when using
postMessage()
- Transferring ownership: Using the
transferList
parameter ofpostMessage()
- Sharing memory: Using
SharedArrayBuffer
Transferring ArrayBuffers
When you transfer an ArrayBuffer, you're transferring ownership of the buffer from one thread to another, without copying the data. This is more efficient for large data:
// transfer_main.js
const { Worker } = require('worker_threads');
// Create a large buffer
const buffer = new ArrayBuffer(100 * 1024 * 1024); // 100MB
const view = new Uint8Array(buffer);
// Fill with data
for (let i = 0; i < view.length; i++) {
view[i] = i % 256;
}
console.log('Buffer created in main thread');
console.log('Buffer byteLength before transfer:', buffer.byteLength);
// Create a worker and transfer the buffer
const worker = new Worker('./transfer_worker.js');
worker.on('message', (message) => {
console.log('Message from worker:', message);
// After transfer, the buffer is no longer usable in main thread
console.log('Buffer byteLength after transfer:', buffer.byteLength);
});
// Transfer ownership of the buffer to the worker
worker.postMessage({ buffer }, [buffer]);
// transfer_worker.js
const { parentPort } = require('worker_threads');
parentPort.on('message', ({ buffer }) => {
const view = new Uint8Array(buffer);
// Calculate sum to verify data
let sum = 0;
for (let i = 0; i < view.length; i++) {
sum += view[i];
}
console.log('Buffer received in worker');
console.log('Buffer byteLength in worker:', buffer.byteLength);
console.log('Sum of all values:', sum);
// Send confirmation back
parentPort.postMessage('Buffer processed successfully');
});
Note: After transferring an ArrayBuffer, the original buffer becomes unusable (its byteLength becomes 0).
The receiving thread gains full access to the buffer.
Sharing Memory with SharedArrayBuffer
For scenarios where you need to share data between threads without copying or transferring, the SharedArrayBuffer
provides a way to access the same memory from multiple threads.
Warning: SharedArrayBuffer
may be disabled in some Node.js versions due to security considerations related to Spectre vulnerabilities. Check your Node.js version documentation for details on how to enable it if needed.
// shared_main.js
const { Worker } = require('worker_threads');
// Create a shared buffer
const sharedBuffer = new SharedArrayBuffer(4 * 10); // 10 Int32 values
const sharedArray = new Int32Array(sharedBuffer);
// Initialize the shared array
for (let i = 0; i < sharedArray.length; i++) {
sharedArray[i] = i;
}
console.log('Initial shared array in main thread:', [...sharedArray]);
// Create a worker that will update the shared memory
const worker = new Worker('./shared_worker.js', {
workerData: { sharedBuffer }
});
worker.on('message', (message) => {
console.log('Message from worker:', message);
console.log('Updated shared array in main thread:', [...sharedArray]);
// The changes made in the worker are visible here
// because we're accessing the same memory
});
// shared_worker.js
const { parentPort, workerData } = require('worker_threads');
const { sharedBuffer } = workerData;
// Create a new view on the shared buffer
const sharedArray = new Int32Array(sharedBuffer);
console.log('Initial shared array in worker:', [...sharedArray]);
// Modify the shared memory
for (let i = 0; i < sharedArray.length; i++) {
// Double each value
sharedArray[i] = sharedArray[i] * 2;
}
console.log('Updated shared array in worker:', [...sharedArray]);
// Notify the main thread
parentPort.postMessage('Shared memory updated');
Synchronizing Access with Atomics
When multiple threads access shared memory, you need a way to synchronize access to prevent race conditions.
The Atomics
object provides methods for atomic operations on shared memory arrays.
// atomics_main.js
const { Worker } = require('worker_threads');
// Create a shared buffer with control flags and data
const sharedBuffer = new SharedArrayBuffer(4 * 10);
const sharedArray = new Int32Array(sharedBuffer);
// Initialize values
sharedArray[0] = 0; // Control flag: 0 = main thread's turn, 1 = worker's turn
sharedArray[1] = 0; // Data value to increment
// Create workers
const workerCount = 4;
const workerIterations = 10;
const workers = [];
console.log(`Creating ${workerCount} workers with ${workerIterations} iterations each`);
for (let i = 0; i < workerCount; i++) {
const worker = new Worker('./atomics_worker.js', {
workerData: { sharedBuffer, id: i, iterations: workerIterations }
});
workers.push(worker);
worker.on('exit', () => {
console.log(`Worker ${i} exited`);
// If all workers have exited, show final value
if (workers.every(w => w.threadId === -1)) {
console.log(`Final value: ${sharedArray[1]}`);
console.log(`Expected value: ${workerCount * workerIterations}`);
}
});
}
// Signal to the first worker to start
Atomics.store(sharedArray, 0, 1);
Atomics.notify(sharedArray, 0);
// atomics_worker.js
const { parentPort, workerData } = require('worker_threads');
const { sharedBuffer, id, iterations } = workerData;
// Create a typed array from the shared memory
const sharedArray = new Int32Array(sharedBuffer);
for (let i = 0; i < iterations; i++) {
// Wait for this worker's turn
while (Atomics.load(sharedArray, 0) !== id + 1) {
// Wait for notification
Atomics.wait(sharedArray, 0, Atomics.load(sharedArray, 0));
}
// Increment the shared counter
const currentValue = Atomics.add(sharedArray, 1, 1);
console.log(`Worker ${id} incremented counter to ${currentValue + 1}`);
// Signal to the next worker
const nextWorkerId = (id + 1) % (iterations === 0 ? 1 : iterations);
Atomics.store(sharedArray, 0, nextWorkerId + 1);
Atomics.notify(sharedArray, 0);
}
// Exit the worker
parentPort.close();
Note: The Atomics
object provides methods like load
, store
, add
, wait
, and notify
for synchronizing access to shared memory and implementing coordination patterns between threads.
Creating a Worker Pool
For most applications, you'll want to create a pool of workers to handle multiple tasks concurrently.
Here's an implementation of a simple worker pool:
// worker_pool.js
const { Worker } = require('worker_threads');
const os = require('os');
const path = require('path');
class WorkerPool {
constructor(workerScript, numWorkers = os.cpus().length) {
this.workerScript = workerScript;
this.numWorkers = numWorkers;
this.workers = [];
this.freeWorkers = [];
this.tasks = [];
// Initialize workers
this._initialize();
}
_initialize() {
// Create all workers
for (let i = 0; i < this.numWorkers; i++) {
this._createWorker();
}
}
_createWorker() {
const worker = new Worker(this.workerScript);
worker.on('message', (result) => {
// Get the current task
const { resolve } = this.tasks.shift();
// Resolve the task with the result
resolve(result);
// Add this worker back to the free workers pool
this.freeWorkers.push(worker);
// Process the next task if any
this._processQueue();
});
worker.on('error', (err) => {
// If a worker errors, terminate it and create a new one
console.error(`Worker error: ${err}`);
this._removeWorker(worker);
this._createWorker();
// Process the next task
if (this.tasks.length > 0) {
const { reject } = this.tasks.shift();
reject(err);
this._processQueue();
}
});
worker.on('exit', (code) => {
if (code !== 0) {
console.error(`Worker exited with code ${code}`);
this._removeWorker(worker);
this._createWorker();
}
});
// Add to free workers
this.workers.push(worker);
this.freeWorkers.push(worker);
}
_removeWorker(worker) {
// Remove from the workers arrays
this.workers = this.workers.filter(w => w !== worker);
this.freeWorkers = this.freeWorkers.filter(w => w !== worker);
}
_processQueue() {
// If there are tasks and free workers, process the next task
if (this.tasks.length > 0 && this.freeWorkers.length > 0) {
const { taskData } = this.tasks[0];
const worker = this.freeWorkers.pop();
worker.postMessage(taskData);
}
}
// Run a task on a worker
runTask(taskData) {
return new Promise((resolve, reject) => {
const task = { taskData, resolve, reject };
this.tasks.push(task);
this._processQueue();
});
}
// Close all workers when done
close() {
for (const worker of this.workers) {
worker.terminate();
}
}
}
module.exports = WorkerPool;
Using the worker pool:
// pool_usage.js
const WorkerPool = require('./worker_pool');
const path = require('path');
// Create a worker pool with the worker script
const pool = new WorkerPool(path.resolve(__dirname, 'pool_worker.js'));
// Function to run tasks on the pool
async function runTasks() {
const tasks = [
{ type: 'fibonacci', data: 40 },
{ type: 'factorial', data: 15 },
{ type: 'prime', data: 10000000 },
{ type: 'fibonacci', data: 41 },
{ type: 'factorial', data: 16 },
{ type: 'prime', data: 20000000 },
{ type: 'fibonacci', data: 42 },
{ type: 'factorial', data: 17 },
];
console.time('All tasks');
try {
// Run all tasks in parallel
const results = await Promise.all(
tasks.map(task => {
console.time(`Task: ${task.type}(${task.data})`);
return pool.runTask(task)
.then(result => {
console.timeEnd(`Task: ${task.type}(${task.data})`);
return result;
});
})
);
// Log results
for (let i = 0; i < tasks.length; i++) {
console.log(`${tasks[i].type}(${tasks[i].data}) = ${results[i].result}`);
}
} catch (err) {
console.error('Error running tasks:', err);
} finally {
console.timeEnd('All tasks');
pool.close();
}
}
runTasks().catch(console.error);
// pool_worker.js
const { parentPort } = require('worker_threads');
// Fibonacci function
function fibonacci(n) {
if (n <= 1) return n;
return fibonacci(n - 1) + fibonacci(n - 2);
}
// Factorial function
function factorial(n) {
if (n <= 1) return 1;
return n * factorial(n - 1);
}
// Prime count function
function countPrimes(max) {
const sieve = new Uint8Array(max);
let count = 0;
for (let i = 2; i < max; i++) {
if (!sieve[i]) {
count++;
for (let j = i * 2; j < max; j += i) {
sieve[j] = 1;
}
}
}
return count;
}
// Handle messages from the main thread
parentPort.on('message', (task) => {
const { type, data } = task;
let result;
// Perform different calculations based on task type
switch (type) {
case 'fibonacci':
result = fibonacci(data);
break;
case 'factorial':
result = factorial(data);
break;
case 'prime':
result = countPrimes(data);
break;
default:
throw new Error(`Unknown task type: ${type}`);
}
// Send the result back
parentPort.postMessage({ result });
});
Note: This worker pool implementation handles task scheduling, worker errors, and automatic worker replacement.
It's a good starting point for real-world applications but can be expanded with features like worker timeouts and prioritized tasks.
Practical Application: Image Processing
Image processing is a perfect use case for worker threads as it's both CPU-intensive and easily parallelizable.
Here's an example of parallel image processing:
// image_main.js
const { Worker } = require('worker_threads');
const path = require('path');
const fs = require('fs');
// Function to process an image in a worker
function processImageInWorker(imagePath, options) {
return new Promise((resolve, reject) => {
const worker = new Worker('./image_worker.js', {
workerData: {
imagePath,
options
}
});
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0) {
reject(new Error(`Worker stopped with exit code ${code}`));
}
});
});
}
// Main function to process multiple images in parallel
async function processImages() {
const images = [
{ path: 'image1.jpg', options: { grayscale: true } },
{ path: 'image2.jpg', options: { blur: 5 } },
{ path: 'image3.jpg', options: { sharpen: 10 } },
{ path: 'image4.jpg', options: { resize: { width: 800, height: 600 } } }
];
console.time('Image processing');
try {
// Process all images in parallel
const results = await Promise.all(
images.map(img => processImageInWorker(img.path, img.options))
);
console.log('All images processed successfully');
console.log('Results:', results);
} catch (err) {
console.error('Error processing images:', err);
}
console.timeEnd('Image processing');
}
// Note: This is a conceptual example.
// In a real application, you would use an image processing library like sharp or jimp
// and provide actual image files.
// processImages().catch(console.error);
console.log('Image processing example (not actually running)');
// image_worker.js
const { parentPort, workerData } = require('worker_threads');
const { imagePath, options } = workerData;
// In a real application, you would import an image processing library here
// const sharp = require('sharp');
// Simulate image processing
function processImage(imagePath, options) {
console.log(`Processing image: ${imagePath} with options:`, options);
// Simulate processing time based on options
let processingTime = 500; // Base time in ms
if (options.grayscale) processingTime += 200;
if (options.blur) processingTime += options.blur * 50;
if (options.sharpen) processingTime += options.sharpen * 30;
if (options.resize) processingTime += 300;
// Simulate the actual processing
return new Promise(resolve => {
setTimeout(() => {
// Return simulated result
resolve({
imagePath,
outputPath: `processed_${imagePath}`,
processing: options,
dimensions: options.resize || { width: 1024, height: 768 },
size: Math.floor(Math.random() * 1000000) + 500000 // Random file size
});
}, processingTime);
});
}
// Process the image and send the result back
processImage(imagePath, options)
.then(result => {
parentPort.postMessage(result);
})
.catch(err => {
throw err;
});
Worker Threads vs. Child Process and Cluster
It's important to understand when to use Worker Threads versus other Node.js concurrency mechanisms:
Feature | Worker Threads | Child Process | Cluster |
---|---|---|---|
Shared Memory | Yes (via SharedArrayBuffer) | No (IPC only) | No (IPC only) |
Resource Usage | Lower (shared V8 instance) | Higher (separate processes) | Higher (separate processes) |
Startup Time | Faster | Slower | Slower |
Isolation | Lower (shares event loop) | Higher (full process isolation) | Higher (full process isolation) |
Failure Impact | Can affect parent thread | Limited to child process | Limited to worker process |
Best For | CPU-intensive tasks | Running different programs | Scaling applications |
When to Use Worker Threads
- CPU-bound tasks like number crunching, image processing, or compression
- When shared memory is needed for better performance
- When you need to run parallel JavaScript code within a single Node.js instance
When to Use Child Process
- Running external programs or commands
- Executing tasks in different languages
- When you need stronger isolation between the main process and the spawned processes
When to Use Cluster
- Scaling an HTTP server across multiple cores
- Load balancing incoming connections
- Improving application resilience and uptime
Best Practices
- Don't overuse threads: Only use worker threads for CPU-intensive tasks that would otherwise block the main thread.
- Consider the overhead: Creating threads has overhead. For very short tasks, this overhead might outweigh the benefits.
- Use a worker pool: Reuse workers for multiple tasks instead of creating and destroying them for each task.
- Minimize data transfer: Transfer ownership with ArrayBuffer or use SharedArrayBuffer when working with large amounts of data.
- Handle errors properly: Always catch errors from workers and have a strategy for worker failures.
- Monitor worker lifecycles: Keep track of worker health and restart them if they crash.
- Use appropriate synchronization: Use Atomics for coordinating access to shared memory.
- Benchmark your solution: Always measure the performance improvement to ensure threads are actually helping.
Warning: Threading adds complexity to your code. Only use worker threads when you have a genuine need for parallel execution. For I/O-bound operations, Node.js's built-in asynchronous APIs are usually more efficient.
Summary
The Worker Threads module provides true multithreading capabilities in Node.js, enabling CPU-intensive tasks to run in parallel without blocking the main event loop.
In this tutorial, we've covered:
- Sharing data between threads using
SharedArrayBuffer
- Synchronizing thread access with
Atomics
- Creating a reusable worker pool for efficient task management
- Practical applications like parallel image processing
- Comparison with other Node.js concurrency models
- Best practices for using worker threads effectively