/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.executor.analytics;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeField;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.model.ExprTupleValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;
import org.opensearch.sql.executor.ExecutionContext;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.executor.pagination.Cursor;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
 * Execution engine adapter for the analytics engine (Project Mustang).
 *
 * <p>Bridges the analytics engine's {@link QueryPlanExecutor} with the SQL plugin's {@link
 * ExecutionEngine} response pipeline. Takes a Calcite {@link RelNode}, delegates execution to the
 * analytics engine, and converts the raw result rows into a {@link QueryResponse}.
 *
 * <p>Only the {@link RelNode} execution path is supported; the {@link PhysicalPlan} entry points
 * fail fast via {@code onFailure}.
 */
public class AnalyticsExecutionEngine implements ExecutionEngine {

  private final QueryPlanExecutor planExecutor;

  public AnalyticsExecutionEngine(QueryPlanExecutor planExecutor) {
    this.planExecutor = planExecutor;
  }

  /** Not supported. Analytics queries use the RelNode path exclusively. */
  @Override
  public void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener) {
    listener.onFailure(
        new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
  }

  /** Not supported. Analytics queries use the RelNode path exclusively. */
  @Override
  public void execute(
      PhysicalPlan plan, ExecutionContext context, ResponseListener<QueryResponse> listener) {
    listener.onFailure(
        new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
  }

  /** Not supported. Analytics queries use the RelNode path exclusively. */
  @Override
  public void explain(PhysicalPlan plan, ResponseListener<ExplainResponse> listener) {
    listener.onFailure(
        new UnsupportedOperationException("Analytics engine only supports RelNode execution"));
  }

  /**
   * Executes a Calcite logical plan via the analytics engine and converts the result.
   *
   * <p>All failures (planner, executor, conversion) are routed to {@code listener.onFailure};
   * this method never throws.
   *
   * @param plan the Calcite plan to execute; its row type drives schema and column mapping
   * @param context plan context; currently not forwarded to the executor (the analytics engine
   *     context is opaque — a {@code null} engine context is passed deliberately)
   * @param listener receives the converted {@link QueryResponse} or the failure
   */
  @Override
  public void execute(
      RelNode plan, CalcitePlanContext context, ResponseListener<QueryResponse> listener) {
    try {
      Iterable<Object[]> rows = planExecutor.execute(plan, null);

      List<RelDataTypeField> fields = plan.getRowType().getFieldList();
      List<ExprValue> results = convertRows(rows, fields);
      Schema schema = buildSchema(fields);

      // Pagination is not supported on the analytics path, hence Cursor.None.
      listener.onResponse(new QueryResponse(schema, results, Cursor.None));
    } catch (Exception e) {
      listener.onFailure(e);
    }
  }

  /**
   * Converts raw engine rows into {@link ExprTupleValue}s keyed by the plan's column names.
   * Positions beyond a (short) row's length are filled with null values.
   */
  private List<ExprValue> convertRows(Iterable<Object[]> rows, List<RelDataTypeField> fields) {
    List<ExprValue> results = new ArrayList<>();
    for (Object[] row : rows) {
      // LinkedHashMap preserves column order for the response.
      Map<String, ExprValue> valueMap = new LinkedHashMap<>();
      for (int i = 0; i < fields.size(); i++) {
        String columnName = fields.get(i).getName();
        Object value = (i < row.length) ? row[i] : null;
        valueMap.put(columnName, ExprValueUtils.fromObjectValue(value));
      }
      results.add(ExprTupleValue.fromExprValueMap(valueMap));
    }
    return results;
  }

  /** Builds the response schema from the plan's output row type (alias column left null). */
  private Schema buildSchema(List<RelDataTypeField> fields) {
    List<Schema.Column> columns = new ArrayList<>();
    for (RelDataTypeField field : fields) {
      ExprType exprType = convertType(field.getType());
      columns.add(new Schema.Column(field.getName(), null, exprType));
    }
    return new Schema(columns);
  }

  /** Maps a Calcite type to an ExprType, degrading to UNKNOWN for unmappable types. */
  private ExprType convertType(RelDataType type) {
    try {
      return OpenSearchTypeFactory.convertRelDataTypeToExprType(type);
    } catch (IllegalArgumentException e) {
      // Unmappable types must not fail the whole query; surface them as UNKNOWN.
      return ExprCoreType.UNKNOWN;
    }
  }
}
Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.analytics; + +import org.apache.calcite.rel.RelNode; + +/** + * Executes a Calcite {@link RelNode} logical plan against the analytics engine. + * + *

This is a local equivalent of {@code org.opensearch.analytics.exec.QueryPlanExecutor} from the + * analytics-framework library. It will be replaced by the upstream interface once the + * analytics-framework JAR is published. + * + * @see Upstream + * QueryPlanExecutor + */ +@FunctionalInterface +public interface QueryPlanExecutor { + + /** + * Executes the given logical plan and returns result rows. + * + * @param plan the Calcite RelNode subtree to execute + * @param context execution context (opaque to avoid server dependency) + * @return rows produced by the engine + */ + Iterable execute(RelNode plan, Object context); +} diff --git a/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java new file mode 100644 index 00000000000..70ada76c54a --- /dev/null +++ b/core/src/test/java/org/opensearch/sql/executor/analytics/AnalyticsExecutionEngineTest.java @@ -0,0 +1,365 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.executor.analytics; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import 
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.executor.analytics;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
import org.apache.calcite.sql.type.SqlTypeName;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.calcite.CalcitePlanContext;
import org.opensearch.sql.calcite.SysLimit;
import org.opensearch.sql.common.response.ResponseListener;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.executor.ExecutionEngine.ExplainResponse;
import org.opensearch.sql.executor.ExecutionEngine.QueryResponse;
import org.opensearch.sql.planner.physical.PhysicalPlan;

/**
 * Unit tests for {@link AnalyticsExecutionEngine}: RelNode execution with a mocked {@link
 * QueryPlanExecutor}, type mapping, null/empty handling, error propagation, and the unsupported
 * PhysicalPlan entry points.
 */
class AnalyticsExecutionEngineTest {

  private AnalyticsExecutionEngine engine;
  private QueryPlanExecutor mockExecutor;
  private CalcitePlanContext mockContext;

  @BeforeEach
  void setUp() throws Exception {
    mockExecutor = mock(QueryPlanExecutor.class);
    engine = new AnalyticsExecutionEngine(mockExecutor);
    mockContext = mock(CalcitePlanContext.class);
    setSysLimit(mockContext, SysLimit.DEFAULT);
  }

  /** Sets the public final sysLimit field on a mocked CalcitePlanContext via reflection. */
  private static void setSysLimit(CalcitePlanContext context, SysLimit sysLimit) throws Exception {
    Field field = CalcitePlanContext.class.getDeclaredField("sysLimit");
    field.setAccessible(true);
    field.set(context, sysLimit);
  }

  @Test
  void executeRelNode_basicTypesAndRows() {
    RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER);
    Iterable<Object[]> rows =
        Arrays.asList(new Object[] {"Alice", 30}, new Object[] {"Bob", 25});
    when(mockExecutor.execute(relNode, null)).thenReturn(rows);

    QueryResponse response = executeAndCapture(relNode);
    String dump = dumpResponse(response);

    // Schema: 2 columns [name:STRING, age:INTEGER]
    assertEquals(2, response.getSchema().getColumns().size(), "Column count. " + dump);
    assertEquals("name", response.getSchema().getColumns().get(0).getName(), dump);
    assertEquals(ExprCoreType.STRING, response.getSchema().getColumns().get(0).getExprType(), dump);
    assertEquals("age", response.getSchema().getColumns().get(1).getName(), dump);
    assertEquals(
        ExprCoreType.INTEGER, response.getSchema().getColumns().get(1).getExprType(), dump);

    // Rows: [{name=Alice, age=30}, {name=Bob, age=25}]
    assertEquals(2, response.getResults().size(), "Row count. " + dump);
    assertEquals(
        "Alice", response.getResults().get(0).tupleValue().get("name").value(), "Row 0. " + dump);
    assertEquals(
        30, response.getResults().get(0).tupleValue().get("age").value(), "Row 0. " + dump);
    assertEquals(
        "Bob", response.getResults().get(1).tupleValue().get("name").value(), "Row 1. " + dump);
    assertEquals(
        25, response.getResults().get(1).tupleValue().get("age").value(), "Row 1. " + dump);

    // Cursor: None (analytics path does not paginate)
    assertEquals(org.opensearch.sql.executor.pagination.Cursor.None, response.getCursor(), dump);
  }

  @Test
  void executeRelNode_numericTypes() {
    RelNode relNode =
        mockRelNode(
            "b", SqlTypeName.TINYINT,
            "s", SqlTypeName.SMALLINT,
            "i", SqlTypeName.INTEGER,
            "l", SqlTypeName.BIGINT,
            "f", SqlTypeName.FLOAT,
            "d", SqlTypeName.DOUBLE);
    Iterable<Object[]> rows =
        Collections.singletonList(new Object[] {(byte) 1, (short) 2, 3, 4L, 5.0f, 6.0});
    when(mockExecutor.execute(relNode, null)).thenReturn(rows);

    QueryResponse response = executeAndCapture(relNode);
    String dump = dumpResponse(response);

    assertEquals(ExprCoreType.BYTE, response.getSchema().getColumns().get(0).getExprType(), dump);
    assertEquals(ExprCoreType.SHORT, response.getSchema().getColumns().get(1).getExprType(), dump);
    assertEquals(
        ExprCoreType.INTEGER, response.getSchema().getColumns().get(2).getExprType(), dump);
    assertEquals(ExprCoreType.LONG, response.getSchema().getColumns().get(3).getExprType(), dump);
    assertEquals(ExprCoreType.FLOAT, response.getSchema().getColumns().get(4).getExprType(), dump);
    assertEquals(ExprCoreType.DOUBLE, response.getSchema().getColumns().get(5).getExprType(), dump);

    // Verify actual values
    assertEquals(
        (byte) 1,
        response.getResults().get(0).tupleValue().get("b").value(),
        "byte value. " + dump);
    assertEquals(
        (short) 2,
        response.getResults().get(0).tupleValue().get("s").value(),
        "short value. " + dump);
    assertEquals(
        3, response.getResults().get(0).tupleValue().get("i").value(), "int value. " + dump);
    assertEquals(
        4L, response.getResults().get(0).tupleValue().get("l").value(), "long value. " + dump);
    assertEquals(
        5.0f, response.getResults().get(0).tupleValue().get("f").value(), "float value. " + dump);
    assertEquals(
        6.0, response.getResults().get(0).tupleValue().get("d").value(), "double value. " + dump);
  }

  @Test
  void executeRelNode_temporalTypes() {
    RelNode relNode =
        mockRelNode("dt", SqlTypeName.DATE, "tm", SqlTypeName.TIME, "ts", SqlTypeName.TIMESTAMP);
    Iterable<Object[]> emptyRows = Collections.emptyList();
    when(mockExecutor.execute(relNode, null)).thenReturn(emptyRows);

    QueryResponse response = executeAndCapture(relNode);
    String dump = dumpResponse(response);

    assertEquals(ExprCoreType.DATE, response.getSchema().getColumns().get(0).getExprType(), dump);
    assertEquals(ExprCoreType.TIME, response.getSchema().getColumns().get(1).getExprType(), dump);
    assertEquals(
        ExprCoreType.TIMESTAMP, response.getSchema().getColumns().get(2).getExprType(), dump);
    assertEquals(0, response.getResults().size(), "Should have 0 rows. " + dump);
  }

  // Query size limit is now enforced in the RelNode plan (LogicalSystemLimit) before it reaches
  // AnalyticsExecutionEngine. The engine trusts the executor to honor the limit.

  @Test
  void executeRelNode_emptyResults() {
    RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR);
    Iterable<Object[]> emptyRows = Collections.emptyList();
    when(mockExecutor.execute(relNode, null)).thenReturn(emptyRows);

    QueryResponse response = executeAndCapture(relNode);
    String dump = dumpResponse(response);

    assertEquals(1, response.getSchema().getColumns().size(), "Schema column count. " + dump);
    assertEquals(0, response.getResults().size(), "Row count should be 0. " + dump);
  }

  @Test
  void executeRelNode_nullValues() {
    RelNode relNode = mockRelNode("name", SqlTypeName.VARCHAR, "age", SqlTypeName.INTEGER);
    Iterable<Object[]> rows = Collections.singletonList(new Object[] {null, null});
    when(mockExecutor.execute(relNode, null)).thenReturn(rows);

    QueryResponse response = executeAndCapture(relNode);
    String dump = dumpResponse(response);

    assertEquals(1, response.getResults().size(), "Row count. " + dump);
    assertTrue(
        response.getResults().get(0).tupleValue().get("name").isNull(),
        "name should be null. " + dump);
    assertTrue(
        response.getResults().get(0).tupleValue().get("age").isNull(),
        "age should be null. " + dump);
  }

  @Test
  void executeRelNode_errorPropagation() {
    RelNode relNode = mockRelNode("id", SqlTypeName.INTEGER);
    when(mockExecutor.execute(relNode, null)).thenThrow(new RuntimeException("Engine failure"));

    Exception error = executeAndCaptureError(relNode);
    System.out.println(dumpError("executeRelNode_errorPropagation", error));

    assertEquals(
        "Engine failure",
        error.getMessage(),
        "Exception type: " + error.getClass().getSimpleName() + ", message: " + error.getMessage());
  }

  @Test
  void physicalPlanExecute_callsOnFailure() {
    PhysicalPlan physicalPlan = mock(PhysicalPlan.class);
    AtomicReference<Exception> errorRef = new AtomicReference<>();
    engine.execute(physicalPlan, failureListener(errorRef));

    assertNotNull(errorRef.get(), "onFailure should have been called");
    System.out.println(dumpError("physicalPlanExecute_callsOnFailure", errorRef.get()));
    assertTrue(
        errorRef.get() instanceof UnsupportedOperationException,
        "Expected UnsupportedOperationException, got: "
            + errorRef.get().getClass().getSimpleName()
            + " - "
            + errorRef.get().getMessage());
  }

  @Test
  void physicalPlanExecuteWithContext_callsOnFailure() {
    PhysicalPlan physicalPlan = mock(PhysicalPlan.class);
    AtomicReference<Exception> errorRef = new AtomicReference<>();
    engine.execute(
        physicalPlan,
        org.opensearch.sql.executor.ExecutionContext.emptyExecutionContext(),
        failureListener(errorRef));

    assertNotNull(errorRef.get(), "onFailure should have been called");
    System.out.println(dumpError("physicalPlanExecuteWithContext_callsOnFailure", errorRef.get()));
    assertTrue(
        errorRef.get() instanceof UnsupportedOperationException,
        "Expected UnsupportedOperationException, got: "
            + errorRef.get().getClass().getSimpleName()
            + " - "
            + errorRef.get().getMessage());
  }

  @Test
  void physicalPlanExplain_callsOnFailure() {
    PhysicalPlan physicalPlan = mock(PhysicalPlan.class);
    AtomicReference<Exception> errorRef = new AtomicReference<>();
    engine.explain(physicalPlan, explainFailureListener(errorRef));

    assertNotNull(errorRef.get(), "onFailure should have been called");
    System.out.println(dumpError("physicalPlanExplain_callsOnFailure", errorRef.get()));
    assertTrue(
        errorRef.get() instanceof UnsupportedOperationException,
        "Expected UnsupportedOperationException, got: "
            + errorRef.get().getClass().getSimpleName()
            + " - "
            + errorRef.get().getMessage());
  }

  // --- helpers ---

  /** Executes the plan, asserts a response arrived, and prints the full dump for debugging. */
  private QueryResponse executeAndCapture(RelNode relNode) {
    AtomicReference<QueryResponse> ref = new AtomicReference<>();
    engine.execute(relNode, mockContext, captureListener(ref));
    assertNotNull(ref.get(), "QueryResponse should not be null");
    // Always print the full response so test output shows exact results
    System.out.println(dumpResponse(ref.get()));
    return ref.get();
  }

  /** Executes the plan and returns the exception delivered to onFailure. */
  private Exception executeAndCaptureError(RelNode relNode) {
    AtomicReference<Exception> ref = new AtomicReference<>();
    engine.execute(
        relNode,
        mockContext,
        new ResponseListener<QueryResponse>() {
          @Override
          public void onResponse(QueryResponse response) {}

          @Override
          public void onFailure(Exception e) {
            ref.set(e);
          }
        });
    assertNotNull(ref.get(), "onFailure should have been called");
    return ref.get();
  }

  private ResponseListener<QueryResponse> failureListener(AtomicReference<Exception> ref) {
    return new ResponseListener<QueryResponse>() {
      @Override
      public void onResponse(QueryResponse response) {}

      @Override
      public void onFailure(Exception e) {
        ref.set(e);
      }
    };
  }

  private ResponseListener<ExplainResponse> explainFailureListener(
      AtomicReference<Exception> ref) {
    return new ResponseListener<ExplainResponse>() {
      @Override
      public void onResponse(ExplainResponse response) {}

      @Override
      public void onFailure(Exception e) {
        ref.set(e);
      }
    };
  }

  private String dumpError(String testName, Exception e) {
    return "\n--- "
        + testName
        + " ---\n"
        + "Exception: "
        + e.getClass().getSimpleName()
        + "\n"
        + "Message: "
        + e.getMessage()
        + "\n--- End ---";
  }

  /** Dumps the full QueryResponse into a readable string for test output and assertion messages. */
  private String dumpResponse(QueryResponse response) {
    StringBuilder sb = new StringBuilder();
    sb.append("\n--- QueryResponse ---\n");

    sb.append("Schema: [");
    sb.append(
        response.getSchema().getColumns().stream()
            .map(c -> c.getName() + ":" + c.getExprType().typeName())
            .collect(Collectors.joining(", ")));
    sb.append("]\n");

    sb.append("Rows (").append(response.getResults().size()).append("):\n");
    for (int i = 0; i < response.getResults().size(); i++) {
      sb.append("  [").append(i).append("] ");
      sb.append(response.getResults().get(i).tupleValue());
      sb.append("\n");
    }

    sb.append("Cursor: ").append(response.getCursor()).append("\n");
    sb.append("--- End ---");
    return sb.toString();
  }

  /** Builds a mock RelNode whose row type is assembled from (name, SqlTypeName) pairs. */
  private RelNode mockRelNode(Object... nameTypePairs) {
    SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
    RelDataTypeFactory.Builder builder = typeFactory.builder();
    for (int i = 0; i < nameTypePairs.length; i += 2) {
      String name = (String) nameTypePairs[i];
      SqlTypeName typeName = (SqlTypeName) nameTypePairs[i + 1];
      builder.add(name, typeName);
    }
    RelDataType rowType = builder.build();

    RelNode relNode = mock(RelNode.class);
    when(relNode.getRowType()).thenReturn(rowType);
    return relNode;
  }

  private ResponseListener<QueryResponse> captureListener(AtomicReference<QueryResponse> ref) {
    return new ResponseListener<QueryResponse>() {
      @Override
      public void onResponse(QueryResponse response) {
        ref.set(response);
      }

      @Override
      public void onFailure(Exception e) {
        throw new AssertionError("Unexpected failure", e);
      }
    };
  }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.ppl;

