Back to "threecluded.ephemeral.full; дивна система шаблонів систем у кеші LRU."

This is a viewer only at the moment see the article on how this works.

To update the preview hit Ctrl-Alt-R (or ⌘-Alt-R on Mac) or Enter to refresh. The Save icon lets you save the markdown file to disk

This is a preview from the server running through my markdig pipeline

mostlylucid-ephemeral

threecluded.ephemeral.full; дивна система шаблонів систем у кеші LRU.

Sunday, 14 December 2025

Ну, це було моєю одержимістю протягом минулого тижня. Див. попередні частини і те, що призвело до цього; " Що, якщо LRU був контекстом виконання. ." Тепер це набір з 30 пакетів Nuget, що містять найважливіші послідовні шаблони виконання (у пакунках TYNY 5- 10 рядків). Отримати вражаючі адаптивні можливості за допомогою синтаксису SIMPLE!

Знайдіть джерело тут: https: // gitub.com/scottgal/ methulcid.atoms/blob/ main/ metholcid. ephemeral/src/ methulcid. ephemeral. comphermeral. ephemeral. Complete

Поперці

Читати [Попередня частина сигналів ]Для того, щоб краще зрозуміти його використання. Перед вами - Readme. md з пакунку maculicd.ephemeral. Complete pacakage, у якій міститься як ядро, що в основному складається з. ephemeral пакунку, так і всі шаблони, 'atoms' (coordinators тощо) у одному зручному DLLL.

Ядро

АБО використовувати ядро threecluded.ephemeral DINY (буквально 10 класів), що надає вам всі функціональні можливості.

Атрибути & DI

АБО, якщо ви бажаєте, щоб всі атрибути, засновані на асинхронічному маршруті з простим [EphemeralJob] і сервіс. Додайте щеПерелік стилю Coordinator} використовує значення пакунок threelucid.ephemeral.attributes.

Це, ймовірно, тема мого блогу, що йде вперед ... вас попередили ♫

usiculid.Ephemeral.Comlete

NuGet

Вся Ефемераль в одному ДЛТ - обмеженою асинхронною виконанням з координацією сигналів.

dotnet add package mostlylucid.ephemeral.complete

За допомогою цього пакунка можна зібрати всі коди ядра, атома і шаблонів до однієї збірки. Для окремих пакунків ви можете ознайомитися з посиланнями у кожному з них. секція внизу.


Зміст


Швидкий запуск

using Mostlylucid.Ephemeral;

// Long-lived work coordinator
await using var coordinator = new EphemeralWorkCoordinator<WorkItem>(
    async (item, ct) => await ProcessAsync(item, ct),
    new EphemeralOptions { MaxConcurrency = 8 });

await coordinator.EnqueueAsync(new WorkItem("data"));

// One-shot parallel processing
await items.EphemeralForEachAsync(
    async (item, ct) => await ProcessAsync(item, ct),
    new EphemeralOptions { MaxConcurrency = 8 });

Реєстрація служби

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddCoordinator<WorkItem>(
    async (item, ct) => await ProcessAsync(item, ct),
    new EphemeralOptions { MaxConcurrency = 8, MaxTrackedOperations = 128 });

builder.Services.AddEphemeralSignalJobRunner<LogWatcherJobs>();

var app = builder.Build();
app.MapPost("/", async ([FromServices] IEphemeralCoordinatorFactory<WorkItem> factory, WorkItem item) =>
{
    var coordinator = factory.CreateCoordinator();
    await coordinator.EnqueueAsync(item);
    return Results.Accepted();
});

await app.RunAsync();

Знайомі services.AddCoordinator<T>() Допомоги і AddEphemeralSignalJobRunner<T>() підтримувати стислу реєстрацію служб, дозволити DI мати власником раковини/ запуску, а також зробити нові повідомлення про відповідальність/ кеш/ ведення журналу одним клацанням лівою кнопкою миші.

Завдання, пов' язані з атрибутами

mostlylucid.ephemeral.complete вузли mostlylucid.ephemeral.attributes, отже, трубопроводи атрибутів є частиною ядра поверхня. Вважайте бігуна споживачем сигналу першого класу: прикрашені методи приєднуються до того самого кешування, лісозаготівлі і прилипання історій, і кожен з атрибутів може бути оголошено Priority, рівень завдання MaxConcurrency, Lane, Key джерела, сигнал Випромінювання, перевищення пін/експірів і резервування.

Позначки атрибутів ключа:

  • Впорядкування & ліній: Використання Priority, MaxConcurrency, і Lane для збереження роботи у детермінованому порядку під час гарячих шляхів Залишайтеся окремо.
  • & Теґування за ключем: OperationKey, KeyFromSignal, KeyFromPayload, і [KeySource] допомога у груповій роботі з значущі ключі для ведення журналу, справедливого планування і діагностики.
  • Пересилання & повторень: Pin, ExpireAfterMs, AwaitSignals, MaxRetries, і RetryDelayMs Дозволити розширення обробників Їхня видимість, страта воріт, поки не з'являться залежності, і загоїться від рецидивів під час виголошення сигналів про невдачу.
  • Хореографія сигналу: Еміт EmitOnStart, EmitOnComplete, і EmitOnFailure для сигнальних спускових стадій, log Спостерігачі, або інші координатори без ручних дротів.
