Concurrent Operations Management

Advanced techniques for handling multiple asynchronous operations efficiently

Introduction to Concurrent Asynchronous Operations

Modern web applications frequently need to manage multiple asynchronous operations simultaneously. From fetching data from various APIs to processing user interactions while background tasks run, handling concurrency correctly is crucial for building responsive, efficient applications.

Think of concurrent operations like managing a restaurant kitchen. You don't prepare one dish at a time from start to finish. Instead, you have multiple dishes cooking simultaneously, with different completion times and dependencies. The chef (your code) needs to coordinate all these operations to ensure everything gets done efficiently and in the right order.

graph LR
  A[Concurrent Operations] --> B[Sequential]
  A --> C[Parallel]
  A --> D[Race]
  B --> B1[One after another]
  C --> C1[All at once]
  D --> D1[First to complete]
  A --> E[Limiting]
  E --> E1[Rate limiting]
  E --> E2[Pool size]
  E --> E3[Priority queues]

JavaScript's Concurrency Model

Before diving into concurrent operation patterns, it's important to understand how JavaScript handles concurrency.

Single-Threaded Execution with Event Loop

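JavaScript is single-threaded, meaning it can only execute one piece of code at a time. However, through the event loop, it achieves concurrency without parallelism: while an asynchronous operation such as a network request or timer is pending, the thread is free to run other code, and the operation's callback is queued to run once the call stack is empty.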

flowchart TD
  A[Call Stack] --> B{Is stack empty?}
  B -->|Yes| C[Check Event Queue]
  C --> D{Events waiting?}
  D -->|Yes| E[Move callback to stack]
  E --> A
  D -->|No| B
  B -->|No| F[Continue execution]
  F --> A
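The ordering that the event loop produces can be observed directly. The sketch below is an illustration rather than part of the original examples; the `order` array and `record` helper are hypothetical names used only to log when each callback runs.

```javascript
// Records the order in which each piece of code actually runs
const order = [];
const record = (label) => order.push(label);

record('sync: start');

// Timer callbacks are queued as tasks and run only after the call stack is empty
setTimeout(() => record('task: setTimeout'), 0);

// Promise reactions are microtasks: they run after the current synchronous
// code finishes, but before the next queued task
Promise.resolve().then(() => record('microtask: promise'));

record('sync: end');

// By the time this later timer fires, everything above has run
setTimeout(() => console.log(order), 10);
// → [ 'sync: start', 'sync: end', 'microtask: promise', 'task: setTimeout' ]
```

Both synchronous calls finish first, even though the timer was registered with a delay of 0, because queued callbacks only run once the stack is empty.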

Key points about JavaScript's concurrency:

The Evolution of Asynchronous Concurrency

JavaScript's concurrency patterns have evolved significantly over time:

Callback Era


// Sequential operations with callbacks
fetchUserData(userId, function(userData) {
  fetchUserPosts(userData.id, function(posts) {
    fetchPostComments(posts[0].id, function(comments) {
      // Deeply nested "callback hell"
      displayUserInfo(userData, posts, comments);
    });
  });
});

// Parallel operations with callbacks
let userData, userPosts, userComments;
let completed = 0;

function checkAllComplete() {
  if (completed === 3) {
    displayUserInfo(userData, userPosts, userComments);
  }
}

fetchUserData(userId, function(data) {
  userData = data;
  completed++;
  checkAllComplete();
});

fetchUserPosts(userId, function(posts) {
  userPosts = posts;
  completed++;
  checkAllComplete();
});

fetchUserComments(userId, function(comments) {
  userComments = comments;
  completed++;
  checkAllComplete();
});
          

Promise Era


// Sequential operations with promises
fetchUserData(userId)
  .then(userData => fetchUserPosts(userData.id))
  .then(posts => fetchPostComments(posts[0].id))
  .then(comments => {
    displayUserInfo(userData, posts, comments); // Problem: userData and posts aren't available!
  });

// Better sequential with promise chaining
fetchUserData(userId)
  .then(userData => {
    return fetchUserPosts(userData.id)
      .then(posts => {
        return fetchPostComments(posts[0].id)
          .then(comments => {
            return { userData, posts, comments };
          });
      });
  })
  .then(results => {
    displayUserInfo(results.userData, results.posts, results.comments);
  });

// Parallel operations with Promise.all
Promise.all([
  fetchUserData(userId),
  fetchUserPosts(userId),
  fetchUserComments(userId)
])
.then(([userData, posts, comments]) => {
  displayUserInfo(userData, posts, comments);
})
.catch(error => {
  console.error('One of the requests failed:', error);
});
          

Modern Async/Await Era


// Sequential operations with async/await
async function loadUserDataSequentially(userId) {
  const userData = await fetchUserData(userId);
  const posts = await fetchUserPosts(userData.id);
  const comments = await fetchPostComments(posts[0].id);
  
  displayUserInfo(userData, posts, comments);
}

// Parallel operations with async/await
async function loadUserDataInParallel(userId) {
  const [userData, posts, comments] = await Promise.all([
    fetchUserData(userId),
    fetchUserPosts(userId),
    fetchUserComments(userId)
  ]);
  
  displayUserInfo(userData, posts, comments);
}
          

Promise Combinators: Managing Multiple Promises

JavaScript provides several built-in ways to manage multiple promises, each serving different concurrency needs.

Promise.all - Parallel Execution

Use Promise.all when you need to execute multiple operations concurrently and need all of them to succeed.
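One property worth knowing is that `Promise.all` returns results in the order of the input array, not in the order the operations finish. The following is a minimal sketch under simulated delays; `delay` and `loadInParallel` are hypothetical helpers, not functions from the examples in this guide.

```javascript
// Resolves with `value` after `ms` milliseconds (hypothetical helper)
const delay = (ms, value) =>
  new Promise(resolve => setTimeout(() => resolve(value), ms));

async function loadInParallel() {
  // 'slow' finishes last, but its result still lands in the first slot
  return Promise.all([
    delay(30, 'slow'),
    delay(10, 'fast'),
    delay(20, 'medium')
  ]);
}

loadInParallel().then(results => console.log(results));
// → [ 'slow', 'fast', 'medium' ]
```

This is what makes array destructuring of the result safe: each variable lines up with the promise at the same position.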

Promise.all Example


// Use case: Loading a dashboard that needs multiple data sources
async function loadDashboard(userId, userLocation) {
  try {
    const startTime = performance.now();
    
    const [
      accountInfo,
      recentTransactions,
      stockPrices,
      newsArticles,
      weatherData
    ] = await Promise.all([
      fetchAccountInfo(userId),
      fetchRecentTransactions(userId),
      fetchStockPrices(userId),
      fetchNewsArticles(),
      fetchWeatherData(userLocation)
    ]);
    
    const loadTime = performance.now() - startTime;
    console.log(`Dashboard loaded in ${loadTime}ms`);
    
    renderDashboard({
      accountInfo,
      recentTransactions,
      stockPrices,
      newsArticles,
      weatherData
    });
  } catch (error) {
    // If ANY promise fails, this catch block will execute
    console.error('Failed to load dashboard:', error);
    showErrorMessage('Some dashboard components failed to load. Please try again.');
  }
}
          

Key Point: Promise.all fails fast. If any promise rejects, the returned promise rejects immediately with that error, even if other promises are still pending. The remaining operations are not cancelled; they keep running, but their results are ignored.
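The fail-fast behavior can be seen with two simulated operations, one that rejects quickly and one that succeeds slowly. This is a sketch for illustration; `delay`, `failAfter`, and `demonstrateFailFast` are hypothetical names, and the timings are arbitrary.

```javascript
// Hypothetical helpers that settle after `ms` milliseconds
const delay = (ms, value) =>
  new Promise(resolve => setTimeout(() => resolve(value), ms));
const failAfter = (ms, message) =>
  new Promise((_, reject) => setTimeout(() => reject(new Error(message)), ms));

async function demonstrateFailFast() {
  let slowFinished = false;
  const slow = delay(50, 'slow result').then(value => {
    slowFinished = true;
    return value;
  });

  try {
    await Promise.all([slow, failAfter(10, 'boom')]);
    return { rejected: false };
  } catch (error) {
    // Reached after roughly 10ms, while the slow operation is still pending
    const slowFinishedAtRejection = slowFinished;

    // The slow operation was not cancelled; it still runs to completion
    await slow;

    return {
      rejected: true,
      message: error.message,
      slowFinishedAtRejection,
      slowFinishedEventually: slowFinished
    };
  }
}

demonstrateFailFast().then(outcome => console.log(outcome));
```

If the slow operation has side effects or holds resources, it needs its own cancellation mechanism; `Promise.all` does not provide one.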

Promise.allSettled - Complete Execution

Use Promise.allSettled when you want to execute multiple operations and get all their results, regardless of whether some operations fail.
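Each entry in the array returned by `Promise.allSettled` is an object with a `status` of `'fulfilled'` (plus a `value`) or `'rejected'` (plus a `reason`). A minimal sketch of that shape, using a hypothetical `settleAll` function and made-up inputs:

```javascript
async function settleAll() {
  const results = await Promise.allSettled([
    Promise.resolve(42),
    Promise.reject(new Error('service unavailable'))
  ]);

  // Flatten each outcome into a plain object that is easy to log
  return results.map(result =>
    result.status === 'fulfilled'
      ? { status: result.status, value: result.value }
      : { status: result.status, reason: result.reason.message }
  );
}

settleAll().then(summary => console.log(summary));
// → [
//     { status: 'fulfilled', value: 42 },
//     { status: 'rejected', reason: 'service unavailable' }
//   ]
```

Because the returned promise never rejects, no `try`/`catch` is needed around the `await`; failures are inspected per entry instead.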

Promise.allSettled Example


// Use case: Loading non-critical dashboard components where some can fail
async function loadDashboard(userId, userLocation) {
  const startTime = performance.now();
  
  const results = await Promise.allSettled([
    fetchAccountInfo(userId),
    fetchRecentTransactions(userId),
    fetchStockPrices(userId),
    fetchNewsArticles(),
    fetchWeatherData(userLocation)
  ]);
  
  const loadTime = performance.now() - startTime;
  console.log(`Dashboard loaded in ${loadTime}ms`);
  
  // Process results based on their status
  const dashboard = {
    loadTime
  };
  
  const labels = ['accountInfo', 'recentTransactions', 'stockPrices', 'newsArticles', 'weatherData'];
  
  // Process each result
  results.forEach((result, index) => {
    const componentName = labels[index];
    
    if (result.status === 'fulfilled') {
      dashboard[componentName] = {
        status: 'loaded',
        data: result.value
      };
    } else {
      console.error(`Failed to load ${componentName}:`, result.reason);
      dashboard[componentName] = {
        status: 'error',
        error: result.reason?.message ?? String(result.reason)
      };
    }
  });
  
  // Count failures
  const failedComponents = results.filter(r => r.status === 'rejected').length;
  if (failedComponents > 0) {
    showWarningMessage(`${failedComponents} dashboard components failed to load.`);
  }
  
  renderDashboard(dashboard);
}
          

Promise.race - First to Complete

Use Promise.race when you need the outcome of the first promise to settle, whether it fulfills or rejects.

Promise.race Example


// Use case: Fetching data with a timeout
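function fetchWithTimeout(url, timeoutMs = 5000) {
  // Lets us abort the underlying request so it does not keep running after a timeout
  const controller = new AbortController();
  let timeoutId;
  
  const fetchPromise = fetch(url, { signal: controller.signal }).then(response => {
    if (!response.ok) {
      throw new Error(`HTTP error! Status: ${response.status}`);
    }
    return response.json();
  });
  
  const timeoutPromise = new Promise((_, reject) => {
    timeoutId = setTimeout(() => {
      // Reject first so the timeout error wins the race, then cancel the request
      reject(new Error(`Request to ${url} timed out after ${timeoutMs}ms`));
      controller.abort();
    }, timeoutMs);
  });
  
  // Clear the timer once the race settles so it cannot fire later
  return Promise.race([fetchPromise, timeoutPromise])
    .finally(() => clearTimeout(timeoutId));
}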

// Use case: Getting data from the fastest available server
async function fetchFromFastestServer(resourcePath) {
  const servers = [
    'https://api-us-east.example.com',
    'https://api-us-west.example.com',
    'https://api-eu.example.com'
  ];
  
  // Create an array of fetch promises, one for each server
  const fetchPromises = servers.map(server => {
    return fetch(`${server}/${resourcePath}`)
      .then(response => {
        if (!response.ok) {
          throw new Error(`HTTP error! Status: ${response.status}`);
        }
        return response.json();
      })
      .then(data => {
        console.log(`Got response from ${server}`);
        return data;
      });
  });
  
  // Return data from the fastest server to respond
  return Promise.race(fetchPromises);
}
          

Important: With Promise.race, if the winning promise rejects, the entire operation rejects, even if other promises would have resolved successfully.

Promise.any - First to Succeed

Use Promise.any when you need the result of the first promise to fulfill. Rejections are ignored unless every promise rejects, in which case Promise.any rejects with an AggregateError.
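The difference from `Promise.race` shows up when the fastest operation fails. The sketch below runs both combinators over the same simulated inputs; `delay`, `failAfter`, and `compareRaceAndAny` are hypothetical names, and the server messages are made up.

```javascript
// Hypothetical helpers that settle after `ms` milliseconds
const delay = (ms, value) =>
  new Promise(resolve => setTimeout(() => resolve(value), ms));
const failAfter = (ms, message) =>
  new Promise((_, reject) => setTimeout(() => reject(new Error(message)), ms));

// Turns a promise into a string describing how it settled
const describe = (promise) =>
  promise.then(
    value => `fulfilled: ${value}`,
    error => `rejected: ${error.message}`
  );

async function compareRaceAndAny() {
  // A fast failure and a slower success
  const makeInputs = () => [failAfter(10, 'primary down'), delay(30, 'backup data')];

  const raceOutcome = await describe(Promise.race(makeInputs()));
  const anyOutcome = await describe(Promise.any(makeInputs()));

  // When every input rejects, Promise.any rejects with an AggregateError
  const allFailed = await Promise.any([failAfter(5, 'a'), failAfter(5, 'b')])
    .catch(error => error);

  return {
    raceOutcome,
    anyOutcome,
    isAggregateError: allFailed instanceof AggregateError,
    collectedErrors: allFailed.errors.map(error => error.message)
  };
}

compareRaceAndAny().then(outcome => console.log(outcome));
// raceOutcome: 'rejected: primary down'
// anyOutcome:  'fulfilled: backup data'
```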

Promise.any Example


// Use case: Trying multiple servers until one succeeds
async function fetchWithFailover(resourcePath) {
  const servers = [
    'https://api-primary.example.com',
    'https://api-backup1.example.com',
    'https://api-backup2.example.com'
  ];
  
  // Create an array of fetch promises, one for each server
  const fetchPromises = servers.map(server => {
    return fetch(`${server}/${resourcePath}`)
      .then(response => {
        if (!response.ok) {
          throw new Error(`HTTP error from ${server}! Status: ${response.status}`);
        }
        console.log(`Successfully fetched from ${server}`);
        return response.json();
      });
  });
  
  try {
    // Return the first successful response
    return await Promise.any(fetchPromises);
  } catch (error) {
    // If all promises rejected, AggregateError is thrown with all the individual errors
    console.error('All servers failed:', error.errors);
    throw new Error('Unable to fetch data from any server.');
  }
}

// Use case: Finding a user by multiple identifiers
async function findUser(possibleIds) {
  try {
    // Try to find a user with any of these IDs
    const user = await Promise.any(
      possibleIds.map(id => findUserById(id))
    );
    return user;
  } catch (error) {
    throw new Error('User not found with any of the provided IDs');
  }
}
          

Sequential vs Parallel Execution

Understanding when to use sequential versus parallel execution is crucial for optimizing performance while maintaining correctness.

Sequential Execution

Execute operations one after another when each step depends on the result of the previous one:

Deliberate Sequential Processing


// Processing steps in order with intermediate processing
async function processUserUpload(fileId) {
  // Step 1: Download file
  const file = await downloadFile(fileId);
  console.log('File downloaded, size:', file.size);
  
  // Step 2: Validate file contents
  const validationResult = await validateFile(file);
  if (!validationResult.valid) {
    throw new Error(`Invalid file: ${validationResult.error}`);
  }
  console.log('File validated successfully');
  
  // Step 3: Transform data (depends on validation rules)
  const transformedData = await transformData(file, validationResult.rules);
  console.log('Data transformed, records:', transformedData.length);
  
  // Step 4: Save to database (depends on transformed data)
  const saveResult = await saveToDatabase(transformedData);
  console.log('Data saved to database, id:', saveResult.id);
  
  // Step 5: Generate report (depends on saved data with DB IDs)
  const report = await generateReport(saveResult.id);
  console.log('Report generated');
  
  return {
    status: 'success',
    recordsProcessed: transformedData.length,
    reportUrl: report.url
  };
}
          

Parallel Execution

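Execute operations simultaneously when they are independent of one another, and combine parallel and sequential steps when only some operations have dependencies: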

Optimized Mixed Sequential/Parallel Processing


// Using both sequential and parallel execution for optimal performance
async function loadUserProfile(userId) {
  // First, get the basic user data (sequential prerequisite)
  const user = await fetchUserData(userId);
  
  // Once we have the user, fetch multiple related data sources in parallel
  const [
    posts,
    followers,
    following,
    notifications
  ] = await Promise.all([
    fetchUserPosts(user.id),
    fetchUserFollowers(user.id),
    fetchUserFollowing(user.id),
    fetchUserNotifications(user.id)
  ]);
  
  // Process user's post activity in parallel
  const postStats = await Promise.all(
    posts.map(async post => {
      const [likes, comments, shares] = await Promise.all([
        fetchPostLikes(post.id),
        fetchPostComments(post.id),
        fetchPostShares(post.id)
      ]);
      
      return {
        postId: post.id,
        metrics: {
          likeCount: likes.length,
          commentCount: comments.length,
          shareCount: shares.length,
          engagement: (likes.length + comments.length * 2 + shares.length * 3) / user.followerCount
        }
      };
    })
  );
  
  // Now sequentially calculate some metrics that depend on all the above data
  const profileMetrics = await calculateProfileMetrics(user, posts, followers, postStats);
  
  // Return the complete profile
  return {
    user,
    posts,
    followers,
    following,
    notifications,
    postStats,
    profileMetrics
  };
}
          

Performance Comparison

Let's visualize the performance difference between sequential and parallel execution:

gantt
  title Sequential vs Parallel Operations
  dateFormat s
  axisFormat %S
  section Sequential
  Task A :seq1, 0, 2s
  Task B :seq2, after seq1, 3s
  Task C :seq3, after seq2, 1s
  Task D :seq4, after seq3, 2s
  section Parallel
  Task A :par1, 0, 2s
  Task B :par2, 0, 3s
  Task C :par3, 0, 1s
  Task D :par4, 0, 2s
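In the chart, the sequential total is the sum of the task durations (2 + 3 + 1 + 2 = 8 seconds), while the parallel total is the longest single task (3 seconds). The sketch below measures the same effect with simulated tasks, scaled down from seconds to tens of milliseconds; `delay`, `runSequentially`, and `runInParallel` are hypothetical names.

```javascript
// Resolves after `ms` milliseconds (hypothetical helper standing in for real work)
const delay = (ms) => new Promise(resolve => setTimeout(resolve, ms));

// Durations mirror tasks A to D in the chart, scaled to milliseconds
const durations = [20, 30, 10, 20];

async function runSequentially() {
  const start = performance.now();
  for (const ms of durations) {
    await delay(ms); // each task starts only after the previous one ends
  }
  return performance.now() - start;
}

async function runInParallel() {
  const start = performance.now();
  await Promise.all(durations.map(ms => delay(ms))); // all tasks start together
  return performance.now() - start;
}

(async () => {
  const sequentialMs = await runSequentially();
  const parallelMs = await runInParallel();
  console.log(`Sequential: ~${Math.round(sequentialMs)}ms (sum of durations)`);
  console.log(`Parallel:   ~${Math.round(parallelMs)}ms (longest duration)`);
})();
```

Exact timings vary between runs, but the sequential total should land near 80ms and the parallel total near 30ms.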

Real-world example: Consider loading a social media profile page:

Advanced Concurrency Patterns

Beyond basic sequential and parallel execution, several advanced patterns help manage concurrency in complex scenarios.

Pattern 1: Concurrency Limiting

Limit the number of operations running concurrently to avoid overwhelming resources.
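The core idea can be sketched in a few lines: start a fixed number of runners, and let each one claim the next unprocessed item from a shared cursor until none remain. The `mapWithLimit` and `demo` functions below are hypothetical names for illustration. Unlike a production pool, this sketch does not capture per-item errors: the first rejection propagates to the caller.

```javascript
// Runs `worker(item)` for every item with at most `limit` calls in flight
async function mapWithLimit(items, limit, worker) {
  const results = new Array(items.length);
  let next = 0; // shared cursor: each index is claimed exactly once

  async function runner() {
    while (next < items.length) {
      const index = next++;
      results[index] = await worker(items[index], index);
    }
  }

  // Start up to `limit` runners and wait for all of them to drain the list
  const runners = Array.from({ length: Math.min(limit, items.length) }, () => runner());
  await Promise.all(runners);
  return results;
}

// Demo: track how many workers are active at once
async function demo() {
  let inFlight = 0;
  let peak = 0;

  const results = await mapWithLimit([1, 2, 3, 4, 5, 6], 2, async n => {
    inFlight++;
    peak = Math.max(peak, inFlight);
    await new Promise(resolve => setTimeout(resolve, 10)); // simulated work
    inFlight--;
    return n * 10;
  });

  return { results, peak };
}

demo().then(({ results, peak }) => console.log(results, `peak concurrency: ${peak}`));
// → [ 10, 20, 30, 40, 50, 60 ] peak concurrency: 2
```

Claiming the index synchronously (`next++` before the `await`) is what guarantees that no two runners pick up the same item.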

Concurrency Pool Implementation


/**
 * Pool for limiting concurrent operations
 * @param {Function[]} tasks - Array of task functions that return promises
 * @param {number} concurrency - Maximum number of concurrent tasks
 * @returns {Promise} - Results of all tasks
 */
async function concurrencyPool(tasks, concurrency = 5) {
  const results = new Array(tasks.length);
  
  // Shared cursor: each task index is claimed by exactly one worker
  let nextIndex = 0;
  
  // Each worker repeatedly claims the next unclaimed task until none remain
  const worker = async function () {
    while (nextIndex < tasks.length) {
      const taskIndex = nextIndex++;
      
      try {
        // Execute the task and store its result at the task's position
        results[taskIndex] = await tasks[taskIndex]();
      } catch (error) {
        // Individual task errors are stored in results
        console.error(`Task ${taskIndex} failed:`, error);
        results[taskIndex] = { error };
      }
    }
  };
  
  // Start workers up to the concurrency limit
  const workers = [];
  for (let i = 0; i < Math.min(concurrency, tasks.length); i++) {
    workers.push(worker());
  }
  
  // Wait for all tasks to complete
  await Promise.all(workers);
  
  return results;
}

// Usage example: processing a large batch of images
async function processBatchOfImages(imageUrls) {
  const startTime = performance.now();
  
  // Create an array of task functions
  const tasks = imageUrls.map(url => {
    return async () => {
      console.log(`Processing image: ${url}`);
      
      // Fetch, process, and upload the image
      const image = await fetchImage(url);
      const processed = await processImage(image);
      const uploadResult = await uploadProcessedImage(processed);
      
      return {
        url,
        newUrl: uploadResult.url,
        processingTime: uploadResult.processingTime
      };
    };
  });
  
  // Process with limited concurrency
  const results = await concurrencyPool(tasks, 3);
  
  const totalTime = performance.now() - startTime;
  console.log(`Processed ${results.length} images in ${totalTime}ms`);
  
  // Count successes and failures
  const successes = results.filter(r => !r.error).length;
  const failures = results.filter(r => r.error).length;
  
  console.log(`Successes: ${successes}, Failures: ${failures}`);
  
  return {
    results,
    totalTime,
    successes,
    failures
  };
}
          

When to use concurrency limiting:

Pattern 2: Task Prioritization

Execute tasks in order of importance while still maintaining concurrency.

Priority Queue Implementation


/**
 * Priority queue for task execution
 */
class PriorityTaskQueue {
  constructor(concurrency = 4) {
    this.concurrency = concurrency;
    this.runningCount = 0;
    this.highPriorityQueue = [];
    this.normalPriorityQueue = [];
    this.lowPriorityQueue = [];
  }
  
  /**
   * Add a task to the queue
   * @param {Function} taskFn - Function that returns a promise
   * @param {string} priority - 'high', 'normal', or 'low'
   * @returns {Promise} - Promise that resolves when the task completes
   */
  enqueue(taskFn, priority = 'normal') {
    return new Promise((resolve, reject) => {
      const task = { taskFn, resolve, reject };
      
      switch (priority) {
        case 'high':
          this.highPriorityQueue.push(task);
          break;
        case 'normal':
          this.normalPriorityQueue.push(task);
          break;
        case 'low':
          this.lowPriorityQueue.push(task);
          break;
        default:
          this.normalPriorityQueue.push(task);
          break;
      }
      
      // Try to execute tasks immediately if possible
      this.runTasks();
    });
  }
  
  /**
   * Run tasks if there are available slots
   */
  runTasks() {
    // If we can run more tasks, do so
    while (this.runningCount < this.concurrency) {
      // Get the next task from the highest priority queue that has tasks
      const task = this.getNextTask();
      
      // If there are no more tasks, we're done
      if (!task) {
        break;
      }
      
      // Increment the running count
      this.runningCount++;
      
      // Execute the task
      Promise.resolve()
        .then(() => task.taskFn())
        .then(result => {
          task.resolve(result);
        })
        .catch(error => {
          task.reject(error);
        })
        .finally(() => {
          // Decrement the running count
          this.runningCount--;
          
          // Try to run more tasks
          this.runTasks();
        });
    }
  }
  
  /**
   * Get the next task from the highest priority queue that has tasks
   */
  getNextTask() {
    if (this.highPriorityQueue.length > 0) {
      return this.highPriorityQueue.shift();
    }
    
    if (this.normalPriorityQueue.length > 0) {
      return this.normalPriorityQueue.shift();
    }
    
    if (this.lowPriorityQueue.length > 0) {
      return this.lowPriorityQueue.shift();
    }
    
    return null;
  }
  
  /**
   * Get the current status of the queue
   */
  getStatus() {
    return {
      running: this.runningCount,
      queued: {
        high: this.highPriorityQueue.length,
        normal: this.normalPriorityQueue.length,
        low: this.lowPriorityQueue.length,
        total: this.highPriorityQueue.length + this.normalPriorityQueue.length + this.lowPriorityQueue.length
      }
    };
  }
}

// Usage example: image loading in a gallery
const taskQueue = new PriorityTaskQueue(3);

// Function to load and display an image
async function loadImage(url, priority) {
  try {
    const imageData = await taskQueue.enqueue(
      () => fetchImage(url),
      priority
    );
    
    displayImage(url, imageData);
    
    console.log(`Loaded image: ${url} (${priority} priority)`);
    console.log('Queue status:', taskQueue.getStatus());
  } catch (error) {
    console.error(`Failed to load image: ${url}`, error);
  }
}

// Load images with different priorities
function loadGallery(visibleImages, offscreenImages) {
  // Load visible images first (high priority)
  visibleImages.forEach(url => {
    loadImage(url, 'high');
  });
  
  // Then load images just outside the viewport (normal priority)
  const nearbyImages = offscreenImages.slice(0, 10);
  nearbyImages.forEach(url => {
    loadImage(url, 'normal');
  });
  
  // Finally load the rest (low priority)
  const remainingImages = offscreenImages.slice(10);
  remainingImages.forEach(url => {
    loadImage(url, 'low');
  });
}
          

Pattern 3: Pipeline Processing

Process data through a series of stages, where each stage can process items concurrently but items flow through stages sequentially.

Data Processing Pipeline


/**
 * Create a processing pipeline
 * @param {Function[]} stages - Array of processing stage functions
 * @param {Object} options - Configuration options
 * @returns {Function} - Function that accepts input and returns processed output
 */
function createPipeline(stages, options = {}) {
  const concurrencyLimit = options.concurrencyLimit || 5;
  const stageNames = options.stageNames || stages.map((_, i) => `Stage ${i+1}`);
  
  return async function(items) {
    let currentItems = items;
    const startTime = performance.now();
    
    // Process each stage sequentially
    for (let stageIndex = 0; stageIndex < stages.length; stageIndex++) {
      const stageFn = stages[stageIndex];
      const stageName = stageNames[stageIndex];
      const stageStartTime = performance.now();
      
      console.log(`Starting ${stageName} with ${currentItems.length} items`);
      
      // Process items in this stage with limited concurrency
      const stageResults = await concurrencyPool(
        currentItems.map(item => () => stageFn(item)),
        concurrencyLimit
      );
      
      // Filter out any failed items (could also choose to keep them based on requirements)
      currentItems = stageResults.filter(result => !result.error);
      
      const stageDuration = performance.now() - stageStartTime;
      console.log(`Completed ${stageName} with ${currentItems.length} successful items in ${stageDuration}ms`);
    }
    
    const totalDuration = performance.now() - startTime;
    console.log(`Pipeline completed in ${totalDuration}ms with ${currentItems.length} items`);
    
    return currentItems;
  };
}

// Example: Document processing pipeline
// Stage 1: Download documents
const downloadDocument = async (docInfo) => {
  const content = await fetchDocumentContent(docInfo.id);
  return {
    ...docInfo,
    content
  };
};

// Stage 2: Parse and extract data
const parseDocument = async (doc) => {
  const parsedData = await parseDocumentContent(doc.content);
  return {
    ...doc,
    parsedData
  };
};

// Stage 3: Validate extracted data
const validateDocument = async (doc) => {
  const validationResult = await validateData(doc.parsedData);
  return {
    ...doc,
    isValid: validationResult.valid,
    validationErrors: validationResult.errors
  };
};

// Stage 4: Process business logic
const processDocument = async (doc) => {
  if (!doc.isValid) {
    return {
      ...doc,
      status: 'rejected',
      reason: doc.validationErrors.join(', ')
    };
  }
  
  const processingResult = await applyBusinessRules(doc.parsedData);
  return {
    ...doc,
    status: 'processed',
    processingResult
  };
};

// Stage 5: Generate report
const generateReport = async (doc) => {
  if (doc.status !== 'processed') {
    return {
      ...doc,
      report: null
    };
  }
  
  const report = await createReport(doc);
  return {
    ...doc,
    report
  };
};

// Create and use the pipeline
const documentPipeline = createPipeline(
  [downloadDocument, parseDocument, validateDocument, processDocument, generateReport],
  {
    concurrencyLimit: 3,
    stageNames: ['Download', 'Parse', 'Validate', 'Process', 'Report']
  }
);

// Process a batch of documents
async function processBatch(documentIds) {
  const documents = documentIds.map(id => ({ id }));
  const results = await documentPipeline(documents);
  
  // Summary
  const processedCount = results.filter(doc => doc.status === 'processed').length;
  const rejectedCount = results.filter(doc => doc.status === 'rejected').length;
  
  return {
    results,
    summary: {
      total: documentIds.length,
      processed: processedCount,
      rejected: rejectedCount,
      successRate: (processedCount / documentIds.length) * 100
    }
  };
}
          

Benefits of pipeline processing:

Rate Limiting and Backoff Strategies

When working with external APIs or services, rate limiting is crucial to avoid being throttled or banned.
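A token bucket rests on two small calculations: how many tokens have accumulated since the last check, and how long a request must wait when the bucket holds too few. The pure functions below are a sketch of that arithmetic with made-up numbers; `refill` and `waitTimeMs` are hypothetical names chosen for illustration.

```javascript
// Tokens available after `elapsedMs`, never exceeding the bucket size
function refill(tokens, elapsedMs, tokensPerSecond, bucketSize) {
  return Math.min(bucketSize, tokens + (elapsedMs / 1000) * tokensPerSecond);
}

// Milliseconds until `tokenCost` tokens are available (0 if already available)
function waitTimeMs(tokens, tokenCost, tokensPerSecond) {
  return Math.max(0, ((tokenCost - tokens) / tokensPerSecond) * 1000);
}

// At 4 tokens per second, half a second adds 2 tokens
console.log(refill(1, 500, 4, 10));   // → 3

// A long idle period fills the bucket but never overflows it
console.log(refill(9, 2000, 4, 10));  // → 10

// With 3 tokens on hand, a request costing 5 waits for 2 more tokens
console.log(waitTimeMs(3, 5, 4));     // → 500

// Enough tokens already: no wait
console.log(waitTimeMs(3, 1, 4));     // → 0
```

The bucket size sets how large a burst is allowed, while the refill rate sets the sustained request rate.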

Token Bucket Rate Limiter


/**
 * Token bucket rate limiter
 */
class RateLimiter {
  constructor(tokensPerSecond, bucketSize) {
    this.tokensPerSecond = tokensPerSecond;
    this.bucketSize = bucketSize;
    this.tokens = bucketSize;
    this.lastRefillTime = Date.now();
    this.requestQueue = [];
  }
  
  /**
   * Refill tokens based on elapsed time
   */
  refillTokens() {
    const now = Date.now();
    const elapsedMs = now - this.lastRefillTime;
    
    if (elapsedMs > 0) {
      // Calculate how many tokens to add based on elapsed time
      const newTokens = (elapsedMs / 1000) * this.tokensPerSecond;
      
      // Add tokens to the bucket, not exceeding the bucket size
      this.tokens = Math.min(this.bucketSize, this.tokens + newTokens);
      
      // Update the last refill time
      this.lastRefillTime = now;
    }
  }
  
  /**
   * Get permission to send a request
   * @param {number} tokenCost - Number of tokens required for the request
   * @returns {Promise} - Resolves when the request can be sent
   */
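  async acquireToken(tokenCost = 1) {
    // A request costing more than the bucket can hold could never be granted
    if (tokenCost > this.bucketSize) {
      throw new Error(`tokenCost ${tokenCost} exceeds bucket size ${this.bucketSize}`);
    }
    
    return new Promise(resolve => {
      // Queue every request so that waiters are served in FIFO order
      this.requestQueue.push({ tokenCost, resolve });
      this.processQueue();
    });
  }
  
  /**
   * Grant tokens to queued requests in order, scheduling a single retry
   * timer when the bucket does not hold enough tokens for the next request
   */
  processQueue() {
    // A retry is already scheduled; it will drain the queue when it fires
    if (this.retryTimer) {
      return;
    }
    
    // Refill tokens
    this.refillTokens();
    
    // Serve requests from the front of the queue while tokens last
    while (this.requestQueue.length > 0 && this.tokens >= this.requestQueue[0].tokenCost) {
      const { tokenCost, resolve } = this.requestQueue.shift();
      this.tokens -= tokenCost;
      resolve();
    }
    
    if (this.requestQueue.length > 0) {
      // Calculate how long to wait until the next request can be served
      const deficit = this.requestQueue[0].tokenCost - this.tokens;
      const waitTimeMs = Math.ceil((deficit / this.tokensPerSecond) * 1000);
      
      this.retryTimer = setTimeout(() => {
        this.retryTimer = null;
        this.processQueue();
      }, waitTimeMs);
    }
  }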
  
  /**
   * Wrap a function with rate limiting
   * @param {Function} fn - Function to rate limit
   * @param {number} tokenCost - Number of tokens required for the function
   * @returns {Function} - Rate-limited function
   */
  wrap(fn, tokenCost = 1) {
    return async (...args) => {
      await this.acquireToken(tokenCost);
      return fn(...args);
    };
  }
}

// Example: Rate-limited API client
class ApiClient {
  constructor(baseUrl, requestsPerSecond = 5) {
    this.baseUrl = baseUrl;
    this.rateLimiter = new RateLimiter(requestsPerSecond, requestsPerSecond);
    
    // Create rate-limited fetch function
    this.limitedFetch = this.rateLimiter.wrap(
      this.fetch.bind(this)
    );
  }
  
  async fetch(endpoint, options = {}) {
    try {
      const url = `${this.baseUrl}${endpoint}`;
      const response = await fetch(url, options);
      
      if (!response.ok) {
        throw new Error(`HTTP error! Status: ${response.status}`);
      }
      
      return response.json();
    } catch (error) {
      console.error(`API request failed: ${error.message}`);
      throw error;
    }
  }
  
  async get(endpoint) {
    return this.limitedFetch(endpoint, { method: 'GET' });
  }
  
  async post(endpoint, data) {
    return this.limitedFetch(endpoint, {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify(data)
    });
  }
}

// Usage
const client = new ApiClient('https://api.example.com', 10); // 10 requests per second

// Fetch multiple items with automatic rate limiting
async function fetchUserData(userIds) {
  const results = {};
  
  for (const userId of userIds) {
    try {
      // This will automatically be rate limited
      results[userId] = await client.get(`/users/${userId}`);
      console.log(`Fetched user ${userId}`);
    } catch (error) {
      console.error(`Failed to fetch user ${userId}:`, error);
      results[userId] = { error: error.message };
    }
  }
  
  return results;
}
          

Exponential Backoff Strategy

When a request fails due to rate limiting or server overload, use exponential backoff to retry with increasing delays.
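One common formulation sets the delay before retry n (counting from 0) to min(maxDelay, initialDelay × factor^n), then randomizes it slightly so that many clients do not retry in lockstep. The sketch below computes such a schedule; `backoffSchedule` and `withJitter` are hypothetical helpers, and the default values are illustrative assumptions.

```javascript
// Delays (in ms) before each retry, without jitter
function backoffSchedule({
  initialDelay = 1000,
  factor = 2,
  maxDelay = 30000,
  maxRetries = 5
} = {}) {
  const delays = [];
  for (let attempt = 0; attempt < maxRetries; attempt++) {
    // Exponential growth, capped at maxDelay
    delays.push(Math.min(maxDelay, initialDelay * factor ** attempt));
  }
  return delays;
}

// Spreads a delay by up to +/- `jitter` (0.1 means +/- 10%).
// `random` is injectable so the behavior can be tested deterministically.
function withJitter(delayMs, jitter = 0.1, random = Math.random) {
  return delayMs * (1 + jitter * (random() * 2 - 1));
}

console.log(backoffSchedule());
// → [ 1000, 2000, 4000, 8000, 16000 ]

console.log(backoffSchedule({ maxRetries: 7 }));
// → [ 1000, 2000, 4000, 8000, 16000, 30000, 30000 ]

console.log(backoffSchedule().map(ms => Math.round(withJitter(ms))));
// Values vary per run, each within 10% of the schedule above
```

The cap matters for long outages: without `maxDelay`, the seventh retry here would wait 64 seconds instead of 30.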

Exponential Backoff Implementation


/**
 * Make a request with exponential backoff
 * @param {Function} requestFn - Function that makes the request
 * @param {Object} options - Backoff options
 * @returns {Promise} - Promise resolving to the request result
 */
async function withBackoff(requestFn, options = {}) {
  const {
    initialDelay = 1000,
    maxDelay = 30000,
    factor = 2,
    jitter = 0.1,
    maxRetries = 5,
    shouldRetry = (error) => true
  } = options;
  
  let retries = 0;
  
  while (true) {
    try {
      return await requestFn();
    } catch (error) {
      // Check if we've reached the maximum number of retries
      if (retries >= maxRetries) {
        console.log(`Maximum retries (${maxRetries}) reached. Giving up.`);
        throw error;
      }
      
      // Check if we should retry based on the error
      if (!shouldRetry(error)) {
        console.log(`Error not eligible for retry: ${error.message}`);
        throw error;
      }
      
      // initialDelay may be a number or a function of the error
      // (for example, to honor a server-provided retry delay)
      const baseDelay = typeof initialDelay === 'function'
        ? initialDelay(error)
        : initialDelay;
      
      // Grow the delay exponentially, apply jitter, and cap it at maxDelay
      const delay = Math.min(
        maxDelay,
        baseDelay * Math.pow(factor, retries) * (1 + jitter * (Math.random() * 2 - 1))
      );
      
      // Increment retry counter
      retries++;
      
      console.log(`Retry ${retries} after ${Math.round(delay)}ms delay`);
      
      // Wait before retrying
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}

// Example usage with a rate-limited API
async function fetchDataWithBackoff(url) {
  return withBackoff(
    async () => {
      const response = await fetch(url);
      
      if (response.status === 429) {
        // Rate limited, extract retry-after header if available
        const retryAfter = response.headers.get('Retry-After');
        const retryMs = retryAfter ? parseInt(retryAfter, 10) * 1000 : null;
        
        const error = new Error('Rate limited');
        error.status = 429;
        error.retryAfter = retryMs;
        throw error;
      }
      
      if (!response.ok) {
        const error = new Error(`HTTP error ${response.status}`);
        error.status = response.status;
        throw error;
      }
      
      return response.json();
    },
    {
      maxRetries: 5,
      shouldRetry: (error) => {
        // Retry on network errors, rate limits, and server errors
        return (
          error.name === 'TypeError' || // Network errors
          error.status === 429 || // Rate limit
          (error.status >= 500 && error.status < 600) // Server errors
        );
      },
      // initialDelay must be a number of milliseconds; passing a function
      // here would turn every computed delay into NaN
      initialDelay: 1000
    }
  );
}
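
The `Retry-After` header can carry either a number of seconds or an HTTP date, so `parseInt` alone does not cover the date form. The following is a minimal sketch of a parser that handles both; `parseRetryAfter` is a hypothetical helper name, not part of the code above.

```javascript
// Hypothetical helper: convert a Retry-After header value to milliseconds.
// Accepts delay-seconds ("120") or an HTTP date ("Wed, 21 Oct 2015 07:28:00 GMT").
// Returns null when the header is missing or cannot be parsed.
function parseRetryAfter(headerValue, now = Date.now()) {
  if (headerValue == null) return null;
  const value = String(headerValue).trim();
  if (value === '') return null;

  // Form 1: a non-negative integer number of seconds
  if (/^\d+$/.test(value)) {
    return Number(value) * 1000;
  }

  // Form 2: an HTTP date; wait until that moment, never a negative delay
  const timestamp = Date.parse(value);
  if (Number.isNaN(timestamp)) return null;
  return Math.max(0, timestamp - now);
}
```

In the request function, the result could be attached to the thrown error, for example `error.retryAfter = parseRetryAfter(response.headers.get('Retry-After'))`.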
          

Web Workers for True Parallelism

For computationally intensive tasks, Web Workers allow JavaScript to run in parallel threads, unlocking true parallelism.

Web Worker Pool Implementation


// worker.js
self.addEventListener('message', (event) => {
  const { id, task, data } = event.data;
  
  try {
    let result;
    
    // Execute the requested task
    switch (task) {
      case 'processImage':
        result = processImage(data);
        break;
      case 'heavyCalculation':
        result = performCalculation(data);
        break;
      default:
        throw new Error(`Unknown task: ${task}`);
    }
    
    // Return successful result
    self.postMessage({
      id,
      status: 'completed',
      result
    });
  } catch (error) {
    // Return error
    self.postMessage({
      id,
      status: 'error',
      error: error.message
    });
  }
});

// Task implementations
function processImage(imageData) {
  // Simulate image processing
  // ...
  return { processed: true };
}

function performCalculation(params) {
  // Simulate heavy calculation
  let result = 0;
  for (let i = 0; i < params.iterations; i++) {
    result += Math.sqrt(i) * Math.sin(i);
  }
  return { result };
}

// Main application code
class WorkerPool {
  constructor(workerScript, numWorkers = navigator.hardwareConcurrency || 4) {
    this.taskQueue = [];
    this.workers = [];
    this.availableWorkers = [];
    this.nextTaskId = 1;
    
    // Create workers
    for (let i = 0; i < numWorkers; i++) {
      const worker = new Worker(workerScript);
      
      worker.addEventListener('message', (event) => {
        const { id, status, result, error } = event.data;
        
        // Find the task that corresponds to this message
        const taskIndex = this.taskQueue.findIndex(task => task.id === id);
        
        if (taskIndex !== -1) {
          const task = this.taskQueue[taskIndex];
          
          // Remove the task from the queue
          this.taskQueue.splice(taskIndex, 1);
          
          // Handle the result
          if (status === 'completed') {
            task.resolve(result);
          } else {
            task.reject(new Error(error));
          }
        }
        
        // Make this worker available again
        this.availableWorkers.push(worker);
        
        // Process the next task if any
        this.processQueue();
      });
      
      // Add to available workers
      this.availableWorkers.push(worker);
      this.workers.push(worker);
    }
  }
  
  /**
   * Execute a task in a worker
   * @param {string} task - Task type
   * @param {any} data - Task data
   * @returns {Promise} - Promise that resolves with the task result
   */
  execute(task, data) {
    return new Promise((resolve, reject) => {
      // Create a task object
      const taskObj = {
        id: this.nextTaskId++,
        task,
        data,
        resolve,
        reject
      };
      
      // Add to the queue
      this.taskQueue.push(taskObj);
      
      // Try to process the queue
      this.processQueue();
    });
  }
  
  /**
   * Process the next task in the queue if a worker is available
   */
  processQueue() {
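    // If no worker is free, wait until one reports back
    if (this.availableWorkers.length === 0) {
      return;
    }
    
    // Find the next task that has not been sent to a worker yet.
    // Tasks stay in the queue until their result arrives, so always taking
    // taskQueue[0] could dispatch a task that is already running.
    const task = this.taskQueue.find(t => !t.dispatched);
    if (!task) {
      return;
    }
    task.dispatched = true;
    
    const worker = this.availableWorkers.pop();
    
    // Send the task to the worker
    worker.postMessage({
      id: task.id,
      task: task.task,
      data: task.data
    });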
  }
  
  /**
   * Terminate all workers
   */
  terminate() {
    this.workers.forEach(worker => worker.terminate());
    this.workers = [];
    this.availableWorkers = [];
    
    // Reject all pending tasks
    this.taskQueue.forEach(task => {
      task.reject(new Error('Worker pool terminated'));
    });
    
    this.taskQueue = [];
  }
}

// Usage example
const workerPool = new WorkerPool('worker.js', 4);

// Process multiple images in parallel
async function processImages(images) {
  const results = await Promise.all(
    images.map(image => 
      workerPool.execute('processImage', image)
    )
  );
  
  return results;
}

// Perform heavy calculations without blocking the UI
async function calculateComplexData(paramSets) {
  const results = [];
  
  for (const params of paramSets) {
    try {
      const result = await workerPool.execute('heavyCalculation', params);
      results.push({
        params,
        status: 'completed',
        result
      });
    } catch (error) {
      results.push({
        params,
        status: 'error',
        error: error.message
      });
    }
  }
  
  return results;
}
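
A pool also helps with a single large job if the job can be split into independent pieces, one per worker. The sketch below shows only the splitting step; `splitRange` is a hypothetical helper, and it assumes the worker task would accept a start and end index rather than the single `iterations` count used above.

```javascript
// Hypothetical helper: split the half-open range [0, total) into at most
// `parts` contiguous chunks whose sizes differ by no more than one.
function splitRange(total, parts) {
  const chunkCount = Math.max(1, Math.min(parts, total));
  const baseSize = Math.floor(total / chunkCount);
  const remainder = total % chunkCount;
  const chunks = [];
  let start = 0;

  for (let i = 0; i < chunkCount; i++) {
    // The first `remainder` chunks take one extra item each
    const size = baseSize + (i < remainder ? 1 : 0);
    chunks.push({ start, end: start + size });
    start += size;
  }

  return chunks;
}
```

Each chunk could then be passed to `workerPool.execute(...)`, with `Promise.all` collecting the partial results so the main thread can combine them.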
          

When to use Web Workers:

Limitations of Web Workers:

Handling Dependencies Between Concurrent Operations

In many real-world scenarios, operations have dependencies on each other. Let's explore patterns for managing these dependencies.

Dependency Graph Execution

When operations have complex dependencies, use a graph-based approach to maximize concurrency while respecting dependencies.

Task Dependency Graph


/**
 * Task dependency manager
 */
class TaskGraph {
  constructor() {
    this.tasks = new Map();
    this.results = new Map();
  }
  
  /**
   * Add a task to the graph
   * @param {string} id - Task ID
   * @param {Function} fn - Task function
   * @param {string[]} dependencies - IDs of dependencies
   */
  addTask(id, fn, dependencies = []) {
    this.tasks.set(id, { fn, dependencies });
  }
  
  /**
   * Execute all tasks in the graph
   * @returns {Map} - Map of task results by ID
   */
  async execute() {
    // Reset results
    this.results = new Map();
    
    // Create a set of remaining tasks
    const remaining = new Set(this.tasks.keys());
    
    // Continue until all tasks are done
    while (remaining.size > 0) {
      // Find tasks that can be executed (all dependencies are satisfied)
      const readyTasks = [];
      
      for (const taskId of remaining) {
        const task = this.tasks.get(taskId);
        
        // Check if all dependencies are satisfied
        const dependenciesSatisfied = task.dependencies.every(
          depId => this.results.has(depId)
        );
        
        if (dependenciesSatisfied) {
          readyTasks.push(taskId);
        }
      }
      
      // If no tasks are ready, the graph has a circular dependency
      // or a task depends on an ID that was never added
      if (readyTasks.length === 0) {
        throw new Error('Circular or missing dependency detected');
      }
      
      // Execute all ready tasks in parallel
      const executions = readyTasks.map(async taskId => {
        const task = this.tasks.get(taskId);
        
        // Get dependency results
        const dependencyResults = task.dependencies.map(depId => 
          this.results.get(depId)
        );
        
        try {
          // Execute the task with dependency results
          const result = await task.fn(...dependencyResults);
          
          // Store the result
          this.results.set(taskId, result);
          
          // Remove from remaining
          remaining.delete(taskId);
          
          return { taskId, success: true, result };
        } catch (error) {
          // Store the error as the result
          this.results.set(taskId, { error });
          
          // Remove from remaining
          remaining.delete(taskId);
          
          return { taskId, success: false, error };
        }
      });
      
      // Wait for all ready tasks to complete
      await Promise.all(executions);
    }
    
    return this.results;
  }
}

// Example: Data processing workflow
async function runDataProcessingWorkflow(inputData) {
  const graph = new TaskGraph();
  
  // Step 1: Parse input data
  graph.addTask('parseData', async () => {
    console.log('Parsing data...');
    return parseInputData(inputData);
  }, []);
  
  // Step 2: Validate data
  graph.addTask('validateData', async (parsedData) => {
    console.log('Validating data...');
    return validateData(parsedData);
  }, ['parseData']);
  
  // Step 3a: Process customer data (receives the output of validateData)
  graph.addTask('processCustomers', async (validatedData) => {
    console.log('Processing customer data...');
    return processCustomerData(validatedData.customers);
  }, ['validateData']);
  
  // Step 3b: Process product data (receives the output of validateData)
  graph.addTask('processProducts', async (validatedData) => {
    console.log('Processing product data...');
    return processProductData(validatedData.products);
  }, ['validateData']);
  
  // Step 3c: Process order data (receives the output of validateData)
  graph.addTask('processOrders', async (validatedData) => {
    console.log('Processing order data...');
    return processOrderData(validatedData.orders);
  }, ['validateData']);
  
  // Step 4: Generate customer insights (depends on customer and order data)
  graph.addTask('customerInsights', async (customers, orders) => {
    console.log('Generating customer insights...');
    return generateCustomerInsights(customers, orders);
  }, ['processCustomers', 'processOrders']);
  
  // Step 5: Generate product insights (depends on product and order data)
  graph.addTask('productInsights', async (products, orders) => {
    console.log('Generating product insights...');
    return generateProductInsights(products, orders);
  }, ['processProducts', 'processOrders']);
  
  // Step 6: Generate final report (depends on all insights)
  graph.addTask('finalReport', async (customerInsights, productInsights) => {
    console.log('Generating final report...');
    return generateFinalReport(customerInsights, productInsights);
  }, ['customerInsights', 'productInsights']);
  
  // Execute the workflow
  const results = await graph.execute();
  
  // Return the final report
  return results.get('finalReport');
}
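
The `execute()` loop above runs tasks in rounds, so a task whose dependencies finish early still waits for the slowest task in the current round. As a sketch of an alternative, each task can be chained directly onto the promises of its own dependencies. `runGraphEagerly` is a hypothetical function, it takes a Map shaped like `this.tasks`, and it uses a different error policy: a failed task rejects its dependents instead of passing them an `{ error }` object.

```javascript
// Hypothetical alternative to TaskGraph.execute(): start every task as soon as
// its own dependencies have resolved, instead of waiting for a whole round.
// `tasks` is a Map of id -> { fn, dependencies }, the shape addTask() stores.
async function runGraphEagerly(tasks) {
  const started = new Map();   // id -> Promise of that task's result
  const visiting = new Set();  // ids on the current resolution path (cycle check)

  const start = (id) => {
    if (started.has(id)) return started.get(id);
    if (!tasks.has(id)) throw new Error(`Unknown task: ${id}`);
    if (visiting.has(id)) throw new Error(`Circular dependency at: ${id}`);

    visiting.add(id);
    const { fn, dependencies } = tasks.get(id);
    const dependencyPromises = dependencies.map(depId => start(depId));
    visiting.delete(id);

    // Run the task once all of its dependencies have produced results
    const promise = Promise.all(dependencyPromises)
      .then(results => fn(...results));
    started.set(id, promise);
    return promise;
  };

  const ids = [...tasks.keys()];
  const results = await Promise.all(ids.map(id => start(id)));
  return new Map(ids.map((id, i) => [id, results[i]]));
}
```

The trade-off is that the round-based version is easier to trace and log, while this version shortens the critical path when tasks in the same round have very different durations.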
          

The task graph visualized:

graph TD
  A[parseData] --> B[validateData]
  B --> C[processCustomers]
  B --> D[processProducts]
  B --> E[processOrders]
  C --> F[customerInsights]
  E --> F
  D --> G[productInsights]
  E --> G
  F --> H[finalReport]
  G --> H
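
The order in which a round-based executor would work through this graph can be computed without running any task. The sketch below groups the task IDs into rounds; `computeWaves` and the `workflow` object are hypothetical names that mirror the dependencies declared in the example.

```javascript
// Hypothetical helper: group task IDs into the rounds in which a
// round-based executor would run them. `graph` maps id -> dependency IDs.
function computeWaves(graph) {
  const done = new Set();
  const remaining = new Set(Object.keys(graph));
  const waves = [];

  while (remaining.size > 0) {
    // A task is ready once every dependency finished in an earlier round
    const ready = [...remaining].filter(id =>
      graph[id].every(dep => done.has(dep))
    );

    if (ready.length === 0) {
      throw new Error('Circular or missing dependency detected');
    }

    for (const id of ready) {
      remaining.delete(id);
      done.add(id);
    }
    waves.push(ready);
  }

  return waves;
}

const workflow = {
  parseData: [],
  validateData: ['parseData'],
  processCustomers: ['validateData'],
  processProducts: ['validateData'],
  processOrders: ['validateData'],
  customerInsights: ['processCustomers', 'processOrders'],
  productInsights: ['processProducts', 'processOrders'],
  finalReport: ['customerInsights', 'productInsights']
};

const waves = computeWaves(workflow);
// Five rounds: parse, validate, the three process* tasks, both insights, final report
```

The three `process*` tasks share one round and the two insight tasks share the next, which is where the workflow gains from concurrency.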

Reactive Programming for Complex Concurrency

For the most complex concurrency scenarios, reactive programming models asynchronous values as streams that can be transformed, combined, and cancelled through a single subscription.

Basic Observable Implementation


/**
 * Simple Observable implementation
 */
class Observable {
  constructor(subscribeFn) {
    this.subscribeFn = subscribeFn;
  }
  
  subscribe(observer) {
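    // Holds whatever subscribeFn returns: a teardown function or a subscription.
    // Declared here so the unsubscribe closure below can see it.
    let teardown;
    
    // Create a subscription object
    const subscription = {
      unsubscribe: () => {
        // Mark as unsubscribed
        subscription.closed = true;
        
        // Call teardown logic if available
        if (typeof teardown === 'function') {
          teardown();
        } else if (teardown && typeof teardown.unsubscribe === 'function') {
          teardown.unsubscribe();
        }
      },
      closed: false
    };
    
    // Normalize the observer
    const observerObj = typeof observer === 'function'
      ? { next: observer }
      : observer;
    
    // Setup the subscription
    try {
      // Call the subscribe function
      teardown = this.subscribeFn({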
        next: (value) => {
          if (!subscription.closed && observerObj.next) {
            try {
              observerObj.next(value);
            } catch (e) {
              if (observerObj.error) {
                observerObj.error(e);
              }
            }
          }
        },
        error: (err) => {
          if (!subscription.closed) {
            subscription.closed = true;
            if (observerObj.error) {
              observerObj.error(err);
            }
          }
        },
        complete: () => {
          if (!subscription.closed) {
            subscription.closed = true;
            if (observerObj.complete) {
              observerObj.complete();
            }
          }
        }
      });
      
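      // If the source completed or errored synchronously inside subscribeFn,
      // nothing else will trigger cleanup, so run it now
      if (subscription.closed) {
        if (typeof teardown === 'function') {
          teardown();
        } else if (teardown && typeof teardown.unsubscribe === 'function') {
          teardown.unsubscribe();
        }
      }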
    } catch (e) {
      // Handle synchronous errors
      observerObj.error && observerObj.error(e);
      subscription.closed = true;
    }
    
    return subscription;
  }
  
  /**
   * Map the values emitted by this Observable
   * @param {Function} mapFn - Function to map values
   * @returns {Observable} - New Observable with mapped values
   */
  map(mapFn) {
    return new Observable(observer => {
      return this.subscribe({
        next: value => observer.next(mapFn(value)),
        error: err => observer.error(err),
        complete: () => observer.complete()
      });
    });
  }
  
  /**
   * Filter the values emitted by this Observable
   * @param {Function} filterFn - Function to filter values
   * @returns {Observable} - New Observable with filtered values
   */
  filter(filterFn) {
    return new Observable(observer => {
      return this.subscribe({
        next: value => {
          if (filterFn(value)) {
            observer.next(value);
          }
        },
        error: err => observer.error(err),
        complete: () => observer.complete()
      });
    });
  }
  
  /**
   * Merge this Observable with another
   * @param {Observable} other - Other Observable to merge with
   * @returns {Observable} - New Observable that emits values from both
   */
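  merge(other) {
    return new Observable(observer => {
      // Complete downstream only after both sources have completed
      let activeSources = 2;
      const inner = {
        next: value => observer.next(value),
        error: err => observer.error(err),
        complete: () => {
          activeSources--;
          if (activeSources === 0) {
            observer.complete();
          }
        }
      };
      
      const subscriptionA = this.subscribe(inner);
      const subscriptionB = other.subscribe(inner);
      
      return () => {
        subscriptionA.unsubscribe();
        subscriptionB.unsubscribe();
      };
    });
  }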
  
  /**
   * Create an Observable from a Promise
   * @param {Promise} promise - Promise to convert
   * @returns {Observable} - Observable that emits the Promise result
   */
  static fromPromise(promise) {
    return new Observable(observer => {
      promise
        .then(value => {
          observer.next(value);
          observer.complete();
        })
        .catch(err => {
          observer.error(err);
        });
      
      return () => {
        // Nothing to do for a Promise
      };
    });
  }
  
  /**
   * Create an Observable from events
   * @param {EventTarget} target - DOM element or other event target
   * @param {string} eventName - Name of the event to listen for
   * @returns {Observable} - Observable that emits the events
   */
  static fromEvent(target, eventName) {
    return new Observable(observer => {
      // Create the event handler
      const handler = event => {
        observer.next(event);
      };
      
      // Add the event listener
      target.addEventListener(eventName, handler);
      
      // Return a teardown function
      return () => {
        target.removeEventListener(eventName, handler);
      };
    });
  }
}

// Example: Reactive data processing pipeline
function createSearchPipeline(searchInput) {
  // Create an Observable from input events
  const inputEvents = Observable.fromEvent(searchInput, 'input');
  
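  // Build the stream of search terms
  const searchTerms = inputEvents
    // Map to the input value
    .map(event => event.target.value)
    // Only trigger search when input is at least 3 characters
    .filter(text => text.length >= 3);
  
  // Mapping each term to fromPromise(...) would emit Observables rather than
  // results, so subscribe to each request here and forward its value
  return new Observable(observer => {
    const termSubscription = searchTerms.subscribe({
      next: searchText => {
        console.log(`Searching for: ${searchText}`);
        
        // Convert the Promise to an Observable and flatten it
        Observable.fromPromise(
          fetch(`/api/search?q=${encodeURIComponent(searchText)}`)
            .then(response => response.json())
        ).subscribe({
          next: results => observer.next(results),
          error: err => observer.error(err)
        });
      },
      error: err => observer.error(err),
      complete: () => observer.complete()
    });
    
    return () => termSubscription.unsubscribe();
  });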
}

// Usage
const searchInput = document.getElementById('search');
const searchResults = createSearchPipeline(searchInput);

const subscription = searchResults.subscribe({
  next: results => {
    console.log('Search results:', results);
    displayResults(results);
  },
  error: err => {
    console.error('Search error:', err);
    displayError(err);
  }
});

// Later when no longer needed
subscription.unsubscribe();
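
In a search pipeline like the one above, responses can arrive out of order, so the result of an older query may be displayed after a newer one. Reactive libraries usually provide an operator for this (RxJS calls it `switchMap`). The sketch below shows the underlying idea without any library; `latestOnly` is a hypothetical helper name.

```javascript
// Hypothetical helper: wrap an async function so that only the result of the
// most recent call is delivered; results of superseded calls are dropped.
function latestOnly(asyncFn, onResult, onError = () => {}) {
  let latestCallId = 0;

  return (...args) => {
    const callId = ++latestCallId;

    Promise.resolve()
      .then(() => asyncFn(...args))
      .then(
        result => {
          // Deliver only if no newer call has started in the meantime
          if (callId === latestCallId) onResult(result);
        },
        error => {
          if (callId === latestCallId) onError(error);
        }
      );
  };
}
```

A search handler wrapped this way could be called on every keystroke, and only the response to the latest query would reach the display callback. The superseded requests still run to completion; cancelling them would need a separate mechanism such as `AbortController`.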
          

Benefits of reactive programming:

Popular reactive libraries:

Best Practices for Concurrent Operations

Let's summarize the key best practices for managing concurrent operations effectively:

Practical Example: Data Processing Pipeline

Let's put everything together with a complete example of a data processing system that handles multiple concurrent operations.

Complete Data Processing System


// dataProcessor.js - A complete data processing system
class DataProcessor {
  constructor(options = {}) {
    this.concurrencyLimit = options.concurrencyLimit || 5;
    this.apiRequestsPerSecond = options.apiRequestsPerSecond || 10;
    this.maxRetries = options.maxRetries || 3;
    this.workerCount = options.workerCount || 2;
    
    // Initialize subsystems
    this.initRateLimiter();
    this.initWorkerPool();
    this.initTaskQueue();
  }
  
  initRateLimiter() {
    this.rateLimiter = new RateLimiter(this.apiRequestsPerSecond, this.apiRequestsPerSecond * 2);
    
    // Rate-limited API fetch
    this.fetchWithRateLimit = this.rateLimiter.wrap(
      async (url, options) => {
        const response = await fetch(url, options);
        
        if (!response.ok) {
          const error = new Error(`HTTP error ${response.status}`);
          error.status = response.status;
          error.retryable = response.status >= 500 || response.status === 429;
          throw error;
        }
        
        return response;
      }
    );
  }
  
  initWorkerPool() {
    // Initialize web worker pool for CPU-intensive operations
    this.workerPool = new WorkerPool('data-worker.js', this.workerCount);
  }
  
  initTaskQueue() {
    // Priority queue for tasks
    this.taskQueue = new PriorityTaskQueue(this.concurrencyLimit);
  }
  
  /**
   * Process a dataset
   * @param {Object} options - Processing options
   * @returns {Promise} - Processing results
   */
  async processDataset(options) {
    const {
      datasetId,
      transformations = [],
      outputFormat = 'json',
      priority = 'normal'
    } = options;
    
    console.log(`Processing dataset ${datasetId} with priority ${priority}`);
    const startTime = performance.now();
    
    try {
      // Step 1: Fetch the dataset metadata
      const metadata = await this.fetchDatasetMetadata(datasetId, priority);
      console.log(`Dataset ${datasetId} metadata fetched:`, metadata);
      
      // Step 2: Fetch the dataset content (with retry)
      const content = await this.fetchWithRetry(
        () => this.fetchDatasetContent(datasetId, metadata, priority),
        { maxRetries: this.maxRetries }
      );
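      // content is a string for CSV/text responses and a parsed value for JSON
      const contentSize = typeof content === 'string'
        ? `${content.length} characters`
        : 'parsed JSON';
      console.log(`Dataset ${datasetId} content fetched: ${contentSize}`);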
      
      // Step 3: Parse the dataset
      const parsedData = await this.parseDataset(content, metadata.format, priority);
      console.log(`Dataset ${datasetId} parsed: ${parsedData.length} records`);
      
      // Step 4: Apply transformations in sequence
      let transformedData = parsedData;
      
      for (const transformation of transformations) {
        transformedData = await this.applyTransformation(
          transformedData,
          transformation,
          priority
        );
        console.log(`Transformation ${transformation.type} applied`);
      }
      
      // Step 5: Format the output
      const result = await this.formatOutput(transformedData, outputFormat, priority);
      
      const duration = performance.now() - startTime;
      console.log(`Dataset ${datasetId} processed in ${duration}ms`);
      
      return {
        datasetId,
        result,
        metadata: {
          recordCount: transformedData.length,
          processingTime: duration,
          outputSize: JSON.stringify(result).length
        }
      };
    } catch (error) {
      console.error(`Error processing dataset ${datasetId}:`, error);
      throw error;
    }
  }
  
  /**
   * Process multiple datasets in parallel with limited concurrency
   * @param {Array} datasetOptions - Array of dataset options
   * @returns {Promise} - Processing results
   */
  async processMultipleDatasets(datasetOptions) {
    // Create task functions
    const tasks = datasetOptions.map(options => {
      return async () => {
        return this.processDataset(options);
      };
    });
    
    // Process with limited concurrency
    const results = await this.processConcurrentTasks(
      tasks,
      Math.min(this.concurrencyLimit, datasetOptions.length)
    );
    
    return results;
  }
  
  // Helper methods
  
  async fetchDatasetMetadata(datasetId, priority) {
    return this.taskQueue.enqueue(
      async () => {
        console.log(`Fetching metadata for dataset ${datasetId}`);
        
        const response = await this.fetchWithRateLimit(
          `/api/datasets/${datasetId}/metadata`
        );
        
        return response.json();
      },
      priority
    );
  }
  
  async fetchDatasetContent(datasetId, metadata, priority) {
    return this.taskQueue.enqueue(
      async () => {
        console.log(`Fetching content for dataset ${datasetId}`);
        
        const response = await this.fetchWithRateLimit(
          `/api/datasets/${datasetId}/content`
        );
        
        // Choose the appropriate response method based on content type
        const contentType = response.headers.get('Content-Type');
        
        if (contentType?.includes('application/json')) {
          return response.json();
        } else if (contentType?.includes('text/csv')) {
          return response.text();
        } else {
          // Default to text
          return response.text();
        }
      },
      priority
    );
  }
  
  async parseDataset(content, format, priority) {
    // For large datasets, use Web Workers to parse without blocking the UI
    if (typeof content === 'string' && content.length > 1000000) {
      // Large dataset, use Web Worker
      return this.workerPool.execute('parseDataset', { content, format });
    } else {
      // Smaller dataset, process in the main thread
      return this.taskQueue.enqueue(
        async () => {
          console.log(`Parsing ${format} dataset`);
          
          switch (format) {
            case 'json':
              return typeof content === 'string' ? JSON.parse(content) : content;
            
            case 'csv':
              return parseCSV(content);
            
            case 'xml':
              return parseXML(content);
            
            default:
              throw new Error(`Unsupported format: ${format}`);
          }
        },
        priority
      );
    }
  }
  
  async applyTransformation(data, transformation, priority) {
    const { type, params } = transformation;
    
    // CPU-intensive transformations should use Web Workers
    if (type === 'filter' || type === 'map' || type === 'aggregate') {
      return this.workerPool.execute('transform', {
        data,
        transformation: { type, params }
      });
    } else {
      // Other transformations can run in the main thread
      return this.taskQueue.enqueue(
        async () => {
          console.log(`Applying ${type} transformation with params:`, params);
          
          switch (type) {
            case 'sort':
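              return data.slice().sort((a, b) => {
                const field = params.field;
                const direction = params.direction === 'desc' ? -1 : 1;
                
                // Return 0 for equal values so the comparator stays consistent
                if (a[field] === b[field]) return 0;
                return (a[field] > b[field] ? 1 : -1) * direction;
              });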
            
            case 'group':
              return groupData(data, params);
            
            case 'join':
              return joinData(data, params.otherData, params);
            
            default:
              throw new Error(`Unsupported transformation: ${type}`);
          }
        },
        priority
      );
    }
  }
  
  async formatOutput(data, format, priority) {
    return this.taskQueue.enqueue(
      async () => {
        console.log(`Formatting output as ${format}`);
        
        switch (format) {
          case 'json':
            return data;
          
          case 'csv':
            return convertToCSV(data);
          
          case 'xml':
            return convertToXML(data);
          
          case 'html':
            return convertToHTML(data);
          
          default:
            throw new Error(`Unsupported output format: ${format}`);
        }
      },
      priority
    );
  }
  
  async fetchWithRetry(requestFn, options = {}) {
    const {
      maxRetries = this.maxRetries,
      initialDelay = 1000,
      maxDelay = 10000,
      factor = 2,
      jitter = 0.1
    } = options;
    
    let retries = 0;
    let delay = initialDelay;
    
    while (true) {
      try {
        return await requestFn();
      } catch (error) {
        // If this is not a retryable error, throw immediately
        if (error.retryable === false) {
          throw error;
        }
        
        // If we've reached the maximum number of retries, throw
        if (retries >= maxRetries) {
          console.warn(`Maximum retries (${maxRetries}) reached`);
          throw error;
        }
        
        // Increment retry counter
        retries++;
        
        // Calculate the next delay with jitter
        delay = Math.min(
          maxDelay,
          delay * factor * (1 + jitter * (Math.random() * 2 - 1))
        );
        
        console.log(`Retry ${retries} after ${Math.round(delay)}ms delay`);
        
        // Wait before retrying
        await new Promise(resolve => setTimeout(resolve, delay));
      }
    }
  }
  
  async processConcurrentTasks(tasks, concurrency) {
    const results = new Array(tasks.length);
    let nextIndex = 0;
    
    // Each runner keeps pulling the next unstarted task until none remain,
    // so a new task starts as soon as any running task finishes
    const runner = async () => {
      while (nextIndex < tasks.length) {
        const taskIndex = nextIndex++;
        
        try {
          const result = await tasks[taskIndex]();
          results[taskIndex] = { status: 'success', data: result };
        } catch (error) {
          results[taskIndex] = { status: 'error', error };
        }
      }
    };
    
    // Start at most `concurrency` runners and wait for all of them to drain the list
    const runnerCount = Math.max(1, Math.min(concurrency, tasks.length));
    const runners = [];
    
    for (let i = 0; i < runnerCount; i++) {
      runners.push(runner());
    }
    
    await Promise.all(runners);
    
    return results;
  }
  
  // Cleanup
  destroy() {
    // Terminate Web Workers
    if (this.workerPool) {
      this.workerPool.terminate();
    }
  }
}

// Usage example
async function runDemo() {
  // Create the data processor
  const processor = new DataProcessor({
    concurrencyLimit: 3,
    apiRequestsPerSecond: 5,
    maxRetries: 3,
    workerCount: 4
  });
  
  // Define datasets to process
  const datasets = [
    {
      datasetId: 'sales-2025-q1',
      transformations: [
        { type: 'filter', params: { field: 'amount', operator: 'gt', value: 1000 } },
        { type: 'sort', params: { field: 'date', direction: 'asc' } }
      ],
      outputFormat: 'json',
      priority: 'high'
    },
    {
      datasetId: 'customers-2025',
      transformations: [
        { type: 'map', params: { 
          fields: ['id', 'name', 'email', 'totalPurchases']
        } },
        { type: 'sort', params: { field: 'totalPurchases', direction: 'desc' } }
      ],
      outputFormat: 'csv',
      priority: 'normal'
    },
    {
      datasetId: 'products-inventory',
      transformations: [
        { type: 'filter', params: { field: 'stock', operator: 'lt', value: 10 } },
        { type: 'sort', params: { field: 'restockDate', direction: 'asc' } }
      ],
      outputFormat: 'json',
      priority: 'high'
    }
  ];
  
  try {
    // Process all datasets
    console.log('Starting batch processing');
    const startTime = performance.now();
    
    const results = await processor.processMultipleDatasets(datasets);
    
    const totalTime = performance.now() - startTime;
    console.log(`All datasets processed in ${totalTime}ms`);
    
    // Analyze results
    const successful = results.filter(r => r.status === 'success').length;
    const failed = results.filter(r => r.status === 'error').length;
    
    console.log(`Processed ${results.length} datasets: ${successful} successful, ${failed} failed`);
    
    // Generate summary report
    const report = {
      totalDatasets: results.length,
      successful,
      failed,
      totalProcessingTime: totalTime,
      datasetDetails: results.map((result, index) => {
        const dataset = datasets[index];
        
        if (result.status === 'success') {
          return {
            id: dataset.datasetId,
            status: 'success',
            recordCount: result.data.metadata.recordCount,
            processingTime: result.data.metadata.processingTime
          };
        } else {
          return {
            id: dataset.datasetId,
            status: 'error',
            error: result.error.message
          };
        }
      })
    };
    
    console.log('Processing report:', report);
    
    return report;
  } catch (error) {
    console.error('Batch processing failed:', error);
    throw error;
  } finally {
    // Clean up resources
    processor.destroy();
  }
}
          

Practice Exercises

Exercise 1: Concurrent API Client

Build a reusable API client that:

  • Limits concurrent requests to avoid overwhelming servers
  • Implements rate limiting based on API requirements
  • Handles retries with exponential backoff
  • Supports request prioritization
  • Provides meaningful progress updates

Exercise 2: Image Gallery with Optimized Loading

Create an image gallery that:

  • Loads visible images first
  • Pre-loads images just outside the viewport
  • Limits concurrent image loading
  • Processes images in Web Workers
  • Cancels loading of images that scroll out of view

Exercise 3: Task Dependency Scheduler

Implement a task scheduler that:

  • Allows defining tasks with dependencies
  • Executes independent tasks in parallel
  • Respects dependency ordering
  • Supports dynamic task addition
  • Provides visualization of the task execution flow

Summary

Managing concurrent operations effectively is a critical skill for modern JavaScript development. We've explored various approaches and patterns:

By understanding these patterns and knowing when to apply each, you can build more efficient, responsive applications that make the most of available resources while providing a smooth user experience.

Further Learning