import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
import static org.opensearch.sql.util.MatcherUtils.verifyNumOfRows;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;

import java.io.IOException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.json.JSONObject;
import org.junit.Test;
import org.opensearch.client.ResponseException;

/**
 * Integration tests for PPL queries routed through the analytics engine path (Project Mustang).
 * Queries targeting "parquet_*" indices are routed to {@code RestUnifiedQueryAction} which uses
 * {@code AnalyticsExecutionEngine} with a stub {@code QueryPlanExecutor}.
 *
 * <p>These tests validate the full pipeline: REST request -> routing -> planning via
 * UnifiedQueryPlanner -> execution via AnalyticsExecutionEngine -> response formatting.
 *
 * <p>The stub executor always returns the full table rows regardless of the logical plan. After
 * projection (| fields), the execution engine maps row values by position -- so projected columns
 * get the values from the corresponding positions in the full row, not the actual projected column.
 * This is expected behavior for a stub; the real analytics engine will evaluate the plan correctly.
 */
public class AnalyticsPPLIT extends PPLIntegTestCase {

  private static final Logger LOG = LogManager.getLogger(AnalyticsPPLIT.class);

  @Override
  protected void init() throws Exception {
    // No index loading needed -- stub schema and data are hardcoded
    // in RestUnifiedQueryAction and StubQueryPlanExecutor
  }

