Маленька примітивна система, яка перетворює регулярну роботу у координовану, адаптивну систему.
"Зразок ефемеральних сигналів"
Вхід Частина 1 Ми побудували ефемеральну виконання - прив'язану, приватну, самоочищену робочу систему. Частина 2 Ми перетворили це на реабілітативну бібліотеку з координаторами, ключовими трубопроводами та інтеграцією DI.
У цій статті додається ще одна маленька особливість, яка змінює все: сигналів.
Зараз це у пакунку maculicid.ephemerals Nuget також. більш ніж 20 qualized.ephemerals patterns and 'atoms'.
Повнофункціональний код знаходиться у speciallucid.atoms GitHub сховище
Сигнальна інфраструктура живе у:
| ------ | --------- |
|---|---|
| EphemeralOperation.cs Д. д. д. д. д. д. д. д. д. д. д. д. | |
EphemeralOptions.cs дідька- реактивна конфігурація (CancelOnSignals, DeferOnSignals, OnSignal, OnSignalRetracted) |
|
| StringPreakenMatcher.cs Шаблон стилю slob- style, що відповідає параметру zespa ♪ | |
SignDispatcher.cs } Асинхронний сигнал, що працює з шаблоном, що відповідає (підтримує *, ?, списки комів, детермінативний порядок] |
|
| Приклади/ SignalingHttpClient.cs Дзвінок з дрібним випроміненням сигналу для HTTP-окремих дзвінків | |
| Приклади/A AddressionService.cs ♪ Адаптивна ставка, що обмежується на основі сигналу] | |
| Приклади/SignalBasedCircuitBreaker.cs | |
| Приклади/TelemetrieSignalHandler.cs ♪ Async spect process processing with tempta} |
Кожен координатор знає про свої операції, але не знає, що відбувається в інших частинах системи.
// Translation coordinator has no idea that...
await translationCoordinator.EnqueueAsync(request);
// ...the API just hit a rate limit
// ...another service is experiencing backpressure
// ...a downstream dependency is slow
Ми могли б прокрутити явні залежності, але це створює зворотній зв'язок. обізнаність з мітлом - Координатори, які відчувають своє довкілля без прямого зв'язку.
Сигнали дають змогу атомам виконання залишати сліди у їх ефемеральному вікні. Ці сліди поводяться як короткострокові факти:
Координатори можуть змінити свою поведінку на основі сигналів, видимих у вікні.
Від Сигнали.cs:
public readonly record struct SignalEvent(
string Signal,
long OperationId,
string? Key,
DateTimeOffset Timestamp,
SignalPropagation? Propagation = null)
{
public int Depth => Propagation?.Depth ?? 0;
public bool WouldCycle(string signal) => Propagation?.Contains(signal) == true;
public bool Is(string name) => Signal == name;
public bool StartsWith(string prefix) => Signal.StartsWith(prefix, StringComparison.Ordinal);
}
Операція може викликати сигнали під час виконання. Ці сигнали зберігаються у ефемеральному вікні поряд з операцією. Після того, як операція вичерпається, сигнали з' являються разом з нею.
Ні посередника, ні окремої інфраструктури, тільки струни, пов'язані з операціями.
Оскільки сигнали живуть у ефемеральному вікні, вони успадковують його гарантії: розмір, автоматичний вік і нульовий життєвий цикл над головою.
Сам по собі сигнал не спричиняє виконання. Он записывает только факт в эфемеральном окне. Нічого не працює, тому що сигнал було випромінено.
Якщо координатор визначає обробник онсініну, він синхронно працює під час випромінювання сигналу, але лише тому, що координатор його долучив. Ремісники не знають і не дбають про них. Вилучення всіх засобів обробки призводить до незмінної поведінки ядра.
Сигнал приєднаний лише до атома/операції, що його випромінював. Жоден сигнал ніколи не мутує або не звертає уваги на інший атом. Немає спільного придатного для запису автобуса.
Виділення сигналу додає факт до історії. Сигнали ніколи не оновлюються або не перезаписуються. Ретракції усувають лише власні сигнали від передач.
Сигнали існують лише в межах переднього вікна. Вони автоматично застарівають, коли вікно старіє. Ніщо не триває, якщо ви не нарощуєте наполегливості.
Коли передній проскакує сигнали для ключа "К'ю," він відскакує:
атоми у вікні
і сигнали локальні для цих атомів Спостерігачі ніколи не змінюють стан "апарата."
Якщо ви бажаєте, щоб приводив асинхронний робочий потік, вам слід скористатися:
SignDispatcher
AsyncSignal Processionor
або інші адаптатори.
Це необов'язкові шари над сигналами, а не частина семантики.
Жоден атом не може змінити знімок, стан, сигнали або метадані іншого атома. Координація відбувається через:
сигналів
Відчуття
вікна
правила
Не через листи.
Поверхня, що складається з сигналів або рою, може показувати комбінований вигляд джинів але це завжди тільки для читання, ніколи не авторитетний, ніколи не придатний для запису.
Якщо всі обробники (OnSignal, диспетчери, процесори) будуть відокремлені, система залишається повністю правильною і передбачуваною. Сигнали все ще мають значення, бо це факти, а не причини.
Події як телефонний дзвінок:
"Я дзвоню тобі прямо зараз, підніми руку і зреагуй."
Сигнали подібні до слідів у снігу:
"Я залишив сліди, якщо ти хочеш знати, куди я пішов - поглянь, якщо тобі байдуже - ігноруй це."
Ось чому сигнали ніколи не ламаються, не блокують і ніколи не взаємодіють з контрольним потоком, якщо ви не будете вибір брати участь у опитуванні.
Важливість різниці, оскільки сигнали зникають:
♪. |---------|:--------------:|:----------------:| → Підписи "Немає" Дзвінок відлуння/вихід/вихід/вихід/вихід/вихід/вихід/випромінювання/побачення. Передня частина циклу може викликати об'єднувача без явного запитання Д. д. д. д. д. д. д. д. д. д. д. ст. д. д. д. д. д. д. д. д. д. д. д. д. д. д. д. д. д. д. Передавання/постачальності має бути віддане/постачальникам без дози, просто транслюватись
*Пенсальності ♪ Один обробник зазнає невдачі, ланцюгові переривання ♪ No chain to break ♪
Дівки. |---------|--------|---------| ♪ Soupling ♪ (publisher → subscribes) ♪ None' ♪ ♪ Timing ♪ Im сяйнула ♪ ♪ сякового ♪ Необов' язкове' язково ♪ Д_ д. Д_ д. Дзвінок у стилі "LRU" означає "Немає." Життя}Інстані автоматично} Мається на увазі, що немає.
Наближається до подій (класичне):
public event Action RateLimited;
try
{
await CallApiAsync();
}
catch (RateLimitException)
{
RateLimited?.Invoke(); // Makes someone else act right now
}
Проблеми:
Наближається сигнал (пророчий):
try
{
await CallApiAsync(ct);
}
catch (RateLimitException ex)
{
op.Signal($"rate-limit:{ex.RetryAfterMs}ms");
throw;
}
Тут нічого не відбувається.
if (translator.HasSignal("rate-limit"))
{
await Task.Delay(1000); // Act when *we* choose
}
Примітки:
Керування передаванням подій. Сигнали контексту передачі.
Це вся уявна модель в одному реченні.
Залежно від вашого тла:
♪ |----------|------------| | Думки систем ♪ A stigmergic субстрат для непрямих цілей} | Інженери ♪ Light ep метадані, прив'язані до операцій у обмеженому вікні ♪ | Об' єкти PL/ CCurrency ♪ Неявно, скронева дошка парується з об'єднаною семантичною дошкою | Користувачі фреймів ♪ Pers in- process, shape- cleaning state pany, you can query at any time ♪
Сигнали - це сліди, залишені на спільній, обмеженому поверхні пам' яті. Будь- хто може подивитися на них. Ніхто не зобов' язаний реагувати. Це фундаментально stigmergic Модель координації - те саме, що й одна мурашка, та сама дошка, яку використовували на ранній стадії штучного інтелекту, і та сама одна сучасна мережа, яка поширює плітки.
using var activity = source.StartActivity("ProcessOrder");
activity?.SetTag("order.id", orderId);
activity?.SetTag("rate.limited", true);
Найкраще дляРозподіл зв'язків між службами, довгострокове сховище телеметрії, ідентифікатори кореляцій.
Використовувати телеметрію у: Вам потрібно стежити за запитами через декілька служб, зберігати метричні дані для аналізу або об'єднувати їх з інструментами спостереження.
Використовувати ефемеральні сигнали, якщоВам потрібна свідомість, координація реакцій, або телеметрія.
var rateLimits = Observable.FromEventPattern<RateLimitEventArgs>(
h => api.RateLimitHit += h,
h => api.RateLimitHit -= h);
rateLimits
.Throttle(TimeSpan.FromSeconds(1))
.Subscribe(e => HandleRateLimit(e));
Найкраще для: Складна обробка подій, термінові операції, поєднання декількох потоків даних подій.
Використовувати Rx у: Вам потрібні складні світські запити (відкривання, дебюнгування, об' єднання потоків).
Використовувати ефемеральні сигнали, якщо: Вам потрібні простіші відчуття, автоматичне очищення або інтеграція з стеженням за операціями.
public class RateLimitNotification : INotification
{
public int RetryAfterMs { get; init; }
}
await _mediator.Publish(new RateLimitNotification { RetryAfterMs = 5000 });
Найкраще для: Необроблена обробка подій під час обробки з декількома обробниками.
Використовувати медіатр у разі: Ви хочете, щоб декілька обробників відповідали синхронно одній події.
Використовувати ефемеральні сигнали, якщо: Ви хочете відчути себе без явної підписки, самоочищення історії або інтеграції з обмеженою виконанням.
var circuitBreaker = Policy
.Handle<HttpRequestException>()
.CircuitBreakerAsync(5, TimeSpan.FromSeconds(30));
Найкраще для: Сумніви щодо окремих дзвінків з автоматичним управлінням державою.
Використовувати у полі: Вам потрібна гнучкість під час кожного дзвінка з автоматичними напіввідкритими/ закритими переходами.
Використовувати ефемеральні сигнали, якщо: Ви бажаєте знати, що відбувається через багато операцій, нетипову логіку або інтеграцію з стеженням за операціями.
Об' єднати їх: Користуйтесь полілі всередині робочого тіла, вивільняйте сигнали під час подорожі по колу.
Передня частина логарифмічної системи. |----------|:-------------:|:---------------:|:---------:|:------------:|:-----------:| OpenTallemecus далі: далі, у точці, розташованій на початку, ми маємо справу з цими інструментами, а не з ними. Передбачається, що це буде більше, ніж потрібно. Дівчино. Узвишшя Округ Перетин | Ефемеричні сигнали А це означає, що ми маємо на увазі те, що ми робимо.
Обробка операцій ISignalEmitter:
public interface ISignalEmitter
{
// Emit signals
void Emit(string signal);
bool EmitCaused(string signal, SignalPropagation? cause);
// Retract (remove) signals
bool Retract(string signal);
int RetractMatching(string pattern);
bool HasSignal(string signal);
long OperationId { get; }
string? Key { get; }
}
Всередині вашого робочого тіла:
await coordinator.ProcessAsync(async (item, op, ct) =>
{
try
{
var result = await CallExternalApiAsync(item, ct);
if (result.WasCached)
op.Signal("cache-hit");
if (result.Duration > TimeSpan.FromSeconds(2))
op.Signal("slow-response");
}
catch (RateLimitException ex)
{
op.Signal("rate-limit");
op.Signal($"rate-limit:{ex.RetryAfterMs}ms");
throw;
}
catch (TimeoutException)
{
op.Signal("timeout");
throw;
}
});
Сигнали - це просто рядки. Використовувати прості назви ("rate-limit") або структуровані назви ("rate-limit:5000ms").
Фільтри візерунків використовують семантику glob (*, ?) і підтримка списків ком ("error.*,timeout"Порівнювання є детермінованим і розподіленим світлом через StringPatternMatcher.
Для дуже докладної економії ви можете надсилати сигнали на кожному етапі роботи. У бібліотеці є вибірка. Сигнальний HttpClient що демонструє цей шаблон:
using Mostlylucid.Helpers.Ephemeral.Examples;
// Inside your work body where you have access to the operation's emitter:
await coordinator.ProcessAsync(async (request, op, ct) =>
{
var data = await SignalingHttpClient.DownloadWithSignalsAsync(
httpClient,
new HttpRequestMessage(HttpMethod.Get, request.Url),
op, // ISignalEmitter
ct);
// Process the downloaded data...
});
Це випромінює сигнали на кожному етапі:
Апаратура
|--------|------|
| stage.starting ♪ Перед просьбою йде ♪
| progress:0 forcurrence separe seconds
| stage.request ♪ HTTP- запит надіслано ♪
| stage.headers Передавання + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + $2 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +])
| stage.reading Д. д. д. д. д. д. д. д. д. д. д. д. д. д. д. д. д. д. д. д. д. ст. д. д. д. д. д. д. д. д. д. д. д. д. д. д. д.
| progress:XX } Відсоток уперед (0- 100) під час звантаження}
| stage.completed ♪Звантажте до кінця ♪
Після цього ви можете надіслати запит до цих шаблонів, що відповідають шаблону:
// Find all stage transitions
var stages = coordinator.GetSignalsByPattern("stage.*");
// Check download progress
var progress = coordinator.GetSignalsByPattern("progress:*");
// Check if any download is still in progress
if (coordinator.HasSignalMatching("stage.reading") &&
!coordinator.HasSignalMatching("stage.completed"))
{
// Download in progress
}
Дії також можуть вилучати власні сигнали. Це корисно для тимчасових станів:
await coordinator.ProcessAsync(async (item, op, ct) =>
{
// Mark as processing
op.Emit("processing");
try
{
await ProcessItemAsync(item, ct);
// Success - retract the processing signal
op.Retract("processing");
op.Emit("completed");
}
catch (RetryableException)
{
// Keep processing signal, add retry info
op.Emit("retrying");
}
catch (Exception)
{
// Remove all temporary signals
op.RetractMatching("processing*");
op.Emit("failed");
throw;
}
});
Так само, як і випромінювання сигналів, згортання можуть викликати зворотні виклики:
var coordinator = new EphemeralWorkCoordinator<Request>(
body,
new EphemeralOptions
{
// Sync retraction handler
OnSignalRetracted = evt =>
{
_metrics.DecrementGauge(evt.Signal);
Console.WriteLine($"Signal {evt.Signal} retracted from op {evt.OperationId}");
if (evt.WasPatternMatch)
Console.WriteLine($" (matched pattern: {evt.Pattern})");
},
// Async retraction handler
OnSignalRetractedAsync = async (evt, ct) =>
{
await _telemetry.TrackRetraction(evt.Signal, evt.OperationId, ct);
}
});
The SignalRetractedEvent включення:
Signal - Ім' я сигналу, відкликаногоOperationId - Операція, що відхилила їїKey - Ключ операції (якщо є)Timestamp - Коли відбувся спростуванняWasPatternMatch - Так, якщо відхилятися через RetractMatchingPattern - Використаний шаблон (якщо буде знайдено шаблон)await coordinator.ProcessAsync(async (request, op, ct) =>
{
// Check if we already have a rate limit signal
if (op.HasSignal("rate-limited"))
{
// We're in recovery mode
await Task.Delay(1000, ct);
}
try
{
var response = await _api.SendAsync(request, ct);
// Success! Remove any rate limit signal
if (op.Retract("rate-limited"))
{
op.Emit("rate-limit-cleared");
}
}
catch (RateLimitException ex)
{
op.Emit("rate-limited");
op.Emit($"rate-limit:{ex.RetryAfterMs}ms");
throw;
}
});
Всі координатори надають оптимізований запит на сигнал:
// Check if any recent operation hit a rate limit
if (coordinator.HasSignal("rate-limit"))
{
await Task.Delay(1000);
}
// Count slow responses in the window
var slowCount = coordinator.CountSignals("slow-response");
if (slowCount > 10)
{
await ThrottleAsync();
}
// Get signals by pattern
var httpErrors = coordinator.GetSignalsByPattern("http.error.*");
// Get signals since a time
var recentSignals = coordinator.GetSignalsSince(DateTimeOffset.UtcNow.AddMinutes(-1));
// Get signals for a specific key
var userSignals = coordinator.GetSignalsByKey("user-123");
Від EphemeralOptions.cs:
Координатори можуть автоматично реагувати на сигнали:
var coordinator = new EphemeralWorkCoordinator<Request>(
body,
new EphemeralOptions
{
// Cancel new work if these signals are present
CancelOnSignals = new HashSet<string> { "system-overload", "circuit-open" },
// Defer new work while these signals are present
DeferOnSignals = new HashSet<string> { "rate-limit" },
MaxDeferAttempts = 10,
DeferCheckInterval = TimeSpan.FromMilliseconds(100)
});
Коли сигнал в CancelOnSignals виявлено, нові елементи пропущено (не враховано як невдалі).
Коли сигнал в DeferOnSignals виявлено нові елементи, які чекають на вилучення сигналу.
Від Адаптивна програма Service.cs:
public class AdaptiveTranslationService : IAsyncDisposable
{
private readonly EphemeralWorkCoordinator<TranslationRequest> _coordinator;
private readonly ITranslationApi _translationApi;
public AdaptiveTranslationService(ITranslationApi translationApi)
{
_translationApi = translationApi;
_coordinator = new EphemeralWorkCoordinator<TranslationRequest>(
ProcessTranslationAsync,
new EphemeralOptions
{
MaxConcurrency = 8,
MaxTrackedOperations = 100,
// New work is deferred while any "rate-limit" or "rate-limit:*" signal is present
DeferOnSignals = new HashSet<string> { "rate-limit", "rate-limit:*" },
MaxDeferAttempts = 10,
DeferCheckInterval = TimeSpan.FromMilliseconds(100)
});
}
public async Task TranslateAsync(TranslationRequest request)
{
// Optional: extra politeness based on most recent retry-after
var rateLimitSignals = _coordinator.GetSignalsByPattern("rate-limit:*");
if (rateLimitSignals.Count > 0)
{
var latest = rateLimitSignals
.OrderByDescending(s => s.Timestamp)
.First()
.Signal; // "rate-limit:5000ms"
if (TryParseRetryAfter(latest, out var delay))
{
await Task.Delay(delay);
}
}
await _coordinator.EnqueueAsync(request);
}
public static bool TryParseRetryAfter(string signal, out TimeSpan delay)
{
delay = default;
var parts = signal.Split(':', 2);
if (parts.Length != 2) return false;
var payload = parts[1].Trim();
if (!payload.EndsWith("ms", StringComparison.OrdinalIgnoreCase)) return false;
var numPart = payload[..^2];
if (!int.TryParse(numPart, out var ms) || ms < 0) return false;
delay = TimeSpan.FromMilliseconds(ms);
return true;
}
}
Всі екземпляри цієї служби автоматично відхиляються під час виконання обмеження швидкості. Немає спільного стану. Не передаються повідомлення. Просто прочитайте вікно ефемерального перегляду.
Декілька координаторів можуть відчувати один одного через спільний ресурс. SignalSink:
public class OrderProcessingSystem
{
private readonly SignalSink _sharedSignals = new(maxCapacity: 1000);
private readonly EphemeralWorkCoordinator<Order> _orderProcessor;
private readonly EphemeralWorkCoordinator<PaymentRequest> _paymentProcessor;
public OrderProcessingSystem()
{
var options = new EphemeralOptions { Signals = _sharedSignals };
_orderProcessor = new EphemeralWorkCoordinator<Order>(
ProcessOrderAsync, options);
_paymentProcessor = new EphemeralWorkCoordinator<PaymentRequest>(
ProcessPaymentAsync, options);
}
public async Task ProcessOrderAsync(Order order)
{
// Check shared signals for payment gateway issues
if (_sharedSignals.Detect("gateway-error"))
{
await _retryQueue.EnqueueAsync(order);
return;
}
await _orderProcessor.EnqueueAsync(order);
}
}
[HttpGet("/health/detailed")]
public IActionResult GetDetailedHealth()
{
return Ok(new
{
translation = new
{
pending = _translationCoordinator.PendingCount,
active = _translationCoordinator.ActiveCount,
recentRateLimits = _translationCoordinator.CountSignals("rate-limit"),
recentTimeouts = _translationCoordinator.CountSignals("timeout"),
recentSuccess = _translationCoordinator.CountSignals("success"),
hasErrors = _translationCoordinator.HasSignalMatching("error.*")
},
payment = new
{
pending = _paymentCoordinator.PendingCount,
gatewayErrors = _paymentCoordinator.CountSignals("gateway-error"),
declines = _paymentCoordinator.CountSignals("declined"),
approvals = _paymentCoordinator.CountSignals("approved")
}
});
}
Бібліотека метричних даних не потрібна. Просто надішліть до вікна ефемерал.
Від SignalBasedCircuitBreaker.cs:
public class SignalBasedCircuitBreaker
{
private readonly string _failureSignal;
private readonly int _threshold;
private readonly TimeSpan _windowSize;
public SignalBasedCircuitBreaker(
string failureSignal = "failure",
int threshold = 5,
TimeSpan? windowSize = null)
{
_failureSignal = failureSignal;
_threshold = threshold;
_windowSize = windowSize ?? TimeSpan.FromSeconds(30);
}
public bool IsOpen<T>(EphemeralWorkCoordinator<T> coordinator)
{
var recentFailures = coordinator.GetSignalsSince(
DateTimeOffset.UtcNow - _windowSize);
return recentFailures.Count(s => s.Signal == _failureSignal) >= _threshold;
}
public int GetFailureCount<T>(EphemeralWorkCoordinator<T> coordinator)
{
var recentFailures = coordinator.GetSignalsSince(
DateTimeOffset.UtcNow - _windowSize);
return recentFailures.Count(s => s.Signal == _failureSignal);
}
}
// Usage
var circuitBreaker = new SignalBasedCircuitBreaker("api-error", threshold: 3);
if (circuitBreaker.IsOpen(_coordinator))
{
throw new CircuitOpenException("Too many recent API errors");
}
await _coordinator.EnqueueAsync(request);
Проривний апарат не має власного стану - він просто читає ефемеральне вікно.
Від Сигнали.cs:
Коли сигнали можуть викликати інші сигнали, ви ризикуєте потрапити в нескінченний цикл. SignalConstraints запобігти цьому:
var options = new EphemeralOptions
{
SignalConstraints = new SignalConstraints
{
// Max propagation depth before blocking
MaxDepth = 10,
// Prevent A → B → A cycles
BlockCycles = true,
// Signals that end propagation chains
TerminalSignals = new HashSet<string> { "completed", "failed", "resolved" },
// Signals that emit but don't propagate
LeafSignals = new HashSet<string> { "logged", "metric" },
// Callback when a signal is blocked
OnBlocked = (signal, reason) =>
{
_logger.LogWarning("Signal {Signal} blocked: {Reason}",
signal.Signal, reason);
}
}
};
Залежність від доріжки за допомогою EmitCaused:
public void HandleSignal(SignalEvent evt, ISignalEmitter emitter)
{
if (evt.Is("order-placed"))
{
// This signal carries the propagation chain
// Will be blocked if it would create a cycle
emitter.EmitCaused("inventory-reserved", evt.Propagation);
}
}
Ланцюг розповсюдження слідує шляхом: order-placed → inventory-reserved → ...
Якщо inventory-reserved спроба випромінювання order-placed, він буде заблокований (позначений цикл).
Від Сигнали.cs:
Для сигналів, які мають бути видимі через координаторів:
public sealed class SignalSink
{
private readonly ConcurrentQueue<SignalEvent> _window;
private readonly int _maxCapacity;
private readonly TimeSpan _maxAge;
public SignalSink(int maxCapacity = 1000, TimeSpan? maxAge = null);
// Raise signals
public void Raise(SignalEvent signal);
public void Raise(string signal, string? key = null);
// Sense signals
public IReadOnlyList<SignalEvent> Sense();
public IReadOnlyList<SignalEvent> Sense(Func<SignalEvent, bool> predicate);
public bool Detect(string signalName);
public bool Detect(Func<SignalEvent, bool> predicate);
public int Count { get; }
}
Використання:
// Create a shared sink
var sink = new SignalSink(maxCapacity: 1000, maxAge: TimeSpan.FromMinutes(2));
// Configure coordinators to use it
var options = new EphemeralOptions { Signals = sink };
// Or raise signals directly
sink.Raise("system-maintenance");
// Sense from anywhere
if (sink.Detect("system-maintenance"))
{
await DeferWorkAsync();
}
Відповідність у стилі Glob для фільтрування сигналів:
// Exact match
coordinator.HasSignal("rate-limit");
// Wildcard patterns
coordinator.HasSignalMatching("http.*"); // http.timeout, http.error
coordinator.HasSignalMatching("error.*.critical"); // error.payment.critical
coordinator.HasSignalMatching("user-???-failed"); // user-123-failed
// Comma-separated patterns in CancelOnSignals/DeferOnSignals
new EphemeralOptions
{
CancelOnSignals = new HashSet<string>
{
"system-overload, circuit-open", // Either pattern
"error.*" // Any error signal
}
}
Зберігати сигнали простими і послідовними:
// Good - simple, categorical
op.Signal("success");
op.Signal("failure");
op.Signal("rate-limit");
op.Signal("timeout");
op.Signal("cache-hit");
// Good - structured for parsing
op.Signal("rate-limit:5000ms");
op.Signal("retry:attempt-3");
op.Signal("slow:2500ms");
op.Signal("http.error:429");
// Good - hierarchical for pattern matching
op.Signal("payment.declined");
op.Signal("payment.approved");
op.Signal("payment.gateway-error");
// Avoid - entity identification belongs in Key, not signals
op.Signal("user-123-rate-limited"); // Bad
// Instead
op.Key = "user-123";
op.Signal("rate-limit");
Синхронний обробник сигналів (OnSignal) запустити на нитку операції} підтримувати їх швидко. Щоб виконати завдання I/O-bound, сигнали вентилятора в асинхронному шляху з SignalDispatcher (подібний збіг, детермінований порядок) або AsyncSignalProcessor.
await using var dispatcher = new SignalDispatcher(new EphemeralOptions
{
MaxConcurrency = Environment.ProcessorCount,
MaxConcurrencyPerKey = 1 // sequential per signal name by default
});
dispatcher.Register("error.*", evt => _alerts.SendAsync(evt.Signal));
dispatcher.Register("progress:*", evt => _metrics.Record(evt.Signal));
// In coordinator options, keep OnSignal chor options, keep OnSignal cheap and enqueue
var coordinator = new EphemeralWorkCoordinator<Request>(
body,
new EphemeralOptions
{
OnSignal = dispatcher.Dispatch
});
Підтримка візерунків *, ?, і списки ком ("error.*,timeout"). Всі відповідні засоби обробки виконуються у порядку реєстрації на координаторі тла з ключами; джерело живлення залишається синхронізованим.
Для окремого процесу синхронізації:
await using var processor = new AsyncSignalProcessor(
async (signal, ct) =>
{
await _externalService.LogAsync(signal, ct);
},
maxConcurrency: 4,
maxQueueSize: 1000);
// Enqueue signals (returns immediately)
processor.Enqueue(new SignalEvent(
"rate-limit",
operationId,
key,
DateTimeOffset.UtcNow));
Повний приклад поєднання асинхронної обробки сигналів з інтеграцією телеметрії:
Джерело: TelemerySignalHandler.cs
public class TelemetrySignalHandler : IAsyncDisposable
{
private readonly AsyncSignalProcessor _processor;
private readonly ITelemetryClient _telemetry;
public TelemetrySignalHandler(ITelemetryClient telemetry)
{
_telemetry = telemetry;
_processor = new AsyncSignalProcessor(
HandleSignalAsync,
maxConcurrency: 8,
maxQueueSize: 5000);
}
// Synchronous entry point - returns immediately
public bool OnSignal(SignalEvent signal) => _processor.Enqueue(signal);
private async Task HandleSignalAsync(SignalEvent signal, CancellationToken ct)
{
var properties = new Dictionary<string, string>
{
["signal"] = signal.Signal,
["operationId"] = signal.OperationId.ToString(),
["key"] = signal.Key ?? "none"
};
await _telemetry.TrackEventAsync("EphemeralSignal", properties, ct);
// Categorized tracking based on signal prefix
if (signal.StartsWith("error"))
await _telemetry.TrackExceptionAsync(signal.Signal, properties, ct);
else if (signal.StartsWith("perf"))
await _telemetry.TrackMetricAsync(signal.Signal, 1, ct);
}
// Expose stats for monitoring
public int QueuedCount => _processor.QueuedCount;
public long ProcessedCount => _processor.ProcessedCount;
public long DroppedCount => _processor.DroppedCount;
public async ValueTask DisposeAsync() => await _processor.DisposeAsync();
}
Покарайте координатора:
await using var telemetryHandler = new TelemetrySignalHandler(telemetryClient);
await using var coordinator = new EphemeralWorkCoordinator<Request>(
ProcessAsync,
new EphemeralOptions
{
OnSignal = signal => telemetryHandler.OnSignal(signal)
});
Обробник:
OnSignal повертає негайноСигнали потужні, тому що вони ефемераль:
♪ |----------|---------| | Обв' язані ♪ Не може рости без обкладинок - старі значення age out ♪ | Самоочищення ♪Не потрібен код clete | ДековпінгCity in Alaska USA Еміттери не знають, що таке еталон | Спостереження Місячний код може відчувати поточний стан ♪ | Закритий ♪ Nobody user data - just captions names ♪ | Швидка Дзвінок О'1 з короткими циклами
Ефемеральне вікно вже існує для зневаджування. Сигнали просто дають йому семантичне значення.
Сигнали перетворюють ізольовані атоми виконання у a Відчуття мережі. Кожен координатор може:
CancelOnSignals і DeferOnSignalsЖодного брокера, ніякого спільного штату, ніякого протоколу координації, просто операції з метаданими, які природно розпадаються.
Атоми не спілкуються один з одним безпосередньо - вони просто залишають сліди в ефемеральному вікні, які можуть бачити інші.
Вогонь, сигнал, сенс, забудь.
© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.