var sink = new SignalSink();
await using var runner = new EphemeralSignalJobRunner(sink, new[] { new LogWatcherJobs(sink) });

var loggerFactory = LoggerFactory.Create(builder =>
{
    builder.AddConsole();
    builder.AddProvider(new SignalLoggerProvider(new TypedSignalSink<SignalLogPayload>(sink)));
});

var logger = loggerFactory.CreateLogger("orders");
logger.LogError(new EventId(1001, "DbFailure"), "Order store failed");

// Later tasks or other services can also raise watcher-friendly signals directly:
sink.Raise("log.error.orders.dbfailure", key: "orders");
public sealed class LogWatcherJobs
{
    private readonly SignalSink _sink;

    public LogWatcherJobs(SignalSink sink) => _sink = sink;

    [EphemeralJob("log.error.*", Priority = 1, MaxConcurrency = 2, Lane = "hot:4", EmitOnComplete = new[] { "incident.created" })]
    public Task EscalateAsync(SignalEvent signal)
    {
        Console.WriteLine($"escalating {signal.Signal} for {signal.Key}");
        _sink.Raise("incident.created", key: signal.Key);
        return Task.CompletedTask;
    }

    [EphemeralJob("incident.created", EmitOnStart = new[] { "incident.monitor.start" })]
    public Task NotifyAsync(SignalEvent signal)
    {
        Console.WriteLine($"notified incident for {signal.Key}");
        return Task.CompletedTask;
    }
}

Цей інструмент запуску тепер сидить при запуску і реагує кожного разу, log.error.* або будь- який випромінений сигнал влучає у раковину. Атрибут Обробники також можуть читати ключі від сигналів/ сплат, роботи з шпильками, аж доки не зменшаться затримки, не вивільняються сигнали завершення/ знешкодження, і slot у смуги для впорядкування. Для першого налаштування DI використовується services.AddEphemeralSignalJobRunner<T>() (або область видимості варіант) так, що місткість керує бігуном і раковиною.

[EphemeralJobs} Сингнальний Prefix = "сцена", DefaultLanne = "pipeline"] публічні закриті завдання класу { [EphemeralJob} ingest ," EmitOnComple = new[] { "сцена. ingest.done"}] публічна задача IngestAsync} Синхронізація = > Консоль.Out.WriteLineAync=event.Signal);

[EphemeralJob("finalize")]
public Task FinalizeAsync(SignalEvent evt) => Console.Out.WriteLineAsync("final stage");

}

var actureSink = new SignSink}); Очікування на використання var tagRunner = new EphemeralSignalJobRunner} tagSink, new[] }); grepSink. THE} tag. ingest";

Покладатися на важкі pin завдання ResponsibilitySignalManager.PinUntilQueried (типовий шаблон шапки responsibility.ack.*) до залишати свої операції видимими до того часу, доки інструмент читання з нижньою частиною не отримає вантажу, у той час як OperationEchoMaker/ OperationEchoAtom Продовжувати потік кінцевих сигналів, так що слухи або молекули можуть все ще мати останній стан, навіть після Атом умирає.

Заплановані завдання

