Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.ignite.internal.runner.app.client;

import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.tx.impl.ResourceVacuumManager.RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
Expand All @@ -31,13 +32,15 @@
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletionException;
Expand All @@ -51,6 +54,7 @@
import org.apache.ignite.internal.client.sql.QueryModifier;
import org.apache.ignite.internal.security.authentication.UserDetails;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.WithSystemProperty;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.lang.IgniteException;
Expand All @@ -73,14 +77,14 @@
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
* Thin client SQL integration test.
*/
@WithSystemProperty(key = RESOURCE_VACUUM_INTERVAL_MILLISECONDS_PROPERTY, value = "10")
@SuppressWarnings("resource")
public class ItThinClientSqlTest extends ItAbstractThinClientTest {
@AfterEach
Expand Down Expand Up @@ -840,7 +844,6 @@ public void testSqlQueryModifiers() {
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-26567")
public void testBroadcastQueryTxInflightStateCleanup() {
IgniteSql sql = client().sql();

Expand All @@ -858,7 +861,11 @@ public void testBroadcastQueryTxInflightStateCleanup() {
IgniteImpl server = TestWrappers.unwrapIgniteImpl(server(i));
TxManager txManager = server.txManager();
TransactionInflights transactionInflights = IgniteTestUtils.getFieldValue(txManager, "transactionInflights");
assertFalse(transactionInflights.hasActiveInflights(), "Expecting no active inflights");
Map<UUID, ?> txContexts = IgniteTestUtils.getFieldValue(transactionInflights, "txCtxMap");

Awaitility.await()
.atMost(Duration.ofSeconds(10))
.untilAsserted(() -> assertTrue(txContexts.isEmpty(), "Expecting no transaction inflight contexts"));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ public class FinishedTransactionBatchRequestHandler {
/** Resources registry. */
private final RemotelyTriggeredResourceRegistry resourcesRegistry;

/** Transaction inflights tracker. */
private final TransactionInflights transactionInflights;

private final LowWatermark lowWatermark;

private final Executor asyncExecutor;
Expand All @@ -43,17 +46,20 @@ public class FinishedTransactionBatchRequestHandler {
*
* @param messagingService Messaging service.
* @param resourcesRegistry Resources registry.
* @param transactionInflights Transaction inflights.
* @param lowWatermark Low watermark.
* @param asyncExecutor Executor to run cleanup commands.
*/
public FinishedTransactionBatchRequestHandler(
MessagingService messagingService,
RemotelyTriggeredResourceRegistry resourcesRegistry,
TransactionInflights transactionInflights,
LowWatermark lowWatermark,
Executor asyncExecutor
) {
this.messagingService = messagingService;
this.resourcesRegistry = resourcesRegistry;
this.transactionInflights = transactionInflights;
this.lowWatermark = lowWatermark;
this.asyncExecutor = asyncExecutor;
}
Expand All @@ -74,6 +80,8 @@ private void processFinishedTransactionsBatchMessage(FinishedTransactionsBatchMe
}

private void cleanUpForTransaction(UUID transactionId) {
transactionInflights.removeTxContext(transactionId);

resourcesRegistry.close(transactionId);

lowWatermark.unlock(transactionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ public ResourceVacuumManager(
this.finishedTransactionBatchRequestHandler = new FinishedTransactionBatchRequestHandler(
messagingService,
resourceRegistry,
transactionInflights,
lowWatermark,
resourceVacuumExecutor
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ class FinishedTransactionBatchRequestHandlerTest extends BaseIgniteAbstractTest
@Mock
private LowWatermark lowWatermark;

@Mock
private TransactionInflights transactionInflights;

private FinishedTransactionBatchRequestHandler requestHandler;

private NetworkMessageHandler networkHandler;
Expand All @@ -63,6 +66,7 @@ void createAndStartHandler() {
requestHandler = new FinishedTransactionBatchRequestHandler(
messagingService,
resourceRegistry,
transactionInflights,
lowWatermark,
ForkJoinPool.commonPool()
);
Expand All @@ -86,6 +90,8 @@ void unlocksLwm() {

networkHandler.onReceived(message, mock(InternalClusterNode.class), null);

verify(transactionInflights, timeout(10_000)).removeTxContext(txId1);
verify(transactionInflights, timeout(10_000)).removeTxContext(txId2);
verify(lowWatermark, timeout(10_000)).unlock(txId1);
verify(lowWatermark, timeout(10_000)).unlock(txId2);
}
Expand Down