diff --git a/core/src/main/java/org/opensearch/sql/executor/DelegatingExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/DelegatingExecutionEngine.java new file mode 100644 index 00000000000..b38251233a0 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/executor/DelegatingExecutionEngine.java @@ -0,0 +1,81 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor; + +import java.util.List; +import java.util.Optional; +import lombok.RequiredArgsConstructor; +import lombok.extern.log4j.Log4j2; +import org.apache.calcite.rel.RelNode; +import org.opensearch.sql.ast.statement.ExplainMode; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +/** + * An {@link ExecutionEngine} that delegates Calcite RelNode execution to the first extension whose + * {@link ExecutionEngine#canVectorize(RelNode)} returns {@code true}, falling back to the default + * engine otherwise. Non-Calcite ({@link PhysicalPlan}) methods and unmatched RelNode plans are + * forwarded to the default engine. + */ +@RequiredArgsConstructor +@Log4j2 +public class DelegatingExecutionEngine implements ExecutionEngine { + + private final ExecutionEngine defaultEngine; + private final List extensions; + + @Override + public void execute(PhysicalPlan plan, ResponseListener listener) { + defaultEngine.execute(plan, listener); + } + + @Override + public void execute( + PhysicalPlan plan, ExecutionContext context, ResponseListener listener) { + defaultEngine.execute(plan, context, listener); + } + + @Override + public void explain(PhysicalPlan plan, ResponseListener listener) { + defaultEngine.explain(plan, listener); + } + + @Override + public boolean canVectorize(RelNode plan) { + return findExtension(plan).isPresent(); + } + + @Override + public void execute( + RelNode plan, CalcitePlanContext context, ResponseListener listener) { + Optional ext = findExtension(plan); + if (ext.isPresent()) { + log.info("Routing query to extension engine : {}", ext.get().getClass().getSimpleName()); + ext.get().execute(plan, context, listener); + } else { + defaultEngine.execute(plan, context, listener); + } + } + + @Override + public void explain( + RelNode plan, + ExplainMode mode, + CalcitePlanContext context, + ResponseListener listener) { + Optional ext = findExtension(plan); + if (ext.isPresent()) { + ext.get().explain(plan, mode, context, listener); + } else { + defaultEngine.explain(plan, mode, context, listener); + } + } + + private Optional findExtension(RelNode plan) { + return extensions.stream().filter(ext -> ext.canVectorize(plan)).findFirst(); + } +} diff --git a/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java b/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java index e65db7b4065..da8eae41355 100644 --- a/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java +++ b/core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java @@ -47,15 +47,32 @@ void execute( */ void explain(PhysicalPlan plan, ResponseListener listener); + /** + * Check if this engine supports vectorized execution of the given Calcite RelNode plan. + * Vectorized execution engines (e.g. Velox) override this to advertise support for specific plan + * shapes. The default returns {@code false}. + */ + default boolean canVectorize(RelNode plan) { + return false; + } + /** Execute calcite RelNode plan with {@link ExecutionContext} and call back response listener. */ default void execute( - RelNode plan, CalcitePlanContext context, ResponseListener listener) {} + RelNode plan, CalcitePlanContext context, ResponseListener listener) { + listener.onFailure( + new UnsupportedOperationException( + getClass().getSimpleName() + " does not support RelNode execution")); + } default void explain( RelNode plan, ExplainMode mode, CalcitePlanContext context, - ResponseListener listener) {} + ResponseListener listener) { + listener.onFailure( + new UnsupportedOperationException( + getClass().getSimpleName() + " does not support RelNode explain")); + } /** Data class that encapsulates ExprValue. */ @Data diff --git a/core/src/test/java/org/opensearch/sql/executor/DelegatingExecutionEngineTest.java b/core/src/test/java/org/opensearch/sql/executor/DelegatingExecutionEngineTest.java new file mode 100644 index 00000000000..6e7c59d6ac6 --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/DelegatingExecutionEngineTest.java @@ -0,0 +1,164 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.List; +import org.apache.calcite.rel.RelNode; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.ast.statement.ExplainMode; +import org.opensearch.sql.calcite.CalcitePlanContext; +import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.planner.physical.PhysicalPlan; + +@ExtendWith(MockitoExtension.class) +class DelegatingExecutionEngineTest { + + @Mock private ExecutionEngine defaultEngine; + + @Mock private ExecutionEngine extension1; + + @Mock private ExecutionEngine extension2; + + @Mock private RelNode relNode; + + @Mock private CalcitePlanContext calciteContext; + + @Mock private PhysicalPlan physicalPlan; + + @Mock private ExecutionContext executionContext; + + @Mock private ResponseListener queryListener; + + @Mock private ResponseListener explainListener; + + @Test + void executeRelNodeRoutesToMatchingExtension() { + when(extension1.canVectorize(relNode)).thenReturn(true); + DelegatingExecutionEngine engine = + new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2)); + + engine.execute(relNode, calciteContext, queryListener); + + verify(extension1).execute(relNode, calciteContext, queryListener); + verify(defaultEngine, never()).execute(any(RelNode.class), any(), eq(queryListener)); + } + + @Test + void executeRelNodeFallsBackToDefaultWhenNoExtensionMatches() { + when(extension1.canVectorize(relNode)).thenReturn(false); + when(extension2.canVectorize(relNode)).thenReturn(false); + DelegatingExecutionEngine engine = + new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2)); + + engine.execute(relNode, calciteContext, queryListener); + + verify(defaultEngine).execute(relNode, calciteContext, queryListener); + verify(extension1, never()).execute(any(RelNode.class), any(), eq(queryListener)); + verify(extension2, never()).execute(any(RelNode.class), any(), eq(queryListener)); + } + + @Test + void executeRelNodeRoutesToFirstMatchingExtension() { + when(extension1.canVectorize(relNode)).thenReturn(true); + DelegatingExecutionEngine engine = + new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2)); + + engine.execute(relNode, calciteContext, queryListener); + + verify(extension1).execute(relNode, calciteContext, queryListener); + verify(extension2, never()).execute(any(RelNode.class), any(), eq(queryListener)); + } + + @Test + void explainRelNodeRoutesToMatchingExtension() { + when(extension1.canVectorize(relNode)).thenReturn(true); + DelegatingExecutionEngine engine = + new DelegatingExecutionEngine(defaultEngine, List.of(extension1)); + + engine.explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener); + + verify(extension1).explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener); + verify(defaultEngine, never()).explain(any(RelNode.class), any(), any(), eq(explainListener)); + } + + @Test + void explainRelNodeFallsBackToDefaultWhenNoExtensionMatches() { + when(extension1.canVectorize(relNode)).thenReturn(false); + DelegatingExecutionEngine engine = + new DelegatingExecutionEngine(defaultEngine, List.of(extension1)); + + engine.explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener); + + verify(defaultEngine).explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener); + } + + @Test + void canVectorizeReturnsTrueWhenExtensionMatches() { + when(extension1.canVectorize(relNode)).thenReturn(false); + when(extension2.canVectorize(relNode)).thenReturn(true); + DelegatingExecutionEngine engine = + new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2)); + + assert engine.canVectorize(relNode); + } + + @Test + void canVectorizeReturnsFalseWhenNoExtensionMatches() { + when(extension1.canVectorize(relNode)).thenReturn(false); + DelegatingExecutionEngine engine = + new DelegatingExecutionEngine(defaultEngine, List.of(extension1)); + + assert !engine.canVectorize(relNode); + } + + @Test + void physicalPlanExecuteDelegatesToDefault() { + DelegatingExecutionEngine engine = + new DelegatingExecutionEngine(defaultEngine, List.of(extension1)); + + engine.execute(physicalPlan, queryListener); + + verify(defaultEngine).execute(physicalPlan, queryListener); + } + + @Test + void physicalPlanExecuteWithContextDelegatesToDefault() { + DelegatingExecutionEngine engine = + new DelegatingExecutionEngine(defaultEngine, List.of(extension1)); + + engine.execute(physicalPlan, executionContext, queryListener); + + verify(defaultEngine).execute(physicalPlan, executionContext, queryListener); + } + + @Test + void physicalPlanExplainDelegatesToDefault() { + DelegatingExecutionEngine engine = + new DelegatingExecutionEngine(defaultEngine, List.of(extension1)); + + engine.explain(physicalPlan, explainListener); + + verify(defaultEngine).explain(physicalPlan, explainListener); + } + + @Test + void emptyExtensionsListAlwaysFallsBackToDefault() { + DelegatingExecutionEngine engine = new DelegatingExecutionEngine(defaultEngine, List.of()); + + engine.execute(relNode, calciteContext, queryListener); + + verify(defaultEngine).execute(relNode, calciteContext, queryListener); + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index edffd65f6bf..d92788ac43b 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -46,6 +46,7 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParser; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.ExtensiblePlugin; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.ScriptPlugin; import org.opensearch.plugins.SystemIndexPlugin; @@ -85,6 +86,7 @@ import org.opensearch.sql.directquery.transport.model.ExecuteDirectQueryActionResponse; import org.opensearch.sql.directquery.transport.model.ReadDirectQueryResourcesActionResponse; import org.opensearch.sql.directquery.transport.model.WriteDirectQueryResourcesActionResponse; +import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.legacy.esdomain.LocalClusterState; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.plugin.RestSqlAction; @@ -93,6 +95,7 @@ import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.opensearch.storage.OpenSearchDataSourceFactory; import org.opensearch.sql.opensearch.storage.script.CompoundedScriptEngine; +import org.opensearch.sql.plugin.config.EngineExtensionsHolder; import org.opensearch.sql.plugin.config.OpenSearchPluginModule; import org.opensearch.sql.plugin.rest.RestPPLGrammarAction; import org.opensearch.sql.plugin.rest.RestPPLQueryAction; @@ -126,10 +129,15 @@ import org.opensearch.watcher.ResourceWatcherService; public class SQLPlugin extends Plugin - implements ActionPlugin, ScriptPlugin, SystemIndexPlugin, JobSchedulerExtension { + implements ActionPlugin, + ScriptPlugin, + SystemIndexPlugin, + JobSchedulerExtension, + ExtensiblePlugin { private static final Logger LOGGER = LogManager.getLogger(SQLPlugin.class); + private List executionEngineExtensions = List.of(); private ClusterService clusterService; /** Settings should be inited when bootstrap the plugin. */ @@ -148,6 +156,18 @@ public String description() { return "Use sql to query OpenSearch."; } + @Override + public void loadExtensions(ExtensionLoader loader) { + List loaded = loader.loadExtensions(ExecutionEngine.class); + this.executionEngineExtensions = loaded != null ? List.copyOf(loaded) : List.of(); + if (!executionEngineExtensions.isEmpty()) { + LOGGER.info( + "Loaded {} execution engine extension(s): {}", + executionEngineExtensions.size(), + executionEngineExtensions.stream().map(e -> e.getClass().getSimpleName()).toList()); + } + } + @Override public List getRestHandlers( Settings settings, @@ -252,7 +272,7 @@ public Collection createComponents( LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); LocalClusterState.state().setClient(client); ModulesBuilder modules = new ModulesBuilder(); - modules.add(new OpenSearchPluginModule()); + modules.add(new OpenSearchPluginModule(executionEngineExtensions)); modules.add( b -> { b.bind(NodeClient.class).toInstance((NodeClient) client); @@ -287,12 +307,15 @@ public Collection createComponents( ScheduledAsyncQueryJobRunner.getJobRunnerInstance() .loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService); + EngineExtensionsHolder extensionsHolder = new EngineExtensionsHolder(executionEngineExtensions); + return ImmutableList.of( dataSourceService, asyncQueryExecutorService, clusterManagerEventListener, pluginSettings, - directQueryExecutorService); + directQueryExecutorService, + extensionsHolder); } @Override diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/config/EngineExtensionsHolder.java b/plugin/src/main/java/org/opensearch/sql/plugin/config/EngineExtensionsHolder.java new file mode 100644 index 00000000000..70b3fd29011 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/config/EngineExtensionsHolder.java @@ -0,0 +1,20 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.config; + +import java.util.List; +import org.opensearch.sql.executor.ExecutionEngine; + +/** + * Holds execution engine engines loaded via SPI. Returned from {@code SQLPlugin.createComponents()} + * so that OpenSearch's Guice injector can inject it into transport actions like {@code + * TransportPPLQueryAction}. + */ +public record EngineExtensionsHolder(List engines) { + public EngineExtensionsHolder(List engines) { + this.engines = engines != null ? List.copyOf(engines) : List.of(); + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java index 35504dd83c2..d9406935ee5 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/config/OpenSearchPluginModule.java @@ -5,6 +5,7 @@ package org.opensearch.sql.plugin.config; +import java.util.List; import lombok.RequiredArgsConstructor; import org.opensearch.common.inject.AbstractModule; import org.opensearch.common.inject.Provides; @@ -13,6 +14,7 @@ import org.opensearch.sql.analysis.ExpressionAnalyzer; import org.opensearch.sql.common.setting.Settings; import org.opensearch.sql.datasource.DataSourceService; +import org.opensearch.sql.executor.DelegatingExecutionEngine; import org.opensearch.sql.executor.ExecutionEngine; import org.opensearch.sql.executor.QueryManager; import org.opensearch.sql.executor.QueryService; @@ -41,6 +43,13 @@ @RequiredArgsConstructor public class OpenSearchPluginModule extends AbstractModule { + private final List executionEngineExtensions; + + /** Default constructor for when no engines are available. */ + public OpenSearchPluginModule() { + this(List.of()); + } + private final BuiltinFunctionRepository functionRepository = BuiltinFunctionRepository.getInstance(); @@ -61,7 +70,12 @@ public StorageEngine storageEngine(OpenSearchClient client, Settings settings) { @Singleton public ExecutionEngine executionEngine( OpenSearchClient client, ExecutionProtector protector, PlanSerializer planSerializer) { - return new OpenSearchExecutionEngine(client, protector, planSerializer); + ExecutionEngine defaultEngine = + new OpenSearchExecutionEngine(client, protector, planSerializer); + if (executionEngineExtensions.isEmpty()) { + return defaultEngine; + } + return new DelegatingExecutionEngine(defaultEngine, executionEngineExtensions); } @Provides diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java index fd932ef2fc2..ef10aaca451 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -33,6 +33,7 @@ import org.opensearch.sql.monitor.profile.QueryProfiling; import org.opensearch.sql.opensearch.executor.OpenSearchQueryManager; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; +import org.opensearch.sql.plugin.config.EngineExtensionsHolder; import org.opensearch.sql.plugin.config.OpenSearchPluginModule; import org.opensearch.sql.ppl.PPLService; import org.opensearch.sql.ppl.domain.PPLQueryRequest; @@ -65,11 +66,12 @@ public TransportPPLQueryAction( NodeClient client, ClusterService clusterService, DataSourceServiceImpl dataSourceService, - org.opensearch.common.settings.Settings clusterSettings) { + org.opensearch.common.settings.Settings clusterSettings, + EngineExtensionsHolder extensionsHolder) { super(PPLQueryAction.NAME, transportService, actionFilters, TransportPPLQueryRequest::new); ModulesBuilder modules = new ModulesBuilder(); - modules.add(new OpenSearchPluginModule()); + modules.add(new OpenSearchPluginModule(extensionsHolder.engines())); modules.add( b -> { b.bind(NodeClient.class).toInstance(client);