mostlylucid.ephemeral.complete також містить mostlylucid.ephemeral.atoms.scheduledtasksВизначити cron або JSON розклади за допомогою ScheduledTaskDefinition (cron, сигнал, необов' язкове key, payload, description, timeZone, format, runOnStartupтощо) і ScheduledTasksAtom вдосконалити тривалу працю через DurableTaskAtom. Кожну заплановану роботу підвищує налаштований сигнал у вікні координатора, отже він успадковує хронологію, лісозаготівлю та семантику відповідальності в той час як ваші молекули або трубопроводи з атрибутами реагують на випромінену сигнальну хвилю.

Кожні DurableTask містить розклад Name, Signal, необов' язковий Key, навіть набране Payload, і Description, так що вниз по стрічці слухачі негайно знають, яке завдання виконано і які метадані (файли, адреси URL тощо) слід використати. Виклик DurableTaskAtom.WaitForIdleAsync() якщо ви просто бажаєте зачекати на поточний вибух запланованої роботи, щоб завершити його без завершення атому, у такому випадку планувальник буде готовий до наступного кроку cron.

Журналювання і сигнали

mostlylucid.ephemeral.logging дзеркальні дані Microsoft. Extensions. Запис до сигналів і навпаки. Почати з долучення SignalLoggerProvider на ваш завод ведення журналу, отже кількість подій у журналі зростає log.* сигнали і гачок SignalToLoggerAdapter if ви хочете, щоб сигнали повертались назад до стандартного трубопроводу журналу.

var sink = new SignalSink();
var typedSink = new TypedSignalSink<SignalLogPayload>(sink);

using var loggerFactory = LoggerFactory.Create(builder =>
{
    builder.AddConsole();
    builder.AddProvider(new SignalLoggerProvider(typedSink));
});

using var watcher = new EphemeralSignalJobRunner(sink, new[] { new LogWatcherJobs(sink) });

var logger = loggerFactory.CreateLogger("orders");
logger.LogError(new EventId(1001, "DbFailure"), "Order store failed");
public sealed class LogWatcherJobs
{
    private readonly SignalSink _sink;

    public LogWatcherJobs(SignalSink sink) => _sink = sink;

    [EphemeralJob("log.error.*")]
    public Task EscalateAsync(SignalEvent signal)
    {
        _sink.Raise("incident.created", key: signal.Key);
        return Task.CompletedTask;
    }

    [EphemeralJob("incident.created")]
    public Task NotifyAsync(SignalEvent signal)
    {
        Console.WriteLine($"Incident for {signal.Key}");
        return Task.CompletedTask;
    }
}

Користування SignalToLoggerAdapter для віддзеркалення отриманих сигналів назад до стандартних журналів так, щоб ваш стек моніторинга бачив обидва Сторони мосту.


Core Coordinators

Пакунок: threecluded.ephemeral

EphemeralWorkCoordinator<T>

Робоча черга, що триває довгий час, з обмеженою послідовністю і видимим вікном.

await using var coordinator = new EphemeralWorkCoordinator<Request>(
    async (req, ct) => await HandleAsync(req, ct),
    new EphemeralOptions
    {
        MaxConcurrency = 8,
        MaxTrackedOperations = 200,
        MaxOperationLifetime = TimeSpan.FromMinutes(5)
    });

await coordinator.EnqueueAsync(request);

// Observe state
var running = coordinator.GetRunning();
var failed = coordinator.GetFailed();
var pending = coordinator.PendingCount;

// Graceful shutdown
coordinator.Complete();
await coordinator.DrainAsync();

EphemeralKeywordCoordinator<TKey, T>

Послідовна обробка клавіш - елементи з однаковими ключами оброблені за порядком.

await using var coordinator = new EphemeralKeyedWorkCoordinator<Order, string>(
    order => order.CustomerId,  // Key selector
    async (order, ct) => await ProcessOrder(order, ct),
    new EphemeralOptions
    {
        MaxConcurrency = 16,      // Total parallel
        MaxConcurrencyPerKey = 1  // Sequential per customer
    });

await coordinator.EnqueueAsync(order);

EphemeraalResultCoordinator<TInput, Trysult>

Захоплення результатів асинхронних дій.

await using var coordinator = new EphemeralResultCoordinator<Request, Response>(
    async (req, ct) => await FetchAsync(req, ct),
    new EphemeralOptions { MaxConcurrency = 4 });

var id = await coordinator.EnqueueAsync(request);
var snapshot = await coordinator.WaitForResult(id);
if (snapshot.HasResult)
    Console.WriteLine(snapshot.Result);

PriorityWorkCoordinator<T>

Смужки з декількома пріоритетами з можливістю конфігурації на смугу.

var coordinator = new PriorityWorkCoordinator<WorkItem>(
    async (item, ct) => await ProcessAsync(item, ct),
    new PriorityWorkCoordinatorOptions<WorkItem>(
        Lanes: new[] { new PriorityLane("high"), new PriorityLane("normal"), new PriorityLane("low") }
    ));

await coordinator.EnqueueAsync(item, "high");

Налаштування (Ефемеральні параметри)

new EphemeralOptions
{
    // Concurrency
    MaxConcurrency = 8,                    // Max parallel operations
    MaxConcurrencyPerKey = 1,              // For keyed coordinators
    EnableDynamicConcurrency = false,      // Allow runtime adjustment

    // Memory
    MaxTrackedOperations = 200,            // Window size (LRU eviction)
    MaxOperationLifetime = TimeSpan.FromMinutes(5),

    // Fair scheduling (keyed only)
    EnableFairScheduling = false,          // Prevent hot key starvation
    FairSchedulingThreshold = 10,

    // Signals
    Signals = sharedSink,                  // Shared signal sink
    OnSignal = evt => { },                 // Sync callback
    OnSignalAsync = async (evt, ct) => { }, // Async callback
    CancelOnSignals = new HashSet<string> { "circuit-open" },
    DeferOnSignals = new HashSet<string> { "backpressure" },
    DeferCheckInterval = TimeSpan.FromMilliseconds(100),
    MaxDeferAttempts = 50,

    // Signal handler limits
    MaxConcurrentSignalHandlers = 4,
    MaxQueuedSignals = 1000
}

Сигнали

Операції випромінюють сигнали для перетинної економії.

// Query signals
bool hasError = coordinator.HasSignal("error");
int count = coordinator.CountSignals("error");
var errors = coordinator.GetSignalsByPattern("error.*");

// Shared sink across coordinators
var sink = new SignalSink();
var c1 = new EphemeralWorkCoordinator<A>(body, new EphemeralOptions { Signals = sink });
var c2 = new EphemeralWorkCoordinator<B>(body, new EphemeralOptions { Signals = sink });
sink.Raise("system.busy");  // Both see it

Сигнали відповідальності і завершення

Чи потрібно тримати результати видимими лише так довго, щоб споживачі знизилися? ResponsibilitySignalManager дає вам змогу приклеїти операція, доки не з' являється сигнал ack (типовий шаблон responsibility.ack.* з key=operationId.) Подбайте про обов'язок необов' язкове description так, щоб операція могла описати свою відповідальність, і встановити maxPinDuration граційно Самоочевидно, якщо споживач ніколи не з'явиться.

var manager = new ResponsibilitySignalManager(coordinator, sink, maxPinDuration: TimeSpan.FromMinutes(5));
if (manager.PinUntilQueried(operationId, "file.ready", ackKey: fileId, description: "Awaiting fetch"))
{
    sink.Raise("file.ready", key: fileId);
}
// Consumer acknowledges the work
sink.Raise("file.ready.ack", key: fileId);
using Mostlylucid.Ephemeral.Patterns;

var notes = new LastWordsNoteAtom(async note => await noteRepository.SaveAsync(note));
coordinator.OperationFinalized += snapshot =>
{
    var note = new LastWordsNote(
        OperationId: snapshot.OperationId,
        Key: snapshot.Key,
        Signal: snapshot.Signals?.FirstOrDefault(),
        Timestamp: DateTimeOffset.UtcNow);

    _ = notes.EnqueueAsync(note);
};

LastWordsNote залишається крихітним (ідентифікаційний ідентифікатор, ключ, сигнал, часовий штамп), отже, ви можете записувати будь- який з мінімальних станів, які вам не потрібні приблизно перед збиранням операції.

Координатор також зберігає короткочасне відлуння останніх сигналів (потрібно через EnableOperationEcho) що можеш перевірити за допомогою GetEchoes() коли вам потрібно повторити обрізану сигналну хвилю без повної дії.

var recentErrors = coordinator.GetEchoes(pattern: "error.*")
    .Where(e => e.Timestamp > DateTimeOffset.UtcNow - TimeSpan.FromMinutes(1))
    .ToList();

if (recentErrors.Any())
    logger.LogWarning("Trimmed errors: {Count}", recentErrors.Count);

OperationEchoRetention і OperationEchoCapacity Нехай ви збалансуєте кількість відлуння, яке ви тримаєте, і тривалість, яку вони затримують, Отже, ви можете повторити останні слова, які достатньо довгі, щоб розпочати діагностику.

Керування автоматично розмикається, коли стрілка горить, але ви можете назвати CompleteResponsibility(operationId) завершити Відповідальність на початку (наприклад, під час повторних спроб). Операції все ще збільшуються OperationFinalized коли вікно ріже їх, так Підпишіться, якщо хочете випускати кінцевий сигнал, діагностувати журнал або запустити останні слова.


Атоми (збудовані блоки)

Фіксована робоча сила

**Пакунок: ** threecluded.ephemeral.atoms.fixedwork

Фіксований набір працівників за допомогою статистики. Мінімальна обгортка API навколо EphemeralWorkCoordinator.

using Mostlylucid.Ephemeral.Atoms.FixedWork;

await using var atom = new FixedWorkAtom<WorkItem>(
    async (item, ct) => await ProcessAsync(item, ct),
    maxConcurrency: 4,
    maxTracked: 200);

await atom.EnqueueAsync(item);

// Get stats
var (pending, active, completed, failed) = atom.Stats();
Console.WriteLine($"Completed: {completed}, Failed: {failed}");

// Get recent operations
var snapshot = atom.Snapshot();

// Graceful shutdown
await atom.DrainAsync();

Клавіатральна атмосфера

**Пакунок: ** method.ephemeral.atoms. keyedsequential

Процедура послідовної обробки клавіш з необов' язковим справедливим плануванням.

using Mostlylucid.Ephemeral.Atoms.KeyedSequential;

await using var atom = new KeyedSequentialAtom<Order, string>(
    keySelector: order => order.CustomerId,
    body: async (order, ct) => await ProcessOrder(order, ct),
    maxConcurrency: 16,
    perKeyConcurrency: 1,           // Sequential per key
    enableFairScheduling: true);    // Prevent hot key starvation

await atom.EnqueueAsync(order1);  // Customer A
await atom.EnqueueAsync(order2);  // Customer A - waits for order1
await atom.EnqueueAsync(order3);  // Customer B - parallel with A

var (pending, active, completed, failed) = atom.Stats();
await atom.DrainAsync();

SignAwareAtom

**Пакунок: ** threelucid.ephemeral.atoms.signalaware

Призупинити або скасувати присвоювання на основі сигналів з амнітивом.

using Mostlylucid.Ephemeral.Atoms.SignalAware;

var sink = new SignalSink();

await using var atom = new SignalAwareAtom<WorkItem>(
    async (item, ct) => await ProcessAsync(item, ct),
    cancelOn: new HashSet<string> { "shutdown", "circuit-open" },
    deferOn: new HashSet<string> { "backpressure.*" },
    deferInterval: TimeSpan.FromMilliseconds(100),
    maxDeferAttempts: 50,
    signals: sink,
    maxConcurrency: 8);

// Enqueue work
await atom.EnqueueAsync(item);

// Raise ambient signals
atom.Raise("backpressure.downstream");  // New items defer
sink.Raise("shutdown");                  // New items rejected (returns -1)

await atom.DrainAsync();

Пакетна атмосфера

**Пакунок: ** threelucid.ephemeral.atoms.batching

Збирати елементи на пакети за розміром або інтервалом часу.

using Mostlylucid.Ephemeral.Atoms.Batching;

await using var atom = new BatchingAtom<LogEntry>(
    onBatch: async (batch, ct) =>
    {
        Console.WriteLine($"Flushing {batch.Count} entries");
        await FlushToDatabase(batch, ct);
    },
    maxBatchSize: 100,
    flushInterval: TimeSpan.FromSeconds(5));

// Items are batched automatically
atom.Enqueue(new LogEntry("User logged in"));
atom.Enqueue(new LogEntry("Request received"));
// ... batch flushes when full OR after 5 seconds

Повторна спроба

Пакунок: threelucid.ephemeral.atoms.retrie

Експонентна обгортка зворотного зриву.

using Mostlylucid.Ephemeral.Atoms.Retry;

await using var atom = new RetryAtom<ApiRequest>(
    async (req, ct) => await CallExternalApi(req, ct),
    maxAttempts: 3,
    backoff: attempt => TimeSpan.FromMilliseconds(100 * Math.Pow(2, attempt)),
    maxConcurrency: 4);

// Automatically retries on failure with exponential backoff
// Attempt 1: immediate
// Attempt 2: 200ms delay
// Attempt 3: 400ms delay
await atom.EnqueueAsync(new ApiRequest("https://api.example.com"));

await atom.DrainAsync();

Атоми зберігання даних

Пакунок: threelucid.ephemeral.atoms. data

Спільні налаштування для атомів зберігання (DataStorageConfig, IDataStorageAtom<TKey, TValue>) Крім тих конвенцій з сигналами, які надсилають файли, SQLite і програми адаптації PostgreSQL.

using Mostlylucid.Ephemeral.Atoms.Data;
using Mostlylucid.Ephemeral.Atoms.Data.File;

var sink = new SignalSink();
var config = new DataStorageConfig
{
    DatabaseName = "orders",
    SignalPrefix = "save.data",
    LoadSignalPrefix = "load.data",
    DeleteSignalPrefix = "delete.data",
    MaxConcurrency = 1
};

await using var storage = new FileDataStorageAtom<string, Order>(sink, config, "./orders");

storage.EnqueueSave("order-123", new Order { Id = "order-123", Total = 42.00m });
var loaded = await storage.LoadAsync("order-123");

Використовувати те саме DataStorageConfig на Mostlylucid.Ephemeral.Atoms.Data.Sqlite або Mostlylucid.Ephemeral.Atoms.Data.Postgres Реалізація довготривалої, орієнтованої на сигналізацію, яка підтримується SQLite/Postgres. Завдання атрибутів можуть бути підписані на saved.data.{dbname} сигнальні сигнали, які слід виштовхнути з системи, поки load.data.{dbname} викликає кеш гідратів.


МолекулаRunner і AtomTiger

**Пакунок: ** throcid.ephemeral.atoms.moleculles

Blueprints, складені з MoleculeBlueprintBuilder Нехай ви визначаєте атоми (плата, інвентаризація, доставка, сповіщення), яке має виконуватися, якщо сигнал на зразок order.placed приходить. MoleculeRunner очікувати на гачок шаблон, створює спільний ресурс MoleculeContext, і виконує кожен крок під час підписки на події початку/ завершення. Скористайтеся AtomTrigger коли сигнал одного атома повинен починати інший координатор або молекула.

var sink = new SignalSink();
var blueprint = new MoleculeBlueprintBuilder("order", "order.placed")
    .AddAtom(async (ctx, ct) => await paymentCoordinator.EnqueueAsync(ctx.TriggerSignal.Key!, ct))
    .AddAtom(async (ctx, ct) =>
    {
        ctx.Raise("order.payment.complete", ctx.TriggerSignal.Key);
        await inventoryCoordinator.EnqueueAsync(ctx.TriggerSignal.Key!, ct);
    })
    .Build();

await using var runner = new MoleculeRunner(sink, new[] { blueprint }, serviceProvider);
using var trigger = new AtomTrigger(sink, "order.payment.complete", async (signal, ct) =>
{
    await notificationCoordinator.EnqueueAsync(signal.Key!, ct);
});

sink.Raise("order.placed", key: "order-42");

Молекули можуть викликати додаткові сигнали (ctx.Raise("order.shipping.start")) так що решта системи бере Кусочек.


SlidgeCacheAtom

**Пакунок: ** threecluded.ephemeral.atoms.slidingcache

Кеш з просковзанням - доступ до результату відновлює його TTL.

using Mostlylucid.Ephemeral.Atoms.SlidingCache;

await using var cache = new SlidingCacheAtom<string, UserProfile>(
    async (userId, ct) => await LoadUserProfileAsync(userId, ct),
    slidingExpiration: TimeSpan.FromMinutes(5),
    absoluteExpiration: TimeSpan.FromHours(1),
    maxSize: 1000);

// First call: computes and caches
var profile = await cache.GetOrComputeAsync("user-123");

// Second call within 5 minutes: returns cached, resets TTL
var cached = await cache.GetOrComputeAsync("user-123");

// Try get without computation (still resets TTL on hit)
if (cache.TryGet("user-123", out var profile))
    Console.WriteLine(profile.Name);

// Get stats
var stats = cache.GetStats();
Console.WriteLine($"Entries: {stats.TotalEntries}, Hot: {stats.HotEntries}");

EphemeralLruCache

Пакунок: core (mostlylucid.ephemeral) } Кеш самооптимізації з ковзанням TTL на кожен удар і розширений TTL від гарячі ключі.