  // --- Full table scan tests with schema + data verification ---

  @Test
  public void testBasicQuerySchemaAndData() throws IOException {
    String query = "source = opensearch.parquet_logs";
    JSONObject result = executeQuery(query);
    LOG.info("[testBasicQuerySchemaAndData] query: {}\nresponse: {}", query, result.toString(2));

    verifySchema(
        result,
        schema("ts", "timestamp"),
        schema("status", "integer"),
        schema("message", "keyword"),
        schema("ip_addr", "keyword"));
    verifyNumOfRows(result, 3);
    verifyDataRows(
        result,
        rows("2024-01-15 10:30:00", 200, "Request completed", "192.168.1.1"),
        rows("2024-01-15 10:31:00", 200, "Health check OK", "192.168.1.2"),
        rows("2024-01-15 10:32:00", 500, "Internal server error", "192.168.1.3"));
  }

  @Test
  public void testParquetMetricsSchemaAndData() throws IOException {
    String query = "source = opensearch.parquet_metrics";
    JSONObject result = executeQuery(query);
    LOG.info(
        "[testParquetMetricsSchemaAndData] query: {}\nresponse: {}", query, result.toString(2));

    verifySchema(
        result,
        schema("ts", "timestamp"),
        schema("cpu", "double"),
        schema("memory", "double"),
        schema("host", "keyword"));
    verifyNumOfRows(result, 2);
    verifyDataRows(
        result,
        rows("2024-01-15 10:30:00", 75.5, 8192.5, "host-1"),
        rows("2024-01-15 10:31:00", 82.3, 7680.5, "host-2"));
  }

