Під тиском: Як системи чергування виконують репресії з прикладами у C# (Українська (Ukrainian))

Під тиском: Як системи чергування виконують репресії з прикладами у C#

Sunday, 23 November 2025

//

13 minute read

Burpresserure - це несуттєвий герой розподілених систем. Це те, що утримує ваші черги від вибуху на швах, коли виробники виголошують повідомлення швидше, ніж споживачі можуть їх жувати. Просто кажучи: це система, яка каже " затримайте мить ," коли речі стають занадто зайнятими.

Справа ось в чому: технічні прийоми, які містяться в цій статті, стосуються майже всіх методів. кожні У черзі повідомлень і службовому автобусі }RabbitMQ, Kafka, Azure Service Bus, AWS SQS, NATS, ви називаєте його. Специфічні особливості відрізняються, але принципи є універсальними. Я використовуватиму RubleMQ для більшості прикладів, тому що це те, що я знаю найкраще, але я покажу вам, як ці шаблони перекладаються на різних платформах.

Сповідь: Навіть більшість розробників похилого віку не реалізують належного механізму репресування, вони створюють системи радісних шляхів, які чудово працюють у дискотеці, а потім замислюються над тим, чому виробництво падає протягом Чорної п'ятниці.

Що таке придушення

При його ядрі, зворотний тиск - це петля зворотного зв'язку, яка сповільнює виробників, коли споживачі відстають.

Без зворотного друку, швидкий виробник перехопить повільного споживача. Повідомлення скупчуються у черзі, пам' ять виснажується, і, зрештою, ваша система відпаде.

flowchart LR
    P[Producer] --> Q[Queue]
    Q --> C[Consumer]
    C -. "Slow down!" .-> P

    style P stroke:#f59e0b,stroke-width:2px
    style Q stroke:#0ea5e9,stroke-width:2px
    style C stroke:#10b981,stroke-width:2px

Краса відштовхування це те, що це - розмова Споживачі сигналі "Я ситий, дай мені хвилину," і виробник відповідає: "Нема проблем, я зачекаю."

Як Кроліман реагує на відштовхування

КроликМК має декілька вбудованих механізмів для роботи з спинного друку, і розуміння їх має вирішальне значення, якщо ви будуєте системи, які повинні залишатися прямо під навантаженням. Документація з CrootMQ Чудово. Я з'єднаюся з конкретними сторінками.

Керування потоком

Якщо використання пам' яті КроликMQ або глибина черги перевищить налаштовані порції, буде задіяно керування потоком. Цей тимчасовий блок з' єднань для вісників не може надсилати нових повідомлень, поки брокер не вичистить достатньо зворотного журналу. Див. також docs on нагадування для пам' яті@ info: whatsthis і нагадування на диску@ info: whatsthis який активує керування потоком.

flowchart TD
    subgraph "RabbitMQ Flow Control"
        A[Publisher Sends Message] --> B{Memory/Queue<br/>Threshold OK?}
        B -->|Yes| C[Message Accepted]
        C --> D[Add to Queue]
        B -->|No| E[Connection Blocked]
        E --> F[Publisher Waits]
        F --> G{Threshold<br/>Cleared?}
        G -->|No| F
        G -->|Yes| H[Connection Unblocked]
        H --> A
    end

    style E stroke:#ef4444,stroke-width:3px
    style H stroke:#10b981,stroke-width:2px

Ключовим є те, що КроликMQ не просто скидає повідомлення, коли під тиском, він сповільнює джерело.

Підтвердження споживача

Споживачі контролюють темп. підтвердження (АКЛ). Повідомлення не буде вилучено з черги, поки споживач не визнає його явно. Якщо споживач не надсилає повідомлень ACK достатньо швидко, черга росте, що врешті- решт призводить до керування потоком вгору за течією.

Ви також можете використовувати границі перед випередженням (QOS) для керування кількістю непізнаних повідомлень, які може одночасно мати споживач у польоті. Таким чином можна запобігти одному повільному споживачу зберігати повідомлення.

// Set prefetch count to limit unacknowledged messages
channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);

Це каже КроликМК: "Тільки відправте мені 10 повідомлень за раз. Як тільки я пропущу деякі, ви можете надіслати більше ." Це споживач чітко вказує на те, скільки тиску він може витримати. Документація клієнта. NET докладно описано API.

