Async Streams (IAsyncEnumerable<T>)
Overview
IAsyncEnumerable<T> combines the power of async/await with the simplicity of foreach loops, enabling you to process streams of data asynchronously as they become available.
What is IAsyncEnumerable<T>?
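IAsyncEnumerable<T> is the asynchronous counterpart of IEnumerable<T>: the consumer awaits each item as it requests it, so the producer can do asynchronous work between items.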
// Traditional IEnumerable<T> - synchronous
public IEnumerable<int> GetNumbers()
{
    for (int i = 0; i < 10; i++)
    {
        yield return i;
    }
}

// IAsyncEnumerable<T> - asynchronous
public async IAsyncEnumerable<int> GetNumbersAsync()
{
    for (int i = 0; i < 10; i++)
    {
        await Task.Delay(100); // Simulate async work
        yield return i;
    }
}

// Consumption
await foreach (var number in GetNumbersAsync())
{
    Console.WriteLine(number); // Printed as they arrive
}
Basic Usage
Producer Pattern
public async IAsyncEnumerable<string> ReadLinesAsync(string filePath)
{
    using var reader = new StreamReader(filePath);
    // ReadLineAsync returns null at end of file. Checking reader.EndOfStream
    // in the loop condition instead could block on a synchronous read.
    string? line;
    while ((line = await reader.ReadLineAsync()) != null)
    {
        yield return line;
    }
}

// Usage
await foreach (var line in ReadLinesAsync("data.txt"))
{
    Console.WriteLine(line);
}
Consumer with CancellationToken
public async IAsyncEnumerable<T> GetDataAsync<T>(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    while (!ct.IsCancellationRequested)
    {
        var data = await FetchDataAsync<T>(ct);
        yield return data;
        // Throws OperationCanceledException if ct is cancelled during the wait
        await Task.Delay(1000, ct);
    }
}

// Usage with cancellation
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await foreach (var item in GetDataAsync<Data>(cts.Token))
{
    Console.WriteLine(item);
}
Real-World Examples
Example 1: Streaming API Results
public class ApiClient
{
    private readonly HttpClient _httpClient;

    public ApiClient(HttpClient httpClient) => _httpClient = httpClient;

    public async IAsyncEnumerable<User> GetAllUsersAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        int page = 1;
        bool hasMore = true;
        while (hasMore)
        {
            // GetFromJsonAsync is an extension method from System.Net.Http.Json
            var response = await _httpClient.GetFromJsonAsync<PagedResponse<User>>(
                $"/api/users?page={page}", ct);
            if (response?.Items == null || response.Items.Count == 0)
            {
                break; // Empty page: nothing more to stream
            }
            foreach (var user in response.Items)
            {
                yield return user;
            }
            page++;
            hasMore = response.HasNextPage;
        }
    }
}

// Usage
await foreach (var user in apiClient.GetAllUsersAsync())
{
    Console.WriteLine($"Processing user: {user.Name}");
    // Process users as they arrive, no need to load all at once!
}
Example 2: Database Streaming
public class UserRepository
{
    private readonly DbContext _context;

    public UserRepository(DbContext context) => _context = context;

    public async IAsyncEnumerable<User> StreamActiveUsersAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await using var command = _context.Database.GetDbConnection().CreateCommand();
        // Name the columns explicitly so the ordinals read below stay stable
        // (column names are assumed to match the User properties)
        command.CommandText = "SELECT Id, Name, Email FROM Users WHERE IsActive = 1";
        await _context.Database.OpenConnectionAsync(ct);
        await using var reader = await command.ExecuteReaderAsync(ct);
        while (await reader.ReadAsync(ct))
        {
            yield return new User
            {
                Id = reader.GetInt32(0),
                Name = reader.GetString(1),
                Email = reader.GetString(2)
            };
        }
    }
}

// Usage - process millions of records without loading all into memory
await foreach (var user in repository.StreamActiveUsersAsync())
{
    await ProcessUserAsync(user);
}
Example 3: Real-Time Events
public class EventMonitor
{
    private readonly Channel<Event> _channel = Channel.CreateUnbounded<Event>();

    public void PublishEvent(Event evt)
    {
        _channel.Writer.TryWrite(evt);
    }

    public async IAsyncEnumerable<Event> SubscribeAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await foreach (var evt in _channel.Reader.ReadAllAsync(ct))
        {
            yield return evt;
        }
    }
}

// Producer
var monitor = new EventMonitor();
_ = Task.Run(async () =>
{
    while (true)
    {
        var evt = await GetNextEventAsync();
        monitor.PublishEvent(evt);
    }
});

// Consumer
await foreach (var evt in monitor.SubscribeAsync())
{
    Console.WriteLine($"Event: {evt.Type} at {evt.Timestamp}");
}
Advanced Patterns
Pattern 1: Transformation
public static async IAsyncEnumerable<TResult> SelectAsync<TSource, TResult>(
    this IAsyncEnumerable<TSource> source,
    Func<TSource, Task<TResult>> selector,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    await foreach (var item in source.WithCancellation(ct))
    {
        yield return await selector(item);
    }
}

// Usage
var numbers = GetNumbersAsync();
var squared = numbers.SelectAsync(async n =>
{
    await Task.Delay(10);
    return n * n;
});
await foreach (var n in squared)
{
    Console.WriteLine(n);
}
Pattern 2: Filtering
public static async IAsyncEnumerable<T> WhereAsync<T>(
    this IAsyncEnumerable<T> source,
    Func<T, Task<bool>> predicate,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    await foreach (var item in source.WithCancellation(ct))
    {
        if (await predicate(item))
        {
            yield return item;
        }
    }
}

// Usage
var users = GetUsersAsync();
var activeUsers = users.WhereAsync(async user =>
{
    return user.IsActive && await IsValidSubscriptionAsync(user.Id);
});
Pattern 3: Batching
public static async IAsyncEnumerable<List<T>> BatchAsync<T>(
    this IAsyncEnumerable<T> source,
    int batchSize,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    var batch = new List<T>(batchSize);
    await foreach (var item in source.WithCancellation(ct))
    {
        batch.Add(item);
        if (batch.Count >= batchSize)
        {
            yield return batch;
            batch = new List<T>(batchSize);
        }
    }
    if (batch.Count > 0)
    {
        yield return batch;
    }
}

// Usage
await foreach (var batch in GetItemsAsync().BatchAsync(100))
{
    await ProcessBatchAsync(batch);
}
Pattern 4: Throttling
public static async IAsyncEnumerable<T> ThrottleAsync<T>(
    this IAsyncEnumerable<T> source,
    TimeSpan interval,
    [EnumeratorCancellation] CancellationToken ct = default)
{
    var lastYield = DateTime.MinValue;
    await foreach (var item in source.WithCancellation(ct))
    {
        var now = DateTime.UtcNow;
        var elapsed = now - lastYield;
        if (elapsed < interval)
        {
            await Task.Delay(interval - elapsed, ct);
        }
        yield return item;
        lastYield = DateTime.UtcNow;
    }
}

// Usage - rate limit to 1 item per second
await foreach (var item in GetItemsAsync().ThrottleAsync(TimeSpan.FromSeconds(1)))
{
    await ProcessItemAsync(item);
}
Pattern 5: Merging Streams
public static async IAsyncEnumerable<T> MergeAsync<T>(
    params IAsyncEnumerable<T>[] sources)
{
    var channel = Channel.CreateUnbounded<T>();
    var tasks = sources.Select(async source =>
    {
        await foreach (var item in source)
        {
            await channel.Writer.WriteAsync(item);
        }
    }).ToList();

    // Complete the channel once every source has finished. Passing along a
    // failure makes the reader below throw instead of waiting forever.
    _ = Task.WhenAll(tasks).ContinueWith(
        t => channel.Writer.Complete(t.Exception?.GetBaseException()),
        TaskScheduler.Default);

    await foreach (var item in channel.Reader.ReadAllAsync())
    {
        yield return item;
    }
}

// Usage - merge multiple streams
var stream1 = GetStream1Async();
var stream2 = GetStream2Async();
var merged = MergeAsync(stream1, stream2);
await foreach (var item in merged)
{
    Console.WriteLine(item);
}
IAsyncEnumerable with LINQ
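LINQ operators for IAsyncEnumerable<T> are not part of the base class library before .NET 10; on earlier versions they come from the System.Linq.Async NuGet package. In both cases the operators live in the System.Linq namespace:
using System.Linq;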
// Where
var filtered = source.Where(x => x.IsActive);
// Select
var transformed = source.Select(x => x.Name);
// Take
var first10 = source.Take(10);
// Skip
var afterFirst10 = source.Skip(10);
// ToListAsync (materialize)
var list = await source.ToListAsync();
// CountAsync
var count = await source.CountAsync();
// AnyAsync
var hasItems = await source.AnyAsync();
// FirstOrDefaultAsync
var first = await source.FirstOrDefaultAsync();
Channels + IAsyncEnumerable
Channels provide a producer/consumer pattern perfect for async streams:
public class MessageQueue
{
    private readonly Channel<Message> _channel =
        Channel.CreateBounded<Message>(new BoundedChannelOptions(100)
        {
            FullMode = BoundedChannelFullMode.Wait
        });

    public async Task ProduceAsync(Message message, CancellationToken ct = default)
    {
        await _channel.Writer.WriteAsync(message, ct);
    }

    public async IAsyncEnumerable<Message> ConsumeAsync(
        [EnumeratorCancellation] CancellationToken ct = default)
    {
        await foreach (var message in _channel.Reader.ReadAllAsync(ct))
        {
            yield return message;
        }
    }

    public void Complete()
    {
        _channel.Writer.Complete();
    }
}

// Producer
var queue = new MessageQueue();
_ = Task.Run(async () =>
{
    for (int i = 0; i < 100; i++)
    {
        await queue.ProduceAsync(new Message { Id = i });
        await Task.Delay(100);
    }
    queue.Complete();
});

// Consumer
await foreach (var message in queue.ConsumeAsync())
{
    Console.WriteLine($"Processing message {message.Id}");
}
Error Handling
public async IAsyncEnumerable<Result> ProcessWithErrorHandlingAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    await foreach (var item in GetItemsAsync(ct))
    {
        Result result;
        // yield return is not allowed inside a try block that has a catch
        // clause, so the result is captured here and yielded afterwards
        try
        {
            result = await ProcessItemAsync(item);
        }
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            // Log error but continue processing; cancellation is not swallowed
            Console.WriteLine($"Error processing {item}: {ex.Message}");
            result = Result.Failed(item, ex.Message);
        }
        yield return result;
    }
}

// Consumer with exception handling
try
{
    await foreach (var result in ProcessWithErrorHandlingAsync(ct))
    {
        if (result.IsSuccess)
        {
            Console.WriteLine($"Success: {result.Value}");
        }
        else
        {
            Console.WriteLine($"Failed: {result.Error}");
        }
    }
}
catch (OperationCanceledException)
{
    Console.WriteLine("Operation cancelled");
}
catch (Exception ex)
{
    Console.WriteLine($"Fatal error: {ex.Message}");
}
Performance Considerations
Buffer vs Unbounded
// ❌ Unbounded - the queue grows without limit if the producer outpaces the consumer
var unbounded = Channel.CreateUnbounded<Data>();

// ✅ Bounded - backpressure when full
var bounded = Channel.CreateBounded<Data>(new BoundedChannelOptions(1000)
{
    FullMode = BoundedChannelFullMode.Wait // Producer waits
});
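The backpressure behaviour can be observed directly. The following is a minimal sketch, assuming a capacity of 2 purely to keep the demonstration small: once the channel is full, TryWrite reports failure and WriteAsync stays pending until a reader frees a slot.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Capacity 2 is an arbitrary choice for illustration; real capacities depend on the workload.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.Wait
});

bool first = channel.Writer.TryWrite(1);  // accepted
bool second = channel.Writer.TryWrite(2); // accepted, the channel is now full
bool third = channel.Writer.TryWrite(3);  // rejected: no free slot

// WriteAsync does not fail on a full channel; it stays pending until space appears.
ValueTask pendingWrite = channel.Writer.WriteAsync(3);
bool pendingWhileFull = !pendingWrite.IsCompleted;

int consumed = await channel.Reader.ReadAsync(); // frees one slot
await pendingWrite;                               // the waiting write can now finish

Console.WriteLine($"{first} {second} {third} {pendingWhileFull} {consumed}");
// prints "True True False True 1"
```

With an unbounded channel every write above would succeed immediately, which is exactly why memory use is only limited by how fast the consumer drains it.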
Async vs Sync Iteration
// ❌ Synchronous source that performs I/O - every MoveNext blocks a thread
// (iterating a collection that is already in memory is fine)
public async IAsyncEnumerable<Data> GetDataAsync()
{
    foreach (var item in _syncCollection) // Synchronous!
    {
        yield return await ProcessAsync(item);
    }
}

// ✅ Asynchronous source - waiting for the next item does not block a thread
public async IAsyncEnumerable<Data> GetDataAsync()
{
    await foreach (var item in _asyncCollection) // Asynchronous!
    {
        yield return await ProcessAsync(item);
    }
}
Common Pitfalls
// ❌ Don't forget [EnumeratorCancellation]
public async IAsyncEnumerable<T> GetDataAsync<T>(CancellationToken ct)
{
    // A token supplied through WithCancellation() never reaches ct
    // (the compiler reports warning CS8425)
}

// ✅ Use [EnumeratorCancellation]
public async IAsyncEnumerable<T> GetDataAsync<T>(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    // ct now also observes the token passed to WithCancellation()
}
// ❌ Don't materialize unnecessarily
var list = await GetDataAsync().ToListAsync(); // Loads all into memory!
// ✅ Stream processing
await foreach (var item in GetDataAsync())
{
    await ProcessAsync(item); // One at a time
}
// ✅ ConfigureAwait(false) is allowed inside async iterators (typical for library code)
public async IAsyncEnumerable<T> GetDataAsync()
{
    yield return await GetItemAsync().ConfigureAwait(false);
}

// ✅ When consuming, apply ConfigureAwait(false) to the stream itself
await foreach (var item in GetDataAsync().ConfigureAwait(false))
{
    await ProcessAsync(item).ConfigureAwait(false);
}
Interview Questions
Q: What is IAsyncEnumerable<T> and when should you use it?
A: IAsyncEnumerable<T> is an asynchronous version of IEnumerable<T> that allows you to iterate over data that arrives asynchronously. Use it for streaming large datasets, pagination, real-time events, or any scenario where you want to process data as it becomes available without loading everything into memory.
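To make the answer concrete, here is a rough sketch of what await foreach does with an IAsyncEnumerable<T>: it obtains an enumerator, awaits MoveNextAsync for each item, and disposes the enumerator asynchronously. CountUpAsync is a hypothetical producer invented for this illustration, and the code the compiler actually generates differs in its details.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical producer used only for this illustration.
static async IAsyncEnumerable<int> CountUpAsync(int count)
{
    for (int i = 0; i < count; i++)
    {
        await Task.Yield(); // stand-in for real asynchronous work
        yield return i;
    }
}

var received = new List<int>();

// Roughly equivalent to: await foreach (var n in CountUpAsync(3)) received.Add(n);
IAsyncEnumerator<int> enumerator = CountUpAsync(3).GetAsyncEnumerator();
try
{
    while (await enumerator.MoveNextAsync()) // asynchronously wait for the next item
    {
        received.Add(enumerator.Current);
    }
}
finally
{
    await enumerator.DisposeAsync(); // lets the iterator run its finally/using cleanup
}

Console.WriteLine(string.Join(", ", received)); // prints "0, 1, 2"
```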
Q: How do you properly handle cancellation in async streams?
A: Apply the [EnumeratorCancellation] attribute to the iterator's CancellationToken parameter. The token a consumer supplies through WithCancellation() is then routed into that parameter. A token passed directly as a method argument works as well, and if both are supplied the generated iterator observes both.
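A small sketch of that token flow, under the assumption of a hypothetical TicksAsync producer: the consumer never passes a token to the method itself, yet cancelling the token given to WithCancellation() stops the iterator.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical long-running producer; the upper bound only guards the demo.
static async IAsyncEnumerable<int> TicksAsync(
    [EnumeratorCancellation] CancellationToken ct = default)
{
    for (int i = 0; i < 10_000; i++)
    {
        ct.ThrowIfCancellationRequested(); // observes the consumer's token
        await Task.Yield();
        yield return i;
    }
}

using var cts = new CancellationTokenSource();
var seen = new List<int>();
bool cancelled = false;

try
{
    // No token is passed to TicksAsync(); WithCancellation supplies it to the iterator.
    await foreach (var tick in TicksAsync().WithCancellation(cts.Token))
    {
        seen.Add(tick);
        if (seen.Count == 3)
        {
            cts.Cancel(); // the producer notices on its next step
        }
    }
}
catch (OperationCanceledException)
{
    cancelled = true;
}

Console.WriteLine($"items: {seen.Count}, cancelled: {cancelled}");
// prints "items: 3, cancelled: True"
```

Without the attribute, the same loop would keep running until the producer finished on its own, because the iterator's ct parameter would stay at its default value.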
Q: What's the difference between ToListAsync() and iterating with await foreach?
A: ToListAsync() materializes the entire stream into memory at once, while await foreach processes items one at a time as they arrive. Use ToListAsync() when you need all items in memory; use await foreach for streaming processing with lower memory usage.
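The difference shows up in the order of events. In the sketch below, ProduceAsync is a hypothetical producer that logs each item it creates, and the second half buffers by hand as a stand-in for ToListAsync() so that the example needs no extra package.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Hypothetical producer that records when each item is created.
async IAsyncEnumerable<int> ProduceAsync()
{
    for (int i = 1; i <= 3; i++)
    {
        await Task.Yield(); // stand-in for real asynchronous work
        log.Add($"produce {i}");
        yield return i;
    }
}

// Streaming: each item is consumed before the next one is produced.
await foreach (var item in ProduceAsync())
{
    log.Add($"consume {item}");
}
string streaming = string.Join(", ", log);

// Materializing: buffer the whole stream first, then consume.
log.Clear();
var buffer = new List<int>();
await foreach (var item in ProduceAsync())
{
    buffer.Add(item);
}
foreach (var item in buffer)
{
    log.Add($"consume {item}");
}
string materialized = string.Join(", ", log);

Console.WriteLine(streaming);    // produce 1, consume 1, produce 2, consume 2, produce 3, consume 3
Console.WriteLine(materialized); // produce 1, produce 2, produce 3, consume 1, consume 2, consume 3
```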
Q: Can you use ConfigureAwait(false) in async iterator methods?
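A: Yes. An await inside an async iterator behaves like an await in any other async method, so ConfigureAwait(false) works there and is common in library code that does not need the caller's synchronization context. On the consuming side, call ConfigureAwait(false) on the IAsyncEnumerable<T> itself so that the awaits generated by await foreach also skip context capture.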
Exercises
See exercises/day7.md for hands-on practice with:
- Building async stream processors
- Implementing LINQ-style operators
- Working with Channels
- Error handling in streams
- Performance optimization