It mainly supports Publish/Subscribe patterns in in-memory, and covers all cases with a unified interface. For example, it supports communication between different hubs in MagicOnion and SignalR for real-time communication, messenger pattern for loosely coupling between V-VM in GUI application development such as WPF (EventAggregator in Prism library), implementation of Mediator pattern for CQRS architecture in server applications(MediatR in ASP.NET), and communication between game objects in Unity such as SignalBus in Zenject and MessageBroker in UniRx.
One thing that is common to all the libraries provided by Cysharp is that performance is essential.
The above graph shows the performance of Publish with 8 Subscribers, which is faster than C#’s event syntax and 78 times faster than Prism’s EventAggregator.
Of course, memory allocation for each Publish is kept to zero.
One of the features of MessagePipe is that it assumes Dependency Injection.
.NET 5, many frameworks are built on top of the .NET Generic Host (e.g. ASP.NET Core, MagicOnion, ConsoleAppFramework, and desktop applications such as MAUI). We believe that the design will be cleaner if it is built on the assumption that DI exists. DI libraries are also available in Unity such as Zenject and VContainer.
Therefore, you only need one line for setup as follows.
using MessagePipe;
using Microsoft.Extensions.DependencyInjection;
Host.CreateDefaultBuilder()
.ConfigureServices((ctx, services) =>
{
services.AddMessagePipe();
// AddMessagePipe(options => { }) for configure options
})
Then, the publisher injects IPublisher<T>, and the subscriber injects ISubscriber<T>, as in ILogger<T>. For T, any type is possible, e.g., primitive (int, string, etc…), struct, class, enum, etc.
using MessagePipe;
public struct MyEvent { }
public class SceneA
{
readonly IPublisher<MyEvent> publisher;
public SceneA(IPublisher<MyEvent> publisher)
{
this.publisher=publisher;
}
void Send()
{
this.publisher.Publish(newMyEvent());
}
}
public class SceneB
{
readonly ISubscriber<MyEvent> subscriber;
readonly IDisposable disposable;
public SceneB(ISubscriber<MyEvent> subscriber)
{
var bag = DisposableBag.CreateBuilder();
// composite disposable for manage subscription
subscriber.Subscribe(x=>Console.WriteLine("here")).AddTo(bag);
disposable=bag.Build();
}
void Close()
{
disposable.Dispose();
// unsubscribe event, all subscription **must** Dispose when completed
}
}
The return value of subscribe is IDisposable, which unlike event can be unsubscribed by simply disposing of it. Multiple IDisposables can be grouped together using DisposableBag (or CompositeDisposable, which is included in Rx), so that they can be managed in conjunction with the lifecycle of the object to which they are attached.
In WPF, it is common to keep subscriptions in messengers by weak reference, which I consider to be an anti-pattern. Weak references have obscure and implicit rules for tying them to GC objects and unavoidable performance degradation (as shown by Prism’s unacceptably low performance in benchmarks). The fact that you can explicitly choose the lifecycle to be associated with is an advantage, not a hassle. One drawback is the subscription leak caused by forgetting to handle, but this can cover by the subscription manager described below, scope dispose by DI, and compile error by Analyzer.
By using DI, it is possible to get Publisher/Subscriber objects smoothly, and by using the scope function of DI, it is possible to separate senders and receivers of messages for each scope, and to prevent subscription leaks by batch dispose at the end of the scope.
The above example is the simplest, using IPublisher<T>/ISubscriber<T>, but MessagePipe has a similar interface, IPublisher<TKey, TMessage>/ISubscriber<TKey, Message>, with a key (topic in the PubSub pattern) interface.
As a practical example, Cysharp is currently developing an application that connects Unity with MagicOnion and delivers the data to the browser with Blazor. The problem here is the transfer of data between the Blazor page (browser lifecycle) and the MagicOnion Hub (Unity connection lifecycle).
Browser <-> Blazor <- ??? -> MagicOnion <-> Unity
We decided to use this as a key to pass data through MessagePipe using the connection ID.
Browser <-> Blazor <- [MessagePipe] -> MagicOnion <-> Unity
The following is an example code.
// MagicOnion(similar as SignalR, realtime event framework for .NET and Unity)
public class UnityConnectionHub : StreamingHubBase<
IUnityConnectionHub,
IUnityConnectionHubReceiver>,
IUnityConnectionHub
{
readonly IPublisher<Guid, UnitEventData> eventPublisher;
readonly IPublisher<Guid, ConnectionClose> closePublisher;
Guidid;
public UnityConnectionHub(IPublisher<Guid,
UnitEventData> eventPublisher,
IPublisher<Guid,
ConnectionClose> closePublisher)
{
this.eventPublisher=eventPublisher;
this.closePublisher=closePublisher;
}
override async ValueTask OnConnected()
{
this.id=Guid.Parse(Context.Headers["id"]);
}
override async ValueTask OnDisconnected()
{
this.closePublisher.Publish(id, newConnectionClose());
// publish to browser(Blazor)
}
// called from Client(Unity)
public Task<UnityEventData> SendEventAsync(UnityEventDatadata)
{
this.eventPublisher.Publish(id, data);
// publish to browser(Blazor)
}
}
// Blazor
public partial class BlazorPage : ComponentBase, IDisposable
{
[Parameter]
public Guid ID { get; set; }
[Inject]
ISubscriber<Guid, UnitEventData> UnityEventSubscriber { get; set;
}
[Inject]
ISubscriber<Guid, ConnectionClose> ConnectionCloseSubscriber {
get; set; }
IDisposable subscription;
protected override void OnInitialized()
{
// receive event from MagicOnion(that is from Unity)
var d1 = UnityEventSubscriber.Subscribe(ID, x=>
{
// do anything...
});
var d2 = ConnectionCloseSubscriber.Subscribe(ID, _=>
{
// show disconnected thing to view...
subscription?.Dispose();
// and unsubscribe events.
});
subscription=DisposableBag.Create(d1, d2);
// combine disposable.
}
public void Dispose()
{
// unsubscribe event when browser is closed.
subscription?.Dispose();
}
}
We were able to connect two applications (MagicOnion and Blazor) that are co-located in the same solution through MessagePipe. Types are also shared with Unity, which is hosted in the same solution, we were able to unify the Unity client to the browser in C# for a simple, all-in-one connection.
By the way, the difference between MessagePipe’s IPublisher/ISusbcriber and Rx’s Subject is that OnError and OnCompleted do not exist. In other words, it can be regarded as an IObservable<T> where only OnNext exists. This allows us to guarantee that the event will not end and that the subscription will not expire due to an error. In terms of event handling, the possibility of subscription termination due to OnError/OnCompleted increases the number of considerations, such as the need to re-subscribe. By using the “never ending” state, which is a less expressive constraint than Rx, we improve the ease of handling. If you need more expressiveness, you can convert it to Rx by AsObservable as appropriate. This is the same concept as the Relay (PublishRelay, BehaivorRelay) introduced in RxSwift.
In addition to keyless/keyed, there are other variations such as asynchronous handlers utilizing async/await(IAsyncPublisher/IAsyncSubscriber), or holding the most recent value like BehaviorSubject in Rx (IBufferedPublisher/ IBufferedSubscriber).
sync/async
keyed/keyless
buffered/bufferless
broadcast/response(+many)
in-memory/distributed
The combination of all interfaces is shown below.
There are a lot of them, but they are grouped together in a similar unified API, so that the large number of functions should not lead to increased learning costs. For example, the only differences between sync/async are Publish/PublishAsync and asynchronous handlers, and the entire flow of sending a message with Publish, subscribing with Subscribe, and managing the return value IDisposable is the same.
// keyless-sync
public interface IPublisher<TMessage>
{
void Publish(TMessage message);
}
public interface ISubscriber<TMessage>
{
IDisposable Subscribe(IMessageHandler<TMessage> handler,
paramsMessageHandlerFilter<TMessage>[] filters);
}
// keyless-async
public interface IAsyncPublisher<TMessage>
{
ValueTask PublishAsync(TMessage message,
AsyncPublishStrategy publishStrategy,
CancellationToken cancellationToken=default(CancellationToken));
}
public interface IAsyncSubscriber<TMessage>
{
IDisposable Subscribe(IAsyncMessageHandler<TMessage> asyncHandler,
paramsAsyncMessageHandlerFilter<TMessage>[] filters);
}
Managing Subscription
In MessagePipe, the return value IDisposable must be handled in some way. If you ignore it, it will leak. However, there is always the possibility of leaks (e.g., if the IDisposable is tied to a lifecycle object, the lifecycle object may not be handled properly and the subscription may remain). Therefore, MessagePipe allows you to centrally get the number of all subscriptions and, if necessary, the stack trace at the time of subscribe.
public sealed class MessagePipeDiagnosticsInfo
{
/// <summary>Get current subscribed count.</summary>
public int SubscribeCount { get; }
/// <summary>
/// When MessagePipeOptions.EnableCaptureStackTrace is enabled, list all stacktrace on subscribe.
/// </summary>
public StackTraceInfo[] GetCapturedStackTraces(bool ascending=true);
/// <summary>
/// When MessagePipeOptions.EnableCaptureStackTrace is enabled, groped by caller of subscribe.
/// </summary>
public ILookup<string, StackTraceInfo> GetGroupedByCaller(bool ascending=true)
}
What kind of information can we get from this? In Unity, we provide an editor extension, MessagePipe Diagnostics Window, which makes it easy to visualize the results of the above management classes.
This way, if there is a leak, it will be obvious and can be dealt with immediately.
If the return value of Subscribe must be handled, it should be better if it should never be ignored (it would be a compilation error). That’s why we’ve published a Roslyn Analyzer called MessagePipe.Analyzer.
This will prevent 100% of leaks due to ignoring the return value, since not handling the return value of Subscribe will result in an error.
Note that Analyzer can be used with Unity 2020.2 or later.
However, since the integration of Analyzer with Unity-IDE is currently lacking, we have released an extension Cysharp/CsprojModifier to compensate for it. By using CsprojModifier, you can use MessagePipe.Analyzer in Unity without any errors.
Conclusion
I didn’t introduce it in this article, AsyncPublisher can wait for asynchronous processing of all subscribers by await(which is not possible with normal events or Rx), IDistributedPublisher/Subscriber allows PubSub not only in-memory but also over the network, and Filter hooks before and after every send and receive for both sync and async.
// don't send same value(Rx's DistinctUntilChanged) by Filter
public class ChangedValueFilter<T> : MessageHandlerFilter<T>
{
T lastValue;
public override void Handle(Tmessage, Action<T> next)
{
if (EqualityComparer<T>.Default.Equals(message, lastValue))
{
return;
}
lastValue=message;
next(message);
}
}
In addition, IRequestHandler<TRequest, TResponse>, which is slightly different from Publisher/Subscriber interface, supports implementation of mediator pattern like MediatR.
public interface IRequestHandler<in TRequest, out TResponse>
{
TResponse Invoke(TRequest request);
}
public interface IAsyncRequestHandler<in TRequest, TResponse>
{
ValueTask<TResponse> InvokeAsync(TRequest request, CancellationToken cancellationToken=default);
}
It is a very useful and high-performance library, so please give it a try.
Source: Medium
The Tech Platform
Comments