using Mostlylucid.Ephemeral;

var cache = new EphemeralLruCache<string, Widget>(new EphemeralLruCacheOptions
{
    DefaultTtl = TimeSpan.FromMinutes(5),
    HotKeyExtension = TimeSpan.FromMinutes(30),
    HotAccessThreshold = 3,
    MaxSize = 10_000,
    SampleRate = 5 // emit 1 in 5 signals
});

var widget = await cache.GetOrAddAsync("widget:42", async key =>
{
    var data = await LoadWidgetAsync(key);
    return data!;
});

// Stats and signals to see how the cache self-focuses on hot keys
var stats = cache.GetStats();              // hot/expired counts, size
var signals = cache.GetSignals("cache.*"); // cache.hot/evict/miss/hit

Підказка: MemoryCache може бути налаштовано на завершення просування, але він ніколи не випромінює гарячі/ холодні сигнали або розширює TTL гарячих ключів. EphemeralLruCache є типовим способом оптимізації у пакунку ядра (і у SqliteSingleWriter) кожного разу, коли ви бажаєте, щоб кеш зосереджувався на активному робочому наборі.

Відлуння від виробника

Пакунок: threelucid.ephemeral.atoms.echo

Атом тримає обмежене вікно сигналу. Об' єм вантажів (відповідність) ActivationSignalPattern / CaptureSignalPattern) і коли OperationFinalized призводить до пожежі OperationEchoEntry<TPayload> записи, які ви можете продовжувати за допомогою OperationEchoAtom<TPayload>.

