diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java index 60b14cb3ba9..0b2a21923c6 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.runner.app.client; import static org.apache.ignite.internal.testframework.IgniteTestUtils.await; +import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.is; @@ -31,6 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.math.BigDecimal; +import java.time.Duration; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; @@ -38,6 +40,7 @@ import java.util.ArrayList; import java.util.EnumSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletionException; @@ -51,6 +54,7 @@ import org.apache.ignite.internal.client.sql.QueryModifier; import org.apache.ignite.internal.security.authentication.UserDetails; import org.apache.ignite.internal.testframework.IgniteTestUtils; +import org.apache.ignite.internal.testframework.WithSystemProperty; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.impl.TransactionInflights; import org.apache.ignite.lang.IgniteException; @@ -73,7 +77,6 @@ import org.awaitility.Awaitility; import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -81,6 +84,7 @@ /** * Thin client SQL integration test. */ +@WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "10") @SuppressWarnings("resource") public class ItThinClientSqlTest extends ItAbstractThinClientTest { @AfterEach @@ -840,7 +844,6 @@ public void testSqlQueryModifiers() { } @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-26567") public void testBroadcastQueryTxInflightStateCleanup() { IgniteSql sql = client().sql(); @@ -858,7 +861,11 @@ public void testBroadcastQueryTxInflightStateCleanup() { IgniteImpl server = TestWrappers.unwrapIgniteImpl(server(i)); TxManager txManager = server.txManager(); TransactionInflights transactionInflights = IgniteTestUtils.getFieldValue(txManager, "transactionInflights"); - assertFalse(transactionInflights.hasActiveInflights(), "Expecting no active inflights"); + Map txContexts = IgniteTestUtils.getFieldValue(transactionInflights, "txCtxMap"); + + Awaitility.await() + .atMost(Duration.ofSeconds(10)) + .untilAsserted(() -> assertTrue(txContexts.isEmpty(), "Expecting no transaction inflight contexts")); } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java index 6e265624939..171707c098c 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandler.java @@ -34,6 +34,9 @@ public class FinishedTransactionBatchRequestHandler { /** Resources registry. */ private final RemotelyTriggeredResourceRegistry resourcesRegistry; + /** Transaction inflights tracker. */ + private final TransactionInflights transactionInflights; + private final LowWatermark lowWatermark; private final Executor asyncExecutor; @@ -43,17 +46,20 @@ public class FinishedTransactionBatchRequestHandler { * * @param messagingService Messaging service. * @param resourcesRegistry Resources registry. + * @param transactionInflights Transaction inflights. * @param lowWatermark Low watermark. * @param asyncExecutor Executor to run cleanup commands. */ public FinishedTransactionBatchRequestHandler( MessagingService messagingService, RemotelyTriggeredResourceRegistry resourcesRegistry, + TransactionInflights transactionInflights, LowWatermark lowWatermark, Executor asyncExecutor ) { this.messagingService = messagingService; this.resourcesRegistry = resourcesRegistry; + this.transactionInflights = transactionInflights; this.lowWatermark = lowWatermark; this.asyncExecutor = asyncExecutor; } @@ -74,6 +80,8 @@ private void processFinishedTransactionsBatchMessage(FinishedTransactionsBatchMe } private void cleanUpForTransaction(UUID transactionId) { + transactionInflights.removeTxContext(transactionId); + resourcesRegistry.close(transactionId); lowWatermark.unlock(transactionId); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java index a76cc3fa43a..bcbaaf9fc0b 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ResourceVacuumManager.java @@ -124,6 +124,7 @@ public ResourceVacuumManager( this.finishedTransactionBatchRequestHandler = new FinishedTransactionBatchRequestHandler( messagingService, resourceRegistry, + transactionInflights, lowWatermark, resourceVacuumExecutor ); diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandlerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandlerTest.java index ba1ba7cc25a..e879d0b6b83 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandlerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/FinishedTransactionBatchRequestHandlerTest.java @@ -54,6 +54,9 @@ class FinishedTransactionBatchRequestHandlerTest extends BaseIgniteAbstractTest @Mock private LowWatermark lowWatermark; + @Mock + private TransactionInflights transactionInflights; + private FinishedTransactionBatchRequestHandler requestHandler; private NetworkMessageHandler networkHandler; @@ -63,6 +66,7 @@ void createAndStartHandler() { requestHandler = new FinishedTransactionBatchRequestHandler( messagingService, resourceRegistry, + transactionInflights, lowWatermark, ForkJoinPool.commonPool() ); @@ -86,6 +90,8 @@ void unlocksLwm() { networkHandler.onReceived(message, mock(InternalClusterNode.class), null); + verify(transactionInflights, timeout(10_000)).removeTxContext(txId1); + verify(transactionInflights, timeout(10_000)).removeTxContext(txId2); verify(lowWatermark, timeout(10_000)).unlock(txId1); verify(lowWatermark, timeout(10_000)).unlock(txId2); }