This is a viewer only at the moment see the article on how this works.
To update the preview hit Ctrl-Alt-R (or ⌘-Alt-R on Mac) or Enter to refresh. The Save icon lets you save the markdown file to disk
This is a preview from the server running through my markdig pipeline
Wednesday, 15 January 2025
ВхідЧастина 3Ми побудували чудовий візуальний редактор.
Слідкувати за всіма фоновими завданнями
Чому пожежа?
Повіска ідеальна для наших потреб, тому що:
Зберігає завдання у нашій існуючій базі даних PostgreSQL
Надає вбудовану панель приладів
Підтримує регулярні завдання
Відображає горизонтальні масштаби
[Table("workflow_trigger_states")]
public class WorkflowTriggerStateEntity
{
public int Id { get; set; }
public int WorkflowDefinitionId { get; set; }
// Type: "Schedule", "ApiPoll", "Webhook"
public string TriggerType { get; set; } = string.Empty;
// Configuration as JSON
public string ConfigJson { get; set; } = "{}";
// Current state as JSON (stores last poll time, content hash, etc.)
public string StateJson { get; set; } = "{}";
public bool IsEnabled { get; set; } = true;
public DateTime? LastCheckedAt { get; set; }
public DateTime? LastFiredAt { get; set; }
public int FireCount { get; set; } = 0;
public string? LastError { get; set; }
}
Модель стану вмикання
public class ScheduleTriggerConfig
{
public string IntervalType { get; set; } = "minutes"; // minutes, hours, days
public int IntervalValue { get; set; } = 60;
public Dictionary<string, object>? InputData { get; set; }
}
public class WorkflowSchedulerJob
{
private readonly MostlylucidDbContext _context;
private readonly WorkflowExecutionService _executionService;
private readonly ILogger<WorkflowSchedulerJob> _logger;
[AutomaticRetry(Attempts = 3)]
public async Task ExecuteScheduledWorkflowsAsync()
{
_logger.LogInformation("Checking for scheduled workflows");
// Get all enabled schedule triggers
var triggers = await _context.WorkflowTriggerStates
.Include(t => t.WorkflowDefinition)
.Where(t => t.IsEnabled && t.TriggerType == "Schedule")
.ToListAsync();
foreach (var trigger in triggers)
{
try
{
var config = JsonSerializer.Deserialize<ScheduleTriggerConfig>(
trigger.ConfigJson);
if (config == null) continue;
// Check if it's time to run
if (!ShouldRunScheduledWorkflow(trigger, config))
continue;
_logger.LogInformation(
"Executing scheduled workflow {WorkflowId}",
trigger.WorkflowDefinition.WorkflowId);
// Execute the workflow
await _executionService.ExecuteWorkflowAsync(
trigger.WorkflowDefinition.WorkflowId,
config.InputData,
"Scheduler");
// Update trigger state
trigger.LastCheckedAt = DateTime.UtcNow;
trigger.LastFiredAt = DateTime.UtcNow;
trigger.FireCount++;
var state = JsonSerializer.Deserialize<Dictionary<string, object>>(
trigger.StateJson) ?? new();
state["lastRun"] = DateTime.UtcNow.ToString("O");
trigger.StateJson = JsonSerializer.Serialize(state);
await _context.SaveChangesAsync();
}
catch (Exception ex)
{
_logger.LogError(ex,
"Error executing scheduled workflow {TriggerId}",
trigger.Id);
trigger.LastError = ex.Message;
await _context.SaveChangesAsync();
}
}
}
private bool ShouldRunScheduledWorkflow(
WorkflowTriggerStateEntity trigger,
ScheduleTriggerConfig config)
{
// First run?
if (!trigger.LastFiredAt.HasValue)
return true;
var timeSinceLastRun = DateTime.UtcNow - trigger.LastFiredAt.Value;
return config.IntervalType.ToLower() switch
{
"minutes" => timeSinceLastRun.TotalMinutes >= config.IntervalValue,
"hours" => timeSinceLastRun.TotalHours >= config.IntervalValue,
"days" => timeSinceLastRun.TotalDays >= config.IntervalValue,
_ => false
};
}
}
Модель налаштування
ExecuteScheduledWorkflowsAsync()Оновити стан вмикання під час останнього запуску
public class ApiPollTriggerConfig
{
public string Url { get; set; } = string.Empty;
public int IntervalSeconds { get; set; } = 300; // 5 minutes
public bool AlwaysTrigger { get; set; } = false;
public Dictionary<string, string>? Headers { get; set; }
}
[AutomaticRetry(Attempts = 3)]
public async Task PollApiTriggersAsync()
{
_logger.LogInformation("Polling API triggers");
var triggers = await _context.WorkflowTriggerStates
.Include(t => t.WorkflowDefinition)
.Where(t => t.IsEnabled && t.TriggerType == "ApiPoll")
.ToListAsync();
foreach (var trigger in triggers)
{
try
{
var config = JsonSerializer.Deserialize<ApiPollTriggerConfig>(
trigger.ConfigJson);
if (config == null) continue;
// Check if it's time to poll
if (trigger.LastCheckedAt.HasValue)
{
var timeSinceLastCheck = DateTime.UtcNow - trigger.LastCheckedAt.Value;
if (timeSinceLastCheck.TotalSeconds < config.IntervalSeconds)
continue;
}
_logger.LogInformation("Polling API for workflow {WorkflowId}",
trigger.WorkflowDefinition.WorkflowId);
// Poll the API
using var httpClient = new HttpClient();
var response = await httpClient.GetAsync(config.Url);
var content = await response.Content.ReadAsStringAsync();
// Get previous state
var state = JsonSerializer.Deserialize<Dictionary<string, object>>(
trigger.StateJson) ?? new();
var previousHash = state.GetValueOrDefault("contentHash")?.ToString();
var currentHash = ComputeHash(content);
// Has content changed?
if (previousHash != currentHash || config.AlwaysTrigger)
{
_logger.LogInformation(
"API content changed, triggering workflow {WorkflowId}",
trigger.WorkflowDefinition.WorkflowId);
// Pass response as input to workflow
var inputData = new Dictionary<string, object>
{
["apiResponse"] = content,
["statusCode"] = (int)response.StatusCode,
["previousHash"] = previousHash ?? string.Empty,
["currentHash"] = currentHash
};
// Execute the workflow
await _executionService.ExecuteWorkflowAsync(
trigger.WorkflowDefinition.WorkflowId,
inputData,
$"ApiPoll:{config.Url}");
trigger.LastFiredAt = DateTime.UtcNow;
trigger.FireCount++;
// Update state
state["contentHash"] = currentHash;
state["lastContent"] = content.Length > 1000
? content.Substring(0, 1000)
: content;
state["lastPoll"] = DateTime.UtcNow.ToString("O");
}
trigger.LastCheckedAt = DateTime.UtcNow;
trigger.StateJson = JsonSerializer.Serialize(state);
trigger.LastError = null;
await _context.SaveChangesAsync();
}
catch (Exception ex)
{
_logger.LogError(ex, "Error polling API trigger {TriggerId}",
trigger.Id);
trigger.LastError = ex.Message;
trigger.LastCheckedAt = DateTime.UtcNow;
await _context.SaveChangesAsync();
}
}
}
private string ComputeHash(string content)
{
using var sha256 = System.Security.Cryptography.SHA256.Create();
var bytes = System.Text.Encoding.UTF8.GetBytes(content);
var hash = sha256.ComputeHash(bytes);
return Convert.ToBase64String(hash);
}
Модель налаштування
AlwaysTriggerПорівняти з попереднім хешом, збереженим у станіОновити стан новим хешом
{
"triggerType": "ApiPoll",
"config": {
"url": "https://api.github.com/repos/dotnet/aspnetcore/releases/latest",
"intervalSeconds": 3600,
"alwaysTrigger": false
}
}
Приклад випадків використання
Це опитує API GitHub щогодини.Program.csПісля опублікування нового випуску вміст хеш змінюється, а процес виконується з даними випуску!
// Add Hangfire services
builder.Services.AddHangfire(config =>
{
config.UsePostgreSqlStorage(
builder.Configuration.GetConnectionString("DefaultConnection"));
});
builder.Services.AddHangfireServer();
// Register our job
builder.Services.AddScoped<WorkflowSchedulerJob>();
Зареєстровані роботи повіски
app.UseHangfireDashboard("/hangfire");
// Register recurring jobs
RecurringJob.AddOrUpdate<WorkflowSchedulerJob>(
"scheduled-workflows",
job => job.ExecuteScheduledWorkflowsAsync(),
Cron.Minutely);
RecurringJob.AddOrUpdate<WorkflowSchedulerJob>(
"api-poll-triggers",
job => job.PollApiTriggersAsync(),
Cron.Minutely);
або налаштування запуску:/hangfire:
app.UseHangfireDashboard("/hangfire", new DashboardOptions
{
Authorization = new[]
{
new HangfireAuthorizationFilter()
}
});
public class HangfireAuthorizationFilter : IDashboardAuthorizationFilter
{
public bool Authorize(DashboardContext context)
{
var httpContext = context.GetHttpContext();
// Only allow authenticated users
return httpContext.User.Identity?.IsAuthenticated == true;
}
}
: Слідкувати за серверами Hangfire
[HttpPost("workflow/{id}/triggers")]
public async Task<IActionResult> CreateTrigger(
string id,
[FromBody] TriggerCreateRequest request)
{
var workflow = await _context.WorkflowDefinitions
.FirstOrDefaultAsync(w => w.WorkflowId == id);
if (workflow == null)
return NotFound();
var trigger = new WorkflowTriggerStateEntity
{
WorkflowDefinitionId = workflow.Id,
TriggerType = request.Type,
ConfigJson = JsonSerializer.Serialize(request.Config),
StateJson = "{}",
IsEnabled = true
};
await _context.WorkflowTriggerStates.AddAsync(trigger);
await _context.SaveChangesAsync();
return Json(new { success = true, triggerId = trigger.Id });
}
public class TriggerCreateRequest
{
public string Type { get; set; } = string.Empty; // Schedule, ApiPoll
public object Config { get; set; } = new();
}
<div class="card bg-base-100 shadow-xl">
<div class="card-body">
<h2 class="card-title">⏰ Add Trigger</h2>
<div class="form-control">
<label class="label">Trigger Type</label>
<select class="select select-bordered" x-model="triggerType">
<option value="Schedule">Schedule</option>
<option value="ApiPoll">API Poll</option>
</select>
</div>
<!-- Schedule Config -->
<template x-if="triggerType === 'Schedule'">
<div class="space-y-4">
<div class="form-control">
<label class="label">Interval</label>
<div class="flex gap-2">
<input type="number"
x-model="scheduleConfig.intervalValue"
class="input input-bordered flex-1" />
<select x-model="scheduleConfig.intervalType"
class="select select-bordered">
<option value="minutes">Minutes</option>
<option value="hours">Hours</option>
<option value="days">Days</option>
</select>
</div>
</div>
</div>
</template>
<!-- API Poll Config -->
<template x-if="triggerType === 'ApiPoll'">
<div class="space-y-4">
<div class="form-control">
<label class="label">API URL</label>
<input type="url"
x-model="apiConfig.url"
class="input input-bordered"
placeholder="https://api.example.com/data" />
</div>
<div class="form-control">
<label class="label">Poll Interval (seconds)</label>
<input type="number"
x-model="apiConfig.intervalSeconds"
class="input input-bordered"
value="300" />
</div>
</div>
</template>
<button @click="createTrigger()" class="btn btn-primary mt-4">
Create Trigger
</button>
</div>
</div>
Давайте додамо UI для створення і керування сигналами:
{
"name": "GitHub Release Monitor",
"startNodeId": "parse-data",
"nodes": [
{
"id": "parse-data",
"type": "Transform",
"name": "Extract Version",
"inputs": {
"operation": "json_parse",
"data": "{{apiResponse}}"
}
},
{
"id": "log-release",
"type": "Log",
"name": "Log New Release",
"inputs": {
"message": "New release: {{tag_name}} - {{name}}",
"level": "info"
}
}
],
"connections": [
{
"sourceNodeId": "parse-data",
"targetNodeId": "log-release"
}
]
}
{
"type": "ApiPoll",
"config": {
"url": "https://api.github.com/repos/dotnet/aspnetcore/releases/latest",
"intervalSeconds": 3600
}
}
(Можливо, надішліть електронну пошту, надішліть повідомлення Slack тощо)
Процес аналізує JSON і записує інформацію про випуск.
_logger.LogInformation(
"Workflow {WorkflowId} execution {ExecutionId} completed in {Duration}ms with status {Status}",
execution.WorkflowId,
execution.Id,
execution.DurationMs,
execution.Status);
Журналювання
private static readonly Counter WorkflowExecutions = Metrics
.CreateCounter("workflow_executions_total",
"Total workflow executions",
new CounterConfiguration
{
LabelNames = new[] { "workflow_id", "status" }
});
// In execution service
WorkflowExecutions
.WithLabels(workflow.Id, execution.Status.ToString())
.Inc();
Метрики
Ефекти, які не були звільнені під час очікуваного часового блоку
Обмірковування швидкодії
// Instead of querying per trigger
var triggers = await _context.WorkflowTriggerStates
.Include(t => t.WorkflowDefinition)
.Where(t => t.IsEnabled && t.TriggerType == "ApiPoll")
.AsNoTracking() // Read-only
.ToListAsync();
Оскільки часто проводиться опитування багатьох робочих даних, завантаження бази даних може бути значним:
Вирішення: пакетні запити
catch (HttpRequestException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
{
// Back off
var retryAfter = response.Headers.RetryAfter?.Delta ?? TimeSpan.FromMinutes(5);
state["backoffUntil"] = DateTime.UtcNow.Add(retryAfter).ToString("O");
}
Вирішення: Expential зворотний зв' язок
public class ConditionalTriggerConfig : ApiPollTriggerConfig
{
public string? Condition { get; set; } // e.g., "{{stars}} > 1000"
}
Умовні наслідки
// After workflow completes
if (execution.Status == WorkflowExecutionStatus.Completed)
{
var dependentTriggers = await _context.WorkflowTriggerStates
.Where(t => t.TriggerType == "WorkflowComplete" &&
t.ConfigJson.Contains(execution.WorkflowId))
.ToListAsync();
foreach (var trigger in dependentTriggers)
{
await _executionService.ExecuteWorkflowAsync(
trigger.WorkflowDefinition.WorkflowId,
execution.OutputData,
$"Triggered by {execution.WorkflowId}");
}
}
Залежності вмикання
[Fact]
public async Task ExecuteScheduledWorkflows_ShouldExecuteWhenIntervalPassed()
{
// Arrange
var mockContext = CreateMockContext();
var mockExecutionService = new Mock<IWorkflowExecutionService>();
var job = new WorkflowSchedulerJob(mockContext.Object,
mockExecutionService.Object, Mock.Of<ILogger>());
// Act
await job.ExecuteScheduledWorkflowsAsync();
// Assert
mockExecutionService.Verify(s => s.ExecuteWorkflowAsync(
It.IsAny<string>(),
It.IsAny<Dictionary<string, object>>(),
"Scheduler",
It.IsAny<CancellationToken>()), Times.Once);
}
Перевірка завдань з повіскою
✅ **Одиниця Перевірити ваші завдання:**Висновки ✅ **Ми побудували повну систему автоматизації!**Наш робочий потік тепер може: ✅ Запустити на розкладах- Щогодини, щодня або через певні проміжки часу. ✅ Опитування APILImage/ info menu item (should be translated)- Слідкувати за зовнішніми службами за змінами ✅ Стан доріжки- Пам'ятай, що ми бачили раніше. ✅ Автоповторення- Керуйте транзитними помилками
Дошка для всіх завдань
Масштаб- Балансування вантажу за допомогою Hangfire
Повна серіяМи побудували робочу систему із самого початку:
Частина 1: Вступ і архітектура
Частина 2: Основний рушій потоку
Частина 3
Автоматизоване виконання
: Скомпонувати робочі місця разом
Mostlylucid.SchedulerService/Jobs/Mostlylucid.Workflow.Shared/Mostlylucid.Workflow.Engine/Thank you for following this series! Happy workflow building! 🎉
© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.