Building a Reusable Ephemeral Execution Library

Friday, 12 December 2025

In Part 1: Fire and Don't Quite Forget, we explored the theory behind ephemeral execution - bounded, private, debuggable async workflows that remember just enough to be useful and then evaporate.

This article turns that pattern into a reusable library you can drop into any .NET project.

NUGET!!!

This is now available in the mostlylucid.ephemerals NuGet package, along with more than 20 mostlylucid.ephemerals patterns and 'atoms'.

Source Files

The full source code is in the mostlylucid.atoms GitHub repository.


Before and After

Here's what we're replacing:

// ❌ Before: Fire-and-forget black hole
_ = Task.Run(() => ProcessAsync(item));
// No visibility. No debugging. No idea if it worked.

// ❌ Or: Blocking everything
await ProcessAsync(item);  // Hope you like waiting...

And what we're building:

// ✅ After: Trackable, bounded, debuggable
await coordinator.EnqueueAsync(item);

// Instant visibility
Console.WriteLine($"Pending: {coordinator.PendingCount}");
Console.WriteLine($"Active: {coordinator.ActiveCount}");
Console.WriteLine($"Failed: {coordinator.TotalFailed}");

// Full operation history
var snapshot = coordinator.GetSnapshot();
var failures = coordinator.GetFailed();

Same async execution. Complete observability. No user data retained.


Quick Start

The most common pattern - register a coordinator in DI and inject it:

// Program.cs
services.AddEphemeralWorkCoordinator<TranslationRequest>(
    async (request, ct) => await TranslateAsync(request, ct),
    new EphemeralOptions { MaxConcurrency = 8 });

// Your service
public class TranslationService(EphemeralWorkCoordinator<TranslationRequest> coordinator)
{
    public async Task TranslateAsync(TranslationRequest request)
    {
        await coordinator.EnqueueAsync(request);
        // Returns immediately - work happens in background
    }

    public object GetStatus() => new
    {
        pending = coordinator.PendingCount,
        active = coordinator.ActiveCount,
        completed = coordinator.TotalCompleted,
        failed = coordinator.TotalFailed
    };
}

Which Variant Do I Need?

┌─────────────────────────────────────────────────────────────────┐
│                    DECISION TREE                                │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Processing a collection once?                                  │
│  └─► EphemeralForEachAsync<T> (ParallelEphemeral.cs)            │
│                                                                 │
│  Need a long-lived queue that accepts items over time?          │
│  └─► EphemeralWorkCoordinator<T>                                │
│                                                                 │
│  Need per-entity ordering (user commands, tenant jobs)?         │
│  └─► EphemeralKeyedWorkCoordinator<TKey, T>                     │
│                                                                 │
│  Need to capture results (fingerprints, summaries)?             │
│  └─► EphemeralResultCoordinator<TInput, TResult>                │
│                                                                 │
│  Need multiple coordinators with different configs?             │
│  └─► IEphemeralCoordinatorFactory<T> (like IHttpClientFactory)  │
│                                                                 │
│  Need dynamic concurrency adjustment at runtime?                │
│  └─► Set EnableDynamicConcurrency = true, call SetMaxConcurrency│
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

The Configuration Object

From EphemeralOptions.cs:

public sealed class EphemeralOptions
{
    // Concurrency control
    public int MaxConcurrency { get; init; } = Environment.ProcessorCount;
    public int MaxConcurrencyPerKey { get; init; } = 1;
    public bool EnableDynamicConcurrency { get; init; } = false;

    // Window management
    public int MaxTrackedOperations { get; init; } = 200;
    public TimeSpan? MaxOperationLifetime { get; init; } = TimeSpan.FromMinutes(5);

    // Fair scheduling (keyed coordinator)
    public bool EnableFairScheduling { get; init; } = false;
    public int FairSchedulingThreshold { get; init; } = 10;

    // Signal-reactive processing
    public IReadOnlySet<string>? CancelOnSignals { get; init; }
    public IReadOnlySet<string>? DeferOnSignals { get; init; }
    public int MaxDeferAttempts { get; init; } = 10;
    public TimeSpan DeferCheckInterval { get; init; } = TimeSpan.FromMilliseconds(100);

    // Signal infrastructure
    public SignalSink? Signals { get; init; }
    public SignalConstraints? SignalConstraints { get; init; }
    public Action<SignalEvent>? OnSignal { get; init; }

    // Async signal handling
    public Func<SignalEvent, CancellationToken, Task>? OnSignalAsync { get; init; }
    public int MaxConcurrentSignalHandlers { get; init; } = 4;
    public int MaxQueuedSignals { get; init; } = 1000;

    // Observability
    public Action<IReadOnlyCollection<EphemeralOperationSnapshot>>? OnSample { get; init; }
}

Key Design Decisions

  • MaxConcurrency defaults to CPU count - sensible for CPU-bound work. For I/O-bound work, increase it.
  • EnableDynamicConcurrency enables runtime adjustment via SetMaxConcurrency() - uses a custom gate instead of SemaphoreSlim.
  • CancelOnSignals/DeferOnSignals make coordinators signal-reactive - they respond to ambient system state (pattern matching supports */?/comma lists).
  • OnSignal is synchronous; for async fan-out use SignalDispatcher or AsyncSignalProcessor inside the handler.
  • SignalConstraints prevents infinite signal loops with cycle detection and depth limits.
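
The pattern matching mentioned for CancelOnSignals/DeferOnSignals can be pictured with a small matcher. This is only a sketch of glob-style matching built on Regex, assuming `*` means "any run of characters" and `?` means "exactly one character"; SignalPatternSketch and Matches are hypothetical names, not the library's API, and the real matching rules may differ.

```csharp
using System;
using System.Linq;
using System.Text.RegularExpressions;

// Hypothetical helper for illustration: not part of mostlylucid.ephemerals.
public static class SignalPatternSketch
{
    // True if the signal matches any pattern in a comma-separated list.
    // '*' matches any run of characters, '?' matches exactly one character.
    public static bool Matches(string signal, string patternList) =>
        patternList
            .Split(',', StringSplitOptions.RemoveEmptyEntries | StringSplitOptions.TrimEntries)
            .Any(pattern => Regex.IsMatch(signal, ToRegex(pattern)));

    // Escape the whole pattern, then turn the escaped wildcards back into regex wildcards.
    private static string ToRegex(string glob) =>
        "^" + Regex.Escape(glob).Replace("\\*", ".*").Replace("\\?", ".") + "$";
}
```

Under those assumptions, Matches("http.timeout", "http.*") would be true, while Matches("https", "http.*") would be false because the dot is treated literally.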

The Snapshot Records

From Snapshots.cs:

public sealed record EphemeralOperationSnapshot(
    long Id,
    DateTimeOffset Started,
    DateTimeOffset? Completed,
    string? Key,
    bool IsFaulted,
    Exception? Error,
    TimeSpan? Duration,
    IReadOnlyList<string>? Signals = null,
    bool IsPinned = false)
{
    public bool HasSignal(string signal) => Signals?.Contains(signal) == true;
}

// For result-capturing coordinators
public sealed record EphemeralOperationSnapshot<TResult>(
    long Id,
    DateTimeOffset Started,
    DateTimeOffset? Completed,
    string? Key,
    bool IsFaulted,
    Exception? Error,
    TimeSpan? Duration,
    TResult? Result,
    bool HasResult,
    IReadOnlyList<string>? Signals = null,
    bool IsPinned = false);

This is metadata only. Notice what's not here:

  • No payload
  • No input data
  • No user content

Just enough to answer "what happened, when, and did it work?" - nothing more.
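
To make the "remember just enough, then evaporate" idea concrete, here is a minimal sketch of a bounded, self-trimming window in the spirit of MaxTrackedOperations and MaxOperationLifetime. OperationInfo and OperationWindowSketch are hypothetical names invented for this example; the library's own eviction logic (including pinning) is not shown here and may work differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a snapshot record: metadata only, no payload.
public sealed record OperationInfo(long Id, DateTimeOffset Started, bool IsFaulted);

// Sketch of a bounded window that trims itself on every write.
public sealed class OperationWindowSketch(int maxTracked, TimeSpan maxLifetime)
{
    private readonly object _gate = new();
    private readonly Queue<OperationInfo> _window = new();

    public void Record(OperationInfo op)
    {
        lock (_gate)
        {
            _window.Enqueue(op);
            Trim(DateTimeOffset.UtcNow);
        }
    }

    public IReadOnlyList<OperationInfo> GetSnapshot()
    {
        lock (_gate) return _window.ToArray();
    }

    // Drop the oldest entries while the window is over its size limit
    // or its head is older than the allowed lifetime. Caller holds _gate.
    private void Trim(DateTimeOffset now)
    {
        while (_window.Count > 0 &&
               (_window.Count > maxTracked || now - _window.Peek().Started > maxLifetime))
        {
            _window.Dequeue();
        }
    }
}
```

Because trimming only inspects the head of the queue, this sketch assumes operations are recorded in roughly the order they started.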


How This Compares to Other Approaches

.NET gives you several ways to do parallel work. Here's how the Ephemeral library compares:

Parallel.ForEachAsync (.NET 6+)

await Parallel.ForEachAsync(items,
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    async (item, ct) => await ProcessAsync(item, ct));

Best for: Simple parallel processing of collections where you don't need visibility.

What it lacks:

  • No operation tracking
  • No per-key sequential execution
  • No visibility into what's running

Use Ephemeral when: You need debugging/observability, per-key ordering, or signal-reactive processing.

TPL Dataflow

var block = new ActionBlock<T>(
    async item => await ProcessAsync(item),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

foreach (var item in items)
    block.Post(item);

block.Complete();
await block.Completion;

Best for: Complex dataflow pipelines with branching, merging, batching.

What it does well:

  • Rich pipeline composition (link blocks together)
  • Built-in batching, transforming, broadcasting
  • Bounded capacity with back-pressure

Use TPL Dataflow when: You need complex pipeline topologies (fan-out, fan-in, conditional routing).

Use Ephemeral when: You need operation tracking, simpler API, or signal-reactive coordination.

System.Threading.Channels

var channel = Channel.CreateBounded<T>(100);

// Consumers (multiple workers) - started first so a full channel can drain
var workers = Enumerable.Range(0, 4).Select(async _ =>
{
    await foreach (var item in channel.Reader.ReadAllAsync())
        await ProcessAsync(item);
}).ToArray();

// Producer - WriteAsync waits whenever the channel is full (back-pressure)
foreach (var item in items)
    await channel.Writer.WriteAsync(item);
channel.Writer.Complete();

await Task.WhenAll(workers);

Best for: Producer-consumer patterns where you control both sides.

What it does well:

  • Excellent performance
  • Back-pressure via bounded channels
  • Separation of producers and consumers

Use Channels when: You're building custom infrastructure and need maximum control.

Use Ephemeral when: You want operation tracking and observability without the boilerplate.

Polly

var policy = Policy
    .Handle<HttpRequestException>()
    .WaitAndRetryAsync(3, attempt => TimeSpan.FromSeconds(Math.Pow(2, attempt)));

await policy.ExecuteAsync(() => ProcessAsync(item));

Best for: Resilience policies (retry, circuit breaker, timeout) for individual operations.

Use Polly when: You need resilience around individual calls.

Use Ephemeral when: You need coordination across many operations with ambient awareness.

Combine them: Use Polly inside your Ephemeral work body for per-operation resilience.

MassTransit / NServiceBus

Best for: Distributed messaging across services with durable queues.

Use message buses when: Work must survive process restarts, span multiple services, or require guaranteed delivery.

Use Ephemeral when: Work is in-process, doesn't need durability, and you want lightweight observability.

Comparison Table

Approach Bounded Tracking Per-Key Signals Self-Cleaning Complexity
Parallel.ForEachAsync N/A Low
TPL Dataflow High
Channels Medium
Polly N/A N/A N/A Low
Background Services Medium
MassTransit/NServiceBus High
Ephemeral Library Low

EphemeralForEachAsync: The One-Shot Version

From ParallelEphemeral.cs:

// Simple parallel processing with tracking
await items.EphemeralForEachAsync(
    async (item, ct) => await ProcessAsync(item, ct),
    new EphemeralOptions { MaxConcurrency = 8 });

// With keyed execution (per-user sequential)
await commands.EphemeralForEachAsync(
    cmd => cmd.UserId,  // Key selector
    async (cmd, ct) => await ExecuteCommandAsync(cmd, ct),
    new EphemeralOptions
    {
        MaxConcurrency = 32,
        MaxConcurrencyPerKey = 1  // Sequential per user
    });

Why Keyed Pipelines Matter

Imagine processing user commands:

  • User A sends commands 1, 2, 3
  • User B sends commands 4, 5, 6

Without keying, these might execute as: 1, 4, 2, 5, 3, 6 - interleaved.

With MaxConcurrencyPerKey = 1:

  • User A's commands execute in order: 1 → 2 → 3
  • User B's commands execute in order: 4 → 5 → 6
  • But A and B can run in parallel

This is per-entity sequential, globally parallel - critical for systems where order matters within an entity.
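
The "per-entity sequential, globally parallel" idea can be sketched with nothing more than a grouping step and one shared SemaphoreSlim. This is an illustration of the concept for a one-shot collection, not the library's implementation; KeyedRunnerSketch and RunAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch: not the code behind EphemeralForEachAsync.
public static class KeyedRunnerSketch
{
    public static async Task RunAsync<TKey, T>(
        IEnumerable<T> items,
        Func<T, TKey> keySelector,
        Func<T, Task> body,
        int maxConcurrency)
    {
        // Global limit shared by every key.
        using var global = new SemaphoreSlim(maxConcurrency);

        // One sequential loop per key; the loops themselves run concurrently.
        var perKeyLoops = items
            .GroupBy(keySelector)
            .Select(async group =>
            {
                foreach (var item in group) // GroupBy keeps source order within a key
                {
                    await global.WaitAsync();
                    try { await body(item); }
                    finally { global.Release(); }
                }
            })
            .ToArray();

        await Task.WhenAll(perKeyLoops);
    }
}
```

Each key's loop awaits one item before starting the next, which is what gives the ordering guarantee; the shared semaphore is what caps total parallelism across keys.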


The Work Coordinator: A Long-Lived Queue

From EphemeralWorkCoordinator.cs:

await using var coordinator = new EphemeralWorkCoordinator<TranslationRequest>(
    async (request, ct) => await TranslateAsync(request, ct),
    new EphemeralOptions
    {
        MaxConcurrency = 8,
        MaxTrackedOperations = 500,
        EnableDynamicConcurrency = true  // Allow runtime adjustment
    });

// Enqueue items over time
await coordinator.EnqueueAsync(new TranslationRequest("Hello", "es"));

// Check status anytime
Console.WriteLine($"Pending: {coordinator.PendingCount}");
Console.WriteLine($"Active: {coordinator.ActiveCount}");

// Get snapshots
var snapshot = coordinator.GetSnapshot();
var running = coordinator.GetRunning();
var failed = coordinator.GetFailed();
var completed = coordinator.GetCompleted();

// Control flow
coordinator.Pause();   // Stop pulling new work
coordinator.Resume();  // Continue

// Adjust concurrency at runtime (requires EnableDynamicConcurrency)
coordinator.SetMaxConcurrency(16);

// Pin important operations to survive eviction
coordinator.Pin(operationId);
coordinator.Unpin(operationId);
coordinator.Evict(operationId);

// When done
coordinator.Complete();
await coordinator.DrainAsync();

Continuous Streams with IAsyncEnumerable

await using var coordinator = EphemeralWorkCoordinator<Message>.FromAsyncEnumerable(
    messageStream,  // IAsyncEnumerable<Message>
    async (msg, ct) => await ProcessMessageAsync(msg, ct),
    new EphemeralOptions { MaxConcurrency = 16 });

await coordinator.DrainAsync();

The Keyed Coordinator: Per-Entity Pipelines

From EphemeralKeyedWorkCoordinator.cs:

await using var coordinator = new EphemeralKeyedWorkCoordinator<string, Command>(
    cmd => cmd.UserId,  // Key selector
    async (cmd, ct) => await ExecuteCommandAsync(cmd, ct),
    new EphemeralOptions
    {
        MaxConcurrency = 32,
        MaxConcurrencyPerKey = 1,      // Per-user sequential
        EnableFairScheduling = true,   // Stop a hot user from starving others
        FairSchedulingThreshold = 10   // Reject if user has 10+ pending
    });

// TryEnqueue returns false if fair scheduling rejects
if (!coordinator.TryEnqueue(hotUserCommand))
{
    await DeferCommandAsync(hotUserCommand);
}

// Per-key visibility
var pendingForUser = coordinator.GetPendingCountForKey("user-123");
var opsForUser = coordinator.GetSnapshotForKey("user-123");
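
The threshold check behind a rejecting TryEnqueue can be pictured as a per-key pending counter. The sketch below assumes the rule is simply "reject once a key has reached the threshold"; FairAdmissionSketch, TryAdmit and Complete are hypothetical names, and the library's fair scheduling may take more into account.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical sketch of threshold-based admission; not the library's code.
public sealed class FairAdmissionSketch(int threshold)
{
    private readonly ConcurrentDictionary<string, int> _pending = new();

    // Returns false when the key already has `threshold` or more pending items.
    public bool TryAdmit(string key)
    {
        while (true)
        {
            var current = _pending.GetOrAdd(key, 0);
            if (current >= threshold) return false;

            // Only succeeds if no other thread changed the count in between.
            if (_pending.TryUpdate(key, current + 1, current)) return true;
        }
    }

    // Call when an admitted item has finished (successfully or not).
    public void Complete(string key) =>
        _pending.AddOrUpdate(key, 0, (_, count) => Math.Max(0, count - 1));
}
```

A caller would pair every successful TryAdmit with exactly one Complete, typically in a finally block around the work body.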

Result-Capturing Coordinators

From EphemeralResultCoordinator.cs:

await using var coordinator = new EphemeralResultCoordinator<SessionInput, SessionResult>(
    async (input, ct) =>
    {
        var fingerprint = await ComputeFingerprintAsync(input.Events, ct);
        return new SessionResult(fingerprint, input.Events.Length);
    },
    new EphemeralOptions { MaxConcurrency = 16 });

await coordinator.EnqueueAsync(session);
coordinator.Complete();
await coordinator.DrainAsync();

// Get just the results (no metadata)
var results = coordinator.GetResults();

// Get snapshots with results + metadata
var snapshots = coordinator.GetSnapshot();

// Get base snapshots without results (privacy-safe)
var baseSnapshots = coordinator.GetBaseSnapshot();

// Filter by success/failure
var successful = coordinator.GetSuccessful();
var failed = coordinator.GetFailed();

Concurrency Control

From ConcurrencyGates.cs:

The library provides two concurrency control mechanisms:

FixedConcurrencyGate (Default)

  • Backed by SemaphoreSlim
  • Optimal hot-path performance
  • Cannot be adjusted at runtime

AdjustableConcurrencyGate

  • Custom implementation with Queue<WaiterEntry>
  • Supports UpdateLimit() at runtime
  • Enabled via EnableDynamicConcurrency = true

// Dynamic concurrency adjustment
var coordinator = new EphemeralWorkCoordinator<T>(body,
    new EphemeralOptions
    {
        MaxConcurrency = 4,
        EnableDynamicConcurrency = true
    });

// Later, based on system load:
coordinator.SetMaxConcurrency(16);  // Scale up
coordinator.SetMaxConcurrency(2);   // Scale down
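
For intuition about why a runtime-adjustable gate needs its own waiter queue, here is a minimal sketch of one. It is not the library's AdjustableConcurrencyGate: AdjustableGateSketch and its members are hypothetical, and cancellation and timeouts are deliberately left out.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical sketch of an adjustable gate; the real implementation may differ.
public sealed class AdjustableGateSketch
{
    private readonly object _sync = new();
    private readonly Queue<TaskCompletionSource> _waiters = new();
    private int _limit;
    private int _inUse;

    public AdjustableGateSketch(int limit)
    {
        if (limit < 1) throw new ArgumentOutOfRangeException(nameof(limit));
        _limit = limit;
    }

    // Completes immediately if a slot is free, otherwise queues the caller.
    public Task WaitAsync()
    {
        lock (_sync)
        {
            if (_inUse < _limit)
            {
                _inUse++;
                return Task.CompletedTask;
            }

            var waiter = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.Enqueue(waiter);
            return waiter.Task;
        }
    }

    public void Release()
    {
        lock (_sync)
        {
            _inUse--;
            AdmitWaiters();
        }
    }

    // Scaling up admits queued waiters at once; scaling down lets
    // in-flight holders finish and simply admits fewer afterwards.
    public void UpdateLimit(int newLimit)
    {
        if (newLimit < 1) throw new ArgumentOutOfRangeException(nameof(newLimit));
        lock (_sync)
        {
            _limit = newLimit;
            AdmitWaiters();
        }
    }

    // Caller must hold _sync.
    private void AdmitWaiters()
    {
        while (_inUse < _limit && _waiters.Count > 0)
        {
            _inUse++;
            _waiters.Dequeue().SetResult();
        }
    }
}
```

The design point is that the limit is just a number compared under a lock, so it can change at any time, whereas a SemaphoreSlim fixes its maximum count at construction.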

The Factory Pattern: Named Coordinators

From DependencyInjection.cs:

Like IHttpClientFactory, you can register named configurations:

// Registration
services.AddEphemeralWorkCoordinator<TranslationRequest>("fast",
    async (request, ct) => await FastTranslateAsync(request, ct),
    new EphemeralOptions { MaxConcurrency = 32 });

services.AddEphemeralWorkCoordinator<TranslationRequest>("accurate",
    async (request, ct) => await AccurateTranslateAsync(request, ct),
    new EphemeralOptions { MaxConcurrency = 4 });

// Usage
public class TranslationService(IEphemeralCoordinatorFactory<TranslationRequest> factory)
{
    private readonly EphemeralWorkCoordinator<TranslationRequest> _fast =
        factory.CreateCoordinator("fast");
    private readonly EphemeralWorkCoordinator<TranslationRequest> _accurate =
        factory.CreateCoordinator("accurate");
}

Factory Guarantees

  1. Same name = same instance - Calling CreateCoordinator("fast") twice returns the same coordinator
  2. Different names = different instances - "fast" and "accurate" get separate coordinators
  3. Lazy creation - Coordinators are only created when first requested
  4. Configuration validation - Requesting an unregistered name throws a helpful error
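
The four guarantees above fall out naturally from a dictionary of lazily built instances. The following is a sketch of that idea only; NamedFactorySketch is a hypothetical type, not the code in DependencyInjection.cs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical sketch of a named, lazy factory.
public sealed class NamedFactorySketch<T>(IReadOnlyDictionary<string, Func<T>> registrations)
{
    private readonly ConcurrentDictionary<string, Lazy<T>> _instances = new();

    public T Create(string name)
    {
        // Guarantee 4: unknown names fail with a message listing what is registered.
        if (!registrations.TryGetValue(name, out var build))
            throw new InvalidOperationException(
                $"No coordinator registered with name '{name}'. " +
                $"Registered names: {string.Join(", ", registrations.Keys)}");

        // Guarantees 1-3: one Lazy<T> per name, built on first use.
        // Lazy<T> makes sure the builder runs once even if two threads race on GetOrAdd.
        return _instances.GetOrAdd(name, _ => new Lazy<T>(build)).Value;
    }
}
```

Wrapping the value in Lazy<T> matters because ConcurrentDictionary.GetOrAdd may invoke its value factory more than once under contention; only one Lazy wins, so only one instance is ever built.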

Signal Querying API

All coordinators provide optimised signal querying methods:

// Get all signals
var signals = coordinator.GetSignals();

// Filter by key (zero-allocation)
var userSignals = coordinator.GetSignalsByKey("user-123");

// Filter by time range
var recentSignals = coordinator.GetSignalsSince(DateTimeOffset.UtcNow.AddMinutes(-5));
var rangeSignals = coordinator.GetSignalsByTimeRange(from, to);

// Filter by signal name or pattern
var rateSignals = coordinator.GetSignalsByName("rate-limit");
var httpSignals = coordinator.GetSignalsByPattern("http.*");

// Check existence (short-circuits on first match)
if (coordinator.HasSignal("rate-limit"))
    await ThrottleAsync();

if (coordinator.HasSignalMatching("error.*"))
    await AlertAsync();

// Count signals efficiently (no allocation)
var totalSignals = coordinator.CountSignals();
var errorCount = coordinator.CountSignals("error");
var httpCount = coordinator.CountSignalsMatching("http.*");

Production Optimisations

Fast ID Generation

From EphemeralIdGenerator.cs:

internal static class EphemeralIdGenerator
{
    private static long _counter;
    private static readonly long _processStart = Environment.TickCount64;
    private static readonly int _processId = Environment.ProcessId;

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    public static long NextId()
    {
        var counter = Interlocked.Increment(ref _counter);

        // Combine counter with process-unique seed
        Span<byte> buffer = stackalloc byte[24];
        BitConverter.TryWriteBytes(buffer, _processStart);
        BitConverter.TryWriteBytes(buffer.Slice(8), _processId);
        BitConverter.TryWriteBytes(buffer.Slice(16), counter);

        return unchecked((long)XxHash64.HashToUInt64(buffer));
    }
}
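
  • Allocation-free (uses stackalloc)
  • Thread-safe (uses Interlocked.Increment)
  • Effectively unique across processes (the process ID and start tick feed the hash; a 64-bit hash makes collisions extremely unlikely rather than impossible)
  • Non-sequential (hash diffuses the counter)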

Memory-Safe Long-Lived Operation

The coordinators don't store Task references - just counters:

private int _activeTaskCount;
private readonly TaskCompletionSource _drainTcs;

// In ExecuteItemAsync:
finally
{
    // Signal drain when last task completes AND channel iteration is done
    if (Interlocked.Decrement(ref _activeTaskCount) == 0 &&
        Volatile.Read(ref _channelIterationComplete))
    {
        _drainTcs.TrySetResult();
    }
}
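
The fragment above can be rounded out into a small standalone tracker to show both halves of the handshake: the last task finishing, and intake closing. This is a sketch under the assumption that no work starts after intake is closed; DrainTrackerSketch and its member names are hypothetical, and it uses an int flag rather than the bool field shown above.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical sketch of counter-based drain tracking: no Task references are kept.
public sealed class DrainTrackerSketch
{
    private int _active;
    private int _intakeComplete; // 0 = still accepting work, 1 = closed
    private readonly TaskCompletionSource _drained =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    public Task Drained => _drained.Task;

    public void OnStarted() => Interlocked.Increment(ref _active);

    public void OnFinished()
    {
        // The last task out signals the drain, but only once intake has closed.
        if (Interlocked.Decrement(ref _active) == 0 &&
            Volatile.Read(ref _intakeComplete) == 1)
        {
            _drained.TrySetResult();
        }
    }

    // Call once no more work will be accepted.
    public void CompleteIntake()
    {
        Interlocked.Exchange(ref _intakeComplete, 1);

        // Covers the case where every task finished before intake closed.
        if (Volatile.Read(ref _active) == 0)
            _drained.TrySetResult();
    }
}
```

Both paths call TrySetResult, so whichever side observes "closed and idle" first completes the drain and the other call is a harmless no-op.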

Per-Key Lock Cleanup

The keyed coordinator automatically cleans up idle per-key semaphores:

private sealed class KeyLock(SemaphoreSlim gate, int maxCount)
{
    public SemaphoreSlim Gate { get; } = gate;
    public int MaxCount { get; } = maxCount;
    public long LastUsedTicks = Environment.TickCount64;
}

// Cleanup runs periodically, removes locks idle > 60 seconds

Complete Example

// Program.cs
var builder = WebApplication.CreateBuilder(args);

// Named coordinators
builder.Services.AddEphemeralWorkCoordinator<TranslationRequest>("fast",
    async (req, ct) => await FastTranslateAsync(req, ct),
    new EphemeralOptions { MaxConcurrency = 16 });

// Keyed coordinator for per-user commands
builder.Services.AddEphemeralKeyedWorkCoordinator<string, UserCommand>("commands",
    cmd => cmd.UserId,
    sp =>
    {
        var handler = sp.GetRequiredService<ICommandHandler>();
        return async (cmd, ct) => await handler.HandleAsync(cmd, ct);
    },
    new EphemeralOptions
    {
        MaxConcurrency = 32,
        MaxConcurrencyPerKey = 1,
        EnableFairScheduling = true,
        CancelOnSignals = new HashSet<string> { "system-overload" }
    });

var app = builder.Build();

// Controller
[ApiController]
[Route("api")]
public class WorkController : ControllerBase
{
    private readonly EphemeralWorkCoordinator<TranslationRequest> _translator;
    private readonly EphemeralKeyedWorkCoordinator<string, UserCommand> _commands;

    public WorkController(
        IEphemeralCoordinatorFactory<TranslationRequest> translationFactory,
        IEphemeralKeyedCoordinatorFactory<string, UserCommand> commandFactory)
    {
        _translator = translationFactory.CreateCoordinator("fast");
        _commands = commandFactory.CreateCoordinator("commands");
    }

    [HttpPost("translate")]
    public async Task<IActionResult> Translate([FromBody] TranslationRequest request)
    {
        await _translator.EnqueueAsync(request);
        return Ok(new { pending = _translator.PendingCount });
    }

    [HttpPost("command")]
    public IActionResult SubmitCommand([FromBody] UserCommand command)
    {
        if (!_commands.TryEnqueue(command))
            return StatusCode(429, "Too many pending commands for this user");
        return Ok();
    }

    [HttpGet("status")]
    public IActionResult GetStatus() => Ok(new
    {
        translator = new
        {
            pending = _translator.PendingCount,
            active = _translator.ActiveCount,
            completed = _translator.TotalCompleted,
            failed = _translator.TotalFailed,
            hasRateLimit = _translator.HasSignal("rate-limit")
        },
        commands = new
        {
            pending = _commands.PendingCount,
            active = _commands.ActiveCount,
            errorCount = _commands.CountSignalsMatching("error.*")
        }
    });
}

Conclusion

We've built a complete ephemeral execution library with:

  1. EphemeralForEachAsync - One-shot parallel processing with tracking
  2. EphemeralWorkCoordinator - Long-lived observable queues
  3. EphemeralKeyedWorkCoordinator - Per-entity sequential execution with fair scheduling
  4. EphemeralResultCoordinator - Result-capturing variant
  5. Factory pattern - Named configurations like IHttpClientFactory
  6. Dynamic concurrency - Runtime adjustment of parallelism
  7. Signal infrastructure - Built-in signal emission and querying

The pattern sits in a sweet spot:

  • More observable than Parallel.ForEachAsync
  • Simpler than TPL Dataflow
  • More integrated than raw Channels
  • Privacy-safe by design

Fire... and Don't Quite Forget.


© 2025 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.