var sink = new SignalSink();
var typedSink = new TypedSignalSink<EchoPayload>(sink);
var echoAtom = new OperationEchoAtom<EchoPayload>(async echo => await repository.AppendAsync(echo));

await using var coordinator = new EphemeralWorkCoordinator<JobItem>(ProcessAsync);
using var maker = coordinator.EnableOperationEchoing(
    typedSink,
    echoAtom,
    new OperationEchoMakerOptions<EchoPayload>
    {
        ActivationSignalPattern = "echo.capture",
        CaptureSignalPattern = "echo.*",
        MaxTrackedOperations = 128
    });

typedSink.Raise("echo.capture", new EchoPayload("order-1", "archived"), key: "order-1");

Завдання атрибута просто підвищує введений сигнал з будь- якого стану, який вони вважають критичним, і виробник зберігає робочий набір Обмежений, поки ви наполягаєте на ехо.


Шаблони (завершене використання)

SignalBasedCircuitBreaker

**Пакунок: ** threecluded.ephemeral. patterns.circuitbreaker

Безстроковий вимикач об' єктів з використанням вікна історії сигналів.

using Mostlylucid.Ephemeral.Patterns.CircuitBreaker;

var breaker = new SignalBasedCircuitBreaker(
    failureSignal: "api.failure",
    threshold: 5,
    windowSize: TimeSpan.FromSeconds(30));