  // --- Response format validation ---

  @Test
  public void testResponseFormatHasRequiredFields() throws IOException {
    String query = "source = opensearch.parquet_logs";
    JSONObject result = executeQuery(query);
    LOG.info(
        "[testResponseFormatHasRequiredFields] query: {}\nresponse: {}", query, result.toString(2));

    String msg = "Full response: " + result.toString(2);
    assertTrue("Response missing 'schema'. " + msg, result.has("schema"));
    assertTrue("Response missing 'datarows'. " + msg, result.has("datarows"));
    assertTrue("Response missing 'total'. " + msg, result.has("total"));
    assertTrue("Response missing 'size'. " + msg, result.has("size"));
    assertTrue("Response missing 'status'. " + msg, result.has("status"));
    assertEquals(
        "Expected status 200 but got " + result.getInt("status") + ". " + msg,
        200,
        result.getInt("status"));
  }

  @Test
  public void testTotalAndSizeMatchRowCount() throws IOException {
    String query = "source = opensearch.parquet_logs";
    JSONObject result = executeQuery(query);
    LOG.info("[testTotalAndSizeMatchRowCount] query: {}\nresponse: {}", query, result.toString(2));

    int rowCount = result.getJSONArray("datarows").length();
    assertEquals(
        String.format(
            "total should match row count. rows=%d, total=%d, size=%d. Response: %s",
            rowCount, result.getInt("total"), result.getInt("size"), result.toString(2)),
        rowCount,
        result.getInt("total"));
    assertEquals(
        String.format(
            "size should match row count. rows=%d, size=%d. Response: %s",
            rowCount, result.getInt("size"), result.toString(2)),
        rowCount,
        result.getInt("size"));
  }

