Async Generators and Iteration

Working with asynchronous data streams and iterator patterns

Introduction to Asynchronous Iteration

JavaScript's generators and iterators are built for producing and consuming sequences of values one at a time. Combining them with asynchronous programming gives flexible patterns for working with data that arrives over time or in chunks.

Think of asynchronous generators like a water dispenser that produces cups of water on demand. You don't need to wait for all cups to be filled at once, and the dispenser can take time between each cup to refill. This allows for efficient processing of large or slow data streams without blocking the main thread.

Diagram: an async data source feeds an async generator; the generator yields Value 1, Value 2, and Value 3 to a consumer, and the consumer requests each value by calling next().

Generators and Iterators Refresher

Before diving into asynchronous generators, let's review synchronous generators and iterators, which form the foundation of this pattern.

Iterators: The Protocol

An iterator is an object that implements the iterator protocol with a next() method that returns an object with value and done properties.

Custom Iterator Implementation


// Custom iterator for a range of numbers
function createRangeIterator(start, end) {
  let current = start;
  
  // Return an iterator object
  return {
    // The next() method is the core of the iterator protocol
    next() {
      if (current <= end) {
        return { value: current++, done: false };
      } else {
        return { done: true };
      }
    }
  };
}

// Usage example
const iterator = createRangeIterator(1, 5);

let result = iterator.next();
while (!result.done) {
  console.log(result.value); // 1, 2, 3, 4, 5
  result = iterator.next();
}
          

Iterables: Objects That Can Be Iterated

An iterable is an object that implements the iterable protocol by providing a method with the key Symbol.iterator that returns an iterator.

Custom Iterable Implementation


// Custom iterable for a range of numbers
function createRange(start, end) {
  // Return an iterable object
  return {
    // The Symbol.iterator method makes an object iterable
    [Symbol.iterator]() {
      let current = start;
      
      // Return an iterator
      return {
        next() {
          if (current <= end) {
            return { value: current++, done: false };
          } else {
            return { done: true };
          }
        }
      };
    }
  };
}

// Usage with for...of loop
const range = createRange(1, 5);

for (const num of range) {
  console.log(num); // 1, 2, 3, 4, 5
}

// Or use spread syntax
const numbers = [...range]; // [1, 2, 3, 4, 5]
          

Generators: Functions That Generate Iterables

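Generators provide a more concise way to create iterables using the function* syntax and the yield keyword. Calling a generator function returns a generator object, which is both an iterator and an iterable.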

Generator Function


// Generator function for a range of numbers
function* rangeGenerator(start, end) {
  for (let i = start; i <= end; i++) {
    yield i;
  }
}

// Usage with for...of loop
for (const num of rangeGenerator(1, 5)) {
  console.log(num); // 1, 2, 3, 4, 5
}

// Get iterator explicitly
const iterator = rangeGenerator(1, 5);
console.log(iterator.next()); // { value: 1, done: false }
console.log(iterator.next()); // { value: 2, done: false }
console.log(iterator.next()); // { value: 3, done: false }
console.log(iterator.next()); // { value: 4, done: false }
console.log(iterator.next()); // { value: 5, done: false }
console.log(iterator.next()); // { value: undefined, done: true }
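
Because a generator body only runs when the consumer asks for the next value, it can describe a sequence with no fixed end. The sketch below is illustrative only: naturals and take are hypothetical helper names, not part of the examples above.

```javascript
// Hypothetical helper: an endless sequence of integers, produced on demand
function* naturals(start = 1) {
  let n = start;
  while (true) {
    yield n++;
  }
}

// Hypothetical helper: stop after `count` values from any iterable
function* take(iterable, count) {
  if (count <= 0) return;
  let taken = 0;
  for (const value of iterable) {
    yield value;
    if (++taken >= count) return; // exiting for...of also closes the source
  }
}

const firstFive = [...take(naturals(), 5)];
console.log(firstFive); // [1, 2, 3, 4, 5]

// A generator object is both an iterator and an iterable
const gen = naturals();
console.log(gen[Symbol.iterator]() === gen); // true
```

Without take, spreading naturals() would never finish, so some limit on the consumer side is essential for infinite sequences.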
          

Key Generator Features

Generators have several powerful features beyond basic iteration:

Two-Way Communication with Generators


// Generator with two-way communication
function* communicationGenerator() {
  console.log('Generator started');
  
  // yield can receive a value from next()
  const a = yield 'First yield';
  console.log('Received:', a);
  
  const b = yield 'Second yield';
  console.log('Received:', b);
  
  return 'Final return value';
}

const generator = communicationGenerator();

// First next() starts the generator
console.log(generator.next()); // { value: 'First yield', done: false }

// Second next() sends a value to the waiting yield
console.log(generator.next('Value for a')); // { value: 'Second yield', done: false }

// Third next() sends another value
console.log(generator.next('Value for b')); // { value: 'Final return value', done: true }
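
One practical use of sending values into a generator is keeping state between calls. The following is a minimal sketch, assuming a hypothetical runningAverage generator that receives numbers through next() and yields the average so far:

```javascript
function* runningAverage() {
  let sum = 0;
  let count = 0;
  let average; // undefined until the first number arrives

  while (true) {
    // The value passed to next() becomes the result of this yield
    const input = yield average;
    sum += input;
    count++;
    average = sum / count;
  }
}

const avg = runningAverage();
avg.next(); // prime the generator: run up to the first yield
console.log(avg.next(10).value); // 10
console.log(avg.next(20).value); // 15
console.log(avg.next(60).value); // 30
```

The first next() call cannot deliver a value because no yield is waiting yet, which is why the generator is primed before any numbers are sent.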
          

Early Termination with return() and throw()


function* sampleGenerator() {
  try {
    yield 1;
    yield 2;
    yield 3;
  } catch (error) {
    console.log('Caught error:', error);
    yield 'error recovery';
  } finally {
    console.log('Generator cleanup');
  }
}

// Normal iteration
const g1 = sampleGenerator();
console.log(g1.next()); // { value: 1, done: false }
console.log(g1.next()); // { value: 2, done: false }

// Early termination with return
console.log(g1.return('early end')); // { value: 'early end', done: true }
// 'Generator cleanup' is logged

// Using throw
const g2 = sampleGenerator();
console.log(g2.next()); // { value: 1, done: false }
console.log(g2.throw(new Error('Something went wrong')));
// Logs: 'Caught error: Error: Something went wrong'
// Returns: { value: 'error recovery', done: false }
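
In practice return() is rarely called by hand. A for...of loop calls it automatically when the loop exits early through break, return, or a thrown error, so the generator's finally block still runs. The sketch below uses a hypothetical withCleanup generator and a log array to record the order of events:

```javascript
const log = [];

function* withCleanup() {
  try {
    yield 1;
    yield 2;
    yield 3;
  } finally {
    // Runs on normal completion and on early exit alike
    log.push('cleanup');
  }
}

for (const value of withCleanup()) {
  log.push(value);
  if (value === 2) break; // for...of calls the generator's return() here
}

console.log(log); // [1, 2, 'cleanup']
```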
          

Generator Composition with yield*


// Compose generators using yield*
function* partOne() {
  yield 1;
  yield 2;
}

function* partTwo() {
  yield 3;
  yield 4;
}

function* combined() {
  yield* partOne();
  yield* partTwo();
  yield 5;
}

// Usage
for (const value of combined()) {
  console.log(value); // 1, 2, 3, 4, 5
}
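
yield* is especially handy for recursive structures, where a generator delegates to itself. A small sketch, assuming a hypothetical tree shape of { value, children } and a hypothetical walk generator:

```javascript
function* walk(node) {
  yield node.value;
  for (const child of node.children ?? []) {
    yield* walk(child); // delegate to the same generator for each subtree
  }
}

const tree = {
  value: 'root',
  children: [
    { value: 'a', children: [{ value: 'a1' }, { value: 'a2' }] },
    { value: 'b' }
  ]
};

console.log([...walk(tree)]); // ['root', 'a', 'a1', 'a2', 'b']
```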
          

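Real-world applications of generators include lazily evaluated sequences such as the ranges above, custom iteration over data structures, and building larger sequences from smaller ones with yield*.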

Asynchronous Iterators and For-Await-Of

Asynchronous iterators extend the iterator protocol to work with asynchronous values, where next() returns a Promise that resolves to the usual { value, done } object.

Asynchronous Iterator Implementation


// Custom asynchronous iterable for delayed values
function createAsyncIterable(values, delayMs = 1000) {
  return {
    // Symbol.asyncIterator defines an async iterable
    [Symbol.asyncIterator]() {
      let index = 0;
      
      return {
        async next() {
          if (index < values.length) {
            // Simulate async delay
            await new Promise(resolve => setTimeout(resolve, delayMs));
            
            return { value: values[index++], done: false };
          } else {
            return { done: true };
          }
        }
      };
    }
  };
}

// Usage with for-await-of loop
async function consumeAsyncIterable() {
  const asyncIterable = createAsyncIterable([1, 2, 3, 4, 5]);
  
  // The for-await-of loop works with async iterables
  for await (const value of asyncIterable) {
    console.log(value); // 1, 2, 3, 4, 5 (each after ~1 second delay)
  }
  
  console.log('Done consuming async values');
}

// Call the async function
consumeAsyncIterable();
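
It can help to see roughly what for await...of does behind the scenes. The following is a simplified sketch, not the exact specification algorithm (it omits the early-exit handling, for example); countdown and collect are hypothetical names introduced for illustration:

```javascript
// A small async iterable that counts down from `from` to 1
function countdown(from) {
  return {
    [Symbol.asyncIterator]() {
      let current = from;
      return {
        async next() {
          if (current > 0) {
            return { value: current--, done: false };
          }
          return { value: undefined, done: true };
        }
      };
    }
  };
}

// Roughly what `for await (const value of iterable)` expands to
async function collect(iterable) {
  const iterator = iterable[Symbol.asyncIterator]();
  const values = [];

  while (true) {
    const { value, done } = await iterator.next();
    if (done) break;
    values.push(value);
  }
  return values;
}

collect(countdown(3)).then(values => console.log(values)); // [3, 2, 1]
```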
          

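Key points about asynchronous iterators: an async iterable exposes a Symbol.asyncIterator method, its iterator's next() returns a Promise that resolves to { value, done }, and for await...of can only be used where await is allowed (inside an async function or at the top level of a module).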

Practical Example: Paginated API Requests

A common use case for asynchronous iterators is handling paginated API responses:

Paginated API Client with Async Iteration


// Paginated API client that implements Symbol.asyncIterator
class PaginatedAPI {
  constructor(endpoint, pageSize = 10) {
    this.endpoint = endpoint;
    this.pageSize = pageSize;
  }
  
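  // Make the API client iterable asynchronously
  [Symbol.asyncIterator]() {
    // Capture the settings here: inside next(), `this` refers to the
    // iterator object returned below, not to the PaginatedAPI instance
    const { endpoint, pageSize } = this;
    let currentPage = 1;
    let hasMorePages = true;
    
    return {
      async next() {
        if (!hasMorePages) {
          return { done: true };
        }
        
        try {
          // Fetch the current page
          const response = await fetch(
            `${endpoint}?page=${currentPage}&pageSize=${pageSize}`
          );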
          
          if (!response.ok) {
            throw new Error(`API error: ${response.status}`);
          }
          
          const data = await response.json();
          
          // Check if there are more pages
          hasMorePages = data.hasMore || data.next != null;
          
          // Increment the page for next fetch
          currentPage++;
          
          // Return the items from this page
          return { value: data.items, done: false };
        } catch (error) {
          // Handle error (could also propagate it)
          console.error('Error fetching page:', error);
          hasMorePages = false;
          return { done: true };
        }
      }
    };
  }
}

// Usage example
async function fetchAllUsers() {
  const usersAPI = new PaginatedAPI('/api/users', 20);
  const allUsers = [];
  
  // Process each page of users as it arrives
  for await (const usersPage of usersAPI) {
    console.log(`Received page with ${usersPage.length} users`);
    
    // Process this page
    for (const user of usersPage) {
      allUsers.push(user);
      
      // Could do more processing here
      console.log(`Processing user: ${user.name}`);
    }
  }
  
  console.log(`Fetched ${allUsers.length} users in total`);
  return allUsers;
}
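
A hand-written async iterator can also take part in early termination. If the iterator object has a return() method, for await...of calls it when the loop exits early, which makes it a natural place to release resources. A minimal sketch, assuming a hypothetical in-memory createPagedSource in place of a real API:

```javascript
function createPagedSource(pages) {
  const state = { fetched: 0, closed: false };

  const iterable = {
    [Symbol.asyncIterator]() {
      let index = 0;
      return {
        async next() {
          if (state.closed || index >= pages.length) {
            return { value: undefined, done: true };
          }
          state.fetched++;
          return { value: pages[index++], done: false };
        },
        // Called by for await...of on break, return, or a thrown error
        async return() {
          state.closed = true;
          return { value: undefined, done: true };
        }
      };
    }
  };

  return { iterable, state };
}

async function firstPageOnly() {
  const { iterable, state } = createPagedSource([['a', 'b'], ['c'], ['d']]);

  for await (const page of iterable) {
    console.log(page); // ['a', 'b']
    break; // stop after the first page
  }

  console.log(state); // { fetched: 1, closed: true }
  return state;
}

const firstPageDemo = firstPageOnly();
```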
          

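Benefits of this approach: a page is fetched only when the consumer asks for the next one, the consumer never has to track page numbers, and each page can be processed before later pages have been requested.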

Async Generators: The Best of Both Worlds

Async generators combine the concise syntax of generators with the asynchronous behavior of async iterators, providing an elegant way to work with streams of asynchronous data.

Basic Async Generator


// Async generator function with delayed values
async function* delayedNumbers(start, end, delayMs = 1000) {
  for (let i = start; i <= end; i++) {
    // Pause execution and wait for the specified delay
    await new Promise(resolve => setTimeout(resolve, delayMs));
    
    // Yield the value (just like in a regular generator)
    yield i;
  }
}

// Usage with for-await-of
async function consumeAsyncGenerator() {
  // Iterate over async generator values as they become available
  for await (const number of delayedNumbers(1, 5, 1000)) {
    console.log(number); // 1, 2, 3, 4, 5 (each after ~1 second delay)
  }
}

// Call the function
consumeAsyncGenerator();
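
Two details of async generators are easy to miss: a yielded promise is awaited before the consumer sees it, and yield* can delegate to another async iterable. The sketch below illustrates both with hypothetical generators named letters and lettersThenDone:

```javascript
async function* letters() {
  // The promise is awaited; the consumer receives 'a', not a Promise
  yield Promise.resolve('a');
  yield 'b';
}

async function* lettersThenDone() {
  yield* letters(); // delegate to another async generator
  yield 'done';
}

async function collectLetters() {
  const seen = [];
  for await (const item of lettersThenDone()) {
    seen.push(item);
  }
  console.log(seen); // ['a', 'b', 'done']
  return seen;
}

const lettersDemo = collectLetters();
```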
          

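Key features of async generators: they are declared with async function*, they can use both await and yield in the same body, and the object they return works directly with for await...of.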

Implementing the Paginated API with Async Generators

Let's reimplement the paginated API example using an async generator for even cleaner code:

Paginated API with Async Generator


// Fetch paginated API results with an async generator
async function* fetchPaginatedData(endpoint, pageSize = 10) {
  let currentPage = 1;
  let hasMorePages = true;
  
  while (hasMorePages) {
    // Fetch the current page
    const response = await fetch(
      `${endpoint}?page=${currentPage}&pageSize=${pageSize}`
    );
    
    if (!response.ok) {
      throw new Error(`API error: ${response.status}`);
    }
    
    const data = await response.json();
    
    // Check if there are more pages
    hasMorePages = data.hasMore || data.next != null;
    
    // Increment the page for next fetch
    currentPage++;
    
    // Yield the items from this page
    yield data.items;
  }
}

// Usage example
async function fetchAllUsers() {
  const allUsers = [];
  
  // Process each page of users as it arrives
  try {
    for await (const usersPage of fetchPaginatedData('/api/users', 20)) {
      console.log(`Received page with ${usersPage.length} users`);
      
      // Process this page
      for (const user of usersPage) {
        allUsers.push(user);
        console.log(`Processing user: ${user.name}`);
      }
    }
    
    console.log(`Fetched ${allUsers.length} users in total`);
    return allUsers;
  } catch (error) {
    console.error('Error fetching users:', error);
    throw error;
  }
}
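
Because the generator only fetches a page when the consumer asks for one, stopping early avoids unnecessary requests. The sketch below demonstrates this with a hypothetical fakeFetchPage function standing in for the network call; paginate and items are illustrative names, not part of the example above.

```javascript
const requestedPages = [];

// Stand-in for a network request: three pages of two items each
async function fakeFetchPage(page) {
  requestedPages.push(page);
  const start = (page - 1) * 2;
  return { items: [start + 1, start + 2], hasMore: page < 3 };
}

async function* paginate(fetchPage) {
  let page = 1;
  let hasMore = true;
  while (hasMore) {
    const data = await fetchPage(page++);
    hasMore = data.hasMore;
    yield data.items;
  }
}

// Flatten pages into individual items
async function* items(pages) {
  for await (const page of pages) {
    yield* page; // yield* also accepts a synchronous iterable
  }
}

async function firstThree() {
  const result = [];
  for await (const item of items(paginate(fakeFetchPage))) {
    result.push(item);
    if (result.length === 3) break;
  }
  console.log(result);         // [1, 2, 3]
  console.log(requestedPages); // [1, 2]; page 3 was never requested
  return result;
}

const firstThreeDemo = firstThree();
```

Flattening pages into items in a separate generator keeps the consumer's loop free of nested iteration while preserving the on-demand fetching.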
          

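Benefits of using async generators over manual async iterators: pagination state lives in ordinary local variables, errors propagate naturally to the consumer's try/catch, and there is no hand-written next() method or { value, done } bookkeeping.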

Advanced Async Generator Patterns

Let's explore some more sophisticated patterns and techniques with async generators.

Pattern 1: Flattening Nested Async Data

When working with nested async data structures, async generators can help flatten them:

Flattening Nested Async Data


// Flatten a nested structure of async data sources
async function* flattenAsyncData(nestedData) {
  // If nestedData is an async iterable, iterate over its values
  if (nestedData && typeof nestedData[Symbol.asyncIterator] === 'function') {
    for await (const item of nestedData) {
      // Recursively yield* from each item
      yield* flattenAsyncData(item);
    }
  }
  // If nestedData is a regular iterable, iterate over its values
  else if (nestedData && typeof nestedData[Symbol.iterator] === 'function' && 
          !Array.isArray(nestedData) && typeof nestedData !== 'string') {
    for (const item of nestedData) {
      yield* flattenAsyncData(item);
    }
  }
  // If nestedData is an array, recursively process each element
  else if (Array.isArray(nestedData)) {
    for (const item of nestedData) {
      yield* flattenAsyncData(item);
    }
  }
  // Otherwise, nestedData is a leaf value - yield it directly
  else {
    yield nestedData;
  }
}

// Example usage: working with a complex nested structure
async function processNestedAsyncData() {
  // Create some nested async generators
  const level1 = async function* () {
    yield 1;
    yield async function* () {
      yield 2;
      yield [3, 4];
      yield async function* () {
        await new Promise(resolve => setTimeout(resolve, 100));
        yield 5;
      }();
    }();
    yield 6;
  }();
  
  // Flatten and process all values
  for await (const value of flattenAsyncData(level1)) {
    console.log('Flattened value:', value); // 1, 2, 3, 4, 5, 6
  }
}
          

Pattern 2: Backpressure and Flow Control

Async generators enable natural backpressure, where consuming data at a slow pace automatically throttles the producer.

Backpressure with Async Generators


// Producer of data that respects consumer's pace
async function* fastDataProducer() {
  let counter = 0;
  
  while (true) {
    // Simulate producing data faster than it can be consumed
    console.log(`Producing item ${counter}`);
    
    // Yield the item - this will pause until the consumer calls next()
    yield { id: counter++, timestamp: Date.now() };
    
    // Data is produced immediately after previous item is consumed
    // No artificial delay here - consumer controls the pace
  }
}

// Consumer that processes data slowly
async function slowConsumer() {
  const producer = fastDataProducer();
  
  // Process only 5 items
  for (let i = 0; i < 5; i++) {
    // Get the next item (this resumes the generator)
    const { value } = await producer.next();
    
    console.log(`Consuming item ${value.id} produced at ${value.timestamp}`);
    
    // Simulate slow processing
    await new Promise(resolve => setTimeout(resolve, 1000));
    
    console.log(`Finished processing item ${value.id}`);
  }
  
  console.log('Consumer finished');
}

// With this pattern, the producer is naturally throttled to match the
// consumer's pace, avoiding memory issues from buffering too much data.
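
One way to check this behavior is to record how many items the producer has created each time the consumer finishes handling one. In the sketch below, which uses the hypothetical names produce and consumeSlowly, the producer never runs ahead of the consumer:

```javascript
let produced = 0;

async function* produce() {
  while (true) {
    produced++;
    yield produced; // execution pauses here until next() is called again
  }
}

async function consumeSlowly(limit, delayMs = 10) {
  const observations = [];

  for await (const item of produce()) {
    // Slow consumer: the producer stays paused during this delay
    await new Promise(resolve => setTimeout(resolve, delayMs));
    observations.push({ item, produced });
    if (observations.length === limit) break;
  }
  return observations;
}

const run = consumeSlowly(3).then(observations => {
  console.log(observations);
  // [{ item: 1, produced: 1 }, { item: 2, produced: 2 }, { item: 3, produced: 3 }]
  return observations;
});
```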
          

Pattern 3: Bidirectional Communication with Async Generators

Just like regular generators, async generators support bidirectional communication:

Two-Way Communication with Async Generators


// Async generator with bidirectional communication
async function* communicativeGenerator() {
  console.log('Generator started');
  
  // First yield
  const message1 = yield 'Hello, what is your name?';
  console.log(`Received name: ${message1}`);
  
  // Simulate async operation
  await new Promise(resolve => setTimeout(resolve, 1000));
  
  // Second yield - using the received value
  const message2 = yield `Nice to meet you, ${message1}! How old are you?`;
  console.log(`Received age: ${message2}`);
  
  // Simulate async operation
  await new Promise(resolve => setTimeout(resolve, 1000));
  
  // Generate personalized response
  return `Thank you, ${message1}! ${message2} is a great age.`;
}

// Using the bidirectional generator
async function conversationDemo() {
  const conversation = communicativeGenerator();
  
  // Start the conversation
  const greeting = await conversation.next();
  console.log('Generator says:', greeting.value);
  
  // Send name and get next question
  const question = await conversation.next('Alice');
  console.log('Generator says:', question.value);
  
  // Send age and get final response
  const response = await conversation.next(30);
  console.log('Generator says:', response.value);
}
          

Pattern 4: Error Handling and Cancellation

Async generators provide elegant ways to handle errors and cancellation:

Error Handling and Cancellation with Async Generators


// Async generator with error handling and cancellation.
// The caller may pass an AbortSignal to cancel the stream.
async function* robustDataStream(signal) {
  let itemCount = 0;
  
  try {
    while (true) {
      // Check if cancelled (the signal is optional)
      if (signal?.aborted) {
        console.log('Stream was cancelled');
        break;
      }
      
      try {
        // Simulate potential error in data fetching
        if (Math.random() < 0.2) {
          throw new Error(`Random error at item ${itemCount}`);
        }
        
        // Simulate async data fetch
        const data = await fetchNextItem(itemCount);
        
        // Yield the result
        itemCount++;
        yield data;
      } catch (error) {
        // We can handle errors here and continue
        console.error('Error fetching item:', error);
        
        // Yield an error notification
        yield { error: error.message, recoverable: true };
        
        // Wait a bit before retrying
        await new Promise(resolve => setTimeout(resolve, 1000));
      }
    }
  } finally {
    // Cleanup resources when the generator exits
    console.log('Cleaning up stream resources');
    await releaseResources();
  }
  
  // Helper function to simulate fetching data
  async function fetchNextItem(id) {
    await new Promise(resolve => setTimeout(resolve, 500));
    return { id, data: `Data for item ${id}`, timestamp: Date.now() };
  }
  
  // Helper function to simulate resource cleanup
  async function releaseResources() {
    await new Promise(resolve => setTimeout(resolve, 200));
    console.log('Resources released');
  }
}

// Consumer with cancellation
async function consumeWithCancellation() {
  const controller = new AbortController();
  const signal = controller.signal;
  
  // Set a timeout to cancel after 5 seconds
  setTimeout(() => {
    console.log('Initiating cancellation');
    controller.abort();
  }, 5000);
  
  try {
    const stream = robustDataStream(signal);
    
    for await (const item of stream) {
      // Process normal items
      if (!item.error) {
        console.log('Received item:', item);
      } else {
        console.warn('Received error notification:', item.error);
      }
      
      // Check for external cancellation
      if (signal.aborted) {
        console.log('Consumer detected cancellation');
        
        // Leaving the loop with break makes for await...of call
        // stream.return(), which runs the generator's finally block
        break;
      }
      
      // Simulate processing time
      await new Promise(resolve => setTimeout(resolve, 300));
    }
  } catch (error) {
    console.error('Fatal stream error:', error);
  }
  
  console.log('Stream consumption completed');
}
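
A common alternative is to pass an AbortSignal into the generator, so the producer itself can stop waiting as soon as cancellation is requested. A minimal sketch, assuming hypothetical helpers named abortableDelay and ticks:

```javascript
// Resolve after `ms`, or reject early if the signal is aborted
function abortableDelay(ms, signal) {
  return new Promise((resolve, reject) => {
    if (signal.aborted) {
      reject(new Error('Aborted'));
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);
      reject(new Error('Aborted'));
    };
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

async function* ticks(intervalMs, signal) {
  let count = 0;
  try {
    while (!signal.aborted) {
      await abortableDelay(intervalMs, signal);
      yield ++count;
    }
  } catch (error) {
    // Treat an abort as a normal end of the stream; rethrow anything else
    if (!signal.aborted) throw error;
  } finally {
    console.log('ticks cleaned up');
  }
}

async function cancelDemo() {
  const controller = new AbortController();
  const received = [];

  setTimeout(() => controller.abort(), 55);

  for await (const tick of ticks(20, controller.signal)) {
    received.push(tick);
  }
  console.log(received); // likely [1, 2]; the exact count depends on timer accuracy
  return received;
}

const cancelFinished = cancelDemo();
```

Here the loop ends on its own once the signal fires, so the consumer needs no cancellation check inside the loop body.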
          

Real-World Applications of Async Generators

Async generators shine in many real-world scenarios:

Application 1: Data Streaming and Processing

Async generators are perfect for processing large data streams like file uploads or downloads:

Processing a Large File Stream


// Stream and process large file data with chunks
async function* streamFile(url, chunkSize = 1024 * 1024) {
  // Fetch file with streaming enabled
  const response = await fetch(url);
  
  if (!response.ok) {
    throw new Error(`Failed to fetch file: ${response.status}`);
  }
  
  // Get a reader from the response body stream
  const reader = response.body.getReader();
  
  // Track total bytes processed
  let bytesProcessed = 0;
  let isDone = false;
  let chunk = new Uint8Array(0);
  
  // Process the stream in chunks
  while (!isDone) {
    // If we have a partial chunk from before
    if (chunk.length > 0) {
      // Try to read enough to complete our desired chunk size
      const { done, value } = await reader.read();
      isDone = done;
      
      // If we got more data, combine with existing chunk
      if (value) {
        const newChunk = new Uint8Array(chunk.length + value.length);
        newChunk.set(chunk);
        newChunk.set(value, chunk.length);
        chunk = newChunk;
      }
    } else {
      // Start fresh by reading new data
      const { done, value } = await reader.read();
      isDone = done;
      
      if (value) {
        chunk = value;
      }
    }
    
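    // Process complete chunks (if we have enough data). This also runs
    // after the final read, so leftover data is still split into
    // chunkSize pieces rather than emitted as one oversized chunk.
    while (chunk.length >= chunkSize) {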
      // Extract a complete chunk
      const completeChunk = chunk.slice(0, chunkSize);
      
      // Update the remaining chunk
      chunk = chunk.slice(chunkSize);
      
      // Update bytes processed
      bytesProcessed += completeChunk.length;
      
      // Yield the chunk and progress
      yield {
        chunk: completeChunk,
        bytesProcessed,
        progress: response.headers.get('Content-Length') 
          ? bytesProcessed / parseInt(response.headers.get('Content-Length')) 
          : null
      };
    }
    
    // If we're done and have a final partial chunk, yield it
    if (isDone && chunk.length > 0) {
      bytesProcessed += chunk.length;
      
      yield {
        chunk,
        bytesProcessed,
        progress: response.headers.get('Content-Length') 
          ? bytesProcessed / parseInt(response.headers.get('Content-Length')) 
          : null
      };
      
      chunk = new Uint8Array(0);
    }
  }
}

// Usage example: processing a large file
async function processLargeFile(url) {
  try {
    // Display a progress indicator
    const progressBar = document.getElementById('progress-bar');
    const progressLabel = document.getElementById('progress-label');
    
    // Initialize processing
    let totalChunks = 0;
    const startTime = performance.now();
    
    // Process the file in chunks
    for await (const { chunk, bytesProcessed, progress } of streamFile(url)) {
      // Do something with each chunk (e.g., parse CSV data)
      const chunkData = processChunk(chunk);
      
      // Update progress
      totalChunks++;
      if (progress !== null) {
        progressBar.value = progress * 100;
        progressLabel.textContent = `${Math.round(progress * 100)}% (${formatBytes(bytesProcessed)})`;
      } else {
        progressLabel.textContent = `Chunks: ${totalChunks}, Bytes: ${formatBytes(bytesProcessed)}`;
      }
      
      // Allow UI to update by yielding to the event loop
      await new Promise(resolve => setTimeout(resolve, 0));
    }
    
    const elapsedTime = performance.now() - startTime;
    console.log(`File processing complete in ${elapsedTime}ms`);
    progressLabel.textContent = 'Complete!';
  } catch (error) {
    console.error('Error processing file:', error);
    document.getElementById('error-message').textContent = error.message;
  }
}

// Helper function to format bytes
function formatBytes(bytes) {
  if (bytes === 0) return '0 Bytes';
  
  const k = 1024;
  const sizes = ['Bytes', 'KB', 'MB', 'GB'];
  const i = Math.floor(Math.log(bytes) / Math.log(k));
  
  return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + ' ' + sizes[i];
}

// Helper function to process a chunk of data
function processChunk(chunk) {
  // Example: decode text and process
  const text = new TextDecoder().decode(chunk);
  const lines = text.split('\n').filter(line => line.trim());
  
  // Process each line
  return lines.map(line => {
    // Example processing (e.g., parsing CSV)
    return line.split(',');
  });
}
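
Note that chunk boundaries rarely line up with line boundaries, and a multi-byte character can even be split between two chunks, so decoding each chunk on its own may produce broken rows. One way to handle this is sketched below, assuming a hypothetical splitLines async generator that accepts any async iterable of Uint8Array chunks. It relies on TextDecoder's stream option to carry incomplete characters over to the next call.

```javascript
async function* splitLines(chunks) {
  const decoder = new TextDecoder('utf-8');
  let buffered = '';

  for await (const chunk of chunks) {
    // stream: true keeps an incomplete trailing character for the next call
    buffered += decoder.decode(chunk, { stream: true });

    const lines = buffered.split('\n');
    buffered = lines.pop(); // the last piece may be an unfinished line
    yield* lines;
  }

  buffered += decoder.decode(); // flush any remaining bytes
  if (buffered.length > 0) {
    yield buffered;
  }
}

// Demo input: three CSV lines cut at arbitrary byte offsets
async function* demoChunks() {
  const bytes = new TextEncoder().encode('id,name\n1,Zo\u00eb\n2,Bob');
  yield bytes.slice(0, 5);
  yield bytes.slice(5, 13); // ends in the middle of the two-byte 'ë'
  yield bytes.slice(13);
}

async function readAll() {
  const rows = [];
  for await (const line of splitLines(demoChunks())) {
    rows.push(line.split(','));
  }
  console.log(rows); // [['id', 'name'], ['1', 'Zoë'], ['2', 'Bob']]
  return rows;
}

const rowsPromise = readAll();
```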
          

Application 2: Infinite Scrolling and Lazy Loading

Async generators can power efficient infinite scrolling interfaces:

Infinite Scrolling Image Gallery


// Infinite scrolling image gallery using async generators
class InfiniteGallery {
  constructor(apiEndpoint, containerElement) {
    this.apiEndpoint = apiEndpoint;
    this.container = containerElement;
    this.page = 1;
    this.loading = false;
    this.hasMore = true;
    this.observer = null;
    
    // Create a sentinel element for intersection observer
    this.sentinel = document.createElement('div');
    this.sentinel.className = 'sentinel';
    this.container.appendChild(this.sentinel);
    
    // Initialize the gallery
    this.init();
  }
  
  // Initialize the gallery
  init() {
    // Create an async generator for images
    this.imageStream = this.getImages();
    
    // Set up intersection observer for infinite scrolling
    this.observer = new IntersectionObserver(entries => {
      if (entries[0].isIntersecting && !this.loading && this.hasMore) {
        this.loadMoreImages();
      }
    });
    
    this.observer.observe(this.sentinel);
    
    // Load initial images
    this.loadMoreImages();
  }
  
  // Async generator for fetching images
  async* getImages() {
    let morePages = true;
    
    while (morePages) {
      try {
        const response = await fetch(`${this.apiEndpoint}?page=${this.page}&limit=10`);
        
        if (!response.ok) {
          throw new Error(`API error: ${response.status}`);
        }
        
        const data = await response.json();
        
        // Track remaining pages locally. this.hasMore is cleared by
        // loadMoreImages() only when the generator finishes, so the
        // images on the last page are still rendered.
        morePages = Boolean(data.hasMore);
        
        // Increment page for next fetch
        this.page++;
        
        // Yield each image individually
        for (const image of data.images) {
          yield image;
        }
      } catch (error) {
        console.error('Error fetching images:', error);
        morePages = false;
        yield { error: error.message };
      }
    }
  }
  
  // Load more images from the generator
  async loadMoreImages() {
    if (this.loading || !this.hasMore) return;
    
    // Set the flag here rather than in the generator: the generator stays
    // suspended between batches, so it cannot reliably reset the flag
    this.loading = true;
    this.showLoading(true);
    
    try {
      // Load the next batch of images
      for (let i = 0; i < 10; i++) {
        const { value, done } = await this.imageStream.next();
        
        if (done) {
          this.hasMore = false;
          break;
        }
        
        if (value.error) {
          this.showError(value.error);
        } else {
          this.renderImage(value);
        }
        
        // Small delay between rendering images for smoother experience
        await new Promise(resolve => setTimeout(resolve, 50));
      }
    } finally {
      // Always clear the flag and hide the loading indicator
      this.loading = false;
      this.showLoading(false);
    }
  }
  
  // Render an image in the container
  renderImage(image) {
    const imageElement = document.createElement('div');
    imageElement.className = 'gallery-item';
    
    imageElement.innerHTML = `
      <img src="${image.thumbnailUrl}" alt="${image.title}" loading="lazy" />
      <div class="caption">${image.title}</div>
    `;
    
    // Insert before the sentinel
    this.container.insertBefore(imageElement, this.sentinel);
    
    // Add click handler for full-size view
    imageElement.addEventListener('click', () => {
      this.showFullSize(image);
    });
  }
  
  // Show loading indicator
  showLoading(isLoading) {
    if (isLoading) {
      const loader = document.createElement('div');
      loader.className = 'loader';
      loader.textContent = 'Loading more images...';
      this.container.insertBefore(loader, this.sentinel);
    } else {
      const loader = this.container.querySelector('.loader');
      if (loader) {
        loader.remove();
      }
    }
  }
  
  // Show error message
  showError(message) {
    const errorElement = document.createElement('div');
    errorElement.className = 'error-message';
    errorElement.textContent = `Error: ${message}`;
    this.container.insertBefore(errorElement, this.sentinel);
  }
  
  // Show full-size image
  showFullSize(image) {
    // Create modal for full-size image
    const modal = document.createElement('div');
    modal.className = 'image-modal';
    
    modal.innerHTML = `
      <div class="modal-content">
        <span class="close">×</span>
        <img src="${image.fullUrl}" alt="${image.title}" />
        <div class="modal-caption">
          <h3>${image.title}</h3>
          <p>${image.description}</p>
        </div>
      </div>
    `;
    
    document.body.appendChild(modal);
    
    // Add close handler
    modal.querySelector('.close').addEventListener('click', () => {
      modal.remove();
    });
    
    // Close on background click
    modal.addEventListener('click', (event) => {
      if (event.target === modal) {
        modal.remove();
      }
    });
  }
  
  // Clean up resources
  destroy() {
    if (this.observer) {
      this.observer.disconnect();
    }
    
    // Stop the generator by calling return()
    if (this.imageStream && this.imageStream.return) {
      this.imageStream.return();
    }
  }
}

// Usage
document.addEventListener('DOMContentLoaded', () => {
  const galleryContainer = document.getElementById('image-gallery');
  const gallery = new InfiniteGallery('/api/images', galleryContainer);
  
  // Clean up when leaving the page
  window.addEventListener('beforeunload', () => {
    gallery.destroy();
  });
});
          

Application 3: Real-Time Data Processing

Async generators can effectively process real-time data from WebSockets or Server-Sent Events:

WebSocket Stream Processing


// Create an async generator from WebSocket events
async function* createWebSocketStream(url) {
  // Create a new WebSocket connection
  const socket = new WebSocket(url);
  
  // Set up a promise-based message queue
  const messageQueue = [];
  let resolveNextMessage;
  
  // Track if the socket is closed
  let socketClosed = false;
  let error = null;
  
  // Set up event handlers
  socket.addEventListener('message', (event) => {
    // Parse the message data
    try {
      const data = JSON.parse(event.data);
      
      // If we have a waiting promise, resolve it
      if (resolveNextMessage) {
        const resolve = resolveNextMessage;
        resolveNextMessage = null;
        resolve({ value: data, done: false });
      } else {
        // Otherwise, queue the message
        messageQueue.push(data);
      }
    } catch (e) {
      console.error('Error parsing WebSocket message:', e);
    }
  });
  
  socket.addEventListener('close', (event) => {
    console.log(`WebSocket closed: code=${event.code}, reason=${event.reason}`);
    socketClosed = true;
    
    // If we have a waiting promise, resolve it as done
    if (resolveNextMessage) {
      const resolve = resolveNextMessage;
      resolveNextMessage = null;
      resolve({ done: true });
    }
  });
  
  socket.addEventListener('error', (event) => {
    console.error('WebSocket error:', event);
    error = new Error('WebSocket error');
    
    // If we have a waiting promise, resolve it with the error so the
    // generator loop can rethrow it
    if (resolveNextMessage) {
      const resolve = resolveNextMessage;
      resolveNextMessage = null;
      resolve({ error, done: true });
    }
  });
  
  // Wait for the socket to connect
  await new Promise((resolve, reject) => {
    socket.addEventListener('open', resolve);
    socket.addEventListener('error', reject);
  });
  
  try {
    // Yield messages until the socket closes
    while (!socketClosed) {
      // If we have queued messages, yield the next one
      if (messageQueue.length > 0) {
        yield messageQueue.shift();
      } else {
        // Otherwise, wait for the next message
        const next = await new Promise(resolve => {
          resolveNextMessage = resolve;
        });
        
        // Check for an error first: the error handler also sets done,
        // so testing done first would silently swallow the error
        if (next.error) {
          throw next.error;
        }
        
        if (next.done) {
          break;
        }
        
        // Yield the message
        yield next.value;
      }
    }
  } finally {
    // Clean up
    if (socket.readyState === WebSocket.OPEN || 
        socket.readyState === WebSocket.CONNECTING) {
      socket.close();
    }
  }
}

// Example: Real-time stock price tracker using WebSocket
async function trackStockPrices(symbols) {
  try {
    // Connect to a WebSocket API that streams stock prices
    const socketUrl = `wss://api.example.com/stocks?symbols=${symbols.map(s => encodeURIComponent(s)).join(',')}`;
    const priceStream = createWebSocketStream(socketUrl);
    
    // Process price updates as they arrive
    for await (const update of priceStream) {
      console.log(`${update.symbol}: $${update.price} (${update.change > 0 ? '+' : ''}${update.change}%)`);
      
      // Update UI
      updateStockDisplay(update);
      
      // Alert on significant price movements
      if (Math.abs(update.change) > 5) {
        sendPriceAlert(update);
      }
      
      // Store in historical data
      recordPriceHistory(update);
    }
  } catch (error) {
    console.error('Error tracking stock prices:', error);
    displayErrorMessage('Connection to price feed lost. Retrying in 5 seconds...');
    
    // Retry after a delay
    setTimeout(() => trackStockPrices(symbols), 5000);
  }
}

// Update the stock display in the UI
function updateStockDisplay(update) {
  const element = document.querySelector(`.stock-item[data-symbol="${update.symbol}"]`);
  
  if (element) {
    element.querySelector('.price').textContent = `$${update.price.toFixed(2)}`;
    
    const changeElement = element.querySelector('.change');
    changeElement.textContent = `${update.change > 0 ? '+' : ''}${update.change.toFixed(2)}%`;
    changeElement.className = `change ${update.change >= 0 ? 'positive' : 'negative'}`;
  }
}
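
The queue-plus-pending-promise technique used in createWebSocketStream is not specific to WebSockets. As a minimal sketch, the hypothetical helper createChannel below (an illustration, not part of any library) isolates that pattern so it can be fed from any callback-based source. It assumes a single consumer and an unbounded buffer:

```javascript
// Hypothetical helper: bridges push-style callbacks to a pull-style async iterator.
function createChannel() {
  const queue = [];     // values pushed while no consumer is waiting
  let pending = null;   // resolve function of a waiting consumer, if any
  let closed = false;

  return {
    // Producer side: hand the value to a waiting consumer, or buffer it
    push(value) {
      if (closed) return;
      if (pending) {
        const resolve = pending;
        pending = null;
        resolve({ value, done: false });
      } else {
        queue.push(value);
      }
    },

    // Producer side: signal that no more values will arrive
    close() {
      closed = true;
      if (pending) {
        const resolve = pending;
        pending = null;
        resolve({ value: undefined, done: true });
      }
    },

    // Consumer side: drain buffered values first, then wait for new ones
    async *[Symbol.asyncIterator]() {
      while (true) {
        if (queue.length > 0) {
          yield queue.shift();
        } else if (closed) {
          return;
        } else {
          const next = await new Promise(resolve => { pending = resolve; });
          if (next.done) return;
          yield next.value;
        }
      }
    }
  };
}
```

A producer would call channel.push(data) from its message handler and channel.close() from its close handler, while the consumer reads with for await (const message of channel). Values that were buffered before close() are still delivered.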
          

Application 4: Data Transformation Pipelines

Because each stage consumes one async iterable and returns another, async generators compose naturally into data processing pipelines:

Data Transformation Pipeline


// Create a pipeline of async generators for data processing
async function* fetchDataSource(url) {
  const response = await fetch(url);
  
  if (!response.ok) {
    throw new Error(`Failed to fetch data: ${response.status}`);
  }
  
  const data = await response.json();
  
  for (const item of data) {
    yield item;
  }
}

// Filter pipeline stage
async function* filterItems(source, predicate) {
  for await (const item of source) {
    if (predicate(item)) {
      yield item;
    }
  }
}

// Transform pipeline stage
async function* transformItems(source, transformFn) {
  for await (const item of source) {
    yield transformFn(item);
  }
}

// Batch pipeline stage
async function* batchItems(source, batchSize) {
  let batch = [];
  
  for await (const item of source) {
    batch.push(item);
    
    if (batch.length >= batchSize) {
      yield batch;
      batch = [];
    }
  }
  
  // Yield any remaining items
  if (batch.length > 0) {
    yield batch;
  }
}

// Throttle pipeline stage
async function* throttle(source, intervalMs) {
  let lastYieldTime = 0;
  
  for await (const item of source) {
    const now = Date.now();
    const timeToWait = Math.max(0, intervalMs - (now - lastYieldTime));
    
    if (timeToWait > 0) {
      await new Promise(resolve => setTimeout(resolve, timeToWait));
    }
    
    lastYieldTime = Date.now();
    yield item;
  }
}

// Example usage: Processing user data
async function processUserData() {
  try {
    // Create the source
    const users = fetchDataSource('/api/users');
    
    // Create a pipeline of transformations
    const activeUsersOnly = filterItems(users, user => user.status === 'active');
    
    const enrichedUsers = transformItems(activeUsersOnly, async user => {
      // Fetch additional data for each user
      const details = await fetch(`/api/users/${user.id}/details`).then(r => r.json());
      const activity = await fetch(`/api/users/${user.id}/activity`).then(r => r.json());
      
      return {
        ...user,
        details,
        activity
      };
    });
    
    // Process in batches of 5, and throttle to avoid overloading
    const userBatches = batchItems(throttle(enrichedUsers, 500), 5);
    
    // Process each batch
    for await (const batch of userBatches) {
      console.log(`Processing batch of ${batch.length} users`);
      
      // Process the batch (e.g., update database)
      await processBatch(batch);
      
      // Update progress in the UI
      updateProgressUI(batch);
    }
    
    console.log('User processing complete');
  } catch (error) {
    console.error('Error processing users:', error);
  }
}

// Helper function to process a batch of users
async function processBatch(users) {
  // Simulate processing
  await new Promise(resolve => setTimeout(resolve, 1000));
  console.log(`Processed ${users.length} users`);
}

// Helper function to update the UI
function updateProgressUI(batch) {
  const progressElement = document.getElementById('progress');
  const processed = parseInt(progressElement.getAttribute('data-processed') || '0');
  const newProcessed = processed + batch.length;
  
  progressElement.setAttribute('data-processed', newProcessed);
  progressElement.textContent = `Processed ${newProcessed} users`;
}
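
Nesting calls such as batchItems(throttle(enrichedUsers, 500), 5) reads inside-out once a pipeline has more than a few stages. One possible alternative, sketched here with a hypothetical pipeline helper and curried stage functions (these names are illustrative and do not come from a library), lists the stages in the order the data flows through them:

```javascript
// Hypothetical helper: threads a source through a list of stage functions.
// Each stage takes an async (or sync) iterable and returns an async iterable.
function pipeline(source, ...stages) {
  return stages.reduce((stream, stage) => stage(stream), source);
}

// Minimal curried stages so they can be passed directly to pipeline()
const filter = predicate => async function* (source) {
  for await (const item of source) {
    if (predicate(item)) yield item;
  }
};

const map = fn => async function* (source) {
  for await (const item of source) {
    // An async generator awaits the yielded value, so fn may be async
    yield fn(item);
  }
};

const batch = size => async function* (source) {
  let items = [];
  for await (const item of source) {
    items.push(item);
    if (items.length >= size) {
      yield items;
      items = [];
    }
  }
  // Yield the final, partially filled batch
  if (items.length > 0) yield items;
};

// Demo: collects the batches produced from an in-memory array
async function demoPipeline() {
  const result = [];
  const stream = pipeline(
    [1, 2, 3, 4, 5, 6, 7],
    filter(n => n % 2 === 1),  // keeps 1, 3, 5, 7
    map(async n => n * 10),    // 10, 30, 50, 70
    batch(3)                   // [10, 30, 50] then [70]
  );
  for await (const group of stream) result.push(group);
  return result;
}
```

Nothing runs until the consumer starts iterating, and each item is pulled through all stages on demand, so the laziness of the nested form is preserved.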
          

Performance Considerations

When using async generators, consider these performance aspects:

Benefits

Potential Drawbacks

Performance Optimizations


// Batch processing to reduce overhead
async function* efficientBatchProcessor(source, batchSize = 100) {
  const items = [];
  
  for await (const item of source) {
    items.push(item);
    
    // When we have a full batch, process it and yield its results
    if (items.length >= batchSize) {
      // Process the batch
      const processedItems = await processBatch(items);
      
      // Yield each processed item
      for (const processedItem of processedItems) {
        yield processedItem;
      }
      
      // Clear the array (maintaining the same reference)
      items.length = 0;
    }
  }
  
  // Process any remaining items
  if (items.length > 0) {
    const processedItems = await processBatch(items);
    
    for (const processedItem of processedItems) {
      yield processedItem;
    }
  }
}

// Helper function to process a batch
async function processBatch(items) {
  // Example: process items in parallel with limited concurrency
  const results = [];
  const concurrency = 5;
  
  // Process in chunks of 'concurrency' size
  for (let i = 0; i < items.length; i += concurrency) {
    const chunk = items.slice(i, i + concurrency);
    
    // Process chunk in parallel
    const chunkResults = await Promise.all(
      chunk.map(item => processItem(item))
    );
    
    // Add results
    results.push(...chunkResults);
  }
  
  return results;
}

// Single item processing
async function processItem(item) {
  // Simulate processing
  await new Promise(resolve => setTimeout(resolve, 10));
  return { ...item, processed: true };
}
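
The processBatch helper above waits for every item in a chunk before starting the next chunk, so one slow item leaves the other slots idle. A possible refinement, sketched below as a hypothetical mapConcurrent stage (not a library function), starts a new call as soon as the oldest one finishes while still yielding results in source order:

```javascript
// Hypothetical stage: keeps at most `concurrency` calls in flight and
// yields their results in the same order as the source items.
async function* mapConcurrent(source, fn, concurrency = 5) {
  const inFlight = []; // promises, oldest first

  for await (const item of source) {
    // The async wrapper turns a synchronous throw from fn into a rejection
    const promise = (async () => fn(item))();
    // Mark the promise as handled so an early exit by the consumer does not
    // produce unhandled-rejection warnings; awaiting it below still throws
    promise.catch(() => {});
    inFlight.push(promise);

    // Window is full: wait for the oldest call before pulling another item
    if (inFlight.length >= concurrency) {
      yield await inFlight.shift();
    }
  }

  // Source exhausted: drain the remaining calls
  while (inFlight.length > 0) {
    yield await inFlight.shift();
  }
}
```

Because results are emitted in order, a slow call at the head of the window still delays the values behind it. If ordering does not matter, a Promise.race based variant would avoid that wait at the cost of more bookkeeping.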
          

Using Async Generators with Libraries and Frameworks

Async generators integrate well with modern libraries and frameworks:

Integration with RxJS

Convert async generators to RxJS Observables for more powerful stream processing:

Async Generator to RxJS Observable


// Convert an async generator to an RxJS Observable
function observableFromAsyncGenerator(generator) {
  return new rxjs.Observable(subscriber => {
    const run = async () => {
      try {
        for await (const value of generator) {
          // Emit each value
          subscriber.next(value);
          
          // Check if subscriber is still interested
          if (subscriber.closed) {
            break;
          }
        }
        
        // Complete the Observable when generator is done
        subscriber.complete();
      } catch (error) {
        // Handle errors
        subscriber.error(error);
      }
    };
    
    // Start the generator
    run();
    
    // Return a cleanup function
    return () => {
      // If generator has a return method, call it to close the generator
      if (generator.return) {
        generator.return();
      }
    };
  });
}

// Usage example with RxJS operators
async function* numbersGenerator() {
  let i = 0;
  while (i < 100) {
    await new Promise(resolve => setTimeout(resolve, 100));
    yield i++;
  }
}

// Convert to Observable and use RxJS operators
const numbers$ = observableFromAsyncGenerator(numbersGenerator());

// Apply RxJS operators
numbers$.pipe(
  rxjs.operators.filter(n => n % 2 === 0),     // Only even numbers
  rxjs.operators.map(n => n * n),             // Square each number
  rxjs.operators.bufferCount(5),              // Group into batches of 5
  rxjs.operators.take(5)                      // Take only 5 batches
).subscribe({
  next: batch => console.log('Batch:', batch),
  error: err => console.error('Error:', err),
  complete: () => console.log('Complete')
});
          

Integration with React

Use async generators with React hooks for reactive UI updates:

React Hook for Async Generators


// React hook for consuming async generators
function useAsyncGenerator(generatorFn, dependencies = []) {
  // State for the generator values
  const [value, setValue] = React.useState(null);
  const [error, setError] = React.useState(null);
  const [isComplete, setIsComplete] = React.useState(false);
  const [isLoading, setIsLoading] = React.useState(true);
  
  // Incrementing this counter re-runs the effect, which restarts the generator
  const [refreshCount, setRefreshCount] = React.useState(0);
  
  // Start consuming the generator
  React.useEffect(() => {
    // Reset state when the dependencies change or a refresh is requested
    setValue(null);
    setError(null);
    setIsComplete(false);
    setIsLoading(true);
    
    // The abort controller tells the consumer loop to stop after cleanup
    const abortController = new AbortController();
    
    // Create the generator
    const generator = generatorFn();
    
    // Consume the generator
    const consumeGenerator = async () => {
      try {
        // Iterate over the generator
        while (!abortController.signal.aborted) {
          const { value: newValue, done } = await generator.next();
          
          // Ignore results that arrive after cleanup has run
          if (abortController.signal.aborted) {
            break;
          }
          
          if (done) {
            setIsComplete(true);
            setIsLoading(false);
            break;
          }
          
          // Update the value
          setValue(newValue);
          setIsLoading(false);
        }
      } catch (err) {
        if (!abortController.signal.aborted) {
          setError(err);
          setIsLoading(false);
        }
      }
    };
    
    // Start consuming
    consumeGenerator();
    
    // Cleanup function
    return () => {
      // Stop the consumer loop
      abortController.abort();
      
      // Close the generator so its finally blocks can release resources
      if (generator.return) {
        Promise.resolve(generator.return()).catch(() => {});
      }
    };
    // generatorFn is intentionally omitted: callers list its inputs in dependencies
  }, [...dependencies, refreshCount]);
  
  // Return the current state and a refresh function
  return {
    value,
    error,
    isComplete,
    isLoading,
    // Restart by re-running the effect; its cleanup closes the old generator
    refresh: () => setRefreshCount(count => count + 1)
  };
}

// Example usage in a React component
function StockPriceTracker({ symbol }) {
  const { value: price, error, isLoading, refresh } = useAsyncGenerator(
    () => streamStockPrice(symbol),
    [symbol]
  );
  
  if (isLoading && !price) {
    return <div>Loading price for {symbol}...</div>;
  }
  
  if (error) {
    return (
      <div className="error">
        Error loading price: {error.message}
        <button onClick={refresh}>Retry</button>
      </div>
    );
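  }
  
  // The stream may complete without ever emitting a price
  if (!price) {
    return <div>No price data for {symbol}</div>;
  }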
  
  return (
    <div className="stock-price">
      <h3>{symbol}</h3>
      <div className={`price ${price.change >= 0 ? 'positive' : 'negative'}`}>
        ${price.current.toFixed(2)}
        <span className="change">
          {price.change >= 0 ? '+' : ''}{price.change.toFixed(2)}%
        </span>
      </div>
      <button onClick={refresh}>Refresh</button>
    </div>
  );
}

// Async generator for streaming stock prices
async function* streamStockPrice(symbol) {
  const socket = new WebSocket(`wss://api.example.com/stocks/${symbol}`);
  
  try {
    // Wait for the connection to open
    await new Promise((resolve, reject) => {
      socket.addEventListener('open', resolve);
      socket.addEventListener('error', reject);
    });
    
    // Create a message queue
    const queue = [];
    let resolveNextMessage;
    
    // Set up message handler
    socket.addEventListener('message', event => {
      const data = JSON.parse(event.data);
      
      if (resolveNextMessage) {
        resolveNextMessage(data);
        resolveNextMessage = null;
      } else {
        queue.push(data);
      }
    });
    
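    // Handle errors
    socket.addEventListener('error', () => {
      if (resolveNextMessage) {
        resolveNextMessage(new Error('WebSocket error'));
        resolveNextMessage = null;
      }
    });
    
    // Handle the connection closing: wake a pending wait so the loop can exit
    let closed = false;
    socket.addEventListener('close', () => {
      closed = true;
      if (resolveNextMessage) {
        resolveNextMessage(undefined);
        resolveNextMessage = null;
      }
    });
    
    // Stream messages until the socket closes and the queue is drained
    while (true) {
      let price;
      
      if (queue.length > 0) {
        // Get from queue
        price = queue.shift();
      } else if (closed) {
        break;
      } else {
        // Wait for next message
        price = await new Promise(resolve => {
          resolveNextMessage = resolve;
        });
        
        // Woken by the close handler rather than by a message
        if (price === undefined && closed) {
          break;
        }
      }
      
      // If we got an error, throw it
      if (price instanceof Error) {
        throw price;
      }
      
      // Yield the price
      yield price;
    }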
  } finally {
    // Clean up the WebSocket
    if (socket.readyState === WebSocket.OPEN || 
        socket.readyState === WebSocket.CONNECTING) {
      socket.close();
    }
  }
}
          

Best Practices and Patterns

Let's summarize key best practices for working with async generators:

Comprehensive Async Generator Pattern


/**
 * Creates a robust async generator with proper error handling and cleanup
 * @param {Function} sourceFn - Function that creates the data source
 * @param {Function} processFn - Function to process each item
 * @param {Object} options - Configuration options
 * @returns {AsyncGenerator} - The configured async generator
 */
function createRobustGenerator(sourceFn, processFn, options = {}) {
  const {
    batchSize = 1,
    retries = 3,
    retryDelay = 1000,
    timeout = 30000,
    signal = null
  } = options;
  
  return async function* () {
    // Resources to clean up
    const resources = [];
    
    try {
      // Create the data source
      const source = await sourceFn();
      
      // Add to resources for cleanup (some sources expose destroy() instead of close())
      if (typeof source.close === 'function' || typeof source.destroy === 'function') {
        resources.push(source);
      }
      
      // Create abort controller for timeout
      const timeoutController = new AbortController();
      const timeoutSignal = timeoutController.signal;
      
      // Set timeout
      const timeoutId = setTimeout(() => {
        timeoutController.abort();
      }, timeout);
      
      try {
        // Create batch array
        let batch = [];
        
        // Create a helper function for retries
        const withRetries = async (fn, item) => {
          let lastError;
          
          for (let attempt = 0; attempt <= retries; attempt++) {
            // Check for cancellation outside the try block so that an abort
            // is thrown immediately instead of being caught and retried
            if (signal?.aborted || timeoutSignal.aborted) {
              throw signal?.aborted ? signal.reason : new Error('Operation timed out');
            }
            
            try {
              // Attempt the operation
              return await fn(item);
            } catch (error) {
              lastError = error;
              
              // If this was the last attempt, or the error isn't retryable, throw
              if (attempt >= retries || error.retryable === false) {
                throw error;
              }
              
              // Wait before retrying (exponential backoff with random jitter)
              const delay = retryDelay * Math.pow(2, attempt) * (0.5 + Math.random());
              await new Promise(resolve => setTimeout(resolve, delay));
            }
          }
          
          // This shouldn't happen, but just in case
          throw lastError || new Error('Operation failed');
        };
        
        // Process the source
        for await (const item of source) {
          // Check if aborted
          if (signal?.aborted || timeoutSignal.aborted) {
            break;
          }
          
          // Add to batch
          batch.push(item);
          
          // When batch is full, process and yield
          if (batch.length >= batchSize) {
            // Process each item with retries
            for (const batchItem of batch) {
              try {
                const result = await withRetries(processFn, batchItem);
                yield result;
              } catch (error) {
                // Yield error result
                yield { error, item: batchItem };
              }
            }
            
            // Clear batch
            batch = [];
          }
        }
        
        // Process any remaining items
        for (const batchItem of batch) {
          try {
            const result = await withRetries(processFn, batchItem);
            yield result;
          } catch (error) {
            yield { error, item: batchItem };
          }
        }
      } finally {
        // Clear timeout
        clearTimeout(timeoutId);
      }
    } catch (error) {
      console.error('Generator error:', error);
      throw error;
    } finally {
      // Clean up resources
      for (const resource of resources) {
        try {
          if (typeof resource.close === 'function') {
            await resource.close();
          } else if (typeof resource.destroy === 'function') {
            await resource.destroy();
          }
        } catch (cleanupError) {
          console.error('Error during cleanup:', cleanupError);
        }
      }
    }
  }();
}

// Usage example
async function processDatabaseRecords() {
  // Create an AbortController for cancellation
  const controller = new AbortController();
  
  // Set a timeout to cancel after 2 minutes
  setTimeout(() => controller.abort('Operation took too long'), 2 * 60 * 1000);
  
  try {
    // Create the robust generator
    const recordProcessor = createRobustGenerator(
      // Source function - returns a database cursor
      async () => {
        const db = await connectToDatabase();
        return db.collection('records').find().sort({ date: 1 }).stream();
      },
      
      // Process function - handles individual records
      async (record) => {
        // Process the record
        const enriched = await enrichRecord(record);
        const validated = validateRecord(enriched);
        
        // Save the processed record
        await saveProcessedRecord(validated);
        
        return {
          id: record._id,
          status: 'processed',
          timestamp: new Date()
        };
      },
      
      // Options
      {
        batchSize: 10,
        retries: 3,
        retryDelay: 1000,
        timeout: 5 * 60 * 1000, // 5 minutes
        signal: controller.signal
      }
    );
    
    // Process records
    let processed = 0;
    let failed = 0;
    
    for await (const result of recordProcessor) {
      if (result.error) {
        console.error(`Failed to process record ${result.item._id}:`, result.error);
        failed++;
      } else {
        console.log(`Processed record ${result.id}`);
        processed++;
      }
      
      // Update progress
      updateProgress(processed, failed);
    }
    
    console.log(`Processing complete: ${processed} processed, ${failed} failed`);
  } catch (error) {
    console.error('Processing failed:', error);
  }
}
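
The cleanup in createRobustGenerator relies on one language guarantee: when a consumer leaves a for await loop early (via break, return, or a thrown error), the loop calls the generator's return() method, which runs any pending finally blocks. The small sketch below makes that visible; the names countWithCleanup, takeFirst, and events are illustrative only:

```javascript
// Records lifecycle events so the order of operations is visible
const events = [];

async function* countWithCleanup() {
  events.push('open');        // stands in for acquiring a resource
  try {
    let i = 0;
    while (true) {
      yield i++;
    }
  } finally {
    events.push('close');     // runs on completion, error, or early exit
  }
}

// Reads at most `count` values from any async iterable
async function takeFirst(source, count) {
  const taken = [];
  if (count <= 0) return taken;

  for await (const value of source) {
    taken.push(value);
    // Leaving the loop calls source.return(), which triggers the finally block
    if (taken.length >= count) break;
  }
  return taken;
}
```

Calling takeFirst(countWithCleanup(), 3) resolves to the first three values, and events then contains 'open' followed by 'close' even though the generator's loop never finishes on its own. Manual iteration with next() does not get this behavior automatically, so code that drives a generator by hand should call return() itself.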
          

Practice Exercises

Exercise 1: Paginated API Client

Build an async generator-based API client that:

  • Handles paginated API requests
  • Supports filtering and sorting parameters
  • Implements proper error handling and retries
  • Provides progress updates as data is fetched
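
As a possible starting point for this exercise, the sketch below covers only pagination and progress reporting. It assumes a caller-supplied fetchPage function (hypothetical, as is its { items, hasMore } result shape) so the generator stays independent of any particular API. Filtering, sorting, and retries are left to implement:

```javascript
// Starting-point sketch. `fetchPage` is a hypothetical function supplied by
// the caller: it takes a page number and resolves to { items, hasMore }.
async function* paginate(fetchPage, { startPage = 1, onProgress } = {}) {
  let page = startPage;
  let fetched = 0;

  while (true) {
    // The next page is requested only when the consumer asks for more items
    const { items, hasMore } = await fetchPage(page);

    for (const item of items) {
      yield item;
    }

    // Report progress once the consumer has taken every item on this page
    fetched += items.length;
    if (onProgress) onProgress({ page, fetched });

    if (!hasMore) return;
    page++;
  }
}
```

A consumer would iterate with for await (const item of paginate(fetchPage)) and can break at any point, in which case no further pages are requested.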

Exercise 2: Data Processing Pipeline

Create a data processing pipeline using async generators that:

  • Reads data from a source (file, API, etc.)
  • Filters data based on configurable criteria
  • Transforms data into a different format
  • Batches output for efficient processing
  • Writes results to a destination

Exercise 3: Real-Time Dashboard

Build a real-time dashboard component that:

  • Connects to a WebSocket or EventSource
  • Uses async generators to process incoming data
  • Implements throttling to prevent UI flooding
  • Provides clean resource management (connect/disconnect)
  • Handles connection errors and reconnection

Summary

Async generators provide a powerful and elegant way to work with asynchronous data streams in JavaScript:

By mastering async generators, you can write more efficient, clean, and maintainable code for handling complex asynchronous data flows in your applications.

Further Learning