Як інші системи поводяться з репресієм

Шаблони універсальні, але реалізація різниться. Ось як деякі інші популярні системи обміну повідомленнями підходять до тієї ж проблеми.

Kafka: опитування зі споживачем

Кафка займає фундаментально різні підходи до людей, які мають справу з консюмерами. pull повідомлення замість того, щоб їх натискати. Ця дія робить зворотний тиск неявним: якщо споживач не опитує, він не отримуватиме повідомлень. брокерові байдуже, він просто зберігає повідомлення навколо, доки споживач не буде готовий.

// Kafka consumer with explicit backpressure control
using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("orders");

while (!cancellationToken.IsCancellationRequested)
{
    // Only fetch what you can handle - this IS your backpressure
    var result = consumer.Consume(timeout: TimeSpan.FromSeconds(1));

    if (result != null)
    {
        await ProcessMessageAsync(result.Message.Value);

        // Manual commit = explicit acknowledgement
        consumer.Commit(result);
    }

    // If processing is slow, you simply poll less frequently
    // Kafka doesn't push more messages at you
}

Розумний біт: споживчі групи Кафки автоматично зрівноважують розділи. Якщо один з споживачів відпаде, ви зможете додати більше споживачів до групи, а розділи буде перерозподілено. Відштовхування стає прийняттям рішення щодо масштабування.

// Control batch size to manage memory pressure
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "order-processors",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    MaxPollIntervalMs = 300000,      // 5 mins max between polls
    MaxPartitionFetchBytes = 1048576, // 1MB max per partition fetch
    FetchMaxBytes = 52428800          // 50MB max total fetch
};

Service Bus Azure: керування поточними повідомленнями

Azure Service Bus використовує a MaxConcurrentCalls Параметр, який є дуже простим, визначає кількість повідомлень, якими процесор керує одночасно. Зворотний тиск є автоматичним.

var processor = client.CreateProcessor("orders-queue", new ServiceBusProcessorOptions
{
    // This IS your backpressure - only process 10 at a time
    MaxConcurrentCalls = 10,
    AutoCompleteMessages = false,
    PrefetchCount = 20  // Buffer 20 messages locally
});

processor.ProcessMessageAsync += async args =>
{
    try
    {
        await ProcessOrderAsync(args.Message.Body.ToString());
        await args.CompleteMessageAsync(args.Message);
    }
    catch (Exception ex)
    {
        // Abandon returns message to queue for retry
        await args.AbandonMessageAsync(args.Message);
    }
};

processor.ProcessErrorAsync += args =>
{
    Console.WriteLine($"Error: {args.Exception.Message}");
    return Task.CompletedTask;
};

await processor.StartProcessingAsync();

Service Bus Azure також підтримує сеанси для впорядкованої обробки і чергові lead- letter Для того, щоб знову й знову передавати інформацію, яка не відповідає дійсності, важливо, коли щось іде не так.

AWS SQS: танок Тайм- аут видимості

SQS використовує час очікування для зворотного друку. Коли ви отримуєте повідомлення, він стає невидимим для інших споживачів. Якщо ви не вилучите його вчасно, програма знову з' явиться для того, щоб хтось інший спробував.

var sqsClient = new AmazonSQSClient();

// Receive with explicit backpressure control
var response = await sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest
{
    QueueUrl = queueUrl,
    MaxNumberOfMessages = 10,           // Batch size = backpressure control
    WaitTimeSeconds = 20,               // Long polling
    VisibilityTimeout = 300             // 5 mins to process before retry
});

foreach (var message in response.Messages)
{
    try
    {
        await ProcessAsync(message.Body);

        // Only delete after successful processing
        await sqsClient.DeleteMessageAsync(queueUrl, message.ReceiptHandle);
    }
    catch
    {
        // Don't delete - message will become visible again after timeout
        // Optionally, change visibility timeout to retry sooner
        await sqsClient.ChangeMessageVisibilityAsync(queueUrl,
            message.ReceiptHandle, visibilityTimeout: 0);
    }
}

Розумний трюк SQS: використання ApproximateNumberOfMessages для спостереження за глибиною черги і клієнтами автомасштабів:

var attributes = await sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest
{
    QueueUrl = queueUrl,
    AttributeNames = new List<string> { "ApproximateNumberOfMessages" }
});