  // --- Projection tests (schema verification -- stub doesn't evaluate projections) ---

  @Test
  public void testFieldsProjectionChangesSchema() throws IOException {
    String query = "source = opensearch.parquet_logs | fields ts, message";
    JSONObject result = executeQuery(query);
    LOG.info(
        "[testFieldsProjectionChangesSchema] query: {}\nresponse: {}", query, result.toString(2));

    verifySchema(result, schema("ts", "timestamp"), schema("message", "keyword"));
    verifyNumOfRows(result, 3);
  }

  @Test
  public void testSingleFieldProjection() throws IOException {
    String query = "source = opensearch.parquet_logs | fields status";
    JSONObject result = executeQuery(query);
    LOG.info("[testSingleFieldProjection] query: {}\nresponse: {}", query, result.toString(2));

    verifySchema(result, schema("status", "integer"));
    verifyNumOfRows(result, 3);
  }

  // --- Error handling tests ---

  @Test
  public void testSyntaxErrorReturnsClientError() throws IOException {
    String query = "source = opensearch.parquet_logs | invalid_command";
    ResponseException e = assertThrows(ResponseException.class, () -> executeQuery(query));
    int statusCode = e.getResponse().getStatusLine().getStatusCode();
    String responseBody = getResponseBody(e.getResponse(), true);
    LOG.info(
        "[testSyntaxErrorReturnsClientError] query: {}\nstatus: {}\nresponse: {}",
        query,
        statusCode,
        responseBody);

    assertTrue(
        String.format(
            "Syntax error should return 4xx, got %d. Response: %s", statusCode, responseBody),
        statusCode >= 400 && statusCode < 500);
  }

  // --- Regression tests ---

  @Test
  public void testNonParquetQueryStillWorks() throws IOException {
    loadIndex(Index.ACCOUNT);
    String query = String.format("source=%s | head 1 | fields firstname", TEST_INDEX_ACCOUNT);
    JSONObject result = executeQuery(query);
    LOG.info("[testNonParquetQueryStillWorks] query: {}\nresponse: {}", query, result.toString(2));

    assertNotNull("Non-parquet query returned null. Query: " + query, result);
    assertTrue(
        "Non-parquet query missing 'datarows'. Response: " + result.toString(2),
        result.has("datarows"));
    int rowCount = result.getJSONArray("datarows").length();
    assertTrue(
        String.format(
            "Non-parquet query returned 0 rows. Expected > 0. Response: %s", result.toString(2)),
        rowCount > 0);
  }

