When you use MediatR, it gives you the feature that you can send Notifications or Events. MediatR executes all notifications in sequence and in sync. The total response time is similar to the following equation.
The default implementation of Publish loops through the notification handlers and awaits each one. This ensures each handler is run after one another.
ResponseTime = (Behaviors1 time + Behaviors2 time + ….) + (CommandHandler time) + (Notifications1 time + Notifications2 time + ….)
Consider a scenario where a request for an order from the Client reaches your controller. The controller uses MediatR to send its Command. This command is to create an order. MediatR sends the command to CommandHandler.
After creating the order, you want to send several events, which include: sending SMS and emails to the user, sending events in MessageBroker that other microservices will notice. These 3 tasks are time-consuming tasks and are among the IO tasks that are connected to the external provider.
The first solution
In CommandHandler, after creating the order, send the SMS and email and publish it in MessageBroker. Well, this is not the right solution. First of all, if the provider slows down the email and gets out of reach, we have a problem. We do not have the possibility of compensatory operations for them. We have to keep the Client request open to complete the IO work. Notifications in MediatR can be used to do this. Because MediatR syncs all notifications in a row, it means that if notifications are slow, the request stays open until it’s done.
The second solution
Perform IO tasks async and Parallel. We have the ability to execute Notifications the way we want with a little change.
Talk about rubbing salt in my wounds — d’oh!
To do this, we must follow the steps below:
1- First we have to make our Mediator from the base class
public class CustomMediator : Mediator
{
private Func<IEnumerable<Func<INotification, CancellationToken,
Task>>, INotification, CancellationToken, Task> _publish;
public CustomMediator(ServiceFactoryserviceFactory,
Func<IEnumerable<Func<INotification, CancellationToken, Task>>,
INotification, CancellationToken, Task> publish) :
base(serviceFactory)
{
_publish=publish;
}
protecte doverride TaskPublishCore(IEnumerable<Func<INotification,
CancellationToken, Task>> allHandlers, INotificationnotification,
CancellationTokencancellationToken)
{
return _publish(allHandlers, notification, cancellationToken);
}
}
2- Build our own publisher
public interface ICustomPublisher
{
Task Publish<TNotification>(TNotification notification);
Task Publish<TNotification>(TNotification notification,
PublishStrategy strategy);
Task Publish<TNotification>(TNotification notification,
CancellationToken cancellationToken);
Task Publish<TNotification>(TNotification notification,
PublishStrategy strategy, CancellationToken cancellationToken);
}
public class CustomPublisher : ICustomPublisher
{
private readonly ServiceFactory_serviceFactory;
public CustomPublisher(ServiceFactoryserviceFactory)
{
_serviceFactory=serviceFactory;
PublishStrategies[PublishStrategy.Async] =new
CustomMediator(_serviceFactory, AsyncContinueOnException);
PublishStrategies[PublishStrategy.ParallelNoWait] =new
CustomMediator(_serviceFactory, ParallelNoWait);
PublishStrategies[PublishStrategy.ParallelWhenAll] =new
CustomMediator(_serviceFactory, ParallelWhenAll);
PublishStrategies[PublishStrategy.ParallelWhenAny] =new
CustomMediator(_serviceFactory, ParallelWhenAny);
PublishStrategies[PublishStrategy.SyncContinueOnException] =new
CustomMediator(_serviceFactory, SyncContinueOnException);
PublishStrategies[PublishStrategy.SyncStopOnException] =new
CustomMediator(_serviceFactory, SyncStopOnException);
}
public IDictionary<PublishStrategy, IMediator> PublishStrategies=new
Dictionary<PublishStrategy, IMediator>();
public PublishStra tegyDefaultStrategy { get; set; } =
PublishStrategy.SyncContinueOnException;
public TaskPublish<TNotification>(TNotificationnotification)
{
return Publish(notification, DefaultStrategy,
default(CancellationToken));
}
public TaskPublish<TNotification>(TNotification notification,
PublishStrategy strategy)
{
return Publish(notification, strategy,
default(CancellationToken));
}
public TaskPublish<TNotification>(TNotification notification,
CancellationToken cancellationToken)
{
return Publish(notification, DefaultStrategy, cancellationToken);
}
public TaskPublish<TNotification>(TNotification notification,
PublishStrategy strategy, CancellationToken cancellationToken)
{
if (!PublishStrategies.TryGetValue(strategy, out var mediator))
{
throw new ArgumentException($"Unknown strategy: {strategy}");
}
return mediator.Publish(notification, cancellationToken);
}
private TaskParallelWhenAll(IEnumerable<Func<INotification,
CancellationToken, Task>> handlers, INotification notification,
CancellationToken cancellationToken)
{
var tasks = new List<Task>();
foreach (var handler in handlers)
{
tasks.Add(Task.Run(() =>handler(notification,
cancellationToken)));
}
return Task.WhenAll(tasks);
}
private TaskParallelWhenAny(IEnumerable<Func<INotification,
CancellationToken, Task>> handlers, INotification notification,
CancellationToken cancellationToken)
{
var tasks = new List<Task>();
foreach (var handler in handlers)
{
tasks.Add(Task.Run(() =>handler(notification,
cancellationToken)));
}
return Task.WhenAny(tasks);
}
private Task ParallelNoWait(IEnumerable<Func<INotification,
CancellationToken, Task>> handlers, INotification notification,
CancellationToken cancellationToken)
{
foreach (var handler in handlers)
{
Task.Run(() =>handler(notification, cancellationToken));
}
return Task.CompletedTask;
}
private async TaskAsyncContinueOnException
(IEnumerable<Func<INotification, CancellationToken, Task>>
handlers, INotification notification, CancellationToken
cancellationToken)
{
var tasks = new List<Task>();
var exceptions = new List<Exception>();
foreach (var handler in handlers)
{
try
{
tasks.Add(handler(notification, cancellationToken));
}
catch (Exceptionex) when (!(ex is OutOfMemoryException || ex
is StackOverflowException))
{
exceptions.Add(ex);
}
}
try
{
await Task.WhenAll(tasks).ConfigureAwait(false);
}
catch (AggregateExceptionex)
{
exceptions.AddRange(ex.Flatten().InnerExceptions);
}
catch (Exceptionex) when (!(ex is OutOfMemoryException || ex is
StackOverflowException))
{
exceptions.Add(ex);
}
if (exceptions.Any())
{
throw new AggregateException(exceptions);
}
}
private async TaskSyncStopOnException
(IEnumerable<Func<INotification, CancellationToken, Task>>
handlers, INotification notification, CancellationToken
cancellationToken)
{
foreach (var handler in handlers)
{
await handler(notification,
cancellationToken).ConfigureAwait(false);
}
}
private async TaskSyncContinueOnException
(IEnumerable<Func<INotification, CancellationToken, Task>>
handlers, INotification notification, CancellationToken
cancellationToken)
{
var exceptions = new List<Exception>();
foreach (var handler in handlers)
{
try
{
await handler(notification,
cancellationToken).ConfigureAwait(false);
}
catch (AggregateExceptionex)
{
exceptions.AddRange(ex.Flatten().InnerExceptions);
}
catch (Exceptionex) when (!(ex is OutOfMemoryException || ex
is StackOverflowException))
{exceptions.Add(ex);
3- Types of PublishStrategy. The default strategy in MediatR is the value of SyncContinueOnException. That is, in a row, and if an error occurs, the operation is stopped.
public enum PublishStrategy
{
/// <summary>
/// Run each notification handler after one another. Returns when all
handlers are finished. In case of any exception(s), they will be
captured in an AggregateException.
/// </summary>
SyncContinueOnException = 0,
/// <summary>
/// Run each notification handler after one another. Returns when all
handlers are finished or an exception has been thrown. In case of an
exception, any handlers after that will not be run.
/// </summary>
SyncStopOnException = 1,
/// <summary>
/// Run all notification handlers asynchronously. Returns when all
handlers are finished. In case of any exception(s), they will be
captured in an AggregateException.
/// </summary>
Async = 2,
/// <summary>
/// Run each notification handler on it's own thread using Task.Run().
Returns immediately and does not wait for any handlers to finish. Note
that you cannot capture any exceptions, even if you await the call to
Publish.
/// </summary>
ParallelNoWait = 3,
/// <summary>
/// Run each notification handler on it's own thread using Task.Run().
Returns when all threads (handlers) are finished. In case of any
exception(s), they are captured in an AggregateException by
Task.WhenAll.
/// </summary>
ParallelWhenAll = 4,
/// <summary>
/// Run each notification handler on it's own thread using Task.Run().
Returns when any thread (handler) is finished. Note that you cannot
capture any exceptions (See msdn documentation of Task.WhenAny)
/// </summary>
ParallelWhenAny = 5,
}
4- Introduction to IOC system
services.AddSingleton<ICustomPublisher, CustomPublisher>();
5- Using a publisher
[ApiVersion("1")]
public class OrderController : BaseController
{
private readonly IMapper _mapper;
private readonly IMediator _mediator;
private readonly ICustomPublisher _publisher;
public OrderController(IMappermapper, IMediatormediator,
ICustomPublisherpublisher)
{
_mapper = mapper;
_mediator = mediator;
_publisher = publisher;
}
[HttpPost]
public async Task<IActionResult>
OrderSubmited(OrderSubmitedDtoorderSubmitedDto)
{
var model = _mapper.Map<OrderSubmitedCommandReq>
(orderSubmitedDto);
var result = await _mediator.Send(model);
await_publisher.Publish(new OrderSummitedMessage(),
PublishStrategy.ParallelNoWait, CancellationToken.None);
if (result.ToResult(out var baseRes)) return
BadRequest(baseRes.Error);
return Ok(result);
}
}
Using this method, you can determine the strategy for your publishers, how your events will be executed at that moment, according to your business. It’s very important whether at the same time? What if the error is thrown out? …….
Source: Medium - Mohsen Rajabi
The Tech Platform
Kommentare