// Check before making calls
if (breaker.IsOpen(coordinator))
{
    var retryAfter = breaker.GetTimeUntilClose(coordinator);
    throw new CircuitOpenException("Too many failures", retryAfter);
}

// Pattern matching variant
if (breaker.IsOpenMatching(coordinator, "error.*"))
    throw new CircuitOpenException("Error pattern detected");

// Get current failure count
int failures = breaker.GetFailureCount(coordinator);

SignDrivenBackpressure

**Пакунок: ** threecluded.ephemeral. patterns. backuppressure

Керування глибиною черги з автоматичним відкладанням на сигналах зворотного друку.

using Mostlylucid.Ephemeral.Patterns.Backpressure;

var sink = new SignalSink();

await using var coordinator = SignalDrivenBackpressure.Create<WorkItem>(
    async (item, ct) => await ProcessAsync(item, ct),
    sink,
    maxConcurrency: 4);

// Enqueue work
await coordinator.EnqueueAsync(item);

// When downstream is slow
sink.Raise("backpressure.downstream");  // New work auto-defers

// When recovered
sink.Retract("backpressure.downstream"); // Work resumes

РегульованийFanOut

**Пакунок: ** threecluded.ephemeral. patterns. controlnfanout

Глобальний плюс кожен ключ для контрольованого паралелізму.

using Mostlylucid.Ephemeral.Patterns.ControlledFanOut;

await using var fanout = new ControlledFanOut<string, Request>(
    keySelector: req => req.TenantId,
    body: async (req, ct) => await ProcessAsync(req, ct),
    maxGlobalConcurrency: 100,  // Total parallel across all tenants
    perKeyConcurrency: 5);      // Max 5 parallel per tenant

// Items for same tenant processed with limit
await fanout.EnqueueAsync(requestA);  // Tenant1
await fanout.EnqueueAsync(requestB);  // Tenant1 - waits if 5 already running
await fanout.EnqueueAsync(requestC);  // Tenant2 - parallel with Tenant1

await fanout.DrainAsync();

Адаптивна служба RateServiceName

**Пакунок: ** threecluded.ephemeral. patterns. adriverate

Швидкість керування сигналами обмежено автоматичним зворотним зв' язком.

using Mostlylucid.Ephemeral.Patterns.AdaptiveRate;

await using var service = new AdaptiveRateService<ApiRequest>(
    async (req, ct) => await CallApiAsync(req, ct),
    maxConcurrency: 8);