var depth = int.Parse(attributes.Attributes["ApproximateNumberOfMessages"]);

if (depth > 1000)
{
    // Signal to scale up consumers
    await TriggerAutoScalingAsync();
}

NATS JatStream: вбудоване керування потоком

NATS JetStream має явне керування потоком з прийомами споживачів і максимальне обмеження на кількість повідомлень:

var js = connection.CreateJetStreamContext();

var subscription = js.PushSubscribeAsync("orders.>", (sender, args) =>
{
    try
    {
        ProcessMessage(args.Message.Data);
        args.Message.Ack();
    }
    catch
    {
        args.Message.Nak();  // Negative ack - redeliver
    }
}, new PushSubscribeOptions.Builder()
    .WithConfiguration(new ConsumerConfiguration.Builder()
        .WithMaxAckPending(100)     // Max unacked messages - THIS is backpressure
        .WithAckWait(30000)         // 30 seconds to ack
        .Build())
    .Build());

Загальна гілка

Зауважте, що спільного мають ці системи:

  1. Обмеження пакетності/ коректності - ты управляешь всем, что хочешь сделать.
  2. Потік, заснований на підтвердженні - повідомлення залишатимуться доступними, поки ви не підтвердите обробку
  3. Поновлення на основі часу очікування - якщо ви програєте, повідомлення повернуться для повторення
  4. Спостереження за глибинами - ты всегда можешь спросить "как я поддерживаю?"

Синтаксис відрізняється, але танець однаковий: "Ось з чим я можу впоратися, скажи мені, коли я це зробив. Якщо я не скажу тобі вчасно, пригадайте, що я невдала."

Приклади C# Code

Так, давайте перейдемо до коду. Ось приклади впровадження і реагування на зворотний тиск у ваших програмах C#.

Спостереження за глибиною черги

Ось як перевірити кількість повідомлень, що чекають у черзі:

var queue = channel.QueueDeclare(
    queue: "tasks",
    durable: true,
    exclusive: false,
    autoDelete: false);

Console.WriteLine($"Messages ready: {queue.MessageCount}");

// React to queue depth
if (queue.MessageCount > 1000)
{
    Console.WriteLine("Queue backing up - consider throttling producers");
}

Цей фрагмент перевіряє кількість повідомлень, які чекають. Якщо рахунок піднімається, це ваш вибір виробникам газів або збільшити кількість споживачів. Не переймайтеся тим, що це надто часте перевірка на стан здоров'я, зазвичай, достатньо.

Підтвердження видавця

Видавець підтверджує, що вам відомо, коли КроликMQ успішно отримав і оприлюднив ваше повідомлення. Якщо підтвердження уповільнюються, це чіткий сигнал відштовхування:

// Enable publisher confirms
channel.ConfirmSelect();

var body = Encoding.UTF8.GetBytes("Hello, Queue!");

channel.BasicPublish(
    exchange: "",
    routingKey: "tasks",
    basicProperties: null,
    body: body);

// Wait for confirmation - timeout indicates backpressure
bool confirmed = channel.WaitForConfirms(TimeSpan.FromSeconds(5));

if (!confirmed)
{
    Console.WriteLine("Message not confirmed - broker may be under pressure");
}

Якщо КроликМК бореться, підтвердження вимагають більше часу або більше часу. Ваш виробник може використати цей сигнал, щоб вимкнути його, замість того, щоб підсилити більше тисків.

Для сценаріїв високого рівня вам потрібні асинхронні підтвердження:

channel.ConfirmSelect();

var outstandingConfirms = new ConcurrentDictionary<ulong, string>();

channel.BasicAcks += (sender, ea) =>
{
    if (ea.Multiple)
    {
        var confirmed = outstandingConfirms.Where(k => k.Key <= ea.DeliveryTag);
        foreach (var entry in confirmed)
        {
            outstandingConfirms.TryRemove(entry.Key, out _);
        }
    }
    else
    {
        outstandingConfirms.TryRemove(ea.DeliveryTag, out _);
    }
};

channel.BasicNacks += (sender, ea) =>
{
    // Message was rejected - implement retry logic
    Console.WriteLine($"Message {ea.DeliveryTag} was nacked - broker under pressure");
    // Back off before retrying
};

