diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-7d9ef28.json b/.changes/next-release/bugfix-AWSSDKforJavav2-7d9ef28.json new file mode 100644 index 000000000000..cc36d1b0808a --- /dev/null +++ b/.changes/next-release/bugfix-AWSSDKforJavav2-7d9ef28.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Fixed a thread leak in ResponseInputStream and ResponsePublisher where the internal timeout scheduler thread persisted for the lifetime of the JVM, even when no streams were active. The thread now terminates after being idle for 60 seconds." +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java index 8a7d9fea9b89..8f87186d0edd 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/ResponseInputStream.java @@ -18,9 +18,9 @@ import java.io.IOException; import java.io.InputStream; import java.time.Duration; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import software.amazon.awssdk.annotations.SdkPublicApi; import software.amazon.awssdk.annotations.SdkTestInternalApi; @@ -61,6 +61,7 @@ public final class ResponseInputStream extends SdkFilterInputStream i private static final Logger log = Logger.loggerFor(ResponseInputStream.class); private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60); + private static final long THREAD_IDLE_TIMEOUT_SECONDS = 60; private final ResponseT response; private final Abortable abortable; private ScheduledFuture timeoutTask; @@ -140,12 +141,18 @@ private void scheduleTimeoutTask(Duration timeout) { } private static final class TimeoutScheduler { - static final ScheduledExecutorService INSTANCE = - Executors.newScheduledThreadPool(1, r -> { + static final ScheduledExecutorService INSTANCE; + + static { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> { Thread t = new Thread(r, "response-input-stream-timeout-scheduler"); t.setDaemon(true); return t; }); + executor.setKeepAliveTime(THREAD_IDLE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + executor.allowCoreThreadTimeOut(true); + INSTANCE = executor; + } } /** diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java index bb44b4568f16..d57dd399903e 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/async/ResponsePublisher.java @@ -18,9 +18,9 @@ import java.nio.ByteBuffer; import java.time.Duration; import java.util.Objects; -import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; @@ -50,6 +50,7 @@ public final class ResponsePublisher implements S private static final Logger log = Logger.loggerFor(ResponsePublisher.class); private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(60); + private static final long THREAD_IDLE_TIMEOUT_SECONDS = 60; private final ResponseT response; private final SdkPublisher publisher; private ScheduledFuture timeoutTask; @@ -101,12 +102,18 @@ private void scheduleTimeoutTask(Duration timeout) { } private static final class TimeoutScheduler { - static final ScheduledExecutorService INSTANCE = - Executors.newScheduledThreadPool(1, r -> { + static final ScheduledExecutorService INSTANCE; + + static { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, r -> { Thread t = new Thread(r, "response-publisher-timeout-scheduler"); t.setDaemon(true); return t; }); + executor.setKeepAliveTime(THREAD_IDLE_TIMEOUT_SECONDS, TimeUnit.SECONDS); + executor.allowCoreThreadTimeOut(true); + INSTANCE = executor; + } } private static class CancellingSubscriber implements Subscriber {