This is a viewer only at the moment see the article on how this works.
To update the preview hit Ctrl-Alt-R (or ⌘-Alt-R on Mac) or Enter to refresh. The Save icon lets you save the markdown file to disk
This is a preview from the server running through my markdig pipeline
Monday, 13 January 2025
ЗАУВАЖЕННЯ: Цю стару статтю я забула видати.
А ось і він, насолоджуйся!
Я оновила его, но, возможно, какие-то проблемы я пропустила.
У цій статті я покажу вам, як правильно реалізовувати їх за допомогою сучасних інструментів NET: Marten для передачі подій, Dapper для оптимізованих запитів, і Mediatr, щоб все було організовано.
**Я також покажу вам "незастарілу" альтернативу, і пояснити, чому спроби змішати Подій Суертінг з нечинністю кешу вручну є жахливою ідеєю.**Вступ
**CQRS (Командований запит відповідальності Сегрегація) і Створення подій є двома різними шаблонами, які чудово працюють між собою:**CQRS
Відокремлюючи ваші моделі читання від ваших моделей письма
Розподіл подій
Коли ви правильно виконуєте роботу з інструментами на зразок Marten, ви отримуєте:
Повний слід перевірки для кожної зміни
Можливість відновлення стану у будь- який момент часуАвтоматичні прогнози зчитування моделейПриродне вписування з дизайном, що орієнтований на домен.
належним чином
// Write Model - Commands that change state
public record CreateBlogPostCommand(string Title, string Content, string AuthorId);
// Read Model - DTOs optimised for display
public class BlogPostListItemDto
{
public Guid Id { get; set; }
public string Title { get; set; }
public string AuthorName { get; set; }
public DateTime PublishedDate { get; set; }
public int CommentCount { get; set; }
}
Що таке CQRS?
// Traditional: Store current state
public class BlogPost
{
public Guid Id { get; set; }
public string Title { get; set; } // Current title
public bool IsPublished { get; set; } // Current status
}
// Event Sourcing: Store the events
public record BlogPostCreated(Guid Id, string Title, string Content, DateTime CreatedAt);
public record BlogPostTitleChanged(Guid Id, string OldTitle, string NewTitle, DateTime ChangedAt);
public record BlogPostPublished(Guid Id, DateTime PublishedAt);
В основі CQRS мається на увазі використання різних моделей для читання і запису даних:
**Читача сторона є ненормальною і оптимізованою для дисплею.**Досить просто, але істинна CQRS означає, що вони абсолютно різні шляхи через вашу програму.
**Що відбувається?**Замість збереження поточного стану, ви зберігаєте події, які призвели до цього стану:
**Поточний стан зароджується повторенням подій.**Це дає вам повну історію всього, що коли-небудь відбувалося у вашій системі.
**Чому слід використовувати інформацію про події, пов'язані з CQRS?**Повний маршрут перевірки
**: Кожна зміна записана.**Ідеально для фінансових систем, охорони здоров'я, або будь-де, де вам потрібно довести, що сталося і коли.
**"Як виглядав цей блог минулого вівторка?" - дрібниця - просто повторіть події до того моменту.**Зневадження
**: Регенерувати вади, повторюючи точну послідовність подій, що їх спричиняли.**Бізнес- розвідка
**: Створити нові звіти з історичних даних без запуску міграцій.**Події вже там.
Влаштувати натуральні CQRS: Події, звичайно ж, відокремлені від запису (завершувати події) від читання (проблемних проекцій).
Простий CRUDЯкщо ти просто зберігаєш і отримуєш дані, то Подією буде сміховинна над головою.Маленька команда без досвіду
: Крива навчання дуже крута.
МартенCity in Germany
dotnet add package Marten
dotnet add package Marten.AspNetCore
Program.cs:
var builder = WebApplication.CreateBuilder(args);
builder.Services.AddMarten(options =>
{
options.Connection(builder.Configuration.GetConnectionString("Marten")!);
// Register event types
options.Events.AddEventType<BlogPostCreated>();
options.Events.AddEventType<BlogPostPublished>();
options.Events.AddEventType<BlogPostTitleChanged>();
options.Events.AddEventType<CommentAdded>();
// Configure async projections
options.Projections.Add<BlogPostProjection>(ProjectionLifecycle.Async);
});
builder.Services.AddMediatR(cfg =>
cfg.RegisterServicesFromAssembly(typeof(Program).Assembly));
var app = builder.Build();
Чому Мартен?
flowchart TB
subgraph Client["Client Application"]
UI[User Interface]
end
subgraph Commands["Command Side (Writes)"]
CMD[Commands] --> CMDH[Command Handlers]
CMDH --> MARTEN[Marten Session]
MARTEN --> EVENTS[(Event Store)]
end
subgraph Background["Async Processing"]
EVENTS -.->|Event Stream| DAEMON[Marten Async Daemon]
DAEMON --> PROJ[Projections]
PROJ --> READDB[(Read Models)]
end
subgraph Queries["Query Side (Reads)"]
QRY[Queries] --> QRYH[Query Handlers with Dapper]
QRYH --> READDB
end
UI -->|Commands| CMD
UI -->|Queries| QRY
classDef commandStyle fill:none,stroke:#e63946,stroke-width:3px
classDef queryStyle fill:none,stroke:#457b9d,stroke-width:3px
classDef dataStyle fill:none,stroke:#2a9d8f,stroke-width:3px
class CMD,CMDH,MARTEN commandStyle
class QRY,QRYH queryStyle
class EVENTS,READDB dataStyle
Збудований на PostgreSQL (ви вже це знаєте)
Ось як усе співвідноситься між собою:
// Always past tense - these things have happened
public record BlogPostCreated(
Guid BlogPostId,
string Title,
string Content,
string AuthorId,
DateTime CreatedAt
);
public record BlogPostPublished(
Guid BlogPostId,
DateTime PublishedAt
);
public record BlogPostTitleChanged(
Guid BlogPostId,
string OldTitle,
string NewTitle,
DateTime ChangedAt
);
public record CommentAdded(
Guid BlogPostId,
Guid CommentId,
string Author,
string Content,
DateTime CreatedAt
);
Точки ключів:
Визначення подій
public class BlogPost
{
// Marten requires an Id property
public Guid Id { get; set; }
// Current state (private setters)
public string Title { get; private set; } = string.Empty;
public string Content { get; private set; } = string.Empty;
public string AuthorId { get; private set; } = string.Empty;
public bool IsPublished { get; private set; }
public DateTime? PublishedDate { get; private set; }
private readonly List<Comment> _comments = new();
public IReadOnlyList<Comment> Comments => _comments.AsReadOnly();
// Apply methods - called by Marten when replaying events
public void Apply(BlogPostCreated e)
{
Id = e.BlogPostId;
Title = e.Title;
Content = e.Content;
AuthorId = e.AuthorId;
}
public void Apply(BlogPostPublished e)
{
IsPublished = true;
PublishedDate = e.PublishedAt;
}
public void Apply(BlogPostTitleChanged e)
{
Title = e.NewTitle;
}
public void Apply(CommentAdded e)
{
_comments.Add(new Comment
{
Id = e.CommentId,
Author = e.Author,
Content = e.Content,
CreatedAt = e.CreatedAt
});
}
// Business logic methods that produce events
public static BlogPostCreated Create(string title, string content, string authorId)
{
if (string.IsNullOrWhiteSpace(title))
throw new ArgumentException("Title is required");
return new BlogPostCreated(
Guid.NewGuid(),
title,
content,
authorId,
DateTime.UtcNow
);
}
public BlogPostPublished Publish()
{
if (IsPublished)
throw new InvalidOperationException("Post is already published");
return new BlogPostPublished(Id, DateTime.UtcNow);
}
public BlogPostTitleChanged ChangeTitle(string newTitle)
{
if (string.IsNullOrWhiteSpace(newTitle))
throw new ArgumentException("Title cannot be empty");
if (newTitle == Title)
throw new InvalidOperationException("New title is the same as current title");
return new BlogPostTitleChanged(Id, Title, newTitle, DateTime.UtcNow);
}
}
public class Comment
{
public Guid Id { get; set; }
public string Author { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public DateTime CreatedAt { get; set; }
}
Події - це незмінні записи, які описують події, що відбулися:
// Define commands
public record CreateBlogPostCommand(
string Title,
string Content,
string AuthorId
) : IRequest<Guid>;
public record PublishBlogPostCommand(Guid BlogPostId) : IRequest;
public record ChangeBlogPostTitleCommand(
Guid BlogPostId,
string NewTitle
) : IRequest;
// Handler for creating a blog post
public class CreateBlogPostHandler : IRequestHandler<CreateBlogPostCommand, Guid>
{
private readonly IDocumentSession _session;
public CreateBlogPostHandler(IDocumentSession session)
{
_session = session;
}
public async Task<Guid> Handle(CreateBlogPostCommand request, CancellationToken cancellationToken)
{
// Create the event
var created = BlogPost.Create(
request.Title,
request.Content,
request.AuthorId
);
// Start a new event stream
_session.Events.StartStream<BlogPost>(created.BlogPostId, created);
await _session.SaveChangesAsync(cancellationToken);
return created.BlogPostId;
}
}
// Handler for publishing
public class PublishBlogPostHandler : IRequestHandler<PublishBlogPostCommand>
{
private readonly IDocumentSession _session;
public PublishBlogPostHandler(IDocumentSession session)
{
_session = session;
}
public async Task Handle(PublishBlogPostCommand request, CancellationToken cancellationToken)
{
// Load the aggregate by replaying its events
var blogPost = await _session.Events.AggregateStreamAsync<BlogPost>(
request.BlogPostId,
token: cancellationToken
);
if (blogPost == null)
throw new InvalidOperationException($"Blog post {request.BlogPostId} not found");
// Business logic produces new event
var published = blogPost.Publish();
// Append event to the stream
_session.Events.Append(request.BlogPostId, published);
await _session.SaveChangesAsync(cancellationToken);
}
}
// Handler for changing title
public class ChangeBlogPostTitleHandler : IRequestHandler<ChangeBlogPostTitleCommand>
{
private readonly IDocumentSession _session;
public ChangeBlogPostTitleHandler(IDocumentSession session)
{
_session = session;
}
public async Task Handle(ChangeBlogPostTitleCommand request, CancellationToken cancellationToken)
{
var blogPost = await _session.Events.AggregateStreamAsync<BlogPost>(
request.BlogPostId,
token: cancellationToken
);
if (blogPost == null)
throw new InvalidOperationException($"Blog post {request.BlogPostId} not found");
var titleChanged = blogPost.ChangeTitle(request.NewTitle);
_session.Events.Append(request.BlogPostId, titleChanged);
await _session.SaveChangesAsync(cancellationToken);
}
}
Багаті на ділові цілі
Зразок:
Застосувати методи оновлюють внутрішній стан
// Read model - optimised for queries
public class BlogPostReadModel
{
public Guid Id { get; set; }
public string Title { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public string AuthorId { get; set; } = string.Empty;
public DateTime CreatedAt { get; set; }
public DateTime? PublishedAt { get; set; }
public bool IsPublished { get; set; }
public int CommentCount { get; set; }
}
// Projection - tells Marten how to build read models from events
public class BlogPostProjection : MultiStreamProjection<BlogPostReadModel, Guid>
{
public BlogPostProjection()
{
// Identity tells Marten which stream each event belongs to
Identity<BlogPostCreated>(x => x.BlogPostId);
Identity<BlogPostPublished>(x => x.BlogPostId);
Identity<BlogPostTitleChanged>(x => x.BlogPostId);
Identity<CommentAdded>(x => x.BlogPostId);
}
// Apply methods - Marten calls these to update read models
public void Apply(BlogPostReadModel view, BlogPostCreated e)
{
view.Id = e.BlogPostId;
view.Title = e.Title;
view.Content = e.Content;
view.AuthorId = e.AuthorId;
view.CreatedAt = e.CreatedAt;
view.IsPublished = false;
}
public void Apply(BlogPostReadModel view, BlogPostPublished e)
{
view.IsPublished = true;
view.PublishedAt = e.PublishedAt;
}
public void Apply(BlogPostReadModel view, BlogPostTitleChanged e)
{
view.Title = e.NewTitle;
}
public void Apply(BlogPostReadModel view, CommentAdded e)
{
view.CommentCount++;
}
}
Мартен керує наполегливістю і повтором подій
Команди оброблятимуться додаванням записів подій до потоків даних:
// Define queries
public record GetRecentBlogPostsQuery(
int Count,
bool PublishedOnly
) : IRequest<List<BlogPostListItemDto>>;
public record GetBlogPostByIdQuery(Guid Id) : IRequest<BlogPostDetailDto?>;
// DTOs for display
public class BlogPostListItemDto
{
public Guid Id { get; set; }
public string Title { get; set; } = string.Empty;
public string AuthorName { get; set; } = string.Empty;
public DateTime CreatedAt { get; set; }
public DateTime? PublishedAt { get; set; }
public int CommentCount { get; set; }
public bool IsPublished { get; set; }
}
public class BlogPostDetailDto
{
public Guid Id { get; set; }
public string Title { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public string AuthorId { get; set; } = string.Empty;
public string AuthorName { get; set; } = string.Empty;
public DateTime CreatedAt { get; set; }
public DateTime? PublishedAt { get; set; }
public bool IsPublished { get; set; }
public List<CommentDto> Comments { get; set; } = new();
}
public class CommentDto
{
public Guid Id { get; set; }
public string Author { get; set; } = string.Empty;
public string Content { get; set; } = string.Empty;
public DateTime CreatedAt { get; set; }
}
// Query handlers
public class GetRecentBlogPostsHandler : IRequestHandler<GetRecentBlogPostsQuery, List<BlogPostListItemDto>>
{
private readonly string _connectionString;
public GetRecentBlogPostsHandler(IConfiguration config)
{
_connectionString = config.GetConnectionString("Marten")!;
}
public async Task<List<BlogPostListItemDto>> Handle(
GetRecentBlogPostsQuery request,
CancellationToken cancellationToken)
{
await using var connection = new NpgsqlConnection(_connectionString);
// Query the Marten-generated read model table
const string sql = @"
SELECT
bp.id AS Id,
bp.title AS Title,
u.name AS AuthorName,
bp.created_at AS CreatedAt,
bp.published_at AS PublishedAt,
bp.comment_count AS CommentCount,
bp.is_published AS IsPublished
FROM blog_post_read_models bp
LEFT JOIN users u ON bp.author_id = u.id
WHERE (@PublishedOnly = false OR bp.is_published = true)
ORDER BY
CASE WHEN bp.is_published THEN bp.published_at
ELSE bp.created_at
END DESC
LIMIT @Count";
var results = await connection.QueryAsync<BlogPostListItemDto>(
sql,
new
{
PublishedOnly = request.PublishedOnly,
Count = request.Count
});
return results.ToList();
}
}
public class GetBlogPostByIdHandler : IRequestHandler<GetBlogPostByIdQuery, BlogPostDetailDto?>
{
private readonly string _connectionString;
public GetBlogPostByIdHandler(IConfiguration config)
{
_connectionString = config.GetConnectionString("Marten")!;
}
public async Task<BlogPostDetailDto?> Handle(
GetBlogPostByIdQuery request,
CancellationToken cancellationToken)
{
await using var connection = new NpgsqlConnection(_connectionString);
const string sql = @"
SELECT
bp.id AS Id,
bp.title AS Title,
bp.content AS Content,
bp.author_id AS AuthorId,
u.name AS AuthorName,
bp.created_at AS CreatedAt,
bp.published_at AS PublishedAt,
bp.is_published AS IsPublished
FROM blog_post_read_models bp
LEFT JOIN users u ON bp.author_id = u.id
WHERE bp.id = @Id";
var post = await connection.QuerySingleOrDefaultAsync<BlogPostDetailDto>(
sql,
new { request.Id });
if (post == null)
return null;
// Get comments from event stream if needed
// Or maintain a separate comment read model
return post;
}
}
Потік:
Метод виклику (перевірка і повернення події)
[ApiController]
[Route("api/[controller]")]
public class BlogPostsController : ControllerBase
{
private readonly IMediator _mediator;
public BlogPostsController(IMediator mediator)
{
_mediator = mediator;
}
[HttpGet]
public async Task<ActionResult<List<BlogPostListItemDto>>> GetRecent(
[FromQuery] int count = 10,
[FromQuery] bool publishedOnly = true)
{
var query = new GetRecentBlogPostsQuery(count, publishedOnly);
var results = await _mediator.Send(query);
return Ok(results);
}
[HttpGet("{id}")]
public async Task<ActionResult<BlogPostDetailDto>> GetById(Guid id)
{
var query = new GetBlogPostByIdQuery(id);
var result = await _mediator.Send(query);
if (result == null)
return NotFound();
return Ok(result);
}
[HttpPost]
public async Task<ActionResult<Guid>> Create([FromBody] CreateBlogPostCommand command)
{
var postId = await _mediator.Send(command);
return CreatedAtAction(nameof(GetById), new { id = postId }, postId);
}
[HttpPost("{id}/publish")]
public async Task<ActionResult> Publish(Guid id)
{
await _mediator.Send(new PublishBlogPostCommand(id));
return NoContent();
}
[HttpPut("{id}/title")]
public async Task<ActionResult> ChangeTitle(
Guid id,
[FromBody] ChangeBlogPostTitleCommand command)
{
if (id != command.BlogPostId)
return BadRequest();
await _mediator.Send(command);
return NoContent();
}
}
Зберегти зміни
sequenceDiagram
participant Client
participant Controller
participant MediatR
participant CommandHandler
participant Marten
participant EventStore
participant AsyncDaemon
participant ReadDB
participant QueryHandler
Note over Client,ReadDB: Write Operation
Client->>Controller: POST /api/blogposts
Controller->>MediatR: Send CreateBlogPostCommand
MediatR->>CommandHandler: Handle command
CommandHandler->>CommandHandler: Validate & create event
CommandHandler->>Marten: StartStream(event)
Marten->>EventStore: Append event
EventStore-->>Marten: Success
Marten-->>CommandHandler: Success
CommandHandler-->>Controller: Return ID
Controller-->>Client: 201 Created
Note over AsyncDaemon,ReadDB: Background Processing
EventStore->>AsyncDaemon: New event available
AsyncDaemon->>AsyncDaemon: Apply projection
AsyncDaemon->>ReadDB: Update read model
ReadDB-->>AsyncDaemon: Updated
Note over Client,ReadDB: Read Operation
Client->>Controller: GET /api/blogposts
Controller->>MediatR: Send Query
MediatR->>QueryHandler: Handle query
QueryHandler->>ReadDB: SELECT with Dapper
ReadDB-->>QueryHandler: Return data
QueryHandler-->>Controller: Return DTOs
Controller-->>Client: 200 OK
Прочитати моделі і проектування
Проекції перетворюють події на ненормальні для читання моделі:
builder.Services.AddMarten(options =>
{
// This projection runs synchronously
options.Projections.Add<CriticalDataProjection>(ProjectionLifecycle.Inline);
// This projection runs async
options.Projections.Add<BlogPostProjection>(ProjectionLifecycle.Async);
});
Асинхронна служба Marten виконує події у тлі і продовжує читати моделі до сьогодні.
var blogPost = await _session.Events.AggregateStreamAsync<BlogPost>(id);
Сторона запиту за допомогою Dapper
У медіатрному режимі контролери чудово прості:
// Command handler
public class CreateBlogPostHandler : IRequestHandler<CreateBlogPostCommand, int>
{
private readonly ApplicationDbContext _context;
private readonly IMemoryCache _cache;
public async Task<int> Handle(CreateBlogPostCommand request, CancellationToken cancellationToken)
{
var blogPost = new BlogPost
{
Title = request.Title,
Content = request.Content,
AuthorId = request.AuthorId,
PublishedDate = DateTime.UtcNow
};
_context.BlogPosts.Add(blogPost);
await _context.SaveChangesAsync(cancellationToken);
// Manual cache invalidation - this is the tedious bit
_cache.Remove("recent-posts");
_cache.Remove($"author-posts-{request.AuthorId}");
_cache.Remove($"post-{blogPost.Id}");
return blogPost.Id;
}
}
// Query handler
public class GetRecentPostsHandler : IRequestHandler<GetRecentPostsQuery, List<BlogPostDto>>
{
private readonly string _connectionString;
private readonly IMemoryCache _cache;
public async Task<List<BlogPostDto>> Handle(GetRecentPostsQuery request, CancellationToken cancellationToken)
{
var cacheKey = "recent-posts";
if (_cache.TryGetValue<List<BlogPostDto>>(cacheKey, out var cached))
return cached!;
// Cache miss - query with Dapper
using var connection = new NpgsqlConnection(_connectionString);
var posts = (await connection.QueryAsync<BlogPostDto>(
"SELECT id, title, author_name, published_date FROM blog_posts ORDER BY published_date DESC LIMIT 10"
)).ToList();
_cache.Set(cacheKey, posts, TimeSpan.FromMinutes(5));
return posts;
}
}
**Частина 2: Напівступний підхід (обгрунтований Cache)**Правильно, так що ви читали про правильне складання подій і ви думаєте, що "це багато роботи."
**Справедливо.**Ось підхід до "інформального CQRS," який використовує більшість команд.
НалаштуванняКоманди запису до бази даних (за допомогою EF або Dapper)
Запити, прочитані з IMemoryCache або IDistribedCacheКоманди не мають дійових записів кешу після запису
Без розподілу подій, без проекції, без фонової служби асинхронізаціїЭто не правда.
Ви не отримуєте сліди аудиторії, світські запити чи автоматичні прогнози.
Швидкий приклад
Прості програми без складних вимог перевірки
Вам потрібна вистава, але ви не можете виправдати складність подій, що виникають
Тривалість декількох секунд прийнятнаПроблеми
Фіктивне кешування жорстке: Похибка за одним ключем кешу, а ви обслуговуєте застарілі дані.
**Кожна команда має знати, які кеші буде скасовано.**Немає сліду перевірки
**: У вас лише поточний стан.**Не могу доказать, что случилось и когда.
Немає світських запитів"Не можу спитати, як система виглядала вчора?"
: У IMemoryCache кожен сервер має власний кеш.
Зміна запиту, може зламати команду.
Частина 3: найгірша ідея - Початок події + Ручний кеш
// Don't do this!
public class PublishBlogPostHandler : IRequestHandler<PublishBlogPostCommand>
{
private readonly IDocumentSession _session;
private readonly IMemoryCache _cache; // ← BAD
public async Task Handle(PublishBlogPostCommand request, CancellationToken cancellationToken)
{
var blogPost = await _session.Events.AggregateStreamAsync<BlogPost>(request.BlogPostId);
var published = blogPost.Publish();
_session.Events.Append(request.BlogPostId, published);
await _session.SaveChangesAsync(cancellationToken);
// Manually invalidating cache while using Event Sourcing ← TERRIBLE IDEA
_cache.Remove($"post-{request.BlogPostId}");
_cache.Remove("recent-posts");
// Now you have:
// 1. Event in event store
// 2. Cache invalidated
// 3. But projection hasn't run yet!
// Queries will hit database before projection completes = stale data
}
}
Чому люди роблять це?
Вони думають: "Я просто додам нечинність кешу, щоб зробити читання швидшим і більш послідовним!"
Чому це жахливо
: Тепер у вас є дві системи, у яких синхронізовано читання моделей - прогнози Мартена і непрацездатність вашого кешу вручну.
Вони не синхронізуються.
Сховальний жах
Правильний підхід
Використовувати проекції Марте (синхронні або вбудовані)
Опитати моделі читання безпосередньоПрийняти кінцеву послідовність (зазвичай, це нормально)Використовувати вбудовані проекції, якщо вам справді потрібна негайна послідовність
© 2026 Scott Galloway — Unlicense — All content and source code on this site is free to use, copy, modify, and sell.