diff --git a/docs/plans/2026-03-09-multi-stream-produce-design.md b/docs/plans/2026-03-09-multi-stream-produce-design.md new file mode 100644 index 000000000..a280cd97e --- /dev/null +++ b/docs/plans/2026-03-09-multi-stream-produce-design.md @@ -0,0 +1,64 @@ +# Multi-Stream Produce Design + +## Goal + +Add a multi-stream `Produce` overload to `IProducer` and `IProducer` for batch efficiency and API simplification. Update Gateway to use the new overload. + +## New Types + +Located in `src/Core/src/Eventuous.Producers/`. + +```csharp +// Non-generic, for IProducer +[StructLayout(LayoutKind.Auto)] +public record struct ProduceRequest(StreamName Stream, IEnumerable Messages); + +// Generic, for IProducer +[StructLayout(LayoutKind.Auto)] +public record struct ProduceRequest( + StreamName Stream, + IEnumerable Messages, + TProduceOptions? Options +) where TProduceOptions : class; +``` + +## Interface Changes + +### IProducer + +New default interface method with parallel execution: + +```csharp +Task Produce(IReadOnlyCollection requests, CancellationToken ct = default) { + return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, ct))); +} +``` + +### IProducer + +New default interface method with parallel execution: + +```csharp +Task Produce(IReadOnlyCollection> requests, CancellationToken ct = default) { + return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, r.Options, ct))); +} +``` + +## BaseProducer Changes + +Override multi-stream `Produce` to add tracing at batch level, then delegate to a new virtual `ProduceMessages` overload. Implementations can override for optimized multi-stream behavior; default calls single-stream `ProduceMessages` in parallel. + +## Gateway Update + +`GatewayHandler` replaces `GroupBy` + parallel per-stream `Produce` with a single call to multi-stream `Produce`, constructing `ProduceRequest` from the transformed messages. + +## What Doesn't Change + +- Individual producer implementations (Kafka, RabbitMQ, PubSub, Service Bus) inherit the default parallel behavior. They can override later for optimization. +- Existing single-stream `Produce` signatures remain untouched. +- Ack/nack semantics unchanged — `ProducedMessage` already carries callbacks. + +## Testing + +- Unit tests for default parallel behavior +- Gateway integration test verifying multi-stream produce works end-to-end diff --git a/docs/plans/2026-03-09-multi-stream-produce.md b/docs/plans/2026-03-09-multi-stream-produce.md new file mode 100644 index 000000000..9ab696032 --- /dev/null +++ b/docs/plans/2026-03-09-multi-stream-produce.md @@ -0,0 +1,339 @@ +# Multi-Stream Produce Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Add multi-stream `Produce` overloads to `IProducer` / `IProducer` for batch efficiency, and update Gateway to use them. + +**Architecture:** New `ProduceRequest` / `ProduceRequest` record structs as input types. Default interface methods on `IProducer` / `IProducer` with parallel execution. `BaseProducer` overrides with tracing. Gateway simplified to single multi-stream call. + +**Tech Stack:** C# preview, .NET 10/9/8, TUnit test framework, OpenTelemetry tracing + +--- + +### Task 1: Add ProduceRequest Types + +**Files:** +- Create: `src/Core/src/Eventuous.Producers/ProduceRequest.cs` + +**Step 1: Create the ProduceRequest types file** + +```csharp +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Runtime.InteropServices; + +namespace Eventuous.Producers; + +[StructLayout(LayoutKind.Auto)] +public record struct ProduceRequest(StreamName Stream, IEnumerable Messages); + +[StructLayout(LayoutKind.Auto)] +public record struct ProduceRequest(StreamName Stream, IEnumerable Messages, TProduceOptions? Options) + where TProduceOptions : class; +``` + +**Step 2: Verify it builds** + +Run: `dotnet build src/Core/src/Eventuous.Producers/Eventuous.Producers.csproj` +Expected: Build succeeded + +**Step 3: Commit** + +``` +feat: add ProduceRequest types for multi-stream produce +``` + +--- + +### Task 2: Add Multi-Stream Overloads to IProducer Interfaces + +**Files:** +- Modify: `src/Core/src/Eventuous.Producers/IProducer.cs` + +**Step 1: Add default interface method to IProducer** + +After the existing `Produce` method (line 18), add: + +```csharp +/// +/// Produce messages to multiple streams in parallel. +/// +/// Collection of produce requests, one per target stream +/// +/// +[RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] +[RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] +Task Produce(IReadOnlyCollection requests, CancellationToken cancellationToken = default) + => Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, cancellationToken))); +``` + +**Step 2: Add default interface method to IProducer** + +After the existing `Produce` method (line 33), add: + +```csharp +/// +/// Produce messages to multiple streams in parallel. +/// +/// Collection of produce requests with options, one per target stream +/// +/// +[RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] +[RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] +Task Produce(IReadOnlyCollection> requests, CancellationToken cancellationToken = default) + => Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, r.Options, cancellationToken))); +``` + +**Step 3: Verify it builds** + +Run: `dotnet build src/Core/src/Eventuous.Producers/Eventuous.Producers.csproj` +Expected: Build succeeded + +**Step 4: Commit** + +``` +feat: add multi-stream Produce overloads to IProducer interfaces +``` + +--- + +### Task 3: Add Multi-Stream Produce to BaseProducer with Tracing + +**Files:** +- Modify: `src/Core/src/Eventuous.Producers/BaseProducer.cs` + +**Step 1: Add multi-stream Produce override with batch tracing** + +After the existing single-stream `Produce` method (ends at line 56), add: + +```csharp +/// +[RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] +[RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] +public Task Produce(IReadOnlyCollection> requests, CancellationToken cancellationToken = default) { + if (requests.Count == 0) return Task.CompletedTask; + + return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, r.Options, cancellationToken))); +} +``` + +Note: Each individual `Produce(stream, messages, options, ct)` call already creates its own tracing activity via the existing single-stream `Produce` method on `BaseProducer`. So multi-stream just fans out to the already-traced single-stream calls. + +Also add the non-generic overload: + +```csharp +[RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] +[RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] +public Task Produce(IReadOnlyCollection requests, CancellationToken cancellationToken = default) { + if (requests.Count == 0) return Task.CompletedTask; + + return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, cancellationToken))); +} +``` + +**Step 2: Build the solution** + +Run: `dotnet build Eventuous.slnx` +Expected: Build succeeded. This validates all producer implementations (Kafka, RabbitMQ, PubSub, Service Bus) still compile — they inherit from `BaseProducer` and don't need changes. + +**Step 3: Commit** + +``` +feat: add multi-stream Produce to BaseProducer with empty-check +``` + +--- + +### Task 4: Update GatewayProducer to Delegate Multi-Stream + +**Files:** +- Modify: `src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs` + +**Step 1: Add multi-stream Produce delegation** + +Add two new methods to `GatewayProducer` that delegate to the inner producer after waiting for readiness: + +```csharp +public async Task Produce(IReadOnlyCollection> requests, CancellationToken cancellationToken = default) { + if (_isHostedService) { await WaitForInner(inner, cancellationToken).NoContext(); } + + await inner.Produce(requests, cancellationToken).NoContext(); +} + +public async Task Produce(IReadOnlyCollection requests, CancellationToken cancellationToken = default) { + if (_isHostedService) { await WaitForInner(inner, cancellationToken).NoContext(); } + + await ((IProducer)inner).Produce(requests, cancellationToken).NoContext(); +} +``` + +**Step 2: Verify it builds** + +Run: `dotnet build src/Gateway/src/Eventuous.Gateway/Eventuous.Gateway.csproj` +Expected: Build succeeded + +**Step 3: Commit** + +``` +feat: add multi-stream Produce delegation to GatewayProducer +``` + +--- + +### Task 5: Update GatewayHandler to Use Multi-Stream Produce + +**Files:** +- Modify: `src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs` + +**Step 1: Replace GroupBy + parallel Produce with single multi-stream call** + +Replace the `HandleEvent` try block (lines 37-41) with: + +```csharp +try { + var contextMeta = GatewayMetaHelper.GetContextMeta(context); + + var requests = shovelMessages + .GroupBy(x => x.TargetStream) + .Select(g => new ProduceRequest( + g.Key, + g.Select(x => new ProducedMessage(x.Message, x.GetMeta(context), contextMeta) { OnAck = onAck, OnNack = onFail }), + g.First().ProduceOptions + )) + .ToArray(); + + await producer.Produce(requests, context.CancellationToken).NoContext(); +} catch (OperationCanceledException e) { context.Nack>(e); } +``` + +This eliminates the `ProduceToStream` local function entirely. The `ProduceRequest` constructor bundles stream + messages + options, and the multi-stream `Produce` handles parallel execution. + +Note on `ProduceOptions`: Currently the old code calls `Produce` per message (each with its own `ProduceOptions`). Grouping by stream means we pick `g.First().ProduceOptions` for the group. This is consistent with the existing behavior — messages to the same stream should have the same options. If different options per message within a stream are needed, that's a separate concern handled at the single-stream level. + +**Step 2: Verify it builds** + +Run: `dotnet build src/Gateway/src/Eventuous.Gateway/Eventuous.Gateway.csproj` +Expected: Build succeeded + +**Step 3: Commit** + +``` +refactor: simplify GatewayHandler using multi-stream Produce +``` + +--- + +### Task 6: Write Tests for Multi-Stream Produce + +**Files:** +- Create: `src/Gateway/test/Eventuous.Tests.Gateway/MultiStreamProduceTests.cs` + +**Step 1: Write the test class** + +Create a test file that validates multi-stream produce behavior using a test producer (similar pattern to `RegistrationTests.cs`): + +```csharp +using Eventuous.Producers; + +namespace Eventuous.Tests.Gateway; + +public class MultiStreamProduceTests { + [Test] + public async Task ShouldProduceToMultipleStreams() { + var producer = new TestProducer(); + var stream1 = new StreamName("stream-1"); + var stream2 = new StreamName("stream-2"); + var msg1 = new ProducedMessage("event-1", null); + var msg2 = new ProducedMessage("event-2", null); + + var requests = new ProduceRequest[] { + new(stream1, [msg1], null), + new(stream2, [msg2], null) + }; + + await producer.Produce(requests); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(2); + await Assert.That(producer.Streams).Contains(stream1); + await Assert.That(producer.Streams).Contains(stream2); + } + + [Test] + public async Task ShouldHandleEmptyRequests() { + var producer = new TestProducer(); + + await producer.Produce(Array.Empty>()); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(0); + } + + [Test] + public async Task ShouldProduceMultipleMessagesToSameStream() { + var producer = new TestProducer(); + var stream = new StreamName("stream-1"); + var msg1 = new ProducedMessage("event-1", null); + var msg2 = new ProducedMessage("event-2", null); + + var requests = new ProduceRequest[] { + new(stream, [msg1, msg2], null) + }; + + await producer.Produce(requests); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(2); + await Assert.That(producer.Streams.Distinct()).HasCount().EqualTo(1); + } + + class TestProducer : BaseProducer { + public List ProducedMessages { get; } = []; + public List Streams { get; } = []; + + protected override Task ProduceMessages( + StreamName stream, + IEnumerable messages, + TestProduceOptions? options, + CancellationToken cancellationToken = default + ) { + Streams.Add(stream); + ProducedMessages.AddRange(messages); + + return Task.CompletedTask; + } + } + + record TestProduceOptions; +} +``` + +**Step 2: Run the tests** + +Run: `dotnet test --project src/Gateway/test/Eventuous.Tests.Gateway/Eventuous.Tests.Gateway.csproj -f net10.0` +Expected: All tests pass + +**Step 3: Commit** + +``` +test: add multi-stream produce tests +``` + +--- + +### Task 7: Build and Run Full Test Suite + +**Step 1: Build entire solution** + +Run: `dotnet build Eventuous.slnx` +Expected: Build succeeded, 0 errors + +**Step 2: Run Gateway tests** + +Run: `dotnet test --project src/Gateway/test/Eventuous.Tests.Gateway/Eventuous.Tests.Gateway.csproj -f net10.0` +Expected: All tests pass + +**Step 3: Run core tests to verify no regressions** + +Run: `dotnet test --project src/Core/test/Eventuous.Tests/Eventuous.Tests.csproj -f net10.0` +Expected: All tests pass + +**Step 4: Final commit if any cleanup needed, then push** diff --git a/src/Azure/test/Eventuous.Tests.Azure.ServiceBus/AzureServiceBusFixture.cs b/src/Azure/test/Eventuous.Tests.Azure.ServiceBus/AzureServiceBusFixture.cs index cc5d90c79..7c6e04598 100644 --- a/src/Azure/test/Eventuous.Tests.Azure.ServiceBus/AzureServiceBusFixture.cs +++ b/src/Azure/test/Eventuous.Tests.Azure.ServiceBus/AzureServiceBusFixture.cs @@ -17,7 +17,8 @@ public class AzureServiceBusFixture : IAsyncInitializer, IAsyncDisposable { .Build(); public async Task InitializeAsync() { - await Container.StartAsync(); + using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(5)); + await Container.StartAsync(cts.Token); ConnectionString = Container.GetConnectionString(); diff --git a/src/Core/src/Eventuous.Producers/BaseProducer.cs b/src/Core/src/Eventuous.Producers/BaseProducer.cs index 146b55f71..81d0b880f 100644 --- a/src/Core/src/Eventuous.Producers/BaseProducer.cs +++ b/src/Core/src/Eventuous.Producers/BaseProducer.cs @@ -54,4 +54,22 @@ public async Task Produce(StreamName stream, IEnumerable messag return (act, [producedMessage]); } } -} \ No newline at end of file + + /// + [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] + [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] + public Task Produce(IReadOnlyCollection> requests, CancellationToken cancellationToken = default) { + if (requests.Count == 0) return Task.CompletedTask; + + return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, r.Options, cancellationToken))); + } + + /// + [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] + [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] + public Task Produce(IReadOnlyCollection requests, CancellationToken cancellationToken = default) { + if (requests.Count == 0) return Task.CompletedTask; + + return Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, cancellationToken))); + } +} diff --git a/src/Core/src/Eventuous.Producers/IProducer.cs b/src/Core/src/Eventuous.Producers/IProducer.cs index a74366777..8e44d8511 100644 --- a/src/Core/src/Eventuous.Producers/IProducer.cs +++ b/src/Core/src/Eventuous.Producers/IProducer.cs @@ -16,6 +16,17 @@ public interface IProducer { [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] Task Produce(StreamName stream, IEnumerable messages, CancellationToken cancellationToken = default); + + /// + /// Produce messages to multiple streams in parallel. + /// + /// Collection of produce requests, one per target stream + /// + /// + [RequiresDynamicCode(AttrConstants.DynamicSerializationMessage)] + [RequiresUnreferencedCode(AttrConstants.DynamicSerializationMessage)] + Task Produce(IReadOnlyCollection requests, CancellationToken cancellationToken = default) + => Task.WhenAll(requests.Select(r => Produce(r.Stream, r.Messages, cancellationToken))); } [PublicAPI] diff --git a/src/Core/src/Eventuous.Producers/ProduceRequest.cs b/src/Core/src/Eventuous.Producers/ProduceRequest.cs new file mode 100644 index 000000000..a57d38148 --- /dev/null +++ b/src/Core/src/Eventuous.Producers/ProduceRequest.cs @@ -0,0 +1,13 @@ +// Copyright (C) Eventuous HQ OÜ. All rights reserved +// Licensed under the Apache License, Version 2.0. + +using System.Runtime.InteropServices; + +namespace Eventuous.Producers; + +[StructLayout(LayoutKind.Auto)] +public record struct ProduceRequest(StreamName Stream, IEnumerable Messages); + +[StructLayout(LayoutKind.Auto)] +public record struct ProduceRequest(StreamName Stream, IEnumerable Messages, TProduceOptions? Options) + where TProduceOptions : class; diff --git a/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs b/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs index d3e2fa13e..4d53e505b 100644 --- a/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs +++ b/src/Gateway/src/Eventuous.Gateway/GatewayHandler.cs @@ -35,27 +35,24 @@ public override async ValueTask HandleEvent(IMessageConsume } try { - var grouped = shovelMessages.GroupBy(x => x.TargetStream); + var contextMeta = GatewayMetaHelper.GetContextMeta(context); - await grouped.Select(x => ProduceToStream(x.Key, x)).WhenAll().NoContext(); + var requests = shovelMessages + .GroupBy(x => (x.TargetStream, x.ProduceOptions)) + .Select(g => new ProduceRequest( + g.Key.TargetStream, + g.Select(x => new ProducedMessage(x.Message, x.GetMeta(context), contextMeta) { OnAck = onAck, OnNack = onFail }), + g.Key.ProduceOptions + )) + .ToArray(); + + if (producer is GatewayProducer gp) + await gp.Produce(requests, context.CancellationToken).NoContext(); + else + await Task.WhenAll(requests.Select(r => producer.Produce(r.Stream, r.Messages, r.Options, context.CancellationToken))).NoContext(); } catch (OperationCanceledException e) { context.Nack>(e); } return awaitProduce ? EventHandlingStatus.Success : EventHandlingStatus.Pending; - - Task ProduceToStream(StreamName streamName, IEnumerable> toProduce) - => toProduce.Select( - x => producer.Produce( - streamName, - x.Message, - x.GetMeta(context), - x.ProduceOptions, - GatewayMetaHelper.GetContextMeta(context), - onAck, - onFail, - context.CancellationToken - ) - ) - .WhenAll(); } } diff --git a/src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs b/src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs index b420fc396..7fd479176 100644 --- a/src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs +++ b/src/Gateway/src/Eventuous.Gateway/GatewayProducer.cs @@ -4,14 +4,28 @@ namespace Eventuous.Gateway; class GatewayProducer(IProducer inner) : IProducer where T : class { - readonly bool _isHostedService = inner is not IHostedProducer; - public async Task Produce(StreamName stream, IEnumerable messages, T? options, CancellationToken cancellationToken = default) { - if (_isHostedService) { await WaitForInner(inner, cancellationToken).NoContext(); } + await WaitForInner(inner, cancellationToken).NoContext(); await inner.Produce(stream, messages, options, cancellationToken).NoContext(); } + public async Task Produce(IReadOnlyCollection> requests, CancellationToken cancellationToken = default) { + await WaitForInner(inner, cancellationToken).NoContext(); + + if (inner is BaseProducer baseProducer) { + await baseProducer.Produce(requests, cancellationToken).NoContext(); + } else { + await Task.WhenAll(requests.Select(r => inner.Produce(r.Stream, r.Messages, r.Options, cancellationToken))).NoContext(); + } + } + + public async Task Produce(IReadOnlyCollection requests, CancellationToken cancellationToken = default) { + await WaitForInner(inner, cancellationToken).NoContext(); + + await ((IProducer)inner).Produce(requests, cancellationToken).NoContext(); + } + static async ValueTask WaitForInner(IProducer inner, CancellationToken cancellationToken) { if (inner is not IHostedProducer hosted) return; diff --git a/src/Gateway/test/Eventuous.Tests.Gateway/MultiStreamProduceTests.cs b/src/Gateway/test/Eventuous.Tests.Gateway/MultiStreamProduceTests.cs new file mode 100644 index 000000000..e30bf9f85 --- /dev/null +++ b/src/Gateway/test/Eventuous.Tests.Gateway/MultiStreamProduceTests.cs @@ -0,0 +1,90 @@ +using Eventuous.Producers; + +namespace Eventuous.Tests.Gateway; + +public class MultiStreamProduceTests { + [Test] + public async Task ShouldProduceToMultipleStreams() { + var producer = new TestProducer(); + var stream1 = new StreamName("stream-1"); + var stream2 = new StreamName("stream-2"); + var msg1 = new ProducedMessage("event-1", null); + var msg2 = new ProducedMessage("event-2", null); + + var requests = new ProduceRequest[] { + new(stream1, [msg1], null), + new(stream2, [msg2], null) + }; + + await producer.Produce(requests); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(2); + await Assert.That(producer.Streams).Contains(stream1); + await Assert.That(producer.Streams).Contains(stream2); + } + + [Test] + public async Task ShouldHandleEmptyRequests() { + var producer = new TestProducer(); + + await producer.Produce(Array.Empty>()); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(0); + } + + [Test] + public async Task ShouldProduceMultipleMessagesToSameStream() { + var producer = new TestProducer(); + var stream = new StreamName("stream-1"); + var msg1 = new ProducedMessage("event-1", null); + var msg2 = new ProducedMessage("event-2", null); + + var requests = new ProduceRequest[] { + new(stream, [msg1, msg2], null) + }; + + await producer.Produce(requests); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(2); + await Assert.That(producer.Streams.Distinct().Count()).IsEqualTo(1); + } + + [Test] + public async Task ShouldProduceUntypedRequests() { + var producer = new TestProducer(); + var stream1 = new StreamName("stream-1"); + var stream2 = new StreamName("stream-2"); + var msg1 = new ProducedMessage("event-1", null); + var msg2 = new ProducedMessage("event-2", null); + + var requests = new ProduceRequest[] { + new(stream1, [msg1]), + new(stream2, [msg2]) + }; + + await producer.Produce(requests); + + await Assert.That(producer.ProducedMessages).HasCount().EqualTo(2); + await Assert.That(producer.Streams).Contains(stream1); + await Assert.That(producer.Streams).Contains(stream2); + } + + class TestProducer : BaseProducer { + public List ProducedMessages { get; } = []; + public List Streams { get; } = []; + + protected override Task ProduceMessages( + StreamName stream, + IEnumerable messages, + TestProduceOptions? options, + CancellationToken cancellationToken = default + ) { + Streams.Add(stream); + ProducedMessages.AddRange(messages); + + return Task.CompletedTask; + } + } + + record TestProduceOptions; +}