I built StyloFlow because I kept writing the same pattern over and over: components that react to what happened before, emit confidence scores, and sometimes need to escalate to more expensive analysis. Existing workflow engines wanted me to think in terms of DAGs or state machines. I wanted to think in signals.
StyloFlow is a signal-driven orchestration library that matches how I think about AI pipelines: components declare what they produce and what they need, confidence scores guide execution, and cheap operations run first with escalation to expensive ones only when needed.
This is the infrastructure powering lucidRAG - a cross-modal graph RAG tool that combines DocSummarizer (documents), DataSummarizer (structured data), and ImageSummarizer (images) into a unified question-answering system with knowledge graph visualization. It also powers Stylobot (an advanced bot protection system) and implements the Reduced RAG pattern.

Source: GitHub - StyloFlow
NOTE: StyloFlow is not yet a finished product; as I build lucidRAG and Stylobot I'm adding missing features and polishing the API. It's still in active development, but you can try it out and provide feedback.
StyloFlow is a working prototype of a signal-driven orchestration model. The API and shape will evolve, but the execution semantics and patterns described here are the point: signals as first-class facts, confidence-driven branching, and escalation as a structural pattern.
This isn't a new DSL or workflow language. It's a set of execution semantics built around signals, confidence, and bounded escalation. Today it runs in-process with bounded concurrency. Tomorrow it will distribute lanes across machines while keeping signals as the stable boundary.
Here's what most workflow engines look like:
// ❌ Traditional: Hardcoded dependencies
public async Task ProcessDocumentAsync(string path)
{
var text = await ExtractTextAsync(path);
var chunks = await ChunkTextAsync(text);
var embeddings = await GenerateEmbeddingsAsync(chunks);
var entities = await ExtractEntitiesAsync(chunks);
await StoreEverythingAsync(embeddings, entities);
}
This works until requirements change: you want to skip embedding for tiny documents, run entity extraction only when chunking succeeds, or escalate to an LLM when confidence is low. At that point you end up either threading conditional logic through the method or splitting it into ever more specialized variants.
StyloFlow builds on mostlylucid.ephemeral - a library for bounded, trackable async execution.
Quick recap of what ephemeral provides:
// Bounded concurrent processing with full visibility
var coordinator = new EphemeralWorkCoordinator<DocumentJob>(
async (job, operation, ct) => {
await ProcessAsync(job, ct);
operation.Signal("document.processed");
},
new EphemeralOptions { MaxConcurrency = 4 });
// Enqueue work
await coordinator.EnqueueAsync(new DocumentJob(filePath));
// Full observability
Console.WriteLine($"Active: {coordinator.ActiveCount}");
Console.WriteLine($"Completed: {coordinator.TotalCompleted}");
Key benefits from ephemeral: bounded concurrency, per-operation signal tracking, and built-in observability counters.
For details, see Fire and Don't Quite Forget.
This orchestration model extends ephemeral with signals as first-class facts, declarative manifests, composable waves, and concurrency lanes.
Here's the key architectural shift:
graph TD
subgraph Traditional["❌ Traditional: Hardcoded"]
T1[Component A] -->|calls| T2[Component B]
T2 -->|calls| T3[Component C]
T3 -->|calls| T4[Component D]
end
subgraph StyloFlow["✅ StyloFlow: Signal-Driven"]
S1[Component A]
S2[Component B]
S3[Component C]
S4[Component D]
SS[Signal Sink]
S1 -.emits.-> SS
S2 -.emits.-> SS
S3 -.emits.-> SS
SS -.triggers.-> S2
SS -.triggers.-> S3
SS -.triggers.-> S4
end
style T1 stroke:#ff6b6b
style T2 stroke:#ff6b6b
style T3 stroke:#ff6b6b
style T4 stroke:#ff6b6b
style S1 stroke:#51cf66
style S2 stroke:#51cf66
style S3 stroke:#51cf66
style S4 stroke:#51cf66
style SS stroke:#339af0
Components never call each other. They emit signals and react to signals.
Signals are facts about what happened, not commands or events. They're immutable, timestamped, and carry confidence scores. Each atom owns its signals - they're externally immutable. Nothing else can modify that list.
public record Signal
{
public required string Key { get; init; } // "document.chunked"
public object? Value { get; init; } // Optional payload
public double Confidence { get; init; } = 1.0; // 0.0 to 1.0
public required string Source { get; init; } // Which component
public DateTime Timestamp { get; init; }
public Dictionary<string, object>? Metadata { get; init; }
}
Critical architectural point: Signals are owned by atoms, coordinated via SignalSink.
Each operation/atom owns its signals - they're externally immutable. SignalSink is a read-only view across atoms that provides BOTH push and pull coordination patterns.
// Each operation owns its signals
var operation = coordinator.GetOperation(opId);
var signals = operation.GetSignals(); // Read-only view
// Signals can be escalated (copied to another operation)
operation.EscalateSignal("quality.low", targetOperationId);
// Or echoed (preserved when operation evicts)
operation.EmitEcho("final.state", value);
// But the original signal list is externally immutable
// Nothing can modify operation.GetSignals() from outside
SignalSink provides TWO coordination patterns:
1. Push-based (Subscribe to the SINK, not atoms):
var sink = new SignalSink();
// Subscribe to the sink for push notifications
sink.Subscribe(signal => {
if (signal.Is("document.chunked"))
{
// React to signal, but query atom for authoritative state
var operation = coordinator.GetOperation(signal.OperationId);
var chunkCount = operation.State["ChunkCount"];
}
});
// Or use the SignalRaised event
sink.SignalRaised += (sender, signal) => {
// Handle signal from any atom using this sink
};
2. Pull-based (Query the SINK):
// Query for specific signals
var documentSignals = sink.Sense("document.chunked");
// Check if signal exists
if (sink.Detect("embeddings.generated"))
{
// Proceed
}
// Get all signals for an operation
var opSignals = sink.GetOpSignals(operationId);
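The push/pull split above can be sketched as a minimal in-memory sink. This is an illustration of the semantics, not the library's actual implementation: atoms append to lists they own, the sink holds only read-only views across them and forwards push notifications.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for Signal, just for this sketch.
public record MiniSignal(string Key, object? Value, double Confidence, string Source);

public class MiniSink
{
    private readonly List<IReadOnlyList<MiniSignal>> _views = new();
    private readonly List<Action<MiniSignal>> _subscribers = new();

    // An atom registers a read-only view of its owned signal list.
    // Nothing here can mutate the atom's list.
    public void Attach(IReadOnlyList<MiniSignal> ownedSignals) => _views.Add(ownedSignals);

    // Push path: consumers subscribe to the sink, not to atoms.
    public void Subscribe(Action<MiniSignal> handler) => _subscribers.Add(handler);

    // Called by the owning atom after it appends a signal.
    public void Notify(MiniSignal signal)
    {
        foreach (var h in _subscribers) h(signal);
    }

    // Pull path: query across all attached views.
    public IEnumerable<MiniSignal> Sense(string key) =>
        _views.SelectMany(v => v).Where(s => s.Key == key);

    public bool Detect(string key) => Sense(key).Any();
}
```

The ownership boundary falls out of the types: the atom keeps a `List<MiniSignal>`, the sink only ever sees it as `IReadOnlyList<MiniSignal>`.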
Why this matters: ownership keeps the signal history trustworthy (only the emitting atom can write it), while the sink lets any consumer coordinate without coupling to specific atoms. Some example coordination patterns:
// Pull pattern: Wave queries sink to decide if it should run
public bool ShouldRun(string path, AnalysisContext ctx)
{
var chunkSignal = ctx.GetSignal("document.chunked");
return chunkSignal != null && (int)chunkSignal.Value > 0;
}
// Atom emits signals (adds to its owned list)
public async Task<IEnumerable<Signal>> AnalyzeAsync(...)
{
return new[]
{
new Signal
{
Key = "embeddings.generated",
Confidence = 1.0,
Source = Name
}
};
}
// Push pattern: UI subscribes to sink for reactive updates
sink.Subscribe(signal => {
if (signal.Key.StartsWith("document."))
{
UpdateProgressUI(signal);
}
});
This structure exists because AI components naturally produce probabilistic outputs - the signal model makes confidence first-class while maintaining clear ownership boundaries. You can use Subscribe() for reactive patterns or query methods for polling, but either way, atoms own the signals and the sink provides a read-only view.
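Because several waves can emit the same key at different confidences (the escalation examples later re-emit bot.detected), consumers typically want the best reading for a key. A sketch of that selection over a plain list, where `Reading` is a stand-in for a Signal rather than StyloFlow's actual API:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-in for a Signal, carrying only what the query needs.
public record Reading(string Key, double Confidence, DateTime Timestamp);

public static class SignalQueries
{
    // Prefer the highest-confidence reading for a key; break ties by recency.
    public static Reading? Best(IEnumerable<Reading> signals, string key) =>
        signals.Where(s => s.Key == key)
               .OrderByDescending(s => s.Confidence)
               .ThenByDescending(s => s.Timestamp)
               .FirstOrDefault();
}
```

This is the shape a GetBestSignal-style helper takes: confidence first, recency second, null if nothing has been emitted yet.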
For the theory behind this, see Constrained Fuzzy Context Dragging.
Manifests declare contracts (what triggers me, what I emit, what I cost) separate from implementation. This separation exists so you can understand the workflow without reading code, and change execution order without recompiling.
name: BotDetector
priority: 10 # Higher runs first
enabled: true
# What kind of component is this?
taxonomy:
kind: analyzer # sensor|analyzer|proposer|gatekeeper
determinism: probabilistic
persistence: ephemeral
# When should this run?
triggers:
requires:
- signal: http.request.received
condition: exists
# What does it produce?
emits:
on_complete:
- key: bot.detected
confidence_range: [0.0, 1.0]
conditional:
- key: bot.escalation.needed
when: confidence < 0.7
# Resource limits
lane:
name: fast # fast|normal|slow|llm
max_concurrency: 8
budget:
max_duration: 100ms
# Configuration values
defaults:
confidence:
bot_detected: 0.6
timing:
timeout_ms: 100
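The trigger conditions in manifests (exists, in, and numeric comparisons like confidence < 0.7) suggest a tiny evaluator. A sketch under the assumption that conditions are restricted to those forms; this is illustrative, not StyloFlow's actual parser:

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Linq;

public static class ConditionEval
{
    // Evaluates a manifest-style trigger condition against a signal's value.
    // Supported forms, matching the manifests in this post: exists, in, ">", "<".
    public static bool Evaluate(string condition, object? signalValue, object? operand = null) =>
        condition switch
        {
            "exists" => signalValue is not null,
            "in"     => operand is IEnumerable<object> allowed && allowed.Contains(signalValue),
            ">"      => signalValue is not null && ToDouble(signalValue) > ToDouble(operand),
            "<"      => signalValue is not null && ToDouble(signalValue) < ToDouble(operand),
            _        => throw new NotSupportedException($"condition '{condition}'")
        };

    private static double ToDouble(object? v) =>
        Convert.ToDouble(v, CultureInfo.InvariantCulture);
}
```

A gatekeeper can then check `Evaluate("in", signal.Value, new[] { ".pdf", ".docx" })` before scheduling a wave.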
The benefit: execution order, triggers, lanes, and budgets live in configuration, so you can understand and retune a workflow without touching code.
While you can write YAML manifests by hand, StyloFlow includes a visual workflow builder that lets you design signal-driven workflows using modular-synth-style patching:

The builder makes it easy to experiment with different workflow shapes without writing YAML by hand, while still giving you full control over the generated configuration.
A wave is a composable analysis stage. This interface exists to make "should we run?" a first-class decision, not an implementation detail buried in conditional logic.
public interface IContentAnalysisWave
{
string Name { get; }
int Priority { get; } // Higher runs first
bool Enabled { get; set; }
// Quick filter - avoid expensive work
bool ShouldRun(string contentPath, AnalysisContext context);
// Do the analysis
Task<IEnumerable<Signal>> AnalyzeAsync(
string contentPath,
AnalysisContext context,
CancellationToken ct);
}
Simple wave example:
public class FileTypeWave : IContentAnalysisWave
{
public string Name => "FileType";
public int Priority => 100;
public bool Enabled { get; set; } = true;
public bool ShouldRun(string path, AnalysisContext ctx)
{
// Skip if we already know the type
return ctx.GetSignal("file.type") == null;
}
public async Task<IEnumerable<Signal>> AnalyzeAsync(
string path,
AnalysisContext ctx,
CancellationToken ct)
{
var extension = Path.GetExtension(path);
var mimeType = GetMimeType(extension);
return new[]
{
new Signal
{
Key = "file.type",
Value = mimeType,
Confidence = 1.0,
Source = Name
}
};
}
}
Wave coordination:
The WaveCoordinator runs waves in priority order:
var coordinator = new WaveCoordinator(waves, profile);
var context = new AnalysisContext();
var results = await coordinator.ExecuteAsync(filePath, context, ct);
// All signals from all waves
foreach (var signal in context.GetAllSignals())
{
Console.WriteLine($"{signal.Key}: {signal.Value}");
}
Concurrency lanes:
Waves run in lanes with different concurrency limits:
| Lane | Purpose | Concurrency |
|---|---|---|
| `fast` | Quick checks (IP lookup, file type) | 16 |
| `normal` | Standard processing (parsing, chunking) | 8 |
| `io` | I/O bound (file reads, API calls) | 32 |
| `llm` | Expensive LLM calls | 2 |
This prevents expensive operations from blocking cheap ones.
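The lane idea maps directly onto per-lane semaphores. A sketch of bounded lanes that assumes nothing about StyloFlow's internals; slow work can only exhaust its own lane:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Sketch: each lane is a SemaphoreSlim capping in-flight work for that lane.
public class Lanes
{
    private readonly Dictionary<string, SemaphoreSlim> _lanes;

    public Lanes(Dictionary<string, int> limits)
    {
        _lanes = new Dictionary<string, SemaphoreSlim>();
        foreach (var (name, limit) in limits)
            _lanes[name] = new SemaphoreSlim(limit, limit);
    }

    public async Task<T> RunAsync<T>(string lane, Func<Task<T>> work, CancellationToken ct = default)
    {
        var gate = _lanes[lane];
        await gate.WaitAsync(ct);     // queue behind the lane's limit
        try { return await work(); }
        finally { gate.Release(); }   // free the slot even on failure
    }
}
```

So `new Lanes(new() { ["fast"] = 16, ["llm"] = 2 })` lets sixteen cheap checks run while at most two LLM calls are in flight.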
Here's the complete picture:
graph TB
subgraph Input["Input Layer"]
REQ[HTTP Request]
FILE[File Upload]
JOB[Background Job]
end
subgraph Ephemeral["Ephemeral Layer"]
COORD[Work Coordinator]
OPS[Operations<br/>own signals]
SINK[SignalSink<br/>read-only view]
end
subgraph StyloFlow["StyloFlow Layer"]
MAN[Manifests]
WAVE[Wave Coordinator]
ATOMS[Atoms<br/>own signals]
end
subgraph Execution["Execution"]
FAST[Fast Lane]
NORM[Normal Lane]
LLM[LLM Lane]
end
subgraph Output["Output"]
RES[Results]
ESCAL[Escalation]
STORE[Persistence]
end
REQ --> COORD
FILE --> COORD
JOB --> COORD
COORD --> OPS
SINK -.queries.-> OPS
WAVE -.reads.-> SINK
MAN -.configures.-> WAVE
WAVE --> ATOMS
ATOMS --> FAST
ATOMS --> NORM
ATOMS --> LLM
SINK -.queries.-> FAST
SINK -.queries.-> NORM
SINK -.queries.-> LLM
SINK -.read for.-> RES
SINK -.read for.-> ESCAL
SINK -.read for.-> STORE
style COORD stroke:#339af0
style SINK stroke:#339af0
style WAVE stroke:#51cf66
style ATOMS stroke:#51cf66
Flow: inputs enter through the work coordinator, which creates operations that own their signals. The SignalSink exposes a read-only view across them; manifests configure the wave coordinator, waves dispatch atoms into lanes, and results, escalation decisions, and persistence are all read from the sink.
Ownership model: Each operation/atom owns its signals. SignalSink provides a read-only view across all operations. Signals can be escalated (copied) or echoed (preserved when evicted), but the owned list is externally immutable.
Current execution model: Single-process, bounded concurrency, observable operations with LRU eviction.
Future execution model: Distributed lanes across machines, SignalSink queries remote operations, atoms execute on different hosts. Signals remain the stable boundary - they're already serializable, timestamped, and self-contained. The ownership model doesn't change.
The in-process implementation validates the semantics. Distribution is about scaling the execution substrate, not changing the orchestration model.
Let's see how lucidRAG uses StyloFlow:
Stage 1: Initial Detection
public class FileTypeDetectorWave : IContentAnalysisWave
{
public int Priority => 100; // Run first
public async Task<IEnumerable<Signal>> AnalyzeAsync(...)
{
var extension = Path.GetExtension(path);
return new[]
{
new Signal
{
Key = "file.extension",
Value = extension,
Source = "FileTypeDetector"
}
};
}
}
Stage 2: Chunking (triggered by file.extension)
// In manifest:
// triggers:
// requires:
// - signal: file.extension
// condition: in
// value: [".pdf", ".docx", ".md"]
public class ChunkingWave : ConfiguredComponentBase, IContentAnalysisWave
{
public int Priority => 80;
public async Task<IEnumerable<Signal>> AnalyzeAsync(...)
{
var chunks = await ChunkDocumentAsync(path);
ctx.SetCached("chunks", chunks); // Share with other waves
return new[]
{
new Signal
{
Key = "document.chunked",
Value = chunks.Count,
Source = Name
}
};
}
}
Stage 3: Embedding (triggered by document.chunked)
public class EmbeddingWave : ConfiguredComponentBase, IContentAnalysisWave
{
public int Priority => 60;
public bool ShouldRun(string path, AnalysisContext ctx)
{
// Only run if chunking succeeded
return ctx.GetSignal("document.chunked") != null;
}
public async Task<IEnumerable<Signal>> AnalyzeAsync(...)
{
var chunks = ctx.GetCached<List<Chunk>>("chunks");
var embeddings = await GenerateEmbeddingsAsync(chunks);
ctx.SetCached("embeddings", embeddings);
return new[]
{
new Signal
{
Key = "embeddings.generated",
Value = embeddings.Count,
Source = Name
}
};
}
}
Stage 4: Entity Extraction (parallel with embedding)
public class EntityExtractionWave : ConfiguredComponentBase, IContentAnalysisWave
{
public int Priority => 60; // Same as embedding - runs in parallel
public async Task<IEnumerable<Signal>> AnalyzeAsync(...)
{
var chunks = ctx.GetCached<List<Chunk>>("chunks");
// Use deterministic IDF scoring, not LLM per chunk
// (See Reduced RAG pattern)
var entities = await ExtractEntitiesAsync(chunks);
return new[]
{
new Signal
{
Key = "entities.extracted",
Value = entities.Count,
Confidence = CalculateConfidence(entities),
Source = Name
}
};
}
}
Stage 5: Quality Check
public class QualityCheckWave : ConfiguredComponentBase, IContentAnalysisWave
{
public int Priority => 40; // After embedding + entities
public async Task<IEnumerable<Signal>> AnalyzeAsync(...)
{
var embeddingSignal = ctx.GetSignal("embeddings.generated");
var entitySignal = ctx.GetSignal("entities.extracted");
var embeddingCount = (int)embeddingSignal.Value;
var entityConfidence = entitySignal.Confidence;
var quality = CalculateQuality(embeddingCount, entityConfidence);
var signals = new List<Signal>
{
new Signal
{
Key = "quality.score",
Value = quality,
Source = Name
}
};
// Trigger escalation if quality is poor
if (quality < GetParam<double>("quality_threshold", 0.7))
{
signals.Add(new Signal
{
Key = "escalation.needed",
Value = "low_quality_document",
Source = Name
});
}
return signals;
}
}
The benefit of this approach: each stage declares its own trigger, so chunking, embedding, entity extraction, and quality checking compose without calling each other, and the expensive escalation stage runs only when the quality signal demands it.
This is the Reduced RAG pattern in action: deterministic extraction up front, LLMs only for synthesis.
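The "deterministic IDF scoring" mentioned in Stage 4 can be sketched in a few lines. This is a simplification of the idea rather than lucidRAG's actual scorer: score each candidate term by log(N / df), where N is the chunk count and df is how many chunks contain the term, so terms concentrated in few chunks score highest.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class IdfScorer
{
    // Sketch: IDF over chunks. A term in every chunk scores 0;
    // a term in exactly one chunk scores log(N).
    public static Dictionary<string, double> Score(IReadOnlyList<string[]> chunks)
    {
        int n = chunks.Count;
        return chunks
            .SelectMany(c => c.Distinct())   // count each term once per chunk
            .GroupBy(t => t)
            .ToDictionary(g => g.Key, g => Math.Log((double)n / g.Count()));
    }
}
```

The point of the pattern: this runs in microseconds per chunk and needs no LLM call, so entity candidates are deterministic and repeatable.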
Stylobot is an advanced bot detection system that uses StyloFlow for multi-stage threat analysis. See the complete escalation example in the "Escalation: From Fast to Thorough" section for detailed code.
Pattern 1: Fan-Out
One signal triggers multiple waves:
graph LR
S1[document.uploaded] --> W1[ChunkingWave]
S1 --> W2[MetadataWave]
S1 --> W3[LanguageDetectionWave]
W1 -.signal.-> S2[document.chunked]
W2 -.signal.-> S3[metadata.extracted]
W3 -.signal.-> S4[language.detected]
style S1 stroke:#339af0
style S2 stroke:#339af0
style S3 stroke:#339af0
style S4 stroke:#339af0
style W1 stroke:#51cf66
style W2 stroke:#51cf66
style W3 stroke:#51cf66
Pattern 2: Sequential Dependency
Waves wait for previous signals:
graph LR
W1[ExtractWave] -.signal.-> S1[text.extracted]
S1 --> W2[ChunkWave]
W2 -.signal.-> S2[text.chunked]
S2 --> W3[EmbedWave]
W3 -.signal.-> S3[embeddings.generated]
style S1 stroke:#339af0
style S2 stroke:#339af0
style S3 stroke:#339af0
style W1 stroke:#51cf66
style W2 stroke:#51cf66
style W3 stroke:#51cf66
Pattern 3: Conditional Branching
Different waves run based on signals:
graph TD
W1[DetectorWave] -.signal.-> S1{confidence}
S1 -->|< 0.4| W2[RejectWave]
S1 -->|0.4-0.7| W3[EscalateWave]
S1 -->|> 0.7| W4[AcceptWave]
W2 -.signal.-> S2[rejected]
W3 -.signal.-> S3[escalated]
W4 -.signal.-> S4[accepted]
style S1 stroke:#ffd43b
style S2 stroke:#ff6b6b
style S3 stroke:#ff922b
style S4 stroke:#51cf66
style W1 stroke:#339af0
style W2 stroke:#ff6b6b
style W3 stroke:#ff922b
style W4 stroke:#51cf66
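Pattern 3's confidence bands reduce to a tiny routing function. A sketch using the thresholds from the diagram (below 0.4 reject, 0.4 to 0.7 escalate, above 0.7 accept):

```csharp
public static class ConfidenceRouter
{
    // The three bands from Pattern 3: the ambiguous middle band escalates.
    public static string Route(double confidence) => confidence switch
    {
        < 0.4 => "reject",
        <= 0.7 => "escalate",
        _ => "accept"
    };
}
```

No explicit routing table is needed; the branch is a property of the confidence value itself.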
Pattern 4: Aggregation
Multiple signals trigger one wave:
graph LR
W1[Wave A] -.signal.-> S1[a.complete]
W2[Wave B] -.signal.-> S2[b.complete]
W3[Wave C] -.signal.-> S3[c.complete]
S1 --> T{All Ready?}
S2 --> T
S3 --> T
T -->|Yes| W4[AggregatorWave]
W4 -.signal.-> S4[aggregation.complete]
style S1 stroke:#339af0
style S2 stroke:#339af0
style S3 stroke:#339af0
style S4 stroke:#51cf66
style T stroke:#ffd43b
style W4 stroke:#51cf66
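Pattern 4's "All Ready?" gate is just a ShouldRun that requires every prerequisite key. A sketch over a plain collection of emitted keys, independent of StyloFlow's AnalysisContext:

```csharp
using System.Collections.Generic;
using System.Linq;

public static class AggregationGate
{
    // ShouldRun for an aggregator: true only once every prerequisite has fired.
    public static bool AllPresent(IReadOnlyCollection<string> emittedKeys, params string[] required) =>
        required.All(k => emittedKeys.Contains(k));
}
```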
The key innovation is bounded escalation - cheap detectors run first, expensive analysis only when needed.
Example: Bot detection
// Stage 1: Fast IP check (< 10ms)
public class IpReputationWave : IContentAnalysisWave
{
public int Priority => 100;
public async Task<IEnumerable<Signal>> AnalyzeAsync(...)
{
var ip = ctx.GetCached<string>("client_ip");
var reputation = await _ipService.CheckAsync(ip);
double confidence;
if (reputation == IpReputation.KnownBot)
confidence = 0.95;
else if (reputation == IpReputation.Suspicious)
confidence = 0.5;
else
confidence = 0.1;
var signals = new List<Signal>
{
new Signal
{
Key = "bot.detected",
Value = reputation != IpReputation.Clean,
Confidence = confidence,
Source = Name
}
};
// Trigger escalation if unsure
if (confidence > 0.4 && confidence < 0.7)
{
signals.Add(new Signal
{
Key = "bot.escalation.needed",
Source = Name
});
}
return signals;
}
}
Stage 2: Behavioral analysis (triggered by escalation, ~100ms)
// Only runs if bot.escalation.needed signal exists
public class BehaviorAnalysisWave : ConfiguredComponentBase, IContentAnalysisWave
{
public int Priority => 80;
public bool ShouldRun(string path, AnalysisContext ctx)
{
// Skip if already confident
var botSignal = ctx.GetBestSignal("bot.detected");
if (botSignal?.Confidence > 0.7 || botSignal?.Confidence < 0.4)
return false;
// Only run if escalation requested
return ctx.GetSignal("bot.escalation.needed") != null;
}
public async Task<IEnumerable<Signal>> AnalyzeAsync(...)
{
var userAgent = ctx.GetCached<string>("user_agent");
var clickPattern = ctx.GetCached<List<Click>>("clicks");
var behaviorScore = AnalyzeBehavior(userAgent, clickPattern);
return new[]
{
new Signal
{
Key = "bot.detected",
Value = behaviorScore > 0.6,
Confidence = behaviorScore,
Source = Name
}
};
}
}
Stage 3: LLM analysis (only if still unsure, ~2s)
public class LlmBotAnalysisWave : ConfiguredComponentBase, IContentAnalysisWave
{
public int Priority => 60;
public bool ShouldRun(string path, AnalysisContext ctx)
{
var botSignal = ctx.GetBestSignal("bot.detected");
// Only use LLM if still in ambiguous range
return botSignal?.Confidence > 0.4 &&
botSignal?.Confidence < 0.7;
}
public async Task<IEnumerable<Signal>> AnalyzeAsync(...)
{
var messages = ctx.GetCached<List<Message>>("conversation");
var prompt = $@"Analyze if this conversation is from a bot:
{string.Join("\n", messages.Select(m => m.Text))}
Reply with JSON: {{""is_bot"": bool, ""confidence"": 0.0-1.0, ""reasoning"": string}}";
var response = await _llm.CompleteAsync(prompt);
var result = JsonSerializer.Deserialize<BotAnalysisResult>(response);
return new[]
{
new Signal
{
Key = "bot.detected",
Value = result.IsBot,
Confidence = result.Confidence,
Source = Name,
Metadata = new() { ["reasoning"] = result.Reasoning }
}
};
}
}
Cost breakdown:
| Stage | Latency | Cost | Hit Rate |
|---|---|---|---|
| IP check | 5ms | $0 | 100% of requests |
| Behavioral | 100ms | $0 | 30% (ambiguous cases) |
| LLM | 2s | $0.002 | 5% (still ambiguous) |
Total cost: $0.002 * 0.05 = $0.0001 per request
Compare to naive "LLM everything" approach: $0.002 * 100% = $0.002 per request (20x more expensive)
This is the power of wave-based escalation.
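The cost table works out as a simple expected-value calculation: per-request cost is the sum of each stage's cost weighted by the fraction of requests that reach it. A sketch reproducing the numbers above:

```csharp
using System;

public static class EscalationCost
{
    // Expected cost per request = sum over stages of (stage cost * hit rate).
    public static double PerRequest(params (double cost, double hitRate)[] stages)
    {
        double total = 0;
        foreach (var (cost, hitRate) in stages)
            total += cost * hitRate;
        return total;
    }
}
```

With the stages from the table, `PerRequest((0.0, 1.0), (0.0, 0.30), (0.002, 0.05))` gives $0.0001 against $0.002 for the LLM-everything baseline, the 20x difference quoted above.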
This is not a quick-start guide; it's the smallest example that shows how the model fits together.
Installation:
dotnet add package StyloFlow.Complete
Conceptual entry point:
// 1. Define a wave
public class MyAnalysisWave : IContentAnalysisWave
{
public string Name => "MyAnalysis";
public int Priority => 50;
public bool Enabled { get; set; } = true;
public bool ShouldRun(string path, AnalysisContext ctx) => true;
public async Task<IEnumerable<Signal>> AnalyzeAsync(
string path,
AnalysisContext ctx,
CancellationToken ct)
{
// Your analysis logic here
var result = await AnalyzeAsync(path);
return new[]
{
new Signal
{
Key = "my.signal",
Value = result,
Confidence = 1.0,
Source = Name
}
};
}
}
// 2. Register waves
var waves = new List<IContentAnalysisWave>
{
new MyAnalysisWave(),
new AnotherWave(),
};
// 3. Create coordinator
var coordinator = new WaveCoordinator(
waves,
CoordinatorProfile.Default);
// 4. Execute
var context = new AnalysisContext();
var results = await coordinator.ExecuteAsync(filePath, context);
// 5. Read signals
foreach (var signal in context.GetAllSignals())
{
Console.WriteLine($"{signal.Key}: {signal.Value} ({signal.Confidence})");
}
With manifests:
// Load manifests from directory
var loader = new FileSystemManifestLoader("./manifests");
var manifests = await loader.LoadAllAsync();
// Build waves from manifests
var waves = manifests
.Where(m => m.Enabled)
.OrderBy(m => m.Priority)
.Select(m => WaveFactory.Create(m))
.ToList();
var coordinator = new WaveCoordinator(waves, profile);
For complete examples, see the StyloFlow GitHub repository.
One of StyloFlow's key features is workflow discoverability - you can understand the entire pipeline just by reading the manifests. No code diving required.
Here's the actual manifest directory structure for lucidRAG:
manifests/
├── 01-file-type-detector.yaml
├── 02-chunking.yaml
├── 03-embedding.yaml
├── 04-entity-extraction.yaml
├── 05-quality-check.yaml
└── 06-escalation.yaml
01-file-type-detector.yaml:
name: FileTypeDetector
priority: 100
enabled: true
description: Detects file type from extension
taxonomy:
kind: sensor
determinism: deterministic
persistence: ephemeral
triggers:
requires:
- signal: document.uploaded
condition: exists
emits:
on_start:
- file.detection.started
on_complete:
- key: file.extension
type: string
confidence_range: [1.0, 1.0]
- key: file.mime_type
type: string
confidence_range: [1.0, 1.0]
lane:
name: fast
max_concurrency: 16
budget:
max_duration: 10ms
02-chunking.yaml:
name: ChunkingWave
priority: 80
enabled: true
description: Splits documents into semantic chunks
taxonomy:
kind: extractor
determinism: deterministic
persistence: ephemeral
input:
accepts:
- document.pdf
- document.docx
- document.markdown
required_signals:
- file.extension
triggers:
requires:
- signal: file.extension
condition: in
value: [".pdf", ".docx", ".md", ".txt"]
emits:
on_complete:
- key: document.chunked
type: integer
confidence_range: [1.0, 1.0]
- key: chunks.cached
type: boolean
lane:
name: normal
max_concurrency: 8
budget:
max_duration: 30s
defaults:
chunking:
max_chunk_size: 512
overlap: 50
respect_boundaries: true
03-embedding.yaml:
name: EmbeddingWave
priority: 60
enabled: true
description: Generates ONNX embeddings for chunks
taxonomy:
kind: embedder
determinism: deterministic
persistence: cached
input:
required_signals:
- document.chunked
- chunks.cached
triggers:
requires:
- signal: document.chunked
condition: ">"
value: 0
emits:
on_complete:
- key: embeddings.generated
type: integer
confidence_range: [1.0, 1.0]
lane:
name: normal
max_concurrency: 4
budget:
max_duration: 2m
max_cost: 0.0 # Local ONNX model
defaults:
embedding:
model: all-MiniLM-L6-v2
batch_size: 32
04-entity-extraction.yaml:
name: EntityExtractionWave
priority: 60 # Same as embedding - runs in parallel
enabled: true
description: Extracts entities using IDF scoring
taxonomy:
kind: extractor
determinism: deterministic
persistence: persisted
input:
required_signals:
- document.chunked
triggers:
requires:
- signal: document.chunked
condition: ">"
value: 0
emits:
on_complete:
- key: entities.extracted
type: integer
confidence_range: [0.0, 1.0] # Confidence varies
lane:
name: normal
max_concurrency: 8
budget:
max_duration: 1m
defaults:
entity:
min_idf_score: 2.5
min_frequency: 2
max_entities: 100
05-quality-check.yaml:
name: QualityCheckWave
priority: 40
enabled: true
description: Validates extraction quality
taxonomy:
kind: gatekeeper
determinism: deterministic
persistence: ephemeral
input:
required_signals:
- embeddings.generated
- entities.extracted
triggers:
requires:
- signal: embeddings.generated
condition: ">"
value: 0
- signal: entities.extracted
condition: exists
emits:
on_complete:
- key: quality.score
type: double
confidence_range: [0.0, 1.0]
conditional:
- key: escalation.needed
when: quality.score < 0.7
lane:
name: fast
max_concurrency: 16
defaults:
quality:
min_embeddings: 5
min_entity_confidence: 0.5
threshold: 0.7
06-escalation.yaml:
name: EscalationWave
priority: 20
enabled: true
description: Improves low-quality extractions using LLM
taxonomy:
kind: proposer
determinism: probabilistic
persistence: persisted
input:
required_signals:
- escalation.needed
triggers:
requires:
- signal: escalation.needed
condition: exists
skip_when:
- signal: budget.exhausted
emits:
on_complete:
- key: escalation.complete
type: boolean
- key: entities.improved
type: integer
confidence_range: [0.7, 1.0]
lane:
name: llm
max_concurrency: 2 # Expensive
budget:
max_duration: 30s
max_tokens: 4000
max_cost: 0.05
defaults:
llm:
model: gpt-4o-mini
temperature: 0.1
prompt_template: entity_extraction
Looking at these files, you immediately know what runs (names and descriptions), when (triggers), in what order (priorities), where (lanes), and at what cost (budgets).
No code reading required. The workflow is self-documenting.
While the examples above show fully declarative YAML, waves can also be code-based atoms referenced in manifests:
name: CustomAnalyzer
priority: 50
enabled: true
description: Custom analysis logic
# Reference a code-based atom implementation
implementation:
assembly: MyProject.Analyzers
type: MyProject.Analyzers.CustomAnalyzerWave
method: AnalyzeAsync
# The manifest still declares the contract
taxonomy:
kind: analyzer
determinism: probabilistic
triggers:
requires:
- signal: data.ready
emits:
on_complete:
- key: analysis.complete
confidence_range: [0.0, 1.0]
lane:
name: normal
max_concurrency: 4
# Configuration values passed to the atom
defaults:
threshold: 0.75
max_iterations: 10
The C# implementation:
public class CustomAnalyzerWave : ConfiguredComponentBase, IContentAnalysisWave
{
public async Task<IEnumerable<Signal>> AnalyzeAsync(
string path,
AnalysisContext ctx,
CancellationToken ct)
{
// Access manifest config
var threshold = GetParam<double>("threshold", 0.75);
var maxIterations = GetParam<int>("max_iterations", 10);
// Custom logic here
var result = await PerformComplexAnalysis(path, threshold, maxIterations);
return new[]
{
new Signal
{
Key = "analysis.complete",
Value = result.Score,
Confidence = result.Confidence,
Source = Name
}
};
}
}
The benefit is that the manifest still documents the full contract - triggers, emissions, lanes, budgets - while the implementation lives in C#. This hybrid approach gives you declarative workflow discovery while keeping complex logic in maintainable code.
The manifest structure makes it trivial to generate visualizations:
graph TD
DOC[document.uploaded] --> FT[FileTypeDetector<br/>Priority: 100<br/>Lane: fast]
FT --> EXT[file.extension]
EXT --> CH[ChunkingWave<br/>Priority: 80<br/>Lane: normal]
CH --> CHUNKED[document.chunked]
CHUNKED --> EMB[EmbeddingWave<br/>Priority: 60<br/>Lane: normal]
CHUNKED --> ENT[EntityExtractionWave<br/>Priority: 60<br/>Lane: normal]
EMB --> EMBGEN[embeddings.generated]
ENT --> ENTEX[entities.extracted]
EMBGEN --> QC[QualityCheckWave<br/>Priority: 40<br/>Lane: fast]
ENTEX --> QC
QC --> QSCORE[quality.score]
QC -.conditional.-> ESC_NEED[escalation.needed]
ESC_NEED -.-> ESC[EscalationWave<br/>Priority: 20<br/>Lane: llm]
ESC --> ESC_DONE[escalation.complete]
style DOC stroke:#339af0
style FT stroke:#51cf66
style CH stroke:#51cf66
style EMB stroke:#51cf66
style ENT stroke:#51cf66
style QC stroke:#ffd43b
style ESC stroke:#ff922b
This diagram was generated programmatically from the YAML manifests - no manual drawing.
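Generating such a diagram is a straightforward fold over the manifests. A sketch, assuming each manifest has already been reduced to a (name, requires, emits) triple; the real loader parses YAML first, and dots are sanitized out of node ids since they can confuse Mermaid:

```csharp
using System.Collections.Generic;
using System.Text;

// Minimal stand-in for a parsed manifest.
public record ManifestNode(string Name, string[] Requires, string[] Emits);

public static class MermaidGen
{
    // Keep the raw signal name as the label; use a sanitized id for the node.
    private static string Id(string s) => s.Replace('.', '_');

    public static string Render(IEnumerable<ManifestNode> manifests)
    {
        var sb = new StringBuilder("graph TD\n");
        foreach (var m in manifests)
        {
            // signal --> wave for triggers, wave --> signal for emissions
            foreach (var r in m.Requires)
                sb.AppendLine($"  {Id(r)}[{r}] --> {Id(m.Name)}[{m.Name}]");
            foreach (var e in m.Emits)
                sb.AppendLine($"  {Id(m.Name)}[{m.Name}] --> {Id(e)}[{e}]");
        }
        return sb.ToString();
    }
}
```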
1. Declarative composition
Components declare their contracts (triggers, signals, budget), not their dependencies. The system figures out execution order. This isn't a feature - it's what happens when you make signals first-class.
2. Observable by default
Every action is a signal. You don't add observability - it's inherent. Full execution trace, confidence tracking, escalation paths, and budget consumption fall out naturally.
3. Adaptive execution
Confidence scores drive branching without explicit routing logic. Skip expensive stages when unnecessary, escalate when unsure, abort early on high-confidence failures. The control flow emerges from signal patterns.
4. Testability without mocking frameworks
Mock signals, not components:
var context = new AnalysisContext();
context.AddSignal(new Signal
{
Key = "document.chunked",
Value = 10,
Confidence = 1.0,
Source = "Test"
});
var wave = new EmbeddingWave();
var results = await wave.AnalyzeAsync(path, context, ct);
Assert.Single(results);
Assert.Equal("embeddings.generated", results.First().Key);
5. Incremental complexity
Start simple:
var coordinator = new EphemeralWorkCoordinator<Job>(ProcessAsync);
Add signals when needed:
new EphemeralOptions { Signals = signalSink }
Add waves for multi-stage:
var waveCoordinator = new WaveCoordinator(waves, profile);
Add manifests for declarative config:
name: MyWave
triggers: [...]
emits: [...]
Here's the complete lucidRAG pipeline at a glance (see the detailed code examples in the "Use Case: lucidRAG Document Processing" section above):
- document.uploaded
- file.extension
- text.extracted
- document.chunked
- embeddings.generated
- entities.extracted
- quality.score
- escalation.complete
- storage.complete

The UI polls the SignalSink view for progress updates (SignalSink provides a read-only view across all document operations):
// Background service polls for document progress
var documentOps = coordinator.GetOperations()
.Where(op => op.State.ContainsKey("documentId"));
foreach (var op in documentOps)
{
var signals = op.GetSignals()
.Where(s => s.Key.StartsWith("document."));
foreach (var signal in signals)
{
await _hub.Clients.User(userId)
.SendAsync("DocumentProgress", new
{
stage = signal.Key,
progress = CalculateProgress(signal)
});
}
}
Or using the SignalSink view directly:
// SignalSink is a view across all operations
var documentSignals = signalSink.GetSignals()
.Where(s => s.Key.StartsWith("document.") &&
s.Timestamp > lastCheck);
// Read signals, don't subscribe - signals are owned by operations
This is how lucidRAG processes documents, data, and images through a unified signal-driven pipeline - combining DocSummarizer, DataSummarizer, and ImageSummarizer under one orchestration layer.
Here's the complete Stylobot pipeline at a glance (see the detailed code examples in the "Use Case: Stylobot Chat Pipeline" section above):
intent.detected → documents.retrieved → response.generated → response.improved

The key benefit: conditional execution based on signals.
// ❌ Traditional: Every message goes through everything
var intent = await DetectIntentAsync(message);
var docs = await SearchAsync(message); // Even if not needed
var response = await GenerateAsync(message, docs);
// ✅ StyloFlow: Waves run conditionally based on signals
// SearchWave only runs if IntentWave emits "search" intent
// EscalationWave only runs if ResponseWave confidence < 0.7
Simple greetings skip search and LLM generation entirely. Complex questions trigger the full pipeline with escalation. This saves ~90% of LLM costs on typical chat traffic.
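A sketch of how a wave could express that gating in code. The `ShouldRun` hook, the `IWave` interface, and the threshold below are assumptions for illustration, not confirmed StyloFlow API:

```csharp
using System.Linq;

// Hypothetical escalation wave: runs only when the draft response's
// confidence falls below a threshold.
public class EscalationWave : IWave // IWave shape assumed
{
    private const double Threshold = 0.7;

    public bool ShouldRun(AnalysisContext context)
    {
        var draft = context.GetSignals("response.generated")
            .OrderByDescending(s => s.Confidence)
            .FirstOrDefault();

        // No draft yet, or already confident enough: skip the LLM pass.
        return draft != null && draft.Confidence < Threshold;
    }
}
```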
| Feature | StyloFlow | Temporal | Airflow | Step Functions |
|---|---|---|---|---|
| Coordination | Signal-driven | RPC-based | DAG-based | State machine |
| Declarative | ✅ YAML manifests | ❌ Code-first | ✅ DAGs | ✅ JSON/YAML |
| Conditional | ✅ Signal triggers | ✅ Conditions | ✅ Branching | ✅ Choice states |
| Escalation | ✅ Built-in | ❌ Manual | ❌ Manual | ❌ Manual |
| Observability | ✅ Signal trace | ✅ Workflow history | ✅ Task logs | ✅ Execution history |
| Budget control | ✅ Token/cost limits | ❌ Manual | ❌ Manual | ❌ Manual |
| Local execution | ✅ In-process | ❌ Requires cluster | ❌ Requires cluster | ❌ AWS only |
| Concurrency lanes | ✅ Fast/Normal/LLM | ❌ Manual | ✅ Pools | ❌ Service limits |
Where this model fits naturally:
Where it doesn't (and won't):
These are natural extensions of the model, not commitments to a specific implementation.
As the semantics stabilize through lucidRAG and Stylobot development, these patterns become viable:
1. Learning from signals
Track which escalation paths work best:
// Did the LLM escalation improve accuracy?
// Learn to skip it if behavioral analysis is sufficient
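One hypothetical shape for that feedback loop, assuming escalation signals carry a before/after confidence pair in metadata (the `Metadata` property is an assumption):

```csharp
using System.Linq;

// Did escalation actually move confidence? If the average gain is
// negligible, the cheap path was already sufficient.
var gains = signalSink.GetSignals()
    .Where(s => s.Key == "escalation.complete")
    .Select(s => s.Confidence - (double)s.Metadata["confidenceBefore"])
    .ToList();

var skipEscalation = gains.Count > 0 && gains.Average() < 0.05;
```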
2. Cost optimization
Automatic lane assignment based on historical performance:
// If a "slow" wave completes quickly, promote to "normal"
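A minimal sketch of that promotion rule, assuming a recorded latency history and a `profile.Assign` call (both hypothetical):

```csharp
using System;
using System.Linq;

// Promote a wave to the fast lane when its observed p95 latency
// fits comfortably inside the fast-lane budget.
var ordered = latencies.OrderBy(t => t).ToList();
var p95 = ordered[Math.Min((int)(ordered.Count * 0.95), ordered.Count - 1)];

if (p95 < TimeSpan.FromMilliseconds(200))
{
    profile.Assign("MyWave", Lane.Fast); // API shape assumed
}
```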
3. Signal replay
Debug by replaying signal sequences:
var replay = SignalReplay.FromFile("trace.jsonl");
await coordinator.ReplayAsync(replay);
4. Multi-machine coordination
Distribute lanes across machines while keeping signals centralized.
The core insight is this: in AI systems, every component has confidence.
Traditional workflows assume binary success/failure. AI workflows need graded confidence, competing opinions from multiple detectors, and a way to aggregate them.
Signals provide this naturally:
// Multiple detectors vote
var signals = context.GetSignals("bot.detected");

// Take the single highest-confidence verdict
var verdict = signals
    .OrderByDescending(s => s.Confidence)
    .First();

// Or majority vote
var isBot = signals
    .Count(s => (bool)s.Value) > signals.Count() / 2;

// Or confidence-weighted average in [-1, 1]
var score = signals
    .Sum(s => (bool)s.Value ? s.Confidence : -s.Confidence)
    / signals.Count();
This is why StyloFlow works well for Reduced RAG - every extraction stage produces a confidence score, and synthesis only happens when confidence is high enough.
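As a sketch of that gate (the signal keys and the 0.8 threshold are illustrative, not fixed by the library):

```csharp
using System.Linq;

// Synthesis only fires when every extraction stage cleared the bar.
var extraction = context.GetSignals("entities.extracted")
    .Concat(context.GetSignals("quality.score"))
    .ToList();

var readyToSynthesize = extraction.Count > 0 &&
                        extraction.All(s => s.Confidence >= 0.8);

if (readyToSynthesize)
{
    // Expensive LLM synthesis happens here.
}
```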
The execution model:
Why signals matter:
Working implementations:
Related articles:
Source code: GitHub - StyloFlow
Traditional workflow engines ask you to declare what happens next. This model asks components to declare what they produce and what they need, then lets signals coordinate execution.
The key shift: signals decouple, confidence guides, lanes protect.
This isn't about choosing StyloFlow over Temporal or Airflow - those solve different problems (durable execution, workflow versioning, distributed coordination across data centers). This is about articulating a different orchestration model: one where control flow emerges from signal patterns rather than being explicitly programmed.
If you're building AI/ML pipelines where components produce confidence scores, cheap operations should run before expensive ones, and escalation is a structural pattern rather than an exception, then these execution semantics might fit how you think.
The ephemeral library is the stable foundation. StyloFlow adds the signal-driven orchestration layer on top. Both are evolving through real use in lucidRAG and Stylobot.
For questions or feedback, see the GitHub repository or reach out via the chatbot on this site (which uses these patterns).
© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.