Повторні спроби з Expential Backout

Коли ви помічаєте зворотний тиск, найгірша річ, яку ви можете зробити, - це негайно спробувати з повною швидкістю. Це те саме, що відповісти на затор, натиснувши акселератор сильніше. Замість цього, реалізуйте експоненційний зворотний зв' язок:

public async Task PublishWithBackpressureAsync(
    IModel channel,
    byte[] body,
    int maxRetries = 5)
{
    int attempt = 0;

    while (attempt < maxRetries)
    {
        try
        {
            channel.ConfirmSelect();
            channel.BasicPublish(
                exchange: "",
                routingKey: "tasks",
                basicProperties: null,
                body: body);

            if (channel.WaitForConfirms(TimeSpan.FromSeconds(5)))
            {
                return; // Success
            }

            throw new Exception("Publish not confirmed");
        }
        catch (Exception ex)
        {
            attempt++;

            if (attempt >= maxRetries)
            {
                throw new Exception($"Failed to publish after {maxRetries} attempts", ex);
            }

            // Exponential backoff: 1s, 2s, 4s, 8s, 16s
            var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt - 1));
            Console.WriteLine($"Backpressure detected - retry {attempt} after {delay}");

            await Task.Delay(delay);
        }
    }
}

Ця програма імітує модель HTTP 429 (у багатьох запитах). Замість того, щоб перебивати брокера, ми призупиняємо спробу, надаючи системному часу для відновлення.

Використання каналів для зворотного друку

Якщо ви створюєте внутрішній трубопровід (продуктор → процесор → у всіх межах Вашої програми), то це означає, що ви можете створити його.NET's Channel<T> надає елегантну підтримку спинного друку:

// Create a bounded channel - backpressure is automatic
var channel = Channel.CreateBounded<WorkItem>(new BoundedChannelOptions(100)
{
    FullMode = BoundedChannelFullMode.Wait // Block producer when full
});

// Producer - will automatically wait when channel is full
async Task ProduceAsync(ChannelWriter<WorkItem> writer)
{
    for (int i = 0; i < 10000; i++)
    {
        var item = new WorkItem { Id = i };

        // This awaits if the channel is at capacity
        await writer.WriteAsync(item);

        Console.WriteLine($"Produced item {i}");
    }

    writer.Complete();
}

// Consumer - processes at its own pace
async Task ConsumeAsync(ChannelReader<WorkItem> reader)
{
    await foreach (var item in reader.ReadAllAsync())
    {
        // Simulate slow processing
        await Task.Delay(100);
        Console.WriteLine($"Processed item {item.Id}");
    }
}

// Run both concurrently
await Task.WhenAll(
    ProduceAsync(channel.Writer),
    ConsumeAsync(channel.Reader)
);

Обмежений канал автоматично застосовує зворотні натискувальні лапки виробника, коли канал переповнений, природно сповільнюється, щоб відповідати темпу споживача.

Повноцінний видавець комп'ютерних програм

Ось більш вичерпний приклад, який об'єднує спостереження, підтвердження і зворотний зв'язок:

public class BackpressureAwarePublisher : IDisposable
{
    private readonly IConnection _connection;
    private readonly IModel _channel;
    private readonly string _queueName;
    private readonly int _queueDepthThreshold;

    public BackpressureAwarePublisher(
        string hostName,
        string queueName,
        int queueDepthThreshold = 1000)
    {
        var factory = new ConnectionFactory { HostName = hostName };
        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();
        _queueName = queueName;
        _queueDepthThreshold = queueDepthThreshold;

        _channel.QueueDeclare(
            queue: queueName,
            durable: true,
            exclusive: false,
            autoDelete: false);

        _channel.ConfirmSelect();
    }

