diff --git a/.gitignore b/.gitignore
index 9f7eb828..67fdbf38 100644
--- a/.gitignore
+++ b/.gitignore
@@ -316,3 +316,5 @@ version.txt
/docs/.yarn/
BenchmarkDotNet.Artifacts/
+
+.worktrees/
diff --git a/Directory.Packages.props b/Directory.Packages.props
index c321698c..4b5fb3e1 100644
--- a/Directory.Packages.props
+++ b/Directory.Packages.props
@@ -95,6 +95,11 @@
+
+
+
+
+
diff --git a/Eventuous.slnx b/Eventuous.slnx
index 83e61f82..e32a93a7 100644
--- a/Eventuous.slnx
+++ b/Eventuous.slnx
@@ -162,6 +162,15 @@
+
+
+
+
+
+
+
+
+
diff --git a/docs/src/content/docs/next/infra/signalr.md b/docs/src/content/docs/next/infra/signalr.md
new file mode 100644
index 00000000..98f07768
--- /dev/null
+++ b/docs/src/content/docs/next/infra/signalr.md
@@ -0,0 +1,190 @@
+---
+title: "SignalR"
+description: "Real-time event streaming to clients via SignalR"
+sidebar:
+ order: 11
+---
+
+## Introduction
+
+The SignalR subscription gateway bridges Eventuous stream subscriptions to SignalR, enabling real-time event streaming to browser UIs, mobile apps, or other remote clients. It provides two NuGet packages:
+
+- **`Eventuous.SignalR.Server`** — server-side gateway that manages per-connection Eventuous subscriptions and forwards events over SignalR
+- **`Eventuous.SignalR.Client`** — client-side subscription API with auto-reconnect and typed event handling
+
+The server reuses the existing [Gateway](../../../gateway) pattern (`GatewayHandler` + `BaseProducer`) internally, so event forwarding benefits from the same tracing and metadata pipeline as other Eventuous producers.
+
+## Server
+
+### Registration
+
+Register the gateway with a subscription factory that creates store-specific subscriptions on demand:
+
+```csharp
+builder.Services.AddSignalRSubscriptionGateway((sp, options) => {
+ var client = sp.GetRequiredService();
+ var loggerFactory = sp.GetRequiredService();
+
+ options.SubscriptionFactory = (stream, fromPosition, pipe, subscriptionId) =>
+ new StreamSubscription(client, new StreamSubscriptionOptions {
+ StreamName = stream,
+ SubscriptionId = subscriptionId
+ }, new NoOpCheckpointStore(fromPosition), pipe, loggerFactory);
+});
+```
+
+The `SubscriptionFactory` delegate is called each time a client subscribes to a stream. It receives the stream name, optional starting position, a pre-built consume pipe, and a subscription identifier. You can use any Eventuous subscription type (KurrentDB, PostgreSQL, etc.).
+
+### Hub
+
+Map the ready-made hub to an endpoint:
+
+```csharp
+app.MapHub("/subscriptions");
+```
+
+The built-in `SignalRSubscriptionHub` exposes two methods that clients call:
+
+- `SubscribeToStream(string stream, ulong? fromPosition)` — start receiving events
+- `UnsubscribeFromStream(string stream)` — stop receiving events
+
+When a client disconnects, all its subscriptions are automatically cleaned up.
+
+### Custom hubs
+
+For applications that need a custom hub (e.g., adding authentication or authorization logic), inject `SubscriptionGateway` directly:
+
+```csharp
+public class MyHub(SubscriptionGateway gateway) : Hub {
+ public Task SubscribeToStream(string stream, ulong? fromPosition)
+ => gateway.SubscribeAsync(Context.ConnectionId, stream, fromPosition, Context.ConnectionAborted);
+
+ public Task UnsubscribeFromStream(string stream)
+ => gateway.UnsubscribeAsync(Context.ConnectionId, stream);
+
+ public override Task OnDisconnectedAsync(Exception? exception)
+ => gateway.RemoveConnectionAsync(Context.ConnectionId);
+}
+```
+
+Register with your custom hub type:
+
+```csharp
+builder.Services.AddSignalRSubscriptionGateway((sp, options) => {
+ // configure subscription factory
+});
+```
+
+## Client
+
+### Connection setup
+
+Create a `SignalRSubscriptionClient` from any `HubConnection`. The client hooks into SignalR's reconnect lifecycle but doesn't own the connection policy — configure automatic reconnect on the `HubConnection` itself:
+
+```csharp
+var connection = new HubConnectionBuilder()
+ .WithUrl("https://myserver/subscriptions")
+ .WithAutomaticReconnect()
+ .Build();
+
+await connection.StartAsync();
+
+var client = new SignalRSubscriptionClient(connection);
+```
+
+### Raw streaming with IAsyncEnumerable
+
+The simplest consumption mode returns events as they arrive:
+
+```csharp
+await foreach (var envelope in client.SubscribeAsync("Order-123", fromPosition: null)) {
+ Console.WriteLine($"{envelope.EventType} at position {envelope.StreamPosition}");
+ Console.WriteLine(envelope.JsonPayload);
+}
+```
+
+Each `StreamEventEnvelope` contains:
+
+| Property | Description |
+|---|---|
+| `EventId` | Unique event identifier |
+| `Stream` | Source stream name |
+| `EventType` | Registered event type name |
+| `StreamPosition` | Position within the stream |
+| `GlobalPosition` | Position in the global event log |
+| `Timestamp` | When the event was created |
+| `JsonPayload` | Event payload as JSON |
+| `JsonMetadata` | Event metadata as JSON (may include trace context) |
+
+### Typed consumption with On<T>
+
+For type-safe event handling, use `SubscribeTyped` with fluent handler registration:
+
+```csharp
+await client.SubscribeTyped("Order-123", fromPosition: 0)
+ .On((evt, meta) => {
+ Console.WriteLine($"Order placed: {evt.OrderId} at {meta.Timestamp}");
+ return ValueTask.CompletedTask;
+ })
+ .On((evt, meta) => {
+ Console.WriteLine($"Order shipped at position {meta.Position}");
+ return ValueTask.CompletedTask;
+ })
+ .OnError(err => Console.WriteLine($"Error on {err.Stream}: {err.Message}"))
+ .StartAsync();
+```
+
+Events are deserialized using the Eventuous `TypeMap` and `IEventSerializer`. Event types must be registered in `TypeMap` as usual (via `[EventType]` attribute or manual registration). Unrecognized event types are silently skipped.
+
+All `On` handlers must be registered before calling `StartAsync`. Calling `On` after `StartAsync` throws `InvalidOperationException`.
+
+### Client options
+
+```csharp
+var client = new SignalRSubscriptionClient(connection, new SignalRSubscriptionClientOptions {
+ Serializer = customSerializer, // default: DefaultEventSerializer.Instance
+ EnableTracing = true // default: false
+});
+```
+
+| Option | Description |
+|---|---|
+| `Serializer` | Custom `IEventSerializer` for deserializing event payloads in typed mode |
+| `EnableTracing` | When `true`, the client creates an `Activity` for each received event, linked to the trace context from metadata. Enable when the client has an OpenTelemetry collector configured. |
+
+## Auto-reconnect
+
+The client handles connection drops transparently:
+
+1. **Position tracking** — the client records the last stream position for each active subscription
+2. **Re-subscribe on reconnect** — when SignalR reconnects, the client re-sends `SubscribeToStream` for each active subscription with the last known position
+3. **Deduplication** — events at or before the last seen position are skipped, preventing duplicates after reconnect
+
+The server is stateless — it creates fresh subscriptions from the positions provided by the client.
+
+```
+Normal flow:
+ Client ──SubscribeToStream("Order-1", 42)──► Server
+ Client ◄──StreamEvent(pos=43)──────────────── Server
+ Client ◄──StreamEvent(pos=44)──────────────── Server
+ [tracks lastPosition = 44]
+
+Disconnect + Reconnect:
+ [connection drops, SignalR reconnects]
+ Client ──SubscribeToStream("Order-1", 44)──► Server
+ Client ◄──StreamEvent(pos=44)──────────────── Server [duplicate, skipped]
+ Client ◄──StreamEvent(pos=45)──────────────── Server [new, delivered]
+```
+
+## Wire format
+
+Events are transmitted as `StreamEventEnvelope` records over SignalR. The payload is pre-serialized JSON on the server side, avoiding polymorphic serialization issues. Metadata (including trace context) flows through `JsonMetadata` as a serialized dictionary.
+
+Trace context propagates through the existing Eventuous metadata pipeline: `$traceId` and `$spanId` keys in metadata are preserved from the original event through the gateway to the client. When `EnableTracing` is enabled on the client, the consume activity is linked to the original trace.
+
+## Packages
+
+| Package | Dependencies | Purpose |
+|---|---|---|
+| `Eventuous.SignalR.Server` | `Eventuous.Subscriptions`, `Eventuous.Gateway`, `Microsoft.AspNetCore.App` | Server-side gateway |
+| `Eventuous.SignalR.Client` | `Eventuous.Shared`, `Eventuous.Serialization`, `Eventuous.Diagnostics`, `Microsoft.AspNetCore.SignalR.Client` | Client-side subscriptions |
diff --git a/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj b/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj
new file mode 100644
index 00000000..018077b2
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Client/Eventuous.SignalR.Client.csproj
@@ -0,0 +1,19 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Tools\TaskExtensions.cs
+
+
+
+
diff --git a/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs
new file mode 100644
index 00000000..a9f26285
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClient.cs
@@ -0,0 +1,171 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+using System.Collections.Concurrent;
+using System.Threading.Channels;
+using Microsoft.AspNetCore.SignalR.Client;
+
+namespace Eventuous.SignalR.Client;
+
+public class SignalRSubscriptionClient : IAsyncDisposable {
+ readonly HubConnection _connection;
+ readonly SignalRSubscriptionClientOptions _options;
+ readonly ConcurrentDictionary _subscriptions = new();
+ readonly IDisposable _eventRegistration;
+ readonly IDisposable _errorRegistration;
+ bool _disposed;
+
+ public SignalRSubscriptionClient(HubConnection connection, SignalRSubscriptionClientOptions? options = null) {
+ _connection = connection;
+ _options = options ?? new SignalRSubscriptionClientOptions();
+
+ _eventRegistration = _connection.On(
+ SignalRSubscriptionMethods.StreamEvent,
+ OnStreamEvent
+ );
+
+ _errorRegistration = _connection.On(
+ SignalRSubscriptionMethods.StreamError,
+ OnStreamError
+ );
+
+ _connection.Reconnected += OnReconnected;
+ _connection.Closed += OnClosed;
+ }
+
+ public async IAsyncEnumerable SubscribeAsync(
+ string stream,
+ ulong? fromPosition,
+ [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default
+ ) {
+ var channel = Channel.CreateUnbounded(
+ new UnboundedChannelOptions { SingleReader = true, SingleWriter = true }
+ );
+
+ // Complete any existing subscription for this stream before replacing
+ if (_subscriptions.TryRemove(stream, out var previous)) {
+ previous.Channel.Writer.TryComplete();
+ }
+
+ var state = new SubscriptionState(channel, fromPosition);
+ _subscriptions[stream] = state;
+
+ try {
+ await _connection.InvokeAsync(
+ SignalRSubscriptionMethods.Subscribe,
+ stream,
+ fromPosition,
+ ct
+ ).NoContext();
+
+ await foreach (var envelope in channel.Reader.ReadAllAsync(ct).NoContext(ct)) {
+ yield return envelope;
+ }
+ } finally {
+ _subscriptions.TryRemove(stream, out _);
+ }
+ }
+
+ public async Task UnsubscribeAsync(string stream) {
+ if (_subscriptions.TryRemove(stream, out var state)) {
+ state.Channel.Writer.TryComplete();
+ await _connection.InvokeAsync(
+ SignalRSubscriptionMethods.Unsubscribe,
+ stream
+ ).NoContext();
+ }
+ }
+
+ internal IEventSerializer GetSerializer()
+ => _options.Serializer ?? DefaultEventSerializer.Instance;
+
+ internal bool TracingEnabled => _options.EnableTracing;
+
+ internal SubscriptionState? GetSubscriptionState(string stream)
+ => _subscriptions.TryGetValue(stream, out var state) ? state : null;
+
+ internal void RegisterSubscription(string stream, SubscriptionState state)
+ => _subscriptions[stream] = state;
+
+ internal void RemoveSubscription(string stream) {
+ if (_subscriptions.TryRemove(stream, out var state)) {
+ state.Channel.Writer.TryComplete();
+ }
+ }
+
+ internal HubConnection Connection => _connection;
+
+ public TypedStreamSubscription SubscribeTyped(string stream, ulong? fromPosition)
+ => new(this, stream, fromPosition);
+
+ void OnStreamEvent(StreamEventEnvelope envelope) {
+ if (!_subscriptions.TryGetValue(envelope.Stream, out var state)) return;
+
+ // Deduplication: skip events at or before the last seen position
+ if (state.LastPosition.HasValue && envelope.StreamPosition <= state.LastPosition.Value) return;
+
+ state.LastPosition = envelope.StreamPosition;
+ state.Channel.Writer.TryWrite(envelope);
+ }
+
+ void OnStreamError(StreamSubscriptionError error) {
+ if (_subscriptions.TryRemove(error.Stream, out var state)) {
+ state.Channel.Writer.TryComplete(new Exception(error.Message));
+ }
+ }
+
+ async Task OnReconnected(string? connectionId) {
+ foreach (var (stream, state) in _subscriptions) {
+ try {
+ await _connection.InvokeAsync(
+ SignalRSubscriptionMethods.Subscribe,
+ stream,
+ state.LastPosition,
+ CancellationToken.None
+ ).NoContext();
+ } catch (OperationCanceledException) {
+ // Connection was disposed during reconnect
+ } catch (ObjectDisposedException) {
+ // Connection already torn down
+ }
+ }
+ }
+
+ Task OnClosed(Exception? exception) {
+ foreach (var (_, state) in _subscriptions) {
+ state.Channel.Writer.TryComplete(exception);
+ }
+ return Task.CompletedTask;
+ }
+
+ public async ValueTask DisposeAsync() {
+ if (_disposed) return;
+ _disposed = true;
+
+ _connection.Reconnected -= OnReconnected;
+ _connection.Closed -= OnClosed;
+
+ foreach (var (stream, state) in _subscriptions) {
+ state.Channel.Writer.TryComplete();
+ try {
+ await _connection.InvokeAsync(
+ SignalRSubscriptionMethods.Unsubscribe,
+ stream
+ ).NoContext();
+ } catch (OperationCanceledException) {
+ // Expected during shutdown
+ } catch (ObjectDisposedException) {
+ // Connection already torn down
+ }
+ }
+
+ _subscriptions.Clear();
+ _eventRegistration.Dispose();
+ _errorRegistration.Dispose();
+ }
+
+ internal class SubscriptionState(Channel channel, ulong? initialPosition) {
+ public Channel Channel { get; } = channel;
+ public ulong? LastPosition { get; set; } = initialPosition;
+ }
+}
diff --git a/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClientOptions.cs b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClientOptions.cs
new file mode 100644
index 00000000..13e9f7cb
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Client/SignalRSubscriptionClientOptions.cs
@@ -0,0 +1,9 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+namespace Eventuous.SignalR.Client;
+
+public class SignalRSubscriptionClientOptions {
+ public IEventSerializer? Serializer { get; set; }
+ public bool EnableTracing { get; set; }
+}
diff --git a/src/SignalR/src/Eventuous.SignalR.Client/StreamMeta.cs b/src/SignalR/src/Eventuous.SignalR.Client/StreamMeta.cs
new file mode 100644
index 00000000..72b91ab0
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Client/StreamMeta.cs
@@ -0,0 +1,6 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+namespace Eventuous.SignalR.Client;
+
+public record StreamMeta(string Stream, ulong Position, DateTime Timestamp);
diff --git a/src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs b/src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs
new file mode 100644
index 00000000..e745ce16
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Client/TypedStreamSubscription.cs
@@ -0,0 +1,152 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+using System.Diagnostics;
+using System.Text;
+using System.Text.Json;
+using System.Threading.Channels;
+using Eventuous.Diagnostics;
+using Microsoft.AspNetCore.SignalR.Client;
+
+namespace Eventuous.SignalR.Client;
+
+public class TypedStreamSubscription : IAsyncDisposable {
+ readonly SignalRSubscriptionClient _client;
+ readonly string _stream;
+ readonly ulong? _fromPosition;
+ readonly Dictionary> _handlers = new();
+ Action? _errorHandler;
+ CancellationTokenSource? _cts;
+ Task? _consumeTask;
+ bool _started;
+
+ internal TypedStreamSubscription(SignalRSubscriptionClient client, string stream, ulong? fromPosition) {
+ _client = client;
+ _stream = stream;
+ _fromPosition = fromPosition;
+ }
+
+ public TypedStreamSubscription On(Func handler) where T : class {
+ if (_started) throw new InvalidOperationException("Cannot register handlers after StartAsync has been called.");
+
+ var eventType = TypeMap.Instance.GetTypeName();
+ _handlers[eventType] = (obj, meta) => handler((T)obj, meta);
+ return this;
+ }
+
+ public TypedStreamSubscription OnError(Action handler) {
+ _errorHandler = handler;
+ return this;
+ }
+
+ public async Task StartAsync(CancellationToken ct = default) {
+ if (_started) throw new InvalidOperationException("StartAsync has already been called.");
+ _started = true;
+
+ _cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
+
+ var channel = Channel.CreateUnbounded(
+ new UnboundedChannelOptions { SingleReader = true, SingleWriter = true }
+ );
+ _client.RegisterSubscription(_stream, new SignalRSubscriptionClient.SubscriptionState(channel, _fromPosition));
+
+ await _client.Connection.InvokeAsync(
+ SignalRSubscriptionMethods.Subscribe,
+ _stream,
+ _fromPosition,
+ _cts.Token
+ ).NoContext();
+
+ _consumeTask = ConsumeLoop(channel.Reader, _cts.Token);
+ }
+
+ async Task ConsumeLoop(ChannelReader reader, CancellationToken ct) {
+ var serializer = _client.GetSerializer();
+ var enableTracing = _client.TracingEnabled;
+
+ try {
+ await foreach (var envelope in reader.ReadAllAsync(ct).NoContext(ct)) {
+ if (!_handlers.TryGetValue(envelope.EventType, out var handler)) continue;
+
+ var payload = Encoding.UTF8.GetBytes(envelope.JsonPayload);
+ var result = serializer.DeserializeEvent(payload, envelope.EventType, "application/json");
+
+ if (result is not DeserializationResult.SuccessfullyDeserialized deserialized) continue;
+
+ var meta = new StreamMeta(envelope.Stream, envelope.StreamPosition, envelope.Timestamp);
+ Activity? activity = null;
+
+ try {
+ if (enableTracing && envelope.JsonMetadata != null) {
+ activity = StartTraceActivity(envelope.JsonMetadata);
+ }
+
+ await handler(deserialized.Payload, meta).NoContext();
+ } finally {
+ activity?.Dispose();
+ }
+ }
+ } catch (OperationCanceledException) {
+ // Expected on dispose/cancellation
+ } catch (ChannelClosedException ex) {
+ // Channel completed with error (server subscription failure or connection closed)
+ _errorHandler?.Invoke(new StreamSubscriptionError {
+ Stream = _stream,
+ Message = ex.InnerException?.Message ?? "Subscription channel closed"
+ });
+ } catch (Exception ex) {
+ _errorHandler?.Invoke(new StreamSubscriptionError {
+ Stream = _stream,
+ Message = ex.ToString()
+ });
+ }
+ }
+
+ static Activity? StartTraceActivity(string jsonMetadata) {
+ try {
+ var metaDict = JsonSerializer.Deserialize>(jsonMetadata);
+ if (metaDict == null) return null;
+
+ var metadata = new Metadata(metaDict);
+ var tracingMeta = metadata.GetTracingMeta();
+ var parentContext = tracingMeta.ToActivityContext(isRemote: true);
+
+ if (parentContext == null) return null;
+
+ return EventuousDiagnostics.ActivitySource.StartActivity(
+ "signalr.consume",
+ ActivityKind.Consumer,
+ parentContext.Value
+ );
+ } catch (Exception) {
+ // Tracing is best-effort; malformed metadata must not break event consumption
+ return null;
+ }
+ }
+
+ public async ValueTask DisposeAsync() {
+ if (_cts != null) {
+ await _cts.CancelAsync().NoContext();
+
+ if (_consumeTask != null) {
+ try { await _consumeTask.NoContext(); } catch (OperationCanceledException) { /* expected */ }
+ }
+
+ _cts.Dispose();
+ }
+
+ if (_started) {
+ _client.RemoveSubscription(_stream);
+ try {
+ await _client.Connection.InvokeAsync(
+ SignalRSubscriptionMethods.Unsubscribe,
+ _stream
+ ).NoContext();
+ } catch (OperationCanceledException) {
+ // Expected during shutdown
+ } catch (ObjectDisposedException) {
+ // Connection already torn down
+ }
+ }
+ }
+}
diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Contracts/SignalRSubscriptionMethods.cs b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/SignalRSubscriptionMethods.cs
new file mode 100644
index 00000000..7677d9d9
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/SignalRSubscriptionMethods.cs
@@ -0,0 +1,11 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+namespace Eventuous.SignalR;
+
+public static class SignalRSubscriptionMethods {
+ public const string Subscribe = "SubscribeToStream";
+ public const string Unsubscribe = "UnsubscribeFromStream";
+ public const string StreamEvent = "StreamEvent";
+ public const string StreamError = "StreamError";
+}
diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamEventEnvelope.cs b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamEventEnvelope.cs
new file mode 100644
index 00000000..cb5d98bb
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamEventEnvelope.cs
@@ -0,0 +1,15 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+namespace Eventuous.SignalR;
+
+public record StreamEventEnvelope {
+ public required Guid EventId { get; init; }
+ public required string Stream { get; init; }
+ public required string EventType { get; init; }
+ public required ulong StreamPosition { get; init; }
+ public required ulong GlobalPosition { get; init; }
+ public required DateTime Timestamp { get; init; }
+ public required string JsonPayload { get; init; }
+ public string? JsonMetadata { get; init; }
+}
diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamSubscriptionError.cs b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamSubscriptionError.cs
new file mode 100644
index 00000000..d600b306
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Server/Contracts/StreamSubscriptionError.cs
@@ -0,0 +1,9 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+namespace Eventuous.SignalR;
+
+public record StreamSubscriptionError {
+ public required string Stream { get; init; }
+ public required string Message { get; init; }
+}
diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj b/src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj
new file mode 100644
index 00000000..175c5f4e
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Server/Eventuous.SignalR.Server.csproj
@@ -0,0 +1,21 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Tools\TaskExtensions.cs
+
+
+
+
diff --git a/src/SignalR/src/Eventuous.SignalR.Server/Registrations/SignalRGatewayRegistrations.cs b/src/SignalR/src/Eventuous.SignalR.Server/Registrations/SignalRGatewayRegistrations.cs
new file mode 100644
index 00000000..8f218eb9
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Server/Registrations/SignalRGatewayRegistrations.cs
@@ -0,0 +1,23 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+using Eventuous.SignalR.Server;
+using Microsoft.AspNetCore.SignalR;
+
+namespace Microsoft.Extensions.DependencyInjection;
+
+public static class SignalRGatewayRegistrations {
+ public static IServiceCollection AddSignalRSubscriptionGateway(
+ this IServiceCollection services,
+ Action configure
+ ) where THub : Hub {
+ services.AddSingleton(sp => {
+ var options = new SignalRGatewayOptions { SubscriptionFactory = null! };
+ configure(sp, options);
+ return options;
+ });
+ services.AddSingleton>();
+ services.AddSingleton>();
+ return services;
+ }
+}
diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRGatewayOptions.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRGatewayOptions.cs
new file mode 100644
index 00000000..c925fa4f
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRGatewayOptions.cs
@@ -0,0 +1,14 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+using Eventuous.Subscriptions.Filters;
+
+namespace Eventuous.SignalR.Server;
+
+public delegate IMessageSubscription SubscriptionFactory(
+ StreamName stream, ulong? fromPosition, ConsumePipe pipe, string subscriptionId
+);
+
+public class SignalRGatewayOptions {
+ public required SubscriptionFactory SubscriptionFactory { get; set; }
+}
diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRProduceOptions.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProduceOptions.cs
new file mode 100644
index 00000000..ae6cea2b
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProduceOptions.cs
@@ -0,0 +1,6 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+namespace Eventuous.SignalR.Server;
+
+public record SignalRProduceOptions(string ConnectionId);
diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs
new file mode 100644
index 00000000..bb137612
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRProducer.cs
@@ -0,0 +1,32 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+using Eventuous.Producers.Diagnostics;
+using Microsoft.AspNetCore.SignalR;
+
+namespace Eventuous.SignalR.Server;
+
+public class SignalRProducer(IHubContext hubContext)
+ : BaseProducer(new ProducerTracingOptions { MessagingSystem = "signalr" })
+ where THub : Hub {
+
+ [RequiresDynamicCode("Only works with AOT when using DefaultStaticEventSerializer")]
+ [RequiresUnreferencedCode("Only works with AOT when using DefaultStaticEventSerializer")]
+ protected override async Task ProduceMessages(
+ StreamName stream,
+ IEnumerable messages,
+ SignalRProduceOptions? options,
+ CancellationToken cancellationToken = default
+ ) {
+ ArgumentNullException.ThrowIfNull(options);
+ var client = hubContext.Clients.Client(options.ConnectionId);
+
+ foreach (var msg in messages) {
+ await client.SendAsync(
+ SignalRSubscriptionMethods.StreamEvent,
+ msg.Message,
+ cancellationToken
+ ).NoContext();
+ }
+ }
+}
diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRSubscriptionHub.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRSubscriptionHub.cs
new file mode 100644
index 00000000..7ae225ea
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRSubscriptionHub.cs
@@ -0,0 +1,17 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+using Microsoft.AspNetCore.SignalR;
+
+namespace Eventuous.SignalR.Server;
+
+public class SignalRSubscriptionHub(SubscriptionGateway gateway) : Hub {
+ public Task SubscribeToStream(string stream, ulong? fromPosition)
+ => gateway.SubscribeAsync(Context.ConnectionId, stream, fromPosition, Context.ConnectionAborted);
+
+ public Task UnsubscribeFromStream(string stream)
+ => gateway.UnsubscribeAsync(Context.ConnectionId, stream);
+
+ public override Task OnDisconnectedAsync(Exception? exception)
+ => gateway.RemoveConnectionAsync(Context.ConnectionId);
+}
diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SignalRTransform.cs b/src/SignalR/src/Eventuous.SignalR.Server/SignalRTransform.cs
new file mode 100644
index 00000000..256a535b
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Server/SignalRTransform.cs
@@ -0,0 +1,33 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+using System.Text;
+using System.Text.Json;
+using Eventuous.Subscriptions.Context;
+
+namespace Eventuous.SignalR.Server;
+
+public static class SignalRTransform {
+ public static RouteAndTransform Create(
+ string connectionId, string stream, IEventSerializer serializer
+ ) => ctx => {
+ var result = serializer.SerializeEvent(ctx.Message!);
+ var envelope = new StreamEventEnvelope {
+ EventId = Guid.TryParse(ctx.MessageId, out var id) ? id : Guid.NewGuid(),
+ Stream = stream,
+ EventType = ctx.MessageType,
+ StreamPosition = ctx.StreamPosition,
+ GlobalPosition = ctx.GlobalPosition,
+ Timestamp = ctx.Created,
+ JsonPayload = Encoding.UTF8.GetString(result.Payload),
+ JsonMetadata = ctx.Metadata is { Count: > 0 }
+ ? JsonSerializer.Serialize(ctx.Metadata.ToDictionary(kv => kv.Key, kv => kv.Value))
+ : null
+ };
+ return ValueTask.FromResult(new[] {
+ new GatewayMessage(
+ new StreamName(stream), envelope, ctx.Metadata, new SignalRProduceOptions(connectionId)
+ )
+ });
+ };
+}
diff --git a/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs b/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs
new file mode 100644
index 00000000..691ba559
--- /dev/null
+++ b/src/SignalR/src/Eventuous.SignalR.Server/SubscriptionGateway.cs
@@ -0,0 +1,118 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+using System.Collections.Concurrent;
+using Eventuous.Subscriptions.Filters;
+using Microsoft.AspNetCore.SignalR;
+using Microsoft.Extensions.Logging;
+
+namespace Eventuous.SignalR.Server;
+
+public class SubscriptionGateway : IAsyncDisposable where THub : Hub {
+ readonly IHubContext _hubContext;
+ readonly SignalRProducer _producer;
+ readonly SubscriptionFactory _subscriptionFactory;
+ readonly IEventSerializer _eventSerializer;
+ readonly ILogger _logger;
+ readonly ConcurrentDictionary<(string ConnectionId, string Stream), SubscriptionState> _subscriptions = new();
+
+ public SubscriptionGateway(
+ IHubContext hubContext,
+ SignalRProducer producer,
+ SignalRGatewayOptions options,
+ ILoggerFactory loggerFactory,
+ IEventSerializer? eventSerializer = null
+ ) {
+ _hubContext = hubContext;
+ _producer = producer;
+ _subscriptionFactory = options.SubscriptionFactory;
+ _eventSerializer = eventSerializer ?? DefaultEventSerializer.Instance;
+ _logger = loggerFactory.CreateLogger>();
+ }
+
+ public async Task SubscribeAsync(string connectionId, string stream, ulong? fromPosition, CancellationToken ct = default) {
+ var key = (connectionId, stream);
+
+ // Remove existing subscription for same key
+ if (_subscriptions.TryRemove(key, out var existing)) {
+ await StopSubscription(existing).NoContext();
+ }
+
+ var transform = SignalRTransform.Create(connectionId, stream, _eventSerializer);
+ var handler = GatewayHandlerFactory.Create(_producer, transform, awaitProduce: true);
+ var pipe = new ConsumePipe();
+ pipe.AddDefaultConsumer(handler);
+
+ var subscriptionId = $"signalr-{connectionId}-{stream}";
+ var subscription = _subscriptionFactory(new StreamName(stream), fromPosition, pipe, subscriptionId);
+
+ var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
+ var state = new SubscriptionState(subscription, pipe, cts);
+
+ _subscriptions[key] = state;
+
+ // Start subscription in background
+ _ = Task.Run(async () => {
+ try {
+ await subscription.Subscribe(
+ _ => _logger.LogDebug("Subscribed {SubscriptionId}", subscriptionId),
+ (id, reason, ex) => _logger.LogWarning(ex, "Subscription {SubscriptionId} dropped: {Reason}", id, reason),
+ cts.Token
+ ).NoContext();
+ } catch (OperationCanceledException) {
+ // Expected on unsubscribe
+ } catch (Exception ex) {
+ _logger.LogError(ex, "Subscription {SubscriptionId} failed", subscriptionId);
+ _subscriptions.TryRemove(key, out _);
+
+ // Notify the client about the failure
+ try {
+ var client = _hubContext.Clients.Client(connectionId);
+ await client.SendAsync(
+ SignalRSubscriptionMethods.StreamError,
+ new StreamSubscriptionError { Stream = stream, Message = ex.Message },
+ CancellationToken.None
+ ).NoContext();
+ } catch (Exception notifyEx) {
+ _logger.LogDebug(notifyEx, "Failed to notify client {ConnectionId} about subscription error", connectionId);
+ }
+ }
+ }, cts.Token);
+ }
+
+ public async Task UnsubscribeAsync(string connectionId, string stream) {
+ if (_subscriptions.TryRemove((connectionId, stream), out var state)) {
+ await StopSubscription(state).NoContext();
+ }
+ }
+
+ public async Task RemoveConnectionAsync(string connectionId) {
+ foreach (var key in _subscriptions.Keys.Where(k => k.ConnectionId == connectionId).ToList()) {
+ if (_subscriptions.TryRemove(key, out var state)) {
+ await StopSubscription(state).NoContext();
+ }
+ }
+ }
+
+ async Task StopSubscription(SubscriptionState state) {
+ await state.Cts.CancelAsync();
+ try {
+ await state.Subscription.Unsubscribe(_ => { }, CancellationToken.None).NoContext();
+ } catch (OperationCanceledException) {
+ // Expected during cancellation
+ } catch (Exception ex) {
+ _logger.LogWarning(ex, "Error during subscription unsubscribe cleanup");
+ }
+ await state.Pipe.DisposeAsync().NoContext();
+ state.Cts.Dispose();
+ }
+
+ public async ValueTask DisposeAsync() {
+ foreach (var (_, state) in _subscriptions) {
+ await StopSubscription(state).NoContext();
+ }
+ _subscriptions.Clear();
+ }
+
+ record SubscriptionState(IMessageSubscription Subscription, ConsumePipe Pipe, CancellationTokenSource Cts);
+}
diff --git a/src/SignalR/test/Eventuous.Tests.SignalR.Integration/Eventuous.Tests.SignalR.Integration.csproj b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/Eventuous.Tests.SignalR.Integration.csproj
new file mode 100644
index 00000000..6cd00723
--- /dev/null
+++ b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/Eventuous.Tests.SignalR.Integration.csproj
@@ -0,0 +1,24 @@
+
+
+ true
+ Exe
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs
new file mode 100644
index 00000000..7ed8ab46
--- /dev/null
+++ b/src/SignalR/test/Eventuous.Tests.SignalR.Integration/SignalREndToEndTests.cs
@@ -0,0 +1,223 @@
+extern alias SignalRClient;
+
+using System.Runtime.InteropServices;
+using Eventuous.KurrentDB;
+using Eventuous.SignalR.Server;
+using Eventuous.Subscriptions.Filters;
+using KurrentDB.Client;
+using Microsoft.AspNetCore.Builder;
+using Microsoft.AspNetCore.Hosting;
+using Microsoft.AspNetCore.SignalR.Client;
+using Microsoft.AspNetCore.TestHost;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Testcontainers.KurrentDb;
+using ClientTypes = SignalRClient::Eventuous.SignalR.Client;
+
+namespace Eventuous.Tests.SignalR.Integration;
+
+using KurrentStreamSubscription = Eventuous.KurrentDB.Subscriptions.StreamSubscription;
+using KurrentStreamSubscriptionOptions = Eventuous.KurrentDB.Subscriptions.StreamSubscriptionOptions;
+
+[EventType("TestOrderPlaced")]
+record TestOrderPlaced(string OrderId, decimal Amount);
+
+[EventType("TestOrderShipped")]
+record TestOrderShipped(string OrderId, string TrackingNumber);
+
+public class SignalREndToEndTests : IAsyncDisposable {
+ KurrentDbContainer _container = null!;
+ WebApplication _app = null!;
+ TestServer _server = null!;
+ IEventStore _eventStore = null!;
+
+ [Before(Test)]
+ public async Task Setup() {
+ TypeMap.Instance.RegisterKnownEventTypes(typeof(TestOrderPlaced).Assembly);
+
+ var image = RuntimeInformation.ProcessArchitecture == Architecture.Arm64
+ ? "kurrentplatform/kurrentdb:25.1.3-experimental-arm64-8.0-jammy"
+ : "kurrentplatform/kurrentdb:25.1.3";
+
+ _container = new KurrentDbBuilder()
+ .WithImage(image)
+ .WithEnvironment("KURRENTDB_ENABLE_ATOM_PUB_OVER_HTTP", "true")
+ .Build();
+
+ await _container.StartAsync();
+
+ var builder = WebApplication.CreateBuilder();
+ builder.WebHost.UseTestServer();
+ builder.Logging.SetMinimumLevel(LogLevel.Debug);
+
+ builder.Services.AddSignalR();
+ builder.Services.AddKurrentDBClient(_container.GetConnectionString());
+ builder.Services.AddEventStore();
+
+ builder.Services.AddSignalRSubscriptionGateway((sp, options) => {
+ var client = sp.GetRequiredService();
+ var loggerFactory = sp.GetRequiredService();
+
+ options.SubscriptionFactory = (stream, fromPosition, pipe, subscriptionId) =>
+ new KurrentStreamSubscription(
+ client,
+ new KurrentStreamSubscriptionOptions {
+ StreamName = stream,
+ SubscriptionId = subscriptionId
+ },
+ new NoOpCheckpointStore(fromPosition),
+ pipe,
+ loggerFactory
+ );
+ });
+
+ _app = builder.Build();
+ _app.MapHub("/subscriptions");
+
+ await _app.StartAsync();
+
+ _server = _app.GetTestServer();
+ _eventStore = _app.Services.GetRequiredService();
+ }
+
+ HubConnection CreateHubConnection() =>
+ new HubConnectionBuilder()
+ .WithUrl(
+ "http://localhost/subscriptions",
+ opts => opts.HttpMessageHandlerFactory = _ => _server.CreateHandler()
+ )
+ .Build();
+
+ async Task AppendEvents(string stream, params object[] events) {
+ var streamEvents = events
+ .Select(e => new NewStreamEvent(Guid.NewGuid(), e, new Metadata()))
+ .ToArray();
+
+ await _eventStore.AppendEvents(
+ new StreamName(stream),
+ ExpectedStreamVersion.Any,
+ streamEvents,
+ default
+ );
+ }
+
+ [Test]
+ public async Task RawStreaming_ReceivesAppendedEvents() {
+ var stream = $"Order-{Guid.NewGuid():N}";
+
+ await AppendEvents(stream,
+ new TestOrderPlaced("order-1", 99.99m),
+ new TestOrderShipped("order-1", "TRACK-123")
+ );
+
+ var connection = CreateHubConnection();
+ await connection.StartAsync();
+ var client = new ClientTypes.SignalRSubscriptionClient(connection);
+
+ var received = new List();
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+
+ await foreach (var envelope in client.SubscribeAsync(stream, null, cts.Token)) {
+ received.Add(envelope);
+ if (received.Count >= 2) break;
+ }
+
+ await Assert.That(received).HasCount().EqualTo(2);
+ await Assert.That(received[0].EventType).IsEqualTo("TestOrderPlaced");
+ await Assert.That(received[1].EventType).IsEqualTo("TestOrderShipped");
+ await Assert.That(received[0].StreamPosition).IsEqualTo(0UL);
+ await Assert.That(received[1].StreamPosition).IsEqualTo(1UL);
+ await Assert.That(received[0].JsonPayload).Contains("order-1");
+
+ await client.DisposeAsync();
+ await connection.DisposeAsync();
+ }
+
+ [Test]
+ public async Task TypedSubscription_DispatchesToCorrectHandlers() {
+ var stream = $"Order-{Guid.NewGuid():N}";
+
+ await AppendEvents(stream,
+ new TestOrderPlaced("order-2", 149.99m),
+ new TestOrderShipped("order-2", "TRACK-456")
+ );
+
+ var connection = CreateHubConnection();
+ await connection.StartAsync();
+ var client = new ClientTypes.SignalRSubscriptionClient(connection);
+
+ var placedEvents = new List();
+ var shippedEvents = new List();
+ var done = new TaskCompletionSource();
+ var count = 0;
+
+ var sub = client.SubscribeTyped(stream, null)
+ .On((evt, meta) => {
+ placedEvents.Add(evt);
+ if (Interlocked.Increment(ref count) >= 2) done.TrySetResult();
+ return ValueTask.CompletedTask;
+ })
+ .On((evt, meta) => {
+ shippedEvents.Add(evt);
+ if (Interlocked.Increment(ref count) >= 2) done.TrySetResult();
+ return ValueTask.CompletedTask;
+ });
+
+ await sub.StartAsync();
+
+ var completed = await Task.WhenAny(done.Task, Task.Delay(TimeSpan.FromSeconds(10)));
+ await Assert.That(completed).IsEqualTo(done.Task);
+
+ await Assert.That(placedEvents).HasCount().EqualTo(1);
+ await Assert.That(placedEvents[0].OrderId).IsEqualTo("order-2");
+ await Assert.That(placedEvents[0].Amount).IsEqualTo(149.99m);
+
+ await Assert.That(shippedEvents).HasCount().EqualTo(1);
+ await Assert.That(shippedEvents[0].TrackingNumber).IsEqualTo("TRACK-456");
+
+ await sub.DisposeAsync();
+ await client.DisposeAsync();
+ await connection.DisposeAsync();
+ }
+
+ [Test]
+ public async Task LiveEvents_DeliveredAfterSubscribe() {
+ var stream = $"Order-{Guid.NewGuid():N}";
+
+ var connection = CreateHubConnection();
+ await connection.StartAsync();
+ var client = new ClientTypes.SignalRSubscriptionClient(connection);
+
+ var received = new List();
+ var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
+
+ var consumeTask = Task.Run(async () => {
+ await foreach (var envelope in client.SubscribeAsync(stream, null, cts.Token)) {
+ received.Add(envelope);
+ if (received.Count >= 2) break;
+ }
+ }, cts.Token);
+
+ // Give the subscription time to start on the server
+ await Task.Delay(1000);
+
+ await AppendEvents(stream,
+ new TestOrderPlaced("order-3", 200m),
+ new TestOrderShipped("order-3", "TRACK-789")
+ );
+
+ await consumeTask.WaitAsync(TimeSpan.FromSeconds(10));
+
+ await Assert.That(received).HasCount().EqualTo(2);
+ await Assert.That(received[0].EventType).IsEqualTo("TestOrderPlaced");
+ await Assert.That(received[1].EventType).IsEqualTo("TestOrderShipped");
+
+ await client.DisposeAsync();
+ await connection.DisposeAsync();
+ }
+
+ public async ValueTask DisposeAsync() {
+ await _app.DisposeAsync();
+ await _container.DisposeAsync();
+ }
+}
diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj b/src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj
new file mode 100644
index 00000000..b67c99fc
--- /dev/null
+++ b/src/SignalR/test/Eventuous.Tests.SignalR/Eventuous.Tests.SignalR.csproj
@@ -0,0 +1,19 @@
+
+
+ Exe
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/SignalR/test/Eventuous.Tests.SignalR/SignalRProducerTests.cs b/src/SignalR/test/Eventuous.Tests.SignalR/SignalRProducerTests.cs
new file mode 100644
index 00000000..f66fe6d1
--- /dev/null
+++ b/src/SignalR/test/Eventuous.Tests.SignalR/SignalRProducerTests.cs
@@ -0,0 +1,45 @@
+// Copyright (C) Eventuous HQ OÜ. All rights reserved
+// Licensed under the Apache License, Version 2.0.
+
+using Eventuous.SignalR;
+using Eventuous.SignalR.Server;
+using Microsoft.AspNetCore.SignalR;
+using NSubstitute;
+
+namespace Eventuous.Tests.SignalR;
+
+public class SignalRProducerTests {
+ [Test]
+ public async Task ProduceMessages_SendsEnvelopeToCorrectConnection() {
+ var hubContext = Substitute.For>();
+ var hubClients = Substitute.For();
+ var clientProxy = Substitute.For();
+ hubContext.Clients.Returns(hubClients);
+ hubClients.Client("conn-1").Returns(clientProxy);
+
+ var producer = new SignalRProducer(hubContext);
+ var envelope = new Eventuous.SignalR.StreamEventEnvelope {
+ EventId = Guid.NewGuid(),
+ Stream = "Test-1",
+ EventType = "TestEvent",
+ StreamPosition = 0,
+ GlobalPosition = 0,
+ Timestamp = DateTime.UtcNow,
+ JsonPayload = "{}"
+ };
+
+ await producer.Produce(
+ new StreamName("Test-1"),
+ [new ProducedMessage(envelope, new Metadata())],
+ new SignalRProduceOptions("conn-1")
+ );
+
+ await clientProxy.Received(1).SendCoreAsync(
+ Eventuous.SignalR.SignalRSubscriptionMethods.StreamEvent,
+ Arg.Is