// Process with automatic rate limit handling
await service.ProcessAsync(request);

// When API returns 429, emit signal with retry-after
// Signal: "rate-limit:500ms"
// Service auto-parses and delays

Console.WriteLine($"Pending: {service.PendingCount}, Active: {service.ActiveCount}");

DynamicCurrencyDemo

**Пакунок: ** threecluded.ephemeral. patterns.dynamicconcurrance

Часовий масштаб збігу, заснований на сигналах завантаження.

using Mostlylucid.Ephemeral.Patterns.DynamicConcurrency;

var sink = new SignalSink();

await using var demo = new DynamicConcurrencyDemo<WorkItem>(
    async (item, ct) => await ProcessAsync(item, ct),
    sink,
    minConcurrency: 2,
    maxConcurrency: 32,
    scaleUpPattern: "load.high",
    scaleDownPattern: "load.low");

await demo.EnqueueAsync(item);

// Concurrency adjusts automatically based on signals
sink.Raise("load.high");  // Concurrency doubles (up to max)
sink.Raise("load.low");   // Concurrency halves (down to min)

Console.WriteLine($"Current concurrency: {demo.CurrentMaxConcurrency}");

await demo.DrainAsync();

KeyedPriorityFaneOut

**Пакунок: ** threecluded.ephemeral. patterns. keyedpriorityfanout

Лінії пріоритету зі збереженням порядку клавіш.

using Mostlylucid.Ephemeral.Patterns.KeyedPriorityFanOut;

await using var fanout = new KeyedPriorityFanOut<string, UserCommand>(
    keySelector: cmd => cmd.UserId,
    body: async (cmd, ct) => await HandleCommand(cmd, ct),
    maxConcurrency: 32,
    perKeyConcurrency: 1,  // Sequential per user
    maxPriorityDepth: 100);

// Normal lane
await fanout.EnqueueAsync(normalCommand);

// Priority lane - jumps the queue for that user
bool accepted = await fanout.EnqueuePriorityAsync(urgentCommand);

// Check lane depths
var counts = fanout.PendingCounts;
Console.WriteLine($"Priority: {counts.Priority}, Normal: {counts.Normal}");

await fanout.DrainAsync();

ReactiveFanOutPipeline

**Пакунок: ** threecluded.ephemeral. patterns. reactivefanout

Двостулковий трубопровод з автоматичним придушенням.

using Mostlylucid.Ephemeral.Patterns.ReactiveFanOut;

await using var pipeline = new ReactiveFanOutPipeline<WorkItem>(
    stage2Work: async (item, ct) => await SlowProcessing(item, ct),
    preStageWork: async (item, ct) => await FastPreprocessing(item, ct),
    stage1MaxConcurrency: 8,
    stage1MinConcurrency: 1,
    stage2MaxConcurrency: 4,
    backpressureThreshold: 32,  // Throttle when stage2 has 32+ pending
    reliefThreshold: 8);        // Resume when stage2 drops below 8

await pipeline.EnqueueAsync(item);

// Stage1 auto-throttles when stage2 backs up
Console.WriteLine($"Stage1 concurrency: {pipeline.Stage1CurrentMaxConcurrency}");
Console.WriteLine($"Stage2 pending: {pipeline.Stage2Pending}");

await pipeline.DrainAsync();

SignAnomalyDetector

**Пакунок: ** threecluded.ephemeral. patterns. anomaly definer

Виявлення аномалій під час руху вікна.

using Mostlylucid.Ephemeral.Patterns.AnomalyDetector;

var sink = new SignalSink();

var detector = new SignalAnomalyDetector(
    sink,
    pattern: "error.*",
    threshold: 5,
    window: TimeSpan.FromSeconds(10));

// Check for anomalies
if (detector.IsAnomalous())
{
    Console.WriteLine("Anomaly detected! Too many errors.");
    TriggerAlert();
}

// Get current match count
int errorCount = detector.GetMatchCount();
Console.WriteLine($"Errors in window: {errorCount}");

SignCorized Readeds

**Пакунок: ** threecluded.ephemeral. patterns.signal cordordedreads

Кісс читає під час оновлень без жорстких замків.

using Mostlylucid.Ephemeral.Patterns.SignalCoordinatedReads;

// Run demo: readers pause when update signal is present
var result = await SignalCoordinatedReads.RunAsync(
    readCount: 10,
    updateCount: 1);

Console.WriteLine($"Reads: {result.ReadsCompleted}, Updates: {result.UpdatesCompleted}");
Console.WriteLine($"Signals: {string.Join(", ", result.Signals)}");

// Manual implementation:
var sink = new SignalSink();

await using var readers = new EphemeralWorkCoordinator<Query>(
    body,
    new EphemeralOptions
    {
        DeferOnSignals = new HashSet<string> { "update.in-progress" },
        Signals = sink
    });

// Readers auto-defer when update is running
sink.Raise("update.in-progress");  // Readers wait
sink.Raise("update.done");         // Readers resume

Сигнальний HttpClient

**Пакунок: ** threecluded.ephemeral. patterns.signalinghttp

Клієнт HTTP з сигналами поступу.

using Mostlylucid.Ephemeral.Patterns.SignalingHttp;

