Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor;

import java.util.List;
import java.util.Optional;
import lombok.RequiredArgsConstructor;
import lombok.extern.log4j.Log4j2;
import org.apache.calcite.rel.RelNode;
import org.opensearch.sql.ast.statement.ExplainMode;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
* An {@link ExecutionEngine} that delegates Calcite RelNode execution to the first extension whose
* {@link ExecutionEngine#canVectorize(RelNode)} returns {@code true}, falling back to the default
* engine otherwise. Non-Calcite ({@link PhysicalPlan}) methods and unmatched RelNode plans are
* forwarded to the default engine.
*/
@RequiredArgsConstructor
@Log4j2
public class DelegatingExecutionEngine implements ExecutionEngine {

private final ExecutionEngine defaultEngine;
private final List<ExecutionEngine> extensions;

@Override
public void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener) {
defaultEngine.execute(plan, listener);
}

@Override
public void execute(
PhysicalPlan plan, ExecutionContext context, ResponseListener<QueryResponse> listener) {
defaultEngine.execute(plan, context, listener);
}

@Override
public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener) {
defaultEngine.explain(plan, listener);
}

@Override
public boolean canVectorize(RelNode plan) {
return findExtension(plan).isPresent();
}

@Override
public void execute(
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
Optional<ExecutionEngine> ext = findExtension(plan);
if (ext.isPresent()) {
log.info("Routing query to extension engine : {}", ext.get().getClass().getSimpleName());
ext.get().execute(plan, context, listener);
} else {
defaultEngine.execute(plan, context, listener);
}
}

@Override
public void explain(
RelNode plan,
ExplainMode mode,
CalcitePlanContext context,
ResponseListener<ExplainResponse> listener) {
Optional<ExecutionEngine> ext = findExtension(plan);
if (ext.isPresent()) {
ext.get().explain(plan, mode, context, listener);
} else {
defaultEngine.explain(plan, mode, context, listener);
}
}

private Optional<ExecutionEngine> findExtension(RelNode plan) {
return extensions.stream().filter(ext -> ext.canVectorize(plan)).findFirst();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,32 @@ void execute(
*/
void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener);

/**
* Check if this engine supports vectorized execution of the given Calcite RelNode plan.
* Vectorized execution engines (e.g. Velox) override this to advertise support for specific plan
* shapes. The default returns {@code false}.
*/
default boolean canVectorize(RelNode plan) {
return false;
}

/** Execute calcite RelNode plan with {@link ExecutionContext} and call back response listener. */
default void execute(
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {}
RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
listener.onFailure(
new UnsupportedOperationException(
getClass().getSimpleName() + " does not support RelNode execution"));
}

default void explain(
RelNode plan,
ExplainMode mode,
CalcitePlanContext context,
ResponseListener<ExplainResponse> listener) {}
ResponseListener<ExplainResponse> listener) {
listener.onFailure(
new UnsupportedOperationException(
getClass().getSimpleName() + " does not support RelNode explain"));
}

/** Data class that encapsulates ExprValue. */
@Data
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.executor;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.List;
import org.apache.calcite.rel.RelNode;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.ast.statement.ExplainMode;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.planner.physical.PhysicalPlan;

@ExtendWith(MockitoExtension.class)
class DelegatingExecutionEngineTest {

@Mock private ExecutionEngine defaultEngine;

@Mock private ExecutionEngine extension1;

@Mock private ExecutionEngine extension2;

@Mock private RelNode relNode;

@Mock private CalcitePlanContext calciteContext;

@Mock private PhysicalPlan physicalPlan;

@Mock private ExecutionContext executionContext;

@Mock private ResponseListener<ExecutionEngine.QueryResponse> queryListener;

@Mock private ResponseListener<ExecutionEngine.ExplainResponse> explainListener;

@Test
void executeRelNodeRoutesToMatchingExtension() {
when(extension1.canVectorize(relNode)).thenReturn(true);
DelegatingExecutionEngine engine =
new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2));

engine.execute(relNode, calciteContext, queryListener);

verify(extension1).execute(relNode, calciteContext, queryListener);
verify(defaultEngine, never()).execute(any(RelNode.class), any(), eq(queryListener));
}

@Test
void executeRelNodeFallsBackToDefaultWhenNoExtensionMatches() {
when(extension1.canVectorize(relNode)).thenReturn(false);
when(extension2.canVectorize(relNode)).thenReturn(false);
DelegatingExecutionEngine engine =
new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2));

engine.execute(relNode, calciteContext, queryListener);

verify(defaultEngine).execute(relNode, calciteContext, queryListener);
verify(extension1, never()).execute(any(RelNode.class), any(), eq(queryListener));
verify(extension2, never()).execute(any(RelNode.class), any(), eq(queryListener));
}

@Test
void executeRelNodeRoutesToFirstMatchingExtension() {
when(extension1.canVectorize(relNode)).thenReturn(true);
DelegatingExecutionEngine engine =
new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2));

engine.execute(relNode, calciteContext, queryListener);

verify(extension1).execute(relNode, calciteContext, queryListener);
verify(extension2, never()).execute(any(RelNode.class), any(), eq(queryListener));
}

@Test
void explainRelNodeRoutesToMatchingExtension() {
when(extension1.canVectorize(relNode)).thenReturn(true);
DelegatingExecutionEngine engine =
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));

engine.explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener);

verify(extension1).explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener);
verify(defaultEngine, never()).explain(any(RelNode.class), any(), any(), eq(explainListener));
}

@Test
void explainRelNodeFallsBackToDefaultWhenNoExtensionMatches() {
when(extension1.canVectorize(relNode)).thenReturn(false);
DelegatingExecutionEngine engine =
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));

engine.explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener);

