diff --git a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java index 0b540dabc0..8a902cc74e 100644 --- a/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java +++ b/durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java @@ -329,30 +329,27 @@ public Task callActivity( } // Add router information for cross-app routing - // Router always has a source app ID from EXECUTIONSTARTED event - OrchestratorService.TaskRouter.Builder routerBuilder = OrchestratorService.TaskRouter.newBuilder() - .setSourceAppID(this.appId); - - // Add target app ID if specified in options - if (options != null && options.hasAppID()) { - String targetAppId = options.getAppID(); - OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder() - .setSourceAppID(this.appId) - .setTargetAppID(targetAppId) - .build(); - scheduleTaskBuilder.setRouter(router); - this.logger.fine(() -> String.format( - "cross app routing detected: source=%s, target=%s", - this.appId, targetAppId)); + if (this.appId != null && !this.appId.isEmpty()) { + // Add target app ID if specified in options + if (options != null && options.hasAppID()) { + String targetAppId = options.getAppID(); + OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder() + .setSourceAppID(this.appId) + .setTargetAppID(targetAppId) + .build(); + scheduleTaskBuilder.setRouter(router); + this.logger.fine(() -> String.format( + "cross app routing detected: source=%s, target=%s", + this.appId, targetAppId)); + } } TaskFactory taskFactory = () -> { int id = this.sequenceNumber++; - OrchestratorService.ScheduleTaskAction scheduleTaskAction = scheduleTaskBuilder.build(); OrchestratorService.OrchestratorAction.Builder actionBuilder = OrchestratorService.OrchestratorAction .newBuilder() .setId(id) .setScheduleTask(scheduleTaskBuilder); - if (options != null && options.hasAppID()) { + if (this.appId != null && !this.appId.isEmpty() && options != null && options.hasAppID()) { String targetAppId = options.getAppID(); OrchestratorService.TaskRouter actionRouter = OrchestratorService.TaskRouter.newBuilder() .setSourceAppID(this.appId) @@ -499,13 +496,40 @@ public Task callSubOrchestrator( } createSubOrchestrationActionBuilder.setInstanceId(instanceId); - // TODO: @cicoyle - add suborchestration cross app logic here when its supported + // Add router information for cross-app routing of sub-orchestrations + if (this.appId != null && !this.appId.isEmpty()) { + OrchestratorService.TaskRouter.Builder routerBuilder = OrchestratorService.TaskRouter.newBuilder() + .setSourceAppID(this.appId); + + // Add target app ID if specified in options + if (options != null && options.hasAppID()) { + routerBuilder.setTargetAppID(options.getAppID()); + this.logger.fine(() -> String.format( + "cross app sub-orchestration routing detected: source=%s, target=%s", + this.appId, options.getAppID())); + } + + createSubOrchestrationActionBuilder.setRouter(routerBuilder.build()); + } + TaskFactory taskFactory = () -> { int id = this.sequenceNumber++; - this.pendingActions.put(id, OrchestratorService.OrchestratorAction.newBuilder() + OrchestratorService.OrchestratorAction.Builder actionBuilder = OrchestratorService.OrchestratorAction + .newBuilder() .setId(id) - .setCreateSubOrchestration(createSubOrchestrationActionBuilder) - .build()); + .setCreateSubOrchestration(createSubOrchestrationActionBuilder); + + // Set router on the OrchestratorAction for cross-app routing + if (this.appId != null && !this.appId.isEmpty()) { + OrchestratorService.TaskRouter.Builder actionRouterBuilder = OrchestratorService.TaskRouter.newBuilder() + .setSourceAppID(this.appId); + if (options != null && options.hasAppID()) { + actionRouterBuilder.setTargetAppID(options.getAppID()); + } + actionBuilder.setRouter(actionRouterBuilder.build()); + } + + this.pendingActions.put(id, actionBuilder.build()); if (!this.isReplaying) { this.logger.fine(() -> String.format( @@ -941,11 +965,20 @@ private void completeInternal( } int id = this.sequenceNumber++; - OrchestratorService.OrchestratorAction action = OrchestratorService.OrchestratorAction.newBuilder() + OrchestratorService.OrchestratorAction.Builder actionBuilder = OrchestratorService.OrchestratorAction + .newBuilder() .setId(id) - .setCompleteOrchestration(builder.build()) - .build(); - this.pendingActions.put(id, action); + .setCompleteOrchestration(builder.build()); + + // Add router to completion action for cross-app routing back to parent + if (this.appId != null && !this.appId.isEmpty()) { + actionBuilder.setRouter( + OrchestratorService.TaskRouter.newBuilder() + .setSourceAppID(this.appId) + .build()); + } + + this.pendingActions.put(id, actionBuilder.build()); this.isComplete = true; } @@ -1009,7 +1042,16 @@ private void processEvent(OrchestratorService.HistoryEvent e) { this.setInput(executionStarted.getInput().getValue()); this.setInstanceId(executionStarted.getOrchestrationInstance().getInstanceId()); this.logger.fine(() -> this.instanceId + ": Workflow execution started"); - this.setAppId(e.getRouter().getSourceAppID()); + // For cross-app suborchestrations, if the router has a target, use that as our appID + // since that's where we're actually executing + if (e.hasRouter()) { + OrchestratorService.TaskRouter router = e.getRouter(); + if (router.hasTargetAppID()) { + this.setAppId(router.getTargetAppID()); + } else { + this.setAppId(router.getSourceAppID()); + } + } var versionName = ""; if (!StringUtils.isEmpty(this.orchestratorVersionName)) { diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskClientIT.java b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskClientIT.java index 3863e01d6b..abf146c7c5 100644 --- a/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskClientIT.java +++ b/durabletask-client/src/test/java/io/dapr/durabletask/DurableTaskClientIT.java @@ -592,6 +592,138 @@ void subOrchestration() throws TimeoutException { } } + @Test + void subOrchestrationWithActivity() throws TimeoutException { + final String parentOrchestratorName = "ParentOrchestrator"; + final String childOrchestratorName = "ChildOrchestrator"; + final String activityName = "PlusOne"; + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(parentOrchestratorName, ctx -> { + int input = ctx.getInput(int.class); + int childResult = ctx.callSubOrchestrator(childOrchestratorName, input, int.class).await(); + ctx.complete(childResult); + }) + .addOrchestrator(childOrchestratorName, ctx -> { + int input = ctx.getInput(int.class); + int result = ctx.callActivity(activityName, input, int.class).await(); + ctx.complete(result); + }) + .addActivity(activityName, ctx -> ctx.getInput(int.class) + 1) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName, 10); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals(11, instance.readOutputAs(int.class)); + } + } + + @Test + void subOrchestrationChain() throws TimeoutException { + final String orchestratorName = "ChainOrchestrator"; + final String leafOrchestratorName = "LeafOrchestrator"; + final String activityName = "Double"; + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(orchestratorName, ctx -> { + int input = ctx.getInput(int.class); + // Chain: parent calls child which calls leaf + int result = ctx.callSubOrchestrator(leafOrchestratorName, input, int.class).await(); + // Call activity after sub-orchestration completes + result = ctx.callActivity(activityName, result, int.class).await(); + ctx.complete(result); + }) + .addOrchestrator(leafOrchestratorName, ctx -> { + int input = ctx.getInput(int.class); + int result = ctx.callActivity(activityName, input, int.class).await(); + ctx.complete(result); + }) + .addActivity(activityName, ctx -> ctx.getInput(int.class) * 2) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + // input=3 -> leaf doubles to 6 -> parent doubles to 12 + String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 3); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals(12, instance.readOutputAs(int.class)); + } + } + + @Test + void subOrchestrationFanOut() throws TimeoutException { + final String parentOrchestratorName = "FanOutParent"; + final String childOrchestratorName = "FanOutChild"; + final String activityName = "Square"; + final int childCount = 5; + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(parentOrchestratorName, ctx -> { + // Fan out: launch multiple sub-orchestrations in parallel + List> tasks = IntStream.range(1, childCount + 1) + .mapToObj(i -> ctx.callSubOrchestrator(childOrchestratorName, i, int.class)) + .collect(Collectors.toList()); + + List results = ctx.allOf(tasks).await(); + int sum = results.stream().mapToInt(Integer::intValue).sum(); + ctx.complete(sum); + }) + .addOrchestrator(childOrchestratorName, ctx -> { + int input = ctx.getInput(int.class); + int result = ctx.callActivity(activityName, input, int.class).await(); + ctx.complete(result); + }) + .addActivity(activityName, ctx -> { + int val = ctx.getInput(int.class); + return val * val; + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName, 0); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + // 1^2 + 2^2 + 3^2 + 4^2 + 5^2 = 1 + 4 + 9 + 16 + 25 = 55 + assertEquals(55, instance.readOutputAs(int.class)); + } + } + + @Test + void subOrchestrationWithInstanceId() throws TimeoutException { + final String parentOrchestratorName = "ParentWithInstanceId"; + final String childOrchestratorName = "ChildWithInstanceId"; + + DurableTaskGrpcWorker worker = this.createWorkerBuilder() + .addOrchestrator(parentOrchestratorName, ctx -> { + String childInstanceId = ctx.getInstanceId() + ":child"; + String result = ctx.callSubOrchestrator( + childOrchestratorName, "hello", childInstanceId, String.class).await(); + ctx.complete(result); + }) + .addOrchestrator(childOrchestratorName, ctx -> { + String input = ctx.getInput(String.class); + ctx.complete(input + " world"); + }) + .buildAndStart(); + + DurableTaskClient client = new DurableTaskGrpcClientBuilder().build(); + try (worker; client) { + String instanceId = client.scheduleNewOrchestrationInstance(parentOrchestratorName, "test"); + OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true); + assertNotNull(instance); + assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus()); + assertEquals("hello world", instance.readOutputAs(String.class)); + } + } + @Test void continueAsNew() throws TimeoutException { final String orchestratorName = "continueAsNew"; diff --git a/durabletask-client/src/test/java/io/dapr/durabletask/SubOrchestrationCrossAppTest.java b/durabletask-client/src/test/java/io/dapr/durabletask/SubOrchestrationCrossAppTest.java new file mode 100644 index 0000000000..6ddb4e3e13 --- /dev/null +++ b/durabletask-client/src/test/java/io/dapr/durabletask/SubOrchestrationCrossAppTest.java @@ -0,0 +1,575 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and +limitations under the License. +*/ + +package io.dapr.durabletask; + +import com.google.protobuf.StringValue; +import com.google.protobuf.Timestamp; +import io.dapr.durabletask.implementation.protobuf.OrchestratorService; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactories; +import io.dapr.durabletask.orchestration.TaskOrchestrationFactory; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.logging.Logger; + +import static org.junit.jupiter.api.Assertions.*; + +/** + * Unit tests for sub-orchestration cross-app routing in TaskOrchestrationExecutor. + */ +class SubOrchestrationCrossAppTest { + + private static final Logger logger = Logger.getLogger(SubOrchestrationCrossAppTest.class.getName()); + private static final Duration MAX_TIMER_INTERVAL = Duration.ofDays(3); + + /** + * Helper to build an OrchestratorStarted history event. + */ + private static OrchestratorService.HistoryEvent orchestratorStarted() { + return OrchestratorService.HistoryEvent.newBuilder() + .setEventId(-1) + .setTimestamp(Timestamp.newBuilder().setSeconds(1000).build()) + .setOrchestratorStarted(OrchestratorService.OrchestratorStartedEvent.newBuilder().build()) + .build(); + } + + /** + * Helper to build an ExecutionStarted history event with a router. + */ + private static OrchestratorService.HistoryEvent executionStarted( + String name, String instanceId, String input, OrchestratorService.TaskRouter router) { + OrchestratorService.ExecutionStartedEvent.Builder esBuilder = OrchestratorService.ExecutionStartedEvent + .newBuilder() + .setName(name) + .setOrchestrationInstance( + OrchestratorService.OrchestrationInstance.newBuilder().setInstanceId(instanceId).build()) + .setInput(StringValue.of(input)); + + OrchestratorService.HistoryEvent.Builder builder = OrchestratorService.HistoryEvent.newBuilder() + .setEventId(-1) + .setTimestamp(Timestamp.newBuilder().setSeconds(1000).build()) + .setExecutionStarted(esBuilder.build()); + + if (router != null) { + builder.setRouter(router); + } + + return builder.build(); + } + + /** + * Helper to build an OrchestratorCompleted history event. + */ + private static OrchestratorService.HistoryEvent orchestratorCompleted() { + return OrchestratorService.HistoryEvent.newBuilder() + .setEventId(-1) + .setTimestamp(Timestamp.newBuilder().setSeconds(1000).build()) + .setOrchestratorCompleted(OrchestratorService.OrchestratorCompletedEvent.newBuilder().build()) + .build(); + } + + /** + * Creates a TaskOrchestrationExecutor with the given orchestrator and app ID. + */ + private TaskOrchestrationExecutor createExecutor(String orchestratorName, TaskOrchestration orchestration, + String appId) { + TaskOrchestrationFactories factories = new TaskOrchestrationFactories(); + factories.addOrchestration(new TaskOrchestrationFactory() { + @Override + public String getName() { + return orchestratorName; + } + + @Override + public TaskOrchestration create() { + return orchestration; + } + + @Override + public String getVersionName() { + return null; + } + + @Override + public Boolean isLatestVersion() { + return false; + } + }); + return new TaskOrchestrationExecutor(factories, new JacksonDataConverter(), MAX_TIMER_INTERVAL, logger, appId); + } + + // ================================================================================== + // Tests for callSubOrchestrator with cross-app routing + // ================================================================================== + + @Test + void callSubOrchestrator_withTargetAppId_setsRouterOnAction() { + final String orchestratorName = "ParentOrchestrator"; + final String subOrchestratorName = "ChildOrchestrator"; + final String sourceAppId = "app1"; + final String targetAppId = "app2"; + + // The orchestrator calls a sub-orchestration with a target app ID + TaskOrchestration orchestration = ctx -> { + TaskOptions options = TaskOptions.withAppID(targetAppId); + ctx.callSubOrchestrator(subOrchestratorName, "input", "child-instance-1", options, String.class); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration, sourceAppId); + + OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder() + .setSourceAppID(sourceAppId) + .build(); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "parent-instance", "\"hello\"", router), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + // There should be a CreateSubOrchestration action + List actions = new ArrayList<>(result.getActions()); + assertEquals(1, actions.size()); + + OrchestratorService.OrchestratorAction action = actions.get(0); + assertTrue(action.hasCreateSubOrchestration()); + + // Verify the CreateSubOrchestrationAction has the router + OrchestratorService.CreateSubOrchestrationAction createSub = action.getCreateSubOrchestration(); + assertEquals(subOrchestratorName, createSub.getName()); + assertEquals("child-instance-1", createSub.getInstanceId()); + assertTrue(createSub.hasRouter()); + assertEquals(sourceAppId, createSub.getRouter().getSourceAppID()); + assertTrue(createSub.getRouter().hasTargetAppID()); + assertEquals(targetAppId, createSub.getRouter().getTargetAppID()); + + // Verify the OrchestratorAction also has the router + assertTrue(action.hasRouter()); + assertEquals(sourceAppId, action.getRouter().getSourceAppID()); + assertTrue(action.getRouter().hasTargetAppID()); + assertEquals(targetAppId, action.getRouter().getTargetAppID()); + } + + @Test + void callSubOrchestrator_withoutTargetAppId_setsRouterWithSourceOnly() { + final String orchestratorName = "ParentOrchestrator"; + final String subOrchestratorName = "ChildOrchestrator"; + final String sourceAppId = "app1"; + + // The orchestrator calls a sub-orchestration WITHOUT a target app ID + TaskOrchestration orchestration = ctx -> { + ctx.callSubOrchestrator(subOrchestratorName, "input", "child-instance-1", null, String.class); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration, sourceAppId); + + OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder() + .setSourceAppID(sourceAppId) + .build(); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "parent-instance", "\"hello\"", router), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + List actions = new ArrayList<>(result.getActions()); + assertEquals(1, actions.size()); + + OrchestratorService.OrchestratorAction action = actions.get(0); + assertTrue(action.hasCreateSubOrchestration()); + + // Router should have source only, no target + OrchestratorService.CreateSubOrchestrationAction createSub = action.getCreateSubOrchestration(); + assertTrue(createSub.hasRouter()); + assertEquals(sourceAppId, createSub.getRouter().getSourceAppID()); + assertFalse(createSub.getRouter().hasTargetAppID()); + + // OrchestratorAction router should also have source only + assertTrue(action.hasRouter()); + assertEquals(sourceAppId, action.getRouter().getSourceAppID()); + assertFalse(action.getRouter().hasTargetAppID()); + } + + @Test + void callSubOrchestrator_withNullAppId_noRouterSet() { + final String orchestratorName = "ParentOrchestrator"; + final String subOrchestratorName = "ChildOrchestrator"; + + // The orchestrator calls a sub-orchestration with no app routing context + TaskOrchestration orchestration = ctx -> { + ctx.callSubOrchestrator(subOrchestratorName, "input", "child-instance-1", null, String.class); + }; + + // Create executor with null appId (no router context) + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration, null); + + // ExecutionStarted without a router + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "parent-instance", "\"hello\"", null), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + List actions = new ArrayList<>(result.getActions()); + assertEquals(1, actions.size()); + + OrchestratorService.OrchestratorAction action = actions.get(0); + assertTrue(action.hasCreateSubOrchestration()); + + // No router should be set when appId is null + OrchestratorService.CreateSubOrchestrationAction createSub = action.getCreateSubOrchestration(); + assertFalse(createSub.hasRouter()); + assertFalse(action.hasRouter()); + } + + // ================================================================================== + // Tests for EXECUTIONSTARTED event router processing (appId extraction) + // ================================================================================== + + @Test + void executionStarted_withRouterTargetAppId_usesTargetAsAppId() { + final String orchestratorName = "SubOrchestrator"; + final String sourceAppId = "parent-app"; + final String targetAppId = "child-app"; + + // This orchestrator will call a local sub-orchestrator with no target app; the router source + // on that sub-action should be the target app id we extracted from the event router + final String[] capturedAppId = new String[1]; + TaskOrchestration orchestration = ctx -> { + capturedAppId[0] = ctx.getAppId(); + ctx.complete(null); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration, sourceAppId); + + // Router with BOTH source and target (cross-app suborchestration scenario) + OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder() + .setSourceAppID(sourceAppId) + .setTargetAppID(targetAppId) + .build(); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "sub-instance-1", "\"data\"", router), + orchestratorCompleted() + ); + + executor.execute(new ArrayList<>(), newEvents); + + // The appId should be the target, not the source + assertEquals(targetAppId, capturedAppId[0]); + } + + @Test + void executionStarted_withRouterSourceOnly_usesSourceAsAppId() { + final String orchestratorName = "MyOrchestrator"; + final String sourceAppId = "my-app"; + + final String[] capturedAppId = new String[1]; + TaskOrchestration orchestration = ctx -> { + capturedAppId[0] = ctx.getAppId(); + ctx.complete(null); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration, sourceAppId); + + // Router with source only (normal, single-app scenario) + OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder() + .setSourceAppID(sourceAppId) + .build(); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "instance-1", "\"data\"", router), + orchestratorCompleted() + ); + + executor.execute(new ArrayList<>(), newEvents); + + assertEquals(sourceAppId, capturedAppId[0]); + } + + @Test + void executionStarted_withNoRouter_appIdIsNull() { + final String orchestratorName = "MyOrchestrator"; + + final String[] capturedAppId = new String[]{" sentinel "}; + TaskOrchestration orchestration = ctx -> { + capturedAppId[0] = ctx.getAppId(); + ctx.complete(null); + }; + + // Executor created with null appId + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration, null); + + // No router on the event + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "instance-1", "\"data\"", null), + orchestratorCompleted() + ); + + executor.execute(new ArrayList<>(), newEvents); + + // appId should remain null since no router was present + assertNull(capturedAppId[0]); + } + + // ================================================================================== + // Tests for completion action router + // ================================================================================== + + @Test + void completeOrchestration_withAppId_setsRouterOnCompletionAction() { + final String orchestratorName = "MyOrchestrator"; + final String appId = "my-app"; + + // Orchestrator that completes immediately with a result + TaskOrchestration orchestration = ctx -> { + ctx.complete("done"); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration, appId); + + OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder() + .setSourceAppID(appId) + .build(); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "instance-1", "\"input\"", router), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + List actions = new ArrayList<>(result.getActions()); + assertEquals(1, actions.size()); + + OrchestratorService.OrchestratorAction action = actions.get(0); + assertTrue(action.hasCompleteOrchestration()); + assertEquals(OrchestratorService.OrchestrationStatus.ORCHESTRATION_STATUS_COMPLETED, + action.getCompleteOrchestration().getOrchestrationStatus()); + + // The completion action should have a router with source appId + assertTrue(action.hasRouter()); + assertEquals(appId, action.getRouter().getSourceAppID()); + assertFalse(action.getRouter().hasTargetAppID()); + } + + @Test + void completeOrchestration_withNullAppId_noRouterOnCompletionAction() { + final String orchestratorName = "MyOrchestrator"; + + TaskOrchestration orchestration = ctx -> { + ctx.complete("done"); + }; + + // Executor with null appId + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration, null); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "instance-1", "\"input\"", null), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + List actions = new ArrayList<>(result.getActions()); + assertEquals(1, actions.size()); + + OrchestratorService.OrchestratorAction action = actions.get(0); + assertTrue(action.hasCompleteOrchestration()); + + // No router should be set + assertFalse(action.hasRouter()); + } + + @Test + void completeOrchestration_crossAppSubOrchestrator_routerHasTargetDerivedAppId() { + final String orchestratorName = "SubOrchestrator"; + final String parentAppId = "parent-app"; + final String targetAppId = "child-app"; + + // Simulates a cross-app sub-orchestrator that receives a router with target + TaskOrchestration orchestration = ctx -> { + ctx.complete("sub-result"); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration, parentAppId); + + // Router has both source and target (cross-app suborchestration) + OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder() + .setSourceAppID(parentAppId) + .setTargetAppID(targetAppId) + .build(); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "sub-instance-1", "\"input\"", router), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + List actions = new ArrayList<>(result.getActions()); + assertEquals(1, actions.size()); + + OrchestratorService.OrchestratorAction action = actions.get(0); + assertTrue(action.hasCompleteOrchestration()); + + // The router source should be the target app (since that's where we're executing) + assertTrue(action.hasRouter()); + assertEquals(targetAppId, action.getRouter().getSourceAppID()); + } + + // ================================================================================== + // Tests for combined suborchestration + completion routing + // ================================================================================== + + @Test + void crossAppSubOrchestration_fullFlow_routersCorrectlySet() { + final String orchestratorName = "ParentOrchestrator"; + final String subOrchestratorName = "RemoteChild"; + final String sourceAppId = "app1"; + final String targetAppId = "app2"; + + // Parent orchestrator calls a cross-app sub-orchestration and then completes + TaskOrchestration orchestration = ctx -> { + TaskOptions options = TaskOptions.withAppID(targetAppId); + ctx.callSubOrchestrator(subOrchestratorName, "data", "child-id-1", options, String.class); + // Note: orchestrator will yield here waiting for the sub-orchestration to complete + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration, sourceAppId); + + OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder() + .setSourceAppID(sourceAppId) + .build(); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "parent-instance", "\"start\"", router), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + List actions = new ArrayList<>(result.getActions()); + // Should have 1 action: CreateSubOrchestration + assertEquals(1, actions.size()); + + OrchestratorService.OrchestratorAction subAction = actions.get(0); + assertTrue(subAction.hasCreateSubOrchestration()); + + OrchestratorService.CreateSubOrchestrationAction createSub = subAction.getCreateSubOrchestration(); + assertEquals(subOrchestratorName, createSub.getName()); + assertEquals("child-id-1", createSub.getInstanceId()); + + // Verify cross-app router on the sub-orchestration action + assertTrue(createSub.hasRouter()); + assertEquals(sourceAppId, createSub.getRouter().getSourceAppID()); + assertEquals(targetAppId, createSub.getRouter().getTargetAppID()); + + // Verify cross-app router on the OrchestratorAction envelope + assertTrue(subAction.hasRouter()); + assertEquals(sourceAppId, subAction.getRouter().getSourceAppID()); + assertEquals(targetAppId, subAction.getRouter().getTargetAppID()); + } + + @Test + void callSubOrchestrator_withEmptyAppId_noRouterSet() { + final String orchestratorName = "ParentOrchestrator"; + final String subOrchestratorName = "ChildOrchestrator"; + + TaskOrchestration orchestration = ctx -> { + ctx.callSubOrchestrator(subOrchestratorName, "input", "child-1", null, String.class); + }; + + // Executor created with empty appId + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration, ""); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "parent-instance", "\"hello\"", null), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + List actions = new ArrayList<>(result.getActions()); + assertEquals(1, actions.size()); + + OrchestratorService.OrchestratorAction action = actions.get(0); + assertTrue(action.hasCreateSubOrchestration()); + + // No router should be set when appId is empty + assertFalse(action.getCreateSubOrchestration().hasRouter()); + assertFalse(action.hasRouter()); + } + + @Test + void callSubOrchestrator_withRetryPolicyAndAppId_setsRouterAndRetries() { + final String orchestratorName = "ParentOrchestrator"; + final String subOrchestratorName = "ChildOrchestrator"; + final String sourceAppId = "app1"; + final String targetAppId = "app2"; + + TaskOrchestration orchestration = ctx -> { + RetryPolicy retryPolicy = new RetryPolicy(3, Duration.ofSeconds(1)); + TaskOptions options = TaskOptions.builder() + .retryPolicy(retryPolicy) + .appID(targetAppId) + .build(); + ctx.callSubOrchestrator(subOrchestratorName, "input", "child-1", options, String.class); + }; + + TaskOrchestrationExecutor executor = createExecutor(orchestratorName, orchestration, sourceAppId); + + OrchestratorService.TaskRouter router = OrchestratorService.TaskRouter.newBuilder() + .setSourceAppID(sourceAppId) + .build(); + + List newEvents = List.of( + orchestratorStarted(), + executionStarted(orchestratorName, "parent-instance", "\"hello\"", router), + orchestratorCompleted() + ); + + TaskOrchestratorResult result = executor.execute(new ArrayList<>(), newEvents); + + // With RetriableTask the first attempt creates the action; we should still see + // the sub-orchestration action with cross-app routing + List actions = new ArrayList<>(result.getActions()); + assertTrue(actions.size() >= 1); + + OrchestratorService.OrchestratorAction action = actions.get(0); + assertTrue(action.hasCreateSubOrchestration()); + + OrchestratorService.CreateSubOrchestrationAction createSub = action.getCreateSubOrchestration(); + assertTrue(createSub.hasRouter()); + assertEquals(sourceAppId, createSub.getRouter().getSourceAppID()); + assertEquals(targetAppId, createSub.getRouter().getTargetAppID()); + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/ChildTransformActivity.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/ChildTransformActivity.java new file mode 100644 index 0000000000..ea7333f774 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/ChildTransformActivity.java @@ -0,0 +1,36 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.it.testcontainers.workflows.multiapp; + +import io.dapr.workflows.WorkflowActivity; +import io.dapr.workflows.WorkflowActivityContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Activity that runs within the child sub-orchestration on app2. + * Transforms the input by uppercasing it and adding a marker. + */ +public class ChildTransformActivity implements WorkflowActivity { + private static final Logger logger = LoggerFactory.getLogger(ChildTransformActivity.class); + + @Override + public Object run(WorkflowActivityContext ctx) { + String input = ctx.getInput(String.class); + logger.info("ChildTransformActivity input: {}", input); + String output = input.toUpperCase() + " [CHILD TRANSFORMED]"; + logger.info("ChildTransformActivity output: {}", output); + return output; + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/SubOrchestrationChildWorker.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/SubOrchestrationChildWorker.java new file mode 100644 index 0000000000..9a147073a2 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/SubOrchestrationChildWorker.java @@ -0,0 +1,32 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.it.testcontainers.workflows.multiapp; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +/** + * Worker for app2 that registers the child sub-orchestration workflow and its activity. + */ +public class SubOrchestrationChildWorker { + public static void main(String[] args) throws Exception { + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder() + .registerWorkflow(SubOrchestrationChildWorkflow.class) + .registerActivity(ChildTransformActivity.class); + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("SubOrchestrationChildWorker started - registered SubOrchestrationChildWorkflow and ChildTransformActivity"); + runtime.start(); + } + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/SubOrchestrationChildWorkflow.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/SubOrchestrationChildWorkflow.java new file mode 100644 index 0000000000..9d4f5d18bc --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/SubOrchestrationChildWorkflow.java @@ -0,0 +1,43 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.it.testcontainers.workflows.multiapp; + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import org.slf4j.Logger; + +/** + * Child workflow that runs on app2 as a sub-orchestration. + * It calls a local activity to transform the input and returns the result. + */ +public class SubOrchestrationChildWorkflow implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + Logger logger = ctx.getLogger(); + logger.info("Starting SubOrchestrationChildWorkflow: {}", ctx.getName()); + + String input = ctx.getInput(String.class); + logger.info("Child workflow input: {}", input); + + // Call a local activity within app2 + String transformed = ctx.callActivity( + ChildTransformActivity.class.getName(), input, String.class + ).await(); + + logger.info("Child workflow transformed result: {}", transformed); + ctx.complete(transformed); + }; + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/SubOrchestrationParentWorker.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/SubOrchestrationParentWorker.java new file mode 100644 index 0000000000..6a9b139d6f --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/SubOrchestrationParentWorker.java @@ -0,0 +1,31 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.it.testcontainers.workflows.multiapp; + +import io.dapr.workflows.runtime.WorkflowRuntime; +import io.dapr.workflows.runtime.WorkflowRuntimeBuilder; + +/** + * Worker for the parent app that registers only the parent sub-orchestration workflow. + */ +public class SubOrchestrationParentWorker { + public static void main(String[] args) throws Exception { + WorkflowRuntimeBuilder builder = new WorkflowRuntimeBuilder() + .registerWorkflow(SubOrchestrationParentWorkflow.class); + try (WorkflowRuntime runtime = builder.build()) { + System.out.println("SubOrchestrationParentWorker started - registered SubOrchestrationParentWorkflow only"); + runtime.start(); + } + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/SubOrchestrationParentWorkflow.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/SubOrchestrationParentWorkflow.java new file mode 100644 index 0000000000..f6a83e222a --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/SubOrchestrationParentWorkflow.java @@ -0,0 +1,50 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.it.testcontainers.workflows.multiapp; + +import io.dapr.workflows.Workflow; +import io.dapr.workflows.WorkflowStub; +import io.dapr.workflows.WorkflowTaskOptions; +import org.slf4j.Logger; + +/** + * Parent workflow that calls a child sub-orchestration on a remote app (app2). + * The child workflow processes the input and returns the result back to the parent. + */ +public class SubOrchestrationParentWorkflow implements Workflow { + @Override + public WorkflowStub create() { + return ctx -> { + Logger logger = ctx.getLogger(); + String instanceId = ctx.getInstanceId(); + logger.info("Starting SubOrchestrationParentWorkflow: {}", ctx.getName()); + logger.info("Instance ID: {}", instanceId); + + String input = ctx.getInput(String.class); + logger.info("Parent workflow input: {}", input); + + // Call SubOrchestrationChildWorkflow on app2 + String childResult = ctx.callChildWorkflow( + SubOrchestrationChildWorkflow.class.getName(), input, null, + new WorkflowTaskOptions("app2"), String.class + ).await(); + + logger.info("Child workflow result: {}", childResult); + + // Parent appends its own marker + String finalResult = childResult + " [PARENT DONE]"; + ctx.complete(finalResult); + }; + } +} diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/WorkflowsMultiAppSubOrchestrationIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/WorkflowsMultiAppSubOrchestrationIT.java new file mode 100644 index 0000000000..3ca0d3e879 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/workflows/multiapp/WorkflowsMultiAppSubOrchestrationIT.java @@ -0,0 +1,160 @@ +/* + * Copyright 2026 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.dapr.it.testcontainers.workflows.multiapp; + +import io.dapr.it.testcontainers.ContainerConstants; +import io.dapr.testcontainers.Component; +import io.dapr.testcontainers.DaprContainer; +import io.dapr.testcontainers.DaprLogLevel; +import io.dapr.testcontainers.DaprPlacementContainer; +import io.dapr.testcontainers.DaprSchedulerContainer; +import io.dapr.workflows.client.DaprWorkflowClient; +import io.dapr.workflows.client.WorkflowState; +import io.dapr.workflows.client.WorkflowRuntimeStatus; +import io.dapr.config.Properties; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Network; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.MountableFile; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.Wait; + +import java.time.Duration; +import java.util.Map; + +import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; +import static io.dapr.testcontainers.DaprContainerConstants.DAPR_PLACEMENT_IMAGE_TAG; +import static io.dapr.testcontainers.DaprContainerConstants.DAPR_SCHEDULER_IMAGE_TAG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +/** + * Multi-App Sub-Orchestration integration test. + * + * This test demonstrates cross-app sub-orchestration by: + * 1. Starting 2 Dapr containers (suborchestration-parent, app2) + * 2. Launching Java processes that register workflows in separate apps + * 3. The parent workflow calls a child workflow on app2 via callChildWorkflow + * 4. The child workflow calls a local activity and returns the result + * 5. Asserting successful completion with expected output + */ +@Testcontainers +@Tag("testcontainers") +public class WorkflowsMultiAppSubOrchestrationIT { + + private static final Network DAPR_NETWORK = Network.newNetwork(); + + @Container + private final static DaprPlacementContainer sharedPlacementContainer = new DaprPlacementContainer(DAPR_PLACEMENT_IMAGE_TAG) + .withNetwork(DAPR_NETWORK) + .withNetworkAliases("placement") + .withReuse(false); + + @Container + private final static DaprSchedulerContainer sharedSchedulerContainer = new DaprSchedulerContainer(DAPR_SCHEDULER_IMAGE_TAG) + .withNetwork(DAPR_NETWORK) + .withNetworkAliases("scheduler") + .withReuse(false); + + // Parent workflow orchestrator sidecar + @Container + private final static DaprContainer PARENT_WORKFLOW_SIDECAR = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName("suborchestration-parent") + .withNetwork(DAPR_NETWORK) + .withNetworkAliases("parent-workflow-sidecar") + .withPlacementContainer(sharedPlacementContainer) + .withSchedulerContainer(sharedSchedulerContainer) + .withComponent(new Component("kvstore", "state.in-memory", "v1", Map.of("actorStateStore", "true"))) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .dependsOn(sharedPlacementContainer, sharedSchedulerContainer) + .withLogConsumer(outputFrame -> System.out.println("PARENT_WORKFLOW: " + outputFrame.getUtf8String())) + .withAppChannelAddress("host.testcontainers.internal"); + + // App2 sidecar for the child sub-orchestration + @Container + private final static DaprContainer APP2_SIDECAR = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName("app2") + .withNetwork(DAPR_NETWORK) + .withNetworkAliases("app2-sidecar") + .withPlacementContainer(sharedPlacementContainer) + .withSchedulerContainer(sharedSchedulerContainer) + .withAppChannelAddress("parent-workflow-sidecar:3500") + .withDaprLogLevel(DaprLogLevel.DEBUG) + .dependsOn(sharedPlacementContainer, sharedSchedulerContainer, PARENT_WORKFLOW_SIDECAR) + .withComponent(new Component("kvstore", "state.in-memory", "v1", Map.of("actorStateStore", "true"))) + .withLogConsumer(outputFrame -> System.out.println("APP2: " + outputFrame.getUtf8String())); + + // Parent worker container - registers SubOrchestrationParentWorkflow + @Container + private static GenericContainer parentWorker = new GenericContainer<>(ContainerConstants.JDK_17_TEMURIN_JAMMY) + .withCopyFileToContainer(MountableFile.forHostPath("target"), "/app") + .withWorkingDirectory("/app") + .withCommand("java", "-cp", "test-classes:classes:dependency/*:*", + "-Ddapr.app.id=suborchestration-parent", + "-Ddapr.grpc.endpoint=parent-workflow-sidecar:50001", + "-Ddapr.http.endpoint=parent-workflow-sidecar:3500", + "io.dapr.it.testcontainers.workflows.multiapp.SubOrchestrationParentWorker") + .withNetwork(DAPR_NETWORK) + .dependsOn(PARENT_WORKFLOW_SIDECAR) + .waitingFor(Wait.forLogMessage(".*SubOrchestrationParentWorker started.*", 1)) + .withLogConsumer(outputFrame -> System.out.println("ParentWorker: " + outputFrame.getUtf8String())); + + // App2 worker container - registers SubOrchestrationChildWorkflow and ChildTransformActivity + @Container + private final static GenericContainer app2Worker = new GenericContainer<>(ContainerConstants.JDK_17_TEMURIN_JAMMY) + .withCopyFileToContainer(MountableFile.forHostPath("target"), "/app") + .withWorkingDirectory("/app") + .withCommand("java", "-cp", "test-classes:classes:dependency/*:*", + "-Ddapr.app.id=app2", + "-Ddapr.grpc.endpoint=app2-sidecar:50001", + "-Ddapr.http.endpoint=app2-sidecar:3500", + "io.dapr.it.testcontainers.workflows.multiapp.SubOrchestrationChildWorker") + .withNetwork(DAPR_NETWORK) + .dependsOn(APP2_SIDECAR) + .waitingFor(Wait.forLogMessage(".*SubOrchestrationChildWorker started.*", 1)) + .withLogConsumer(outputFrame -> System.out.println("App2Worker: " + outputFrame.getUtf8String())); + + @Test + public void testMultiAppSubOrchestration() throws Exception { + String input = "Hello World"; + // Child transforms: "Hello World" -> "HELLO WORLD [CHILD TRANSFORMED]" + // Parent appends: -> "HELLO WORLD [CHILD TRANSFORMED] [PARENT DONE]" + String expectedOutput = "HELLO WORLD [CHILD TRANSFORMED] [PARENT DONE]"; + + Map propertyOverrides = Map.of( + "dapr.grpc.endpoint", PARENT_WORKFLOW_SIDECAR.getGrpcEndpoint(), + "dapr.http.endpoint", PARENT_WORKFLOW_SIDECAR.getHttpEndpoint() + ); + + Properties clientProperties = new Properties(propertyOverrides); + DaprWorkflowClient workflowClient = new DaprWorkflowClient(clientProperties); + + try { + String instanceId = workflowClient.scheduleNewWorkflow(SubOrchestrationParentWorkflow.class, input); + assertNotNull(instanceId, "Workflow instance ID should not be null"); + workflowClient.waitForWorkflowStart(instanceId, Duration.ofSeconds(30), false); + + WorkflowState workflowStatus = workflowClient.waitForWorkflowCompletion(instanceId, null, true); + assertNotNull(workflowStatus, "Workflow status should not be null"); + assertEquals(WorkflowRuntimeStatus.COMPLETED, workflowStatus.getRuntimeStatus(), + "Workflow should complete successfully"); + String workflowOutput = workflowStatus.readOutputAs(String.class); + assertEquals(expectedOutput, workflowOutput, "Workflow output should match expected result"); + } finally { + workflowClient.close(); + } + } +} diff --git a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java index 22e364f244..931a38fce9 100644 --- a/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java +++ b/sdk-workflows/src/test/java/io/dapr/workflows/DefaultWorkflowContextTest.java @@ -36,6 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; @@ -415,6 +416,33 @@ public void callChildWorkflow() { verify(mockInnerContext, times(1)).callSubOrchestrator(expectedName, expectedInput, null, null, String.class); } + @Test + public void callChildWorkflowWithAppId() { + String expectedName = "TestActivity"; + String expectedInput = "TestInput"; + String expectedInstanceId = "TestInstanceId"; + String expectedAppId = "remote-app"; + WorkflowTaskOptions executionOptions = new WorkflowTaskOptions(expectedAppId); + ArgumentCaptor captor = ArgumentCaptor.forClass(TaskOptions.class); + + context.callChildWorkflow(expectedName, expectedInput, expectedInstanceId, executionOptions, String.class); + + verify(mockInnerContext, times(1)) + .callSubOrchestrator( + eq(expectedName), + eq(expectedInput), + eq(expectedInstanceId), + captor.capture(), + eq(String.class) + ); + + TaskOptions taskOptions = captor.getValue(); + + assertEquals(expectedAppId, taskOptions.getAppID()); + assertNull(taskOptions.getRetryPolicy()); + assertNull(taskOptions.getRetryHandler()); + } + @Test public void setCustomStatusWorkflow() { String customStatus = "CustomStatus";