Introduction to Concurrent Asynchronous Operations
Modern web applications frequently need to manage multiple asynchronous operations simultaneously. From fetching data from various APIs to processing user interactions while background tasks run, handling concurrency correctly is crucial for building responsive, efficient applications.
Think of concurrent operations like managing a restaurant kitchen. You don't prepare one dish at a time from start to finish—instead, you have multiple dishes cooking simultaneously, with different completion times and dependencies. The chef (your code) needs to coordinate all these operations to ensure everything gets done efficiently and in the right order.
JavaScript's Concurrency Model
Before diving into concurrent operation patterns, it's important to understand how JavaScript handles concurrency.
Single-Threaded Execution with Event Loop
JavaScript runs your code on a single thread, so only one piece of code executes at a time. Concurrency comes from the event loop: the runtime handles timers, network requests, and other I/O in the background, then queues their callbacks to run one at a time once the current code has finished.
Key points about JavaScript's concurrency:
- JavaScript performs concurrent operations via asynchronous callbacks
- Actual parallelism isn't possible in single-threaded JavaScript (except with Web Workers)
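- Asynchronous operations overlap while they wait (on the network, timers, or disk), but their callbacks run one at a time
- This model rules out data races between callbacks on the main thread, but logical race conditions are still possible when shared state changes between an await and the code that follows it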
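The scheduling described in these points can be observed directly. The following is a minimal sketch (the `order` array is an illustrative name, not part of any API) that records when synchronous code, a promise callback, and a timer callback actually run:

```javascript
// Records the order in which each kind of callback actually runs.
const order = [];

order.push('sync start');

// Timer callbacks are queued as tasks and run after the current script finishes.
setTimeout(() => order.push('timeout callback'), 0);

// Promise reactions are microtasks: they run as soon as the current
// synchronous code completes, before any timer callback.
Promise.resolve().then(() => order.push('promise callback'));

order.push('sync end');

// Give the timer a chance to fire, then print the recorded order.
// Prints: sync start -> sync end -> promise callback -> timeout callback
setTimeout(() => console.log(order.join(' -> ')), 10);
```

Nothing here runs in parallel: the callbacks are simply queued and executed one after another once the synchronous code has finished.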
The Evolution of Asynchronous Concurrency
JavaScript's concurrency patterns have evolved significantly over time:
Callback Era
// Sequential operations with callbacks
fetchUserData(userId, function(userData) {
fetchUserPosts(userData.id, function(posts) {
fetchPostComments(posts[0].id, function(comments) {
// Deeply nested "callback hell"
displayUserInfo(userData, posts, comments);
});
});
});
// Parallel operations with callbacks
let userData, userPosts, userComments;
let completed = 0;
function checkAllComplete() {
if (completed === 3) {
displayUserInfo(userData, userPosts, userComments);
}
}
fetchUserData(userId, function(data) {
userData = data;
completed++;
checkAllComplete();
});
fetchUserPosts(userId, function(posts) {
userPosts = posts;
completed++;
checkAllComplete();
});
fetchUserComments(userId, function(comments) {
userComments = comments;
completed++;
checkAllComplete();
});
Promise Era
// Sequential operations with promises
fetchUserData(userId)
.then(userData => fetchUserPosts(userData.id))
.then(posts => fetchPostComments(posts[0].id))
.then(comments => {
displayUserInfo(userData, posts, comments); // Problem: userData and posts aren't available!
});
// Better sequential with promise chaining
fetchUserData(userId)
.then(userData => {
return fetchUserPosts(userData.id)
.then(posts => {
return fetchPostComments(posts[0].id)
.then(comments => {
return { userData, posts, comments };
});
});
})
.then(results => {
displayUserInfo(results.userData, results.posts, results.comments);
});
// Parallel operations with Promise.all
Promise.all([
fetchUserData(userId),
fetchUserPosts(userId),
fetchUserComments(userId)
])
.then(([userData, posts, comments]) => {
displayUserInfo(userData, posts, comments);
})
.catch(error => {
console.error('One of the requests failed:', error);
});
Modern Async/Await Era
// Sequential operations with async/await
async function loadUserData(userId) {
const userData = await fetchUserData(userId);
const posts = await fetchUserPosts(userData.id);
const comments = await fetchPostComments(posts[0].id);
displayUserInfo(userData, posts, comments);
}
// Parallel operations with async/await
async function loadUserData(userId) {
const [userData, posts, comments] = await Promise.all([
fetchUserData(userId),
fetchUserPosts(userId),
fetchUserComments(userId)
]);
displayUserInfo(userData, posts, comments);
}
Promise Combinators: Managing Multiple Promises
JavaScript provides several built-in ways to manage multiple promises, each serving different concurrency needs.
Promise.all - Parallel Execution
Use Promise.all when you need to execute multiple operations concurrently and need all of them to succeed.
Promise.all Example
// Use case: Loading a dashboard that needs multiple data sources
async function loadDashboard(userId, userLocation) {
try {
const startTime = performance.now();
const [
accountInfo,
recentTransactions,
stockPrices,
newsArticles,
weatherData
] = await Promise.all([
fetchAccountInfo(userId),
fetchRecentTransactions(userId),
fetchStockPrices(userId),
fetchNewsArticles(),
fetchWeatherData(userLocation)
]);
const loadTime = performance.now() - startTime;
console.log(`Dashboard loaded in ${loadTime}ms`);
renderDashboard({
accountInfo,
recentTransactions,
stockPrices,
newsArticles,
weatherData
});
} catch (error) {
// If ANY promise fails, this catch block will execute
console.error('Failed to load dashboard:', error);
showErrorMessage('Some dashboard components failed to load. Please try again.');
}
}
Key Point: Promise.all fails fast. If any promise rejects, the returned promise rejects immediately with that error, even if other promises are still pending. The remaining operations are not cancelled; they keep running and their results are discarded.
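The fail-fast behavior can be seen with two simulated operations. This is a small sketch using timers in place of real requests; `settleAfter` and `demoFailFast` are hypothetical helper names introduced only for this illustration:

```javascript
// Helper: a promise that fulfills or rejects after `ms` milliseconds.
const settleAfter = (ms, value, shouldFail = false) =>
  new Promise((resolve, reject) =>
    setTimeout(() => (shouldFail ? reject(new Error(value)) : resolve(value)), ms)
  );

async function demoFailFast() {
  const log = [];
  // The slow task records when it finishes, whether or not anyone is still listening.
  const slow = settleAfter(50, 'slow').then(value => {
    log.push(`${value} finished`);
    return value;
  });
  const failing = settleAfter(10, 'boom', true);

  try {
    await Promise.all([slow, failing]);
  } catch (error) {
    // Reached after about 10ms, long before the slow task is done.
    log.push(`Promise.all rejected: ${error.message}`);
  }

  await slow; // wait so we can observe that the slow task still completed
  return log;
}

// Logs: [ 'Promise.all rejected: boom', 'slow finished' ]
demoFailFast().then(log => console.log(log));
```

The rejection is logged first, and the slow operation still runs to completion afterwards.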
Promise.allSettled - Complete Execution
Use Promise.allSettled when you want to execute multiple operations and get all their results, regardless of whether some operations fail.
Promise.allSettled Example
// Use case: Loading non-critical dashboard components where some can fail
async function loadDashboard(userId, userLocation) {
const startTime = performance.now();
const results = await Promise.allSettled([
fetchAccountInfo(userId),
fetchRecentTransactions(userId),
fetchStockPrices(userId),
fetchNewsArticles(),
fetchWeatherData(userLocation)
]);
const loadTime = performance.now() - startTime;
console.log(`Dashboard loaded in ${loadTime}ms`);
// Process results based on their status
const dashboard = {
loadTime
};
const labels = ['accountInfo', 'recentTransactions', 'stockPrices', 'newsArticles', 'weatherData'];
// Process each result
results.forEach((result, index) => {
const componentName = labels[index];
if (result.status === 'fulfilled') {
dashboard[componentName] = {
status: 'loaded',
data: result.value
};
} else {
console.error(`Failed to load ${componentName}:`, result.reason);
dashboard[componentName] = {
status: 'error',
error: result.reason instanceof Error ? result.reason.message : String(result.reason)
};
}
});
// Count failures
const failedComponents = results.filter(r => r.status === 'rejected').length;
if (failedComponents > 0) {
showWarningMessage(`${failedComponents} dashboard components failed to load.`);
}
renderDashboard(dashboard);
}
Promise.race - First to Complete
Use Promise.race when you need the result of the first promise to settle (either resolve or reject).
Promise.race Example
// Use case: Fetching data with a timeout
function fetchWithTimeout(url, timeoutMs = 5000) {
const fetchPromise = fetch(url).then(response => {
if (!response.ok) {
throw new Error(`HTTP error! Status: ${response.status}`);
}
return response.json();
});
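let timeoutId;
const timeoutPromise = new Promise((_, reject) => {
timeoutId = setTimeout(() => {
reject(new Error(`Request to ${url} timed out after ${timeoutMs}ms`));
}, timeoutMs);
});
// Clear the timer once the race settles; note that a timed-out fetch keeps running unless it is aborted
return Promise.race([fetchPromise, timeoutPromise]).finally(() => clearTimeout(timeoutId));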
}
// Use case: Getting data from the fastest available server
async function fetchFromFastestServer(resourcePath) {
const servers = [
'https://api-us-east.example.com',
'https://api-us-west.example.com',
'https://api-eu.example.com'
];
// Create an array of fetch promises, one for each server
const fetchPromises = servers.map(server => {
return fetch(`${server}/${resourcePath}`)
.then(response => {
if (!response.ok) {
throw new Error(`HTTP error! Status: ${response.status}`);
}
return response.json();
})
.then(data => {
console.log(`Got response from ${server}`);
return data;
});
});
// Return data from the fastest server to respond
return Promise.race(fetchPromises);
}
Important: With Promise.race, if the winning promise rejects, the entire operation rejects, even if other promises would have resolved successfully.
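To make this concrete, here is a short sketch with timers standing in for real requests. The helpers `delay`, `failAfter`, and `demoRace` are made-up names for this illustration:

```javascript
// Fulfills with `value` after `ms` milliseconds.
const delay = (ms, value) => new Promise(resolve => setTimeout(resolve, ms, value));
// Rejects with an Error after `ms` milliseconds.
const failAfter = (ms, message) =>
  new Promise((_, reject) => setTimeout(() => reject(new Error(message)), ms));

async function demoRace() {
  // A fulfilled promise settles first, so the race fulfills with its value.
  const winner = await Promise.race([delay(10, 'fast'), delay(50, 'slow')]);

  // A rejection settles first, so the race rejects even though 'slow' would succeed.
  let outcome;
  try {
    outcome = await Promise.race([failAfter(10, 'fast failure'), delay(50, 'slow')]);
  } catch (error) {
    outcome = `rejected: ${error.message}`;
  }
  return { winner, outcome };
}

// Logs: { winner: 'fast', outcome: 'rejected: fast failure' }
demoRace().then(result => console.log(result));
```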
Promise.any - First to Succeed
Use Promise.any when you need the result of the first promise to fulfill, ignoring any that reject. It rejects, with an AggregateError, only if every promise rejects.
Promise.any Example
// Use case: Trying multiple servers until one succeeds
async function fetchWithFailover(resourcePath) {
const servers = [
'https://api-primary.example.com',
'https://api-backup1.example.com',
'https://api-backup2.example.com'
];
// Create an array of fetch promises, one for each server
const fetchPromises = servers.map(server => {
return fetch(`${server}/${resourcePath}`)
.then(response => {
if (!response.ok) {
throw new Error(`HTTP error from ${server}! Status: ${response.status}`);
}
console.log(`Successfully fetched from ${server}`);
return response.json();
});
});
try {
// Return the first successful response
return await Promise.any(fetchPromises);
} catch (error) {
// If all promises rejected, AggregateError is thrown with all the individual errors
console.error('All servers failed:', error.errors);
throw new Error('Unable to fetch data from any server.');
}
}
// Use case: Finding a user by multiple identifiers
async function findUser(possibleIds) {
try {
// Try to find a user with any of these IDs
const user = await Promise.any(
possibleIds.map(id => findUserById(id))
);
return user;
} catch (error) {
throw new Error('User not found with any of the provided IDs');
}
}
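The two outcomes of Promise.any can be checked with simulated operations. This is a sketch only; `delay`, `failAfter`, and `demoAny` are hypothetical names, and the timers stand in for real server calls:

```javascript
// Fulfills with `value` after `ms` milliseconds.
const delay = (ms, value) => new Promise(resolve => setTimeout(resolve, ms, value));
// Rejects with an Error after `ms` milliseconds.
const failAfter = (ms, message) =>
  new Promise((_, reject) => setTimeout(() => reject(new Error(message)), ms));

async function demoAny() {
  // The early rejection is ignored; the first fulfillment wins.
  const first = await Promise.any([failAfter(10, 'primary down'), delay(30, 'backup ok')]);

  // When every promise rejects, Promise.any rejects with an AggregateError
  // whose `errors` array holds the reasons in input order.
  let messages;
  try {
    await Promise.any([failAfter(20, 'a failed'), failAfter(10, 'b failed')]);
  } catch (error) {
    messages = error.errors.map(e => e.message);
  }
  return { first, messages };
}

// Logs: { first: 'backup ok', messages: [ 'a failed', 'b failed' ] }
demoAny().then(result => console.log(result));
```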
Sequential vs Parallel Execution
Understanding when to use sequential versus parallel execution is crucial for optimizing performance while maintaining correctness.
Sequential Execution
Execute operations one after another when:
- Later operations depend on the results of earlier ones
- Operations must be performed in a specific order
- You need to avoid overwhelming a server or API with concurrent requests
- Error handling requires stopping the process after a failure
Deliberate Sequential Processing
// Processing steps in order with intermediate processing
async function processUserUpload(fileId) {
// Step 1: Download file
const file = await downloadFile(fileId);
console.log('File downloaded, size:', file.size);
// Step 2: Validate file contents
const validationResult = await validateFile(file);
if (!validationResult.valid) {
throw new Error(`Invalid file: ${validationResult.error}`);
}
console.log('File validated successfully');
// Step 3: Transform data (depends on validation rules)
const transformedData = await transformData(file, validationResult.rules);
console.log('Data transformed, records:', transformedData.length);
// Step 4: Save to database (depends on transformed data)
const saveResult = await saveToDatabase(transformedData);
console.log('Data saved to database, id:', saveResult.id);
// Step 5: Generate report (depends on saved data with DB IDs)
const report = await generateReport(saveResult.id);
console.log('Report generated');
return {
status: 'success',
recordsProcessed: transformedData.length,
reportUrl: report.url
};
}
Parallel Execution
Execute operations simultaneously when:
- Operations are independent of each other
- You want to maximize performance by doing work concurrently
- You need to fetch data from multiple sources simultaneously
- Operations can be started without waiting for others to complete
Optimized Mixed Sequential/Parallel Processing
// Using both sequential and parallel execution for optimal performance
async function loadUserProfile(userId) {
// First, get the basic user data (sequential prerequisite)
const user = await fetchUserData(userId);
// Once we have the user, fetch multiple related data sources in parallel
const [
posts,
followers,
following,
notifications
] = await Promise.all([
fetchUserPosts(userId),
fetchUserFollowers(userId),
fetchUserFollowing(userId),
fetchUserNotifications(userId)
]);
// Process user's post activity in parallel
const postStats = await Promise.all(
posts.map(async post => {
const [likes, comments, shares] = await Promise.all([
fetchPostLikes(post.id),
fetchPostComments(post.id),
fetchPostShares(post.id)
]);
return {
postId: post.id,
metrics: {
likeCount: likes.length,
commentCount: comments.length,
shareCount: shares.length,
engagement: (likes.length + comments.length * 2 + shares.length * 3) / user.followerCount
}
};
})
);
// Now sequentially calculate some metrics that depend on all the above data
const profileMetrics = await calculateProfileMetrics(user, posts, followers, postStats);
// Return the complete profile
return {
user,
posts,
followers,
following,
notifications,
postStats,
profileMetrics
};
}
Performance Comparison
The performance difference between sequential and parallel execution comes down to how the waiting times combine. Consider loading a social media profile page:
- Sequential approach: Load user data → then load posts → then load comments → then load followers (total time: sum of all operations)
- Parallel approach: Load user data, posts, comments, and followers simultaneously (total time: duration of the longest operation)
- Hybrid approach: Load user data first (required) → then load posts, comments, and followers in parallel (total time: user data plus the longest of the remaining operations)
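The three approaches can be timed against each other with simulated operations. In this sketch the loader functions and their durations (40, 60, and 50 ms) are invented for illustration, and measured times will vary slightly from run to run:

```javascript
const delay = (ms, value) => new Promise(resolve => setTimeout(resolve, ms, value));

// Simulated operations; the durations are made-up numbers for illustration.
const loadUser = () => delay(40, 'user');
const loadPosts = () => delay(60, 'posts');
const loadFollowers = () => delay(50, 'followers');

// Runs `run` and reports how long it took.
async function timed(label, run) {
  const start = Date.now();
  const result = await run();
  return { label, result, elapsedMs: Date.now() - start };
}

async function compare() {
  // Sequential: each await finishes before the next call starts (about 40 + 60 + 50 ms).
  const sequential = await timed('sequential', async () => [
    await loadUser(),
    await loadPosts(),
    await loadFollowers()
  ]);
  // Parallel: all three start immediately (about as long as the slowest, 60 ms).
  const parallel = await timed('parallel', () =>
    Promise.all([loadUser(), loadPosts(), loadFollowers()])
  );
  // Hybrid: user first, then the rest together (about 40 + 60 ms).
  const hybrid = await timed('hybrid', async () => {
    const user = await loadUser();
    const [posts, followers] = await Promise.all([loadPosts(), loadFollowers()]);
    return [user, posts, followers];
  });
  return [sequential, parallel, hybrid];
}

compare().then(rows => rows.forEach(row => console.log(row.label, `${row.elapsedMs}ms`)));
```

All three variants return the same data; only the total waiting time differs.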
Advanced Concurrency Patterns
Beyond basic sequential and parallel execution, several advanced patterns help manage concurrency in complex scenarios.
Pattern 1: Concurrency Limiting
Limit the number of operations running concurrently to avoid overwhelming resources.
Concurrency Pool Implementation
/**
* Pool for limiting concurrent operations
* @param {Function[]} tasks - Array of task functions that return promises
* @param {number} concurrency - Maximum number of concurrent tasks
* @returns {Promise} - Results of all tasks
*/
async function concurrencyPool(tasks, concurrency = 5) {
const results = [];
const executing = new Set();
// Convert each task to a promise that updates the executing set
const enqueue = async function (taskIndex) {
// Create a promise for this task
const task = tasks[taskIndex];
// Add the task to the set of executing tasks
executing.add(task);
try {
// Execute the task
const result = await task();
results[taskIndex] = result;
return result;
} catch (error) {
results[taskIndex] = { error };
throw error;
} finally {
// Remove the task from the set of executing tasks
executing.delete(task);
}
};
// Shared cursor: each worker claims the next unstarted task, so no task runs twice
let nextTaskIndex = 0;
const runWorker = async function () {
while (nextTaskIndex < tasks.length) {
const taskIndex = nextTaskIndex++;
try {
await enqueue(taskIndex);
} catch (error) {
// Individual task errors are stored in results
console.error(`Task ${taskIndex} failed:`, error);
}
}
};
// Start up to `concurrency` workers that pull tasks until none remain
const workers = [];
for (let i = 0; i < Math.min(concurrency, tasks.length); i++) {
workers.push(runWorker());
}
// Wait for all tasks to complete
await Promise.all(workers);
return results;
}
// Usage example: processing a large batch of images
async function processBatchOfImages(imageUrls) {
const startTime = performance.now();
// Create an array of task functions
const tasks = imageUrls.map(url => {
return async () => {
console.log(`Processing image: ${url}`);
// Simulate network delay and processing time
const image = await fetchImage(url);
const processed = await processImage(image);
const uploadResult = await uploadProcessedImage(processed);
return {
url,
newUrl: uploadResult.url,
processingTime: uploadResult.processingTime
};
};
});
// Process with limited concurrency
const results = await concurrencyPool(tasks, 3);
const totalTime = performance.now() - startTime;
console.log(`Processed ${results.length} images in ${totalTime}ms`);
// Count successes and failures
const successes = results.filter(r => !r.error).length;
const failures = results.filter(r => r.error).length;
console.log(`Successes: ${successes}, Failures: ${failures}`);
return {
results,
totalTime,
successes,
failures
};
}
When to use concurrency limiting:
- APIs with rate limits or quotas
- When processing large datasets
- To avoid overwhelming server resources
- To control memory usage in the browser
- When working with limited hardware resources
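A compact way to check that a limit is actually respected is to count how many calls overlap. The sketch below is a simplified variant of the idea, not the pool shown earlier: `mapWithLimit` and `slowDouble` are hypothetical names, and errors are not captured here (a rejection from `fn` rejects the whole call).

```javascript
// Runs `fn` over `items` with at most `limit` calls in flight at once.
// Results keep the input order.
async function mapWithLimit(items, limit, fn) {
  const results = new Array(items.length);
  let nextIndex = 0; // shared cursor: each worker claims the next unclaimed item

  async function worker() {
    while (nextIndex < items.length) {
      const index = nextIndex++;
      results[index] = await fn(items[index], index);
    }
  }

  const workerCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Demo: track how many calls overlap to confirm the limit holds.
let inFlight = 0;
let maxInFlight = 0;
const slowDouble = async n => {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise(resolve => setTimeout(resolve, 10));
  inFlight--;
  return n * 2;
};

// Logs the doubled values in input order and a peak of 3 overlapping calls.
mapWithLimit([1, 2, 3, 4, 5, 6, 7], 3, slowDouble).then(doubled => {
  console.log(doubled, 'max in flight:', maxInFlight);
});
```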
Pattern 2: Task Prioritization
Execute tasks in order of importance while still maintaining concurrency.
Priority Queue Implementation
/**
* Priority queue for task execution
*/
class PriorityTaskQueue {
constructor(concurrency = 4) {
this.concurrency = concurrency;
this.runningCount = 0;
this.highPriorityQueue = [];
this.normalPriorityQueue = [];
this.lowPriorityQueue = [];
}
/**
* Add a task to the queue
* @param {Function} taskFn - Function that returns a promise
* @param {string} priority - 'high', 'normal', or 'low'
* @returns {Promise} - Promise that resolves when the task completes
*/
enqueue(taskFn, priority = 'normal') {
return new Promise((resolve, reject) => {
const task = { taskFn, resolve, reject };
switch (priority) {
case 'high':
this.highPriorityQueue.push(task);
break;
case 'normal':
this.normalPriorityQueue.push(task);
break;
case 'low':
this.lowPriorityQueue.push(task);
break;
default:
this.normalPriorityQueue.push(task);
break;
}
// Try to execute tasks immediately if possible
this.runTasks();
});
}
/**
* Run tasks if there are available slots
*/
runTasks() {
// If we can run more tasks, do so
while (this.runningCount < this.concurrency) {
// Get the next task from the highest priority queue that has tasks
const task = this.getNextTask();
// If there are no more tasks, we're done
if (!task) {
break;
}
// Increment the running count
this.runningCount++;
// Execute the task
Promise.resolve()
.then(() => task.taskFn())
.then(result => {
task.resolve(result);
})
.catch(error => {
task.reject(error);
})
.finally(() => {
// Decrement the running count
this.runningCount--;
// Try to run more tasks
this.runTasks();
});
}
}
/**
* Get the next task from the highest priority queue that has tasks
*/
getNextTask() {
if (this.highPriorityQueue.length > 0) {
return this.highPriorityQueue.shift();
}
if (this.normalPriorityQueue.length > 0) {
return this.normalPriorityQueue.shift();
}
if (this.lowPriorityQueue.length > 0) {
return this.lowPriorityQueue.shift();
}
return null;
}
/**
* Get the current status of the queue
*/
getStatus() {
return {
running: this.runningCount,
queued: {
high: this.highPriorityQueue.length,
normal: this.normalPriorityQueue.length,
low: this.lowPriorityQueue.length,
total: this.highPriorityQueue.length + this.normalPriorityQueue.length + this.lowPriorityQueue.length
}
};
}
}
// Usage example: image loading in a gallery
const taskQueue = new PriorityTaskQueue(3);
// Function to load and display an image
async function loadImage(url, priority) {
try {
const imageData = await taskQueue.enqueue(
() => fetchImage(url),
priority
);
displayImage(url, imageData);
console.log(`Loaded image: ${url} (${priority} priority)`);
console.log('Queue status:', taskQueue.getStatus());
} catch (error) {
console.error(`Failed to load image: ${url}`, error);
}
}
// Load images with different priorities
function loadGallery(visibleImages, offscreenImages) {
// Load visible images first (high priority)
visibleImages.forEach(url => {
loadImage(url, 'high');
});
// Then load images just outside the viewport (normal priority)
const nearbyImages = offscreenImages.slice(0, 10);
nearbyImages.forEach(url => {
loadImage(url, 'normal');
});
// Finally load the rest (low priority)
const remainingImages = offscreenImages.slice(10);
remainingImages.forEach(url => {
loadImage(url, 'low');
});
}
Pattern 3: Pipeline Processing
Process data through a series of stages, where each stage can process items concurrently but items flow through stages sequentially.
Data Processing Pipeline
/**
* Create a processing pipeline
* @param {Function[]} stages - Array of processing stage functions
* @param {Object} options - Configuration options
* @returns {Function} - Function that accepts input and returns processed output
*/
function createPipeline(stages, options = {}) {
const concurrencyLimit = options.concurrencyLimit || 5;
const stageNames = options.stageNames || stages.map((_, i) => `Stage ${i+1}`);
return async function(items) {
let currentItems = items;
const startTime = performance.now();
// Process each stage sequentially
for (let stageIndex = 0; stageIndex < stages.length; stageIndex++) {
const stageFn = stages[stageIndex];
const stageName = stageNames[stageIndex];
const stageStartTime = performance.now();
console.log(`Starting ${stageName} with ${currentItems.length} items`);
// Process items in this stage with limited concurrency
const stageResults = await concurrencyPool(
currentItems.map(item => () => stageFn(item)),
concurrencyLimit
);
// Filter out any failed items (could also choose to keep them based on requirements)
currentItems = stageResults.filter(result => !result.error);
const stageDuration = performance.now() - stageStartTime;
console.log(`Completed ${stageName} with ${currentItems.length} successful items in ${stageDuration}ms`);
}
const totalDuration = performance.now() - startTime;
console.log(`Pipeline completed in ${totalDuration}ms with ${currentItems.length} items`);
return currentItems;
};
}
// Example: Document processing pipeline
// Stage 1: Download documents
const downloadDocument = async (docInfo) => {
const content = await fetchDocumentContent(docInfo.id);
return {
...docInfo,
content
};
};
// Stage 2: Parse and extract data
const parseDocument = async (doc) => {
const parsedData = await parseDocumentContent(doc.content);
return {
...doc,
parsedData
};
};
// Stage 3: Validate extracted data
const validateDocument = async (doc) => {
const validationResult = await validateData(doc.parsedData);
return {
...doc,
isValid: validationResult.valid,
validationErrors: validationResult.errors
};
};
// Stage 4: Process business logic
const processDocument = async (doc) => {
if (!doc.isValid) {
return {
...doc,
status: 'rejected',
reason: doc.validationErrors.join(', ')
};
}
const processingResult = await applyBusinessRules(doc.parsedData);
return {
...doc,
status: 'processed',
processingResult
};
};
// Stage 5: Generate report
const generateReport = async (doc) => {
if (doc.status !== 'processed') {
return {
...doc,
report: null
};
}
const report = await createReport(doc);
return {
...doc,
report
};
};
// Create and use the pipeline
const documentPipeline = createPipeline(
[downloadDocument, parseDocument, validateDocument, processDocument, generateReport],
{
concurrencyLimit: 3,
stageNames: ['Download', 'Parse', 'Validate', 'Process', 'Report']
}
);
// Process a batch of documents
async function processBatch(documentIds) {
const documents = documentIds.map(id => ({ id }));
const results = await documentPipeline(documents);
// Summary
const processedCount = results.filter(doc => doc.status === 'processed').length;
const rejectedCount = results.filter(doc => doc.status === 'rejected').length;
return {
results,
summary: {
total: documentIds.length,
processed: processedCount,
rejected: rejectedCount,
successRate: (processedCount / documentIds.length) * 100
}
};
}
Benefits of pipeline processing:
- Combines sequential and parallel processing for optimal efficiency
- Improves code organization by separating concerns
- Can be extended to use a different concurrency limit per stage (createPipeline above applies a single limit to every stage)
- Makes it easy to add, remove, or modify processing stages
- Provides clear visibility into the progress of each processing stage
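If stages need different limits (for example, many concurrent downloads but few CPU-heavy parses), each stage can carry its own setting. The following is a minimal sketch under that assumption; `mapWithLimit`, `runPipeline`, and the stage definitions are hypothetical and use timers instead of real work:

```javascript
// Bounded-concurrency map: at most `limit` calls to `fn` are in flight at once.
async function mapWithLimit(items, limit, fn) {
  const results = new Array(items.length);
  let nextIndex = 0;
  const worker = async () => {
    while (nextIndex < items.length) {
      const index = nextIndex++;
      results[index] = await fn(items[index]);
    }
  };
  await Promise.all(Array.from({ length: Math.min(limit, items.length) }, () => worker()));
  return results;
}

// Runs stages one after another; each stage declares its own limit.
async function runPipeline(items, stages) {
  let current = items;
  for (const { name, limit, fn } of stages) {
    current = await mapWithLimit(current, limit, fn);
    console.log(`${name}: ${current.length} items (limit ${limit})`);
  }
  return current;
}

const wait = ms => new Promise(resolve => setTimeout(resolve, ms));

// Illustrative stages: the names, limits, and bodies are assumptions.
const stages = [
  { name: 'download', limit: 4, fn: async id => { await wait(5); return { id, content: `doc-${id}` }; } },
  { name: 'parse', limit: 2, fn: async doc => { await wait(5); return { ...doc, length: doc.content.length }; } }
];

runPipeline([1, 2, 3], stages).then(result => console.log(result));
```

Unlike the pipeline above, this sketch does not filter out failed items; a failing stage function rejects the whole run.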
Rate Limiting and Backoff Strategies
When working with external APIs or services, rate limiting is crucial to avoid being throttled or banned.
Token Bucket Rate Limiter
/**
* Token bucket rate limiter
*/
class RateLimiter {
constructor(tokensPerSecond, bucketSize) {
this.tokensPerSecond = tokensPerSecond;
this.bucketSize = bucketSize;
this.tokens = bucketSize;
this.lastRefillTime = Date.now();
this.requestQueue = [];
}
/**
* Refill tokens based on elapsed time
*/
refillTokens() {
const now = Date.now();
const elapsedMs = now - this.lastRefillTime;
if (elapsedMs > 0) {
// Calculate how many tokens to add based on elapsed time
const newTokens = (elapsedMs / 1000) * this.tokensPerSecond;
// Add tokens to the bucket, not exceeding the bucket size
this.tokens = Math.min(this.bucketSize, this.tokens + newTokens);
// Update the last refill time
this.lastRefillTime = now;
}
}
/**
* Get permission to send a request
* @param {number} tokenCost - Number of tokens required for the request
* @returns {Promise} - Resolves when the request can be sent
*/
acquireToken(tokenCost = 1) {
return new Promise(resolve => {
// Queue every request so waiters are served strictly in arrival order
this.requestQueue.push({ tokenCost, resolve });
this.drainQueue();
});
}
/**
* Serve queued requests in FIFO order while tokens last, then schedule a
* single timer for the request at the head of the queue
*/
drainQueue() {
// A pending timer will call drainQueue again; avoid stacking timers
if (this.drainTimer) {
return;
}
this.refillTokens();
while (this.requestQueue.length > 0 && this.tokens >= this.requestQueue[0].tokenCost) {
const request = this.requestQueue.shift();
// Consume tokens
this.tokens -= request.tokenCost;
request.resolve();
}
if (this.requestQueue.length > 0) {
// Wait just long enough for the head request's tokens to accumulate
const deficit = this.requestQueue[0].tokenCost - this.tokens;
const waitTimeMs = Math.ceil((deficit / this.tokensPerSecond) * 1000);
this.drainTimer = setTimeout(() => {
this.drainTimer = null;
this.drainQueue();
}, waitTimeMs);
}
}
/**
* Wrap a function with rate limiting
* @param {Function} fn - Function to rate limit
* @param {number} tokenCost - Number of tokens required for the function
* @returns {Function} - Rate-limited function
*/
wrap(fn, tokenCost = 1) {
return async (...args) => {
await this.acquireToken(tokenCost);
return fn(...args);
};
}
}
// Example: Rate-limited API client
class ApiClient {
constructor(baseUrl, requestsPerSecond = 5) {
this.baseUrl = baseUrl;
this.rateLimiter = new RateLimiter(requestsPerSecond, requestsPerSecond);
// Create rate-limited fetch function
this.limitedFetch = this.rateLimiter.wrap(
this.fetch.bind(this)
);
}
async fetch(endpoint, options = {}) {
try {
const url = `${this.baseUrl}${endpoint}`;
const response = await fetch(url, options);
if (!response.ok) {
throw new Error(`HTTP error! Status: ${response.status}`);
}
return response.json();
} catch (error) {
console.error(`API request failed: ${error.message}`);
throw error;
}
}
async get(endpoint) {
return this.limitedFetch(endpoint, { method: 'GET' });
}
async post(endpoint, data) {
return this.limitedFetch(endpoint, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(data)
});
}
}
// Usage
const client = new ApiClient('https://api.example.com', 10); // 10 requests per second
// Fetch multiple items with automatic rate limiting
async function fetchUserData(userIds) {
const results = {};
for (const userId of userIds) {
try {
// This will automatically be rate limited
results[userId] = await client.get(`/users/${userId}`);
console.log(`Fetched user ${userId}`);
} catch (error) {
console.error(`Failed to fetch user ${userId}:`, error);
results[userId] = { error: error.message };
}
}
return results;
}
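The refill arithmetic behind a token bucket is easier to follow with a controllable clock. The sketch below is a simplified, synchronous variant written for illustration (it is not the RateLimiter class above); `TokenBucket`, `tryAcquire`, and the injectable `now` function are assumptions of this example:

```javascript
// Token bucket with an injectable clock so the refill math can be checked
// without real waiting. `now` is a function returning milliseconds.
class TokenBucket {
  constructor(tokensPerSecond, bucketSize, now = Date.now) {
    this.tokensPerSecond = tokensPerSecond;
    this.bucketSize = bucketSize;
    this.now = now;
    this.tokens = bucketSize; // start with a full bucket
    this.lastRefill = now();
  }

  // Adds tokens for the time elapsed since the last refill, capped at bucketSize.
  refill() {
    const current = this.now();
    const elapsedSeconds = (current - this.lastRefill) / 1000;
    this.tokens = Math.min(this.bucketSize, this.tokens + elapsedSeconds * this.tokensPerSecond);
    this.lastRefill = current;
  }

  // Takes `cost` tokens if available; otherwise reports how long to wait.
  tryAcquire(cost = 1) {
    this.refill();
    if (this.tokens >= cost) {
      this.tokens -= cost;
      return { ok: true, waitMs: 0 };
    }
    const waitMs = Math.ceil(((cost - this.tokens) / this.tokensPerSecond) * 1000);
    return { ok: false, waitMs };
  }
}

// Worked example with a fake clock: 2 tokens per second, bucket of 2.
let fakeTime = 0;
const bucket = new TokenBucket(2, 2, () => fakeTime);
console.log(bucket.tryAcquire()); // { ok: true, waitMs: 0 }
console.log(bucket.tryAcquire()); // { ok: true, waitMs: 0 }
console.log(bucket.tryAcquire()); // { ok: false, waitMs: 500 }
fakeTime += 500;                  // half a second later, one token has refilled
console.log(bucket.tryAcquire()); // { ok: true, waitMs: 0 }
```

The bucket size sets how large a burst is allowed, while the refill rate sets the sustained throughput.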
Exponential Backoff Strategy
When a request fails due to rate limiting or server overload, use exponential backoff to retry with increasing delays.
Exponential Backoff Implementation
/**
* Make a request with exponential backoff
* @param {Function} requestFn - Function that makes the request
* @param {Object} options - Backoff options
* @returns {Promise} - Promise resolving to the request result
*/
async function withBackoff(requestFn, options = {}) {
const {
initialDelay = 1000,
maxDelay = 30000,
factor = 2,
jitter = 0.1,
maxRetries = 5,
shouldRetry = (error) => true
} = options;
let retries = 0;
let delay = initialDelay;
while (true) {
try {
return await requestFn();
} catch (error) {
// Check if we've reached the maximum number of retries
if (retries >= maxRetries) {
console.log(`Maximum retries (${maxRetries}) reached. Giving up.`);
throw error;
}
// Check if we should retry based on the error
if (!shouldRetry(error)) {
console.log(`Error not eligible for retry: ${error.message}`);
throw error;
}
// Increment retry counter
retries++;
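// Add jitter to the current delay, and prefer a server-provided hint
// (error.retryAfter, in milliseconds) when the caller attached one
const jitteredDelay = delay * (1 + jitter * (Math.random() * 2 - 1));
const waitMs = Math.min(
maxDelay,
Number.isFinite(error.retryAfter) ? error.retryAfter : jitteredDelay
);
console.log(`Retry ${retries} after ${Math.round(waitMs)}ms delay`);
// Wait before retrying
await new Promise(resolve => setTimeout(resolve, waitMs));
// Grow the base delay for the next attempt, capped at maxDelay
delay = Math.min(maxDelay, delay * factor);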
}
}
}
// Example usage with a rate-limited API
async function fetchDataWithBackoff(url) {
return withBackoff(
async () => {
const response = await fetch(url);
if (response.status === 429) {
// Rate limited, extract retry-after header if available
const retryAfter = response.headers.get('Retry-After');
const retryMs = retryAfter ? parseInt(retryAfter, 10) * 1000 : null;
const error = new Error('Rate limited');
error.status = 429;
error.retryAfter = retryMs;
throw error;
}
if (!response.ok) {
const error = new Error(`HTTP error ${response.status}`);
error.status = response.status;
throw error;
}
return response.json();
},
{
maxRetries: 5,
shouldRetry: (error) => {
// Retry on network errors, rate limits, and server errors
return (
error.name === 'TypeError' || // Network errors
error.status === 429 || // Rate limit
(error.status >= 500 && error.status < 600) // Server errors
);
},
// initialDelay must be a number of milliseconds: withBackoff multiplies it directly
initialDelay: 1000
}
);
}
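It can help to see the delay schedule on its own, separate from the retry loop. The sketch below assumes the first retry waits `initialDelay` and each later retry multiplies the previous base delay by `factor`; `backoffSchedule` and `applyJitter` are hypothetical helpers, with defaults mirroring the options used above:

```javascript
// Returns the base (pre-jitter) wait before each retry, capped at maxDelay.
function backoffSchedule({ initialDelay = 1000, factor = 2, maxDelay = 30000, maxRetries = 5 } = {}) {
  const delays = [];
  let delay = initialDelay;
  for (let retry = 0; retry < maxRetries; retry++) {
    delays.push(Math.min(maxDelay, delay));
    delay *= factor;
  }
  return delays;
}

// Applies +/- `jitter` (a fraction, e.g. 0.1 for 10%) to one delay.
// `random` is injectable so the result can be made deterministic in tests.
function applyJitter(delay, jitter = 0.1, random = Math.random) {
  return delay * (1 + jitter * (random() * 2 - 1));
}

console.log(backoffSchedule());                  // [ 1000, 2000, 4000, 8000, 16000 ]
console.log(backoffSchedule({ maxRetries: 7 })); // [ 1000, 2000, 4000, 8000, 16000, 30000, 30000 ]
console.log(applyJitter(1000, 0.1, () => 0.5));  // 1000 (midpoint of the jitter range)
```

Jitter spreads retries from many clients over time so they do not all hit the server at the same instant.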
Web Workers for True Parallelism
For computationally intensive tasks, Web Workers allow JavaScript to run in parallel threads, unlocking true parallelism.
Web Worker Pool Implementation
// worker.js
self.addEventListener('message', (event) => {
const { id, task, data } = event.data;
try {
let result;
// Execute the requested task
switch (task) {
case 'processImage':
result = processImage(data);
break;
case 'heavyCalculation':
result = performCalculation(data);
break;
default:
throw new Error(`Unknown task: ${task}`);
}
// Return successful result
self.postMessage({
id,
status: 'completed',
result
});
} catch (error) {
// Return error
self.postMessage({
id,
status: 'error',
error: error.message
});
}
});
// Task implementations
function processImage(imageData) {
// Simulate image processing
// ...
return { processed: true };
}
function performCalculation(params) {
// Simulate heavy calculation
let result = 0;
for (let i = 0; i < params.iterations; i++) {
result += Math.sqrt(i) * Math.sin(i);
}
return { result };
}
// Main application code
class WorkerPool {
constructor(workerScript, numWorkers = navigator.hardwareConcurrency || 4) {
this.taskQueue = [];
this.workers = [];
this.availableWorkers = [];
this.nextTaskId = 1;
// Create workers
for (let i = 0; i < numWorkers; i++) {
const worker = new Worker(workerScript);
worker.addEventListener('message', (event) => {
const { id, status, result, error } = event.data;
// Find the task that corresponds to this message
const taskIndex = this.taskQueue.findIndex(task => task.id === id);
if (taskIndex !== -1) {
const task = this.taskQueue[taskIndex];
// Remove the task from the queue
this.taskQueue.splice(taskIndex, 1);
// Handle the result
if (status === 'completed') {
task.resolve(result);
} else {
task.reject(new Error(error));
}
}
// Make this worker available again
this.availableWorkers.push(worker);
// Process the next task if any
this.processQueue();
});
// Add to available workers
this.availableWorkers.push(worker);
this.workers.push(worker);
}
}
/**
* Execute a task in a worker
* @param {string} task - Task type
* @param {any} data - Task data
* @returns {Promise} - Promise that resolves with the task result
*/
execute(task, data) {
return new Promise((resolve, reject) => {
// Create a task object
const taskObj = {
id: this.nextTaskId++,
task,
data,
resolve,
reject
};
// Add to the queue
this.taskQueue.push(taskObj);
// Try to process the queue
this.processQueue();
});
}
/**
* Process the next task in the queue if a worker is available
*/
processQueue() {
// Dispatch waiting tasks for as long as idle workers are available
while (this.availableWorkers.length > 0) {
// Tasks stay in taskQueue until their result arrives, so skip any already sent to a worker
const task = this.taskQueue.find(queued => !queued.dispatched);
if (!task) {
return;
}
task.dispatched = true;
const worker = this.availableWorkers.pop();
// Send the task to the worker
worker.postMessage({
id: task.id,
task: task.task,
data: task.data
});
}
}
/**
* Terminate all workers
*/
terminate() {
this.workers.forEach(worker => worker.terminate());
this.workers = [];
this.availableWorkers = [];
// Reject all pending tasks
this.taskQueue.forEach(task => {
task.reject(new Error('Worker pool terminated'));
});
this.taskQueue = [];
}
}
// Usage example
const workerPool = new WorkerPool('worker.js', 4);
// Process multiple images in parallel
async function processImages(images) {
const results = await Promise.all(
images.map(image =>
workerPool.execute('processImage', image)
)
);
return results;
}
// Perform heavy calculations without blocking the UI
async function calculateComplexData(paramSets) {
const results = [];
for (const params of paramSets) {
try {
const result = await workerPool.execute('heavyCalculation', params);
results.push({
params,
status: 'completed',
result
});
} catch (error) {
results.push({
params,
status: 'error',
error: error.message
});
}
}
return results;
}
When to use Web Workers:
- CPU-intensive operations like image processing
- Complex mathematical calculations
- Parsing large datasets
- Any operation that might block the main thread
- Background synchronization or data processing
Limitations of Web Workers:
- No direct DOM access
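- Communication is message-based (postMessage); a worker cannot call main-thread functions directly
- Data passed between threads is copied by structured clone unless it is transferred, which adds overhead for large payloads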
- Not suitable for small, quick operations
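The data-transfer cost in the list above comes from structured cloning: by default, postMessage copies its payload. For large binary data, a transfer list hands over ownership of an ArrayBuffer instead of copying it. The sketch below illustrates the difference with the global structuredClone function (modern browsers and Node.js 17+), which follows the same cloning rules as postMessage. The helper names copyBuffer and transferBuffer are hypothetical.

```javascript
// Copy: the original buffer stays usable, but the bytes are duplicated.
function copyBuffer(buffer) {
  return structuredClone(buffer);
}

// Transfer: ownership moves to the clone and the original is detached.
function transferBuffer(buffer) {
  return structuredClone(buffer, { transfer: [buffer] });
}

const original = new ArrayBuffer(1024);

const copied = copyBuffer(original);
console.log(original.byteLength, copied.byteLength); // 1024 1024

const moved = transferBuffer(original);
console.log(original.byteLength, moved.byteLength); // 0 1024
```

With a real worker, the equivalent is passing the buffer in the second argument of postMessage, for example worker.postMessage({ pixels: buffer }, [buffer]). After that call the sending side can no longer read the buffer.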
Handling Dependencies Between Concurrent Operations
In many real-world scenarios, operations have dependencies on each other. Let's explore patterns for managing these dependencies.
Dependency Graph Execution
When operations have complex dependencies, use a graph-based approach to maximize concurrency while respecting dependencies.
Task Dependency Graph
/**
* Task dependency manager
*/
class TaskGraph {
constructor() {
this.tasks = new Map();
this.results = new Map();
}
/**
* Add a task to the graph
* @param {string} id - Task ID
* @param {Function} fn - Task function
* @param {string[]} dependencies - IDs of dependencies
*/
addTask(id, fn, dependencies = []) {
this.tasks.set(id, { fn, dependencies });
}
/**
* Execute all tasks in the graph
* @returns {Map} - Map of task results by ID
*/
async execute() {
// Reset results
this.results = new Map();
// Create a set of remaining tasks
const remaining = new Set(this.tasks.keys());
// Continue until all tasks are done
while (remaining.size > 0) {
// Find tasks that can be executed (all dependencies are satisfied)
const readyTasks = [];
for (const taskId of remaining) {
const task = this.tasks.get(taskId);
// Check if all dependencies are satisfied
const dependenciesSatisfied = task.dependencies.every(
depId => this.results.has(depId)
);
if (dependenciesSatisfied) {
readyTasks.push(taskId);
}
}
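// If no tasks are ready, the remaining tasks are stuck: either there is a
// circular dependency or a task depends on an ID that was never added
if (readyTasks.length === 0) {
throw new Error(`Circular or missing dependency among: ${[...remaining].join(', ')}`);
}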
// Execute all ready tasks in parallel
const executions = readyTasks.map(async taskId => {
const task = this.tasks.get(taskId);
// Get dependency results
const dependencyResults = task.dependencies.map(depId =>
this.results.get(depId)
);
try {
// Execute the task with dependency results
const result = await task.fn(...dependencyResults);
// Store the result
this.results.set(taskId, result);
// Remove from remaining
remaining.delete(taskId);
return { taskId, success: true, result };
} catch (error) {
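// Store the error as the result so dependent tasks are not blocked;
// they receive { error } as their input and should check for it
this.results.set(taskId, { error });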
// Remove from remaining
remaining.delete(taskId);
return { taskId, success: false, error };
}
});
// Wait for all ready tasks to complete
await Promise.all(executions);
}
return this.results;
}
}
// Example: Data processing workflow
async function runDataProcessingWorkflow(inputData) {
const graph = new TaskGraph();
// Step 1: Parse input data
graph.addTask('parseData', async () => {
console.log('Parsing data...');
return parseInputData(inputData);
}, []);
// Step 2: Validate data
graph.addTask('validateData', async (parsedData) => {
console.log('Validating data...');
return validateData(parsedData);
}, ['parseData']);
// Step 3a: Process customer data
// (receives the output of validateData, assumed to be the validated data set)
graph.addTask('processCustomers', async (validatedData) => {
console.log('Processing customer data...');
return processCustomerData(validatedData.customers);
}, ['validateData']);
// Step 3b: Process product data
graph.addTask('processProducts', async (validatedData) => {
console.log('Processing product data...');
return processProductData(validatedData.products);
}, ['validateData']);
// Step 3c: Process order data
graph.addTask('processOrders', async (validatedData) => {
console.log('Processing order data...');
return processOrderData(validatedData.orders);
}, ['validateData']);
// Step 4: Generate customer insights (depends on customer and order data)
graph.addTask('customerInsights', async (customers, orders) => {
console.log('Generating customer insights...');
return generateCustomerInsights(customers, orders);
}, ['processCustomers', 'processOrders']);
// Step 5: Generate product insights (depends on product and order data)
graph.addTask('productInsights', async (products, orders) => {
console.log('Generating product insights...');
return generateProductInsights(products, orders);
}, ['processProducts', 'processOrders']);
// Step 6: Generate final report (depends on all insights)
graph.addTask('finalReport', async (customerInsights, productInsights) => {
console.log('Generating final report...');
return generateFinalReport(customerInsights, productInsights);
}, ['customerInsights', 'productInsights']);
// Execute the workflow
const results = await graph.execute();
// Return the final report
return results.get('finalReport');
}
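The task graph visualized (tasks on the same line can run in parallel):
parseData
→ validateData
→ processCustomers, processProducts, processOrders
→ customerInsights, productInsights
→ finalReport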
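One way to see the shape of this workflow is to compute its "waves": groups of tasks whose dependencies are all satisfied at the same time, which is how TaskGraph.execute schedules them. The following is a minimal sketch that works on a plain object of dependency lists. The helper name computeWaves is hypothetical and it runs no task functions.

```javascript
// Hypothetical helper: group task IDs into waves of tasks that can run together.
// `dependencies` maps each task ID to the IDs it depends on.
function computeWaves(dependencies) {
  const done = new Set();
  const remaining = new Set(Object.keys(dependencies));
  const waves = [];
  while (remaining.size > 0) {
    // A task is ready once every one of its dependencies has finished
    const ready = [...remaining].filter(id =>
      dependencies[id].every(dep => done.has(dep))
    );
    if (ready.length === 0) {
      throw new Error('Circular or missing dependency');
    }
    for (const id of ready) {
      done.add(id);
      remaining.delete(id);
    }
    waves.push(ready);
  }
  return waves;
}

const waves = computeWaves({
  parseData: [],
  validateData: ['parseData'],
  processCustomers: ['validateData'],
  processProducts: ['validateData'],
  processOrders: ['validateData'],
  customerInsights: ['processCustomers', 'processOrders'],
  productInsights: ['processProducts', 'processOrders'],
  finalReport: ['customerInsights', 'productInsights']
});

// Prints one wave per line, five lines in total
console.log(waves.map(wave => wave.join(', ')).join('\n'));
```

Wave-based scheduling is simple but not maximally concurrent: a task in the next wave waits for the slowest task of the current wave, even if its own dependencies finished earlier.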
Reactive Programming for Complex Concurrency
For the most complex concurrency scenarios, reactive programming models asynchronous values as streams (Observables) that can be transformed, combined, and cancelled.
Basic Observable Implementation
/**
* Simple Observable implementation
*/
class Observable {
constructor(subscribeFn) {
this.subscribeFn = subscribeFn;
}
subscribe(observer) {
  // Declared up front so unsubscribe() can see it; a `const` inside the
  // try block below would be out of scope there and never be called
  let teardown;
  const runTeardown = () => {
    const cleanup = teardown;
    teardown = undefined; // Run cleanup at most once
    if (typeof cleanup === 'function') {
      cleanup();
    } else if (cleanup && typeof cleanup.unsubscribe === 'function') {
      // Operators such as map() return a subscription instead of a function
      cleanup.unsubscribe();
    }
  };
  // Create a subscription object
  const subscription = {
    unsubscribe: () => {
      // Mark as unsubscribed
      subscription.closed = true;
      // Call teardown logic if available
      runTeardown();
    },
    closed: false
  };
  // Normalize the observer
  const observerObj = typeof observer === 'function'
    ? { next: observer }
    : (observer || {});
  // Setup the subscription
  try {
    // Call the subscribe function
    teardown = this.subscribeFn({
      next: (value) => {
        if (!subscription.closed && observerObj.next) {
          try {
            observerObj.next(value);
          } catch (e) {
            if (observerObj.error) {
              observerObj.error(e);
            }
          }
        }
      },
      error: (err) => {
        if (!subscription.closed) {
          subscription.closed = true;
          if (observerObj.error) {
            observerObj.error(err);
          }
          runTeardown();
        }
      },
      complete: () => {
        if (!subscription.closed) {
          subscription.closed = true;
          if (observerObj.complete) {
            observerObj.complete();
          }
          runTeardown();
        }
      }
    });
    // If the source errored or completed synchronously inside subscribeFn,
    // teardown was not assigned yet when cleanup first ran, so clean up now
    if (subscription.closed) {
      runTeardown();
    }
  } catch (e) {
    // Handle synchronous errors thrown by subscribeFn
    subscription.closed = true;
    if (observerObj.error) {
      observerObj.error(e);
    }
  }
  return subscription;
}
/**
* Map the values emitted by this Observable
* @param {Function} mapFn - Function to map values
* @returns {Observable} - New Observable with mapped values
*/
map(mapFn) {
return new Observable(observer => {
return this.subscribe({
next: value => observer.next(mapFn(value)),
error: err => observer.error(err),
complete: () => observer.complete()
});
});
}
/**
* Filter the values emitted by this Observable
* @param {Function} filterFn - Function to filter values
* @returns {Observable} - New Observable with filtered values
*/
filter(filterFn) {
return new Observable(observer => {
return this.subscribe({
next: value => {
if (filterFn(value)) {
observer.next(value);
}
},
error: err => observer.error(err),
complete: () => observer.complete()
});
});
}
/**
* Merge this Observable with another
* @param {Observable} other - Other Observable to merge with
* @returns {Observable} - New Observable that emits values from both
*/
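merge(other) {
  return new Observable(observer => {
    // Complete only after both sources have completed; passing the observer
    // through directly would end the merged stream when the first one finished
    let completedCount = 0;
    const forward = {
      next: value => observer.next(value),
      error: err => observer.error(err),
      complete: () => { if (++completedCount === 2) observer.complete(); }
    };
    const subscriptionA = this.subscribe(forward);
    const subscriptionB = other.subscribe(forward);
    return () => {
      subscriptionA.unsubscribe();
      subscriptionB.unsubscribe();
    };
  });
}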
/**
* Create an Observable from a Promise
* @param {Promise} promise - Promise to convert
* @returns {Observable} - Observable that emits the Promise result
*/
static fromPromise(promise) {
return new Observable(observer => {
promise
.then(value => {
observer.next(value);
observer.complete();
})
.catch(err => {
observer.error(err);
});
return () => {
// Nothing to do for a Promise
};
});
}
/**
* Create an Observable from events
* @param {EventTarget} target - DOM element or other event target
* @param {string} eventName - Name of the event to listen for
* @returns {Observable} - Observable that emits the events
*/
static fromEvent(target, eventName) {
return new Observable(observer => {
// Create the event handler
const handler = event => {
observer.next(event);
};
// Add the event listener
target.addEventListener(eventName, handler);
// Return a teardown function
return () => {
target.removeEventListener(eventName, handler);
};
});
}
}
// Example: Reactive data processing pipeline
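function createSearchPipeline(searchInput) {
  // Create an Observable from input events
  const inputEvents = Observable.fromEvent(searchInput, 'input');
  // Create a pipeline of search terms
  const searchTerms = inputEvents
    // Map to the input value
    .map(event => event.target.value)
    // Only trigger search when input is at least 3 characters
    .filter(text => text.length >= 3);
  // Map each term to an API call and flatten the result. Using map() for this
  // step would emit Observables rather than search results, so the inner
  // Observable is subscribed to explicitly and only the latest search is kept
  return new Observable(observer => {
    let innerSubscription = null;
    const outerSubscription = searchTerms.subscribe({
      next: searchText => {
        console.log(`Searching for: ${searchText}`);
        // Ignore results from the previous, now stale, search
        if (innerSubscription) innerSubscription.unsubscribe();
        // Convert the Promise to an Observable
        innerSubscription = Observable.fromPromise(
          fetch(`/api/search?q=${encodeURIComponent(searchText)}`)
            .then(response => response.json())
        ).subscribe({
          next: results => observer.next(results),
          error: err => observer.error(err)
        });
      },
      error: err => observer.error(err),
      complete: () => observer.complete()
    });
    return () => {
      if (innerSubscription) innerSubscription.unsubscribe();
      outerSubscription.unsubscribe();
    };
  });
}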
// Usage
const searchInput = document.getElementById('search');
const searchResults = createSearchPipeline(searchInput);
const subscription = searchResults.subscribe({
next: results => {
console.log('Search results:', results);
displayResults(results);
},
error: err => {
console.error('Search error:', err);
displayError(err);
}
});
// Later when no longer needed
subscription.unsubscribe();
Benefits of reactive programming:
- Declarative approach to handling asynchronous events
- Built-in support for cancellation
- Composable operations
- Elegant handling of complex data flows
- Automatic resource cleanup
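Cancellation and cleanup are easiest to see with event listeners. The sketch below is a stripped-down, function-based variant of the fromEvent idea (it is not the Observable class above) built on the standard EventTarget and Event globals, which exist in browsers and in Node.js 15+. The name eventStream is hypothetical.

```javascript
// Hypothetical minimal stream of events: subscribe() registers a listener and
// returns a subscription whose unsubscribe() removes that listener again.
function eventStream(target, eventName) {
  return {
    subscribe(next) {
      const handler = event => next(event);
      target.addEventListener(eventName, handler);
      let closed = false;
      return {
        get closed() { return closed; },
        unsubscribe() {
          if (closed) return; // Safe to call more than once
          closed = true;
          target.removeEventListener(eventName, handler);
        }
      };
    }
  };
}

const target = new EventTarget();
const received = [];
const pingSubscription = eventStream(target, 'ping')
  .subscribe(event => received.push(event.type));

target.dispatchEvent(new Event('ping')); // Delivered
pingSubscription.unsubscribe();
target.dispatchEvent(new Event('ping')); // Ignored: the listener is gone

console.log(received.length); // 1
```

The consumer only holds on to the subscription. It does not need to know which listener or resource was allocated on its behalf.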
Popular reactive libraries:
- RxJS - The most comprehensive reactive library for JavaScript
- Most - A simple, high-performance reactive library
- Bacon.js - Functional reactive programming for JavaScript
Best Practices for Concurrent Operations
Let's summarize the key best practices for managing concurrent operations effectively:
- Choose the right concurrency pattern for the job:
  - Use sequential operations when steps depend on each other
  - Use parallel operations for independent tasks
  - Use Promise.race for timeout or fastest-response patterns
  - Use dependency graphs for complex workflows
- Limit concurrency appropriately:
  - Don't overwhelm servers with too many parallel requests
  - Respect rate limits of external APIs
  - Consider resource constraints (memory, network, CPU)
  - Implement backoff strategies when encountering errors
- Prioritize important operations:
  - Use priority queues for critical tasks
  - Load visible content first, defer off-screen content
  - Consider the user experience when prioritizing
- Handle errors gracefully:
  - Don't let one failure block other operations
  - Implement appropriate retry strategies
  - Provide meaningful feedback to users
- Optimize resource usage:
  - Recycle and reuse resources when possible (e.g., worker pools)
  - Clean up resources properly when they're no longer needed
  - Consider batch processing for small operations
- Keep the UI responsive:
  - Offload heavy computation to Web Workers
  - Use requestAnimationFrame for visual updates
  - Break up long operations into smaller chunks
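As an illustration of the Promise.race timeout pattern from the first practice, here is a minimal sketch. The helper name withTimeout and its default message are assumptions for this example. Note that it only stops waiting; it does not cancel the underlying operation.

```javascript
// Hypothetical helper: reject if `promise` does not settle within `ms` milliseconds.
function withTimeout(promise, ms, message = `Timed out after ${ms}ms`) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(message)), ms);
  });
  // Whichever settles first wins; the timer is cleared either way
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// A slow operation (200ms) raced against a 50ms limit
const slow = new Promise(resolve => setTimeout(() => resolve('slow result'), 200));
withTimeout(slow, 50).catch(err => console.log(err.message)); // Timed out after 50ms
```

To actually abort a fetch request when the limit is reached, an AbortController signal would need to be passed to fetch in addition to racing the promise.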
Practical Example: Data Processing Pipeline
Let's put everything together with a complete example of a data processing system that handles multiple concurrent operations.
Complete Data Processing System
// dataProcessor.js - A complete data processing system
class DataProcessor {
constructor(options = {}) {
this.concurrencyLimit = options.concurrencyLimit || 5;
this.apiRequestsPerSecond = options.apiRequestsPerSecond || 10;
this.maxRetries = options.maxRetries ?? 3; // ?? keeps an explicit 0 (no retries)
this.workerCount = options.workerCount || 2;
// Initialize subsystems
this.initRateLimiter();
this.initWorkerPool();
this.initTaskQueue();
}
initRateLimiter() {
this.rateLimiter = new RateLimiter(this.apiRequestsPerSecond, this.apiRequestsPerSecond * 2);
// Rate-limited API fetch
this.fetchWithRateLimit = this.rateLimiter.wrap(
async (url, options) => {
const response = await fetch(url, options);
if (!response.ok) {
const error = new Error(`HTTP error ${response.status}`);
error.status = response.status;
error.retryable = response.status >= 500 || response.status === 429;
throw error;
}
return response;
}
);
}
initWorkerPool() {
// Initialize web worker pool for CPU-intensive operations
this.workerPool = new WorkerPool('data-worker.js', this.workerCount);
}
initTaskQueue() {
// Priority queue for tasks
this.taskQueue = new PriorityTaskQueue(this.concurrencyLimit);
}
/**
* Process a dataset
* @param {Object} options - Processing options
* @returns {Promise} - Processing results
*/
async processDataset(options) {
const {
datasetId,
transformations = [],
outputFormat = 'json',
priority = 'normal'
} = options;
console.log(`Processing dataset ${datasetId} with priority ${priority}`);
const startTime = performance.now();
try {
// Step 1: Fetch the dataset metadata
const metadata = await this.fetchDatasetMetadata(datasetId, priority);
console.log(`Dataset ${datasetId} metadata fetched:`, metadata);
// Step 2: Fetch the dataset content (with retry)
const content = await this.fetchWithRetry(
() => this.fetchDatasetContent(datasetId, metadata, priority),
{ maxRetries: this.maxRetries }
);
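// Content is a string for text formats and an already parsed value for JSON responses
const contentSize = typeof content === 'string' ? `${content.length} characters` : 'parsed JSON';
console.log(`Dataset ${datasetId} content fetched: ${contentSize}`);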
// Step 3: Parse the dataset
const parsedData = await this.parseDataset(content, metadata.format, priority);
console.log(`Dataset ${datasetId} parsed: ${parsedData.length} records`);
// Step 4: Apply transformations in sequence
let transformedData = parsedData;
for (const transformation of transformations) {
transformedData = await this.applyTransformation(
transformedData,
transformation,
priority
);
console.log(`Transformation ${transformation.type} applied`);
}
// Step 5: Format the output
const result = await this.formatOutput(transformedData, outputFormat, priority);
const duration = performance.now() - startTime;
console.log(`Dataset ${datasetId} processed in ${duration}ms`);
return {
datasetId,
result,
metadata: {
recordCount: transformedData.length,
processingTime: duration,
outputSize: JSON.stringify(result).length
}
};
} catch (error) {
console.error(`Error processing dataset ${datasetId}:`, error);
throw error;
}
}
/**
* Process multiple datasets in parallel with limited concurrency
* @param {Array} datasetOptions - Array of dataset options
* @returns {Promise} - Processing results
*/
async processMultipleDatasets(datasetOptions) {
// Create task functions
const tasks = datasetOptions.map(options => {
return async () => {
return this.processDataset(options);
};
});
// Process with limited concurrency
const results = await this.processConcurrentTasks(
tasks,
Math.min(this.concurrencyLimit, datasetOptions.length)
);
return results;
}
// Helper methods
async fetchDatasetMetadata(datasetId, priority) {
return this.taskQueue.enqueue(
async () => {
console.log(`Fetching metadata for dataset ${datasetId}`);
const response = await this.fetchWithRateLimit(
`/api/datasets/${datasetId}/metadata`
);
return response.json();
},
priority
);
}
async fetchDatasetContent(datasetId, metadata, priority) {
return this.taskQueue.enqueue(
async () => {
console.log(`Fetching content for dataset ${datasetId}`);
const response = await this.fetchWithRateLimit(
`/api/datasets/${datasetId}/content`
);
// Choose the appropriate response method based on content type
const contentType = response.headers.get('Content-Type');
if (contentType?.includes('application/json')) {
return response.json();
} else if (contentType?.includes('text/csv')) {
return response.text();
} else {
// Default to text
return response.text();
}
},
priority
);
}
async parseDataset(content, format, priority) {
// For large datasets, use Web Workers to parse without blocking the UI
if (typeof content === 'string' && content.length > 1000000) {
// Large dataset, use Web Worker
return this.workerPool.execute('parseDataset', { content, format });
} else {
// Smaller dataset, process in the main thread
return this.taskQueue.enqueue(
async () => {
console.log(`Parsing ${format} dataset`);
switch (format) {
case 'json':
return typeof content === 'string' ? JSON.parse(content) : content;
case 'csv':
return parseCSV(content);
case 'xml':
return parseXML(content);
default:
throw new Error(`Unsupported format: ${format}`);
}
},
priority
);
}
}
async applyTransformation(data, transformation, priority) {
const { type, params } = transformation;
// CPU-intensive transformations should use Web Workers
if (type === 'filter' || type === 'map' || type === 'aggregate') {
return this.workerPool.execute('transform', {
data,
transformation: { type, params }
});
} else {
// Other transformations can run in the main thread
return this.taskQueue.enqueue(
async () => {
console.log(`Applying ${type} transformation with params:`, params);
switch (type) {
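case 'sort': {
  const field = params.field;
  const direction = params.direction === 'desc' ? -1 : 1;
  return data.slice().sort((a, b) => {
    // Return 0 for equal values so the comparator stays consistent
    if (a[field] === b[field]) return 0;
    return (a[field] > b[field] ? 1 : -1) * direction;
  });
}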
case 'group':
return groupData(data, params);
case 'join':
return joinData(data, params.otherData, params);
default:
throw new Error(`Unsupported transformation: ${type}`);
}
},
priority
);
}
}
async formatOutput(data, format, priority) {
return this.taskQueue.enqueue(
async () => {
console.log(`Formatting output as ${format}`);
switch (format) {
case 'json':
return data;
case 'csv':
return convertToCSV(data);
case 'xml':
return convertToXML(data);
case 'html':
return convertToHTML(data);
default:
throw new Error(`Unsupported output format: ${format}`);
}
},
priority
);
}
async fetchWithRetry(requestFn, options = {}) {
const {
maxRetries = this.maxRetries,
initialDelay = 1000,
maxDelay = 10000,
factor = 2,
jitter = 0.1
} = options;
let retries = 0;
let delay = initialDelay;
while (true) {
try {
return await requestFn();
} catch (error) {
// If this is not a retryable error, throw immediately
if (error.retryable === false) {
throw error;
}
// If we've reached the maximum number of retries, throw
if (retries >= maxRetries) {
console.warn(`Maximum retries (${maxRetries}) reached`);
throw error;
}
// Increment retry counter
retries++;
// Apply jitter to the current delay, so the first retry waits about initialDelay
const waitTime = delay * (1 + jitter * (Math.random() * 2 - 1));
console.log(`Retry ${retries} after ${Math.round(waitTime)}ms delay`);
// Wait before retrying
await new Promise(resolve => setTimeout(resolve, waitTime));
// Grow the base delay exponentially for the next attempt, capped at maxDelay
delay = Math.min(maxDelay, delay * factor);
}
}
}
async processConcurrentTasks(tasks, concurrency) {
  const results = new Array(tasks.length);
  let nextIndex = 0;
  // Each runner pulls the next unstarted task until none remain, so up to
  // `concurrency` tasks are in flight at any time and a new task starts as
  // soon as any running task finishes
  const runner = async () => {
    while (nextIndex < tasks.length) {
      const taskIndex = nextIndex++;
      try {
        const result = await tasks[taskIndex]();
        results[taskIndex] = { status: 'success', data: result };
      } catch (error) {
        results[taskIndex] = { status: 'error', error };
      }
    }
  };
  // Start at least one runner, and never more runners than tasks
  const runnerCount = Math.min(Math.max(1, concurrency), tasks.length);
  await Promise.all(Array.from({ length: runnerCount }, runner));
  return results;
}
// Cleanup
destroy() {
// Terminate Web Workers
if (this.workerPool) {
this.workerPool.terminate();
}
}
}
// Usage example
async function runDemo() {
// Create the data processor
const processor = new DataProcessor({
concurrencyLimit: 3,
apiRequestsPerSecond: 5,
maxRetries: 3,
workerCount: 4
});
// Define datasets to process
const datasets = [
{
datasetId: 'sales-2025-q1',
transformations: [
{ type: 'filter', params: { field: 'amount', operator: 'gt', value: 1000 } },
{ type: 'sort', params: { field: 'date', direction: 'asc' } }
],
outputFormat: 'json',
priority: 'high'
},
{
datasetId: 'customers-2025',
transformations: [
{ type: 'map', params: {
fields: ['id', 'name', 'email', 'totalPurchases']
} },
{ type: 'sort', params: { field: 'totalPurchases', direction: 'desc' } }
],
outputFormat: 'csv',
priority: 'normal'
},
{
datasetId: 'products-inventory',
transformations: [
{ type: 'filter', params: { field: 'stock', operator: 'lt', value: 10 } },
{ type: 'sort', params: { field: 'restockDate', direction: 'asc' } }
],
outputFormat: 'json',
priority: 'high'
}
];
try {
// Process all datasets
console.log('Starting batch processing');
const startTime = performance.now();
const results = await processor.processMultipleDatasets(datasets);
const totalTime = performance.now() - startTime;
console.log(`All datasets processed in ${totalTime}ms`);
// Analyze results
const successful = results.filter(r => r.status === 'success').length;
const failed = results.filter(r => r.status === 'error').length;
console.log(`Processed ${results.length} datasets: ${successful} successful, ${failed} failed`);
// Generate summary report
const report = {
totalDatasets: results.length,
successful,
failed,
totalProcessingTime: totalTime,
datasetDetails: results.map((result, index) => {
const dataset = datasets[index];
if (result.status === 'success') {
return {
id: dataset.datasetId,
status: 'success',
recordCount: result.data.metadata.recordCount,
processingTime: result.data.metadata.processingTime
};
} else {
return {
id: dataset.datasetId,
status: 'error',
error: result.error.message
};
}
})
};
console.log('Processing report:', report);
return report;
} catch (error) {
console.error('Batch processing failed:', error);
throw error;
} finally {
// Clean up resources
processor.destroy();
}
}
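To reason about retry timing without running any requests, the backoff arithmetic can be isolated into pure functions. This is a sketch that reuses the parameter names of fetchWithRetry (initialDelay, maxDelay, factor, jitter) and assumes the first retry waits initialDelay. The helper names backoffSchedule and applyJitter are hypothetical.

```javascript
// Hypothetical helper: list the base delay (before jitter) for each retry.
function backoffSchedule({ maxRetries = 3, initialDelay = 1000, maxDelay = 10000, factor = 2 } = {}) {
  const delays = [];
  let delay = initialDelay;
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    delays.push(Math.min(maxDelay, delay)); // Never wait longer than maxDelay
    delay *= factor; // Exponential growth between attempts
  }
  return delays;
}

// Hypothetical helper: spread a delay by +/- `jitter` (0.1 means +/-10%).
// `random` is injectable so the result can be made deterministic in tests.
function applyJitter(delay, jitter = 0.1, random = Math.random) {
  return delay * (1 + jitter * (random() * 2 - 1));
}

console.log(backoffSchedule());                  // 1000, 2000, 4000
console.log(backoffSchedule({ maxRetries: 6 })); // 1000, 2000, 4000, 8000, 10000, 10000
console.log(applyJitter(1000, 0.1, () => 0.5));  // 1000 (midpoint: no change)
```

Jitter matters most when many clients fail at the same moment: without it they would all retry in lockstep and hit the server with synchronized bursts.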
Practice Exercises
Exercise 1: Concurrent API Client
Build a reusable API client that:
- Limits concurrent requests to avoid overwhelming servers
- Implements rate limiting based on API requirements
- Handles retries with exponential backoff
- Supports request prioritization
- Provides meaningful progress updates
Exercise 2: Image Gallery with Optimized Loading
Create an image gallery that:
- Loads visible images first
- Pre-loads images just outside the viewport
- Limits concurrent image loading
- Processes images in Web Workers
- Cancels loading of images that scroll out of view
Exercise 3: Task Dependency Scheduler
Implement a task scheduler that:
- Allows defining tasks with dependencies
- Executes independent tasks in parallel
- Respects dependency ordering
- Supports dynamic task addition
- Provides visualization of the task execution flow
Summary
Managing concurrent operations effectively is a critical skill for modern JavaScript development. We've explored various approaches and patterns:
- Built-in Promise combinators (all, allSettled, race, any) for different concurrency scenarios
- Sequential vs. parallel execution patterns
- Concurrency limiting techniques to avoid overwhelming resources
- Rate limiting and backoff strategies for external API interactions
- Web Workers for true parallelism with CPU-intensive tasks
- Task dependency management for complex workflows
- Reactive programming for sophisticated asynchronous data flows
By understanding these patterns and knowing when to apply each, you can build more efficient, responsive applications that make the most of available resources while providing a smooth user experience.