MediatR Yerine ‘martinothamar/Mediator’ Kütüphanesini İnceleyelim
Merhaba,
Bu içeriğimizde MediatR kütüphanesine alternatif olarak değerlendirebileceğimiz martinothamar/Mediator kütüphanesini hem teorik hem de pratik olarak inceleyecek, MediatR yerine nasıl konumlandığını en net şekilde ele alıyor olacağız.
martinothamar/Mediator Nedir?
martinothamar/Mediator, .NET dünyasında MediatR’a bir alternatif olarak geliştirilen lightweight, performans odaklı ve allocation-free olan bir Mediator Pattern implementasyonu kütüphanesidir. MediatR’a çok benzer API yapısına sahip olsa da daha modern, source-generator altyapısına sahip ve AOT dostu bir yapıya sahiptir.
Amacı;
- Request/Response iş akışlarını desteklemek,
- Notification/Publish-Subscribe olay akışlarını desteklemek,
- Pipeline davranışlarını desteklemektir.
Ana özellikleri ise;
- Source generator (compile-time code generation) kullanabilmektedir.
- Native AOT senaryolarına özellikle uyumlu olacak şekilde tasarlanmıştır.
- Reflection kullanmadan dependency injection registry işlemini ve IMediator implementasyonunu compile-time’da üretebilmektedir.
Kim tarafından geliştirilmiştir?
Kütüphanenin ana geliştiricisi Martin Othamar abimizdir. GitHub reposuda https://github.com/martinothamar/Mediator adresindedir.
.NET’de Kullanımı
Mediator kütüphanesini kullanabilmek için öncelikle gerekli paketleri
dotnet add package Mediator.Abstractions
ve
dotnet add package Mediator.SourceGenerator
talimatları eşliğinde ilgili projeye yükleyiniz.
Ardından Mediator yapılarını projeye dahil edebilmek için aşağıdaki gibi servis entegrasyonunu gerçekleştiriniz:
var builder = WebApplication.CreateBuilder(args); builder.Services.AddMediator(); var app = builder.Build();
Bu işlemlerden sonra CQRS, Notification veya Pipeline Behavior davranışlarından hangisini uygulamak istiyorsanız aşağıdaki gibi çalışmalar gerçekleştirebilirsiniz:
- CQRS
CQRS pattern’ı uygularken birebir MediatR kütüphanesinde olduğu gibi Command ve Query’ler fiziksel ayrılmalı ve Request / Response implementasyonları eşliğinde davranışlar oluşturulmalıdır.
Örnek Command çalışması;
public sealed record CreateUserCommandRequest(string FirstName, string LastName) : IRequest<CreateUserCommandResponse>;public sealed record CreateUserCommandResponse(Guid Id);public sealed class CreateUserCommandHandler : IRequestHandler<CreateUserCommandRequest, CreateUserCommandResponse> { public async ValueTask<CreateUserCommandResponse> Handle(CreateUserCommandRequest request, CancellationToken cancellationToken) { await Console.Out.WriteLineAsync($"'{request.FirstName} {request.LastName}' created"); var id = Guid.NewGuid(); return new CreateUserCommandResponse(id); } }Örnek Query çalışması;
public sealed record GetUsersQueryRequest : IRequest<GetUsersQueryResponse[]>;public sealed record GetUsersQueryResponse(string FirstName, string LastName);public sealed class GetUsersQueryHandler : IRequestHandler<GetUsersQueryRequest, GetUsersQueryResponse[]> { public ValueTask<GetUsersQueryResponse[]> Handle(GetUsersQueryRequest request, CancellationToken cancellationToken) { return ValueTask.FromResult(new[] { new GetUsersQueryResponse("Ahmet", "Yılmaz"), new GetUsersQueryResponse("İbrahim", "Yılmaz"), new GetUsersQueryResponse("Cüneyt", "Yıldız"), new GetUsersQueryResponse("Mustafa", "Yıldız") }); } }Her iki çalışmayı da kullanırken aşağıdaki gibi basit endpoint’lerden istifade edebilir ve testimizi gerçekleştirebiliriz;
app.MapGet("/get-users", async (IMediator mediator, [AsParameters] GetUsersQueryRequest getUsersQueryRequest) => await mediator.Send(getUsersQueryRequest)); app.MapPost("/create-user-command", async (IMediator mediator, CreateUserCommandRequest createUserCommandRequest) => await mediator.Send(createUserCommandRequest)); - Notification
Notification’da da aynı alışılageldik mantıkla hareket edilmekte veINotification<T>veINotificationHandler<T>arayüzlerinden istifade edilmektedir.Şöyle ki;
Bir notification tanımı tasarlanmalı;
public sealed record UserCreatedNotification(Guid UserId) : INotification;
Ve akabinde bu tanımı karşılayacak aşağıdaki gibi bir veya birden fazla handler oluşturulmalıdır;
public class SendEmailOnUserCreatedHandler : INotificationHandler<UserCreatedNotification> { public ValueTask Handle(UserCreatedNotification notification, CancellationToken ct) { Console.WriteLine("Email sent: " + notification.UserId); return ValueTask.CompletedTask; } }public class LogUserCreatedHandler : INotificationHandler<UserCreatedNotification> { public ValueTask Handle(UserCreatedNotification notification, CancellationToken ct) { Console.WriteLine("Log: User created " + notification.UserId); return ValueTask.CompletedTask; } }Bu çalışmayı da test edebilmek için CQRS örneğinde oluşturduğumuz POST endpoint’ini aşağıdaki gibi genişletebiliriz;
app.MapPost("/create-user-command", async (IMediator mediator, CreateUserCommandRequest createUserCommandRequest) => { var response = await mediator.Send(createUserCommandRequest); await mediator.Publish(new UserCreatedNotification(response.Id)); return response; }); - Pipeline Behavior
Logging, validation ve caching işlemleri için de Pipeline Behavior davranışlarından aşağıdaki gibi faydalanabilmekteyiz.- Logging
public class LoggingPipelineBehavior<TMessage, TResponse>(ILogger<LoggingPipelineBehavior<TMessage, TResponse>> logger) : IPipelineBehavior<TMessage, TResponse> where TMessage : notnull, IMessage { public async ValueTask<TResponse> Handle(TMessage message, MessageHandlerDelegate<TMessage, TResponse> next, CancellationToken cancellationToken) { logger.LogInformation( "Handling {RequestName} with data {@Request}", typeof(TMessage).Name, message); var response = await next(message, cancellationToken); logger.LogInformation( "Handled {RequestName} with response {@Response}", typeof(TMessage).Name, response); return response; } } - Validation
public class CreateUserCommandValidator : AbstractValidator<CreateUserCommandRequest> { public CreateUserCommandValidator() { RuleFor(x => x.FirstName) .NotEmpty() .WithMessage("FirstName boş geçilemez!") .MaximumLength(15) .MinimumLength(3); RuleFor(x => x.LastName) .NotEmpty() .WithMessage("LastName boş geçilemez!") .MaximumLength(15) .MinimumLength(3); } }public sealed class ValidationPipelineBehavior<TMessage, TResponse>(IServiceProvider serviceProvider) : IPipelineBehavior<TMessage, TResponse> where TMessage : IMessage { public async ValueTask<TResponse> Handle(TMessage message, MessageHandlerDelegate<TMessage, TResponse> next, CancellationToken cancellationToken) { using var scope = serviceProvider.CreateScope(); var validators = scope.ServiceProvider.GetServices<IValidator<TMessage>>().ToList(); if (validators is []) return await next(message, cancellationToken); var context = new ValidationContext<TMessage>(message); var validationResults = await Task.WhenAll(validators.Select(async validator => await validator.ValidateAsync(context, cancellationToken))); var failures = validationResults .SelectMany(result => result.Errors) .Where(failure => failure != null) .ToList(); if (failures is not []) throw new ValidationException(failures); return await next(message, cancellationToken); } } - Caching
public class RedisService { ConnectionMultiplexer? connectionMultiplexer; public void Connect() => connectionMultiplexer = ConnectionMultiplexer.Connect("localhost:6379"); public IDatabase Database { get { if (!connectionMultiplexer?.IsConnected == null) Connect(); return connectionMultiplexer.GetDatabase(0); } } }public sealed class CachingPipelineBehavior<TMessage, TResponse>(RedisService redisService) : IPipelineBehavior<TMessage, TResponse> where TMessage : IMessage where TResponse : class { public async ValueTask<TResponse> Handle(TMessage message, MessageHandlerDelegate<TMessage, TResponse> next, CancellationToken cancellationToken) { var redisDatabase = redisService.Database; var cacheKey = $"{message.GetType().FullName}"; TResponse? cachedResponse = null; var cachedData = await redisDatabase.StringGetAsync(cacheKey); if (cachedData.HasValue) { var cachedResponseJson = Encoding.UTF8.GetString(cachedData); cachedResponse = JsonSerializer.Deserialize<TResponse>(cachedResponseJson); } if (cachedResponse != null) return cachedResponse; var response = await next(message, cancellationToken); //Yeni veriyi cache'e alıyoruz. var responseJson = JsonSerializer.Serialize(response); var encodedData = Encoding.UTF8.GetBytes(responseJson); var result = await redisDatabase.StringSetAsync(cacheKey, encodedData, expiry: TimeSpan.FromMinutes(60)); return response; }
Tabi tüm bu Pipeline Behavior davranışlarını sisteme dahil edebilmek için aşağıdaki gibi bir yapılandırmada bulunmak yeterli olacaktır:
builder.Services.AddMediator(options => options.PipelineBehaviors = [ typeof(LoggingPipelineBehavior<,>), typeof(ValidationPipelineBehavior<,>), typeof(CachingPipelineBehavior<,>) ]); - Logging
Streaming Messages
Son olarak Mediator kütüphanesinin en az bilinen ama en güçlü diyebileceğim özelliği olan Streaming Messages özelliğini değerlendirelim.
Bu özellik; esasında adı üzerinde, klasik CQRS davranışındaki handler’ın tek bir response döndürmesinden ziyade, zaman içinde birden fazla sonuç üretmesi (stream) üzerine kurgulanmış davranışlar için geçerli bir çözüm sağlamaktadır.
Aşağıdaki örneği incelerseniz eğer;
public sealed record StreamRequest : IStreamRequest<StreamResponse>;
public sealed record StreamResponse(Guid Id);
public sealed class StreamHandler : IStreamRequestHandler<StreamRequest, StreamResponse>
{
public async IAsyncEnumerable<StreamResponse> Handle(StreamRequest request, CancellationToken cancellationToken)
{
for (int i = 0; i < 5; i++)
{
await Task.Delay(1000, cancellationToken);
yield return new StreamResponse(Guid.NewGuid());
}
}
}
şeklinde teknik bir kuyguyla rahatlıkla uygulanabilmektedir.
Tabi kullanırken de aşağıdaki gibi bir endpoint’ten istifade edilebilir:
app.MapGet("/streaming-messages", async (IMediator mediator, [AsParameters] StreamRequest streamRequest) =>
{
await foreach (var response in mediator.CreateStream(streamRequest))
Console.WriteLine(response);
});
Farkındaysanız esasında bu özelliğin tüm altyapısı .NET’teki asenkron iterasyonel işlevlerin temeli olan IAsyncEnumerable<T> interface’ine göre yapılandırılmıştır. Haliyle parça parça veri aktarımları süreçlerinde CQRS benimsenmişse eğer bu streamin özelliği ile hızlıca gerekli aksiyonlar alınabilecektir. Böylece, özellikle büyük veri setlerinde, uzun süren işlemlerde ve real-time akışlarda ihtiyacı karşılayabilecek muazzam bir altyapı elimizin altındadır diyebiliriz. Bir yandan, bu özellik ile ilgili kütüphanenin MediatR’a fark yarattığını da söyleyebiliriz.
Nihai olarak;
MediatR ücretli olduktan sonra CQRS gibi önemli bir pattern’ı manuel uygulamayı öneren onlarca tavsiyeden sonra (ki bu pattern’ın nasıl manuel uygulanacağına dair teknik makaleyi yıllar önce ilk kaleme alan kişi olarak diyorum ki) martinothamar/Mediator kütüphanesinin aynı kullanım pratikleri eşliğinde daha elverişli bir altyapı sağladığını söyleyebilirim.
İlgilenenlerin faydalanması dileğiyle…
Sonraki yazılarımda görüşmek üzere…
İyi çalışmalar…
Not : Örnek çalışmaya aşağıdaki GitHub adresinden erişebilirsiniz.
https://github.com/gncyyldz/CQRS_With_martinothamar_Mediator_Example

Hocam elinize sağlık.
Milan Jovanovic’in youtube’ta bir videosunda wolverine adlı bir kütüphane kullandığını gördüm. Kendi dokumantasyonuna bakmıştım biraz. Sizin inceleme şansınız oldumu, .net tarafında ileride “MediatR + BackgroundService + Message Queue + Outbox + Saga” için popüler bir çözüm sunan kütüphaneye dönüşme potansiyeli var mı. Olurda incelerseniz/incelediyseniz/denerseniz makaleye çevirirseniz deneyim ve düşüncelerinizden faydalanmak isterim. Saygılarımla
Youtube’da canlı yayın yapsanız arada? özlettiniz kendinizi