verify(defaultEngine).explain(relNode, ExplainMode.STANDARD, calciteContext, explainListener);
}

@Test
void canVectorizeReturnsTrueWhenExtensionMatches() {
when(extension1.canVectorize(relNode)).thenReturn(false);
when(extension2.canVectorize(relNode)).thenReturn(true);
DelegatingExecutionEngine engine =
new DelegatingExecutionEngine(defaultEngine, List.of(extension1, extension2));

assert engine.canVectorize(relNode);
}

@Test
void canVectorizeReturnsFalseWhenNoExtensionMatches() {
when(extension1.canVectorize(relNode)).thenReturn(false);
DelegatingExecutionEngine engine =
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));

assert !engine.canVectorize(relNode);
}

@Test
void physicalPlanExecuteDelegatesToDefault() {
DelegatingExecutionEngine engine =
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));

engine.execute(physicalPlan, queryListener);

verify(defaultEngine).execute(physicalPlan, queryListener);
}

@Test
void physicalPlanExecuteWithContextDelegatesToDefault() {
DelegatingExecutionEngine engine =
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));

engine.execute(physicalPlan, executionContext, queryListener);

verify(defaultEngine).execute(physicalPlan, executionContext, queryListener);
}

@Test
void physicalPlanExplainDelegatesToDefault() {
DelegatingExecutionEngine engine =
new DelegatingExecutionEngine(defaultEngine, List.of(extension1));

engine.explain(physicalPlan, explainListener);

verify(defaultEngine).explain(physicalPlan, explainListener);
}

@Test
void emptyExtensionsListAlwaysFallsBackToDefault() {
DelegatingExecutionEngine engine = new DelegatingExecutionEngine(defaultEngine, List.of());

engine.execute(relNode, calciteContext, queryListener);

verify(defaultEngine).execute(relNode, calciteContext, queryListener);
}
}
29 changes: 26 additions & 3 deletions plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.ScriptPlugin;
import org.opensearch.plugins.SystemIndexPlugin;
Expand Down Expand Up @@ -85,6 +86,7 @@
import org.opensearch.sql.directquery.transport.model.ExecuteDirectQueryActionResponse;
import org.opensearch.sql.directquery.transport.model.ReadDirectQueryResourcesActionResponse;
import org.opensearch.sql.directquery.transport.model.WriteDirectQueryResourcesActionResponse;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.plugin.RestSqlAction;
Expand All @@ -93,6 +95,7 @@
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.sql.opensearch.storage.OpenSearchDataSourceFactory;
import org.opensearch.sql.opensearch.storage.script.CompoundedScriptEngine;
import org.opensearch.sql.plugin.config.EngineExtensionsHolder;
import org.opensearch.sql.plugin.config.OpenSearchPluginModule;
import org.opensearch.sql.plugin.rest.RestPPLGrammarAction;
import org.opensearch.sql.plugin.rest.RestPPLQueryAction;
Expand Down Expand Up @@ -126,10 +129,15 @@
import org.opensearch.watcher.ResourceWatcherService;

public class SQLPlugin extends Plugin
implements ActionPlugin, ScriptPlugin, SystemIndexPlugin, JobSchedulerExtension {
implements ActionPlugin,
ScriptPlugin,
SystemIndexPlugin,
JobSchedulerExtension,
ExtensiblePlugin {

private static final Logger LOGGER = LogManager.getLogger(SQLPlugin.class);

private List<ExecutionEngine> executionEngineExtensions = List.of();
private ClusterService clusterService;

/** Settings should be inited when bootstrap the plugin. */
Expand All @@ -148,6 +156,18 @@ public String description() {
return "Use sql to query OpenSearch.";
}

@Override
public void loadExtensions(ExtensionLoader loader) {
List<ExecutionEngine> loaded = loader.loadExtensions(ExecutionEngine.class);
this.executionEngineExtensions = loaded != null ? List.copyOf(loaded) : List.of();
if (!executionEngineExtensions.isEmpty()) {
LOGGER.info(
"Loaded {} execution engine extension(s): {}",
executionEngineExtensions.size(),
executionEngineExtensions.stream().map(e -> e.getClass().getSimpleName()).toList());
}
}

@Override
public List<RestHandler> getRestHandlers(
Settings settings,
Expand Down Expand Up @@ -252,7 +272,7 @@ public Collection<Object> createComponents(
LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings);
LocalClusterState.state().setClient(client);
ModulesBuilder modules = new ModulesBuilder();
modules.add(new OpenSearchPluginModule());
modules.add(new OpenSearchPluginModule(executionEngineExtensions));
modules.add(
b -> {
b.bind(NodeClient.class).toInstance((NodeClient) client);
Expand Down Expand Up @@ -287,12 +307,15 @@ public Collection<Object> createComponents(
ScheduledAsyncQueryJobRunner.getJobRunnerInstance()
.loadJobResource(client, clusterService, threadPool, asyncQueryExecutorService);

EngineExtensionsHolder extensionsHolder = new EngineExtensionsHolder(executionEngineExtensions);

return ImmutableList.of(
dataSourceService,
asyncQueryExecutorService,
clusterManagerEventListener,
pluginSettings,
directQueryExecutorService);
directQueryExecutorService,
extensionsHolder);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.plugin.config;

import java.util.List;
import org.opensearch.sql.executor.ExecutionEngine;

/**
* Holds execution engine engines loaded via SPI. Returned from {@code SQLPlugin.createComponents()}
* so that OpenSearch's Guice injector can inject it into transport actions like {@code
* TransportPPLQueryAction}.
*/
public record EngineExtensionsHolder(List<ExecutionEngine> engines) {
public EngineExtensionsHolder(List<ExecutionEngine> engines) {
this.engines = engines != null ? List.copyOf(engines) : List.of();
}
}
Loading
Loading