This is a viewer only at the moment see the article on how this works.
To update the preview hit Ctrl-Alt-R (or ⌘-Alt-R on Mac) or Enter to refresh. The Save icon lets you save the markdown file to disk
This is a preview from the server running through my markdig pipeline
Saturday, 25 January 2025
Коли твої робочі сили вчаться ходити без тренувальних коліс (і ти перестаєш платити за няню)
**Примітка:**Це частина 2 у серії "Початка з DiSE," що досліджує практичні схеми виробництва виробництва.
Проблема: Ми платимо докторам, щоб вони стежили за досконалими пацієнтами
Your workflow: *executes perfectly for the 1,000th time*
Your monitoring AI: "Yep, still perfect! That'll be $0.15."
Your workflow: *executes perfectly for the 1,001st time*
Your monitoring AI: "Still good! Another $0.15 please."
Your workflow: *executes perfectly for the 1,002nd time*
Your monitoring AI: "Perfect again! $0.15."
Monthly cost: $450 to watch perfection happen
Value provided: Approximately zero
Ось що сміховинне про те, як ми запускаємо комп'ютери ШІ в виробництві сьогодні:
У нас нормалізація платежі за моніторинг, який не надає жодної цінності.Не "нижне значення."
Нульова вартість.Коли робота успішно виконується 1000 разів з тими ж показниками якості, тими самими характеристиками, тими ж речами
чому ми все ще платимо комп'ютеру за перегляд?
Це як найняти рятувальника, щоб подивитися, як олімпійські плавці практикуються в дитячому басейні.**Але ось що робить це гірше:**
Коли все йде не так, наш теперішній монітор часто все одно пропускає його.
Тому що статичні моніторинги шукають відомих візерунків.
- Виявлення, діагностика і виправлення.
Week 1 (Apprentice): Senior watches everything you do, corrects mistakes in real-time
Week 4 (Intermediate): Senior checks in periodically, reviews output
Week 12 (Graduate): You work independently, senior only involved if something weird happens
Week 52 (Expert): You barely need supervision unless the job itself changes
Це зразок Apprentication.
graph TB
Start[New Workflow v1.0.0] --> Monitor[Monitoring AI Layer<br/>ACTIVE]
Monitor --> Step1[Tool Call: fetch_data]
Monitor --> Step2[Tool Call: process]
Monitor --> Step3[Tool Call: validate]
Monitor --> Step4[Tool Call: send_results]
Step1 --> Eval1[Quality Check<br/>Response time: 234ms<br/>Data completeness: 100%<br/>Error rate: 0%]
Step2 --> Eval2[Quality Check<br/>Processing accuracy: 99.7%<br/>Memory usage: 45MB<br/>CPU: 23%]
Step3 --> Eval3[Quality Check<br/>Validation pass: 100%<br/>Schema compliance: ✓<br/>Business rules: ✓]
Step4 --> Eval4[Quality Check<br/>Delivery success: 100%<br/>Latency: 156ms<br/>Format: valid JSON]
Eval1 --> Learn[Learning System]
Eval2 --> Learn
Eval3 --> Learn
Eval4 --> Learn
Learn --> Profile[Build Quality Profile<br/>Execution: 1/50 required]
Profile --> Decision{Iterations < 50?}
Decision --> |Yes| Monitor
Decision --> |No| Graduate[Graduate to Phase 2]
style Monitor stroke:#f57c00,stroke-width:3px
style Learn stroke:#0277bd,stroke-width:2px
style Graduate stroke:#2e7d32,stroke-width:3px
Подумайте про те, як люди вчаться нової роботи:
Потікання має йти за тим самим зразком.
class ApprenticeWorkflow:
def __init__(self, workflow_id: str):
self.workflow_id = workflow_id
self.monitoring_tier = MonitoringTier.FULL # Expensive!
self.quality_profile = QualityProfile()
self.execution_count = 0
self.required_successes = 50 # Configurable
async def execute_tool(self, tool_name: str, params: dict):
"""Execute with full monitoring and learning"""
# Pre-execution baseline
baseline = await self.capture_baseline()
# Execute the tool
start_time = time.time()
result = await call_tool(tool_name, params)
execution_time = time.time() - start_time
# Post-execution analysis (THIS IS EXPENSIVE)
quality_check = await self.monitoring_ai.analyze(
tool_name=tool_name,
params=params,
result=result,
execution_time=execution_time,
baseline=baseline,
quality_profile=self.quality_profile
)
# Learn from this execution
self.quality_profile.update(
tool_name=tool_name,
metrics={
"execution_time": execution_time,
"result_size": len(str(result)),
"quality_score": quality_check.score,
"resource_usage": quality_check.resources,
"output_characteristics": quality_check.characteristics
}
)
return result, quality_check
**Режим блокування фази 1: (Спостереження за палицями)**У режимі Apprentice:
class MonitoringAI:
def __init__(self):
self.fast_model = "gemma2:2b" # Quick checks
self.medium_model = "llama3:8b" # Deeper analysis
self.expensive_model = "claude-3.5-sonnet" # Full investigation
async def analyze(self, **context):
"""Tiered monitoring with escalation"""
# Tier 1: Fast checks (always run)
quick_check = await self.quick_analysis(context)
if quick_check.confidence > 0.95:
# We're confident it's fine or definitely broken
return quick_check
# Tier 2: Deeper analysis (escalate if uncertain)
medium_check = await self.medium_analysis(context)
if medium_check.confidence > 0.90:
return medium_check
# Tier 3: Full investigation (expensive, rare)
full_check = await self.expensive_analysis(context)
return full_check
async def quick_analysis(self, context):
"""Fast pass/fail classification"""
prompt = f"""
Quick quality check for tool execution:
Tool: {context['tool_name']}
Execution time: {context['execution_time']}ms
Expected range: {context['quality_profile'].get_expected_range()}
Is this execution within normal parameters?
Answer: NORMAL | SUSPICIOUS | BROKEN
Confidence: 0.0-1.0
"""
response = await call_llm(self.fast_model, prompt)
return AnalysisResult(
status=response.status,
confidence=response.confidence,
cost=0.001, # Very cheap
tier="fast"
)
Кожен інструмент має інструментальне призначення:
Execution #1:
- Tool execution: 234ms, $0
- Fast monitoring: 45ms, $0.001
- Medium monitoring: (escalated) 180ms, $0.015
- Learning update: 12ms, $0
Total: 471ms, $0.016
Execution #2:
- Tool execution: 229ms, $0
- Fast monitoring: 43ms, $0.001
- Medium monitoring: (escalated) 175ms, $0.015
- Learning update: 11ms, $0
Total: 458ms, $0.016
[... repeated 48 more times ...]
Total Apprenticeship Cost:
50 executions × $0.016 = $0.80
Total time investment: ~23 seconds
Quality profile learned:
✓ Normal execution time: 225ms ± 15ms
✓ Normal output size: 1.2KB ± 200 bytes
✓ Normal resource usage: 45MB ± 5MB
✓ Success patterns: 50/50 perfect
✓ Failure patterns: 0/50 (none seen yet)
Комп' ютерна програма моніторингу(швидка модель з ескалацією):**Вартість навчання:**Це дорого!Але це також.
скінчено
дорогоціннеМи платимо, щоб дізнатися, як виглядає "добре.":
graph TB
Start[Graduated Workflow v1.0.0] --> Lite[Statistical Monitoring<br/>NO AI COST]
Lite --> Step1[Tool Call: fetch_data<br/>Time: 231ms ✓<br/>Size: 1.18KB ✓]
Lite --> Step2[Tool Call: process<br/>Time: 89ms ✓<br/>Memory: 44MB ✓]
Lite --> Step3[Tool Call: validate<br/>Pass: 100% ✓<br/>Rules: OK ✓]
Lite --> Step4[Tool Call: send_results<br/>Success: ✓<br/>Latency: 152ms ✓]
Step1 --> Check{Within<br/>profile?}
Step2 --> Check
Step3 --> Check
Step4 --> Check
Check --> |All ✓| Success[Execution Complete<br/>Cost: $0.00]
Check --> |Drift| Alert[Drift Detected!<br/>Re-engage monitoring]
Alert --> Diagnose[Monitoring AI<br/>Investigates]
Diagnose --> Fix[Auto-Fix or Escalate]
style Lite stroke:#388e3c,stroke-width:2px
style Success stroke:#2e7d32,stroke-width:3px
style Alert stroke:#f57c00,stroke-width:2px
Це знання постійне.
class GraduatedWorkflow:
def __init__(self, workflow_id: str, quality_profile: QualityProfile):
self.workflow_id = workflow_id
self.monitoring_tier = MonitoringTier.STATISTICAL # FREE!
self.quality_profile = quality_profile
self.drift_detector = DriftDetector(quality_profile)
async def execute_tool(self, tool_name: str, params: dict):
"""Execute with lightweight statistical monitoring"""
# Execute the tool (same as before)
start_time = time.time()
result = await call_tool(tool_name, params)
execution_time = time.time() - start_time
# NO AI MONITORING - Just compare to profile
metrics = {
"execution_time": execution_time,
"result_size": len(str(result)),
"timestamp": datetime.now()
}
# Statistical drift detection (milliseconds, zero cost)
drift_score = self.drift_detector.check(tool_name, metrics)
if drift_score < 0.1: # Within normal bounds
return result, MonitoringResult(
status="OK",
cost=0.0, # FREE!
drift_score=drift_score
)
# DRIFT DETECTED - Re-engage monitoring AI
alert = await self.handle_drift(tool_name, metrics, drift_score)
return result, alert
async def handle_drift(self, tool_name, metrics, drift_score):
"""Drift detected - engage monitoring AI to diagnose"""
# This is the ONLY time we pay for AI monitoring
diagnosis = await self.monitoring_ai.investigate_drift(
tool_name=tool_name,
current_metrics=metrics,
expected_profile=self.quality_profile.get_profile(tool_name),
drift_score=drift_score,
recent_executions=self.get_recent_executions(tool_name, n=10)
)
# Return diagnosis with recommended action
return DriftAlert(
drift_score=drift_score,
diagnosis=diagnosis,
recommended_action=diagnosis.action,
cost=diagnosis.cost # Only paid when drift detected!
)
Режим випуску 2: (легкий монітор)
Execution #51 (graduated):
- Tool execution: 228ms, $0
- Statistical monitoring: 0.3ms, $0
- AI monitoring: SKIPPED, $0
Total: 228ms, $0.00
Execution #52:
- Tool execution: 231ms, $0
- Statistical monitoring: 0.3ms, $0
- AI monitoring: SKIPPED, $0
Total: 231ms, $0.00
[... repeated 948 more times ...]
Execution #1000:
- Tool execution: 226ms, $0
- Statistical monitoring: 0.3ms, $0
- AI monitoring: SKIPPED, $0
Total: 226ms, $0.00
Total Cost (Executions 51-1000):
950 executions × $0.00 = $0.00
Drift detections: 0
AI monitoring engaged: 0 times
Total monitoring cost: $0.00
Після 50 успішних виконання з послідовною якістю, робочий потік
випускники
Ми перейшли від 0.016 на виконання до 0.0.0.
Вартість режиму Apprentic: 160/ day (для 50 виконання)Вартість режиму випуску: 0/ день (за 9 9 950 виконання)
Execution #1,247:
- Tool execution: 228ms, $0
- Statistical monitoring: 0.3ms, $0
- Drift score: 0.02 (normal)
- AI monitoring: SKIPPED
Execution #1,248:
- Tool execution: 892ms, $0 ← WHOA
- Statistical monitoring: 0.3ms, $0
- Drift score: 0.47 (DRIFT DETECTED!)
- AI monitoring: ENGAGED!
Monitoring AI investigation:
Analyzing drift...
✓ Execution time: 892ms (expected: 225ms ± 15ms)
✓ Drift magnitude: 296% increase
✓ Result correctness: Unchanged
✓ Output size: Normal
✓ Error rate: 0%
Diagnosis: External API latency increased
Evidence:
- fetch_data tool calling external API
- API response time: 750ms (was 100ms)
- API behavior changed but output still valid
Trend analysis:
- Last 5 executions: 892ms, 876ms, 901ms, 888ms, 894ms
- Consistent elevated latency
- Not intermittent - PERMANENT SHIFT
Recommended action: UPDATE_PROFILE
Reason: API has permanently slowed, workflow still correct
Cost: $0.025 (one-time)
Фаза 3: виявлення дрейфів і повторне створення
class DriftDetector:
async def handle_consistent_drift(
self,
tool_name: str,
diagnosis: Diagnosis
):
"""Handle drift that represents a new normal"""
if diagnosis.action == "UPDATE_PROFILE":
# The world changed, workflow is still correct
# Update our expectations
self.quality_profile.update_baseline(
tool_name=tool_name,
new_metrics=diagnosis.new_normal,
reason=diagnosis.reason
)
logger.info(
f"Quality profile updated for {tool_name}: "
f"{diagnosis.reason}"
)
return ProfileUpdateResult(
action="updated",
cost=diagnosis.cost, # One-time
future_cost=0.0 # Back to free monitoring
)
Але ось тут стає цікаво.
Drift detection: 1 event
AI investigation: 1 × $0.025 = $0.025
Profile update: 1 × $0 = $0
Total: $0.025 (one-time)
Future executions: Back to $0.00 each
Що стається, коли щось змінюється?
Execution #2,847:
- Tool execution: 229ms, $0
- Validation pass: 100%
- Drift score: 0.03 (normal)
Execution #2,848:
- Tool execution: 231ms, $0
- Validation pass: 94% ← Hmm
- Drift score: 0.12 (minor drift)
- AI monitoring: ENGAGED (Tier 1)
Fast Monitoring AI:
Quick check: Validation pass rate dropped from 100% to 94%
Confidence: 0.72 (not confident - ESCALATE)
Cost: $0.001
Medium Monitoring AI:
Detailed analysis:
- Last 10 executions: 94%, 92%, 100%, 89%, 91%, 100%, 87%, 93%, 100%, 85%
- Trend: DEGRADING (5% drop over 10 runs)
- Root cause: Input data quality decreased
- Workflow correctness: Still OK, but fragile
- Recommendation: EVOLVE_WORKFLOW
Confidence: 0.94 (high confidence)
Cost: $0.015
Evolution Triggered:
Strategy: Strengthen validation rules
Approach: Add input sanitization step
Estimated improvement: +8% validation pass rate
Mutation generated:
- New step: sanitize_input (before process)
- Tool: input_sanitizer_v1.0.0 (generated)
- Expected impact: Reduce invalid inputs by 80%
Cost: $0.050 (one-time generation)
Система виявила постійний зсув і змінила свій профіль якості:
Глибокий аналіз носія (0. 015)
Execution #4,521:
- Tool execution: 234ms, $0
- Result: SUCCESS
- Drift score: 0.02 (normal)
Execution #4,522:
- Tool execution: EXCEPTION
- Error: "API returned 500 Internal Server Error"
- Drift score: 1.0 (MAXIMUM DRIFT!)
- AI monitoring: ENGAGED (All tiers)
Fast Monitoring AI:
Quick check: CRITICAL FAILURE
Confidence: 1.0 (certain)
Escalate: YES
Cost: $0.001
Medium Monitoring AI:
Analysis: External API is down
Confidence: 0.98
Escalate: YES (need recovery strategy)
Cost: $0.015
Expensive Monitoring AI (claude-3.5-sonnet):
Diagnosis:
- API: example-api.com/v1/process
- Status: HTTP 500 (Internal Server Error)
- Duration: Started 3 minutes ago
- Impact: ALL workflows using this API
- Historical pattern: API has had 3 outages in last 6 months
Recommended actions:
1. IMMEDIATE: Add retry logic with exponential backoff
2. SHORT-TERM: Implement circuit breaker pattern
3. LONG-TERM: Add fallback to alternative API
Implementation:
- Generate retry_wrapper tool with 3 attempts, exp backoff
- Wrap existing API call with retry logic
- Add circuit breaker after 5 consecutive failures
- Estimated downtime reduction: 95%
Mutation generated:
- New workflow v1.1.0 with resilience
- Tools added: retry_wrapper, circuit_breaker
- Fallback: graceful degradation if API unavailable
Cost: $0.125 (comprehensive analysis + mutation)
Автоматично створено виправлення (0. 050 мутації)
Escalated через tieers (0. 015)
Створена стійка версія робочого потоку
Workflow: process_customer_order
Execution frequency: 50,000 times/day
Uptime requirement: 99.9%
Cost per execution:
- Workflow execution: $0 (internal tools)
- AI monitoring: $0.01 (watch every execution)
Daily cost: 50,000 × $0.01 = $500/day
Annual cost: $182,500/year
Value provided:
- Catches maybe 10 issues per year
- Cost per issue caught: $18,250
- Most issues: False positives or minor
Phase 1: Apprenticeship (Days 1-2)
- Executions: 100 (learning phase)
- Cost per execution: $0.016
- Total: $1.60
Phase 2: Graduated Operation (Days 3-365)
- Executions: 50,000 × 363 days = 18,150,000
- Cost per execution: $0.00
- Total: $0.00
Drift events (estimated: 12 per year)
- Minor drift (profile update): 8 × $0.025 = $0.20
- Quality degradation (evolution): 3 × $0.066 = $0.20
- Critical failure (major fix): 1 × $0.141 = $0.14
- Total: $0.54
Annual total: $1.60 + $0.00 + $0.54 = $2.14
Давайте порахуємо виробничу систему:
Сценарій: Порядок обробки порядку
Прихід ApprenticationCommentВнески: 182 498 доларів на рік.
МОЖЛИВО ПРАЦЮЄ.
class ProactiveEvolver:
def analyze_graduated_workflow(self, workflow_id: str):
"""Analyze trends in graduated workflows"""
recent_executions = self.get_executions(workflow_id, days=30)
# Statistical analysis of trends
trends = {
"latency": self.analyze_latency_trend(recent_executions),
"quality": self.analyze_quality_trend(recent_executions),
"resource": self.analyze_resource_trend(recent_executions),
"success_rate": self.analyze_success_trend(recent_executions)
}
# Detect gradual degradation BEFORE it becomes a problem
warnings = []
if trends["latency"].slope > 0.05: # 5% increase per week
warnings.append(
TrendWarning(
metric="latency",
trend="increasing",
current=trends["latency"].current,
projected=trends["latency"].project_forward(weeks=4),
severity="medium",
action="consider_optimization"
)
)
if trends["quality"].slope < -0.02: # 2% decrease per week
warnings.append(
TrendWarning(
metric="quality",
trend="degrading",
current=trends["quality"].current,
projected=trends["quality"].project_forward(weeks=4),
severity="high",
action="proactive_evolution_recommended"
)
)
return TrendAnalysis(
workflow_id=workflow_id,
trends=trends,
warnings=warnings,
cost=0.0 # Statistical analysis, no AI cost
)
Таємна зброя: Проактивна еволюція
Workflow: process_customer_order
Status: GRADUATED
Execution count: 456,231 (since graduation)
Trend Analysis (30-day window):
Latency trend:
- Current: 234ms average
- 30 days ago: 198ms average
- Slope: +1.2ms per day
- Projection (30 days): 270ms
- Severity: MEDIUM
- Cause: Gradual database growth (not a bug)
Quality trend:
- Current: 99.2% validation pass
- 30 days ago: 99.8% validation pass
- Slope: -0.02% per day
- Projection (30 days): 98.6%
- Severity: HIGH
- Cause: Input data quality degrading
Action recommended: PROACTIVE_EVOLUTION
Rationale:
The workflow is still within acceptable bounds NOW,
but trends suggest it will degrade significantly in
~30 days. Evolve now while we have time, rather than
wait for production incident.
Proposed evolution:
1. Add input sanitization layer
2. Optimize database queries (add index)
3. Implement caching for frequent reads
Estimated impact:
- Latency: 234ms → 180ms (23% faster)
- Quality: 99.2% → 99.9% (0.7% improvement)
- Resource cost: -15% (caching reduces DB load)
Cost: $0.085 (one-time evolution)
ROI: Prevents future incident, improves performance
Ось де Авантюра стає дійсно цікавою.
Мова йде не лише про заощадження грошей на моніторинг.
**Мова йде про про активну еволюцію, яка базується на тенденційах.**Тенденційне виявлення: до розриву
**Приклад:**Це Святий Грайл.
Традиційне спостереження:
Реактивний (чекати на помилку, а потім виправити)
class ResourceAwareEvolver:
async def optimize_for_resources(
self,
workflow_id: str,
constraint: ResourceConstraint
):
"""Evolve workflow to fit resource limits"""
current_usage = self.get_resource_usage(workflow_id)
if constraint.type == "MEMORY" and current_usage.memory > constraint.limit:
# Memory pressure - evolve to use less memory
analysis = await self.monitoring_ai.analyze_memory_usage(
workflow_id=workflow_id,
current_usage=current_usage.memory,
limit=constraint.limit
)
if analysis.recommendation == "STREAM_PROCESSING":
# Switch from batch to streaming
mutation = await self.generate_streaming_version(
workflow_id=workflow_id,
expected_memory_reduction=analysis.expected_savings
)
return mutation
elif constraint.type == "SCALE" and current_usage.instances < constraint.desired:
# Need more throughput - can we scale horizontally?
analysis = await self.monitoring_ai.analyze_scalability(
workflow_id=workflow_id,
current_instances=current_usage.instances,
desired_instances=constraint.desired
)
if analysis.bottleneck:
# Found a bottleneck preventing scale
mutation = await self.remove_bottleneck(
workflow_id=workflow_id,
bottleneck=analysis.bottleneck
)
return mutation
Спостереження за Apprentication:
Event: Black Friday sale starting in 12 hours
Expected traffic: 10x normal
Current capacity: 5,000 orders/hour
Required capacity: 50,000 orders/hour
Workflow: process_customer_order (currently graduated)
Auto-scaling analysis:
Current: 10 instances handling 5,000 orders/hour (500 each)
Naive scale: 100 instances needed (10x)
Problem: Shared database bottleneck limits to 60 instances
Bottleneck detected:
- Database connection pool: Max 100 connections
- Current usage: 60/100 (10 instances × 6 connections each)
- Scaling to 100 instances would need 600 connections
- Current limit: 100
Solution: Reduce connections per instance
Mutation strategy:
1. Add connection pooling optimization
2. Implement read replicas for queries
3. Add caching layer for frequent lookups
4. Reduce per-instance connections: 6 → 2
Result:
- 100 instances × 2 connections = 200 connections
- Add 10 read replicas for queries
- 90% of queries hit cache
- Net database load: Actually DECREASES
New capacity:
- 100 instances × 500 orders/hour = 50,000 orders/hour
- Database load: LOWER than before
- Cost: One-time evolution ($0.125)
Mutation generated: process_customer_order v1.2.0
Status: TESTING (shadow mode)
Expected savings: Scale to 100x without database upgrade
ROI: Infinite (prevents $50K+ emergency database scaling)
Проактивний (виявлення тенденцій, виправлення перед невдачею)
Автоматичний прибуток: Еволюція роботи з ресурсами
Починається чорна п'ятниця
Проактивна еволюція усуває вузьке місце
from dataclasses import dataclass, field
from typing import Dict, List, Optional
import numpy as np
from scipy import stats
@dataclass
class MetricDistribution:
"""Statistical distribution of a metric"""
mean: float
std_dev: float
median: float
percentile_95: float
percentile_99: float
samples: List[float] = field(default_factory=list)
def is_within_bounds(self, value: float, sigma: float = 3.0) -> bool:
"""Check if value is within N standard deviations"""
lower = self.mean - (sigma * self.std_dev)
upper = self.mean + (sigma * self.std_dev)
return lower <= value <= upper
def drift_score(self, value: float) -> float:
"""Calculate drift score (0.0 = perfect, 1.0 = extreme)"""
if self.std_dev == 0:
return 0.0 if value == self.mean else 1.0
# Z-score normalized to 0-1 range
z_score = abs((value - self.mean) / self.std_dev)
# Sigmoid to bound between 0 and 1
return 1.0 / (1.0 + np.exp(-z_score + 3))
@dataclass
class QualityProfile:
"""Learned quality profile for a workflow"""
workflow_id: str
tool_metrics: Dict[str, Dict[str, MetricDistribution]] = field(default_factory=dict)
execution_count: int = 0
graduated: bool = False
graduation_threshold: int = 50
def update(self, tool_name: str, metrics: Dict[str, float]):
"""Update profile with new execution metrics"""
if tool_name not in self.tool_metrics:
self.tool_metrics[tool_name] = {}
for metric_name, value in metrics.items():
if metric_name not in self.tool_metrics[tool_name]:
self.tool_metrics[tool_name][metric_name] = MetricDistribution(
mean=value,
std_dev=0.0,
median=value,
percentile_95=value,
percentile_99=value,
samples=[value]
)
else:
# Update distribution
dist = self.tool_metrics[tool_name][metric_name]
dist.samples.append(value)
# Recalculate statistics
dist.mean = np.mean(dist.samples)
dist.std_dev = np.std(dist.samples)
dist.median = np.median(dist.samples)
dist.percentile_95 = np.percentile(dist.samples, 95)
dist.percentile_99 = np.percentile(dist.samples, 99)
self.execution_count += 1
# Check for graduation
if not self.graduated and self.execution_count >= self.graduation_threshold:
self.graduated = True
def check_drift(self, tool_name: str, metrics: Dict[str, float]) -> Dict[str, float]:
"""Check for drift in metrics (returns drift scores)"""
if tool_name not in self.tool_metrics:
return {} # No profile yet
drift_scores = {}
for metric_name, value in metrics.items():
if metric_name in self.tool_metrics[tool_name]:
dist = self.tool_metrics[tool_name][metric_name]
drift_scores[metric_name] = dist.drift_score(value)
return drift_scores
from enum import Enum
from typing import Optional
class MonitoringTier(Enum):
FULL = "full" # Apprentice mode - expensive
STATISTICAL = "statistical" # Graduate mode - free
DRIFT_INVESTIGATION = "drift" # Re-engaged monitoring
class MonitoringManager:
def __init__(self):
self.fast_model = "gemma2:2b"
self.medium_model = "llama3:8b"
self.expensive_model = "claude-3.5-sonnet"
async def monitor_execution(
self,
tier: MonitoringTier,
tool_name: str,
metrics: Dict[str, float],
quality_profile: Optional[QualityProfile] = None
) -> MonitoringResult:
"""Route monitoring based on tier"""
if tier == MonitoringTier.FULL:
# Apprentice mode - learn everything
return await self.full_monitoring(tool_name, metrics, quality_profile)
elif tier == MonitoringTier.STATISTICAL:
# Graduate mode - just check drift
if quality_profile is None:
raise ValueError("Quality profile required for statistical monitoring")
drift_scores = quality_profile.check_drift(tool_name, metrics)
max_drift = max(drift_scores.values()) if drift_scores else 0.0
if max_drift > 0.15: # Drift threshold
# Escalate to drift investigation
return await self.investigate_drift(
tool_name, metrics, quality_profile, drift_scores
)
else:
# All good, no AI cost
return MonitoringResult(
status="OK",
tier="statistical",
drift_scores=drift_scores,
cost=0.0
)
elif tier == MonitoringTier.DRIFT_INVESTIGATION:
# Drift detected - investigate
return await self.investigate_drift(
tool_name, metrics, quality_profile, {}
)
async def full_monitoring(
self,
tool_name: str,
metrics: Dict[str, float],
quality_profile: Optional[QualityProfile]
) -> MonitoringResult:
"""Full AI-powered monitoring (expensive)"""
# Tier 1: Fast check
fast_result = await self.fast_check(tool_name, metrics, quality_profile)
if fast_result.confidence > 0.95:
return MonitoringResult(
status=fast_result.status,
tier="fast",
confidence=fast_result.confidence,
cost=0.001
)
# Tier 2: Medium analysis
medium_result = await self.medium_check(tool_name, metrics, quality_profile)
if medium_result.confidence > 0.90:
return MonitoringResult(
status=medium_result.status,
tier="medium",
confidence=medium_result.confidence,
analysis=medium_result.analysis,
cost=0.016
)
# Tier 3: Expensive investigation
expensive_result = await self.expensive_check(tool_name, metrics, quality_profile)
return MonitoringResult(
status=expensive_result.status,
tier="expensive",
confidence=expensive_result.confidence,
analysis=expensive_result.analysis,
recommendations=expensive_result.recommendations,
cost=0.125
)
class GraduationController:
def __init__(self):
self.monitoring_manager = MonitoringManager()
self.profiles: Dict[str, QualityProfile] = {}
async def execute_workflow(
self,
workflow_id: str,
workflow_fn: Callable,
*args,
**kwargs
):
"""Execute workflow with appropriate monitoring tier"""
# Get or create quality profile
if workflow_id not in self.profiles:
self.profiles[workflow_id] = QualityProfile(
workflow_id=workflow_id,
graduation_threshold=50 # Configurable
)
profile = self.profiles[workflow_id]
# Determine monitoring tier
if not profile.graduated:
tier = MonitoringTier.FULL
else:
tier = MonitoringTier.STATISTICAL
# Execute with monitoring
result = await self.monitored_execution(
workflow_id=workflow_id,
workflow_fn=workflow_fn,
tier=tier,
profile=profile,
args=args,
kwargs=kwargs
)
return result
async def monitored_execution(
self,
workflow_id: str,
workflow_fn: Callable,
tier: MonitoringTier,
profile: QualityProfile,
args: tuple,
kwargs: dict
):
"""Execute workflow with instrumentation"""
# Capture baseline
start_time = time.time()
start_memory = self.get_memory_usage()
# Execute workflow
try:
result = await workflow_fn(*args, **kwargs)
status = "success"
except Exception as e:
result = None
status = "error"
error = e
# Capture metrics
execution_time = time.time() - start_time
memory_used = self.get_memory_usage() - start_memory
metrics = {
"execution_time": execution_time,
"memory_used": memory_used,
"status": status
}
# Monitor based on tier
monitoring_result = await self.monitoring_manager.monitor_execution(
tier=tier,
tool_name=workflow_id,
metrics=metrics,
quality_profile=profile
)
# Update profile
if tier == MonitoringTier.FULL:
profile.update(workflow_id, metrics)
if profile.graduated:
logger.info(
f"Workflow {workflow_id} GRADUATED after "
f"{profile.execution_count} successful executions"
)
# Handle drift if detected
if monitoring_result.status == "DRIFT":
await self.handle_drift(workflow_id, monitoring_result, profile)
return WorkflowResult(
result=result,
metrics=metrics,
monitoring=monitoring_result,
profile=profile
)
Давайте поглянемо на конкретну реалізацію:
: Ollama (локальний) + OpenAI/ Anthropic (хмара)
мутація робочого потоку
: Існуюча еволюційна система Аналіз тенденцій
: Аналіз серії часу (statsmodels) Складна частина - це не технологія.
Трудність полягає в зміні нашого мислення щодо моніторингу. От: "Все объективно, всегда"
З: "Реактивна реакція інциденту"
Company: Global e-commerce platform
Workflows: 10,000+ unique workflows
Total executions: 100M per day
Traditional monitoring cost:
100M executions × $0.01 = $1M/day
Annual: $365M
Apprenticeship approach:
New workflows per day: ~10
Apprenticeship cost: 10 × $1.60 = $16/day
Drift investigations: ~50/day × $0.05 = $2.50/day
Major evolutions: ~5/day × $0.15 = $0.75/day
Daily cost: $19.25
Annual: $7,026
Savings: $364,992,974 per year
ROI: 51,971:1
До: "Революція, заснована на проактивних трендах"
Від: "Пороги стоїстої якості"
Майбутнє: випускні робочі дані на шкалі
Безперервна оптимізація
Виявлено пристарілий крах → Автооптимізувати
2.
Невдалі мутації поширюються як правила уникнення:
Цей вид жуків ніколи більше не повториться.
Відступ заборонено перед тим, як це станеться
Еволюція з нульовим спадомВиконаний робочий потік можна розвинути без ризику:
Тінь, що відкриває нову версію паралельноПорівняти профілі якості
**Градійний вихід на основі впевненості.**Миттєве повернення, якщо виявлено відсутнє
**Безперервна доставка для робітного потоку ШІ.**Заключення: тренувальне колесо, яке знає, коли виходити
**Шаблон Apprentication простий:**Спочатку старанно тренуйтеся
- Довідайтесь, як виглядає "добре"
Випускник до незалежності
Стежте за дрейфуванням
- Виправити тенденції перед тим, як вони зламаються
Шаблон Apprentication інвертує ось що:
Наполегливе спостереження за тим, що важливо (навчитися і дрейфувати)
Нуль моніторинг того місця, де він не (завершена операція)
Проактивна еволюція, заснована на тенденційних тенденційах.
Результат:
99. 99% - зменшення вартості моніторингуКраща якість через вивчені профілі
Пропорційна профілактика проблем
src/quality_profile.pyПостійне поліпшення без людейsrc/monitoring_manager.pyТак має працювати робочий потік.src/graduation_controller.pyApprentice → Випускник → Експерт → Вчительsrc/drift_detector.pyЯк і в людей.src/proactive_evolver.pyОкрім тренувальних коліс, вони знають, коли з'явитися самі.Спробуйте самі
from src import GraduationController
controller = GraduationController()
# Define your workflow
async def my_workflow(input_data):
# Your workflow logic here
result = await process_data(input_data)
return result
# Execute with automatic graduation
result = await controller.execute_workflow(
workflow_id="my_workflow_v1",
workflow_fn=my_workflow,
input_data={"foo": "bar"}
)
print(f"Status: {result.monitoring.status}")
print(f"Cost: ${result.monitoring.cost:.4f}")
print(f"Graduated: {result.profile.graduated}")
**Хочеш реалізувати Apprentication Special у вашому робочому процесі?**Сховище: https: //github.com/scottgal/ method. dseФайли ключів: - Навчання профілю якості- Tied спостереження
Приклад:
Перші 50 виконання:
© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.