  @Test
  public void testNonParquetAggregationStillWorks() throws IOException {
    loadIndex(Index.ACCOUNT);
    String query = String.format("source=%s | stats count()", TEST_INDEX_ACCOUNT);
    JSONObject result = executeQuery(query);
    LOG.info(
        "[testNonParquetAggregationStillWorks] query: {}\nresponse: {}", query, result.toString(2));

    int total = result.getInt("total");
    assertTrue(
        String.format(
            "Non-parquet aggregation returned total=%d, expected > 0. Response: %s",
            total, result.toString(2)),
        total > 0);
  }
}
// NOTE(review): the original patch also added `api project(':api')` to plugin/build.gradle so the
// plugin module can reach the unified-query API classes -- confirm that companion change lands.
org.opensearch.sql.executor.analytics.AnalyticsExecutionEngine; +import org.opensearch.sql.executor.analytics.QueryPlanExecutor; +import org.opensearch.sql.lang.LangSpec; +import org.opensearch.sql.plugin.rest.analytics.stub.StubIndexDetector; +import org.opensearch.sql.plugin.rest.analytics.stub.StubSchemaProvider; +import org.opensearch.sql.plugin.transport.TransportPPLQueryResponse; +import org.opensearch.sql.ppl.domain.PPLQueryRequest; +import org.opensearch.sql.protocol.response.QueryResult; +import org.opensearch.sql.protocol.response.format.JdbcResponseFormatter; +import org.opensearch.sql.protocol.response.format.ResponseFormatter; +import org.opensearch.transport.client.node.NodeClient; + +/** + * Handles queries routed to the Analytics engine via the unified query pipeline. Parses PPL queries + * using {@link UnifiedQueryPlanner} to generate a Calcite {@link RelNode}, then delegates to {@link + * AnalyticsExecutionEngine} for execution. + */ +public class RestUnifiedQueryAction { + + private static final Logger LOG = LogManager.getLogger(RestUnifiedQueryAction.class); + private static final String SCHEMA_NAME = "opensearch"; + + private final AnalyticsExecutionEngine analyticsEngine; + private final NodeClient client; + + public RestUnifiedQueryAction(NodeClient client, QueryPlanExecutor planExecutor) { + this.client = client; + this.analyticsEngine = new AnalyticsExecutionEngine(planExecutor); + } + + /** + * Check if the query targets an analytics engine index. Delegates to {@link StubIndexDetector} + * which will be replaced by UnifiedQueryParser and index settings when available. + */ + public static boolean isAnalyticsIndex(String query) { + return StubIndexDetector.isAnalyticsIndex(query); + } + + /** + * Execute a query through the unified query pipeline on the sql-worker thread pool. Called from + * {@code TransportPPLQueryAction} which handles PPL enabled check, metrics, request ID, and + * profiling cleanup. 
+ * + * @param query the query string + * @param queryType SQL or PPL + * @param pplRequest the original PPL request + * @param listener the transport action listener + */ + public void execute( + String query, + QueryType queryType, + PPLQueryRequest pplRequest, + ActionListener listener) { + client + .threadPool() + .schedule( + withCurrentContext(() -> doExecute(query, queryType, pplRequest, listener)), + new TimeValue(0), + SQL_WORKER_THREAD_POOL_NAME); + } + + private void doExecute( + String query, + QueryType queryType, + PPLQueryRequest pplRequest, + ActionListener listener) { + try { + long startTime = System.nanoTime(); + AbstractSchema schema = StubSchemaProvider.buildSchema(); + + try (UnifiedQueryContext context = + UnifiedQueryContext.builder() + .language(queryType) + .catalog(SCHEMA_NAME, schema) + .defaultNamespace(SCHEMA_NAME) + .build()) { + + UnifiedQueryPlanner planner = new UnifiedQueryPlanner(context); + RelNode plan = planner.plan(query); + + // Add query size limit to the plan so the analytics engine can enforce it + // during execution, consistent with PPL V3 (see QueryService.convertToCalcitePlan) + CalcitePlanContext planContext = context.getPlanContext(); + plan = addQuerySizeLimit(plan, planContext); + + long planTime = System.nanoTime(); + LOG.info( + "[unified] Planning completed in {}ms for {} query", + (planTime - startTime) / 1_000_000, + queryType); + + analyticsEngine.execute( + plan, planContext, createQueryListener(queryType, planTime, listener)); + } + } catch (Exception e) { + listener.onFailure(e); + } + } + + /** + * Add a system-level query size limit to the plan. This ensures the analytics engine enforces the + * limit during execution rather than returning all rows for post-processing truncation. 
 */
private static RelNode addQuerySizeLimit(RelNode plan, CalcitePlanContext context) {
  // Wraps the plan in a LogicalSystemLimit node carrying the configured query size limit,
  // so the limit is enforced inside the engine rather than by truncating results afterwards.
  return LogicalSystemLimit.create(
      LogicalSystemLimit.SystemLimitType.QUERY_SIZE_LIMIT,
      plan,
      context.relBuilder.literal(context.sysLimit.querySizeLimit()));
}

/**
 * Build the listener that converts a {@code QueryResponse} from the analytics engine into the
 * JDBC-formatted transport response, logging execution time relative to {@code planEndTime}.
 */
private ResponseListener createQueryListener(
    QueryType queryType,
    long planEndTime,
    ActionListener transportListener) {
  // PRETTY is the statically imported JSON formatting style used elsewhere in the plugin.
  ResponseFormatter formatter = new JdbcResponseFormatter(PRETTY);
  return new ResponseListener() {
    @Override
    public void onResponse(QueryResponse response) {
      long execTime = System.nanoTime();
      LOG.info(
          "[unified] Execution completed in {}ms, {} rows returned",
          (execTime - planEndTime) / 1_000_000,
          response.getResults().size());
      // Language spec controls type naming in the response schema (PPL vs SQL conventions).
      LangSpec langSpec = queryType == QueryType.PPL ? PPL_SPEC : LangSpec.SQL_SPEC;
      String result =
          formatter.format(
              new QueryResult(
                  response.getSchema(), response.getResults(), response.getCursor(), langSpec));
      transportListener.onResponse(new TransportPPLQueryResponse(result));
    }

    @Override
    public void onFailure(Exception e) {
      // Propagate failures unchanged to the transport layer.
      transportListener.onFailure(e);
    }
  };
}

/**
 * Capture current thread context and restore it on the worker thread. Same pattern as {@link
 * org.opensearch.sql.opensearch.executor.OpenSearchQueryManager#withCurrentContext}.
 *
 * <p>NOTE(review): if the {@code ThreadContext} used below is log4j's static API (as in the
 * referenced pattern), this propagates the <em>logging</em> context (e.g., request id), not the
 * security context (user identity, permissions) — verify before relying on identity propagation.
 */
private static Runnable withCurrentContext(final Runnable task) {
  // Immutable snapshot of the submitting thread's context, re-applied on the worker thread.
  // NOTE(review): the static getImmutableContext()/putAll() API matches log4j's ThreadContext,
  // which carries logging context only — confirm whether security context needs separate handling.
  final Map currentContext = ThreadContext.getImmutableContext();
  return () -> {
    ThreadContext.putAll(currentContext);
    task.run();
  };
}
}
diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubIndexDetector.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubIndexDetector.java
new file mode 100644
index 00000000000..b809adfbc9b
--- /dev/null
+++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubIndexDetector.java
@@ -0,0 +1,59 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.plugin.rest.analytics.stub;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Temporary index detection logic for routing queries to the analytics engine. Uses a regex to
 * extract the index name and a "parquet_" prefix convention to identify analytics indices.
 *
 * <p>

Will be replaced by {@code UnifiedQueryParser} for index extraction and index settings (e.g., + * {@code index.storage_type}) for detection when available. + */ +public class StubIndexDetector { + + /** + * Pattern to extract index name from PPL source clause. Matches: source = index, source=index, + * source = `index`, source = catalog.index + */ + private static final Pattern SOURCE_PATTERN = + Pattern.compile( + "source\\s*=\\s*`?([a-zA-Z0-9_.*]+(?:\\.[a-zA-Z0-9_.*]+)*)`?", Pattern.CASE_INSENSITIVE); + + /** + * Check if the query targets an analytics engine index (e.g., Parquet-backed). Currently uses a + * prefix convention ("parquet_"). In production, this will check index settings such as + * index.storage_type. + */ + public static boolean isAnalyticsIndex(String query) { + if (query == null) { + return false; + } + String indexName = extractIndexName(query); + if (indexName == null) { + return false; + } + int lastDot = indexName.lastIndexOf('.'); + String tableName = lastDot >= 0 ? indexName.substring(lastDot + 1) : indexName; + return tableName.startsWith("parquet_"); + } + + /** + * Extract the source index name from a PPL query string. 
+ * + * @param query the PPL query string + * @return the index name, or null if not found + */ + static String extractIndexName(String query) { + Matcher matcher = SOURCE_PATTERN.matcher(query); + if (matcher.find()) { + return matcher.group(1); + } + return null; + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubQueryPlanExecutor.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubQueryPlanExecutor.java new file mode 100644 index 00000000000..598d0e13699 --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubQueryPlanExecutor.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.rest.analytics.stub; + +import java.time.Instant; +import java.util.List; +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.RelNode; +import org.opensearch.sql.executor.analytics.QueryPlanExecutor; + +/** + * Stub implementation of {@link QueryPlanExecutor} for development and testing. Returns canned data + * so the full pipeline (routing -> planning -> execution -> response formatting) can be validated + * without the analytics engine. + * + *

Will be replaced by the real analytics engine implementation when available. + */ +public class StubQueryPlanExecutor implements QueryPlanExecutor { + + @Override + public Iterable execute(RelNode plan, Object context) { + String tableName = extractTableName(plan); + if (tableName != null && tableName.contains("parquet_logs")) { + return List.of( + new Object[] { + Instant.parse("2024-01-15T10:30:00Z"), 200, "Request completed", "192.168.1.1" + }, + new Object[] { + Instant.parse("2024-01-15T10:31:00Z"), 200, "Health check OK", "192.168.1.2" + }, + new Object[] { + Instant.parse("2024-01-15T10:32:00Z"), 500, "Internal server error", "192.168.1.3" + }); + } + if (tableName != null && tableName.contains("parquet_metrics")) { + return List.of( + new Object[] {Instant.parse("2024-01-15T10:30:00Z"), 75.5, 8192.5, "host-1"}, + new Object[] {Instant.parse("2024-01-15T10:31:00Z"), 82.3, 7680.5, "host-2"}); + } + return List.of(); + } + + private String extractTableName(RelNode plan) { + String planStr = RelOptUtil.toString(plan); + if (planStr.contains("parquet_logs")) { + return "parquet_logs"; + } + if (planStr.contains("parquet_metrics")) { + return "parquet_metrics"; + } + return null; + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubSchemaProvider.java b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubSchemaProvider.java new file mode 100644 index 00000000000..d45a97f861f --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/rest/analytics/stub/StubSchemaProvider.java @@ -0,0 +1,64 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.plugin.rest.analytics.stub; + +import java.util.Map; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.Table; +import org.apache.calcite.schema.impl.AbstractSchema; +import 
org.apache.calcite.schema.impl.AbstractTable; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * Stub schema provider for development and testing. Returns hardcoded Calcite table definitions + * with standard types. Will be replaced by {@code EngineContext.getSchema()} when the analytics + * engine is ready. + */ +public class StubSchemaProvider { + + /** Build a stub Calcite schema with hardcoded parquet tables. */ + public static AbstractSchema buildSchema() { + return new AbstractSchema() { + @Override + protected Map getTableMap() { + return Map.of( + "parquet_logs", buildLogsTable(), + "parquet_metrics", buildMetricsTable()); + } + }; + } + + private static Table buildLogsTable() { + return new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return typeFactory + .builder() + .add("ts", SqlTypeName.TIMESTAMP) + .add("status", SqlTypeName.INTEGER) + .add("message", SqlTypeName.VARCHAR) + .add("ip_addr", SqlTypeName.VARCHAR) + .build(); + } + }; + } + + private static Table buildMetricsTable() { + return new AbstractTable() { + @Override + public RelDataType getRowType(RelDataTypeFactory typeFactory) { + return typeFactory + .builder() + .add("ts", SqlTypeName.TIMESTAMP) + .add("cpu", SqlTypeName.DOUBLE) + .add("memory", SqlTypeName.DOUBLE) + .add("host", SqlTypeName.VARCHAR) + .build(); + } + }; + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java index 48bc36374a8..db57e0d84d2 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -28,11 +28,14 @@ import org.opensearch.sql.datasource.DataSourceService; import org.opensearch.sql.datasources.service.DataSourceServiceImpl; import org.opensearch.sql.executor.ExecutionEngine; +import 
org.opensearch.sql.executor.QueryType; import org.opensearch.sql.legacy.metrics.MetricName; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.monitor.profile.QueryProfiling; import org.opensearch.sql.opensearch.setting.OpenSearchSettings; import org.opensearch.sql.plugin.config.OpenSearchPluginModule; +import org.opensearch.sql.plugin.rest.RestUnifiedQueryAction; +import org.opensearch.sql.plugin.rest.analytics.stub.StubQueryPlanExecutor; import org.opensearch.sql.ppl.PPLService; import org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.protocol.response.QueryResult; @@ -56,6 +59,8 @@ public class TransportPPLQueryAction private final Supplier pplEnabled; + private final RestUnifiedQueryAction unifiedQueryHandler; + /** Constructor of TransportPPLQueryAction. */ @Inject public TransportPPLQueryAction( @@ -77,6 +82,7 @@ public TransportPPLQueryAction( b.bind(DataSourceService.class).toInstance(dataSourceService); }); this.injector = Guice.createInjector(modules); + this.unifiedQueryHandler = new RestUnifiedQueryAction(client, new StubQueryPlanExecutor()); this.pplEnabled = () -> MULTI_ALLOW_EXPLICIT_INDEX.get(clusterSettings) @@ -114,12 +120,20 @@ protected void doExecute( QueryContext.addRequestId(); - PPLService pplService = injector.getInstance(PPLService.class); // in order to use PPL service, we need to convert TransportPPLQueryRequest to PPLQueryRequest PPLQueryRequest transformedRequest = transportRequest.toPPLQueryRequest(); QueryContext.setProfile(transformedRequest.profile()); ActionListener clearingListener = wrapWithProfilingClear(listener); + // Route to analytics engine for non-Lucene (e.g., Parquet-backed) indices + if (RestUnifiedQueryAction.isAnalyticsIndex(transformedRequest.getRequest())) { + unifiedQueryHandler.execute( + transformedRequest.getRequest(), QueryType.PPL, transformedRequest, clearingListener); + return; + } + + PPLService pplService = injector.getInstance(PPLService.class); + if 
(transformedRequest.isExplainRequest()) {
      pplService.explain(
          transformedRequest, createExplainResponseListener(transformedRequest, clearingListener));
diff --git a/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java
new file mode 100644
index 00000000000..31d719acc0d
--- /dev/null
+++ b/plugin/src/test/java/org/opensearch/sql/plugin/rest/RestUnifiedQueryActionTest.java
@@ -0,0 +1,32 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.plugin.rest;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.junit.Test;

/**
 * Tests for analytics index routing in RestUnifiedQueryAction. Index name extraction will be
 * replaced by UnifiedQueryParser -- these tests focus on routing behavior only.
 */
public class RestUnifiedQueryActionTest {

  @Test
  public void parquetIndexRoutesToAnalytics() {
    assertTrue(RestUnifiedQueryAction.isAnalyticsIndex("source = parquet_logs | fields ts"));
    assertTrue(
        RestUnifiedQueryAction.isAnalyticsIndex("source = opensearch.parquet_logs | fields ts"));
    // Backtick-quoted index names are explicitly supported by the detector's pattern.
    assertTrue(RestUnifiedQueryAction.isAnalyticsIndex("source = `parquet_logs` | fields ts"));
  }

  @Test
  public void nonParquetIndexRoutesToLucene() {
    assertFalse(RestUnifiedQueryAction.isAnalyticsIndex("source = my_logs | fields ts"));
    assertFalse(RestUnifiedQueryAction.isAnalyticsIndex(null));
    assertFalse(RestUnifiedQueryAction.isAnalyticsIndex(""));
  }
}