Burpresserure - це несуттєвий герой розподілених систем. Це те, що утримує ваші черги від вибуху на швах, коли виробники виголошують повідомлення швидше, ніж споживачі можуть їх жувати. Просто кажучи: це система, яка каже " затримайте мить ," коли речі стають занадто зайнятими.
Справа ось в чому: технічні прийоми, які містяться в цій статті, стосуються майже всіх методів. кожні У черзі повідомлень і службовому автобусі }RabbitMQ, Kafka, Azure Service Bus, AWS SQS, NATS, ви називаєте його. Специфічні особливості відрізняються, але принципи є універсальними. Я використовуватиму RubleMQ для більшості прикладів, тому що це те, що я знаю найкраще, але я покажу вам, як ці шаблони перекладаються на різних платформах.
Сповідь: Навіть більшість розробників похилого віку не реалізують належного механізму репресування, вони створюють системи радісних шляхів, які чудово працюють у дискотеці, а потім замислюються над тим, чому виробництво падає протягом Чорної п'ятниці.
При його ядрі, зворотний тиск - це петля зворотного зв'язку, яка сповільнює виробників, коли споживачі відстають.
Без зворотного друку, швидкий виробник перехопить повільного споживача. Повідомлення скупчуються у черзі, пам' ять виснажується, і, зрештою, ваша система відпаде.
flowchart LR
P[Producer] --> Q[Queue]
Q --> C[Consumer]
C -. "Slow down!" .-> P
style P stroke:#f59e0b,stroke-width:2px
style Q stroke:#0ea5e9,stroke-width:2px
style C stroke:#10b981,stroke-width:2px
Краса відштовхування це те, що це - розмова Споживачі сигналі "Я ситий, дай мені хвилину," і виробник відповідає: "Нема проблем, я зачекаю."
КроликМК має декілька вбудованих механізмів для роботи з спинного друку, і розуміння їх має вирішальне значення, якщо ви будуєте системи, які повинні залишатися прямо під навантаженням. Документація з CrootMQ Чудово. Я з'єднаюся з конкретними сторінками.
Якщо використання пам' яті КроликMQ або глибина черги перевищить налаштовані порції, буде задіяно керування потоком. Цей тимчасовий блок з' єднань для вісників не може надсилати нових повідомлень, поки брокер не вичистить достатньо зворотного журналу. Див. також docs on нагадування для пам' яті@ info: whatsthis і нагадування на диску@ info: whatsthis який активує керування потоком.
flowchart TD
subgraph "RabbitMQ Flow Control"
A[Publisher Sends Message] --> B{Memory/Queue<br/>Threshold OK?}
B -->|Yes| C[Message Accepted]
C --> D[Add to Queue]
B -->|No| E[Connection Blocked]
E --> F[Publisher Waits]
F --> G{Threshold<br/>Cleared?}
G -->|No| F
G -->|Yes| H[Connection Unblocked]
H --> A
end
style E stroke:#ef4444,stroke-width:3px
style H stroke:#10b981,stroke-width:2px
Ключовим є те, що КроликMQ не просто скидає повідомлення, коли під тиском, він сповільнює джерело.
Споживачі контролюють темп. підтвердження (АКЛ). Повідомлення не буде вилучено з черги, поки споживач не визнає його явно. Якщо споживач не надсилає повідомлень ACK достатньо швидко, черга росте, що врешті- решт призводить до керування потоком вгору за течією.
Ви також можете використовувати границі перед випередженням (QOS) для керування кількістю непізнаних повідомлень, які може одночасно мати споживач у польоті. Таким чином можна запобігти одному повільному споживачу зберігати повідомлення.
// Set prefetch count to limit unacknowledged messages
channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);
Це каже КроликМК: "Тільки відправте мені 10 повідомлень за раз. Як тільки я пропущу деякі, ви можете надіслати більше ." Це споживач чітко вказує на те, скільки тиску він може витримати. Документація клієнта. NET докладно описано API.
Шаблони універсальні, але реалізація різниться. Ось як деякі інші популярні системи обміну повідомленнями підходять до тієї ж проблеми.
Кафка займає фундаментально різні підходи до людей, які мають справу з консюмерами. pull повідомлення замість того, щоб їх натискати. Ця дія робить зворотний тиск неявним: якщо споживач не опитує, він не отримуватиме повідомлень. брокерові байдуже, він просто зберігає повідомлення навколо, доки споживач не буде готовий.
// Kafka consumer with explicit backpressure control
using var consumer = new ConsumerBuilder<string, string>(config).Build();
consumer.Subscribe("orders");
while (!cancellationToken.IsCancellationRequested)
{
// Only fetch what you can handle - this IS your backpressure
var result = consumer.Consume(timeout: TimeSpan.FromSeconds(1));
if (result != null)
{
await ProcessMessageAsync(result.Message.Value);
// Manual commit = explicit acknowledgement
consumer.Commit(result);
}
// If processing is slow, you simply poll less frequently
// Kafka doesn't push more messages at you
}
Розумний біт: споживчі групи Кафки автоматично зрівноважують розділи. Якщо один з споживачів відпаде, ви зможете додати більше споживачів до групи, а розділи буде перерозподілено. Відштовхування стає прийняттям рішення щодо масштабування.
// Control batch size to manage memory pressure
var config = new ConsumerConfig
{
BootstrapServers = "localhost:9092",
GroupId = "order-processors",
AutoOffsetReset = AutoOffsetReset.Earliest,
MaxPollIntervalMs = 300000, // 5 mins max between polls
MaxPartitionFetchBytes = 1048576, // 1MB max per partition fetch
FetchMaxBytes = 52428800 // 50MB max total fetch
};
Azure Service Bus використовує a MaxConcurrentCalls Параметр, який є дуже простим, визначає кількість повідомлень, якими процесор керує одночасно. Зворотний тиск є автоматичним.
var processor = client.CreateProcessor("orders-queue", new ServiceBusProcessorOptions
{
// This IS your backpressure - only process 10 at a time
MaxConcurrentCalls = 10,
AutoCompleteMessages = false,
PrefetchCount = 20 // Buffer 20 messages locally
});
processor.ProcessMessageAsync += async args =>
{
try
{
await ProcessOrderAsync(args.Message.Body.ToString());
await args.CompleteMessageAsync(args.Message);
}
catch (Exception ex)
{
// Abandon returns message to queue for retry
await args.AbandonMessageAsync(args.Message);
}
};
processor.ProcessErrorAsync += args =>
{
Console.WriteLine($"Error: {args.Exception.Message}");
return Task.CompletedTask;
};
await processor.StartProcessingAsync();
Service Bus Azure також підтримує сеанси для впорядкованої обробки і чергові lead- letter Для того, щоб знову й знову передавати інформацію, яка не відповідає дійсності, важливо, коли щось іде не так.
SQS використовує час очікування для зворотного друку. Коли ви отримуєте повідомлення, він стає невидимим для інших споживачів. Якщо ви не вилучите його вчасно, програма знову з' явиться для того, щоб хтось інший спробував.
var sqsClient = new AmazonSQSClient();
// Receive with explicit backpressure control
var response = await sqsClient.ReceiveMessageAsync(new ReceiveMessageRequest
{
QueueUrl = queueUrl,
MaxNumberOfMessages = 10, // Batch size = backpressure control
WaitTimeSeconds = 20, // Long polling
VisibilityTimeout = 300 // 5 mins to process before retry
});
foreach (var message in response.Messages)
{
try
{
await ProcessAsync(message.Body);
// Only delete after successful processing
await sqsClient.DeleteMessageAsync(queueUrl, message.ReceiptHandle);
}
catch
{
// Don't delete - message will become visible again after timeout
// Optionally, change visibility timeout to retry sooner
await sqsClient.ChangeMessageVisibilityAsync(queueUrl,
message.ReceiptHandle, visibilityTimeout: 0);
}
}
Розумний трюк SQS: використання ApproximateNumberOfMessages для спостереження за глибиною черги і клієнтами автомасштабів:
var attributes = await sqsClient.GetQueueAttributesAsync(new GetQueueAttributesRequest
{
QueueUrl = queueUrl,
AttributeNames = new List<string> { "ApproximateNumberOfMessages" }
});
var depth = int.Parse(attributes.Attributes["ApproximateNumberOfMessages"]);
if (depth > 1000)
{
// Signal to scale up consumers
await TriggerAutoScalingAsync();
}
NATS JetStream має явне керування потоком з прийомами споживачів і максимальне обмеження на кількість повідомлень:
var js = connection.CreateJetStreamContext();
var subscription = js.PushSubscribeAsync("orders.>", (sender, args) =>
{
try
{
ProcessMessage(args.Message.Data);
args.Message.Ack();
}
catch
{
args.Message.Nak(); // Negative ack - redeliver
}
}, new PushSubscribeOptions.Builder()
.WithConfiguration(new ConsumerConfiguration.Builder()
.WithMaxAckPending(100) // Max unacked messages - THIS is backpressure
.WithAckWait(30000) // 30 seconds to ack
.Build())
.Build());
Зауважте, що спільного мають ці системи:
Синтаксис відрізняється, але танець однаковий: "Ось з чим я можу впоратися, скажи мені, коли я це зробив. Якщо я не скажу тобі вчасно, пригадайте, що я невдала."
Так, давайте перейдемо до коду. Ось приклади впровадження і реагування на зворотний тиск у ваших програмах C#.
Ось як перевірити кількість повідомлень, що чекають у черзі:
var queue = channel.QueueDeclare(
queue: "tasks",
durable: true,
exclusive: false,
autoDelete: false);
Console.WriteLine($"Messages ready: {queue.MessageCount}");
// React to queue depth
if (queue.MessageCount > 1000)
{
Console.WriteLine("Queue backing up - consider throttling producers");
}
Цей фрагмент перевіряє кількість повідомлень, які чекають. Якщо рахунок піднімається, це ваш вибір виробникам газів або збільшити кількість споживачів. Не переймайтеся тим, що це надто часте перевірка на стан здоров'я, зазвичай, достатньо.
Видавець підтверджує, що вам відомо, коли КроликMQ успішно отримав і оприлюднив ваше повідомлення. Якщо підтвердження уповільнюються, це чіткий сигнал відштовхування:
// Enable publisher confirms
channel.ConfirmSelect();
var body = Encoding.UTF8.GetBytes("Hello, Queue!");
channel.BasicPublish(
exchange: "",
routingKey: "tasks",
basicProperties: null,
body: body);
// Wait for confirmation - timeout indicates backpressure
bool confirmed = channel.WaitForConfirms(TimeSpan.FromSeconds(5));
if (!confirmed)
{
Console.WriteLine("Message not confirmed - broker may be under pressure");
}
Якщо КроликМК бореться, підтвердження вимагають більше часу або більше часу. Ваш виробник може використати цей сигнал, щоб вимкнути його, замість того, щоб підсилити більше тисків.
Для сценаріїв високого рівня вам потрібні асинхронні підтвердження:
channel.ConfirmSelect();
var outstandingConfirms = new ConcurrentDictionary<ulong, string>();
channel.BasicAcks += (sender, ea) =>
{
if (ea.Multiple)
{
var confirmed = outstandingConfirms.Where(k => k.Key <= ea.DeliveryTag);
foreach (var entry in confirmed)
{
outstandingConfirms.TryRemove(entry.Key, out _);
}
}
else
{
outstandingConfirms.TryRemove(ea.DeliveryTag, out _);
}
};
channel.BasicNacks += (sender, ea) =>
{
// Message was rejected - implement retry logic
Console.WriteLine($"Message {ea.DeliveryTag} was nacked - broker under pressure");
// Back off before retrying
};
Коли ви помічаєте зворотний тиск, найгірша річ, яку ви можете зробити, - це негайно спробувати з повною швидкістю. Це те саме, що відповісти на затор, натиснувши акселератор сильніше. Замість цього, реалізуйте експоненційний зворотний зв' язок:
public async Task PublishWithBackpressureAsync(
IModel channel,
byte[] body,
int maxRetries = 5)
{
int attempt = 0;
while (attempt < maxRetries)
{
try
{
channel.ConfirmSelect();
channel.BasicPublish(
exchange: "",
routingKey: "tasks",
basicProperties: null,
body: body);
if (channel.WaitForConfirms(TimeSpan.FromSeconds(5)))
{
return; // Success
}
throw new Exception("Publish not confirmed");
}
catch (Exception ex)
{
attempt++;
if (attempt >= maxRetries)
{
throw new Exception($"Failed to publish after {maxRetries} attempts", ex);
}
// Exponential backoff: 1s, 2s, 4s, 8s, 16s
var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt - 1));
Console.WriteLine($"Backpressure detected - retry {attempt} after {delay}");
await Task.Delay(delay);
}
}
}
Ця програма імітує модель HTTP 429 (у багатьох запитах). Замість того, щоб перебивати брокера, ми призупиняємо спробу, надаючи системному часу для відновлення.
Якщо ви створюєте внутрішній трубопровід (продуктор → процесор → у всіх межах Вашої програми), то це означає, що ви можете створити його.NET's Channel<T> надає елегантну підтримку спинного друку:
// Create a bounded channel - backpressure is automatic
var channel = Channel.CreateBounded<WorkItem>(new BoundedChannelOptions(100)
{
FullMode = BoundedChannelFullMode.Wait // Block producer when full
});
// Producer - will automatically wait when channel is full
async Task ProduceAsync(ChannelWriter<WorkItem> writer)
{
for (int i = 0; i < 10000; i++)
{
var item = new WorkItem { Id = i };
// This awaits if the channel is at capacity
await writer.WriteAsync(item);
Console.WriteLine($"Produced item {i}");
}
writer.Complete();
}
// Consumer - processes at its own pace
async Task ConsumeAsync(ChannelReader<WorkItem> reader)
{
await foreach (var item in reader.ReadAllAsync())
{
// Simulate slow processing
await Task.Delay(100);
Console.WriteLine($"Processed item {item.Id}");
}
}
// Run both concurrently
await Task.WhenAll(
ProduceAsync(channel.Writer),
ConsumeAsync(channel.Reader)
);
Обмежений канал автоматично застосовує зворотні натискувальні лапки виробника, коли канал переповнений, природно сповільнюється, щоб відповідати темпу споживача.
Ось більш вичерпний приклад, який об'єднує спостереження, підтвердження і зворотний зв'язок:
public class BackpressureAwarePublisher : IDisposable
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly string _queueName;
private readonly int _queueDepthThreshold;
public BackpressureAwarePublisher(
string hostName,
string queueName,
int queueDepthThreshold = 1000)
{
var factory = new ConnectionFactory { HostName = hostName };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_queueName = queueName;
_queueDepthThreshold = queueDepthThreshold;
_channel.QueueDeclare(
queue: queueName,
durable: true,
exclusive: false,
autoDelete: false);
_channel.ConfirmSelect();
}
public async Task<bool> PublishAsync(byte[] body, CancellationToken ct = default)
{
// Check queue depth first
var queueInfo = _channel.QueueDeclarePassive(_queueName);
if (queueInfo.MessageCount > _queueDepthThreshold)
{
Console.WriteLine($"Queue depth {queueInfo.MessageCount} exceeds threshold - applying backpressure");
// Wait for queue to drain a bit
while (queueInfo.MessageCount > _queueDepthThreshold * 0.8)
{
await Task.Delay(1000, ct);
queueInfo = _channel.QueueDeclarePassive(_queueName);
}
}
// Publish with retry
for (int attempt = 1; attempt <= 3; attempt++)
{
try
{
var properties = _channel.CreateBasicProperties();
properties.Persistent = true;
_channel.BasicPublish(
exchange: "",
routingKey: _queueName,
basicProperties: properties,
body: body);
if (_channel.WaitForConfirms(TimeSpan.FromSeconds(5)))
{
return true;
}
}
catch (Exception ex)
{
Console.WriteLine($"Publish attempt {attempt} failed: {ex.Message}");
}
if (attempt < 3)
{
await Task.Delay(TimeSpan.FromSeconds(Math.Pow(2, attempt)), ct);
}
}
return false;
}
public void Dispose()
{
_channel?.Dispose();
_connection?.Dispose();
}
}
Не панікуйте, коли черга росте. стабільний Черга, яка росте не дуже швидко.
Глибина спостереження за чергою з часом. Пошукати за моделями, а не знімками. Чергою, яка постійно складається зі 100 повідомлень, достатньо добре. Черга, яка зросла від 100 до 10 000 протягом останньої години, потребує уваги.
Застосовувати шаблони pragmaticly. Не кожне повідомлення потребує підтвердження. Не для кожної з черг потрібна складна обробка зворотного друку. Черга, у якій виконується 10 повідомлень за годину, ймовірно, не потребує такої ж інженерії гнучкості, як і для однієї обробки 10 000 повідомлень за секунду.
Запитайте себе: " Яка реальна вартість, якщо це повідомлення втрачено або його затримано? Якщо відповідь " не надто ," не переінформатор. Якщо відповідь на це питання є " показником впливу на фінансову цілісність або дані," вкладайте кошти у належне керування зворотним тиском.
Коли чергуються, відповідь не завжди "додається більше виробників." Це те саме, що намагатися полагодити затор, додавши більше машин.
Подумайте:
flowchart TD
A[Queue Growing] --> B{Consumer<br/>Saturated?}
B -->|Yes| C[Add Consumers]
B -->|No| D{Downstream<br/>Bottleneck?}
D -->|Yes| E[Fix/Scale Downstream]
D -->|No| F{Can Batch<br/>Process?}
F -->|Yes| G[Implement Batching]
F -->|No| H[Accept Higher Latency<br/>or Reduce Load]
style C stroke:#10b981,stroke-width:2px
style E stroke:#f59e0b,stroke-width:2px
style G stroke:#0ea5e9,stroke-width:2px
Встановити попередження для:
Ви хочете знати, що таке відштовхування. до це стає кризою, а не коли твоя система вже втрачена.
Якщо розглядати це як розмову між виробником і споживачем, ви будуєте системи, які залишаються стійкими під тиском.
Ключові факти:
Коли ваша система каже "Я ситий, дайте мені хвилину," правильна відповідь: "Жодна проблема, я чекатиму." Це сутність розподілених систем, що розподіляються, популяцій, стійкості.
Залишайтеся спокійними, коли черги ростуть, і пам'ятайте: система під граціозним спинним тиском набагато краща, ніж та, що впала повністю.
© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.