var httpClient = new HttpClient();
var request = new HttpRequestMessage(HttpMethod.Get, "https://example.com/large-file");

// Create an emitter from your coordinator
// (emitter is any ISignalEmitter - operations implement this)

byte[] data = await SignalingHttpClient.DownloadWithSignalsAsync(
    httpClient,
    request,
    emitter);

// Signals emitted during download:
// - stage.starting
// - progress:0
// - stage.request
// - stage.headers
// - stage.reading
// - progress:25, progress:50, progress:75, progress:100
// - stage.completed

SignLogWatcher

**Пакунок: ** threecluded.ephemeral. patterns.signallogwatcher

Спостерігати за вікнами сигналів про шаблони та зворотні виклики.

using Mostlylucid.Ephemeral.Patterns.SignalLogWatcher;

var sink = new SignalSink();

await using var watcher = new SignalLogWatcher(
    sink,
    onMatch: evt =>
    {
        Console.WriteLine($"Error detected: {evt.Signal} at {evt.Timestamp}");
        AlertOps(evt);
    },
    pattern: "error.*",
    pollInterval: TimeSpan.FromMilliseconds(200));

// Watcher runs in background, calling onMatch for each new error signal
sink.Raise("error.database");    // -> onMatch called
sink.Raise("error.timeout");     // -> onMatch called
sink.Raise("info.started");      // -> ignored (doesn't match pattern)

TelemerySignalHandler

**Пакунок: ** threecid.ephemeral. patterns.temeterry

Інтеграція з оцінками OpenTelemetrary/Applications.

using Mostlylucid.Ephemeral.Patterns.Telemetry;

// Use in-memory for testing, or implement ITelemetryClient for real telemetry
var telemetry = new InMemoryTelemetryClient();

await using var handler = new TelemetrySignalHandler(telemetry);

// Wire up to coordinator
var options = new EphemeralOptions
{
    OnSignal = signal => handler.OnSignal(signal)
};

// Signals are processed asynchronously
// - "error.*" signals -> TrackExceptionAsync
// - "perf.*" signals -> TrackMetricAsync
// - all signals -> TrackEventAsync

Console.WriteLine($"Queued: {handler.QueuedCount}");
Console.WriteLine($"Processed: {handler.ProcessedCount}");
Console.WriteLine($"Dropped: {handler.DroppedCount}");

// Check recorded events
var events = telemetry.GetEvents();

LongWindowDemo

**Пакунок: ** threecluded.ephemeral. patterns.ongwindodemo

Демонструє великі налаштування вікон для аудиторій.

using Mostlylucid.Ephemeral.Patterns.LongWindowDemo;

// Configure coordinator with large tracking window
var options = new EphemeralOptions
{
    MaxTrackedOperations = 10000,
    MaxOperationLifetime = TimeSpan.FromHours(24)
};

SignReactionShowcase

**Пакунок: ** threeclude.ephemeral. patterns.signalreactionscase

Демонструє шаблони сигнального сигналу і зворотні виклики.

using Mostlylucid.Ephemeral.Patterns.SignalReactionShowcase;

// See source for signal dispatch examples
// Demonstrates OnSignal, OnSignalAsync, CancelOnSignals, DeferOnSignals

Постійне вікно

**Пакунок: ** threecluded.ephemeral. patterns.expistent window

Вікно попередження з наполегливістю SQLite - процес уціліє під час перезапуску.

using Mostlylucid.Ephemeral.Patterns.PersistentWindow;

await using var window = new PersistentSignalWindow(
    "Data Source=signals.db",
    flushInterval: TimeSpan.FromSeconds(30));

// On startup: restore previous signals
await window.LoadFromDiskAsync(maxAge: TimeSpan.FromHours(24));

// Raise signals as normal
window.Raise("order.completed", key: "order-service");
window.Raise("payment.processed", key: "payment-service");

// Query signals
var recentOrders = window.Sense("order.*");

// Signals automatically flush every 30 seconds
// Also flushes on dispose

// Get stats
var stats = window.GetStats();
Console.WriteLine($"In memory: {stats.InMemoryCount}, Flushed: {stats.LastFlushedId}");

Коефіцієнт залежності

// Register in Startup/Program.cs
services.AddEphemeralWorkCoordinator<WorkItem>(
    async (item, ct) => await ProcessAsync(item, ct),
    new EphemeralOptions { MaxConcurrency = 8 });

// Named coordinators
services.AddEphemeralWorkCoordinator<WorkItem>("priority",
    async (item, ct) => await ProcessPriorityAsync(item, ct));

// Inject and use
public class MyService(IEphemeralCoordinatorFactory<WorkItem> factory)
{
    public async Task DoWork()
    {
        var coordinator = factory.CreateCoordinator();
        await coordinator.EnqueueAsync(new WorkItem());
    }
}

Сучасне коріння DI може надавати перевагу таким коротшим помічникам, як services.AddCoordinator<T>(...), services.AddScopedCoordinator<T>(...), або services.AddKeyedCoordinator<T, TKey>(...) з тих пір, як вони читають, як нормальні AddX Реєстрації; вони просто доручають ефемеральні помічники під капотом.


Frameworks

  • . NET 6. 0, 7. 0, 8. 0, 9. 0, 10. 0

Ліцензія

Незареєстрований (публічний домен)

logo

© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.