From bfd3fc6f16394d65c50eda4a4195778ac3f274e4 Mon Sep 17 00:00:00 2001 From: Andrew Yuan Date: Fri, 27 Mar 2026 12:38:38 -0700 Subject: [PATCH] Add worker heartbeat support Implements periodic worker heartbeat RPCs that report worker status, slot usage, poller info, and task counters to the server. Key components: - HeartbeatManager: per-namespace scheduler that aggregates heartbeats from all workers sharing that namespace - PollerTracker: tracks in-flight poll count and last successful poll time - WorkflowClientOptions.workerHeartbeatInterval: configurable interval (default 60s, range 1-60s, negative to disable) - TrackingSlotSupplier: extended with slot type reporting - Worker: builds SharedNamespaceWorker heartbeat data from activity, workflow, and nexus worker stats - TestWorkflowService: implements recordWorkerHeartbeat, describeWorker, and shutdownWorker RPCs for testing --- .../client/WorkflowClientInternalImpl.java | 22 + .../client/WorkflowClientOptions.java | 61 +- .../client/WorkflowClientInternal.java | 7 + .../internal/worker/ActivityPollTask.java | 12 +- .../internal/worker/ActivityWorker.java | 40 +- .../worker/AsyncActivityPollTask.java | 22 +- .../internal/worker/AsyncNexusPollTask.java | 21 +- .../worker/AsyncWorkflowPollTask.java | 21 +- .../internal/worker/HeartbeatManager.java | 175 ++++ .../internal/worker/LocalActivityWorker.java | 22 + .../internal/worker/NexusPollTask.java | 12 +- .../temporal/internal/worker/NexusWorker.java | 51 +- .../internal/worker/PollerTracker.java | 46 + .../internal/worker/SyncActivityWorker.java | 21 + .../internal/worker/SyncNexusWorker.java | 21 + .../internal/worker/SyncWorkflowWorker.java | 58 +- .../internal/worker/TrackingSlotSupplier.java | 16 + .../worker/WorkflowExecutorCache.java | 17 + .../internal/worker/WorkflowPollTask.java | 22 +- .../internal/worker/WorkflowWorker.java | 103 +- .../main/java/io/temporal/worker/Worker.java | 241 +++++ .../io/temporal/worker/WorkerFactory.java | 39 + .../internal/worker/HeartbeatManagerTest.java | 233 +++++ .../internal/worker/PollerTrackerTest.java | 89 ++ .../internal/worker/SlotSupplierTest.java | 7 +- .../worker/StickyQueueBacklogTest.java | 4 +- .../worker/TrackingSlotSupplierKindTest.java | 63 ++ .../internal/worker/WorkflowWorkerTest.java | 12 + .../worker/HeartbeatCapturingInterceptor.java | 49 + .../WorkerHeartbeatDeploymentVersionTest.java | 74 ++ .../WorkerHeartbeatIntegrationTest.java | 936 ++++++++++++++++++ .../testservice/TestWorkflowService.java | 47 +- 32 files changed, 2485 insertions(+), 79 deletions(-) create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/worker/HeartbeatManager.java create mode 100644 temporal-sdk/src/main/java/io/temporal/internal/worker/PollerTracker.java create mode 100644 temporal-sdk/src/test/java/io/temporal/internal/worker/HeartbeatManagerTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/internal/worker/PollerTrackerTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/internal/worker/TrackingSlotSupplierKindTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/worker/HeartbeatCapturingInterceptor.java create mode 100644 temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatDeploymentVersionTest.java create mode 100644 temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatIntegrationTest.java diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java index 76c55d3c5b..fa9cc82b6e 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientInternalImpl.java @@ -23,6 +23,7 @@ import io.temporal.internal.client.external.ManualActivityCompletionClientFactory; import io.temporal.internal.common.PluginUtils; import io.temporal.internal.sync.StubMarker; +import io.temporal.internal.worker.HeartbeatManager; import io.temporal.serviceclient.MetricsTag; import io.temporal.serviceclient.WorkflowServiceStubs; import io.temporal.serviceclient.WorkflowServiceStubsPlugin; @@ -53,6 +54,8 @@ final class WorkflowClientInternalImpl implements WorkflowClient, WorkflowClient private final Scope metricsScope; private final WorkflowClientInterceptor[] interceptors; private final WorkerFactoryRegistry workerFactoryRegistry = new WorkerFactoryRegistry(); + private final String workerGroupingKey = java.util.UUID.randomUUID().toString(); + private final @Nullable HeartbeatManager heartbeatManager; /** * Creates client that connects to an instance of the Temporal Service. Cannot be used from within @@ -112,6 +115,14 @@ public static WorkflowClient newInstance( options.getNamespace(), options.getIdentity(), options.getDataConverter()); + + java.time.Duration heartbeatInterval = options.getWorkerHeartbeatInterval(); + if (!heartbeatInterval.isNegative()) { + this.heartbeatManager = + new HeartbeatManager(workflowServiceStubs, options.getIdentity(), heartbeatInterval); + } else { + this.heartbeatManager = null; + } } private WorkflowClientCallsInterceptor initializeClientInvoker() { @@ -790,6 +801,17 @@ public void deregisterWorkerFactory(WorkerFactory workerFactory) { workerFactoryRegistry.deregister(workerFactory); } + @Override + public String getWorkerGroupingKey() { + return workerGroupingKey; + } + + @Override + @Nullable + public HeartbeatManager getHeartbeatManager() { + return heartbeatManager; + } + @Override public NexusStartWorkflowResponse startNexus( NexusStartWorkflowRequest request, Functions.Proc workflow) { diff --git a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java index ca67852751..e10defba51 100644 --- a/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java +++ b/temporal-sdk/src/main/java/io/temporal/client/WorkflowClientOptions.java @@ -1,5 +1,6 @@ package io.temporal.client; +import com.google.common.base.Preconditions; import io.temporal.api.enums.v1.QueryRejectCondition; import io.temporal.common.Experimental; import io.temporal.common.context.ContextPropagator; @@ -7,6 +8,7 @@ import io.temporal.common.converter.GlobalDataConverter; import io.temporal.common.interceptors.WorkflowClientInterceptor; import java.lang.management.ManagementFactory; +import java.time.Duration; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -49,6 +51,7 @@ public static final class Builder { private List contextPropagators; private QueryRejectCondition queryRejectCondition; private WorkflowClientPlugin[] plugins; + private Duration workerHeartbeatInterval; private Builder() {} @@ -64,6 +67,7 @@ private Builder(WorkflowClientOptions options) { contextPropagators = options.contextPropagators; queryRejectCondition = options.queryRejectCondition; plugins = options.plugins; + workerHeartbeatInterval = options.workerHeartbeatInterval; } public Builder setNamespace(String namespace) { @@ -153,6 +157,19 @@ public Builder setPlugins(WorkflowClientPlugin... plugins) { return this; } + /** + * Sets the interval at which workers send heartbeat RPCs to the server. If not set or set to + * zero, defaults to 60 seconds. A negative duration disables heartbeating. Positive values must + * be between 1 and 60 seconds inclusive. + * + * @param workerHeartbeatInterval the heartbeat interval, or a negative duration to disable + */ + @Experimental + public Builder setWorkerHeartbeatInterval(Duration workerHeartbeatInterval) { + this.workerHeartbeatInterval = workerHeartbeatInterval; + return this; + } + public WorkflowClientOptions build() { return new WorkflowClientOptions( namespace, @@ -162,7 +179,8 @@ public WorkflowClientOptions build() { binaryChecksum, contextPropagators, queryRejectCondition, - plugins == null ? EMPTY_PLUGINS : plugins); + plugins == null ? EMPTY_PLUGINS : plugins, + resolveHeartbeatInterval(workerHeartbeatInterval)); } /** @@ -188,7 +206,22 @@ public WorkflowClientOptions validateAndBuildWithDefaults() { queryRejectCondition == null ? QueryRejectCondition.QUERY_REJECT_CONDITION_UNSPECIFIED : queryRejectCondition, - plugins == null ? EMPTY_PLUGINS : plugins); + plugins == null ? EMPTY_PLUGINS : plugins, + resolveHeartbeatInterval(workerHeartbeatInterval)); + } + + private static Duration resolveHeartbeatInterval(Duration raw) { + if (raw == null || raw.isZero()) { + return Duration.ofSeconds(60); + } + if (raw.isNegative()) { + return raw; + } + Preconditions.checkArgument( + raw.compareTo(Duration.ofSeconds(1)) >= 0 && raw.compareTo(Duration.ofSeconds(60)) <= 0, + "workerHeartbeatInterval must be between 1s and 60s, got %s", + raw); + return raw; } } @@ -215,6 +248,8 @@ public WorkflowClientOptions validateAndBuildWithDefaults() { private final WorkflowClientPlugin[] plugins; + private final Duration workerHeartbeatInterval; + private WorkflowClientOptions( String namespace, DataConverter dataConverter, @@ -223,7 +258,8 @@ private WorkflowClientOptions( String binaryChecksum, List contextPropagators, QueryRejectCondition queryRejectCondition, - WorkflowClientPlugin[] plugins) { + WorkflowClientPlugin[] plugins, + Duration workerHeartbeatInterval) { this.namespace = namespace; this.dataConverter = dataConverter; this.interceptors = interceptors; @@ -232,6 +268,7 @@ private WorkflowClientOptions( this.contextPropagators = contextPropagators; this.queryRejectCondition = queryRejectCondition; this.plugins = plugins; + this.workerHeartbeatInterval = workerHeartbeatInterval; } /** @@ -289,6 +326,15 @@ public WorkflowClientPlugin[] getPlugins() { return plugins; } + /** + * Returns the worker heartbeat interval. Defaults to 60 seconds if not configured. A negative + * duration means heartbeating is explicitly disabled. + */ + @Experimental + public Duration getWorkerHeartbeatInterval() { + return workerHeartbeatInterval; + } + @Override public String toString() { return "WorkflowClientOptions{" @@ -311,6 +357,8 @@ public String toString() { + queryRejectCondition + ", plugins=" + Arrays.toString(plugins) + + ", workerHeartbeatInterval=" + + workerHeartbeatInterval + '}'; } @@ -326,7 +374,9 @@ public boolean equals(Object o) { && com.google.common.base.Objects.equal(binaryChecksum, that.binaryChecksum) && com.google.common.base.Objects.equal(contextPropagators, that.contextPropagators) && queryRejectCondition == that.queryRejectCondition - && Arrays.equals(plugins, that.plugins); + && Arrays.equals(plugins, that.plugins) + && com.google.common.base.Objects.equal( + workerHeartbeatInterval, that.workerHeartbeatInterval); } @Override @@ -339,6 +389,7 @@ public int hashCode() { binaryChecksum, contextPropagators, queryRejectCondition, - Arrays.hashCode(plugins)); + Arrays.hashCode(plugins), + workerHeartbeatInterval); } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java index 438b2bac75..fc034a366b 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/client/WorkflowClientInternal.java @@ -1,8 +1,10 @@ package io.temporal.internal.client; import io.temporal.client.WorkflowClient; +import io.temporal.internal.worker.HeartbeatManager; import io.temporal.worker.WorkerFactory; import io.temporal.workflow.Functions; +import javax.annotation.Nullable; /** * From OOP point of view, there is no reason for this interface not to extend {@link @@ -18,4 +20,9 @@ public interface WorkflowClientInternal { void deregisterWorkerFactory(WorkerFactory workerFactory); NexusStartWorkflowResponse startNexus(NexusStartWorkflowRequest request, Functions.Proc workflow); + + String getWorkerGroupingKey(); + + @Nullable + HeartbeatManager getHeartbeatManager(); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java index 7072d163ee..b015039566 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityPollTask.java @@ -17,7 +17,6 @@ import io.temporal.worker.PollerTypeMetricsTag; import io.temporal.worker.tuning.*; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.annotation.Nonnull; import org.slf4j.Logger; @@ -30,7 +29,7 @@ final class ActivityPollTask implements MultiThreadedPoller.PollTask slotSupplier; private final Scope metricsScope; private final PollActivityTaskQueueRequest pollRequest; - private final AtomicInteger pollGauge = new AtomicInteger(); + private final PollerTracker pollerTracker; @SuppressWarnings("deprecation") public ActivityPollTask( @@ -42,10 +41,12 @@ public ActivityPollTask( double activitiesPerSecond, @Nonnull TrackingSlotSupplier slotSupplier, @Nonnull Scope metricsScope, - @Nonnull Supplier serverCapabilities) { + @Nonnull Supplier serverCapabilities, + @Nonnull PollerTracker pollerTracker) { this.service = Objects.requireNonNull(service); this.slotSupplier = slotSupplier; this.metricsScope = Objects.requireNonNull(metricsScope); + this.pollerTracker = Objects.requireNonNull(pollerTracker); PollActivityTaskQueueRequest.Builder pollRequest = PollActivityTaskQueueRequest.newBuilder() @@ -100,7 +101,7 @@ public ActivityTask poll() { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.incrementAndGet()); + .update(pollerTracker.pollStarted()); try { response = @@ -119,6 +120,7 @@ public ActivityTask poll() { ProtobufTimeUtils.toM3Duration( response.getStartedTime(), response.getCurrentAttemptScheduledTime())); isSuccessful = true; + pollerTracker.pollSucceeded(); return new ActivityTask( response, permit, @@ -126,7 +128,7 @@ public ActivityTask poll() { } finally { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet()); + .update(pollerTracker.pollCompleted()); if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java index 3387ee7607..5217e049fe 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/ActivityWorker.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,6 +50,9 @@ final class ActivityWorker implements SuspendableWorker { private final GrpcRetryer grpcRetryer; private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions; private final TrackingSlotSupplier slotSupplier; + private final AtomicInteger totalProcessedTasks = new AtomicInteger(); + private final AtomicInteger totalFailedTasks = new AtomicInteger(); + private final PollerTracker pollerTracker; private final AtomicBoolean serverSupportsAutoscaling; public ActivityWorker( @@ -75,6 +79,7 @@ public ActivityWorker( DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null); this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope); + this.pollerTracker = new PollerTracker(); this.serverSupportsAutoscaling = serverSupportsAutoscaling; } @@ -107,7 +112,8 @@ public boolean start() { taskQueueActivitiesPerSecond, this.slotSupplier, workerMetricsScope, - service.getServerCapabilities()), + service.getServerCapabilities(), + pollerTracker), this.pollTaskExecutor, pollerOptions, serverSupportsAutoscaling.get(), @@ -126,7 +132,8 @@ public boolean start() { taskQueueActivitiesPerSecond, this.slotSupplier, workerMetricsScope, - service.getServerCapabilities()), + service.getServerCapabilities(), + pollerTracker), this.pollTaskExecutor, pollerOptions, workerMetricsScope); @@ -216,6 +223,26 @@ private PollerOptions getPollerOptions(SingleWorkerOptions options) { return pollerOptions; } + public TrackingSlotSupplier getSlotSupplier() { + return slotSupplier; + } + + public AtomicInteger getTotalProcessedTasks() { + return totalProcessedTasks; + } + + public AtomicInteger getTotalFailedTasks() { + return totalFailedTasks; + } + + public PollerOptions getPollerOptions() { + return pollerOptions; + } + + public PollerTracker getPollerTracker() { + return pollerTracker; + } + @Override public String toString() { return String.format( @@ -261,6 +288,15 @@ public void handle(ActivityTask task) throws Exception { ActivityTaskHandler.Result result = null; try { result = handleActivity(task, metricsScope); + totalProcessedTasks.incrementAndGet(); + if (result.getTaskFailed() != null + && !io.temporal.internal.common.FailureUtils.isBenignApplicationFailure( + result.getTaskFailed().getFailure())) { + totalFailedTasks.incrementAndGet(); + } + } catch (Exception e) { + totalFailedTasks.incrementAndGet(); + throw e; } finally { MDC.remove(LoggerTag.ACTIVITY_ID); MDC.remove(LoggerTag.ACTIVITY_TYPE); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java index 7f5573e243..60ebcbf654 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncActivityPollTask.java @@ -20,8 +20,8 @@ import io.temporal.worker.tuning.ActivitySlotInfo; import io.temporal.worker.tuning.SlotPermit; import io.temporal.worker.tuning.SlotReleaseReason; +import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.annotation.Nonnull; import org.slf4j.Logger; @@ -34,8 +34,8 @@ public class AsyncActivityPollTask implements AsyncPoller.PollTaskAsync slotSupplier, @Nonnull Scope metricsScope, - @Nonnull Supplier serverCapabilities) { + @Nonnull Supplier serverCapabilities, + @Nonnull PollerTracker pollerTracker) { this.service = service; this.slotSupplier = slotSupplier; this.metricsScope = metricsScope; + this.pollerTracker = Objects.requireNonNull(pollerTracker); PollActivityTaskQueueRequest.Builder pollRequest = PollActivityTaskQueueRequest.newBuilder() @@ -86,7 +88,7 @@ public CompletableFuture poll(SlotPermit permit) { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.incrementAndGet()); + .update(pollerTracker.pollStarted()); CompletableFuture response = null; try { @@ -101,7 +103,7 @@ public CompletableFuture poll(SlotPermit permit) { } catch (Exception e) { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet()); + .update(pollerTracker.pollCompleted()); throw new RuntimeException(e); } @@ -112,6 +114,7 @@ public CompletableFuture poll(SlotPermit permit) { metricsScope.counter(MetricsType.ACTIVITY_POLL_NO_TASK_COUNTER).inc(1); return null; } + pollerTracker.pollSucceeded(); metricsScope .timer(MetricsType.ACTIVITY_SCHEDULE_TO_START_LATENCY) .record( @@ -123,10 +126,11 @@ public CompletableFuture poll(SlotPermit permit) { () -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit)); }) .whenComplete( - (r, e) -> - MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK) - .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet())); + (r, e) -> { + MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.ACTIVITY_TASK) + .gauge(MetricsType.NUM_POLLERS) + .update(pollerTracker.pollCompleted()); + }); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java index 10be3b588b..efc4dc8077 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncNexusPollTask.java @@ -20,7 +20,6 @@ import io.temporal.worker.tuning.SlotReleaseReason; import java.util.Objects; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.annotation.Nonnull; import org.slf4j.Logger; @@ -33,8 +32,8 @@ public class AsyncNexusPollTask implements AsyncPoller.PollTaskAsync private final WorkflowServiceStubs service; private final Scope metricsScope; private final PollNexusTaskQueueRequest pollRequest; - private final AtomicInteger pollGauge = new AtomicInteger(); private final Context.CancellableContext grpcContext = Context.ROOT.withCancellation(); + private final PollerTracker pollerTracker; @SuppressWarnings("deprecation") public AsyncNexusPollTask( @@ -45,10 +44,12 @@ public AsyncNexusPollTask( @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull Scope metricsScope, @Nonnull Supplier serverCapabilities, - TrackingSlotSupplier slotSupplier) { + TrackingSlotSupplier slotSupplier, + @Nonnull PollerTracker pollerTracker) { this.service = Objects.requireNonNull(service); this.metricsScope = Objects.requireNonNull(metricsScope); this.slotSupplier = slotSupplier; + this.pollerTracker = Objects.requireNonNull(pollerTracker); PollNexusTaskQueueRequest.Builder pollRequest = PollNexusTaskQueueRequest.newBuilder() @@ -79,7 +80,7 @@ public CompletableFuture poll(SlotPermit permit) { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.incrementAndGet()); + .update(pollerTracker.pollStarted()); CompletableFuture response = null; try { @@ -94,7 +95,7 @@ public CompletableFuture poll(SlotPermit permit) { } catch (Exception e) { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet()); + .update(pollerTracker.pollCompleted()); throw new RuntimeException(e); } @@ -105,6 +106,7 @@ public CompletableFuture poll(SlotPermit permit) { metricsScope.counter(MetricsType.NEXUS_POLL_NO_TASK_COUNTER).inc(1); return null; } + pollerTracker.pollSucceeded(); Timestamp startedTime = ProtobufTimeUtils.getCurrentProtoTime(); metricsScope .timer(MetricsType.NEXUS_SCHEDULE_TO_START_LATENCY) @@ -117,10 +119,11 @@ public CompletableFuture poll(SlotPermit permit) { () -> slotSupplier.releaseSlot(SlotReleaseReason.taskComplete(), permit)); }) .whenComplete( - (r, e) -> - MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK) - .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet())); + (r, e) -> { + MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK) + .gauge(MetricsType.NUM_POLLERS) + .update(pollerTracker.pollCompleted()); + }); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java index 4a7f793cc0..c30dbc9e18 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/AsyncWorkflowPollTask.java @@ -20,7 +20,6 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -35,11 +34,11 @@ public class AsyncWorkflowPollTask private final Scope metricsScope; private final Scope pollerMetricScope; private final PollWorkflowTaskQueueRequest pollRequest; - private final AtomicInteger pollGauge = new AtomicInteger(); private final MetricsTag.TagValue taskQueueTagValue; private final boolean stickyPoller; private final Context.CancellableContext grpcContext = Context.ROOT.withCancellation(); private final AtomicBoolean shutdown = new AtomicBoolean(false); + private final PollerTracker pollerTracker; @Override public String toString() { @@ -56,10 +55,12 @@ public AsyncWorkflowPollTask( @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull TrackingSlotSupplier slotSupplier, @Nonnull Scope metricsScope, - @Nonnull Supplier serverCapabilities) { + @Nonnull Supplier serverCapabilities, + @Nonnull PollerTracker pollerTracker) { this.service = service; this.slotSupplier = slotSupplier; this.metricsScope = metricsScope; + this.pollerTracker = Objects.requireNonNull(pollerTracker); PollWorkflowTaskQueueRequest.Builder pollRequestBuilder = PollWorkflowTaskQueueRequest.newBuilder() @@ -122,7 +123,7 @@ public CompletableFuture poll(SlotPermit permit) MetricsTag.tagged(metricsScope, taskQueueTagValue) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.incrementAndGet()); + .update(pollerTracker.pollStarted()); CompletableFuture response = null; try { @@ -137,7 +138,7 @@ public CompletableFuture poll(SlotPermit permit) } catch (Exception e) { MetricsTag.tagged(metricsScope, taskQueueTagValue) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet()); + .update(pollerTracker.pollCompleted()); throw new RuntimeException(e); } @@ -150,6 +151,7 @@ public CompletableFuture poll(SlotPermit permit) .inc(1); return null; } + pollerTracker.pollSucceeded(); slotSupplier.markSlotUsed(new WorkflowSlotInfo(r, pollRequest), permit); pollerMetricScope .counter(MetricsType.WORKFLOW_TASK_QUEUE_POLL_SUCCEED_COUNTER) @@ -160,10 +162,11 @@ public CompletableFuture poll(SlotPermit permit) return new WorkflowTask(r, (reason) -> slotSupplier.releaseSlot(reason, permit)); }) .whenComplete( - (r, e) -> - MetricsTag.tagged(metricsScope, taskQueueTagValue) - .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet())); + (r, e) -> { + MetricsTag.tagged(metricsScope, taskQueueTagValue) + .gauge(MetricsType.NUM_POLLERS) + .update(pollerTracker.pollCompleted()); + }); } @Override diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/HeartbeatManager.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/HeartbeatManager.java new file mode 100644 index 0000000000..140d9e2bc4 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/HeartbeatManager.java @@ -0,0 +1,175 @@ +package io.temporal.internal.worker; + +import com.google.common.base.Preconditions; +import io.temporal.api.worker.v1.WorkerHeartbeat; +import io.temporal.api.workflowservice.v1.RecordWorkerHeartbeatRequest; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Manages periodic worker heartbeat RPCs. Routes workers to per-namespace {@link + * SharedNamespaceWorker} instances, each with its own scheduler. + */ +public class HeartbeatManager { + private static final Logger log = LoggerFactory.getLogger(HeartbeatManager.class); + + private final WorkflowServiceStubs service; + private final String identity; + private final Duration interval; + private final Map namespaceWorkers = new HashMap<>(); + + private final Object lock = new Object(); + + public HeartbeatManager(WorkflowServiceStubs service, String identity, Duration interval) { + this.service = service; + this.identity = identity; + this.interval = interval; + } + + /** + * Register a worker's heartbeat callback. Creates a per-namespace SharedNamespaceWorker if this + * is the first worker for the given namespace. + */ + public void registerWorker( + String namespace, String workerInstanceKey, Supplier callback) { + synchronized (lock) { + namespaceWorkers.compute( + namespace, + (ns, existing) -> { + if (existing != null && !existing.isShutdown()) { + existing.registerWorker(workerInstanceKey, callback); + return existing; + } + SharedNamespaceWorker nsWorker = + new SharedNamespaceWorker(service, ns, identity, interval); + nsWorker.registerWorker(workerInstanceKey, callback); + return nsWorker; + }); + } + } + + /** Unregister a worker. Stops the namespace worker if no workers remain for that namespace. */ + public void unregisterWorker(String namespace, String workerInstanceKey) { + synchronized (lock) { + SharedNamespaceWorker nsWorker = namespaceWorkers.get(namespace); + Preconditions.checkState( + nsWorker != null, + "unregisterWorker called for unknown namespace %s, worker %s", + namespace, + workerInstanceKey); + nsWorker.unregisterWorker(workerInstanceKey); + if (nsWorker.isEmpty()) { + nsWorker.shutdown(); + namespaceWorkers.remove(namespace); + } + } + } + + public void shutdown() { + synchronized (lock) { + for (SharedNamespaceWorker nsWorker : namespaceWorkers.values()) { + nsWorker.shutdown(); + } + namespaceWorkers.clear(); + } + } + + /** + * Handles heartbeating for all workers in a specific namespace. Each instance owns its own + * scheduler thread and callback map. + */ + static class SharedNamespaceWorker { + private final WorkflowServiceStubs service; + private final String namespace; + private final String identity; + private final ConcurrentHashMap> callbacks = + new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler; + private final AtomicBoolean shuttingDown = new AtomicBoolean(false); + + SharedNamespaceWorker( + WorkflowServiceStubs service, String namespace, String identity, Duration interval) { + this.service = service; + this.namespace = namespace; + this.identity = identity; + this.scheduler = + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r, "worker-heartbeat-" + namespace); + t.setDaemon(true); + return t; + }); + scheduler.scheduleAtFixedRate( + this::heartbeatTick, 0, interval.toMillis(), TimeUnit.MILLISECONDS); + } + + void registerWorker(String workerInstanceKey, Supplier callback) { + callbacks.put(workerInstanceKey, callback); + } + + void unregisterWorker(String workerInstanceKey) { + callbacks.remove(workerInstanceKey); + } + + boolean isEmpty() { + return callbacks.isEmpty(); + } + + boolean isShutdown() { + return scheduler.isShutdown(); + } + + void shutdown() { + if (!shuttingDown.compareAndSet(false, true)) return; + scheduler.shutdown(); + try { + scheduler.awaitTermination(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + + private void heartbeatTick() { + if (callbacks.isEmpty()) return; + + try { + List heartbeats = new ArrayList<>(); + for (Supplier callback : callbacks.values()) { + heartbeats.add(callback.get()); + } + + if (!heartbeats.isEmpty()) { + service + .blockingStub() + .recordWorkerHeartbeat( + RecordWorkerHeartbeatRequest.newBuilder() + .setNamespace(namespace) + .setIdentity(identity) + .addAllWorkerHeartbeat(heartbeats) + .build()); + } + } catch (io.grpc.StatusRuntimeException e) { + if (e.getStatus().getCode() == io.grpc.Status.Code.UNIMPLEMENTED) { + log.warn( + "Server does not support worker heartbeats for namespace {}, disabling", namespace); + // Only signal shutdown — don't awaitTermination from within the scheduler's own thread + shuttingDown.set(true); + scheduler.shutdown(); + return; + } + log.warn("Failed to send worker heartbeat for namespace {}", namespace, e); + } catch (Exception e) { + log.warn("Failed to send worker heartbeat for namespace {}", namespace, e); + } + } + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java index 5fdea211d5..90cdcc4afd 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/LocalActivityWorker.java @@ -30,6 +30,7 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -47,6 +48,8 @@ final class LocalActivityWorker implements Startable, Shutdownable { private final LocalActivityDispatcherImpl laScheduler; + private final AtomicInteger totalProcessedTasks = new AtomicInteger(); + private final AtomicInteger totalFailedTasks = new AtomicInteger(); private final PollerOptions pollerOptions; private final Scope workerMetricsScope; @@ -473,10 +476,17 @@ public void handle(LocalActivityAttemptTask attemptTask) throws Exception { } reason = handleResult(activityHandlerResult, attemptTask, metricsScope); + totalProcessedTasks.incrementAndGet(); + if (activityHandlerResult.getTaskFailed() != null + && !io.temporal.internal.common.FailureUtils.isBenignApplicationFailure( + activityHandlerResult.getTaskFailed().getFailure())) { + totalFailedTasks.incrementAndGet(); + } } catch (Throwable ex) { // handleLocalActivity is expected to never throw an exception and return a result // that can be used for a workflow callback if this method throws, it's a bug. log.error("[BUG] Code that expected to never throw an exception threw an exception", ex); + totalFailedTasks.incrementAndGet(); executionContext.callback( processingFailed(activityTask.getActivityId(), activityTask.getAttempt(), ex)); throw ex; @@ -748,10 +758,22 @@ private PollerOptions getPollerOptions(SingleWorkerOptions options) { return pollerOptions; } + public TrackingSlotSupplier getSlotSupplier() { + return slotSupplier; + } + public LocalActivityDispatcher getLocalActivityScheduler() { return laScheduler; } + public AtomicInteger getTotalProcessedTasks() { + return totalProcessedTasks; + } + + public AtomicInteger getTotalFailedTasks() { + return totalFailedTasks; + } + private static Failure newTimeoutFailure(TimeoutType timeoutType, @Nullable Failure cause) { TimeoutFailureInfo.Builder info = TimeoutFailureInfo.newBuilder().setTimeoutType(timeoutType); Failure.Builder result = Failure.newBuilder().setTimeoutFailureInfo(info); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java index 13e88690e6..4116825b9e 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusPollTask.java @@ -14,7 +14,6 @@ import io.temporal.worker.PollerTypeMetricsTag; import io.temporal.worker.tuning.*; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.annotation.Nonnull; import org.slf4j.Logger; @@ -27,7 +26,7 @@ final class NexusPollTask implements MultiThreadedPoller.PollTask { private final TrackingSlotSupplier slotSupplier; private final Scope metricsScope; private final PollNexusTaskQueueRequest pollRequest; - private final AtomicInteger pollGauge = new AtomicInteger(); + private final PollerTracker pollerTracker; @SuppressWarnings("deprecation") public NexusPollTask( @@ -38,10 +37,12 @@ public NexusPollTask( @Nonnull WorkerVersioningOptions versioningOptions, @Nonnull TrackingSlotSupplier slotSupplier, @Nonnull Scope metricsScope, - @Nonnull Supplier serverCapabilities) { + @Nonnull Supplier serverCapabilities, + @Nonnull PollerTracker pollerTracker) { this.service = Objects.requireNonNull(service); this.slotSupplier = slotSupplier; this.metricsScope = Objects.requireNonNull(metricsScope); + this.pollerTracker = Objects.requireNonNull(pollerTracker); PollNexusTaskQueueRequest.Builder pollRequest = PollNexusTaskQueueRequest.newBuilder() @@ -89,7 +90,7 @@ public NexusTask poll() { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.incrementAndGet()); + .update(pollerTracker.pollStarted()); try { response = @@ -111,6 +112,7 @@ public NexusTask poll() { startedTime, response.getRequest().getScheduledTime())); isSuccessful = true; + pollerTracker.pollSucceeded(); return new NexusTask( response, permit, @@ -118,7 +120,7 @@ public NexusTask poll() { } finally { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.NEXUS_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(pollGauge.decrementAndGet()); + .update(pollerTracker.pollCompleted()); if (!isSuccessful) slotSupplier.releaseSlot(SlotReleaseReason.neverUsed(), permit); } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java index f5291cb16e..c980f36c27 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/NexusWorker.java @@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +56,9 @@ final class NexusWorker implements SuspendableWorker { private final TrackingSlotSupplier slotSupplier; private final AtomicBoolean serverSupportsAutoscaling; private final boolean forceOldFailureFormat; + private final AtomicInteger totalProcessedTasks = new AtomicInteger(); + private final AtomicInteger totalFailedTasks = new AtomicInteger(); + private final PollerTracker pollerTracker = new PollerTracker(); public NexusWorker( @Nonnull WorkflowServiceStubs service, @@ -113,7 +117,8 @@ public boolean start() { options.getWorkerVersioningOptions(), workerMetricsScope, service.getServerCapabilities(), - this.slotSupplier), + this.slotSupplier, + pollerTracker), this.pollTaskExecutor, pollerOptions, serverSupportsAutoscaling.get(), @@ -130,7 +135,8 @@ public boolean start() { options.getWorkerVersioningOptions(), this.slotSupplier, workerMetricsScope, - service.getServerCapabilities()), + service.getServerCapabilities(), + pollerTracker), this.pollTaskExecutor, pollerOptions, workerMetricsScope); @@ -214,6 +220,26 @@ private PollerOptions getPollerOptions(SingleWorkerOptions options) { return pollerOptions; } + public TrackingSlotSupplier getSlotSupplier() { + return slotSupplier; + } + + public AtomicInteger getTotalProcessedTasks() { + return totalProcessedTasks; + } + + public AtomicInteger getTotalFailedTasks() { + return totalFailedTasks; + } + + public PollerOptions getPollerOptions() { + return pollerOptions; + } + + public PollerTracker getPollerTracker() { + return pollerTracker; + } + @Override public String toString() { return String.format( @@ -272,7 +298,14 @@ public void handle(NexusTask task) { task.getPermit()); try { - handleNexusTask(task, metricsScope); + boolean handlerFailed = handleNexusTask(task, metricsScope); + totalProcessedTasks.incrementAndGet(); + if (handlerFailed) { + totalFailedTasks.incrementAndGet(); + } + } catch (Throwable e) { + totalFailedTasks.incrementAndGet(); + throw e; } finally { task.getCompletionCallback().apply(); MDC.remove(LoggerTag.NEXUS_SERVICE); @@ -287,17 +320,22 @@ public Throwable wrapFailure(NexusTask task, Throwable failure) { "Failure processing nexus response: " + response.getRequest().toString(), failure); } + /** + * @return true if the handler reported a failure or error + */ @SuppressWarnings("deprecation") // Uses deprecated operationError - private void handleNexusTask(NexusTask task, Scope metricsScope) { + private boolean handleNexusTask(NexusTask task, Scope metricsScope) { PollNexusTaskQueueResponseOrBuilder pollResponse = task.getResponse(); ByteString taskToken = pollResponse.getTaskToken(); NexusTaskHandler.Result result; + boolean failed = false; Stopwatch sw = metricsScope.timer(MetricsType.NEXUS_EXEC_LATENCY).start(); try { result = handler.handle(task, metricsScope); if (result.getHandlerException() != null) { + failed = true; metricsScope .tagged( Collections.singletonMap( @@ -307,6 +345,7 @@ private void handleNexusTask(NexusTask task, Scope metricsScope) { .inc(1); } else if (result.getResponse().hasStartOperation() && result.getResponse().getStartOperation().hasOperationError()) { + failed = true; String operationState = result.getResponse().getStartOperation().getOperationError().getOperationState(); metricsScope @@ -315,6 +354,7 @@ private void handleNexusTask(NexusTask task, Scope metricsScope) { .inc(1); } else if (result.getResponse().hasStartOperation() && result.getResponse().getStartOperation().hasFailure()) { + failed = true; Failure f = result.getResponse().getStartOperation().getFailure(); String operationState; if (f.hasApplicationFailureInfo()) { @@ -333,7 +373,7 @@ private void handleNexusTask(NexusTask task, Scope metricsScope) { .tagged(Collections.singletonMap(TASK_FAILURE_TYPE, "timeout")) .counter(MetricsType.NEXUS_EXEC_FAILED_COUNTER) .inc(1); - return; + return true; } catch (Throwable e) { metricsScope .tagged(Collections.singletonMap(TASK_FAILURE_TYPE, "internal_sdk_error")) @@ -364,6 +404,7 @@ private void handleNexusTask(NexusTask task, Scope metricsScope) { Duration e2eDuration = ProtobufTimeUtils.toM3DurationSinceNow(pollResponse.getRequest().getScheduledTime()); metricsScope.timer(MetricsType.NEXUS_TASK_E2E_LATENCY).record(e2eDuration); + return failed; } private void logExceptionDuringResultReporting( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/PollerTracker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/PollerTracker.java new file mode 100644 index 0000000000..e91d1f6082 --- /dev/null +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/PollerTracker.java @@ -0,0 +1,46 @@ +package io.temporal.internal.worker; + +import java.time.Instant; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Tracks in-flight poll count and last successful poll time for heartbeat reporting. + * + *

A single counter feeds both the metrics system and heartbeat population, avoiding the need for + * separate counters tracking the same value. + * + *

This object bridges the gap between poll tasks (created inside {@code start()}) and heartbeat + * building: the intermediate worker (e.g. ActivityWorker) creates it, passes it to the poll task, + * and later reads it when building heartbeats. + * + *

{@link #pollStarted()} and {@link #pollCompleted()} return the updated count so callers can + * forward the value to the Tally gauge in a single operation. + */ +public class PollerTracker { + private final AtomicInteger inFlightPolls = new AtomicInteger(); + private final AtomicReference lastSuccessfulPollTime = new AtomicReference<>(); + + /** Increments in-flight count. Returns the new value for forwarding to the Tally gauge. */ + public int pollStarted() { + return inFlightPolls.incrementAndGet(); + } + + /** Decrements in-flight count. Returns the new value for forwarding to the Tally gauge. */ + public int pollCompleted() { + return inFlightPolls.decrementAndGet(); + } + + /** Records the current time as the last successful poll (a poll that returned a task). */ + public void pollSucceeded() { + lastSuccessfulPollTime.set(Instant.now()); + } + + public int getInFlightPolls() { + return inFlightPolls.get(); + } + + public Instant getLastSuccessfulPollTime() { + return lastSuccessfulPollTime.get(); + } +} diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java index 87ddc7c4a0..ca2145e293 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncActivityWorker.java @@ -12,6 +12,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -151,6 +152,26 @@ public EagerActivityDispatcher getEagerActivityDispatcher() { return this.worker.getEagerActivityDispatcher(); } + public TrackingSlotSupplier getSlotSupplier() { + return worker.getSlotSupplier(); + } + + public AtomicInteger getTotalProcessedTasks() { + return worker.getTotalProcessedTasks(); + } + + public AtomicInteger getTotalFailedTasks() { + return worker.getTotalFailedTasks(); + } + + public PollerOptions getPollerOptions() { + return worker.getPollerOptions(); + } + + public PollerTracker getPollerTracker() { + return worker.getPollerTracker(); + } + @Override public String toString() { return String.format( diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java index 23471f86e3..f154bfd825 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncNexusWorker.java @@ -7,6 +7,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nonnull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,6 +99,26 @@ public boolean isTerminated() { return worker.isTerminated(); } + public TrackingSlotSupplier getSlotSupplier() { + return worker.getSlotSupplier(); + } + + public AtomicInteger getTotalProcessedTasks() { + return worker.getTotalProcessedTasks(); + } + + public AtomicInteger getTotalFailedTasks() { + return worker.getTotalFailedTasks(); + } + + public PollerOptions getPollerOptions() { + return worker.getPollerOptions(); + } + + public PollerTracker getPollerTracker() { + return worker.getPollerTracker(); + } + @Override public WorkerLifecycleState getLifecycleState() { return worker.getLifecycleState(); diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java index 2b94effb6e..c5ebe8f0f7 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/SyncWorkflowWorker.java @@ -3,7 +3,9 @@ import static io.temporal.internal.common.InternalUtils.createStickyTaskQueue; import io.temporal.api.common.v1.Payloads; +import io.temporal.api.enums.v1.TaskQueueType; import io.temporal.api.taskqueue.v1.TaskQueue; +import io.temporal.api.worker.v1.WorkerHeartbeat; import io.temporal.client.WorkflowClient; import io.temporal.common.converter.DataConverter; import io.temporal.common.converter.EncodedValues; @@ -22,10 +24,13 @@ import io.temporal.workflow.Functions.Func1; import java.lang.reflect.Type; import java.time.Duration; +import java.util.List; import java.util.Objects; import java.util.Optional; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -61,6 +66,8 @@ public SyncWorkflowWorker( @Nonnull WorkflowClient client, @Nonnull String namespace, @Nonnull String taskQueue, + @Nonnull String workerInstanceKey, + @Nonnull List activeTaskQueueTypes, @Nonnull SingleWorkerOptions singleWorkerOptions, @Nonnull SingleWorkerOptions localActivityOptions, @Nonnull WorkflowRunLockManager runLocks, @@ -118,6 +125,8 @@ public SyncWorkflowWorker( client.getWorkflowServiceStubs(), namespace, taskQueue, + workerInstanceKey, + activeTaskQueueTypes, stickyTaskQueueName, singleWorkerOptions, runLocks, @@ -235,7 +244,54 @@ public WorkerLifecycleState getLifecycleState() { return null; } - @Override + public TrackingSlotSupplier getWorkflowSlotSupplier() { + return workflowWorker.getSlotSupplier(); + } + + public TrackingSlotSupplier getLocalActivitySlotSupplier() { + return laWorker.getSlotSupplier(); + } + + public void setHeartbeatSupplier(Supplier supplier) { + workflowWorker.setHeartbeatSupplier(supplier); + } + + public boolean hasStickyQueue() { + return workflowWorker.hasStickyQueue(); + } + + public AtomicInteger getWorkflowTotalProcessedTasks() { + return workflowWorker.getTotalProcessedTasks(); + } + + public AtomicInteger getWorkflowTotalFailedTasks() { + return workflowWorker.getTotalFailedTasks(); + } + + public AtomicInteger getLocalActivityTotalProcessedTasks() { + return laWorker.getTotalProcessedTasks(); + } + + public AtomicInteger getLocalActivityTotalFailedTasks() { + return laWorker.getTotalFailedTasks(); + } + + public PollerOptions getWorkflowPollerOptions() { + return workflowWorker.getPollerOptions(); + } + + public PollerTracker getWorkflowPollerTracker() { + return workflowWorker.getPollerTracker(); + } + + public PollerTracker getStickyPollerTracker() { + return workflowWorker.getStickyPollerTracker(); + } + + public String getStickyTaskQueueName() { + return workflowWorker.getStickyTaskQueueName(); + } + public String toString() { return String.format( "SyncWorkflowWorker{namespace=%s, taskQueue=%s, identity=%s}", diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java index ccd85a4699..4bbee2aa19 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/TrackingSlotSupplier.java @@ -24,13 +24,29 @@ public class TrackingSlotSupplier { private final AtomicInteger issuedSlots = new AtomicInteger(); private final Map usedSlots = new ConcurrentHashMap<>(); private final Scope metricsScope; + private final String supplierKind; public TrackingSlotSupplier(SlotSupplier inner, Scope metricsScope) { this.inner = inner; this.metricsScope = metricsScope; + this.supplierKind = determineSupplierKind(inner); publishSlotsMetric(); } + private static String determineSupplierKind(SlotSupplier supplier) { + if (supplier instanceof FixedSizeSlotSupplier) return "Fixed"; + if (supplier instanceof ResourceBasedSlotSupplier) return "ResourceBased"; + return supplier.getClass().getSimpleName(); + } + + public String getSupplierKind() { + return supplierKind; + } + + public int getUsedSlotCount() { + return usedSlots.size(); + } + public SlotSupplierFuture reserveSlot(SlotReservationData data) { final SlotSupplierFuture future; try { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowExecutorCache.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowExecutorCache.java index 69ed8beb45..bdd4c99017 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowExecutorCache.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowExecutorCache.java @@ -12,6 +12,7 @@ import io.temporal.worker.MetricsType; import java.util.Objects; import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicInteger; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.slf4j.Logger; @@ -23,6 +24,8 @@ public final class WorkflowExecutorCache { private final WorkflowRunLockManager runLockManager; private final Cache cache; private final Scope metricsScope; + private final AtomicInteger cacheHits = new AtomicInteger(); + private final AtomicInteger cacheMisses = new AtomicInteger(); public WorkflowExecutorCache( int workflowCacheSize, WorkflowRunLockManager runLockManager, Scope scope) { @@ -77,6 +80,7 @@ public WorkflowRunTaskHandler getOrCreate( if (workflowRunTaskHandler != null) { workflowTypeScope.counter(MetricsType.STICKY_CACHE_HIT).inc(1); + cacheHits.incrementAndGet(); return workflowRunTaskHandler; } @@ -85,6 +89,7 @@ public WorkflowRunTaskHandler getOrCreate( execution.getWorkflowId(), runId); workflowTypeScope.counter(MetricsType.STICKY_CACHE_MISS).inc(1); + cacheMisses.incrementAndGet(); return workflowExecutorFn.call(); } @@ -166,4 +171,16 @@ public void invalidateAll() { cache.invalidateAll(); metricsScope.gauge(MetricsType.STICKY_CACHE_SIZE).update(size()); } + + public int getCacheHits() { + return cacheHits.get(); + } + + public int getCacheMisses() { + return cacheMisses.get(); + } + + public int getCurrentCacheSize() { + return (int) cache.size(); + } } diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java index fa5e3cc796..cdb5e51639 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowPollTask.java @@ -21,7 +21,6 @@ import io.temporal.worker.tuning.SlotSupplierFuture; import io.temporal.worker.tuning.WorkflowSlotInfo; import java.util.Objects; -import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -38,8 +37,8 @@ final class WorkflowPollTask implements MultiThreadedPoller.PollTask slotSupplier, @Nonnull StickyQueueBalancer stickyQueueBalancer, @Nonnull Scope workerMetricsScope, - @Nonnull Supplier serverCapabilities) { + @Nonnull Supplier serverCapabilities, + @Nonnull PollerTracker pollerTracker, + @Nonnull PollerTracker stickyPollerTracker) { this.slotSupplier = Objects.requireNonNull(slotSupplier); this.stickyQueueBalancer = Objects.requireNonNull(stickyQueueBalancer); this.metricsScope = Objects.requireNonNull(workerMetricsScope); + this.pollerTracker = Objects.requireNonNull(pollerTracker); + this.stickyPollerTracker = Objects.requireNonNull(stickyPollerTracker); this.stickyMetricsScope = workerMetricsScope.tagged( new ImmutableMap.Builder(1) @@ -133,16 +136,17 @@ public WorkflowTask poll() { boolean isSticky = TaskQueueKind.TASK_QUEUE_KIND_STICKY.equals(taskQueueKind); PollWorkflowTaskQueueRequest request = isSticky ? stickyPollRequest : pollRequest; Scope scope = isSticky ? stickyMetricsScope : metricsScope; + PollerTracker tracker = isSticky ? stickyPollerTracker : pollerTracker; log.trace("poll request begin: {}", request); if (isSticky) { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.WORKFLOW_STICKY_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(stickyPollGauge.incrementAndGet()); + .update(stickyPollerTracker.pollStarted()); } else { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.WORKFLOW_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(normalPollGauge.incrementAndGet()); + .update(pollerTracker.pollStarted()); } try { @@ -151,19 +155,19 @@ public WorkflowTask poll() { return null; } isSuccessful = true; + tracker.pollSucceeded(); stickyQueueBalancer.finishPoll(taskQueueKind, response.getBacklogCountHint()); slotSupplier.markSlotUsed(new WorkflowSlotInfo(response, pollRequest), permit); return new WorkflowTask(response, (rr) -> slotSupplier.releaseSlot(rr, permit)); } finally { - if (isSticky) { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.WORKFLOW_STICKY_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(stickyPollGauge.decrementAndGet()); + .update(stickyPollerTracker.pollCompleted()); } else { MetricsTag.tagged(metricsScope, PollerTypeMetricsTag.PollerType.WORKFLOW_TASK) .gauge(MetricsType.NUM_POLLERS) - .update(normalPollGauge.decrementAndGet()); + .update(pollerTracker.pollCompleted()); } if (!isSuccessful) { diff --git a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java index 4986808c49..fe4186b241 100644 --- a/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java +++ b/temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java @@ -13,8 +13,11 @@ import io.temporal.api.common.v1.WorkflowExecution; import io.temporal.api.enums.v1.QueryResultType; import io.temporal.api.enums.v1.TaskQueueKind; +import io.temporal.api.enums.v1.TaskQueueType; +import io.temporal.api.enums.v1.WorkerStatus; import io.temporal.api.enums.v1.WorkflowTaskFailedCause; import io.temporal.api.failure.v1.Failure; +import io.temporal.api.worker.v1.WorkerHeartbeat; import io.temporal.api.workflowservice.v1.*; import io.temporal.failure.ApplicationFailure; import io.temporal.internal.logging.LoggerTag; @@ -31,6 +34,8 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -55,6 +60,14 @@ final class WorkflowWorker implements SuspendableWorker { private final GrpcRetryer grpcRetryer; private final EagerActivityDispatcher eagerActivityDispatcher; private final TrackingSlotSupplier slotSupplier; + private volatile Supplier heartbeatSupplier; + private final String workerInstanceKey; + private final List activeTaskQueueTypes; + + private final AtomicInteger totalProcessedTasks = new AtomicInteger(); + private final AtomicInteger totalFailedTasks = new AtomicInteger(); + private final PollerTracker pollerTracker = new PollerTracker(); + private final PollerTracker stickyPollerTracker = new PollerTracker(); private final AtomicBoolean serverSupportsAutoscaling; private PollTaskExecutor pollTaskExecutor; @@ -69,6 +82,8 @@ public WorkflowWorker( @Nonnull WorkflowServiceStubs service, @Nonnull String namespace, @Nonnull String taskQueue, + @Nonnull String workerInstanceKey, + @Nonnull List activeTaskQueueTypes, @Nullable String stickyTaskQueueName, @Nonnull SingleWorkerOptions options, @Nonnull WorkflowRunLockManager runLocks, @@ -80,6 +95,8 @@ public WorkflowWorker( this.service = Objects.requireNonNull(service); this.namespace = Objects.requireNonNull(namespace); this.taskQueue = Objects.requireNonNull(taskQueue); + this.workerInstanceKey = Objects.requireNonNull(workerInstanceKey); + this.activeTaskQueueTypes = Objects.requireNonNull(activeTaskQueueTypes); this.options = Objects.requireNonNull(options); this.stickyTaskQueueName = stickyTaskQueueName; this.pollerOptions = getPollerOptions(options); @@ -122,7 +139,8 @@ public boolean start() { options.getWorkerVersioningOptions(), slotSupplier, workerMetricsScope, - service.getServerCapabilities()); + service.getServerCapabilities(), + pollerTracker); pollers = Arrays.asList( new AsyncWorkflowPollTask( @@ -134,7 +152,8 @@ public boolean start() { options.getWorkerVersioningOptions(), slotSupplier, workerMetricsScope, - service.getServerCapabilities()), + service.getServerCapabilities(), + stickyPollerTracker), normalPoller); this.stickyQueueBalancer = normalPoller; } else { @@ -149,7 +168,8 @@ public boolean start() { options.getWorkerVersioningOptions(), slotSupplier, workerMetricsScope, - service.getServerCapabilities())); + service.getServerCapabilities(), + pollerTracker)); } poller = new AsyncPoller<>( @@ -180,7 +200,9 @@ public boolean start() { slotSupplier, stickyQueueBalancer, workerMetricsScope, - service.getServerCapabilities()), + service.getServerCapabilities(), + pollerTracker, + stickyPollerTracker), pollTaskExecutor, pollerOptions, workerMetricsScope); @@ -216,19 +238,25 @@ public CompletableFuture shutdown(ShutdownManager shutdownManager, boolean return CompletableFuture.allOf( pollerShutdown.thenCompose( ignore -> { - if (!interruptTasks && stickyTaskQueueName != null) { - return shutdownManager.waitOnWorkerShutdownRequest( - service - .futureStub() - .shutdownWorker( - ShutdownWorkerRequest.newBuilder() - .setIdentity(options.getIdentity()) - .setNamespace(namespace) - .setStickyTaskQueue(stickyTaskQueueName) - .setReason(GRACEFUL_SHUTDOWN_MESSAGE) - .build())); + ShutdownWorkerRequest.Builder shutdownReq = + ShutdownWorkerRequest.newBuilder() + .setIdentity(options.getIdentity()) + .setNamespace(namespace) + .setTaskQueue(taskQueue) + .setWorkerInstanceKey(workerInstanceKey) + .setReason(GRACEFUL_SHUTDOWN_MESSAGE) + .addAllTaskQueueTypes(activeTaskQueueTypes); + if (stickyTaskQueueName != null) { + shutdownReq.setStickyTaskQueue(stickyTaskQueueName); } - return CompletableFuture.completedFuture(null); + if (heartbeatSupplier != null) { + shutdownReq.setWorkerHeartbeat( + heartbeatSupplier.get().toBuilder() + .setStatus(WorkerStatus.WORKER_STATUS_SHUTTING_DOWN) + .build()); + } + return shutdownManager.waitOnWorkerShutdownRequest( + service.futureStub().shutdownWorker(shutdownReq.build())); }), pollerShutdown .thenCompose( @@ -338,6 +366,42 @@ public WorkflowTaskDispatchHandle reserveWorkflowExecutor() { .orElse(null); } + public void setHeartbeatSupplier(Supplier supplier) { + this.heartbeatSupplier = supplier; + } + + public TrackingSlotSupplier getSlotSupplier() { + return slotSupplier; + } + + public boolean hasStickyQueue() { + return stickyTaskQueueName != null; + } + + public String getStickyTaskQueueName() { + return stickyTaskQueueName; + } + + public AtomicInteger getTotalProcessedTasks() { + return totalProcessedTasks; + } + + public AtomicInteger getTotalFailedTasks() { + return totalFailedTasks; + } + + public PollerOptions getPollerOptions() { + return pollerOptions; + } + + public PollerTracker getPollerTracker() { + return pollerTracker; + } + + public PollerTracker getStickyPollerTracker() { + return stickyPollerTracker; + } + @Override public String toString() { return String.format( @@ -368,6 +432,7 @@ public void handle(WorkflowTask task) throws Exception { MDC.put(LoggerTag.RUN_ID, runId); boolean locked = false; + boolean taskProcessingFailed = false; Stopwatch swTotal = workflowTypeScope.timer(MetricsType.WORKFLOW_TASK_EXECUTION_TOTAL_LATENCY).start(); @@ -511,6 +576,7 @@ public void handle(WorkflowTask task) throws Exception { } } } catch (Exception e) { + taskProcessingFailed = true; releaseReason = SlotReleaseReason.error(e); handleReportingFailure(e, currentTask, result, workflowExecution, workflowTypeScope); throw e; @@ -547,6 +613,11 @@ public void handle(WorkflowTask task) throws Exception { MDC.remove(LoggerTag.WORKFLOW_TYPE); MDC.remove(LoggerTag.RUN_ID); + totalProcessedTasks.incrementAndGet(); + if (taskProcessingFailed) { + totalFailedTasks.incrementAndGet(); + } + if (locked) { runLocks.unlock(runId); } diff --git a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java index 5ffd300e9e..ed9372e8e9 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/Worker.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/Worker.java @@ -5,6 +5,14 @@ import com.google.common.base.Strings; import com.uber.m3.tally.Scope; import com.uber.m3.util.ImmutableMap; +import io.temporal.api.deployment.v1.WorkerDeploymentVersion; +import io.temporal.api.enums.v1.TaskQueueType; +import io.temporal.api.enums.v1.WorkerStatus; +import io.temporal.api.worker.v1.PluginInfo; +import io.temporal.api.worker.v1.WorkerHeartbeat; +import io.temporal.api.worker.v1.WorkerHostInfo; +import io.temporal.api.worker.v1.WorkerPollerInfo; +import io.temporal.api.worker.v1.WorkerSlotsInfo; import io.temporal.client.WorkflowClient; import io.temporal.client.WorkflowClientOptions; import io.temporal.common.Experimental; @@ -17,11 +25,17 @@ import io.temporal.internal.sync.WorkflowThreadExecutor; import io.temporal.internal.worker.*; import io.temporal.serviceclient.MetricsTag; +import io.temporal.serviceclient.Version; import io.temporal.worker.tuning.*; import io.temporal.workflow.Functions; import io.temporal.workflow.Functions.Func; import io.temporal.workflow.WorkflowMethod; +import java.lang.management.ManagementFactory; +import java.net.InetAddress; import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -29,6 +43,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -47,6 +64,13 @@ public final class Worker { final SyncActivityWorker activityWorker; final SyncNexusWorker nexusWorker; private final AtomicBoolean started = new AtomicBoolean(); + private volatile boolean shuttingDown = false; + private boolean hasNexusServices = false; + private final String workerInstanceKey = UUID.randomUUID().toString(); + private final Instant startTime = Instant.now(); + private final WorkflowClientOptions clientOptions; + private final @Nonnull WorkflowExecutorCache cache; + private final Map previousSnapshots = new HashMap<>(); /** * Creates worker that connects to an instance of the Temporal Service. @@ -78,6 +102,8 @@ public final class Worker { !Strings.isNullOrEmpty(taskQueue), "taskQueue should not be an empty string"); this.taskQueue = taskQueue; this.options = WorkerOptions.newBuilder(options).validateAndBuildWithDefaults(); + this.clientOptions = client.getOptions(); + this.cache = cache; factoryOptions = WorkerFactoryOptions.newBuilder(factoryOptions).validateAndBuildWithDefaults(); WorkflowClientOptions clientOptions = client.getOptions(); String namespace = clientOptions.getNamespace(); @@ -158,6 +184,8 @@ public final class Worker { client, namespace, taskQueue, + workerInstanceKey, + getActiveTaskQueueTypes(), singleWorkerOptions, localActivityOptions, runLocks, @@ -411,6 +439,7 @@ public void registerNexusServiceImplementation(Object... nexusServiceImplementat !started.get(), "registerNexusServiceImplementation is not allowed after worker has started"); nexusWorker.registerNexusServiceImplementation(nexusServiceImplementations); + hasNexusServices = true; } void start() { @@ -425,6 +454,7 @@ void start() { } CompletableFuture shutdown(ShutdownManager shutdownManager, boolean interruptUserTasks) { + shuttingDown = true; CompletableFuture workflowWorkerShutdownFuture = workflowWorker.shutdown(shutdownManager, interruptUserTasks); CompletableFuture nexusWorkerShutdownFuture = @@ -457,6 +487,207 @@ void awaitTermination(long timeout, TimeUnit unit) { ShutdownManager.awaitTermination(workflowWorker, timeoutMillis); } + String getWorkerInstanceKey() { + return workerInstanceKey; + } + + List getActiveTaskQueueTypes() { + List types = new ArrayList<>(); + types.add(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW); + if (activityWorker != null) { + types.add(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY); + } + if (hasNexusServices) { + types.add(TaskQueueType.TASK_QUEUE_TYPE_NEXUS); + } + return types; + } + + Supplier buildHeartbeatCallback(String workerGroupingKey) { + // The callback can be invoked concurrently from the heartbeat scheduler and the shutdown path + final Object callbackLock = new Object(); + final AtomicReference lastHeartbeatTime = new AtomicReference<>(null); + return () -> { + synchronized (callbackLock) { + Instant now = Instant.now(); + WorkerHeartbeat.Builder hb = + WorkerHeartbeat.newBuilder() + .setWorkerInstanceKey(workerInstanceKey) + .setWorkerIdentity(clientOptions.getIdentity()) + .setTaskQueue(taskQueue) + .setSdkName(Version.SDK_NAME) + .setSdkVersion(Version.LIBRARY_VERSION) + .setStatus( + shuttingDown + ? WorkerStatus.WORKER_STATUS_SHUTTING_DOWN + : WorkerStatus.WORKER_STATUS_RUNNING) + .setStartTime(toProtoTimestamp(startTime)) + .setHeartbeatTime(toProtoTimestamp(now)); + + Instant previousHeartbeat = lastHeartbeatTime.get(); + if (previousHeartbeat != null) { + Duration elapsed = Duration.between(previousHeartbeat, now); + hb.setElapsedSinceLastHeartbeat( + com.google.protobuf.Duration.newBuilder() + .setSeconds(elapsed.getSeconds()) + .setNanos(elapsed.getNano()) + .build()); + } + lastHeartbeatTime.set(now); + + // Deployment version + if (options.getDeploymentOptions() != null + && options.getDeploymentOptions().getVersion() != null) { + hb.setDeploymentVersion( + WorkerDeploymentVersion.newBuilder() + .setDeploymentName( + options.getDeploymentOptions().getVersion().getDeploymentName()) + .setBuildId(options.getDeploymentOptions().getVersion().getBuildId()) + .build()); + } + + hb.setHostInfo(buildHostInfo(workerGroupingKey)); + + // Slot info with task counters + hb.setWorkflowTaskSlotsInfo( + buildSlotsInfo( + "workflow", + workflowWorker.getWorkflowSlotSupplier(), + workflowWorker.getWorkflowTotalProcessedTasks(), + workflowWorker.getWorkflowTotalFailedTasks())); + + if (activityWorker != null) { + hb.setActivityTaskSlotsInfo( + buildSlotsInfo( + "activity", + activityWorker.getSlotSupplier(), + activityWorker.getTotalProcessedTasks(), + activityWorker.getTotalFailedTasks())); + } + + hb.setLocalActivitySlotsInfo( + buildSlotsInfo( + "local-activity", + workflowWorker.getLocalActivitySlotSupplier(), + workflowWorker.getLocalActivityTotalProcessedTasks(), + workflowWorker.getLocalActivityTotalFailedTasks())); + + hb.setNexusTaskSlotsInfo( + buildSlotsInfo( + "nexus", + nexusWorker.getSlotSupplier(), + nexusWorker.getTotalProcessedTasks(), + nexusWorker.getTotalFailedTasks())); + + // Poller info + hb.setWorkflowPollerInfo( + buildPollerInfo( + workflowWorker.getWorkflowPollerOptions(), + workflowWorker.getWorkflowPollerTracker())); + if (workflowWorker.getStickyTaskQueueName() != null) { + hb.setWorkflowStickyPollerInfo( + buildPollerInfo( + workflowWorker.getWorkflowPollerOptions(), + workflowWorker.getStickyPollerTracker())); + } + if (activityWorker != null) { + hb.setActivityPollerInfo( + buildPollerInfo( + activityWorker.getPollerOptions(), activityWorker.getPollerTracker())); + } + hb.setNexusPollerInfo( + buildPollerInfo(nexusWorker.getPollerOptions(), nexusWorker.getPollerTracker())); + + // Sticky cache stats + hb.setTotalStickyCacheHit(cache.getCacheHits()); + hb.setTotalStickyCacheMiss(cache.getCacheMisses()); + hb.setCurrentStickyCacheSize(cache.getCurrentCacheSize()); + + // Plugins + for (WorkerPlugin plugin : plugins) { + hb.addPlugins(PluginInfo.newBuilder().setName(plugin.getName()).build()); + } + + return hb.build(); + } + }; + } + + private WorkerSlotsInfo buildSlotsInfo( + String key, + TrackingSlotSupplier tracker, + AtomicInteger totalProcessed, + AtomicInteger totalFailed) { + int maxSlots = tracker.maximumSlots().orElse(-1); + int usedSlots = tracker.getUsedSlotCount(); + int currentProcessed = totalProcessed.get(); + int currentFailed = totalFailed.get(); + TaskSnapshot previous = + previousSnapshots.put(key, new TaskSnapshot(currentProcessed, currentFailed)); + int intervalProcessed = previous != null ? currentProcessed - previous.processed : 0; + int intervalFailed = previous != null ? currentFailed - previous.failed : 0; + return WorkerSlotsInfo.newBuilder() + .setCurrentAvailableSlots(maxSlots >= 0 ? Math.max(0, maxSlots - usedSlots) : -1) + .setCurrentUsedSlots(usedSlots) + .setSlotSupplierKind(tracker.getSupplierKind()) + .setTotalProcessedTasks(currentProcessed) + .setTotalFailedTasks(currentFailed) + .setLastIntervalProcessedTasks(intervalProcessed) + .setLastIntervalFailureTasks(intervalFailed) + .build(); + } + + private static WorkerPollerInfo buildPollerInfo( + PollerOptions pollerOptions, PollerTracker tracker) { + WorkerPollerInfo.Builder builder = WorkerPollerInfo.newBuilder(); + PollerBehavior behavior = pollerOptions.getPollerBehavior(); + if (behavior instanceof PollerBehaviorAutoscaling) { + builder.setIsAutoscaling(true); + } + builder.setCurrentPollers(tracker.getInFlightPolls()); + Instant lastPoll = tracker.getLastSuccessfulPollTime(); + if (lastPoll != null) { + builder.setLastSuccessfulPollTime(toProtoTimestamp(lastPoll)); + } + return builder.build(); + } + + private static final JVMSystemResourceInfo systemResourceInfo = new JVMSystemResourceInfo(); + + private static final String CACHED_HOSTNAME; + private static final String CACHED_PID; + + static { + String h; + try { + h = InetAddress.getLocalHost().getHostName(); + } catch (Exception e) { + h = "unknown"; + } + CACHED_HOSTNAME = h; + + String name = ManagementFactory.getRuntimeMXBean().getName(); + int atIndex = name.indexOf('@'); + CACHED_PID = atIndex > 0 ? name.substring(0, atIndex) : "unknown"; + } + + private static WorkerHostInfo buildHostInfo(String workerGroupingKey) { + return WorkerHostInfo.newBuilder() + .setHostName(CACHED_HOSTNAME) + .setWorkerGroupingKey(workerGroupingKey) + .setProcessId(CACHED_PID) + .setCurrentHostCpuUsage((float) systemResourceInfo.getCPUUsagePercent()) + .setCurrentHostMemUsage((float) systemResourceInfo.getMemoryUsagePercent()) + .build(); + } + + private static com.google.protobuf.Timestamp toProtoTimestamp(Instant instant) { + return com.google.protobuf.Timestamp.newBuilder() + .setSeconds(instant.getEpochSecond()) + .setNanos(instant.getNano()) + .build(); + } + @Override public String toString() { return "Worker{" + "options=" + options + '}'; @@ -753,4 +984,14 @@ private static void attachMetricsToResourceController( .setMetricsScope(metricsScope); } } + + private static final class TaskSnapshot { + final int processed; + final int failed; + + TaskSnapshot(int processed, int failed) { + this.processed = processed; + this.failed = failed; + } + } } diff --git a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java index 3a22b7351b..26af299a2c 100644 --- a/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java +++ b/temporal-sdk/src/main/java/io/temporal/worker/WorkerFactory.java @@ -4,6 +4,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.uber.m3.tally.Scope; +import io.temporal.api.worker.v1.WorkerHeartbeat; import io.temporal.api.workflowservice.v1.DescribeNamespaceRequest; import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse; import io.temporal.client.WorkflowClient; @@ -13,6 +14,7 @@ import io.temporal.internal.common.PluginUtils; import io.temporal.internal.sync.WorkflowThreadExecutor; import io.temporal.internal.task.VirtualThreadDelegate; +import io.temporal.internal.worker.HeartbeatManager; import io.temporal.internal.worker.ShutdownManager; import io.temporal.internal.worker.WorkflowExecutorCache; import io.temporal.internal.worker.WorkflowRunLockManager; @@ -33,6 +35,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.function.Supplier; import javax.annotation.Nonnull; import javax.annotation.Nullable; import org.slf4j.Logger; @@ -62,6 +65,7 @@ public final class WorkerFactory { private final AtomicBoolean pollerAutoscaling = new AtomicBoolean(false); private State state = State.Initial; + private boolean heartbeatsSupported; private final String statusErrorMessage = "attempted to %s while in %s state. Acceptable States: %s"; @@ -265,6 +269,15 @@ public synchronized void start() { DescribeNamespaceRequest.newBuilder() .setNamespace(workflowClient.getOptions().getNamespace()) .build()); + boolean heartbeatsSupported = + describeNamespaceResponse.getNamespaceInfo().getCapabilities().getWorkerHeartbeats(); + if (!heartbeatsSupported) { + log.debug( + "Server does not support worker heartbeats for namespace {}", + workflowClient.getOptions().getNamespace()); + } + this.heartbeatsSupported = heartbeatsSupported; + if (describeNamespaceResponse.getNamespaceInfo().getCapabilities().getPollerAutoscaling()) { pollerAutoscaling.set(true); } @@ -300,6 +313,20 @@ private void doStart() { startChain.accept(taskQueue, worker); } + // Register heartbeat callbacks after workers are started. + WorkflowClientInternal clientInternal = (WorkflowClientInternal) workflowClient.getInternal(); + HeartbeatManager hbManager = clientInternal.getHeartbeatManager(); + if (hbManager != null && heartbeatsSupported) { + String namespace = workflowClient.getOptions().getNamespace(); + String workerGroupingKey = clientInternal.getWorkerGroupingKey(); + for (Worker worker : workers.values()) { + Supplier heartbeatSupplier = + worker.buildHeartbeatCallback(workerGroupingKey); + hbManager.registerWorker(namespace, worker.getWorkerInstanceKey(), heartbeatSupplier); + worker.workflowWorker.setHeartbeatSupplier(heartbeatSupplier); + } + } + state = State.Started; ((WorkflowClientInternal) workflowClient.getInternal()).registerWorkerFactory(this); } @@ -418,6 +445,18 @@ private void doShutdown(boolean interruptUserTasks) { CompletableFuture.allOf(shutdownFutures.toArray(new CompletableFuture[0])) .thenApply( r -> { + // Unregister workers from heartbeat manager only after full shutdown, + // so heartbeats continue reporting SHUTTING_DOWN until the worker is fully stopped. + if (heartbeatsSupported) { + HeartbeatManager hbManager = + ((WorkflowClientInternal) workflowClient.getInternal()).getHeartbeatManager(); + if (hbManager != null) { + String namespace = workflowClient.getOptions().getNamespace(); + for (Worker worker : workers.values()) { + hbManager.unregisterWorker(namespace, worker.getWorkerInstanceKey()); + } + } + } cache.invalidateAll(); workflowThreadPool.shutdownNow(); return null; diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/HeartbeatManagerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/HeartbeatManagerTest.java new file mode 100644 index 0000000000..75d0c71827 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/HeartbeatManagerTest.java @@ -0,0 +1,233 @@ +package io.temporal.internal.worker; + +import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import io.temporal.api.worker.v1.WorkerHeartbeat; +import io.temporal.api.workflowservice.v1.*; +import io.temporal.serviceclient.WorkflowServiceStubs; +import java.time.Duration; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.ArgumentCaptor; + +public class HeartbeatManagerTest { + + private WorkflowServiceStubs service; + private WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub; + private HeartbeatManager manager; + + @Before + public void setUp() { + service = mock(WorkflowServiceStubs.class); + blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); + when(service.blockingStub()).thenReturn(blockingStub); + when(blockingStub.recordWorkerHeartbeat(any())) + .thenReturn(RecordWorkerHeartbeatResponse.getDefaultInstance()); + } + + @After + public void tearDown() { + if (manager != null) { + manager.shutdown(); + } + } + + @Test + public void testHeartbeatRpcSentAtInterval() throws Exception { + manager = new HeartbeatManager(service, "test-identity", Duration.ofSeconds(1)); + + WorkerHeartbeat hb = + WorkerHeartbeat.newBuilder() + .setWorkerInstanceKey("worker-1") + .setTaskQueue("test-queue") + .build(); + manager.registerWorker("default", "worker-1", () -> hb); + + verify(blockingStub, timeout(3000).atLeastOnce()).recordWorkerHeartbeat(any()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(RecordWorkerHeartbeatRequest.class); + verify(blockingStub, atLeastOnce()).recordWorkerHeartbeat(captor.capture()); + + RecordWorkerHeartbeatRequest request = captor.getValue(); + assertEquals("default", request.getNamespace()); + assertEquals("test-identity", request.getIdentity()); + assertTrue(request.getWorkerHeartbeatCount() > 0); + assertEquals("test-queue", request.getWorkerHeartbeat(0).getTaskQueue()); + } + + @Test + public void testMultipleWorkersInSingleRpc() throws Exception { + manager = new HeartbeatManager(service, "test-identity", Duration.ofSeconds(1)); + + WorkerHeartbeat hb1 = + WorkerHeartbeat.newBuilder() + .setWorkerInstanceKey("worker-1") + .setTaskQueue("queue-1") + .build(); + WorkerHeartbeat hb2 = + WorkerHeartbeat.newBuilder() + .setWorkerInstanceKey("worker-2") + .setTaskQueue("queue-2") + .build(); + manager.registerWorker("default", "worker-1", () -> hb1); + manager.registerWorker("default", "worker-2", () -> hb2); + + // Wait for enough ticks so both workers are captured in at least one RPC + verify(blockingStub, timeout(5000).atLeast(2)).recordWorkerHeartbeat(any()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(RecordWorkerHeartbeatRequest.class); + verify(blockingStub, atLeast(2)).recordWorkerHeartbeat(captor.capture()); + + boolean foundBoth = + captor.getAllValues().stream().anyMatch(req -> req.getWorkerHeartbeatCount() == 2); + assertTrue("Expected at least one RPC with 2 worker heartbeats", foundBoth); + } + + @Test + public void testUnregisterStopsRpcWhenEmpty() throws Exception { + manager = new HeartbeatManager(service, "test-identity", Duration.ofSeconds(1)); + + WorkerHeartbeat hb = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-1").build(); + manager.registerWorker("default", "worker-1", () -> hb); + + verify(blockingStub, timeout(3000).atLeastOnce()).recordWorkerHeartbeat(any()); + + manager.unregisterWorker("default", "worker-1"); + clearInvocations(blockingStub); + + Thread.sleep(2000); + verify(blockingStub, never()).recordWorkerHeartbeat(any()); + } + + @Test + public void testDifferentNamespacesGetSeparateRpcs() throws Exception { + manager = new HeartbeatManager(service, "test-identity", Duration.ofSeconds(1)); + + WorkerHeartbeat hb1 = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-ns1").build(); + WorkerHeartbeat hb2 = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-ns2").build(); + manager.registerWorker("namespace-1", "worker-ns1", () -> hb1); + manager.registerWorker("namespace-2", "worker-ns2", () -> hb2); + + verify(blockingStub, timeout(5000).atLeast(2)).recordWorkerHeartbeat(any()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(RecordWorkerHeartbeatRequest.class); + verify(blockingStub, atLeast(2)).recordWorkerHeartbeat(captor.capture()); + + boolean foundNs1 = + captor.getAllValues().stream().anyMatch(req -> "namespace-1".equals(req.getNamespace())); + boolean foundNs2 = + captor.getAllValues().stream().anyMatch(req -> "namespace-2".equals(req.getNamespace())); + assertTrue("Expected heartbeat RPC for namespace-1", foundNs1); + assertTrue("Expected heartbeat RPC for namespace-2", foundNs2); + + // Each RPC should only contain workers for its own namespace + boolean noMixing = + captor.getAllValues().stream().allMatch(req -> req.getWorkerHeartbeatCount() == 1); + assertTrue("Each namespace RPC should contain exactly 1 worker", noMixing); + } + + @Test + public void testExceptionsCaughtAndLogged() throws Exception { + when(blockingStub.recordWorkerHeartbeat(any())).thenThrow(new RuntimeException("test error")); + + manager = new HeartbeatManager(service, "test-identity", Duration.ofSeconds(1)); + + WorkerHeartbeat hb = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-1").build(); + manager.registerWorker("default", "worker-1", () -> hb); + + // Wait for at least 2 ticks — proves the scheduler survived the exception + verify(blockingStub, timeout(5000).atLeast(2)).recordWorkerHeartbeat(any()); + } + + @Test + public void testNoRpcsWhenNoWorkersRegistered() throws Exception { + manager = new HeartbeatManager(service, "test-identity", Duration.ofSeconds(1)); + + Thread.sleep(2000); + verify(blockingStub, never()).recordWorkerHeartbeat(any()); + } + + @Test + public void testUnimplementedStopsScheduler() throws Exception { + when(blockingStub.recordWorkerHeartbeat(any())) + .thenThrow(new io.grpc.StatusRuntimeException(io.grpc.Status.UNIMPLEMENTED)); + + manager = new HeartbeatManager(service, "test-identity", Duration.ofSeconds(1)); + + WorkerHeartbeat hb = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-1").build(); + manager.registerWorker("default", "worker-1", () -> hb); + + // Wait for the first tick to hit UNIMPLEMENTED + verify(blockingStub, timeout(3000).atLeastOnce()).recordWorkerHeartbeat(any()); + + // After UNIMPLEMENTED, scheduler should stop — no more RPCs + clearInvocations(blockingStub); + Thread.sleep(2000); + verify(blockingStub, never()).recordWorkerHeartbeat(any()); + } + + @Test + public void testUnregisterFromOneNamespaceDoesNotAffectAnother() throws Exception { + manager = new HeartbeatManager(service, "test-identity", Duration.ofSeconds(1)); + + WorkerHeartbeat hb1 = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-ns1").build(); + WorkerHeartbeat hb2 = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-ns2").build(); + manager.registerWorker("namespace-1", "worker-ns1", () -> hb1); + manager.registerWorker("namespace-2", "worker-ns2", () -> hb2); + + // Both namespaces heartbeating + verify(blockingStub, timeout(5000).atLeast(2)).recordWorkerHeartbeat(any()); + + // Unregister from namespace-1 only + manager.unregisterWorker("namespace-1", "worker-ns1"); + clearInvocations(blockingStub); + + // namespace-2 should continue heartbeating + verify(blockingStub, timeout(5000).atLeastOnce()).recordWorkerHeartbeat(any()); + + ArgumentCaptor captor = + ArgumentCaptor.forClass(RecordWorkerHeartbeatRequest.class); + verify(blockingStub, atLeastOnce()).recordWorkerHeartbeat(captor.capture()); + + // All RPCs after unregister should be for namespace-2 only + assertTrue( + "Only namespace-2 RPCs expected after unregistering namespace-1", + captor.getAllValues().stream().allMatch(req -> "namespace-2".equals(req.getNamespace()))); + } + + @Test + public void testNamespaceSchedulerStopsWhenLastWorkerUnregisters() throws Exception { + manager = new HeartbeatManager(service, "test-identity", Duration.ofSeconds(1)); + + WorkerHeartbeat hb1 = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-1").build(); + WorkerHeartbeat hb2 = WorkerHeartbeat.newBuilder().setWorkerInstanceKey("worker-2").build(); + manager.registerWorker("default", "worker-1", () -> hb1); + manager.registerWorker("default", "worker-2", () -> hb2); + + verify(blockingStub, timeout(3000).atLeastOnce()).recordWorkerHeartbeat(any()); + + // Unregister first worker — namespace scheduler should still be running + manager.unregisterWorker("default", "worker-1"); + clearInvocations(blockingStub); + verify(blockingStub, timeout(3000).atLeastOnce()).recordWorkerHeartbeat(any()); + + // Unregister last worker — namespace scheduler should stop + manager.unregisterWorker("default", "worker-2"); + clearInvocations(blockingStub); + Thread.sleep(2000); + verify(blockingStub, never()).recordWorkerHeartbeat(any()); + } + + @Test + public void testIntervalValidation() { + HeartbeatManager hm = new HeartbeatManager(service, "test-identity", Duration.ofSeconds(30)); + assertNotNull(hm); + hm.shutdown(); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/PollerTrackerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/PollerTrackerTest.java new file mode 100644 index 0000000000..2cd97894f6 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/PollerTrackerTest.java @@ -0,0 +1,89 @@ +package io.temporal.internal.worker; + +import static org.junit.Assert.*; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import org.junit.Test; + +public class PollerTrackerTest { + + @Test + public void testPollStartedIncrementsAndReturnsCount() { + PollerTracker tracker = new PollerTracker(); + assertEquals(0, tracker.getInFlightPolls()); + assertEquals(1, tracker.pollStarted()); + assertEquals(1, tracker.getInFlightPolls()); + assertEquals(2, tracker.pollStarted()); + assertEquals(2, tracker.getInFlightPolls()); + } + + @Test + public void testPollCompletedDecrementsAndReturnsCount() { + PollerTracker tracker = new PollerTracker(); + tracker.pollStarted(); + tracker.pollStarted(); + assertEquals(1, tracker.pollCompleted()); + assertEquals(1, tracker.getInFlightPolls()); + assertEquals(0, tracker.pollCompleted()); + assertEquals(0, tracker.getInFlightPolls()); + } + + @Test + public void testPollSucceededSetsLastPollTime() { + PollerTracker tracker = new PollerTracker(); + tracker.pollSucceeded(); + assertNotNull(tracker.getLastSuccessfulPollTime()); + } + + @Test + public void testLastPollTimeInitiallyNull() { + PollerTracker tracker = new PollerTracker(); + assertNull(tracker.getLastSuccessfulPollTime()); + } + + @Test + public void testConcurrentPollTracking() throws Exception { + PollerTracker tracker = new PollerTracker(); + int threadCount = 8; + int opsPerThread = 100; + ExecutorService executor = Executors.newFixedThreadPool(threadCount); + List> futures = new ArrayList<>(); + + for (int i = 0; i < threadCount; i++) { + futures.add( + executor.submit( + () -> { + for (int j = 0; j < opsPerThread; j++) { + tracker.pollStarted(); + tracker.pollCompleted(); + } + })); + } + + for (Future f : futures) { + f.get(5, TimeUnit.SECONDS); + } + executor.shutdown(); + + assertEquals(0, tracker.getInFlightPolls()); + } + + @Test + public void testMultipleSuccessfulPolls() throws Exception { + PollerTracker tracker = new PollerTracker(); + tracker.pollSucceeded(); + Instant first = tracker.getLastSuccessfulPollTime(); + assertNotNull(first); + + // Ensure at least 1ms passes so Instant.now() can advance + Thread.sleep(2); + + tracker.pollSucceeded(); + Instant second = tracker.getLastSuccessfulPollTime(); + assertNotNull(second); + assertFalse("Last poll time should advance with each successful poll", second.isBefore(first)); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java index ee25255470..e4223c0b54 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/SlotSupplierTest.java @@ -84,7 +84,9 @@ public void supplierIsCalledAppropriately() { trackingSS, stickyQueueBalancer, metricsScope, - () -> GetSystemInfoResponse.Capabilities.newBuilder().build()); + () -> GetSystemInfoResponse.Capabilities.newBuilder().build(), + new PollerTracker(), + new PollerTracker()); PollWorkflowTaskQueueResponse pollResponse = PollWorkflowTaskQueueResponse.newBuilder() @@ -173,7 +175,8 @@ public void asyncPollerSupplierIsCalledAppropriately() throws Exception { new WorkerVersioningOptions("", false, null), trackingSS, metricsScope, - () -> GetSystemInfoResponse.Capabilities.newBuilder().build()); + () -> GetSystemInfoResponse.Capabilities.newBuilder().build(), + new PollerTracker()); SlotPermit permit = new SlotPermit(); diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java index 0a080ec63f..59538ac8b2 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/StickyQueueBacklogTest.java @@ -72,7 +72,9 @@ public void stickyQueueBacklogResetTest() { slotSupplier, stickyQueueBalancer, metricsScope, - () -> GetSystemInfoResponse.Capabilities.newBuilder().build()); + () -> GetSystemInfoResponse.Capabilities.newBuilder().build(), + new PollerTracker(), + new PollerTracker()); PollWorkflowTaskQueueResponse pollResponse = PollWorkflowTaskQueueResponse.newBuilder() diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/TrackingSlotSupplierKindTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/TrackingSlotSupplierKindTest.java new file mode 100644 index 0000000000..392bec955d --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/TrackingSlotSupplierKindTest.java @@ -0,0 +1,63 @@ +package io.temporal.internal.worker; + +import static org.junit.Assert.*; + +import com.uber.m3.tally.NoopScope; +import io.temporal.worker.tuning.*; +import org.junit.Test; + +public class TrackingSlotSupplierKindTest { + + @Test + public void testFixedSupplierKind() { + SlotSupplier supplier = new FixedSizeSlotSupplier<>(10); + TrackingSlotSupplier tracking = + new TrackingSlotSupplier<>(supplier, new NoopScope()); + assertEquals("Fixed", tracking.getSupplierKind()); + } + + @Test + public void testResourceBasedSupplierKind() { + ResourceBasedController controller = + ResourceBasedController.newSystemInfoController( + ResourceBasedControllerOptions.newBuilder(0.5, 0.5).build()); + SlotSupplier supplier = + ResourceBasedSlotSupplier.createForActivity( + controller, + ResourceBasedSlotOptions.newBuilder().setMinimumSlots(1).setMaximumSlots(10).build()); + TrackingSlotSupplier tracking = + new TrackingSlotSupplier<>(supplier, new NoopScope()); + assertEquals("ResourceBased", tracking.getSupplierKind()); + } + + @Test + public void testCustomSupplierKind() { + SlotSupplier custom = new CustomTestSlotSupplier<>(); + TrackingSlotSupplier tracking = + new TrackingSlotSupplier<>(custom, new NoopScope()); + assertEquals("CustomTestSlotSupplier", tracking.getSupplierKind()); + } + + private static class CustomTestSlotSupplier implements SlotSupplier { + @Override + public SlotSupplierFuture reserveSlot(SlotReserveContext ctx) { + return SlotSupplierFuture.completedFuture(new SlotPermit()); + } + + @Override + public java.util.Optional tryReserveSlot(SlotReserveContext ctx) { + return java.util.Optional.of(new SlotPermit()); + } + + @Override + public void markSlotUsed(SlotMarkUsedContext ctx) {} + + @Override + public void releaseSlot(SlotReleaseContext ctx) {} + + @Override + public java.util.Optional getMaximumSlots() { + return java.util.Optional.of(5); + } + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java index cbe194091a..600f273707 100644 --- a/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java +++ b/temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java @@ -70,6 +70,8 @@ public void concurrentPollRequestLockTest() throws Exception { client, "default", "task_queue", + "test-worker-instance-key", + java.util.Collections.emptyList(), "sticky_task_queue", SingleWorkerOptions.newBuilder() .setIdentity("test_identity") @@ -240,6 +242,8 @@ public void respondWorkflowTaskFailureMetricTest() throws Exception { client, "default", "task_queue", + "test-worker-instance-key", + java.util.Collections.emptyList(), "sticky_task_queue", SingleWorkerOptions.newBuilder() .setIdentity("test_identity") @@ -383,6 +387,8 @@ public boolean isAnyTypeSupported() { client, "default", "taskQueue", + "test-worker-instance-key", + java.util.Collections.emptyList(), "sticky", SingleWorkerOptions.newBuilder() .setIdentity("test_identity") @@ -400,9 +406,15 @@ public boolean isAnyTypeSupported() { slotSupplier, new AtomicBoolean(false)); + WorkflowServiceGrpc.WorkflowServiceFutureStub futureStub = + mock(WorkflowServiceGrpc.WorkflowServiceFutureStub.class); + when(futureStub.shutdownWorker(any(ShutdownWorkerRequest.class))) + .thenReturn(Futures.immediateFuture(ShutdownWorkerResponse.newBuilder().build())); + WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub = mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class); when(client.blockingStub()).thenReturn(blockingStub); + when(client.futureStub()).thenReturn(futureStub); when(blockingStub.withOption(any(), any())).thenReturn(blockingStub); PollWorkflowTaskQueueResponse pollResponse = diff --git a/temporal-sdk/src/test/java/io/temporal/worker/HeartbeatCapturingInterceptor.java b/temporal-sdk/src/test/java/io/temporal/worker/HeartbeatCapturingInterceptor.java new file mode 100644 index 0000000000..38e741d4db --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/worker/HeartbeatCapturingInterceptor.java @@ -0,0 +1,49 @@ +package io.temporal.worker; + +import io.grpc.*; +import io.temporal.api.workflowservice.v1.RecordWorkerHeartbeatRequest; +import io.temporal.api.workflowservice.v1.ShutdownWorkerRequest; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * gRPC interceptor that captures heartbeat-related RPCs for test assertions. Captures the request + * before the server responds, so tests work even if the test server returns UNIMPLEMENTED. + */ +class HeartbeatCapturingInterceptor implements ClientInterceptor { + private final List heartbeatRequests = + Collections.synchronizedList(new ArrayList<>()); + private final List shutdownRequests = + Collections.synchronizedList(new ArrayList<>()); + + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + @Override + public void sendMessage(ReqT message) { + if (message instanceof RecordWorkerHeartbeatRequest) { + heartbeatRequests.add((RecordWorkerHeartbeatRequest) message); + } else if (message instanceof ShutdownWorkerRequest) { + shutdownRequests.add((ShutdownWorkerRequest) message); + } + super.sendMessage(message); + } + }; + } + + List getHeartbeatRequests() { + return new ArrayList<>(heartbeatRequests); + } + + List getShutdownRequests() { + return new ArrayList<>(shutdownRequests); + } + + void clear() { + heartbeatRequests.clear(); + shutdownRequests.clear(); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatDeploymentVersionTest.java b/temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatDeploymentVersionTest.java new file mode 100644 index 0000000000..3ab4833f6b --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatDeploymentVersionTest.java @@ -0,0 +1,74 @@ +package io.temporal.worker; + +import static org.junit.Assert.*; + +import io.temporal.api.workflowservice.v1.RecordWorkerHeartbeatRequest; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.common.WorkerDeploymentVersion; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.junit.Assume; +import org.junit.Rule; +import org.junit.Test; + +public class WorkerHeartbeatDeploymentVersionTest { + + private static final HeartbeatCapturingInterceptor interceptor = + new HeartbeatCapturingInterceptor(); + + private static final String TEST_DEPLOYMENT_NAME = "test-deployment"; + private static final String TEST_BUILD_ID = "1.0.0"; + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowServiceStubsOptions( + WorkflowServiceStubsOptions.newBuilder() + .setGrpcClientInterceptors(Collections.singletonList(interceptor)) + .build()) + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() + .setWorkerHeartbeatInterval(Duration.ofSeconds(1)) + .build()) + .setWorkerOptions( + WorkerOptions.newBuilder() + .setDeploymentOptions( + WorkerDeploymentOptions.newBuilder() + .setVersion( + new WorkerDeploymentVersion(TEST_DEPLOYMENT_NAME, TEST_BUILD_ID)) + .build()) + .build()) + .setDoNotStart(true) + .build(); + + @Test + public void testDeploymentVersionInHeartbeat() throws Exception { + interceptor.clear(); + testWorkflowRule.getTestEnvironment().start(); + + Thread.sleep(3000); + + List requests = interceptor.getHeartbeatRequests(); + Assume.assumeFalse( + "No heartbeats captured — test server may not support worker heartbeat capability", + requests.isEmpty()); + + io.temporal.api.worker.v1.WorkerHeartbeat hb = requests.get(0).getWorkerHeartbeat(0); + assertTrue("deployment_version should be set", hb.hasDeploymentVersion()); + assertEquals( + "deployment_version.deployment_name should match configured value", + TEST_DEPLOYMENT_NAME, + hb.getDeploymentVersion().getDeploymentName()); + assertEquals( + "deployment_version.build_id should match configured value", + TEST_BUILD_ID, + hb.getDeploymentVersion().getBuildId()); + + testWorkflowRule.getTestEnvironment().shutdown(); + testWorkflowRule.getTestEnvironment().awaitTermination(10, TimeUnit.SECONDS); + } +} diff --git a/temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatIntegrationTest.java b/temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatIntegrationTest.java new file mode 100644 index 0000000000..90d7398138 --- /dev/null +++ b/temporal-sdk/src/test/java/io/temporal/worker/WorkerHeartbeatIntegrationTest.java @@ -0,0 +1,936 @@ +package io.temporal.worker; + +import static org.junit.Assert.*; + +import io.temporal.activity.ActivityInterface; +import io.temporal.activity.ActivityMethod; +import io.temporal.activity.ActivityOptions; +import io.temporal.api.enums.v1.TaskQueueType; +import io.temporal.api.enums.v1.WorkerStatus; +import io.temporal.api.worker.v1.WorkerHeartbeat; +import io.temporal.api.workflowservice.v1.*; +import io.temporal.client.WorkflowClient; +import io.temporal.client.WorkflowClientOptions; +import io.temporal.client.WorkflowOptions; +import io.temporal.serviceclient.WorkflowServiceStubsOptions; +import io.temporal.testing.internal.SDKTestWorkflowRule; +import io.temporal.worker.tuning.ResourceBasedControllerOptions; +import io.temporal.worker.tuning.ResourceBasedTuner; +import io.temporal.workflow.Workflow; +import io.temporal.workflow.WorkflowInterface; +import io.temporal.workflow.WorkflowMethod; +import java.time.Duration; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.Rule; +import org.junit.Test; + +public class WorkerHeartbeatIntegrationTest { + + private static final HeartbeatCapturingInterceptor interceptor = + new HeartbeatCapturingInterceptor(); + + // Shared latches for blocking activity tests + static final CountDownLatch blockingActivityStarted = new CountDownLatch(1); + static final CountDownLatch blockingActivityRelease = new CountDownLatch(1); + + // Separate latches for sticky cache miss test + static final CountDownLatch cacheTestActivityStarted = new CountDownLatch(1); + static final CountDownLatch cacheTestActivityRelease = new CountDownLatch(1); + + @Rule + public SDKTestWorkflowRule testWorkflowRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowServiceStubsOptions( + WorkflowServiceStubsOptions.newBuilder() + .setGrpcClientInterceptors(Collections.singletonList(interceptor)) + .build()) + .setWorkflowClientOptions( + WorkflowClientOptions.newBuilder() + .setWorkerHeartbeatInterval(Duration.ofSeconds(1)) + .build()) + .setActivityImplementations( + new TestActivityImpl(), + new FailingActivityImpl(), + new BlockingActivityImpl(), + new CacheTestActivityImpl()) + .setWorkflowTypes( + TestWorkflowImpl.class, + FailingWorkflowImpl.class, + BlockingWorkflowImpl.class, + CacheTestWorkflowImpl.class) + .setDoNotStart(true) + .build(); + + /** + * Combined test for basic heartbeat fields that only require starting the environment (no + * workflow execution needed). Covers: RPC fields, host info, timestamps, plugins, and + * elapsed_since_last_heartbeat. + */ + @Test + public void testBasicHeartbeatFields() throws Exception { + interceptor.clear(); + testWorkflowRule.getTestEnvironment().start(); + + // Wait for at least 2 heartbeat ticks (interval is 1s) + Thread.sleep(3000); + + List requests = interceptor.getHeartbeatRequests(); + assertFalse("no heartbeats captured", requests.isEmpty()); + assertTrue("need >= 2 heartbeats", requests.size() >= 2); + String workerInstanceKey = requests.get(0).getWorkerHeartbeat(0).getWorkerInstanceKey(); + + // --- RPC fields via DescribeWorker round-trip --- + WorkerHeartbeat hb = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat", hb); + assertEquals("temporal-java", hb.getSdkName()); + assertFalse("sdk version should be set", hb.getSdkVersion().isEmpty()); + assertFalse("task queue should be set", hb.getTaskQueue().isEmpty()); + assertEquals(workerInstanceKey, hb.getWorkerInstanceKey()); + assertEquals(WorkerStatus.WORKER_STATUS_RUNNING, hb.getStatus()); + assertTrue("start time should be set", hb.hasStartTime()); + assertTrue("heartbeat time should be set", hb.hasHeartbeatTime()); + assertTrue("host info should be set", hb.hasHostInfo()); + assertFalse("host name should be set", hb.getHostInfo().getHostName().isEmpty()); + assertFalse("process id should be set", hb.getHostInfo().getProcessId().isEmpty()); + + // --- Host info details --- + assertFalse( + "host_info.worker_grouping_key should not be empty", + hb.getHostInfo().getWorkerGroupingKey().isEmpty()); + assertTrue( + "host_info.current_host_cpu_usage should be >= 0", + hb.getHostInfo().getCurrentHostCpuUsage() >= 0.0f); + assertTrue( + "host_info.current_host_mem_usage should be >= 0", + hb.getHostInfo().getCurrentHostMemUsage() >= 0.0f); + + // --- Timestamps --- + io.temporal.api.worker.v1.WorkerHeartbeat rawHb = requests.get(0).getWorkerHeartbeat(0); + assertTrue("start_time should be set", rawHb.hasStartTime()); + long startTimeSec = rawHb.getStartTime().getSeconds(); + long nowSec = java.time.Instant.now().getEpochSecond(); + assertTrue("start_time should be within 30 seconds of now", nowSec - startTimeSec <= 30); + assertTrue("heartbeat_time should be set", rawHb.hasHeartbeatTime()); + long heartbeatTimeSec = rawHb.getHeartbeatTime().getSeconds(); + assertTrue( + "heartbeat_time should be within 30 seconds of now", nowSec - heartbeatTimeSec <= 30); + assertTrue("heartbeat_time should be >= start_time", heartbeatTimeSec >= startTimeSec); + + // --- Plugins --- + assertEquals( + "plugins list should be empty when no plugins are configured", 0, rawHb.getPluginsCount()); + + // --- Elapsed since last heartbeat --- + + // First heartbeat should not have elapsed_since_last_heartbeat + WorkerHeartbeat first = requests.get(0).getWorkerHeartbeat(0); + assertFalse( + "first heartbeat should not have elapsed_since_last_heartbeat", + first.hasElapsedSinceLastHeartbeat()); + + // Subsequent heartbeats should have a non-zero elapsed duration + boolean foundNonZeroElapsed = + requests.stream() + .skip(1) + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .filter(WorkerHeartbeat::hasElapsedSinceLastHeartbeat) + .anyMatch( + h -> { + com.google.protobuf.Duration d = h.getElapsedSinceLastHeartbeat(); + return d.getSeconds() > 0 || d.getNanos() > 0; + }); + assertTrue( + "subsequent heartbeats should have non-zero elapsed_since_last_heartbeat", + foundNonZeroElapsed); + } + + @Test + public void testShutdownHeartbeatStatus() throws Exception { + interceptor.clear(); + testWorkflowRule.getTestEnvironment().start(); + + // Create a separate factory so we can shut it down within the test + String taskQueue = testWorkflowRule.getTaskQueue() + "-shutdown"; + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + WorkerFactory factory = + WorkerFactory.newInstance(client, testWorkflowRule.getWorkerFactoryOptions()); + Worker worker = factory.newWorker(taskQueue); + worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class); + worker.registerActivitiesImplementations(new TestActivityImpl()); + factory.start(); + + // Wait for at least one heartbeat to confirm the worker is running + Thread.sleep(2000); + + // Graceful shutdown triggers ShutdownWorkerRequest with SHUTTING_DOWN status + factory.shutdown(); + factory.awaitTermination(10, TimeUnit.SECONDS); + + List shutdownRequests = interceptor.getShutdownRequests(); + assertFalse("no shutdown requests captured", shutdownRequests.isEmpty()); + + // Find the shutdown request for our task queue + ShutdownWorkerRequest shutdownReq = + shutdownRequests.stream() + .filter(req -> req.getTaskQueue().equals(taskQueue)) + .findFirst() + .orElse(null); + assertNotNull("should find ShutdownWorkerRequest for task queue: " + taskQueue, shutdownReq); + assertTrue( + "ShutdownWorkerRequest should include a worker_heartbeat", + shutdownReq.hasWorkerHeartbeat()); + assertEquals( + "shutdown heartbeat status should be WORKER_STATUS_SHUTTING_DOWN", + WorkerStatus.WORKER_STATUS_SHUTTING_DOWN, + shutdownReq.getWorkerHeartbeat().getStatus()); + assertFalse( + "shutdown heartbeat task_queue should be set", + shutdownReq.getWorkerHeartbeat().getTaskQueue().isEmpty()); + + // No Nexus services registered, so NEXUS should not be in task queue types + assertTrue( + "task_queue_types should include WORKFLOW", + shutdownReq.getTaskQueueTypesList().contains(TaskQueueType.TASK_QUEUE_TYPE_WORKFLOW)); + assertTrue( + "task_queue_types should include ACTIVITY", + shutdownReq.getTaskQueueTypesList().contains(TaskQueueType.TASK_QUEUE_TYPE_ACTIVITY)); + assertFalse( + "task_queue_types should NOT include NEXUS when no Nexus services are registered", + shutdownReq.getTaskQueueTypesList().contains(TaskQueueType.TASK_QUEUE_TYPE_NEXUS)); + } + + /** + * Combined test for heartbeat fields that require workflow execution. Covers: slot info, task + * counters, and poller info. + */ + @Test + public void testHeartbeatAfterWorkflowExecution() throws Exception { + interceptor.clear(); + testWorkflowRule.getTestEnvironment().start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + TestWorkflow wf = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + assertEquals("done", wf.execute("test")); + + Thread.sleep(2000); + + List requests = interceptor.getHeartbeatRequests(); + assertFalse("no heartbeats captured", requests.isEmpty()); + String workerInstanceKey = requests.get(0).getWorkerHeartbeat(0).getWorkerInstanceKey(); + + // --- Slot info via DescribeWorker --- + WorkerHeartbeat describedHb = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat", describedHb); + + assertTrue("workflow_task_slots_info should be set", describedHb.hasWorkflowTaskSlotsInfo()); + assertTrue("activity_task_slots_info should be set", describedHb.hasActivityTaskSlotsInfo()); + assertTrue("local_activity_slots_info should be set", describedHb.hasLocalActivitySlotsInfo()); + assertTrue("nexus_task_slots_info should be set", describedHb.hasNexusTaskSlotsInfo()); + + assertFalse( + "workflow slot supplier kind should be set", + describedHb.getWorkflowTaskSlotsInfo().getSlotSupplierKind().isEmpty()); + assertFalse( + "activity slot supplier kind should be set", + describedHb.getActivityTaskSlotsInfo().getSlotSupplierKind().isEmpty()); + + assertEquals( + "workflow used slots should be 0 after completion", + 0, + describedHb.getWorkflowTaskSlotsInfo().getCurrentUsedSlots()); + assertEquals( + "activity used slots should be 0 after completion", + 0, + describedHb.getActivityTaskSlotsInfo().getCurrentUsedSlots()); + + assertTrue( + "workflow available slots should be > 0", + describedHb.getWorkflowTaskSlotsInfo().getCurrentAvailableSlots() > 0); + assertTrue( + "activity available slots should be > 0", + describedHb.getActivityTaskSlotsInfo().getCurrentAvailableSlots() > 0); + + // --- Task counters --- + boolean foundWorkflowProcessed = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .anyMatch( + h -> + h.hasWorkflowTaskSlotsInfo() + && h.getWorkflowTaskSlotsInfo().getTotalProcessedTasks() >= 1); + assertTrue( + "workflow_task_slots_info.total_processed_tasks should be >= 1", foundWorkflowProcessed); + + boolean foundActivityProcessed = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .anyMatch( + h -> + h.hasActivityTaskSlotsInfo() + && h.getActivityTaskSlotsInfo().getTotalProcessedTasks() >= 1); + assertTrue( + "activity_task_slots_info.total_processed_tasks should be >= 1", foundActivityProcessed); + + // --- Poller info --- + boolean foundWorkflowPollers = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .anyMatch( + h -> + h.hasWorkflowPollerInfo() && h.getWorkflowPollerInfo().getCurrentPollers() > 0); + assertTrue("workflow_poller_info should have current_pollers > 0", foundWorkflowPollers); + + boolean foundActivityPollers = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .anyMatch( + h -> + h.hasActivityPollerInfo() && h.getActivityPollerInfo().getCurrentPollers() > 0); + assertTrue("activity_poller_info should have current_pollers > 0", foundActivityPollers); + + boolean foundNexusPollerInfo = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .anyMatch(WorkerHeartbeat::hasNexusPollerInfo); + assertTrue("nexus_poller_info should be set", foundNexusPollerInfo); + + boolean foundWorkflowPollTime = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .anyMatch( + h -> + h.hasWorkflowPollerInfo() + && h.getWorkflowPollerInfo().hasLastSuccessfulPollTime()); + assertTrue( + "workflow_poller_info should have last_successful_poll_time set", foundWorkflowPollTime); + + boolean foundActivityPollTime = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .anyMatch( + h -> + h.hasActivityPollerInfo() + && h.getActivityPollerInfo().hasLastSuccessfulPollTime()); + assertTrue( + "activity_poller_info should have last_successful_poll_time set", foundActivityPollTime); + + boolean foundWorkflowAutoscaling = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .anyMatch( + h -> h.hasWorkflowPollerInfo() && h.getWorkflowPollerInfo().getIsAutoscaling()); + assertFalse( + "workflow_poller_info.is_autoscaling should be false with default pollers", + foundWorkflowAutoscaling); + + boolean foundActivityAutoscaling = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .anyMatch( + h -> h.hasActivityPollerInfo() && h.getActivityPollerInfo().getIsAutoscaling()); + assertFalse( + "activity_poller_info.is_autoscaling should be false with default pollers", + foundActivityAutoscaling); + } + + @Test + public void testFailureMetricsInHeartbeat() throws Exception { + interceptor.clear(); + testWorkflowRule.getTestEnvironment().start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + FailingWorkflow wf = + client.newWorkflowStub( + FailingWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + try { + wf.execute(); + } catch (Exception e) { + // Expected: the activity fails and the workflow fails + } + + Thread.sleep(2000); + + List requests = interceptor.getHeartbeatRequests(); + assertFalse("no heartbeats captured", requests.isEmpty()); + + // ApplicationFailure is handled within the activity handler and returned as a result, + // so it counts as a processed task, not a failed task. "Failed tasks" tracks + // infrastructure-level failures where the task handler itself threw an exception. + boolean foundActivityProcessed = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .anyMatch( + hb -> + hb.hasActivityTaskSlotsInfo() + && hb.getActivityTaskSlotsInfo().getTotalProcessedTasks() >= 1); + assertTrue( + "activity_task_slots_info.total_processed_tasks should be >= 1 after activity execution", + foundActivityProcessed); + } + + @Test + public void testWorkflowTaskProcessedCounts() throws Exception { + interceptor.clear(); + testWorkflowRule.getTestEnvironment().start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + for (int i = 0; i < 3; i++) { + TestWorkflow wf = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + assertEquals("done", wf.execute("test" + i)); + } + + Thread.sleep(2000); + + List requests = interceptor.getHeartbeatRequests(); + assertFalse("no heartbeats captured", requests.isEmpty()); + + long maxProcessed = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .filter(io.temporal.api.worker.v1.WorkerHeartbeat::hasWorkflowTaskSlotsInfo) + .mapToLong(hb -> hb.getWorkflowTaskSlotsInfo().getTotalProcessedTasks()) + .max() + .orElse(0); + assertTrue( + "workflow_task_slots_info.total_processed_tasks should be >= 3 after 3 workflows", + maxProcessed >= 3); + } + + /** Verifies activity slots are occupied while an activity is running, then released after. */ + @Test + public void testActivityInFlightSlotTracking() throws Exception { + interceptor.clear(); + testWorkflowRule.getTestEnvironment().start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + BlockingWorkflow wf = + client.newWorkflowStub( + BlockingWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + + // Start workflow async — the activity will block until we release it + CompletableFuture wfFuture = WorkflowClient.execute(wf::execute); + + // Wait for the blocking activity to start + assertTrue( + "blocking activity should have started", + blockingActivityStarted.await(10, TimeUnit.SECONDS)); + + // Wait for a heartbeat to capture the in-flight state + Thread.sleep(2000); + + List requests = interceptor.getHeartbeatRequests(); + assertFalse("no heartbeats captured", requests.isEmpty()); + + // While the activity is running, at least one heartbeat should show used slots >= 1 + boolean foundUsedSlot = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .anyMatch( + hb -> + hb.hasActivityTaskSlotsInfo() + && hb.getActivityTaskSlotsInfo().getCurrentUsedSlots() >= 1); + assertTrue( + "activity_task_slots_info.current_used_slots should be >= 1 while activity is running", + foundUsedSlot); + + // Release the activity + blockingActivityRelease.countDown(); + wfFuture.get(10, TimeUnit.SECONDS); + + // Wait for a heartbeat after completion + Thread.sleep(2000); + + requests = interceptor.getHeartbeatRequests(); + // Get the last heartbeat + WorkerHeartbeat lastHb = requests.get(requests.size() - 1).getWorkerHeartbeat(0); + assertEquals( + "activity used slots should be 0 after activity completes", + 0, + lastHb.getActivityTaskSlotsInfo().getCurrentUsedSlots()); + } + + /** Verifies sticky cache counters are reported in heartbeat. */ + @Test + public void testStickyCacheCountersInHeartbeat() throws Exception { + interceptor.clear(); + testWorkflowRule.getTestEnvironment().start(); + + // Run a workflow to generate at least one sticky cache hit or populate the cache + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + TestWorkflow wf = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + assertEquals("done", wf.execute("test")); + + Thread.sleep(2000); + + List requests = interceptor.getHeartbeatRequests(); + assertFalse("no heartbeats captured", requests.isEmpty()); + String workerInstanceKey = requests.get(0).getWorkerHeartbeat(0).getWorkerInstanceKey(); + + WorkerHeartbeat hb = describeWorker(workerInstanceKey); + assertNotNull("DescribeWorker should return stored heartbeat", hb); + + // Sticky cache fields should be present (values may be 0 if no cache hits yet) + // The key thing is the fields are populated — exact values depend on timing + assertTrue( + "total_sticky_cache_hit + total_sticky_cache_miss + current_sticky_cache_size should be >= 0", + hb.getTotalStickyCacheHit() >= 0 + && hb.getTotalStickyCacheMiss() >= 0 + && hb.getCurrentStickyCacheSize() >= 0); + } + + /** + * Verifies sticky cache misses are tracked in heartbeat. Starts a workflow with a blocking + * activity, purges the cache while the activity runs, then completes. The workflow task on resume + * triggers a cache miss. Matches Go's TestWorkerHeartbeatStickyCacheMiss and Rust's + * worker_heartbeat_sticky_cache_miss. + */ + @Test + public void testStickyCacheMissInHeartbeat() throws Exception { + interceptor.clear(); + testWorkflowRule.getTestEnvironment().start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + CacheTestWorkflow wf = + client.newWorkflowStub( + CacheTestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + + // Start workflow async — the activity will block until we release it + CompletableFuture wfFuture = WorkflowClient.execute(wf::execute); + + // Wait for the blocking activity to start + assertTrue( + "cache test activity should have started", + cacheTestActivityStarted.await(10, TimeUnit.SECONDS)); + + // Purge the sticky cache so the workflow's next WFT triggers a cache miss + testWorkflowRule.invalidateWorkflowCache(); + + // Release the activity — workflow resumes on non-sticky queue + cacheTestActivityRelease.countDown(); + assertEquals("done", wfFuture.get(10, TimeUnit.SECONDS)); + + // Wait for heartbeat to capture the sticky cache miss + Thread.sleep(2000); + + List requests = interceptor.getHeartbeatRequests(); + assertFalse("no heartbeats captured", requests.isEmpty()); + + boolean foundCacheMiss = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .anyMatch(hb -> hb.getTotalStickyCacheMiss() >= 1); + assertTrue("should have at least 1 sticky cache miss after cache purge", foundCacheMiss); + } + + /** + * Verifies that interval counters (last_interval_processed_tasks) reset between heartbeat + * intervals. + */ + @Test + public void testIntervalCounterReset() throws Exception { + interceptor.clear(); + testWorkflowRule.getTestEnvironment().start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + + // Run a workflow to generate processed tasks + TestWorkflow wf = + client.newWorkflowStub( + TestWorkflow.class, + WorkflowOptions.newBuilder() + .setTaskQueue(testWorkflowRule.getTaskQueue()) + .setWorkflowExecutionTimeout(Duration.ofSeconds(30)) + .build()); + assertEquals("done", wf.execute("test")); + + // Wait for heartbeats to capture the processed tasks AND a subsequent heartbeat with no new + // work + Thread.sleep(4000); + + List requests = interceptor.getHeartbeatRequests(); + assertTrue("need >= 3 heartbeats", requests.size() >= 3); + + // Find a heartbeat with interval processed > 0 + boolean foundNonZeroInterval = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .anyMatch( + hb -> + hb.hasWorkflowTaskSlotsInfo() + && hb.getWorkflowTaskSlotsInfo().getLastIntervalProcessedTasks() > 0); + assertTrue( + "should find a heartbeat with last_interval_processed_tasks > 0", foundNonZeroInterval); + + // The last heartbeat (after no new work) should have interval processed = 0 + WorkerHeartbeat lastHb = requests.get(requests.size() - 1).getWorkerHeartbeat(0); + if (lastHb.hasWorkflowTaskSlotsInfo()) { + assertEquals( + "last_interval_processed_tasks should reset to 0 when no new work occurs", + 0, + lastHb.getWorkflowTaskSlotsInfo().getLastIntervalProcessedTasks()); + } + } + + /** + * Tests that two workers on different task queues produce distinct instance keys but share the + * same worker_grouping_key. Matches Go's TestWorkerHeartbeatMultipleWorkers. + */ + @Test + public void testMultipleWorkersHaveDistinctInstanceKeys() throws Exception { + interceptor.clear(); + testWorkflowRule.getTestEnvironment().start(); + + String taskQueue1 = testWorkflowRule.getTaskQueue() + "-multi1"; + String taskQueue2 = testWorkflowRule.getTaskQueue() + "-multi2"; + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + WorkerFactory factory = + WorkerFactory.newInstance(client, testWorkflowRule.getWorkerFactoryOptions()); + + Worker worker1 = factory.newWorker(taskQueue1); + worker1.registerWorkflowImplementationTypes(TestWorkflowImpl.class); + worker1.registerActivitiesImplementations(new TestActivityImpl()); + + Worker worker2 = factory.newWorker(taskQueue2); + worker2.registerWorkflowImplementationTypes(TestWorkflowImpl.class); + worker2.registerActivitiesImplementations(new TestActivityImpl()); + + factory.start(); + + Thread.sleep(3000); + + List requests = interceptor.getHeartbeatRequests(); + assertFalse("no heartbeats captured", requests.isEmpty()); + + // Workers may be in separate requests (different HeartbeatManagers), so search across all + WorkerHeartbeat hb1 = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .filter(hb -> hb.getTaskQueue().equals(taskQueue1)) + .findFirst() + .orElse(null); + WorkerHeartbeat hb2 = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .filter(hb -> hb.getTaskQueue().equals(taskQueue2)) + .findFirst() + .orElse(null); + + assertNotNull("should find heartbeat for task queue 1", hb1); + assertNotNull("should find heartbeat for task queue 2", hb2); + + assertFalse("worker 1 instance key should not be empty", hb1.getWorkerInstanceKey().isEmpty()); + assertFalse("worker 2 instance key should not be empty", hb2.getWorkerInstanceKey().isEmpty()); + assertNotEquals( + "workers should have distinct instance keys", + hb1.getWorkerInstanceKey(), + hb2.getWorkerInstanceKey()); + + assertTrue("worker 1 should have host info", hb1.hasHostInfo()); + assertTrue("worker 2 should have host info", hb2.hasHostInfo()); + assertEquals( + "workers should share the same worker_grouping_key", + hb1.getHostInfo().getWorkerGroupingKey(), + hb2.getHostInfo().getWorkerGroupingKey()); + + // Verify both are stored server-side + WorkerHeartbeat described1 = describeWorker(hb1.getWorkerInstanceKey()); + WorkerHeartbeat described2 = describeWorker(hb2.getWorkerInstanceKey()); + assertNotNull("worker 1 should be stored server-side", described1); + assertNotNull("worker 2 should be stored server-side", described2); + assertEquals(taskQueue1, described1.getTaskQueue()); + assertEquals(taskQueue2, described2.getTaskQueue()); + + factory.shutdown(); + factory.awaitTermination(10, TimeUnit.SECONDS); + } + + /** + * Tests that resource-based tuner reports SlotSupplierKind as "ResourceBased". Matches Go's + * TestWorkerHeartbeatResourceBasedTuner. + */ + @Test + public void testResourceBasedSlotSupplierKind() throws Exception { + interceptor.clear(); + testWorkflowRule.getTestEnvironment().start(); + + WorkflowClient client = testWorkflowRule.getWorkflowClient(); + WorkerFactory factory = + WorkerFactory.newInstance(client, testWorkflowRule.getWorkerFactoryOptions()); + + Worker worker = + factory.newWorker( + testWorkflowRule.getTaskQueue() + "-resource", + WorkerOptions.newBuilder() + .setWorkerTuner( + ResourceBasedTuner.newBuilder() + .setControllerOptions( + ResourceBasedControllerOptions.newBuilder(0.7, 0.7).build()) + .build()) + .build()); + worker.registerWorkflowImplementationTypes(TestWorkflowImpl.class); + worker.registerActivitiesImplementations(new TestActivityImpl()); + + factory.start(); + + Thread.sleep(3000); + + List requests = interceptor.getHeartbeatRequests(); + assertFalse("no heartbeats captured", requests.isEmpty()); + + // Find the heartbeat for the resource-based worker + WorkerHeartbeat hb = + requests.stream() + .flatMap(req -> req.getWorkerHeartbeatList().stream()) + .filter(h -> h.getTaskQueue().equals(testWorkflowRule.getTaskQueue() + "-resource")) + .findFirst() + .orElse(null); + assertNotNull("should find heartbeat for resource-based worker", hb); + + assertEquals( + "workflow slot supplier kind should be ResourceBased", + "ResourceBased", + hb.getWorkflowTaskSlotsInfo().getSlotSupplierKind()); + assertEquals( + "activity slot supplier kind should be ResourceBased", + "ResourceBased", + hb.getActivityTaskSlotsInfo().getSlotSupplierKind()); + assertEquals( + "local activity slot supplier kind should be ResourceBased", + "ResourceBased", + hb.getLocalActivitySlotsInfo().getSlotSupplierKind()); + + factory.shutdown(); + factory.awaitTermination(10, TimeUnit.SECONDS); + } + + /** + * Tests that no heartbeats are sent when heartbeat interval is not configured. Matches Go's + * TestWorkerHeartbeatDisabled and Rust's worker_heartbeat_no_runtime_heartbeat. + */ + @Test + public void testNoHeartbeatsSentWhenDisabled() throws Exception { + HeartbeatCapturingInterceptor localInterceptor = new HeartbeatCapturingInterceptor(); + + SDKTestWorkflowRule noHeartbeatRule = + SDKTestWorkflowRule.newBuilder() + .setWorkflowServiceStubsOptions( + WorkflowServiceStubsOptions.newBuilder() + .setGrpcClientInterceptors(Collections.singletonList(localInterceptor)) + .build()) + // No workerHeartbeatInterval — heartbeats should be disabled + .setDoNotStart(true) + .build(); + + try { + localInterceptor.clear(); + noHeartbeatRule.getTestEnvironment().start(); + + Thread.sleep(5000); + + assertTrue( + "no heartbeats should be sent when heartbeat interval is not configured", + localInterceptor.getHeartbeatRequests().isEmpty()); + } finally { + noHeartbeatRule.getTestEnvironment().shutdown(); + noHeartbeatRule.getTestEnvironment().awaitTermination(10, TimeUnit.SECONDS); + } + } + + /** + * Queries the test server for the stored heartbeat of a given worker via the DescribeWorker RPC. + */ + private WorkerHeartbeat describeWorker(String workerInstanceKey) { + try { + DescribeWorkerResponse resp = + testWorkflowRule + .getWorkflowClient() + .getWorkflowServiceStubs() + .blockingStub() + .describeWorker( + DescribeWorkerRequest.newBuilder() + .setNamespace( + testWorkflowRule.getWorkflowClient().getOptions().getNamespace()) + .setWorkerInstanceKey(workerInstanceKey) + .build()); + return resp.getWorkerInfo().getWorkerHeartbeat(); + } catch (io.grpc.StatusRuntimeException e) { + if (e.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND) { + return null; + } + throw e; + } + } + + // --- Workflow and Activity types --- + + @WorkflowInterface + public interface TestWorkflow { + @WorkflowMethod + String execute(String input); + } + + public static class TestWorkflowImpl implements TestWorkflow { + @Override + public String execute(String input) { + TestActivity activity = + Workflow.newActivityStub( + TestActivity.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build()); + return activity.doWork(input); + } + } + + @ActivityInterface + public interface TestActivity { + @ActivityMethod + String doWork(String input); + } + + public static class TestActivityImpl implements TestActivity { + @Override + public String doWork(String input) { + return "done"; + } + } + + @WorkflowInterface + public interface FailingWorkflow { + @WorkflowMethod + void execute(); + } + + public static class FailingWorkflowImpl implements FailingWorkflow { + @Override + public void execute() { + FailingActivity activity = + Workflow.newActivityStub( + FailingActivity.class, + ActivityOptions.newBuilder() + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .setRetryOptions( + io.temporal.common.RetryOptions.newBuilder().setMaximumAttempts(1).build()) + .build()); + activity.fail(); + } + } + + @ActivityInterface + public interface FailingActivity { + @ActivityMethod + void fail(); + } + + public static class FailingActivityImpl implements FailingActivity { + @Override + public void fail() { + throw io.temporal.failure.ApplicationFailure.newFailure( + "intentional failure for test", "TestFailure"); + } + } + + @WorkflowInterface + public interface BlockingWorkflow { + @WorkflowMethod + void execute(); + } + + public static class BlockingWorkflowImpl implements BlockingWorkflow { + @Override + public void execute() { + BlockingActivity activity = + Workflow.newActivityStub( + BlockingActivity.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(30)).build()); + activity.block(); + } + } + + @ActivityInterface + public interface BlockingActivity { + @ActivityMethod + void block(); + } + + public static class BlockingActivityImpl implements BlockingActivity { + @Override + public void block() { + blockingActivityStarted.countDown(); + try { + blockingActivityRelease.await(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + + @WorkflowInterface + public interface CacheTestWorkflow { + @WorkflowMethod + String execute(); + } + + public static class CacheTestWorkflowImpl implements CacheTestWorkflow { + @Override + public String execute() { + CacheTestActivity activity = + Workflow.newActivityStub( + CacheTestActivity.class, + ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(30)).build()); + return activity.doCacheWork(); + } + } + + @ActivityInterface + public interface CacheTestActivity { + @ActivityMethod + String doCacheWork(); + } + + public static class CacheTestActivityImpl implements CacheTestActivity { + @Override + public String doCacheWork() { + cacheTestActivityStarted.countDown(); + try { + cacheTestActivityRelease.await(20, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + return "done"; + } + } +} diff --git a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java index e73e6e7a80..d2e3f3c203 100644 --- a/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java +++ b/temporal-test-server/src/main/java/io/temporal/internal/testservice/TestWorkflowService.java @@ -87,6 +87,10 @@ public final class TestWorkflowService extends WorkflowServiceGrpc.WorkflowServi private final TestNexusEndpointStore nexusEndpointStore; private final SelfAdvancingTimer selfAdvancingTimer; + // Worker heartbeat storage: workerInstanceKey -> latest WorkerHeartbeat + private final ConcurrentHashMap + workerHeartbeats = new ConcurrentHashMap<>(); + private final ScheduledExecutorService backgroundScheduler = Executors.newSingleThreadScheduledExecutor(); @@ -733,6 +737,46 @@ public void getSystemInfo( responseObserver.onCompleted(); } + @Override + public void recordWorkerHeartbeat( + RecordWorkerHeartbeatRequest request, + StreamObserver responseObserver) { + for (io.temporal.api.worker.v1.WorkerHeartbeat hb : request.getWorkerHeartbeatList()) { + if (!hb.getWorkerInstanceKey().isEmpty()) { + workerHeartbeats.put(hb.getWorkerInstanceKey(), hb); + } + } + responseObserver.onNext(RecordWorkerHeartbeatResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + + @Override + public void describeWorker( + DescribeWorkerRequest request, StreamObserver responseObserver) { + io.temporal.api.worker.v1.WorkerHeartbeat hb = + workerHeartbeats.get(request.getWorkerInstanceKey()); + if (hb == null) { + responseObserver.onError( + Status.NOT_FOUND + .withDescription("Worker not found: " + request.getWorkerInstanceKey()) + .asRuntimeException()); + return; + } + responseObserver.onNext( + DescribeWorkerResponse.newBuilder() + .setWorkerInfo( + io.temporal.api.worker.v1.WorkerInfo.newBuilder().setWorkerHeartbeat(hb).build()) + .build()); + responseObserver.onCompleted(); + } + + @Override + public void shutdownWorker( + ShutdownWorkerRequest request, StreamObserver responseObserver) { + responseObserver.onNext(ShutdownWorkerResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + private Context.CancellableContext deadlineCtx(Deadline deadline) { return Context.current().withDeadline(deadline, this.backgroundScheduler); } @@ -1902,7 +1946,8 @@ public void describeNamespace( NamespaceInfo.Capabilities.newBuilder() .setEagerWorkflowStart(true) .setAsyncUpdate(true) - .setSyncUpdate(true)) + .setSyncUpdate(true) + .setWorkerHeartbeats(true)) .build()) .build(); responseObserver.onNext(result);