    public async Task<bool> PublishAsync(byte[] body, CancellationToken ct = default)
    {
        // Check queue depth first
        var queueInfo = _channel.QueueDeclarePassive(_queueName);

        if (queueInfo.MessageCount > _queueDepthThreshold)
        {
            Console.WriteLine($"Queue depth {queueInfo.MessageCount} exceeds threshold - applying backpressure");

            // Wait for queue to drain a bit
            while (queueInfo.MessageCount > _queueDepthThreshold * 0.8)
            {
                await Task.Delay(1000, ct);
                queueInfo = _channel.QueueDeclarePassive(_queueName);
            }
        }

        // Publish with retry
        for (int attempt = 1; attempt <= 3; attempt++)
        {
            try
            {
                var properties = _channel.CreateBasicProperties();
                properties.Persistent = true;

                _channel.BasicPublish(
                    exchange: "",
                    routingKey: _queueName,
                    basicProperties: properties,
                    body: body);

                if (_channel.WaitForConfirms(TimeSpan.FromSeconds(5)))
                {
                    return true;
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Publish attempt {attempt} failed: {ex.Message}");
            }

            if (attempt < 3)
            {
                await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt)), ct);
            }
        }

        return false;
    }

    public void Dispose()
    {
        _channel?.Dispose();
        _connection?.Dispose();
    }
}

Найкращі вправи

Зберігайте спокій

Не панікуйте, коли черга росте. стабільний Черга, яка росте не дуже швидко.

Глибина спостереження за чергою з часом. Пошукати за моделями, а не знімками. Чергою, яка постійно складається зі 100 повідомлень, достатньо добре. Черга, яка зросла від 100 до 10 000 протягом останньої години, потребує уваги.

Будь прагматичним

Застосовувати шаблони pragmaticly. Не кожне повідомлення потребує підтвердження. Не для кожної з черг потрібна складна обробка зворотного друку. Черга, у якій виконується 10 повідомлень за годину, ймовірно, не потребує такої ж інженерії гнучкості, як і для однієї обробки 10 000 повідомлень за секунду.

Запитайте себе: " Яка реальна вартість, якщо це повідомлення втрачено або його затримано? Якщо відповідь " не надто ," не переінформатор. Якщо відповідь на це питання є " показником впливу на фінансову цілісність або дані," вкладайте кошти у належне керування зворотним тиском.

Масштабувати кмітливо

Коли чергуються, відповідь не завжди "додається більше виробників." Це те саме, що намагатися полагодити затор, додавши більше машин.

Подумайте:

  • Спочатку масштабувати споживачів - Можешь добавить больше рабочих, чтобы обработать белье?
  • Перевірити вузькі місця - Це один повільний спуск залежних від течії, що викликає резервну копію?
  • Пакетна, якщо можливо - споживачі можуть одночасно обробити декілька повідомлень?
flowchart TD
    A[Queue Growing] --> B{Consumer<br/>Saturated?}
    B -->|Yes| C[Add Consumers]
    B -->|No| D{Downstream<br/>Bottleneck?}
    D -->|Yes| E[Fix/Scale Downstream]
    D -->|No| F{Can Batch<br/>Process?}
    F -->|Yes| G[Implement Batching]
    F -->|No| H[Accept Higher Latency<br/>or Reduce Load]

    style C stroke:#10b981,stroke-width:2px
    style E stroke:#f59e0b,stroke-width:2px
    style G stroke:#0ea5e9,stroke-width:2px

Монітор і попередження

Встановити попередження для:

  • Поріг глибини черги, що перевищує поріг
  • Збільшення затримки споживача
  • Видавець підтверджує відлік часу
  • З' єднання заблокувало події

Ви хочете знати, що таке відштовхування. до це стає кризою, а не коли твоя система вже втрачена.

Висновки

Якщо розглядати це як розмову між виробником і споживачем, ви будуєте системи, які залишаються стійкими під тиском.

Ключові факти:

  1. Пригнічення - це відгуки - Виробники і споживачі об'єднуються, щоб знайти екологічну пропускну здатність.
  2. Слідкувати за глибиною черги - Ти не можеш виміряти те, що не можеш.
  3. Використовувати видавець підтверджую - Знаєш, коли брокер бореться
  4. Впровадження експоненційного зворотного зв' язку - Не зламайте систему, яка вже під тиском
  5. Масштабувати споживачів, а не лише виробників - виправити вузьку, а не симптом

Коли ваша система каже "Я ситий, дайте мені хвилину," правильна відповідь: "Жодна проблема, я чекатиму." Це сутність розподілених систем, що розподіляються, популяцій, стійкості.

Залишайтеся спокійними, коли черги ростуть, і пам'ятайте: система під граціозним спинним тиском набагато краща, ніж та, що впала повністю.

Finding related posts...
logo

© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.