Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,5 @@ src/main/idls/*
.settings
.vscode/
*/bin
/.claude
/.claude
**/.factorypath
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public ActivityPollTask(
@Nonnull String namespace,
@Nonnull String taskQueue,
@Nonnull String identity,
@Nonnull String workerInstanceKey,
@Nonnull WorkerVersioningOptions versioningOptions,
double activitiesPerSecond,
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
Expand All @@ -52,6 +53,7 @@ public ActivityPollTask(
.setNamespace(namespace)
.setIdentity(identity)
.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
pollRequest.setWorkerInstanceKey(workerInstanceKey);
if (activitiesPerSecond > 0) {
pollRequest.setTaskQueueMetadata(
TaskQueueMetadata.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,7 +48,7 @@ final class ActivityWorker implements SuspendableWorker {
private final GrpcRetryer grpcRetryer;
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
private final TrackingSlotSupplier<ActivitySlotInfo> slotSupplier;
private final AtomicBoolean serverSupportsAutoscaling;
private final NamespaceCapabilities namespaceCapabilities;

public ActivityWorker(
@Nonnull WorkflowServiceStubs service,
Expand All @@ -59,7 +58,7 @@ public ActivityWorker(
@Nonnull SingleWorkerOptions options,
@Nonnull ActivityTaskHandler handler,
@Nonnull SlotSupplier<ActivitySlotInfo> slotSupplier,
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
@Nonnull NamespaceCapabilities namespaceCapabilities) {
this.service = Objects.requireNonNull(service);
this.namespace = Objects.requireNonNull(namespace);
this.taskQueue = Objects.requireNonNull(taskQueue);
Expand All @@ -75,7 +74,7 @@ public ActivityWorker(
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);

this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
this.namespaceCapabilities = namespaceCapabilities;
}

@Override
Expand Down Expand Up @@ -103,14 +102,15 @@ public boolean start() {
namespace,
taskQueue,
options.getIdentity(),
options.getWorkerInstanceKey(),
options.getWorkerVersioningOptions(),
taskQueueActivitiesPerSecond,
this.slotSupplier,
workerMetricsScope,
service.getServerCapabilities()),
this.pollTaskExecutor,
pollerOptions,
serverSupportsAutoscaling.get(),
namespaceCapabilities,
workerMetricsScope);

} else {
Expand All @@ -122,14 +122,16 @@ public boolean start() {
namespace,
taskQueue,
options.getIdentity(),
options.getWorkerInstanceKey(),
options.getWorkerVersioningOptions(),
taskQueueActivitiesPerSecond,
this.slotSupplier,
workerMetricsScope,
service.getServerCapabilities()),
this.pollTaskExecutor,
pollerOptions,
workerMetricsScope);
workerMetricsScope,
namespaceCapabilities);
}
poller.start();
workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public AsyncActivityPollTask(
@Nonnull String namespace,
@Nonnull String taskQueue,
@Nonnull String identity,
@Nonnull String workerInstanceKey,
@Nonnull WorkerVersioningOptions versioningOptions,
double activitiesPerSecond,
@Nonnull TrackingSlotSupplier<ActivitySlotInfo> slotSupplier,
Expand All @@ -57,6 +58,7 @@ public AsyncActivityPollTask(
.setNamespace(namespace)
.setIdentity(identity)
.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
pollRequest.setWorkerInstanceKey(workerInstanceKey);
if (activitiesPerSecond > 0) {
pollRequest.setTaskQueueMetadata(
TaskQueueMetadata.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public AsyncNexusPollTask(
@Nonnull String namespace,
@Nonnull String taskQueue,
@Nonnull String identity,
@Nonnull String workerInstanceKey,
@Nonnull WorkerVersioningOptions versioningOptions,
@Nonnull Scope metricsScope,
@Nonnull Supplier<GetSystemInfoResponse.Capabilities> serverCapabilities,
Expand All @@ -56,6 +57,8 @@ public AsyncNexusPollTask(
.setIdentity(identity)
.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));

pollRequest.setWorkerInstanceKey(workerInstanceKey);

if (versioningOptions.getWorkerDeploymentOptions() != null) {
pollRequest.setDeploymentOptions(
WorkerVersioningProtoUtils.deploymentOptionsToProto(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
private final List<PollTaskAsync<T>> asyncTaskPollers;
private final PollerOptions pollerOptions;
private final PollerBehaviorAutoscaling pollerBehavior;
private final boolean serverSupportsAutoscaling;
private final Scope workerMetricsScope;
private Throttler pollRateThrottler;
private final Thread.UncaughtExceptionHandler uncaughtExceptionHandler =
Expand All @@ -43,15 +42,15 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
PollTaskAsync<T> asyncTaskPoller,
ShutdownableTaskExecutor<T> taskExecutor,
PollerOptions pollerOptions,
boolean serverSupportsAutoscaling,
NamespaceCapabilities namespaceCapabilities,
Scope workerMetricsScope) {
this(
slotSupplier,
slotReservationData,
Collections.singletonList(asyncTaskPoller),
taskExecutor,
pollerOptions,
serverSupportsAutoscaling,
namespaceCapabilities,
workerMetricsScope);
}

Expand All @@ -61,9 +60,9 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
List<PollTaskAsync<T>> asyncTaskPollers,
ShutdownableTaskExecutor<T> taskExecutor,
PollerOptions pollerOptions,
boolean serverSupportsAutoscaling,
NamespaceCapabilities namespaceCapabilities,
Scope workerMetricsScope) {
super(taskExecutor);
super(taskExecutor, namespaceCapabilities);
Objects.requireNonNull(slotSupplier, "slotSupplier cannot be null");
Objects.requireNonNull(slotReservationData, "slotReservation data should not be null");
Objects.requireNonNull(asyncTaskPollers, "asyncTaskPollers should not be null");
Expand All @@ -82,7 +81,6 @@ final class AsyncPoller<T extends ScalingTask> extends BasePoller<T> {
+ " is not supported for AsyncPoller. Only PollerBehaviorAutoscaling is supported.");
}
this.pollerBehavior = (PollerBehaviorAutoscaling) pollerOptions.getPollerBehavior();
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
this.pollerOptions = pollerOptions;
this.workerMetricsScope = workerMetricsScope;
}
Expand Down Expand Up @@ -114,7 +112,7 @@ public boolean start() {
pollerBehavior.getMinConcurrentTaskPollers(),
pollerBehavior.getMaxConcurrentTaskPollers(),
pollerBehavior.getInitialConcurrentTaskPollers(),
serverSupportsAutoscaling,
namespaceCapabilities.isPollerAutoscaling(),
(newTarget) -> {
log.debug(
"Updating maximum number of pollers for {} to: {}",
Expand All @@ -136,12 +134,14 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
return super.shutdown(shutdownManager, interruptTasks)
.thenApply(
(f) -> {
for (PollTaskAsync<T> asyncTaskPoller : asyncTaskPollers) {
try {
log.debug("Shutting down async poller: {}", asyncTaskPoller.getLabel());
asyncTaskPoller.cancel(new RuntimeException("Shutting down poller"));
} catch (Throwable e) {
log.error("Error while cancelling poll task", e);
if (!namespaceCapabilities.isGracefulPollShutdown()) {
for (PollTaskAsync<T> asyncTaskPoller : asyncTaskPollers) {
try {
log.debug("Shutting down async poller: {}", asyncTaskPoller.getLabel());
asyncTaskPoller.cancel(new RuntimeException("Shutting down poller"));
} catch (Throwable e) {
log.error("Error while cancelling poll task", e);
}
}
}
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public AsyncWorkflowPollTask(
@Nonnull String taskQueue,
@Nullable String stickyTaskQueue,
@Nonnull String identity,
@Nonnull String workerInstanceKey,
@Nonnull WorkerVersioningOptions versioningOptions,
@Nonnull TrackingSlotSupplier<WorkflowSlotInfo> slotSupplier,
@Nonnull Scope metricsScope,
Expand All @@ -66,6 +67,8 @@ public AsyncWorkflowPollTask(
.setNamespace(Objects.requireNonNull(namespace))
.setIdentity(Objects.requireNonNull(identity));

pollRequestBuilder.setWorkerInstanceKey(workerInstanceKey);

if (versioningOptions.getWorkerDeploymentOptions() != null) {
pollRequestBuilder.setDeploymentOptions(
WorkerVersioningProtoUtils.deploymentOptionsToProto(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ abstract class BasePoller<T> implements SuspendableWorker {

protected ExecutorService pollExecutor;

protected BasePoller(ShutdownableTaskExecutor<T> taskExecutor) {
protected final NamespaceCapabilities namespaceCapabilities;

protected BasePoller(
ShutdownableTaskExecutor<T> taskExecutor, NamespaceCapabilities namespaceCapabilities) {
Objects.requireNonNull(taskExecutor, "taskExecutor should not be null");
this.taskExecutor = taskExecutor;
this.namespaceCapabilities =
Objects.requireNonNull(namespaceCapabilities, "namespaceCapabilities should not be null");
}

@Override
Expand All @@ -55,15 +60,23 @@ public CompletableFuture<Void> shutdown(ShutdownManager shutdownManager, boolean
return CompletableFuture.completedFuture(null);
}

return shutdownManager
// it's ok to forcefully shutdown pollers, because they are stuck in a long poll call
// so we don't risk losing any progress doing that.
.shutdownExecutorNow(pollExecutor, this + "#pollExecutor", Duration.ofSeconds(1))
.exceptionally(
e -> {
log.error("Unexpected exception during shutdown", e);
return null;
});
CompletableFuture<Void> pollExecutorShutdown;
if (namespaceCapabilities.isGracefulPollShutdown()) {
// When graceful poll shutdown is enabled, the server will complete outstanding polls with
// empty responses after ShutdownWorker is called. We simply wait for polls to return.
pollExecutorShutdown =
shutdownManager.shutdownExecutorUntimed(pollExecutor, this + "#pollExecutor");
} else {
// Old behaviour forcibly stops outstanding polls.
pollExecutorShutdown =
shutdownManager.shutdownExecutorNow(
pollExecutor, this + "#pollExecutor", Duration.ofSeconds(1));
}
return pollExecutorShutdown.exceptionally(
e -> {
log.error("Unexpected exception during shutdown", e);
return null;
});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ public MultiThreadedPoller(
PollTask<T> pollTask,
ShutdownableTaskExecutor<T> taskExecutor,
PollerOptions pollerOptions,
Scope workerMetricsScope) {
super(taskExecutor);
Scope workerMetricsScope,
NamespaceCapabilities namespaceCapabilities) {
super(taskExecutor, namespaceCapabilities);
Objects.requireNonNull(identity, "identity cannot be null");
Objects.requireNonNull(pollTask, "poll service should not be null");
Objects.requireNonNull(pollerOptions, "pollerOptions should not be null");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.temporal.internal.worker;

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Mutable record of the namespace-level capabilities reported by the server's DescribeNamespace
 * response. One instance is shared by every worker of a WorkerFactory and is filled in at startup.
 * Each capability is backed by an AtomicBoolean, which lets it be assigned after construction.
 */
public final class NamespaceCapabilities {
  // Both flags start out as false and stay that way until the matching setter is invoked.
  private final AtomicBoolean autoscalingFlag = new AtomicBoolean();
  private final AtomicBoolean gracefulShutdownFlag = new AtomicBoolean();

  /**
   * Records whether the poller autoscaling capability is available.
   *
   * @param value the new state of the capability flag
   */
  public void setPollerAutoscaling(boolean value) {
    this.autoscalingFlag.set(value);
  }

  /**
   * @return the current state of the poller autoscaling capability flag; {@code false} until set
   */
  public boolean isPollerAutoscaling() {
    return this.autoscalingFlag.get();
  }

  /**
   * Records whether the graceful poll shutdown capability is available.
   *
   * @param value the new state of the capability flag
   */
  public void setGracefulPollShutdown(boolean value) {
    this.gracefulShutdownFlag.set(value);
  }

  /**
   * @return the current state of the graceful poll shutdown capability flag; {@code false} until
   *     set
   */
  public boolean isGracefulPollShutdown() {
    return this.gracefulShutdownFlag.get();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public NexusPollTask(
@Nonnull String namespace,
@Nonnull String taskQueue,
@Nonnull String identity,
@Nonnull String workerInstanceKey,
@Nonnull WorkerVersioningOptions versioningOptions,
@Nonnull TrackingSlotSupplier<NexusSlotInfo> slotSupplier,
@Nonnull Scope metricsScope,
Expand All @@ -48,6 +49,7 @@ public NexusPollTask(
.setNamespace(namespace)
.setIdentity(identity)
.setTaskQueue(TaskQueue.newBuilder().setName(taskQueue));
pollRequest.setWorkerInstanceKey(workerInstanceKey);

if (versioningOptions.getWorkerDeploymentOptions() != null) {
pollRequest.setDeploymentOptions(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -53,7 +52,7 @@ final class NexusWorker implements SuspendableWorker {
private final GrpcRetryer grpcRetryer;
private final GrpcRetryer.GrpcRetryerOptions replyGrpcRetryerOptions;
private final TrackingSlotSupplier<NexusSlotInfo> slotSupplier;
private final AtomicBoolean serverSupportsAutoscaling;
private final NamespaceCapabilities namespaceCapabilities;
private final boolean forceOldFailureFormat;

public NexusWorker(
Expand All @@ -64,7 +63,7 @@ public NexusWorker(
@Nonnull NexusTaskHandler handler,
@Nonnull DataConverter dataConverter,
@Nonnull SlotSupplier<NexusSlotInfo> slotSupplier,
@Nonnull AtomicBoolean serverSupportsAutoscaling) {
@Nonnull NamespaceCapabilities namespaceCapabilities) {
this.service = Objects.requireNonNull(service);
this.namespace = Objects.requireNonNull(namespace);
this.taskQueue = Objects.requireNonNull(taskQueue);
Expand All @@ -80,7 +79,7 @@ public NexusWorker(
DefaultStubServiceOperationRpcRetryOptions.INSTANCE, null);

this.slotSupplier = new TrackingSlotSupplier<>(slotSupplier, this.workerMetricsScope);
this.serverSupportsAutoscaling = serverSupportsAutoscaling;
this.namespaceCapabilities = namespaceCapabilities;
// Allow tests to force old format for backward compatibility testing
String forceOldFormat = System.getProperty("temporal.nexus.forceOldFailureFormat");
this.forceOldFailureFormat = "true".equalsIgnoreCase(forceOldFormat);
Expand Down Expand Up @@ -110,13 +109,14 @@ public boolean start() {
namespace,
taskQueue,
options.getIdentity(),
options.getWorkerInstanceKey(),
options.getWorkerVersioningOptions(),
workerMetricsScope,
service.getServerCapabilities(),
this.slotSupplier),
this.pollTaskExecutor,
pollerOptions,
serverSupportsAutoscaling.get(),
namespaceCapabilities,
workerMetricsScope);
} else {
poller =
Expand All @@ -127,13 +127,15 @@ public boolean start() {
namespace,
taskQueue,
options.getIdentity(),
options.getWorkerInstanceKey(),
options.getWorkerVersioningOptions(),
this.slotSupplier,
workerMetricsScope,
service.getServerCapabilities()),
this.pollTaskExecutor,
pollerOptions,
workerMetricsScope);
workerMetricsScope,
namespaceCapabilities);
}
poller.start();
workerMetricsScope.counter(MetricsType.WORKER_START_COUNTER).inc(1);
Expand Down
Loading
Loading