In Part 3, we built a beautiful visual editor. But our workflows only run when we manually trigger them. In this final post, we'll make workflows truly autonomous using Hangfire for:
Hangfire is perfect for our needs because it:
First, let's understand our trigger state entity (we already created this in Part 2):
/// <summary>
/// Persisted record of one trigger attached to a workflow definition:
/// its type, its JSON configuration, its JSON runtime state and simple
/// bookkeeping (timestamps, fire count, last error).
/// </summary>
/// <remarks>
/// NOTE(review): the scheduler and poll jobs dereference a
/// <c>WorkflowDefinition</c> navigation on this entity that is not part of
/// this listing — confirm it exists on the full class.
/// </remarks>
[Table("workflow_trigger_states")]
public class WorkflowTriggerStateEntity
{
    /// <summary>Identifier of this trigger row.</summary>
    public int Id { get; set; }

    /// <summary>Identifier of the workflow definition this trigger belongs to.</summary>
    public int WorkflowDefinitionId { get; set; }

    /// <summary>Trigger kind: "Schedule", "ApiPoll" or "Webhook".</summary>
    public string TriggerType { get; set; } = "";

    /// <summary>Trigger configuration, serialized as JSON.</summary>
    public string ConfigJson { get; set; } = "{}";

    /// <summary>Runtime state as JSON (last poll time, content hash, etc.).</summary>
    public string StateJson { get; set; } = "{}";

    /// <summary>Whether the trigger is active; new triggers start enabled.</summary>
    public bool IsEnabled { get; set; } = true;

    /// <summary>When the trigger was last evaluated, if ever.</summary>
    public DateTime? LastCheckedAt { get; set; }

    /// <summary>When the trigger last started its workflow, if ever.</summary>
    public DateTime? LastFiredAt { get; set; }

    /// <summary>How many times the trigger has fired.</summary>
    public int FireCount { get; set; }

    /// <summary>Message of the most recent failure, or null when there is none.</summary>
    public string? LastError { get; set; }
}
This entity tracks everything about a workflow trigger:
/// <summary>
/// Configuration of a "Schedule" trigger: how often the workflow should run
/// and which input data to hand to it.
/// </summary>
public class ScheduleTriggerConfig
{
    /// <summary>Unit of the interval: "minutes", "hours" or "days".</summary>
    public string IntervalType { get; set; } = "minutes";

    /// <summary>Number of <see cref="IntervalType"/> units between runs (default 60).</summary>
    public int IntervalValue { get; set; } = 60;

    /// <summary>Optional input passed to the workflow on every scheduled run.</summary>
    public Dictionary<string, object>? InputData { get; set; }
}
/// <summary>
/// Hangfire job that executes the workflows attached to enabled "Schedule"
/// triggers once their configured interval has elapsed.
/// </summary>
/// <remarks>
/// The three dependencies are readonly fields; the constructor that assigns
/// them is not part of this listing.
/// </remarks>
public class WorkflowSchedulerJob
{
    // Trigger configs are written with camelCase keys (see the trigger UI and
    // the sample trigger JSON). Web defaults make property matching
    // case-insensitive and accept numbers sent as strings, so those configs
    // bind instead of silently falling back to the class defaults.
    private static readonly JsonSerializerOptions ConfigJsonOptions =
        new(JsonSerializerDefaults.Web);

    private readonly MostlylucidDbContext _context;
    private readonly WorkflowExecutionService _executionService;
    private readonly ILogger<WorkflowSchedulerJob> _logger;

    /// <summary>
    /// Loads every enabled "Schedule" trigger, runs the workflows that are due
    /// and records the outcome on each trigger.
    /// </summary>
    /// <remarks>
    /// A failure while handling one trigger is logged and stored in
    /// <c>LastError</c>; the loop then continues with the next trigger.
    /// </remarks>
    [AutomaticRetry(Attempts = 3)]
    public async Task ExecuteScheduledWorkflowsAsync()
    {
        _logger.LogInformation("Checking for scheduled workflows");

        // Get all enabled schedule triggers
        var triggers = await _context.WorkflowTriggerStates
            .Include(t => t.WorkflowDefinition)
            .Where(t => t.IsEnabled && t.TriggerType == "Schedule")
            .ToListAsync();

        foreach (var trigger in triggers)
        {
            try
            {
                var config = JsonSerializer.Deserialize<ScheduleTriggerConfig>(
                    trigger.ConfigJson, ConfigJsonOptions);
                if (config == null) continue;

                // Check if it's time to run
                if (!ShouldRunScheduledWorkflow(trigger, config))
                    continue;

                _logger.LogInformation(
                    "Executing scheduled workflow {WorkflowId}",
                    trigger.WorkflowDefinition.WorkflowId);

                // Execute the workflow
                await _executionService.ExecuteWorkflowAsync(
                    trigger.WorkflowDefinition.WorkflowId,
                    config.InputData,
                    "Scheduler");

                // Use one timestamp so every field written for this firing agrees.
                var firedAt = DateTime.UtcNow;
                trigger.LastCheckedAt = firedAt;
                trigger.LastFiredAt = firedAt;
                trigger.FireCount++;
                // A successful run supersedes any earlier failure, as in the API poll job.
                trigger.LastError = null;

                var state = JsonSerializer.Deserialize<Dictionary<string, object>>(
                    trigger.StateJson) ?? new();
                state["lastRun"] = firedAt.ToString("O");
                trigger.StateJson = JsonSerializer.Serialize(state);

                await _context.SaveChangesAsync();
            }
            catch (Exception ex)
            {
                _logger.LogError(ex,
                    "Error executing scheduled workflow {TriggerId}",
                    trigger.Id);
                trigger.LastError = ex.Message;
                await _context.SaveChangesAsync();
            }
        }
    }

    /// <summary>
    /// Decides whether a schedule trigger is due.
    /// </summary>
    /// <param name="trigger">Trigger whose <c>LastFiredAt</c> is compared with the current UTC time.</param>
    /// <param name="config">Interval unit and value to compare against.</param>
    /// <returns>
    /// True when the trigger has never fired or at least the configured
    /// interval has passed; false otherwise, including for an unrecognised
    /// interval type.
    /// </returns>
    private bool ShouldRunScheduledWorkflow(
        WorkflowTriggerStateEntity trigger,
        ScheduleTriggerConfig config)
    {
        // First run?
        if (!trigger.LastFiredAt.HasValue)
            return true;

        var timeSinceLastRun = DateTime.UtcNow - trigger.LastFiredAt.Value;

        // Invariant lower-casing: the unit is a machine token, and culture-aware
        // casing (e.g. the Turkish dotless i) would stop "MINUTES" from matching.
        return config.IntervalType.ToLowerInvariant() switch
        {
            "minutes" => timeSinceLastRun.TotalMinutes >= config.IntervalValue,
            "hours" => timeSinceLastRun.TotalHours >= config.IntervalValue,
            "days" => timeSinceLastRun.TotalDays >= config.IntervalValue,
            _ => false
        };
    }
}
How It Works:
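Hangfire invokes `ExecuteScheduledWorkflowsAsync()` on a recurring schedule; each run executes the schedule triggers whose interval has elapsed and updates their state.
API polling is more interesting - we monitor external APIs and trigger workflows when content changes!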
/// <summary>
/// Configuration of an "ApiPoll" trigger: which URL to poll, how often, and
/// whether to fire even when the response has not changed.
/// </summary>
public class ApiPollTriggerConfig
{
    /// <summary>URL to poll.</summary>
    public string Url { get; set; } = "";

    /// <summary>Minimum number of seconds between polls; 300 is five minutes.</summary>
    public int IntervalSeconds { get; set; } = 300;

    /// <summary>When true, fire on every poll instead of only on changed content.</summary>
    public bool AlwaysTrigger { get; set; } = false;

    /// <summary>
    /// Optional header name/value pairs.
    /// NOTE(review): presumably meant for the poll request — confirm the polling code applies them.
    /// </summary>
    public Dictionary<string, string>? Headers { get; set; }
}
// One client shared by every poll. Creating an HttpClient per request can
// exhaust sockets when many triggers are polled every minute.
private static readonly HttpClient PollHttpClient = new();

// Trigger configs are written with camelCase keys ("url", "intervalSeconds",
// "alwaysTrigger"). Web defaults match property names case-insensitively and
// accept numbers sent as strings, so those configs actually bind.
private static readonly JsonSerializerOptions PollConfigJsonOptions =
    new(JsonSerializerDefaults.Web);

/// <summary>
/// Polls the URL of every enabled "ApiPoll" trigger that is due and executes
/// the trigger's workflow when the response body changed since the last
/// stored hash, or on every poll when <c>AlwaysTrigger</c> is set.
/// </summary>
/// <remarks>
/// A failure while handling one trigger (bad config, network error, non-success
/// status) is logged and stored in <c>LastError</c>; the loop then continues
/// with the next trigger.
/// </remarks>
[AutomaticRetry(Attempts = 3)]
public async Task PollApiTriggersAsync()
{
    _logger.LogInformation("Polling API triggers");

    var triggers = await _context.WorkflowTriggerStates
        .Include(t => t.WorkflowDefinition)
        .Where(t => t.IsEnabled && t.TriggerType == "ApiPoll")
        .ToListAsync();

    foreach (var trigger in triggers)
    {
        try
        {
            var config = JsonSerializer.Deserialize<ApiPollTriggerConfig>(
                trigger.ConfigJson, PollConfigJsonOptions);
            if (config == null) continue;

            // Check if it's time to poll
            if (trigger.LastCheckedAt.HasValue)
            {
                var timeSinceLastCheck = DateTime.UtcNow - trigger.LastCheckedAt.Value;
                if (timeSinceLastCheck.TotalSeconds < config.IntervalSeconds)
                    continue;
            }

            if (string.IsNullOrWhiteSpace(config.Url))
                throw new InvalidOperationException("ApiPoll trigger has no URL configured.");

            _logger.LogInformation("Polling API for workflow {WorkflowId}",
                trigger.WorkflowDefinition.WorkflowId);

            // Poll the API, sending any headers configured on the trigger.
            using var request = new HttpRequestMessage(HttpMethod.Get, config.Url);
            if (config.Headers != null)
            {
                foreach (var header in config.Headers)
                    request.Headers.TryAddWithoutValidation(header.Key, header.Value);
            }

            using var response = await PollHttpClient.SendAsync(request);

            // An error page is not new content: fail this poll instead of
            // hashing the error body and overwriting the stored content hash.
            response.EnsureSuccessStatusCode();

            var content = await response.Content.ReadAsStringAsync();

            // Get previous state
            var state = JsonSerializer.Deserialize<Dictionary<string, object>>(
                trigger.StateJson) ?? new();
            var previousHash = state.GetValueOrDefault("contentHash")?.ToString();
            var currentHash = ComputeHash(content);

            // Has content changed?
            if (previousHash != currentHash || config.AlwaysTrigger)
            {
                _logger.LogInformation(
                    "API content changed, triggering workflow {WorkflowId}",
                    trigger.WorkflowDefinition.WorkflowId);

                // Pass response as input to workflow
                var inputData = new Dictionary<string, object>
                {
                    ["apiResponse"] = content,
                    ["statusCode"] = (int)response.StatusCode,
                    ["previousHash"] = previousHash ?? string.Empty,
                    ["currentHash"] = currentHash
                };

                // Execute the workflow
                await _executionService.ExecuteWorkflowAsync(
                    trigger.WorkflowDefinition.WorkflowId,
                    inputData,
                    $"ApiPoll:{config.Url}");

                // One timestamp for everything recorded about this firing.
                var firedAt = DateTime.UtcNow;
                trigger.LastFiredAt = firedAt;
                trigger.FireCount++;

                // Update state; only a 1000-character preview of the body is kept.
                state["contentHash"] = currentHash;
                state["lastContent"] = content.Length > 1000
                    ? content.Substring(0, 1000)
                    : content;
                state["lastPoll"] = firedAt.ToString("O");
            }

            trigger.LastCheckedAt = DateTime.UtcNow;
            trigger.StateJson = JsonSerializer.Serialize(state);
            trigger.LastError = null;
            await _context.SaveChangesAsync();
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error polling API trigger {TriggerId}",
                trigger.Id);
            trigger.LastError = ex.Message;
            // Record the attempt so a failing endpoint is retried on its
            // configured interval rather than on every job run.
            trigger.LastCheckedAt = DateTime.UtcNow;
            await _context.SaveChangesAsync();
        }
    }
}
/// <summary>
/// Fingerprints a text: SHA-256 over its UTF-8 bytes, returned as Base64.
/// </summary>
/// <param name="content">Text to hash.</param>
/// <returns>Base64 encoding of the 32-byte SHA-256 digest.</returns>
private string ComputeHash(string content)
{
    using (var hasher = System.Security.Cryptography.SHA256.Create())
    {
        byte[] digest = hasher.ComputeHash(
            System.Text.Encoding.UTF8.GetBytes(content));
        return Convert.ToBase64String(digest);
    }
}
How It Works:
If the content hash has changed (or `AlwaysTrigger` is true), execute the workflow.
Example: Monitor GitHub Releases:
{
"triggerType": "ApiPoll",
"config": {
"url": "https://api.github.com/repos/dotnet/aspnetcore/releases/latest",
"intervalSeconds": 3600,
"alwaysTrigger": false
}
}
This polls the GitHub API every hour. When a new release is published, the content hash changes, and the workflow executes with the release data!
In your Program.cs or startup configuration:
// Hangfire keeps its job storage in PostgreSQL, using the application's
// "DefaultConnection" connection string.
builder.Services.AddHangfire(hangfire =>
    hangfire.UsePostgreSqlStorage(
        builder.Configuration.GetConnectionString("DefaultConnection")));

// Register the Hangfire server in this application.
builder.Services.AddHangfireServer();

// The job class is registered as a scoped service.
builder.Services.AddScoped<WorkflowSchedulerJob>();
Then, after the app starts, register recurring jobs:
// Expose the Hangfire dashboard under /hangfire.
app.UseHangfireDashboard("/hangfire");
// Register both trigger jobs as recurring jobs on the Cron.Minutely schedule.
// The per-trigger intervals (IntervalValue / IntervalSeconds) are enforced
// inside the jobs themselves, so this schedule only sets how often triggers
// are checked. AddOrUpdate is keyed by the job id string given first.
RecurringJob.AddOrUpdate<WorkflowSchedulerJob>(
"scheduled-workflows",
job => job.ExecuteScheduledWorkflowsAsync(),
Cron.Minutely);
RecurringJob.AddOrUpdate<WorkflowSchedulerJob>(
"api-poll-triggers",
job => job.PollApiTriggersAsync(),
Cron.Minutely);
Hangfire includes a built-in dashboard accessible at /hangfire:
// Serve the dashboard at /hangfire, with HangfireAuthorizationFilter as its
// only authorization filter.
var dashboardOptions = new DashboardOptions
{
    Authorization = new[] { new HangfireAuthorizationFilter() }
};
app.UseHangfireDashboard("/hangfire", dashboardOptions);
/// <summary>
/// Dashboard authorization filter that admits only authenticated users.
/// </summary>
public class HangfireAuthorizationFilter : IDashboardAuthorizationFilter
{
    /// <summary>
    /// Returns true when the current request's user has an identity that
    /// reports itself as authenticated; false otherwise, including when
    /// there is no identity at all.
    /// </summary>
    /// <param name="context">Dashboard context from which the HTTP context is obtained.</param>
    public bool Authorize(DashboardContext context) =>
        context.GetHttpContext().User.Identity?.IsAuthenticated ?? false;
}
Let's add UI for creating and managing triggers:
/// <summary>
/// Creates a new, enabled trigger for the workflow with the given id.
/// </summary>
/// <param name="id">Public workflow id (matched against <c>WorkflowId</c>).</param>
/// <param name="request">Trigger type and its configuration object.</param>
/// <returns>
/// 400 when the body is missing or has no trigger type, 404 when the workflow
/// does not exist, otherwise JSON <c>{ success, triggerId }</c>.
/// </returns>
[HttpPost("workflow/{id}/triggers")]
public async Task<IActionResult> CreateTrigger(
    string id,
    [FromBody] TriggerCreateRequest request)
{
    // A trigger without a type is never picked up by any job, and a missing
    // body would otherwise surface as a NullReferenceException (HTTP 500).
    if (request == null || string.IsNullOrWhiteSpace(request.Type))
        return BadRequest(new { success = false, error = "Trigger type is required." });

    var workflow = await _context.WorkflowDefinitions
        .FirstOrDefaultAsync(w => w.WorkflowId == id);
    if (workflow == null)
        return NotFound();

    var trigger = new WorkflowTriggerStateEntity
    {
        WorkflowDefinitionId = workflow.Id,
        TriggerType = request.Type,
        // A null config would serialize to the JSON literal "null", which the
        // jobs skip silently; store an empty object so the defaults apply.
        ConfigJson = JsonSerializer.Serialize(request.Config ?? new object()),
        StateJson = "{}",
        IsEnabled = true
    };

    // Add is sufficient here; AddAsync only matters for value generators that
    // need database access.
    _context.WorkflowTriggerStates.Add(trigger);
    await _context.SaveChangesAsync();

    return Json(new { success = true, triggerId = trigger.Id });
}
/// <summary>
/// Request body for creating a trigger.
/// </summary>
public class TriggerCreateRequest
{
    /// <summary>Trigger type, e.g. "Schedule" or "ApiPoll".</summary>
    public string Type { get; set; } = "";

    /// <summary>Type-specific configuration; serialized to JSON as-is when the trigger is stored.</summary>
    public object Config { get; set; } = new object();
}
<!--
  "Add Trigger" card (Alpine.js). Expects triggerType, scheduleConfig,
  apiConfig and createTrigger() on the surrounding x-data component.
  Numeric inputs use x-model.number so the values are stored as numbers;
  plain x-model would store them as strings and they would be posted as JSON
  strings into the integer config properties.
-->
<div class="card bg-base-100 shadow-xl">
    <div class="card-body">
        <h2 class="card-title">⏰ Add Trigger</h2>
        <div class="form-control">
            <label class="label">Trigger Type</label>
            <select class="select select-bordered" x-model="triggerType">
                <option value="Schedule">Schedule</option>
                <option value="ApiPoll">API Poll</option>
            </select>
        </div>
        <!-- Schedule Config -->
        <template x-if="triggerType === 'Schedule'">
            <div class="space-y-4">
                <div class="form-control">
                    <label class="label">Interval</label>
                    <div class="flex gap-2">
                        <input type="number"
                               min="1"
                               x-model.number="scheduleConfig.intervalValue"
                               class="input input-bordered flex-1" />
                        <select x-model="scheduleConfig.intervalType"
                                class="select select-bordered">
                            <option value="minutes">Minutes</option>
                            <option value="hours">Hours</option>
                            <option value="days">Days</option>
                        </select>
                    </div>
                </div>
            </div>
        </template>
        <!-- API Poll Config -->
        <template x-if="triggerType === 'ApiPoll'">
            <div class="space-y-4">
                <div class="form-control">
                    <label class="label">API URL</label>
                    <input type="url"
                           x-model="apiConfig.url"
                           class="input input-bordered"
                           placeholder="https://api.example.com/data" />
                </div>
                <div class="form-control">
                    <label class="label">Poll Interval (seconds)</label>
                    <input type="number"
                           min="1"
                           x-model.number="apiConfig.intervalSeconds"
                           class="input input-bordered"
                           value="300" />
                </div>
            </div>
        </template>
        <button @click="createTrigger()" class="btn btn-primary mt-4">
            Create Trigger
        </button>
    </div>
</div>
Let's build a complete automated workflow that:
{
"name": "GitHub Release Monitor",
"startNodeId": "parse-data",
"nodes": [
{
"id": "parse-data",
"type": "Transform",
"name": "Extract Version",
"inputs": {
"operation": "json_parse",
"data": "{{apiResponse}}"
}
},
{
"id": "log-release",
"type": "Log",
"name": "Log New Release",
"inputs": {
"message": "New release: {{tag_name}} - {{name}}",
"level": "info"
}
}
],
"connections": [
{
"sourceNodeId": "parse-data",
"targetNodeId": "log-release"
}
]
}
{
"type": "ApiPoll",
"config": {
"url": "https://api.github.com/repos/dotnet/aspnetcore/releases/latest",
"intervalSeconds": 3600
}
}
Now, every hour, Hangfire will:
All workflow executions are logged:
// Structured completion log: the placeholders in the template are filled, in
// order, with the workflow id, execution id, duration and final status.
_logger.LogInformation(
"Workflow {WorkflowId} execution {ExecutionId} completed in {Duration}ms with status {Status}",
execution.WorkflowId,
execution.Id,
execution.DurationMs,
execution.Status);
We can add Prometheus metrics:
// Counter "workflow_executions_total", labelled by workflow id and final
// status; declared once as a static field and shared by all executions.
private static readonly Counter WorkflowExecutions = Metrics
.CreateCounter("workflow_executions_total",
"Total workflow executions",
new CounterConfiguration
{
LabelNames = new[] { "workflow_id", "status" }
});
// In the execution service, after an execution finishes: increment the series
// for this workflow/status pair. Label values are given in the same order as
// LabelNames above.
// NOTE(review): label values are strings; elsewhere in this post the string
// identifier is WorkflowId while Id on the definition entity is assigned to an
// int column — confirm workflow.Id is the intended (string) value here.
WorkflowExecutions
.WithLabels(workflow.Id, execution.Status.ToString())
.Inc();
Set up alerts for:
With many workflows polling frequently, database load can be significant:
Solution: Batch queries
// Instead of querying per trigger: load every enabled ApiPoll trigger, with
// its workflow definition, in a single query.
// NOTE(review): entities returned by a no-tracking query are not change-tracked,
// so property changes made to them are not persisted by SaveChangesAsync. Use
// AsNoTracking only on paths that do not update the triggers afterwards —
// the polling job shown earlier does update them.
var triggers = await _context.WorkflowTriggerStates
.Include(t => t.WorkflowDefinition)
.Where(t => t.IsEnabled && t.TriggerType == "ApiPoll")
.AsNoTracking() // Read-only
.ToListAsync();
When polling external APIs:
Solution: Exponential backoff
// Handles only HTTP 429 (Too Many Requests); other HttpRequestExceptions are
// not matched by this filter.
// NOTE(review): this fragment reads `response` and `state`, so both must be
// declared outside the try block it belongs to — locals declared inside a try
// are not in scope in its catch. Confirm against the enclosing method.
catch (HttpRequestException ex) when (ex.StatusCode == HttpStatusCode.TooManyRequests)
{
// Back off: honour the server's Retry-After delta when present, otherwise
// wait five minutes, and store the resume time (ISO 8601, UTC) in the state.
// NOTE(review): nothing in this fragment reads "backoffUntil" or saves the
// state — the polling loop presumably has to do both.
var retryAfter = response.Headers.RetryAfter?.Delta ?? TimeSpan.FromMinutes(5);
state["backoffUntil"] = DateTime.UtcNow.Add(retryAfter).ToString("O");
}
Only trigger if certain conditions are met:
/// <summary>
/// API poll configuration extended with an optional condition expression.
/// </summary>
/// <remarks>
/// NOTE(review): no code in this listing evaluates <c>Condition</c>; the
/// expression syntax and evaluator are presumably defined elsewhere.
/// </remarks>
public class ConditionalTriggerConfig : ApiPollTriggerConfig
{
/// <summary>Optional condition expression; null when no condition is set.</summary>
public string? Condition { get; set; } // e.g., "{{stars}} > 1000"
}
Chain triggers - one workflow's completion triggers another:
// After a workflow completes, start every workflow chained to it through an
// enabled "WorkflowComplete" trigger, passing this execution's output as input.
if (execution.Status == WorkflowExecutionStatus.Completed)
{
    var dependentTriggers = await _context.WorkflowTriggerStates
        // The loop below dereferences WorkflowDefinition, so load it with the
        // trigger, as the scheduler and poll queries do.
        .Include(t => t.WorkflowDefinition)
        // Disabled triggers must not fire, matching the other trigger queries.
        // NOTE(review): Contains is a substring match on the raw config JSON,
        // so a workflow id that is a prefix of another id also matches.
        .Where(t => t.IsEnabled &&
                    t.TriggerType == "WorkflowComplete" &&
                    t.ConfigJson.Contains(execution.WorkflowId))
        .ToListAsync();

    foreach (var trigger in dependentTriggers)
    {
        await _executionService.ExecuteWorkflowAsync(
            trigger.WorkflowDefinition.WorkflowId,
            execution.OutputData,
            $"Triggered by {execution.WorkflowId}");
    }
}
Unit test your jobs:
/// <summary>
/// Verifies that running the scheduler job executes a due workflow exactly
/// once, with "Scheduler" as the trigger source.
/// </summary>
[Fact]
public async Task ExecuteScheduledWorkflows_ShouldExecuteWhenIntervalPassed()
{
    // Arrange
    // NOTE(review): CreateMockContext is assumed to supply one enabled, due
    // "Schedule" trigger — the Times.Once assertion depends on it.
    var mockContext = CreateMockContext();
    var mockExecutionService = new Mock<IWorkflowExecutionService>();
    // The job logs through ILogger<WorkflowSchedulerJob>; a non-generic
    // ILogger mock does not convert to that type.
    var job = new WorkflowSchedulerJob(
        mockContext.Object,
        mockExecutionService.Object,
        Mock.Of<ILogger<WorkflowSchedulerJob>>());

    // Act
    await job.ExecuteScheduledWorkflowsAsync();

    // Assert
    mockExecutionService.Verify(s => s.ExecuteWorkflowAsync(
        It.IsAny<string>(),
        It.IsAny<Dictionary<string, object>>(),
        "Scheduler",
        It.IsAny<CancellationToken>()), Times.Once);
}
We've built a complete automation system! Our workflows can now:
✅ Run on schedules - Hourly, daily, or custom intervals
✅ Poll APIs - Monitor external services for changes
✅ Track state - Remember what we've seen before
✅ Auto-retry - Handle transient failures
✅ Monitor - Dashboard for all jobs
✅ Scale - Hangfire handles load balancing
We've built an enterprise-grade workflow system from scratch:
You now have:
Possible enhancements:
All code is available in the repository:
`Mostlylucid.SchedulerService/Jobs/`, `Mostlylucid.Workflow.Shared/`, `Mostlylucid.Workflow.Engine/`
Thank you for following this series! Happy workflow building! 🎉
© 2025 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.