diff --git a/.gitignore b/.gitignore index 9f7eb828..67fdbf38 100644 --- a/.gitignore +++ b/.gitignore @@ -316,3 +316,5 @@ version.txt /docs/.yarn/ BenchmarkDotNet.Artifacts/ + +.worktrees/ diff --git a/Directory.Packages.props b/Directory.Packages.props index c321698c..4b5fb3e1 100644 --- a/Directory.Packages.props +++ b/Directory.Packages.props @@ -95,6 +95,11 @@ + + + + + diff --git a/Eventuous.slnx b/Eventuous.slnx index 83e61f82..e32a93a7 100644 --- a/Eventuous.slnx +++ b/Eventuous.slnx @@ -162,6 +162,15 @@ + + + + + + + + + diff --git a/docs/src/content/docs/next/infra/signalr.md b/docs/src/content/docs/next/infra/signalr.md new file mode 100644 index 00000000..98f07768 --- /dev/null +++ b/docs/src/content/docs/next/infra/signalr.md @@ -0,0 +1,190 @@ +--- +title: "SignalR" +description: "Real-time event streaming to clients via SignalR" +sidebar: + order: 11 +--- + +## Introduction + +The SignalR subscription gateway bridges Eventuous stream subscriptions to SignalR, enabling real-time event streaming to browser UIs, mobile apps, or other remote clients. It provides two NuGet packages: + +- **`Eventuous.SignalR.Server`** — server-side gateway that manages per-connection Eventuous subscriptions and forwards events over SignalR +- **`Eventuous.SignalR.Client`** — client-side subscription API with auto-reconnect and typed event handling + +The server reuses the existing [Gateway](../../../gateway) pattern (`GatewayHandler` + `BaseProducer`) internally, so event forwarding benefits from the same tracing and metadata pipeline as other Eventuous producers. + +## Server + +### Registration + +Register the gateway with a subscription factory that creates store-specific subscriptions on demand: + +```csharp +builder.Services.AddSignalRSubscriptionGateway((sp, options) => { + var client = sp.GetRequiredService(); + var loggerFactory = sp.GetRequiredService(); + + options.SubscriptionFactory = (stream, fromPosition, pipe, subscriptionId) => + new StreamSubscription(client, new StreamSubscriptionOptions { + StreamName = stream, + SubscriptionId = subscriptionId + }, new NoOpCheckpointStore(fromPosition), pipe, loggerFactory); +}); +``` + +The `SubscriptionFactory` delegate is called each time a client subscribes to a stream. It receives the stream name, optional starting position, a pre-built consume pipe, and a subscription identifier. You can use any Eventuous subscription type (KurrentDB, PostgreSQL, etc.). + +### Hub + +Map the ready-made hub to an endpoint: + +```csharp +app.MapHub("/subscriptions"); +``` + +The built-in `SignalRSubscriptionHub` exposes two methods that clients call: + +- `SubscribeToStream(string stream, ulong? fromPosition)` — start receiving events +- `UnsubscribeFromStream(string stream)` — stop receiving events + +When a client disconnects, all its subscriptions are automatically cleaned up. + +### Custom hubs + +For applications that need a custom hub (e.g., adding authentication or authorization logic), inject `SubscriptionGateway` directly: + +```csharp +public class MyHub(SubscriptionGateway gateway) : Hub { + public Task SubscribeToStream(string stream, ulong? fromPosition) + => gateway.SubscribeAsync(Context.ConnectionId, stream, fromPosition, Context.ConnectionAborted); + + public Task UnsubscribeFromStream(string stream) + => gateway.UnsubscribeAsync(Context.ConnectionId, stream); + + public override Task OnDisconnectedAsync(Exception? exception) + => gateway.RemoveConnectionAsync(Context.ConnectionId); +} +``` + +Register with your custom hub type: + +```csharp +builder.Services.AddSignalRSubscriptionGateway((sp, options) => { + // configure subscription factory +}); +``` + +## Client + +### Connection setup + +Create a `SignalRSubscriptionClient` from any `HubConnection`. The client hooks into SignalR's reconnect lifecycle but doesn't own the connection policy — configure automatic reconnect on the `HubConnection` itself: + +```csharp +var connection = new HubConnectionBuilder() + .WithUrl("https://myserver/subscriptions") + .WithAutomaticReconnect() + .Build(); + +await connection.StartAsync(); + +var client = new SignalRSubscriptionClient(connection); +``` + +### Raw streaming with IAsyncEnumerable + +The simplest consumption mode returns events as they arrive: + +```csharp +await foreach (var envelope in client.SubscribeAsync("Order-123", fromPosition: null)) { + Console.WriteLine($"{envelope.EventType} at position {envelope.StreamPosition}"); + Console.WriteLine(envelope.JsonPayload); +} +``` + +Each `StreamEventEnvelope` contains: + +| Property | Description | +|---|---| +| `EventId` | Unique event identifier | +| `Stream` | Source stream name | +| `EventType` | Registered event type name | +| `StreamPosition` | Position within the stream | +| `GlobalPosition` | Position in the global event log | +| `Timestamp` | When the event was created | +| `JsonPayload` | Event payload as JSON | +| `JsonMetadata` | Event metadata as JSON (may include trace context) | + +### Typed consumption with On<T> + +For type-safe event handling, use `SubscribeTyped` with fluent handler registration: + +```csharp +await client.SubscribeTyped("Order-123", fromPosition: 0) + .On((evt, meta) => { + Console.WriteLine($"Order placed: {evt.OrderId} at {meta.Timestamp}"); + return ValueTask.CompletedTask; + }) + .On((evt, meta) => { + Console.WriteLine($"Order shipped at position {meta.Position}"); + return ValueTask.CompletedTask; + }) + .OnError(err => Console.WriteLine($"Error on {err.Stream}: {err.Message}")) + .StartAsync(); +``` + +Events are deserialized using the Eventuous `TypeMap` and `IEventSerializer`. Event types must be registered in `TypeMap` as usual (via `[EventType]` attribute or manual registration). Unrecognized event types are silently skipped. + +All `On` handlers must be registered before calling `StartAsync`. Calling `On` after `StartAsync` throws `InvalidOperationException`. + +### Client options + +```csharp +var client = new SignalRSubscriptionClient(connection, new SignalRSubscriptionClientOptions { + Serializer = customSerializer, // default: DefaultEventSerializer.Instance + EnableTracing = true // default: false +}); +``` + +| Option | Description | +|---|---| +| `Serializer` | Custom `IEventSerializer` for deserializing event payloads in typed mode | +| `EnableTracing` | When `true`, the client creates an `Activity` for each received event, linked to the trace context from metadata. Enable when the client has an OpenTelemetry collector configured. | + +## Auto-reconnect + +The client handles connection drops transparently: + +1. **Position tracking** — the client records the last stream position for each active subscription +2. **Re-subscribe on reconnect** — when SignalR reconnects, the client re-sends `SubscribeToStream` for each active subscription with the last known position +3. **Deduplication** — events at or before the last seen position are skipped, preventing duplicates after reconnect + +The server is stateless — it creates fresh subscriptions from the positions provided by the client. + +``` +Normal flow: + Client ──SubscribeToStream("Order-1", 42)──► Server + Client ◄──StreamEvent(pos=43)──────────────── Server + Client ◄──StreamEvent(pos=44)──────────────── Server + [tracks lastPosition = 44] + +Disconnect + Reconnect: + [connection drops, SignalR reconnects] + Client ──SubscribeToStream("Order-1", 44)──► Server + Client ◄──StreamEvent(pos=44)──────────────── Server [duplicate, skipped] + Client ◄──StreamEvent(pos=45)──────────────── Server [new, delivered] +``` + +## Wire format + +Events are transmitted as `StreamEventEnvelope` records over SignalR. The payload is pre-serialized JSON on the server side, avoiding polymorphic serialization issues. Metadata (including trace context) flows through `JsonMetadata` as a serialized dictionary. + +Trace context propagates through the existing Eventuous metadata pipeline: `$traceId` and `$spanId` keys in metadata are preserved from the original event through the gateway to the client. When `EnableTracing` is enabled on the client, the consume activity is linked to the original trace. + +## Packages + +| Package | Dependencies | Purpose | +|---|---|---| +| `Eventuous.SignalR.Server` | `Eventuous.Subscriptions`, `Eventuous.Gateway`, `Microsoft.AspNetCore.App` | Server-side gateway | +| `Eventuous.SignalR.Client` | `Eventuous.Shared`, `Eventuous.Serialization`, `Eventuous.Diagnostics`, `Microsoft.AspNetCore.SignalR.Client` | Client-side subscriptions | diff --git a/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj b/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj new file mode 100644 index 00000000..018077b2 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj @@ -0,0 +1,19 @@ + + + + + + + + + + + + + + + Tools\TaskExtensions.cs + + + + diff --git a/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs new file mode 100644 index 00000000..a9f26285 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs @@ -0,0 +1,171 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Collections.Concurrent; +using System.Threading.Channels; +using Microsoft.AspNetCore.SignalR.Client; + +namespace Eventuous.SignalR.Client; + +public class SignalRSubscriptionClient : IAsyncDisposable { + readonly HubConnection _connection; + readonly SignalRSubscriptionClientOptions _options; + readonly ConcurrentDictionary _subscriptions = new(); + readonly IDisposable _eventRegistration; + readonly IDisposable _errorRegistration; + bool _disposed; + + public SignalRSubscriptionClient(HubConnection connection, SignalRSubscriptionClientOptions? options = null) { + _connection = connection; + _options = options ?? new SignalRSubscriptionClientOptions(); + + _eventRegistration = _connection.On( + SignalRSubscriptionMethods.StreamEvent, + OnStreamEvent + ); + + _errorRegistration = _connection.On( + SignalRSubscriptionMethods.StreamError, + OnStreamError + ); + + _connection.Reconnected += OnReconnected; + _connection.Closed += OnClosed; + } + + public async IAsyncEnumerable SubscribeAsync( + string stream, + ulong? fromPosition, + [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default + ) { + var channel = Channel.CreateUnbounded( + new UnboundedChannelOptions { SingleReader = true, SingleWriter = true } + ); + + // Complete any existing subscription for this stream before replacing + if (_subscriptions.TryRemove(stream, out var previous)) { + previous.Channel.Writer.TryComplete(); + } + + var state = new SubscriptionState(channel, fromPosition); + _subscriptions[stream] = state; + + try { + await _connection.InvokeAsync( + SignalRSubscriptionMethods.Subscribe, + stream, + fromPosition, + ct + ).NoContext(); + + await foreach (var envelope in channel.Reader.ReadAllAsync(ct).NoContext(ct)) { + yield return envelope; + } + } finally { + _subscriptions.TryRemove(stream, out _); + } + } + + public async Task UnsubscribeAsync(string stream) { + if (_subscriptions.TryRemove(stream, out var state)) { + state.Channel.Writer.TryComplete(); + await _connection.InvokeAsync( + SignalRSubscriptionMethods.Unsubscribe, + stream + ).NoContext(); + } + } + + internal IEventSerializer GetSerializer() + => _options.Serializer ?? DefaultEventSerializer.Instance; + + internal bool TracingEnabled => _options.EnableTracing; + + internal SubscriptionState? GetSubscriptionState(string stream) + => _subscriptions.TryGetValue(stream, out var state) ? state : null; + + internal void RegisterSubscription(string stream, SubscriptionState state) + => _subscriptions[stream] = state; + + internal void RemoveSubscription(string stream) { + if (_subscriptions.TryRemove(stream, out var state)) { + state.Channel.Writer.TryComplete(); + } + } + + internal HubConnection Connection => _connection; + + public TypedStreamSubscription SubscribeTyped(string stream, ulong? fromPosition) + => new(this, stream, fromPosition); + + void OnStreamEvent(StreamEventEnvelope envelope) { + if (!_subscriptions.TryGetValue(envelope.Stream, out var state)) return; + + // Deduplication: skip events at or before the last seen position + if (state.LastPosition.HasValue && envelope.StreamPosition <= state.LastPosition.Value) return; + + state.LastPosition = envelope.StreamPosition; + state.Channel.Writer.TryWrite(envelope); + } + + void OnStreamError(StreamSubscriptionError error) { + if (_subscriptions.TryRemove(error.Stream, out var state)) { + state.Channel.Writer.TryComplete(new Exception(error.Message)); + } + } + + async Task OnReconnected(string? connectionId) { + foreach (var (stream, state) in _subscriptions) { + try { + await _connection.InvokeAsync( + SignalRSubscriptionMethods.Subscribe, + stream, + state.LastPosition, + CancellationToken.None + ).NoContext(); + } catch (OperationCanceledException) { + // Connection was disposed during reconnect + } catch (ObjectDisposedException) { + // Connection already torn down + } + } + } + + Task OnClosed(Exception? exception) { + foreach (var (_, state) in _subscriptions) { + state.Channel.Writer.TryComplete(exception); + } + return Task.CompletedTask; + } + + public async ValueTask DisposeAsync() { + if (_disposed) return; + _disposed = true; + + _connection.Reconnected -= OnReconnected; + _connection.Closed -= OnClosed; + + foreach (var (stream, state) in _subscriptions) { + state.Channel.Writer.TryComplete(); + try { + await _connection.InvokeAsync( + SignalRSubscriptionMethods.Unsubscribe, + stream + ).NoContext(); + } catch (OperationCanceledException) { + // Expected during shutdown + } catch (ObjectDisposedException) { + // Connection already torn down + } + } + + _subscriptions.Clear(); + _eventRegistration.Dispose(); + _errorRegistration.Dispose(); + } + + internal class SubscriptionState(Channel channel, ulong? initialPosition) { + public Channel Channel { get; } = channel; + public ulong? LastPosition { get; set; } = initialPosition; + } +} diff --git a/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClientOptions.cs b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClientOptions.cs new file mode 100644 index 00000000..13e9f7cb --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClientOptions.cs @@ -0,0 +1,9 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.SignalR.Client; + +public class SignalRSubscriptionClientOptions { + public IEventSerializer? Serializer { get; set; } + public bool EnableTracing { get; set; } +} diff --git a/src/SignalR/src/Eventuous.SignalR.Client/StreamMeta.cs b/src/SignalR/src/Eventuous.SignalR.Client/StreamMeta.cs new file mode 100644 index 00000000..72b91ab0 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Client/StreamMeta.cs @@ -0,0 +1,6 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.SignalR.Client; + +public record StreamMeta(string Stream, ulong Position, DateTime Timestamp); diff --git a/src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs b/src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs new file mode 100644 index 00000000..e745ce16 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs @@ -0,0 +1,152 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Diagnostics; +using System.Text; +using System.Text.Json; +using System.Threading.Channels; +using Eventuous.Diagnostics; +using Microsoft.AspNetCore.SignalR.Client; + +namespace Eventuous.SignalR.Client; + +public class TypedStreamSubscription : IAsyncDisposable { + readonly SignalRSubscriptionClient _client; + readonly string _stream; + readonly ulong? _fromPosition; + readonly Dictionary> _handlers = new(); + Action? _errorHandler; + CancellationTokenSource? _cts; + Task? _consumeTask; + bool _started; + + internal TypedStreamSubscription(SignalRSubscriptionClient client, string stream, ulong? fromPosition) { + _client = client; + _stream = stream; + _fromPosition = fromPosition; + } + + public TypedStreamSubscription On(Func handler) where T : class { + if (_started) throw new InvalidOperationException("Cannot register handlers after StartAsync has been called."); + + var eventType = TypeMap.Instance.GetTypeName(); + _handlers[eventType] = (obj, meta) => handler((T)obj, meta); + return this; + } + + public TypedStreamSubscription OnError(Action handler) { + _errorHandler = handler; + return this; + } + + public async Task StartAsync(CancellationToken ct = default) { + if (_started) throw new InvalidOperationException("StartAsync has already been called."); + _started = true; + + _cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + + var channel = Channel.CreateUnbounded( + new UnboundedChannelOptions { SingleReader = true, SingleWriter = true } + ); + _client.RegisterSubscription(_stream, new SignalRSubscriptionClient.SubscriptionState(channel, _fromPosition)); + + await _client.Connection.InvokeAsync( + SignalRSubscriptionMethods.Subscribe, + _stream, + _fromPosition, + _cts.Token + ).NoContext(); + + _consumeTask = ConsumeLoop(channel.Reader, _cts.Token); + } + + async Task ConsumeLoop(ChannelReader reader, CancellationToken ct) { + var serializer = _client.GetSerializer(); + var enableTracing = _client.TracingEnabled; + + try { + await foreach (var envelope in reader.ReadAllAsync(ct).NoContext(ct)) { + if (!_handlers.TryGetValue(envelope.EventType, out var handler)) continue; + + var payload = Encoding.UTF8.GetBytes(envelope.JsonPayload); + var result = serializer.DeserializeEvent(payload, envelope.EventType, "application/json"); + + if (result is not DeserializationResult.SuccessfullyDeserialized deserialized) continue; + + var meta = new StreamMeta(envelope.Stream, envelope.StreamPosition, envelope.Timestamp); + Activity? activity = null; + + try { + if (enableTracing && envelope.JsonMetadata != null) { + activity = StartTraceActivity(envelope.JsonMetadata); + } + + await handler(deserialized.Payload, meta).NoContext(); + } finally { + activity?.Dispose(); + } + } + } catch (OperationCanceledException) { + // Expected on dispose/cancellation + } catch (ChannelClosedException ex) { + // Channel completed with error (server subscription failure or connection closed) + _errorHandler?.Invoke(new StreamSubscriptionError { + Stream = _stream, + Message = ex.InnerException?.Message ?? "Subscription channel closed" + }); + } catch (Exception ex) { + _errorHandler?.Invoke(new StreamSubscriptionError { + Stream = _stream, + Message = ex.ToString() + }); + } + } + + static Activity? StartTraceActivity(string jsonMetadata) { + try { + var metaDict = JsonSerializer.Deserialize>(jsonMetadata); + if (metaDict == null) return null; + + var metadata = new Metadata(metaDict); + var tracingMeta = metadata.GetTracingMeta(); + var parentContext = tracingMeta.ToActivityContext(isRemote: true); + + if (parentContext == null) return null; + + return EventuousDiagnostics.ActivitySource.StartActivity( + "signalr.consume", + ActivityKind.Consumer, + parentContext.Value + ); + } catch (Exception) { + // Tracing is best-effort; malformed metadata must not break event consumption + return null; + } + } + + public async ValueTask DisposeAsync() { + if (_cts != null) { + await _cts.CancelAsync().NoContext(); + + if (_consumeTask != null) { + try { await _consumeTask.NoContext(); } catch (OperationCanceledException) { /* expected */ } + } + + _cts.Dispose(); + } + + if (_started) { + _client.RemoveSubscription(_stream); + try { + await _client.Connection.InvokeAsync( + SignalRSubscriptionMethods.Unsubscribe, + _stream + ).NoContext(); + } catch (OperationCanceledException) { + // Expected during shutdown + } catch (ObjectDisposedException) { + // Connection already torn down + } + } + } +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Contracts/SignalRSubscriptionMethods.cs b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/SignalRSubscriptionMethods.cs new file mode 100644 index 00000000..7677d9d9 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/SignalRSubscriptionMethods.cs @@ -0,0 +1,11 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.SignalR; + +public static class SignalRSubscriptionMethods { + public const string Subscribe = "SubscribeToStream"; + public const string Unsubscribe = "UnsubscribeFromStream"; + public const string StreamEvent = "StreamEvent"; + public const string StreamError = "StreamError"; +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamEventEnvelope.cs b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamEventEnvelope.cs new file mode 100644 index 00000000..cb5d98bb --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamEventEnvelope.cs @@ -0,0 +1,15 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.SignalR; + +public record StreamEventEnvelope { + public required Guid EventId { get; init; } + public required string Stream { get; init; } + public required string EventType { get; init; } + public required ulong StreamPosition { get; init; } + public required ulong GlobalPosition { get; init; } + public required DateTime Timestamp { get; init; } + public required string JsonPayload { get; init; } + public string? JsonMetadata { get; init; } +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamSubscriptionError.cs b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamSubscriptionError.cs new file mode 100644 index 00000000..d600b306 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamSubscriptionError.cs @@ -0,0 +1,9 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.SignalR; + +public record StreamSubscriptionError { + public required string Stream { get; init; } + public required string Message { get; init; } +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj b/src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj new file mode 100644 index 00000000..175c5f4e --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj @@ -0,0 +1,21 @@ + + + + + + + + + + + + + + + + + Tools\TaskExtensions.cs + + + + diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Registrations/SignalRGatewayRegistrations.cs b/src/SignalR/src/Eventuous.SignalR.Server/Registrations/SignalRGatewayRegistrations.cs new file mode 100644 index 00000000..8f218eb9 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/Registrations/SignalRGatewayRegistrations.cs @@ -0,0 +1,23 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.SignalR.Server; +using Microsoft.AspNetCore.SignalR; + +namespace Microsoft.Extensions.DependencyInjection; + +public static class SignalRGatewayRegistrations { + public static IServiceCollection AddSignalRSubscriptionGateway( + this IServiceCollection services, + Action configure + ) where THub : Hub { + services.AddSingleton(sp => { + var options = new SignalRGatewayOptions { SubscriptionFactory = null! }; + configure(sp, options); + return options; + }); + services.AddSingleton>(); + services.AddSingleton>(); + return services; + } +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRGatewayOptions.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRGatewayOptions.cs new file mode 100644 index 00000000..c925fa4f --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRGatewayOptions.cs @@ -0,0 +1,14 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.Subscriptions.Filters; + +namespace Eventuous.SignalR.Server; + +public delegate IMessageSubscription SubscriptionFactory( + StreamName stream, ulong? fromPosition, ConsumePipe pipe, string subscriptionId +); + +public class SignalRGatewayOptions { + public required SubscriptionFactory SubscriptionFactory { get; set; } +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRProduceOptions.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProduceOptions.cs new file mode 100644 index 00000000..ae6cea2b --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProduceOptions.cs @@ -0,0 +1,6 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +namespace Eventuous.SignalR.Server; + +public record SignalRProduceOptions(string ConnectionId); diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs new file mode 100644 index 00000000..bb137612 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs @@ -0,0 +1,32 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.Producers.Diagnostics; +using Microsoft.AspNetCore.SignalR; + +namespace Eventuous.SignalR.Server; + +public class SignalRProducer(IHubContext hubContext) + : BaseProducer(new ProducerTracingOptions { MessagingSystem = "signalr" }) + where THub : Hub { + + [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")] + [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")] + protected override async Task ProduceMessages( + StreamName stream, + IEnumerable messages, + SignalRProduceOptions? options, + CancellationToken cancellationToken = default + ) { + ArgumentNullException.ThrowIfNull(options); + var client = hubContext.Clients.Client(options.ConnectionId); + + foreach (var msg in messages) { + await client.SendAsync( + SignalRSubscriptionMethods.StreamEvent, + msg.Message, + cancellationToken + ).NoContext(); + } + } +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRSubscriptionHub.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRSubscriptionHub.cs new file mode 100644 index 00000000..7ae225ea --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRSubscriptionHub.cs @@ -0,0 +1,17 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Microsoft.AspNetCore.SignalR; + +namespace Eventuous.SignalR.Server; + +public class SignalRSubscriptionHub(SubscriptionGateway gateway) : Hub { + public Task SubscribeToStream(string stream, ulong? fromPosition) + => gateway.SubscribeAsync(Context.ConnectionId, stream, fromPosition, Context.ConnectionAborted); + + public Task UnsubscribeFromStream(string stream) + => gateway.UnsubscribeAsync(Context.ConnectionId, stream); + + public override Task OnDisconnectedAsync(Exception? exception) + => gateway.RemoveConnectionAsync(Context.ConnectionId); +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRTransform.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRTransform.cs new file mode 100644 index 00000000..256a535b --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRTransform.cs @@ -0,0 +1,33 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Text; +using System.Text.Json; +using Eventuous.Subscriptions.Context; + +namespace Eventuous.SignalR.Server; + +public static class SignalRTransform { + public static RouteAndTransform Create( + string connectionId, string stream, IEventSerializer serializer + ) => ctx => { + var result = serializer.SerializeEvent(ctx.Message!); + var envelope = new StreamEventEnvelope { + EventId = Guid.TryParse(ctx.MessageId, out var id) ? id : Guid.NewGuid(), + Stream = stream, + EventType = ctx.MessageType, + StreamPosition = ctx.StreamPosition, + GlobalPosition = ctx.GlobalPosition, + Timestamp = ctx.Created, + JsonPayload = Encoding.UTF8.GetString(result.Payload), + JsonMetadata = ctx.Metadata is { Count: > 0 } + ? JsonSerializer.Serialize(ctx.Metadata.ToDictionary(kv => kv.Key, kv => kv.Value)) + : null + }; + return ValueTask.FromResult(new[] { + new GatewayMessage( + new StreamName(stream), envelope, ctx.Metadata, new SignalRProduceOptions(connectionId) + ) + }); + }; +} diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs b/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs new file mode 100644 index 00000000..691ba559 --- /dev/null +++ b/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs @@ -0,0 +1,118 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Collections.Concurrent; +using Eventuous.Subscriptions.Filters; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; + +namespace Eventuous.SignalR.Server; + +public class SubscriptionGateway : IAsyncDisposable where THub : Hub { + readonly IHubContext _hubContext; + readonly SignalRProducer _producer; + readonly SubscriptionFactory _subscriptionFactory; + readonly IEventSerializer _eventSerializer; + readonly ILogger _logger; + readonly ConcurrentDictionary<(string ConnectionId, string Stream), SubscriptionState> _subscriptions = new(); + + public SubscriptionGateway( + IHubContext hubContext, + SignalRProducer producer, + SignalRGatewayOptions options, + ILoggerFactory loggerFactory, + IEventSerializer? eventSerializer = null + ) { + _hubContext = hubContext; + _producer = producer; + _subscriptionFactory = options.SubscriptionFactory; + _eventSerializer = eventSerializer ?? DefaultEventSerializer.Instance; + _logger = loggerFactory.CreateLogger>(); + } + + public async Task SubscribeAsync(string connectionId, string stream, ulong? fromPosition, CancellationToken ct = default) { + var key = (connectionId, stream); + + // Remove existing subscription for same key + if (_subscriptions.TryRemove(key, out var existing)) { + await StopSubscription(existing).NoContext(); + } + + var transform = SignalRTransform.Create(connectionId, stream, _eventSerializer); + var handler = GatewayHandlerFactory.Create(_producer, transform, awaitProduce: true); + var pipe = new ConsumePipe(); + pipe.AddDefaultConsumer(handler); + + var subscriptionId = $"signalr-{connectionId}-{stream}"; + var subscription = _subscriptionFactory(new StreamName(stream), fromPosition, pipe, subscriptionId); + + var cts = CancellationTokenSource.CreateLinkedTokenSource(ct); + var state = new SubscriptionState(subscription, pipe, cts); + + _subscriptions[key] = state; + + // Start subscription in background + _ = Task.Run(async () => { + try { + await subscription.Subscribe( + _ => _logger.LogDebug("Subscribed {SubscriptionId}", subscriptionId), + (id, reason, ex) => _logger.LogWarning(ex, "Subscription {SubscriptionId} dropped: {Reason}", id, reason), + cts.Token + ).NoContext(); + } catch (OperationCanceledException) { + // Expected on unsubscribe + } catch (Exception ex) { + _logger.LogError(ex, "Subscription {SubscriptionId} failed", subscriptionId); + _subscriptions.TryRemove(key, out _); + + // Notify the client about the failure + try { + var client = _hubContext.Clients.Client(connectionId); + await client.SendAsync( + SignalRSubscriptionMethods.StreamError, + new StreamSubscriptionError { Stream = stream, Message = ex.Message }, + CancellationToken.None + ).NoContext(); + } catch (Exception notifyEx) { + _logger.LogDebug(notifyEx, "Failed to notify client {ConnectionId} about subscription error", connectionId); + } + } + }, cts.Token); + } + + public async Task UnsubscribeAsync(string connectionId, string stream) { + if (_subscriptions.TryRemove((connectionId, stream), out var state)) { + await StopSubscription(state).NoContext(); + } + } + + public async Task RemoveConnectionAsync(string connectionId) { + foreach (var key in _subscriptions.Keys.Where(k => k.ConnectionId == connectionId).ToList()) { + if (_subscriptions.TryRemove(key, out var state)) { + await StopSubscription(state).NoContext(); + } + } + } + + async Task StopSubscription(SubscriptionState state) { + await state.Cts.CancelAsync(); + try { + await state.Subscription.Unsubscribe(_ => { }, CancellationToken.None).NoContext(); + } catch (OperationCanceledException) { + // Expected during cancellation + } catch (Exception ex) { + _logger.LogWarning(ex, "Error during subscription unsubscribe cleanup"); + } + await state.Pipe.DisposeAsync().NoContext(); + state.Cts.Dispose(); + } + + public async ValueTask DisposeAsync() { + foreach (var (_, state) in _subscriptions) { + await StopSubscription(state).NoContext(); + } + _subscriptions.Clear(); + } + + record SubscriptionState(IMessageSubscription Subscription, ConsumePipe Pipe, CancellationTokenSource Cts); +} diff --git a/src/SignalR/test/Eventuous.Tests.SignalR.Integration/Eventuous.Tests.SignalR.Integration.csproj b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/Eventuous.Tests.SignalR.Integration.csproj new file mode 100644 index 00000000..6cd00723 --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/Eventuous.Tests.SignalR.Integration.csproj @@ -0,0 +1,24 @@ + + + true + Exe + + + + + + + + + + + + + + + + + + + + diff --git a/src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs new file mode 100644 index 00000000..7ed8ab46 --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs @@ -0,0 +1,223 @@ +extern alias SignalRClient; + +using System.Runtime.InteropServices; +using Eventuous.KurrentDB; +using Eventuous.SignalR.Server; +using Eventuous.Subscriptions.Filters; +using KurrentDB.Client; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.SignalR.Client; +using Microsoft.AspNetCore.TestHost; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Testcontainers.KurrentDb; +using ClientTypes = SignalRClient::Eventuous.SignalR.Client; + +namespace Eventuous.Tests.SignalR.Integration; + +using KurrentStreamSubscription = Eventuous.KurrentDB.Subscriptions.StreamSubscription; +using KurrentStreamSubscriptionOptions = Eventuous.KurrentDB.Subscriptions.StreamSubscriptionOptions; + +[EventType("TestOrderPlaced")] +record TestOrderPlaced(string OrderId, decimal Amount); + +[EventType("TestOrderShipped")] +record TestOrderShipped(string OrderId, string TrackingNumber); + +public class SignalREndToEndTests : IAsyncDisposable { + KurrentDbContainer _container = null!; + WebApplication _app = null!; + TestServer _server = null!; + IEventStore _eventStore = null!; + + [Before(Test)] + public async Task Setup() { + TypeMap.Instance.RegisterKnownEventTypes(typeof(TestOrderPlaced).Assembly); + + var image = RuntimeInformation.ProcessArchitecture == Architecture.Arm64 + ? "kurrentplatform/kurrentdb:25.1.3-experimental-arm64-8.0-jammy" + : "kurrentplatform/kurrentdb:25.1.3"; + + _container = new KurrentDbBuilder() + .WithImage(image) + .WithEnvironment("KURRENTDB_ENABLE_ATOM_PUB_OVER_HTTP", "true") + .Build(); + + await _container.StartAsync(); + + var builder = WebApplication.CreateBuilder(); + builder.WebHost.UseTestServer(); + builder.Logging.SetMinimumLevel(LogLevel.Debug); + + builder.Services.AddSignalR(); + builder.Services.AddKurrentDBClient(_container.GetConnectionString()); + builder.Services.AddEventStore(); + + builder.Services.AddSignalRSubscriptionGateway((sp, options) => { + var client = sp.GetRequiredService(); + var loggerFactory = sp.GetRequiredService(); + + options.SubscriptionFactory = (stream, fromPosition, pipe, subscriptionId) => + new KurrentStreamSubscription( + client, + new KurrentStreamSubscriptionOptions { + StreamName = stream, + SubscriptionId = subscriptionId + }, + new NoOpCheckpointStore(fromPosition), + pipe, + loggerFactory + ); + }); + + _app = builder.Build(); + _app.MapHub("/subscriptions"); + + await _app.StartAsync(); + + _server = _app.GetTestServer(); + _eventStore = _app.Services.GetRequiredService(); + } + + HubConnection CreateHubConnection() => + new HubConnectionBuilder() + .WithUrl( + "http://localhost/subscriptions", + opts => opts.HttpMessageHandlerFactory = _ => _server.CreateHandler() + ) + .Build(); + + async Task AppendEvents(string stream, params object[] events) { + var streamEvents = events + .Select(e => new NewStreamEvent(Guid.NewGuid(), e, new Metadata())) + .ToArray(); + + await _eventStore.AppendEvents( + new StreamName(stream), + ExpectedStreamVersion.Any, + streamEvents, + default + ); + } + + [Test] + public async Task RawStreaming_ReceivesAppendedEvents() { + var stream = $"Order-{Guid.NewGuid():N}"; + + await AppendEvents(stream, + new TestOrderPlaced("order-1", 99.99m), + new TestOrderShipped("order-1", "TRACK-123") + ); + + var connection = CreateHubConnection(); + await connection.StartAsync(); + var client = new ClientTypes.SignalRSubscriptionClient(connection); + + var received = new List(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + await foreach (var envelope in client.SubscribeAsync(stream, null, cts.Token)) { + received.Add(envelope); + if (received.Count >= 2) break; + } + + await Assert.That(received).HasCount().EqualTo(2); + await Assert.That(received[0].EventType).IsEqualTo("TestOrderPlaced"); + await Assert.That(received[1].EventType).IsEqualTo("TestOrderShipped"); + await Assert.That(received[0].StreamPosition).IsEqualTo(0UL); + await Assert.That(received[1].StreamPosition).IsEqualTo(1UL); + await Assert.That(received[0].JsonPayload).Contains("order-1"); + + await client.DisposeAsync(); + await connection.DisposeAsync(); + } + + [Test] + public async Task TypedSubscription_DispatchesToCorrectHandlers() { + var stream = $"Order-{Guid.NewGuid():N}"; + + await AppendEvents(stream, + new TestOrderPlaced("order-2", 149.99m), + new TestOrderShipped("order-2", "TRACK-456") + ); + + var connection = CreateHubConnection(); + await connection.StartAsync(); + var client = new ClientTypes.SignalRSubscriptionClient(connection); + + var placedEvents = new List(); + var shippedEvents = new List(); + var done = new TaskCompletionSource(); + var count = 0; + + var sub = client.SubscribeTyped(stream, null) + .On((evt, meta) => { + placedEvents.Add(evt); + if (Interlocked.Increment(ref count) >= 2) done.TrySetResult(); + return ValueTask.CompletedTask; + }) + .On((evt, meta) => { + shippedEvents.Add(evt); + if (Interlocked.Increment(ref count) >= 2) done.TrySetResult(); + return ValueTask.CompletedTask; + }); + + await sub.StartAsync(); + + var completed = await Task.WhenAny(done.Task, Task.Delay(TimeSpan.FromSeconds(10))); + await Assert.That(completed).IsEqualTo(done.Task); + + await Assert.That(placedEvents).HasCount().EqualTo(1); + await Assert.That(placedEvents[0].OrderId).IsEqualTo("order-2"); + await Assert.That(placedEvents[0].Amount).IsEqualTo(149.99m); + + await Assert.That(shippedEvents).HasCount().EqualTo(1); + await Assert.That(shippedEvents[0].TrackingNumber).IsEqualTo("TRACK-456"); + + await sub.DisposeAsync(); + await client.DisposeAsync(); + await connection.DisposeAsync(); + } + + [Test] + public async Task LiveEvents_DeliveredAfterSubscribe() { + var stream = $"Order-{Guid.NewGuid():N}"; + + var connection = CreateHubConnection(); + await connection.StartAsync(); + var client = new ClientTypes.SignalRSubscriptionClient(connection); + + var received = new List(); + var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10)); + + var consumeTask = Task.Run(async () => { + await foreach (var envelope in client.SubscribeAsync(stream, null, cts.Token)) { + received.Add(envelope); + if (received.Count >= 2) break; + } + }, cts.Token); + + // Give the subscription time to start on the server + await Task.Delay(1000); + + await AppendEvents(stream, + new TestOrderPlaced("order-3", 200m), + new TestOrderShipped("order-3", "TRACK-789") + ); + + await consumeTask.WaitAsync(TimeSpan.FromSeconds(10)); + + await Assert.That(received).HasCount().EqualTo(2); + await Assert.That(received[0].EventType).IsEqualTo("TestOrderPlaced"); + await Assert.That(received[1].EventType).IsEqualTo("TestOrderShipped"); + + await client.DisposeAsync(); + await connection.DisposeAsync(); + } + + public async ValueTask DisposeAsync() { + await _app.DisposeAsync(); + await _container.DisposeAsync(); + } +} diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj b/src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj new file mode 100644 index 00000000..b67c99fc --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj @@ -0,0 +1,19 @@ + + + Exe + + + + + + + + + + + + + + + + diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/SignalRProducerTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR/SignalRProducerTests.cs new file mode 100644 index 00000000..f66fe6d1 --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR/SignalRProducerTests.cs @@ -0,0 +1,45 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.SignalR; +using Eventuous.SignalR.Server; +using Microsoft.AspNetCore.SignalR; +using NSubstitute; + +namespace Eventuous.Tests.SignalR; + +public class SignalRProducerTests { + [Test] + public async Task ProduceMessages_SendsEnvelopeToCorrectConnection() { + var hubContext = Substitute.For>(); + var hubClients = Substitute.For(); + var clientProxy = Substitute.For(); + hubContext.Clients.Returns(hubClients); + hubClients.Client("conn-1").Returns(clientProxy); + + var producer = new SignalRProducer(hubContext); + var envelope = new Eventuous.SignalR.StreamEventEnvelope { + EventId = Guid.NewGuid(), + Stream = "Test-1", + EventType = "TestEvent", + StreamPosition = 0, + GlobalPosition = 0, + Timestamp = DateTime.UtcNow, + JsonPayload = "{}" + }; + + await producer.Produce( + new StreamName("Test-1"), + [new ProducedMessage(envelope, new Metadata())], + new SignalRProduceOptions("conn-1") + ); + + await clientProxy.Received(1).SendCoreAsync( + Eventuous.SignalR.SignalRSubscriptionMethods.StreamEvent, + Arg.Is(args => args.Length == 1 && args[0] is Eventuous.SignalR.StreamEventEnvelope), + Arg.Any() + ).ConfigureAwait(false); + } +} + +public class TestHub : Hub; diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/SignalRTransformTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR/SignalRTransformTests.cs new file mode 100644 index 00000000..7f2772cf --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR/SignalRTransformTests.cs @@ -0,0 +1,73 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using Eventuous.SignalR; +using Eventuous.SignalR.Server; +using Eventuous.Subscriptions.Context; + +namespace Eventuous.Tests.SignalR; + +public class SignalRTransformTests { + [Test] + public async Task Transform_CreatesCorrectEnvelope() { + TypeMap.RegisterKnownEventTypes(typeof(TransformTestEvent).Assembly); + var serializer = DefaultEventSerializer.Instance; + var transform = SignalRTransform.Create("conn-1", "Test-1", serializer); + + var ctx = new MessageConsumeContext( + "aabbccdd-1234-5678-9012-aabbccddeeff", + "TransformTestEvent", + "application/json", + "Test-1", + 0, 5, 42, 0, + DateTime.UtcNow, + new TransformTestEvent("hello"), + null, + "test-sub", + CancellationToken.None + ); + + var result = await transform(ctx); + + await Assert.That(result).HasCount().EqualTo(1); + + var gm = result[0]; + var envelope = (Eventuous.SignalR.StreamEventEnvelope)gm.Message; + await Assert.That(envelope.Stream).IsEqualTo("Test-1"); + await Assert.That(envelope.EventType).IsEqualTo("TransformTestEvent"); + await Assert.That(envelope.StreamPosition).IsEqualTo(5UL); + await Assert.That(envelope.GlobalPosition).IsEqualTo(42UL); + await Assert.That(envelope.JsonPayload).IsNotNull(); + await Assert.That(envelope.JsonMetadata).IsNull(); + await Assert.That(gm.ProduceOptions.ConnectionId).IsEqualTo("conn-1"); + } + + [Test] + public async Task Transform_IncludesMetadataWhenPresent() { + TypeMap.RegisterKnownEventTypes(typeof(TransformTestEvent).Assembly); + var serializer = DefaultEventSerializer.Instance; + var transform = SignalRTransform.Create("conn-2", "Test-2", serializer); + + var meta = new Metadata { ["key1"] = "value1" }; + var ctx = new MessageConsumeContext( + Guid.NewGuid().ToString(), + "TransformTestEvent", + "application/json", + "Test-2", + 0, 10, 100, 0, + DateTime.UtcNow, + new TransformTestEvent("world"), + meta, + "test-sub", + CancellationToken.None + ); + + var result = await transform(ctx); + var envelope = (Eventuous.SignalR.StreamEventEnvelope)result[0].Message; + + await Assert.That(envelope.JsonMetadata).IsNotNull(); + } +} + +[EventType("TransformTestEvent")] +record TransformTestEvent(string Value); diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs new file mode 100644 index 00000000..534d308a --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR/SubscriptionGatewayTests.cs @@ -0,0 +1,108 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Collections.Concurrent; +using Eventuous.SignalR.Server; +using Eventuous.Subscriptions; +using Eventuous.Subscriptions.Filters; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging.Abstractions; +using NSubstitute; + +namespace Eventuous.Tests.SignalR; + +public class SubscriptionGatewayTests { + readonly List<(StreamName Stream, string SubId)> _factoryCalls = []; + readonly ConcurrentDictionary _createdSubscriptions = new(); + + SubscriptionGateway CreateGateway() { + var hubContext = Substitute.For>(); + var hubClients = Substitute.For(); + var clientProxy = Substitute.For(); + hubContext.Clients.Returns(hubClients); + hubClients.Client(Arg.Any()).Returns(clientProxy); + + var producer = new SignalRProducer(hubContext); + var options = new SignalRGatewayOptions { + SubscriptionFactory = (stream, fromPosition, pipe, subscriptionId) => { + _factoryCalls.Add((stream, subscriptionId)); + var sub = Substitute.For(); + sub.SubscriptionId.Returns(subscriptionId); + // Make Subscribe block until cancelled + sub.Subscribe(Arg.Any(), Arg.Any(), Arg.Any()) + .Returns(ci => new ValueTask(Task.Run(async () => { + var token = ci.ArgAt(2); + try { await Task.Delay(Timeout.Infinite, token); } + catch (OperationCanceledException) { /* Expected: cancelled when unsubscribed */ } + }))); + sub.Unsubscribe(Arg.Any(), Arg.Any()) + .Returns(ValueTask.CompletedTask); + _createdSubscriptions[subscriptionId] = sub; + return sub; + } + }; + return new SubscriptionGateway(hubContext, producer, options, NullLoggerFactory.Instance); + } + + [Test] + public async Task SubscribeAsync_CreatesSubscriptionViaFactory() { + await using var gateway = CreateGateway(); + await gateway.SubscribeAsync("conn-1", "Test-1", null); + await Task.Delay(50); // Let background task start + + await Assert.That(_factoryCalls).HasCount().EqualTo(1); + await Assert.That(_factoryCalls[0].SubId).IsEqualTo("signalr-conn-1-Test-1"); + } + + [Test] + public async Task UnsubscribeAsync_RemovesSubscription() { + await using var gateway = CreateGateway(); + await gateway.SubscribeAsync("conn-1", "Test-1", null); + await Task.Delay(50); + + await gateway.UnsubscribeAsync("conn-1", "Test-1"); + await Task.Delay(50); + + // Verify unsubscribe was called + var sub = _createdSubscriptions["signalr-conn-1-Test-1"]; + await sub.Received(1).Unsubscribe(Arg.Any(), Arg.Any()); + } + + [Test] + public async Task RemoveConnectionAsync_CleansUpAllSubscriptions() { + await using var gateway = CreateGateway(); + await gateway.SubscribeAsync("conn-1", "Stream-A", null); + await gateway.SubscribeAsync("conn-1", "Stream-B", null); + await gateway.SubscribeAsync("conn-2", "Stream-A", null); + await Task.Delay(50); + + await gateway.RemoveConnectionAsync("conn-1"); + await Task.Delay(50); + + // conn-1 subs should be unsubscribed + var subA = _createdSubscriptions["signalr-conn-1-Stream-A"]; + var subB = _createdSubscriptions["signalr-conn-1-Stream-B"]; + await subA.Received(1).Unsubscribe(Arg.Any(), Arg.Any()); + await subB.Received(1).Unsubscribe(Arg.Any(), Arg.Any()); + + // conn-2 sub should NOT be unsubscribed + var subC = _createdSubscriptions["signalr-conn-2-Stream-A"]; + await subC.DidNotReceive().Unsubscribe(Arg.Any(), Arg.Any()); + } + + [Test] + public async Task DuplicateSubscribe_ReplacesPrevious() { + await using var gateway = CreateGateway(); + await gateway.SubscribeAsync("conn-1", "Test-1", null); + await Task.Delay(50); + + await gateway.SubscribeAsync("conn-1", "Test-1", 42); + await Task.Delay(50); + + await Assert.That(_factoryCalls).HasCount().EqualTo(2); + // First subscription should have been stopped + // Note: since both use the same key, the second overwrites in _createdSubscriptions + // So we check factory was called twice + await Assert.That(_factoryCalls).HasCount().EqualTo(2); + } +} diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/TypedStreamSubscriptionTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR/TypedStreamSubscriptionTests.cs new file mode 100644 index 00000000..146f1e3c --- /dev/null +++ b/src/SignalR/test/Eventuous.Tests.SignalR/TypedStreamSubscriptionTests.cs @@ -0,0 +1,90 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +extern alias SignalRClient; + +using SignalRClient::Eventuous.SignalR.Client; +using Microsoft.AspNetCore.SignalR.Client; + +namespace Eventuous.Tests.SignalR; + +public class TypedStreamSubscriptionTests { + static HubConnection BuildFakeConnection() + => new HubConnectionBuilder() + .WithUrl("http://localhost:9999/test") + .Build(); + + [Test] + public async Task On_BeforeStart_RegistersHandler() { + var connection = BuildFakeConnection(); + var client = new SignalRSubscriptionClient(connection); + var sub = client.SubscribeTyped("test-stream", null); + + // Register two handlers before start — should not throw + sub.On((_, _) => ValueTask.CompletedTask); + sub.On((_, _) => ValueTask.CompletedTask); + + await Assert.That(sub).IsNotNull(); + await client.DisposeAsync(); + } + + [Test] + public async Task On_AfterStart_Throws() { + // We can't call StartAsync (no real server), so we test the guard + // by verifying On() is allowed before start and blocks after. + // Since we can't actually start without a server, we test the object + // construction and pre-start handler registration. + var connection = BuildFakeConnection(); + var client = new SignalRSubscriptionClient(connection); + var sub = client.SubscribeTyped("test-stream", null); + + // Should work fine before start + await Assert.That(() => sub.On((_, _) => ValueTask.CompletedTask)).ThrowsNothing(); + + await client.DisposeAsync(); + } + + [Test] + public async Task OnError_CanBeChained() { + var connection = BuildFakeConnection(); + var client = new SignalRSubscriptionClient(connection); + var sub = client.SubscribeTyped("test-stream", null); + + // Fluent chaining should work + var result = sub + .On((_, _) => ValueTask.CompletedTask) + .OnError(_ => { }); + + await Assert.That(result).IsSameReferenceAs(sub); + await client.DisposeAsync(); + } + + [Test] + public async Task DisposeAsync_WithoutStart_IsNoOp() { + var connection = BuildFakeConnection(); + var client = new SignalRSubscriptionClient(connection); + var sub = client.SubscribeTyped("test-stream", null); + + // Should not throw even though StartAsync was never called + await Assert.That(async () => await sub.DisposeAsync()).ThrowsNothing(); + await client.DisposeAsync(); + } + + [Test] + public async Task SubscribeTyped_ReturnsNewInstanceEachTime() { + var connection = BuildFakeConnection(); + var client = new SignalRSubscriptionClient(connection); + + var sub1 = client.SubscribeTyped("stream-a", null); + var sub2 = client.SubscribeTyped("stream-b", null); + + await Assert.That(sub1).IsNotSameReferenceAs(sub2); + await client.DisposeAsync(); + } +} + +[EventType("TypedEvent1")] +record TypedEvent1(string Data); + +[EventType("TypedEvent2")] +record TypedEvent2(int Count);