
How to Make MediatR Events (Notifications) Async and Truly Parallel


MediatR lets you publish notifications (events). By default, it runs the notification handlers one after another: the default implementation of Publish loops through the handlers and awaits each one, so a handler starts only after the previous one has finished. The total response time is therefore roughly:

ResponseTime = (Behavior1 time + Behavior2 time + …) + (CommandHandler time) + (Notification1 time + Notification2 time + …)
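
To make the equation concrete, here is a minimal sketch that compares awaiting three simulated handlers one after another with starting them all and awaiting them together. The TimingSketch class, its method names, and the 300/200/100 ms delays are hypothetical stand-ins for the SMS, email, and message broker handlers; Task.Delay replaces the real IO.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class TimingSketch
{
    // Hypothetical handler durations in milliseconds (SMS, email, message broker).
    private static readonly int[] HandlerDelaysMs = { 300, 200, 100 };

    // Mirrors the default behaviour: await each handler before starting the next.
    public static async Task<TimeSpan> RunSequentiallyAsync()
    {
        var stopwatch = Stopwatch.StartNew();
        foreach (var delay in HandlerDelaysMs)
        {
            await Task.Delay(delay);
        }
        return stopwatch.Elapsed;
    }

    // Starts every handler first, then awaits them together.
    public static async Task<TimeSpan> RunConcurrentlyAsync()
    {
        var stopwatch = Stopwatch.StartNew();
        await Task.WhenAll(HandlerDelaysMs.Select(delay => Task.Delay(delay)));
        return stopwatch.Elapsed;
    }
}
```

On an idle machine the sequential run should take about the sum of the delays and the concurrent run about the longest single delay; exact numbers will vary.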


Consider a scenario where a request for an order from the Client reaches your controller. The controller uses MediatR to send its Command. This command is to create an order. MediatR sends the command to CommandHandler.

After creating the order, you want to publish several events: sending an SMS and an email to the user, and publishing an event to a message broker for other microservices to consume. All three are time-consuming IO tasks that depend on external providers.

The first solution

In the CommandHandler, after creating the order, send the SMS and email and publish to the message broker directly. This is not the right solution. If the email provider slows down or becomes unreachable, we have a problem: there is no way to run compensating operations, and the client's request must stay open until the IO work completes. MediatR notifications can take this work out of the handler, but because MediatR runs all notification handlers sequentially by default, slow handlers still keep the request open until they are done.

The second solution

Perform the IO tasks asynchronously and in parallel. With a small change, we can execute notifications any way we want.



To do this, we must follow the steps below:


1- First, derive our own mediator from the Mediator base class

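using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using MediatR;

// Assumes a MediatR release that exposes ServiceFactory and this PublishCore signature (pre-12).
public class CustomMediator : Mediator
{
    // The strategy that decides how the notification handlers are invoked.
    private readonly Func<IEnumerable<Func<INotification, CancellationToken, Task>>,
        INotification, CancellationToken, Task> _publish;

    public CustomMediator(ServiceFactory serviceFactory,
        Func<IEnumerable<Func<INotification, CancellationToken, Task>>,
            INotification, CancellationToken, Task> publish)
        : base(serviceFactory)
    {
        _publish = publish;
    }

    // Called by Mediator.Publish; delegates to the selected strategy instead of the default loop.
    protected override Task PublishCore(
        IEnumerable<Func<INotification, CancellationToken, Task>> allHandlers,
        INotification notification, CancellationToken cancellationToken)
    {
        return _publish(allHandlers, notification, cancellationToken);
    }
}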


2- Build our own publisher

public interface ICustomPublisher
{
    Task Publish<TNotification>(TNotification notification);
    Task Publish<TNotification>(TNotification notification, 
    PublishStrategy strategy);
    Task Publish<TNotification>(TNotification notification, 
    CancellationToken cancellationToken);
    Task Publish<TNotification>(TNotification notification, 
    PublishStrategy strategy, CancellationToken cancellationToken);
}

public class CustomPublisher : ICustomPublisher
{
    private readonly ServiceFactory _serviceFactory;

    public CustomPublisher(ServiceFactory serviceFactory)
    {
        _serviceFactory = serviceFactory;

        // One mediator instance per strategy, each wired to its own publish delegate.
        PublishStrategies[PublishStrategy.Async] =
            new CustomMediator(_serviceFactory, AsyncContinueOnException);
        PublishStrategies[PublishStrategy.ParallelNoWait] =
            new CustomMediator(_serviceFactory, ParallelNoWait);
        PublishStrategies[PublishStrategy.ParallelWhenAll] =
            new CustomMediator(_serviceFactory, ParallelWhenAll);
        PublishStrategies[PublishStrategy.ParallelWhenAny] =
            new CustomMediator(_serviceFactory, ParallelWhenAny);
        PublishStrategies[PublishStrategy.SyncContinueOnException] =
            new CustomMediator(_serviceFactory, SyncContinueOnException);
        PublishStrategies[PublishStrategy.SyncStopOnException] =
            new CustomMediator(_serviceFactory, SyncStopOnException);
    }

    public IDictionary<PublishStrategy, IMediator> PublishStrategies =
        new Dictionary<PublishStrategy, IMediator>();
    public PublishStrategy DefaultStrategy { get; set; } =
        PublishStrategy.SyncContinueOnException;
    
    public Task Publish<TNotification>(TNotification notification)
    {
        return Publish(notification, DefaultStrategy, default(CancellationToken));
    }

    public Task Publish<TNotification>(TNotification notification,
        PublishStrategy strategy)
    {
        return Publish(notification, strategy, default(CancellationToken));
    }

    public Task Publish<TNotification>(TNotification notification,
        CancellationToken cancellationToken)
    {
        return Publish(notification, DefaultStrategy, cancellationToken);
    }

    public Task Publish<TNotification>(TNotification notification,
        PublishStrategy strategy, CancellationToken cancellationToken)
    {
        // Pick the mediator that was registered for the requested strategy.
        if (!PublishStrategies.TryGetValue(strategy, out var mediator))
        {
            throw new ArgumentException($"Unknown strategy: {strategy}");
        }

        return mediator.Publish(notification, cancellationToken);
    }
    
    // Runs every handler via Task.Run and completes when all of them have finished.
    private Task ParallelWhenAll(
        IEnumerable<Func<INotification, CancellationToken, Task>> handlers,
        INotification notification, CancellationToken cancellationToken)
    {
        var tasks = new List<Task>();

        foreach (var handler in handlers)
        {
            tasks.Add(Task.Run(() => handler(notification, cancellationToken)));
        }

        return Task.WhenAll(tasks);
    }

    // Runs every handler via Task.Run and completes when the first one has finished.
    private Task ParallelWhenAny(
        IEnumerable<Func<INotification, CancellationToken, Task>> handlers,
        INotification notification, CancellationToken cancellationToken)
    {
        var tasks = new List<Task>();

        foreach (var handler in handlers)
        {
            tasks.Add(Task.Run(() => handler(notification, cancellationToken)));
        }

        return Task.WhenAny(tasks);
    }

    // Starts every handler via Task.Run and returns immediately (fire and forget).
    private Task ParallelNoWait(
        IEnumerable<Func<INotification, CancellationToken, Task>> handlers,
        INotification notification, CancellationToken cancellationToken)
    {
        foreach (var handler in handlers)
        {
            Task.Run(() => handler(notification, cancellationToken));
        }

        return Task.CompletedTask;
    }

    // Starts all handlers without awaiting them one by one, then awaits them together.
    private async Task AsyncContinueOnException(
        IEnumerable<Func<INotification, CancellationToken, Task>> handlers,
        INotification notification, CancellationToken cancellationToken)
    {
        var tasks = new List<Task>();
        var exceptions = new List<Exception>();

        foreach (var handler in handlers)
        {
            try
            {
                tasks.Add(handler(notification, cancellationToken));
            }
            catch (Exception ex) when (!(ex is OutOfMemoryException ||
                                         ex is StackOverflowException))
            {
                // The handler threw synchronously before returning its task.
                exceptions.Add(ex);
            }
        }

        try
        {
            await Task.WhenAll(tasks).ConfigureAwait(false);
        }
        catch (AggregateException ex)
        {
            exceptions.AddRange(ex.Flatten().InnerExceptions);
        }
        catch (Exception ex) when (!(ex is OutOfMemoryException ||
                                     ex is StackOverflowException))
        {
            exceptions.Add(ex);
        }

        if (exceptions.Any())
        {
            throw new AggregateException(exceptions);
        }
    }

    // Awaits the handlers one by one; the first exception stops the loop.
    private async Task SyncStopOnException(
        IEnumerable<Func<INotification, CancellationToken, Task>> handlers,
        INotification notification, CancellationToken cancellationToken)
    {
        foreach (var handler in handlers)
        {
            await handler(notification, cancellationToken).ConfigureAwait(false);
        }
    }
        
    // Awaits the handlers one by one; exceptions are collected and thrown at the end.
    private async Task SyncContinueOnException(
        IEnumerable<Func<INotification, CancellationToken, Task>> handlers,
        INotification notification, CancellationToken cancellationToken)
    {
        var exceptions = new List<Exception>();

        foreach (var handler in handlers)
        {
            try
            {
                await handler(notification, cancellationToken).ConfigureAwait(false);
            }
            catch (AggregateException ex)
            {
                exceptions.AddRange(ex.Flatten().InnerExceptions);
            }
            catch (Exception ex) when (!(ex is OutOfMemoryException ||
                                         ex is StackOverflowException))
            {
                exceptions.Add(ex);
            }
        }

        if (exceptions.Any())
        {
            throw new AggregateException(exceptions);
        }
    }
}

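3- Types of PublishStrategy. The default strategy of our CustomPublisher is SyncContinueOnException: the handlers run one after another, and if one throws, the remaining handlers still run and the exceptions are collected into an AggregateException. MediatR's built-in Publish is also sequential, but it stops at the first exception, which corresponds to SyncStopOnException.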

public enum PublishStrategy
{
    /// <summary>
    /// Run each notification handler after one another. Returns when all
    /// handlers are finished. In case of any exception(s), they will be
    /// captured in an AggregateException.
    /// </summary>
    SyncContinueOnException = 0,

    /// <summary>
    /// Run each notification handler after one another. Returns when all
    /// handlers are finished or an exception has been thrown. In case of an
    /// exception, any handlers after that will not be run.
    /// </summary>
    SyncStopOnException = 1,

    /// <summary>
    /// Run all notification handlers asynchronously. Returns when all
    /// handlers are finished. In case of any exception(s), they will be
    /// captured in an AggregateException.
    /// </summary>
    Async = 2,

    /// <summary>
    /// Run each notification handler on its own thread using Task.Run().
    /// Returns immediately and does not wait for any handlers to finish. Note
    /// that you cannot capture any exceptions, even if you await the call to
    /// Publish.
    /// </summary>
    ParallelNoWait = 3,

    /// <summary>
    /// Run each notification handler on its own thread using Task.Run().
    /// Returns when all threads (handlers) are finished. In case of any
    /// exception(s), they are captured in an AggregateException by
    /// Task.WhenAll.
    /// </summary>
    ParallelWhenAll = 4,

    /// <summary>
    /// Run each notification handler on its own thread using Task.Run().
    /// Returns when any thread (handler) is finished. Note that you cannot
    /// capture any exceptions (see the MSDN documentation of Task.WhenAny).
    /// </summary>
    ParallelWhenAny = 5,
}
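
One detail about Task.WhenAll is worth checking when a strategy relies on it for error reporting: awaiting the combined task rethrows only the first failure, while the full AggregateException stays on the task's Exception property. The sketch below illustrates this with two hypothetical failing handlers. WhenAllExceptionSketch and its method name are made up for the example, and Task.FromException stands in for handlers that throw.

```csharp
using System;
using System.Threading.Tasks;

public static class WhenAllExceptionSketch
{
    // Returns how many failures the combined task recorded.
    public static async Task<int> CountCapturedExceptionsAsync()
    {
        // Hypothetical handler results: two failures and one success.
        var tasks = new[]
        {
            Task.FromException(new InvalidOperationException("sms")),
            Task.CompletedTask,
            Task.FromException(new InvalidOperationException("email")),
        };

        var whenAll = Task.WhenAll(tasks);

        try
        {
            await whenAll;
        }
        catch (InvalidOperationException)
        {
            // await surfaces a single exception here, not an AggregateException.
        }

        // The task itself still carries every failure.
        return whenAll.Exception?.InnerExceptions.Count ?? 0;
    }
}
```

If every failure matters to the caller, reading the Exception property of the task returned by Task.WhenAll is one way to collect all of them.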

4- Register the publisher in the IoC container

services.AddSingleton<ICustomPublisher, CustomPublisher>();

5- Use the publisher

[ApiVersion("1")]
public class OrderController : BaseController
{
    private readonly IMapper _mapper;
    private readonly IMediator _mediator;
    private readonly ICustomPublisher _publisher;

    public OrderController(IMapper mapper, IMediator mediator,
                           ICustomPublisher publisher)
    {
        _mapper = mapper;
        _mediator = mediator;
        _publisher = publisher;
    }

    [HttpPost]
    public async Task<IActionResult> OrderSubmited(OrderSubmitedDto orderSubmitedDto)
    {
        var model = _mapper.Map<OrderSubmitedCommandReq>(orderSubmitedDto);

        // Run the command and wait for its result.
        var result = await _mediator.Send(model);

        // Start the notification handlers in parallel without waiting for them to finish.
        await _publisher.Publish(new OrderSummitedMessage(),
            PublishStrategy.ParallelNoWait, CancellationToken.None);

        if (result.ToResult(out var baseRes)) return BadRequest(baseRes.Error);

        return Ok(result);
    }
}

With this approach, you choose for each publish call how your events are executed, according to your business needs: whether the handlers run one after another or at the same time, and what happens when one of them throws an exception.
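
Since ParallelNoWait returns before the handlers finish, their exceptions are lost unless each handler reports them itself. One possible approach, not part of the original publisher, is to wrap each fire-and-forget handler so that failures are passed to a callback such as a logger. FireAndForgetSketch, RunAndReport, and the onError callback are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

public static class FireAndForgetSketch
{
    // Runs the handler on the thread pool and forwards any exception to onError.
    // The returned task never faults because failures are handled inside the wrapper.
    public static Task RunAndReport(Func<Task> handler, Action<Exception> onError)
    {
        return Task.Run(async () =>
        {
            try
            {
                await handler().ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                onError(ex);
            }
        });
    }
}
```

A fire-and-forget caller would simply discard the returned task; awaiting it is only useful in tests or during shutdown.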



Source: Medium - Mohsen Rajabi

