From 6914549f9fa76d30ab8115748d5f9d053bdeb931 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Tue, 17 Mar 2026 17:10:10 -0700 Subject: [PATCH 1/4] Add PPL timewrap command for time-period comparison Implement the timewrap command that reshapes timechart output by wrapping each time period into a separate data series, enabling day-over-day, week-over-week, and other recurring interval comparisons. Signed-off-by: Jialiang Li Signed-off-by: Kai Huang --- .../sql/ast/AbstractNodeVisitor.java | 5 + .../org/opensearch/sql/ast/tree/Timewrap.java | 46 ++ .../sql/calcite/CalcitePlanContext.java | 9 + .../sql/calcite/CalciteRelNodeVisitor.java | 248 ++++++++ docs/category.json | 1 + docs/user/ppl/cmd/timewrap.md | 176 ++++++ doctest/test_data/timewrap_test.json | 24 + doctest/test_docs.py | 1 + doctest/test_mapping/timewrap_test.json | 19 + .../remote/CalciteTimewrapCommandIT.java | 592 ++++++++++++++++++ .../sql/legacy/SQLIntegTestCase.java | 7 +- .../src/test/resources/timewrap_test.json | 66 ++ .../executor/OpenSearchExecutionEngine.java | 107 ++++ ppl/src/main/antlr/OpenSearchPPLLexer.g4 | 2 + ppl/src/main/antlr/OpenSearchPPLParser.g4 | 11 + .../opensearch/sql/ppl/parser/AstBuilder.java | 19 + 16 files changed, 1332 insertions(+), 1 deletion(-) create mode 100644 core/src/main/java/org/opensearch/sql/ast/tree/Timewrap.java create mode 100644 docs/user/ppl/cmd/timewrap.md create mode 100644 doctest/test_data/timewrap_test.json create mode 100644 doctest/test_mapping/timewrap_test.json create mode 100644 integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java create mode 100644 integ-test/src/test/resources/timewrap_test.json diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index 7f02bb3ef1b..a750716b817 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ 
b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -91,6 +91,7 @@ import org.opensearch.sql.ast.tree.StreamWindow; import org.opensearch.sql.ast.tree.SubqueryAlias; import org.opensearch.sql.ast.tree.TableFunction; +import org.opensearch.sql.ast.tree.Timewrap; import org.opensearch.sql.ast.tree.Transpose; import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.Values; @@ -300,6 +301,10 @@ public T visitChart(Chart node, C context) { return visitChildren(node, context); } + public T visitTimewrap(Timewrap node, C context) { + return visitChildren(node, context); + } + public T visitRegex(Regex node, C context) { return visitChildren(node, context); } diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Timewrap.java b/core/src/main/java/org/opensearch/sql/ast/tree/Timewrap.java new file mode 100644 index 00000000000..bb5a81f494e --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Timewrap.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; +import org.opensearch.sql.ast.expression.Literal; +import org.opensearch.sql.ast.expression.SpanUnit; + +/** AST node representing the timewrap command. 
*/ +@Getter +@ToString +@EqualsAndHashCode(callSuper = false) +@RequiredArgsConstructor +public class Timewrap extends UnresolvedPlan { + private final SpanUnit unit; // time unit of the wrap span + private final int value; // multiplier applied to the unit + private final String align; // "end" or "now" + private final Literal spanLiteral; // original span literal for display + + private UnresolvedPlan child; + + @Override + public UnresolvedPlan attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List<UnresolvedPlan> getChild() { + return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child); + } + + @Override + public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) { + return nodeVisitor.visitTimewrap(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index 6ad935e59da..3b91497ab8a 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -41,6 +41,15 @@ public class CalcitePlanContext { /** This thread local variable is only used to skip script encoding in script pushdown. */ public static final ThreadLocal<Boolean> skipEncoding = ThreadLocal.withInitial(() -> false); + /** When true, the execution engine strips all-null columns from the result (used by timewrap). */ + public static final ThreadLocal<Boolean> stripNullColumns = ThreadLocal.withInitial(() -> false); + + /** + * Timewrap unit descriptor "value|singular|plural|suffix" (e.g., "1|day|days|_before"). When + * set, the execution engine uses __base_offset__ to compute names like "501days_before". + */ + public static final ThreadLocal<String> timewrapUnitName = new ThreadLocal<>(); + /** Thread-local switch that tells whether the current query prefers legacy behavior. 
*/ private static final ThreadLocal legacyPreferredFlag = ThreadLocal.withInitial(() -> true); diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index ed68dfbcb1b..db8fbbd895f 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -29,6 +29,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import com.google.common.collect.Streams; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; @@ -86,8 +87,10 @@ import org.opensearch.sql.ast.expression.Alias; import org.opensearch.sql.ast.expression.AllFields; import org.opensearch.sql.ast.expression.AllFieldsExcludeMeta; +import org.opensearch.sql.ast.expression.And; import org.opensearch.sql.ast.expression.Argument; import org.opensearch.sql.ast.expression.Argument.ArgumentMap; +import org.opensearch.sql.ast.expression.Compare; import org.opensearch.sql.ast.expression.Field; import org.opensearch.sql.ast.expression.Function; import org.opensearch.sql.ast.expression.Let; @@ -150,6 +153,7 @@ import org.opensearch.sql.ast.tree.StreamWindow; import org.opensearch.sql.ast.tree.SubqueryAlias; import org.opensearch.sql.ast.tree.TableFunction; +import org.opensearch.sql.ast.tree.Timewrap; import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.Trendline.TrendlineType; import org.opensearch.sql.ast.tree.UnresolvedPlan; @@ -172,6 +176,7 @@ import org.opensearch.sql.exception.CalciteUnsupportedException; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.expression.function.BuiltinFunctionName; +import org.opensearch.sql.expression.function.PPLBuiltinOperators; import org.opensearch.sql.expression.function.PPLFuncImpTable; import 
org.opensearch.sql.expression.parse.RegexCommonUtils; import org.opensearch.sql.utils.ParseUtils; @@ -3091,6 +3096,249 @@ public RelNode visitChart(Chart node, CalcitePlanContext context) { return relBuilder.peek(); } + private static final int TIMEWRAP_MAX_PERIODS = 20; + + @Override + public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) { + visitChildren(node, context); + + // Signal the execution engine to strip all-null columns and rename with absolute offsets + CalcitePlanContext.stripNullColumns.set(true); + // Both align=now and align=end use the _before suffix (matching Splunk behavior). + // The reference time itself is resolved below: align=end tries the WHERE upper bound on the + // timestamp field and falls back to query start time; align=now always uses query start time. + CalcitePlanContext.timewrapUnitName.set( + timewrapUnitBaseName(node.getUnit(), node.getValue()) + "|_before"); + + RelBuilder b = context.relBuilder; + RexBuilder rx = context.rexBuilder; + + List<String> fieldNames = + b.peek().getRowType().getFieldNames().stream().filter(f -> !isMetadataField(f)).toList(); + String tsFieldName = fieldNames.get(0); + List<String> valueFieldNames = fieldNames.subList(1, fieldNames.size()); + + long spanSec = timewrapSpanToSeconds(node.getUnit(), node.getValue()); + RelDataType bigintType = rx.getTypeFactory().createSqlType(SqlTypeName.BIGINT); + + // Step 1: Convert timestamps to epoch seconds via UNIX_TIMESTAMP, add MAX OVER() + RexNode tsEpochExpr = + rx.makeCast( + bigintType, + rx.makeCall(PPLBuiltinOperators.UNIX_TIMESTAMP, b.field(tsFieldName)), + true); + b.projectPlus( + b.alias(tsEpochExpr, "__ts_epoch__"), + b.aggregateCall(SqlStdOperatorTable.MAX, tsEpochExpr).over().as("__max_epoch__")); + + // Step 2: Compute period_number and offset + RexNode tsEpoch = b.field("__ts_epoch__"); + RexNode maxEpoch = b.field("__max_epoch__"); + RexNode spanLit = rx.makeBigintLiteral(BigDecimal.valueOf(spanSec)); + + // period = (max_epoch - ts_epoch) / span_sec + 1 (integer division truncates) + 
RexNode diff = rx.makeCall(SqlStdOperatorTable.MINUS, maxEpoch, tsEpoch); + RexNode periodNum = + rx.makeCall( + SqlStdOperatorTable.PLUS, + rx.makeCall(SqlStdOperatorTable.DIVIDE, diff, spanLit), + rx.makeExactLiteral(BigDecimal.ONE, bigintType)); + + // offset_sec = ts_epoch MOD span_sec + // Convert back to actual timestamp: latest_period_start + offset + RexNode offsetSec = rx.makeCall(SqlStdOperatorTable.MOD, tsEpoch, spanLit); + RexNode latestPeriodStart = + rx.makeCall( + SqlStdOperatorTable.MINUS, + maxEpoch, + rx.makeCall(SqlStdOperatorTable.MOD, maxEpoch, spanLit)); + RexNode displayEpoch = rx.makeCall(SqlStdOperatorTable.PLUS, latestPeriodStart, offsetSec); + RexNode displayTimestamp = rx.makeCall(PPLBuiltinOperators.FROM_UNIXTIME, displayEpoch); + + // Compute base_offset for absolute period naming in execution engine. + // align=now: reference = query start time + // align=end: reference = WHERE upper bound (search end time), fallback to query start time + RexNode baseOffset; + long nowEpochSec = context.functionProperties.getQueryStartClock().millis() / 1000; + Long referenceEpoch = null; + if ("end".equals(node.getAlign())) { + // Try to extract the upper bound from a WHERE clause on the timestamp field + referenceEpoch = extractTimestampUpperBound(node); + } + if (referenceEpoch == null) { + referenceEpoch = nowEpochSec; + } + RexNode refLit = rx.makeBigintLiteral(BigDecimal.valueOf(referenceEpoch)); + baseOffset = + rx.makeCall( + SqlStdOperatorTable.DIVIDE, + rx.makeCall(SqlStdOperatorTable.MINUS, refLit, maxEpoch), + spanLit); + + // Step 3: Project [display_timestamp, value_columns..., base_offset, period] + // base_offset is included in the group key so it survives the PIVOT + List<RexNode> projections = new ArrayList<>(); + projections.add(b.alias(displayTimestamp, tsFieldName)); + for (String vf : valueFieldNames) { + projections.add(b.field(vf)); + } + projections.add(b.alias(baseOffset, "__base_offset__")); + projections.add(b.alias(periodNum, "__period__")); + 
b.project(projections); + + // Step 4: PIVOT on period, grouped by [offset, __base_offset__] + // __base_offset__ is constant across all rows so it doesn't affect grouping, + // but survives the PIVOT so the execution engine can use it for absolute column naming + b.pivot( + b.groupKey(b.field(tsFieldName), b.field("__base_offset__")), + valueFieldNames.stream().map(f -> (RelBuilder.AggCall) b.max(b.field(f)).as("")).toList(), + ImmutableList.of(b.field("__period__")), + IntStream.rangeClosed(1, TIMEWRAP_MAX_PERIODS) + .map(i -> TIMEWRAP_MAX_PERIODS + 1 - i) // reverse: oldest period first + .mapToObj( + i -> + Map.entry( + // Use placeholder relative names; execution engine renames to absolute + String.valueOf(i), ImmutableList.of((RexNode) b.literal((long) i)))) + .collect(Collectors.toList())); + + // Step 5: Rename columns — add agg name prefix, clean up pivot artifacts + // Use relative period numbers as temporary names; the execution engine will compute + // absolute offsets using __base_offset__ and rename accordingly + List<String> pivotColNames = b.peek().getRowType().getFieldNames(); + List<String> cleanNames = new ArrayList<>(); + cleanNames.add(tsFieldName); + cleanNames.add("__base_offset__"); + for (int i = 2; i < pivotColNames.size(); i++) { + String name = pivotColNames.get(i); + if (name.endsWith("_")) { + name = name.substring(0, name.length() - 1); + } + // Prefix with agg name and _before suffix (only when there is a single value column) + if (valueFieldNames.size() == 1) { + name = valueFieldNames.get(0) + "_" + name + "_before"; + } + cleanNames.add(name); + } + b.rename(cleanNames); + + // Step 6: Sort by offset + b.sort(b.field(0)); + + return b.peek(); + } + + /** + * Convert a span unit and value to approximate seconds. Variable-length units use standard + * approximations: month=30 days, quarter=91 days, year=365 days. 
+ */ + private long timewrapSpanToSeconds(SpanUnit unit, int value) { + return switch (unit.getName()) { + case "s" -> value; + case "m" -> value * 60L; + case "h" -> value * 3_600L; + case "d" -> value * 86_400L; + case "w" -> value * 7L * 86_400L; + case "M" -> value * 30L * 86_400L; // month ≈ 30 days + case "q" -> value * 91L * 86_400L; // quarter ≈ 91 days + case "y" -> value * 365L * 86_400L; // year ≈ 365 days + default -> + throw new SemanticCheckException("Unsupported time unit in timewrap: " + unit.getName()); + }; + } + + /** + * Build the encoded unit descriptor used for timewrap column naming. Returns the span value and + * the singular and plural unit names separated by "|", e.g., "1|day|days". Used by the execution + * engine to build absolute period names like "501days_before". + */ + private String timewrapUnitBaseName(SpanUnit unit, int value) { + String singular = + switch (unit.getName()) { + case "s" -> "second"; + case "m" -> "minute"; + case "h" -> "hour"; + case "d" -> "day"; + case "w" -> "week"; + case "M" -> "month"; + case "q" -> "quarter"; + case "y" -> "year"; + default -> "period"; + }; + String plural = singular + "s"; + // Encode value so execution engine can compute totalUnits = (base_offset + period) * value + return value + "|" + singular + "|" + plural; + } + + /** + * Walk the AST from a Timewrap node to find a WHERE clause with an upper bound on the timestamp + * field (e.g., @timestamp <= '2024-07-03 18:00:00'). Returns the upper bound as epoch seconds, or + * null if no Filter on the first-child chain provides one. + */ + private Long extractTimestampUpperBound(Timewrap node) { + // Walk the first-child chain (Timewrap → Chart → Filter ...) until a Filter yields a bound + Node<?> current = node; + while (current != null && !current.getChild().isEmpty()) { + current = current.getChild().get(0); + if (current instanceof Filter filter) { + Long bound = findUpperBound(filter.getCondition()); + if (bound != null) return bound; + } + } + return null; + } + + /** Recursively search AND-ed comparisons for a timestamp upper bound (<, <=, or mirrored). 
*/ + private Long findUpperBound(UnresolvedExpression expr) { + if (expr instanceof And and) { + Long left = findUpperBound(and.getLeft()); + Long right = findUpperBound(and.getRight()); + // If both sides have upper bounds, use the smaller one (tighter bound) + if (left != null && right != null) return Math.min(left, right); + return left != null ? left : right; + } + if (expr instanceof Compare cmp) { + String op = cmp.getOperator(); + // Check for @timestamp <= X or @timestamp < X + if (("<=".equals(op) || "<".equals(op)) && isTimestampField(cmp.getLeft())) { + return parseTimestampLiteral(cmp.getRight()); + } + // Check for X >= @timestamp or X > @timestamp + if ((">=".equals(op) || ">".equals(op)) && isTimestampField(cmp.getRight())) { + return parseTimestampLiteral(cmp.getLeft()); + } + } + return null; + } + + private boolean isTimestampField(UnresolvedExpression expr) { + if (expr instanceof Field field) { + String name = field.getField().toString(); + return "@timestamp".equals(name) || "timestamp".equals(name); + } + return false; + } + + private Long parseTimestampLiteral(UnresolvedExpression expr) { + if (expr instanceof Literal lit && lit.getValue() instanceof String s) { + try { + // Parse "yyyy-MM-dd HH:mm:ss" format, interpreted as UTC + java.time.LocalDateTime ldt = + java.time.LocalDateTime.parse( + s, java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + return ldt.toEpochSecond(java.time.ZoneOffset.UTC); + } catch (java.time.format.DateTimeParseException e) { + // Try ISO format + try { + return java.time.Instant.parse(s).getEpochSecond(); + } catch (java.time.format.DateTimeParseException ignored) { + return null; + } + } + } + return null; + } + + /** + * Aggregate by column split then rank by grand total (summed value of each category). 
The output * is [col-split, grand-total, row-number] diff --git a/docs/category.json b/docs/category.json index 5e9b6f954a5..1ac8a82c927 100644 --- a/docs/category.json +++ b/docs/category.json @@ -45,6 +45,7 @@ "user/ppl/cmd/syntax.md", "user/ppl/cmd/chart.md", "user/ppl/cmd/timechart.md", + "user/ppl/cmd/timewrap.md", "user/ppl/cmd/top.md", "user/ppl/cmd/trendline.md", "user/ppl/cmd/transpose.md", diff --git a/docs/user/ppl/cmd/timewrap.md b/docs/user/ppl/cmd/timewrap.md new file mode 100644 index 00000000000..3b586c9226e --- /dev/null +++ b/docs/user/ppl/cmd/timewrap.md @@ -0,0 +1,176 @@ + +# timewrap + +The `timewrap` command reshapes `timechart` output by wrapping each time period into a separate data series. This enables side-by-side comparisons of the same metric across recurring time intervals, such as day-over-day or week-over-week analysis. + +## Syntax + +The `timewrap` command has the following syntax: + +```syntax +... | timechart ... | timewrap <timewrap-span> [align=end|now] +``` + +## Parameters + +The `timewrap` command supports the following parameters. + +| Parameter | Required/Optional | Description | +| --- | --- | --- | +| `<timewrap-span>` | Required | The wrapping interval, in the form `[int]<timescale>`. If the integer is omitted, `1` is assumed. For example, `1day`, `2week`, or just `day`. For a complete list of supported time units, see [Time units](#time-units). | +| `align` | Optional | Controls the reference point for period alignment and column naming. Default is `end`. `end` aligns to the search end time (the upper bound of the `where` clause on `@timestamp`, or current time if no time filter). `now` always aligns to the current query execution time. | + +## Notes + +The following considerations apply when using the `timewrap` command: + +* The `timewrap` command must follow a `timechart` command. It is a post-processing command that reshapes timechart output. 
+* Column names follow the pattern `<aggregation>_<N><unit>_before` for periods before the reference point, `<aggregation>_latest_<unit>` for the period containing the reference point, and `<aggregation>_<N><unit>_after` for periods after the reference point. +* Column order is oldest first (leftmost) to newest (rightmost). +* Only columns with data are included in the output. Unused period columns are automatically removed. +* Incomplete periods (where data does not span the full wrap interval) show `null` for missing time offsets. +* Only `timechart` without the `BY` clause is currently supported. The `BY` clause is a future enhancement. + +### Time units + +The following time units are available for the `<timewrap-span>` parameter: + +* Seconds (`s`, `sec`, `second`, `secs`, `seconds`) +* Minutes (`m`, `min`, `minute`, `mins`, `minutes`) --- note: `m` means minutes, not months +* Hours (`h`, `hr`, `hour`, `hrs`, `hours`) +* Days (`d`, `day`, `days`) +* Weeks (`w`, `week`, `weeks`) + +Variable-length time units (`month`, `quarter`, `year`) are not yet supported. 
+ +### Column naming + +Column names are constructed from the aggregation function name and an absolute time offset from the reference point: + +| Position relative to reference | Column name format | Example | +| --- | --- | --- | +| Before the reference point | `<aggregation>_<N><unit>_before` | `sum(requests)_2days_before` | +| At the reference point | `<aggregation>_latest_<unit>` | `sum(requests)_latest_day` | +| After the reference point | `<aggregation>_<N><unit>_after` | `sum(requests)_1day_after` | + +## Example 1: Day-over-day comparison + +The following query compares the sum of requests per 6-hour interval across 3 days: + +```ppl +source=timewrap_test +| where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-03 18:00:00' +| timechart span=6h sum(requests) +| timewrap 1day +``` + +```text +fetched rows / total rows = 4/4 ++---------------------+----------------------------+---------------------------+--------------------------+ +| @timestamp | sum(requests)_2days_before | sum(requests)_1day_before | sum(requests)_latest_day | +|---------------------+----------------------------+---------------------------+--------------------------| +| 2024-07-03 00:00:00 | 180 | 205 | 165 | +| 2024-07-03 06:00:00 | 240 | 260 | 225 | +| 2024-07-03 12:00:00 | 310 | 330 | 285 | +| 2024-07-03 18:00:00 | 190 | 215 | 165 | ++---------------------+----------------------------+---------------------------+--------------------------+ +``` + +Each column represents one day of data. The `latest_day` column contains the most recent period (July 3). The `2days_before` column contains the oldest period (July 1). 
+ +## Example 2: Comparing averages across 2 days + +The following query compares the average requests per 6-hour interval: + +```ppl +source=timewrap_test +| where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-02 18:00:00' +| timechart span=6h avg(requests) +| timewrap 1day +``` + +```text +fetched rows / total rows = 4/4 ++---------------------+---------------------------+--------------------------+ +| @timestamp | avg(requests)_1day_before | avg(requests)_latest_day | +|---------------------+---------------------------+--------------------------| +| 2024-07-02 00:00:00 | 90.0 | 102.5 | +| 2024-07-02 06:00:00 | 120.0 | 130.0 | +| 2024-07-02 12:00:00 | 155.0 | 165.0 | +| 2024-07-02 18:00:00 | 95.0 | 107.5 | ++---------------------+---------------------------+--------------------------+ +``` + +## Example 3: Single day produces one period + +When all data fits within a single wrap interval, only one period column is produced: + +```ppl +source=timewrap_test +| where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-01 18:00:00' +| timechart span=6h sum(requests) +| timewrap 1day +``` + +```text +fetched rows / total rows = 4/4 ++---------------------+--------------------------+ +| @timestamp | sum(requests)_latest_day | +|---------------------+--------------------------| +| 2024-07-01 00:00:00 | 180 | +| 2024-07-01 06:00:00 | 240 | +| 2024-07-01 12:00:00 | 310 | +| 2024-07-01 18:00:00 | 190 | ++---------------------+--------------------------+ +``` + +## Example 4: Count events day-over-day + +```ppl +source=timewrap_test +| where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-03 18:00:00' +| timechart span=6h count() +| timewrap 1day +``` + +```text +fetched rows / total rows = 4/4 ++---------------------+----------------------+---------------------+--------------------+ +| @timestamp | count()_2days_before | count()_1day_before | count()_latest_day | 
+|---------------------+----------------------+---------------------+--------------------| +| 2024-07-03 00:00:00 | 2 | 2 | 2 | +| 2024-07-03 06:00:00 | 2 | 2 | 2 | +| 2024-07-03 12:00:00 | 2 | 2 | 2 | +| 2024-07-03 18:00:00 | 2 | 2 | 2 | ++---------------------+----------------------+---------------------+--------------------+ +``` + +## Example 5: Comparing errors across 2 days + +```ppl +source=timewrap_test +| where @timestamp >= '2024-07-02 00:00:00' and @timestamp <= '2024-07-03 18:00:00' +| timechart span=6h sum(errors) +| timewrap 1day +``` + +```text +fetched rows / total rows = 4/4 ++---------------------+-------------------------+------------------------+ +| @timestamp | sum(errors)_1day_before | sum(errors)_latest_day | +|---------------------+-------------------------+------------------------| +| 2024-07-03 00:00:00 | 4 | 1 | +| 2024-07-03 06:00:00 | 6 | 3 | +| 2024-07-03 12:00:00 | 9 | 6 | +| 2024-07-03 18:00:00 | 3 | 1 | ++---------------------+-------------------------+------------------------+ +``` + +## Limitations + +The `timewrap` command has the following limitations: + +* The `timewrap` command must follow a `timechart` command. Using it after any other command results in an error. +* Only `timechart` without the `BY` clause is supported. The `BY` clause (column split) is a future enhancement. +* Variable-length time units (`month`, `quarter`, `year`) are not yet supported. Use fixed-length units (`s`, `m`, `h`, `d`, `w`). +* The maximum number of period columns is 20. If the data contains more than 20 periods, only the 20 most recent are shown. 
diff --git a/doctest/test_data/timewrap_test.json b/doctest/test_data/timewrap_test.json new file mode 100644 index 00000000000..d38977f629f --- /dev/null +++ b/doctest/test_data/timewrap_test.json @@ -0,0 +1,24 @@ +{"@timestamp":"2024-07-01T00:00:00Z","host":"web-01","requests":100,"errors":2} +{"@timestamp":"2024-07-01T06:00:00Z","host":"web-01","requests":150,"errors":5} +{"@timestamp":"2024-07-01T12:00:00Z","host":"web-01","requests":200,"errors":3} +{"@timestamp":"2024-07-01T18:00:00Z","host":"web-01","requests":120,"errors":1} +{"@timestamp":"2024-07-01T00:00:00Z","host":"web-02","requests":80,"errors":0} +{"@timestamp":"2024-07-01T06:00:00Z","host":"web-02","requests":90,"errors":1} +{"@timestamp":"2024-07-01T12:00:00Z","host":"web-02","requests":110,"errors":2} +{"@timestamp":"2024-07-01T18:00:00Z","host":"web-02","requests":70,"errors":0} +{"@timestamp":"2024-07-02T00:00:00Z","host":"web-01","requests":110,"errors":3} +{"@timestamp":"2024-07-02T06:00:00Z","host":"web-01","requests":160,"errors":4} +{"@timestamp":"2024-07-02T12:00:00Z","host":"web-01","requests":210,"errors":6} +{"@timestamp":"2024-07-02T18:00:00Z","host":"web-01","requests":130,"errors":2} +{"@timestamp":"2024-07-02T00:00:00Z","host":"web-02","requests":95,"errors":1} +{"@timestamp":"2024-07-02T06:00:00Z","host":"web-02","requests":100,"errors":2} +{"@timestamp":"2024-07-02T12:00:00Z","host":"web-02","requests":120,"errors":3} +{"@timestamp":"2024-07-02T18:00:00Z","host":"web-02","requests":85,"errors":1} +{"@timestamp":"2024-07-03T00:00:00Z","host":"web-01","requests":90,"errors":1} +{"@timestamp":"2024-07-03T06:00:00Z","host":"web-01","requests":140,"errors":2} +{"@timestamp":"2024-07-03T12:00:00Z","host":"web-01","requests":180,"errors":4} +{"@timestamp":"2024-07-03T18:00:00Z","host":"web-01","requests":100,"errors":1} +{"@timestamp":"2024-07-03T00:00:00Z","host":"web-02","requests":75,"errors":0} +{"@timestamp":"2024-07-03T06:00:00Z","host":"web-02","requests":85,"errors":1} 
+{"@timestamp":"2024-07-03T12:00:00Z","host":"web-02","requests":105,"errors":2} +{"@timestamp":"2024-07-03T18:00:00Z","host":"web-02","requests":65,"errors":0} diff --git a/doctest/test_docs.py b/doctest/test_docs.py index 6283252065f..e179c85eb54 100644 --- a/doctest/test_docs.py +++ b/doctest/test_docs.py @@ -59,6 +59,7 @@ 'time_data2': 'time_test_data2.json', 'time_test': 'time_test.json', 'mvcombine_data': 'mvcombine.json', + 'timewrap_test': 'timewrap_test.json', } DEBUG_MODE = os.environ.get('DOCTEST_DEBUG', 'false').lower() == 'true' diff --git a/doctest/test_mapping/timewrap_test.json b/doctest/test_mapping/timewrap_test.json new file mode 100644 index 00000000000..8222188feb4 --- /dev/null +++ b/doctest/test_mapping/timewrap_test.json @@ -0,0 +1,19 @@ +{ + "mappings": { + "properties": { + "@timestamp": { + "type": "date", + "format": "strict_date_optional_time||epoch_millis" + }, + "host": { + "type": "keyword" + }, + "requests": { + "type": "integer" + }, + "errors": { + "type": "integer" + } + } + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java new file mode 100644 index 00000000000..75253fa858f --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java @@ -0,0 +1,592 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.util.MatcherUtils.*; + +import java.io.IOException; +import org.json.JSONObject; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class CalciteTimewrapCommandIT extends PPLIntegTestCase { + + // Standard WHERE clause covering all test data — simulates frontend time picker + private static final String WHERE_ALL = + " | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= 
'2024-07-04 06:00:00'"; + private static final String WHERE_JUL1_TO_JUL3 = + " | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-03 18:00:00'"; + private static final String WHERE_JUL2_TO_JUL3 = + " | where @timestamp >= '2024-07-02 00:00:00' and @timestamp <= '2024-07-03 18:00:00'"; + private static final String WHERE_JUL1_ONLY = + " | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-01 18:00:00'"; + private static final String WHERE_JUL1_TO_JUL2 = + " | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <= '2024-07-02 18:00:00'"; + + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + disallowCalciteFallback(); + loadIndex(Index.TIMEWRAP_TEST); + } + + // --- Day-over-day with different aggregations --- + + @Test + public void testTimewrapDayOverDayWithSum() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_3days_before", "bigint"), + schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 180, 205, 165, 80), + rows("2024-07-04 06:00:00", 240, 260, 225, 100), + rows("2024-07-04 12:00:00", null, 310, 330, 285), + rows("2024-07-04 18:00:00", null, 190, 215, 165)); + } + + @Test + public void testTimewrapDayOverDayWithAvg() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h avg(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("avg(requests)_3days_before", "double"), + schema("avg(requests)_2days_before", "double"), + schema("avg(requests)_1day_before", "double"), + schema("avg(requests)_latest_day", "double")); + 
verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 90.0, 102.5, 82.5, 40.0), + rows("2024-07-04 06:00:00", 120.0, 130.0, 112.5, 50.0), + rows("2024-07-04 12:00:00", null, 155.0, 165.0, 142.5), + rows("2024-07-04 18:00:00", null, 95.0, 107.5, 82.5)); + } + + @Test + public void testTimewrapDayOverDayWithCount() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + WHERE_ALL + " | timechart span=6h count() | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("count()_3days_before", "bigint"), + schema("count()_2days_before", "bigint"), + schema("count()_1day_before", "bigint"), + schema("count()_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 2, 2, 2, 2), + rows("2024-07-04 06:00:00", 2, 2, 2, 2), + rows("2024-07-04 12:00:00", null, 2, 2, 2), + rows("2024-07-04 18:00:00", null, 2, 2, 2)); + } + + @Test + public void testTimewrapWithDifferentAggField() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(errors) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(errors)_3days_before", "bigint"), + schema("sum(errors)_2days_before", "bigint"), + schema("sum(errors)_1day_before", "bigint"), + schema("sum(errors)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 2, 4, 1, 0), + rows("2024-07-04 06:00:00", 6, 6, 3, 1), + rows("2024-07-04 12:00:00", null, 5, 9, 6), + rows("2024-07-04 18:00:00", null, 1, 3, 1)); + } + + // --- Incomplete period / null fill --- + + @Test + public void testTimewrapIncompletePeriodNullFill() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_3days_before", "bigint"), + 
schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsSome(result, rows("2024-07-04 12:00:00", null, 310, 330, 285)); + verifyDataRowsSome(result, rows("2024-07-04 18:00:00", null, 190, 215, 165)); + } + + // --- Different timescales --- + + @Test + public void testTimewrapWeekSpanSinglePeriod() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 1week"); + + verifySchema( + result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_week", "bigint")); + verifyDataRowsSome(result, rows("2024-07-04 00:00:00", 80), rows("2024-07-04 06:00:00", 100)); + } + + @Test + public void testTimewrapTwelveHourSpan() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 12h"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_72hours_before", "bigint"), + schema("sum(requests)_60hours_before", "bigint"), + schema("sum(requests)_48hours_before", "bigint"), + schema("sum(requests)_36hours_before", "bigint"), + schema("sum(requests)_24hours_before", "bigint"), + schema("sum(requests)_12hours_before", "bigint"), + schema("sum(requests)_latest_hour", "bigint")); + verifyDataRowsSome(result, rows("2024-07-04 00:00:00", 180, 310, 205, 330, 165, 285, 80)); + } + + @Test + public void testTimewrapWithMinuteSpan() throws IOException { + loadIndex(Index.EVENTS); + JSONObject result = + executeQuery( + "source=events | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <=" + + " '2024-07-01 00:04:00' | timechart span=1m count() | timewrap 1min"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("count()_4minutes_before", "bigint"), + schema("count()_3minutes_before", "bigint"), + schema("count()_2minutes_before", 
"bigint"), + schema("count()_1minute_before", "bigint"), + schema("count()_latest_minute", "bigint")); + verifyDataRows(result, rows("2024-07-01 00:04:00", 1, 1, 1, 1, 1)); + } + + // --- WHERE clause with different time ranges --- + + @Test + public void testTimewrapWithWhereThreeDays() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL3 + + " | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 180, 205, 165), + rows("2024-07-03 06:00:00", 240, 260, 225), + rows("2024-07-03 12:00:00", 310, 330, 285), + rows("2024-07-03 18:00:00", 190, 215, 165)); + } + + @Test + public void testTimewrapWithWhereTwoDays() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL2_TO_JUL3 + + " | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 205, 165), + rows("2024-07-03 06:00:00", 260, 225), + rows("2024-07-03 12:00:00", 330, 285), + rows("2024-07-03 18:00:00", 215, 165)); + } + + @Test + public void testTimewrapWithWhereSingleDay() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_ONLY + + " | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-01 00:00:00", 180), + rows("2024-07-01 06:00:00", 240), + rows("2024-07-01 12:00:00", 310), + rows("2024-07-01 18:00:00", 190)); + } + + 
@Test + public void testTimewrapWithWhereAvg() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL2 + + " | timechart span=6h avg(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("avg(requests)_1day_before", "double"), + schema("avg(requests)_latest_day", "double")); + verifyDataRowsInOrder( + result, + rows("2024-07-02 00:00:00", 90.0, 102.5), + rows("2024-07-02 06:00:00", 120.0, 130.0), + rows("2024-07-02 12:00:00", 155.0, 165.0), + rows("2024-07-02 18:00:00", 95.0, 107.5)); + } + + @Test + public void testTimewrapWithWhere12hSpan() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL2 + + " | timechart span=6h sum(requests) | timewrap 12h"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_36hours_before", "bigint"), + schema("sum(requests)_24hours_before", "bigint"), + schema("sum(requests)_12hours_before", "bigint"), + schema("sum(requests)_latest_hour", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-02 12:00:00", 180, 310, 205, 330), + rows("2024-07-02 18:00:00", 240, 190, 260, 215)); + } + + @Test + public void testTimewrapWithWhereCount() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL3 + + " | timechart span=6h count() | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("count()_2days_before", "bigint"), + schema("count()_1day_before", "bigint"), + schema("count()_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 2, 2, 2), + rows("2024-07-03 06:00:00", 2, 2, 2), + rows("2024-07-03 12:00:00", 2, 2, 2), + rows("2024-07-03 18:00:00", 2, 2, 2)); + } + + @Test + public void testTimewrapWithWhereErrors() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL2_TO_JUL3 + + 
" | timechart span=6h sum(errors) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(errors)_1day_before", "bigint"), + schema("sum(errors)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 4, 1), + rows("2024-07-03 06:00:00", 6, 3), + rows("2024-07-03 12:00:00", 9, 6), + rows("2024-07-03 18:00:00", 3, 1)); + } + + // --- WHERE upper bound above data: shifts column numbers --- + + @Test + public void testTimewrapWithWhereUpperBoundAboveData() throws IOException { + // WHERE upper bound = July 10 (~5.75 days after max data July 4 06:00) + // baseOffset = floor(Jul10/86400) - floor(Jul4_06/86400) = 5 + // periodFromNow for oldest(rel=4): (5+4-1)*1=8, newest(rel=1): (5+1-1)*1=5 + JSONObject result = + executeQuery( + "source=timewrap_test | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <=" + + " '2024-07-10 00:00:00' | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_8days_before", "bigint"), + schema("sum(requests)_7days_before", "bigint"), + schema("sum(requests)_6days_before", "bigint"), + schema("sum(requests)_5days_before", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 180, 205, 165, 80), + rows("2024-07-04 06:00:00", 240, 260, 225, 100), + rows("2024-07-04 12:00:00", null, 310, 330, 285), + rows("2024-07-04 18:00:00", null, 190, 215, 165)); + } + + // --- align=end vs align=now --- + + @Test + public void testTimewrapAlignEndIsDefault() throws IOException { + JSONObject resultDefault = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 1day"); + JSONObject resultEnd = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 1day align=end"); + + verifySchema( + resultEnd, + schema("@timestamp", "timestamp"), + 
schema("sum(requests)_3days_before", "bigint"), + schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + resultEnd, + rows("2024-07-04 00:00:00", 180, 205, 165, 80), + rows("2024-07-04 06:00:00", 240, 260, 225, 100), + rows("2024-07-04 12:00:00", null, 310, 330, 285), + rows("2024-07-04 18:00:00", null, 190, 215, 165)); + verifyDataRowsInOrder( + resultDefault, + rows("2024-07-04 00:00:00", 180, 205, 165, 80), + rows("2024-07-04 06:00:00", 240, 260, 225, 100), + rows("2024-07-04 12:00:00", null, 310, 330, 285), + rows("2024-07-04 18:00:00", null, 190, 215, 165)); + } + + @Test + public void testTimewrapAlignNow() throws IOException { + // align=now uses current time — column names are dynamic + // Extract actual column names from the result for verification + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 1day align=now"); + + // Get actual column names from result schema + org.json.JSONArray schemaArr = result.getJSONArray("schema"); + String c1 = schemaArr.getJSONObject(1).getString("name"); + String c2 = schemaArr.getJSONObject(2).getString("name"); + String c3 = schemaArr.getJSONObject(3).getString("name"); + String c4 = schemaArr.getJSONObject(4).getString("name"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema(c1, "bigint"), + schema(c2, "bigint"), + schema(c3, "bigint"), + schema(c4, "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 180, 205, 165, 80), + rows("2024-07-04 06:00:00", 240, 260, 225, 100), + rows("2024-07-04 12:00:00", null, 310, 330, 285), + rows("2024-07-04 18:00:00", null, 190, 215, 165)); + } + + // --- Every timescale --- + + @Test + public void testTimewrapSecondSpan() throws IOException { + // 5 events at minute-level, wrap by 1 minute (60sec) + // timechart span=1m gives 3 buckets (00:00, 
01:00, 02:00) + // timewrap 1min: each bucket is in a different 1-minute period → 1 offset row, 3 periods + loadIndex(Index.EVENTS); + JSONObject result = + executeQuery( + "source=events | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <=" + + " '2024-07-01 00:02:00' | timechart span=1m count() | timewrap 1min"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("count()_2minutes_before", "bigint"), + schema("count()_1minute_before", "bigint"), + schema("count()_latest_minute", "bigint")); + verifyDataRows(result, rows("2024-07-01 00:02:00", 1, 1, 1)); + } + + @Test + public void testTimewrapMinuteSpan() throws IOException { + loadIndex(Index.EVENTS); + JSONObject result = + executeQuery( + "source=events | where @timestamp >= '2024-07-01 00:00:00' and @timestamp <=" + + " '2024-07-01 00:04:00' | timechart span=1m count() | timewrap 1min"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("count()_4minutes_before", "bigint"), + schema("count()_3minutes_before", "bigint"), + schema("count()_2minutes_before", "bigint"), + schema("count()_1minute_before", "bigint"), + schema("count()_latest_minute", "bigint")); + verifyDataRows(result, rows("2024-07-01 00:04:00", 1, 1, 1, 1, 1)); + } + + @Test + public void testTimewrapHourSpan() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL2 + + " | timechart span=6h sum(requests) | timewrap 12h"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_36hours_before", "bigint"), + schema("sum(requests)_24hours_before", "bigint"), + schema("sum(requests)_12hours_before", "bigint"), + schema("sum(requests)_latest_hour", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-02 12:00:00", 180, 310, 205, 330), + rows("2024-07-02 18:00:00", 240, 190, 260, 215)); + } + + @Test + public void testTimewrapDaySpan() throws IOException { + JSONObject result = + executeQuery( + 
"source=timewrap_test" + + WHERE_JUL1_TO_JUL3 + + " | timechart span=6h sum(requests) | timewrap 1day"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 180, 205, 165), + rows("2024-07-03 06:00:00", 240, 260, 225), + rows("2024-07-03 12:00:00", 310, 330, 285), + rows("2024-07-03 18:00:00", 190, 215, 165)); + } + + @Test + public void testTimewrapWeekSpan() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_ALL + + " | timechart span=6h sum(requests) | timewrap 1week"); + + verifySchema( + result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_week", "bigint")); + verifyDataRowsSome(result, rows("2024-07-04 00:00:00", 80), rows("2024-07-04 06:00:00", 100)); + } + + @Test + public void testTimewrapMonthSpan() throws IOException { + // Jun 15 to Jul 4: all data within ~19 days → single 30-day month period + JSONObject result = + executeQuery( + "source=timewrap_test | where @timestamp >= '2024-06-15 00:00:00' and @timestamp <=" + + " '2024-07-04 06:00:00' | timechart span=1day sum(requests) | timewrap 1month"); + + verifySchema( + result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_month", "bigint")); + // Display timestamps anchored to latest period — Jun 15 maps to Jul 15 offset + verifyDataRowsSome(result, rows("2024-07-15 00:00:00", 200), rows("2024-07-01 00:00:00", 920)); + } + + @Test + public void testTimewrapQuarterSpan() throws IOException { + // Jan 15 to Apr 15 = ~91 days → 2 quarter periods (Jan in one, Apr in another) + JSONObject result = + executeQuery( + "source=timewrap_test | where @timestamp >= '2024-01-15 00:00:00' and @timestamp <=" + + " '2024-04-15 12:00:00' | timechart span=1day sum(requests) | timewrap 1quarter"); + + verifySchema( + 
result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_1quarter_before", "bigint"), + schema("sum(requests)_latest_quarter", "bigint")); + verifyDataRowsSome(result, rows("2024-04-15 00:00:00", 300, 350)); + } + + @Test + public void testTimewrapYearSpan() throws IOException { + // Jan 2024 to Jan 2025 = ~365 days → 2 year periods + // Period 1 (1year_before): Jan 2024 data (300) + // Period 2 (latest_year): everything else + JSONObject result = + executeQuery( + "source=timewrap_test | where @timestamp >= '2024-01-15 00:00:00' and @timestamp <=" + + " '2025-01-15 12:00:00' | timechart span=1day sum(requests) | timewrap 1year"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_1year_before", "bigint"), + schema("sum(requests)_latest_year", "bigint")); + verifyDataRowsSome(result, rows("2025-01-15 00:00:00", null, 400)); + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java index 099b2f7e0cb..a3070599cf8 100644 --- a/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java +++ b/integ-test/src/test/java/org/opensearch/sql/legacy/SQLIntegTestCase.java @@ -958,7 +958,12 @@ public enum Index { "events_traffic", "events_traffic", getMappingFile("events_traffic_index_mapping.json"), - "src/test/resources/events_traffic.json"); + "src/test/resources/events_traffic.json"), + TIMEWRAP_TEST( + "timewrap_test", + "timewrap_test", + "{\"mappings\":{\"properties\":{\"@timestamp\":{\"type\":\"date\"},\"host\":{\"type\":\"keyword\"},\"requests\":{\"type\":\"integer\"},\"errors\":{\"type\":\"integer\"}}}}", + "src/test/resources/timewrap_test.json"); private final String name; private final String type; diff --git a/integ-test/src/test/resources/timewrap_test.json b/integ-test/src/test/resources/timewrap_test.json new file mode 100644 index 00000000000..5ba82f8fabf --- /dev/null +++ 
b/integ-test/src/test/resources/timewrap_test.json @@ -0,0 +1,66 @@ +{"index":{"_id":"1"}} +{"@timestamp":"2024-07-01T00:00:00","host":"web-01","requests":100,"errors":2} +{"index":{"_id":"2"}} +{"@timestamp":"2024-07-01T06:00:00","host":"web-01","requests":150,"errors":5} +{"index":{"_id":"3"}} +{"@timestamp":"2024-07-01T12:00:00","host":"web-01","requests":200,"errors":3} +{"index":{"_id":"4"}} +{"@timestamp":"2024-07-01T18:00:00","host":"web-01","requests":120,"errors":1} +{"index":{"_id":"5"}} +{"@timestamp":"2024-07-01T00:00:00","host":"web-02","requests":80,"errors":0} +{"index":{"_id":"6"}} +{"@timestamp":"2024-07-01T06:00:00","host":"web-02","requests":90,"errors":1} +{"index":{"_id":"7"}} +{"@timestamp":"2024-07-01T12:00:00","host":"web-02","requests":110,"errors":2} +{"index":{"_id":"8"}} +{"@timestamp":"2024-07-01T18:00:00","host":"web-02","requests":70,"errors":0} +{"index":{"_id":"9"}} +{"@timestamp":"2024-07-02T00:00:00","host":"web-01","requests":110,"errors":3} +{"index":{"_id":"10"}} +{"@timestamp":"2024-07-02T06:00:00","host":"web-01","requests":160,"errors":4} +{"index":{"_id":"11"}} +{"@timestamp":"2024-07-02T12:00:00","host":"web-01","requests":210,"errors":6} +{"index":{"_id":"12"}} +{"@timestamp":"2024-07-02T18:00:00","host":"web-01","requests":130,"errors":2} +{"index":{"_id":"13"}} +{"@timestamp":"2024-07-02T00:00:00","host":"web-02","requests":95,"errors":1} +{"index":{"_id":"14"}} +{"@timestamp":"2024-07-02T06:00:00","host":"web-02","requests":100,"errors":2} +{"index":{"_id":"15"}} +{"@timestamp":"2024-07-02T12:00:00","host":"web-02","requests":120,"errors":3} +{"index":{"_id":"16"}} +{"@timestamp":"2024-07-02T18:00:00","host":"web-02","requests":85,"errors":1} +{"index":{"_id":"17"}} +{"@timestamp":"2024-07-03T00:00:00","host":"web-01","requests":90,"errors":1} +{"index":{"_id":"18"}} +{"@timestamp":"2024-07-03T06:00:00","host":"web-01","requests":140,"errors":2} +{"index":{"_id":"19"}} 
+{"@timestamp":"2024-07-03T12:00:00","host":"web-01","requests":180,"errors":4} +{"index":{"_id":"20"}} +{"@timestamp":"2024-07-03T18:00:00","host":"web-01","requests":100,"errors":1} +{"index":{"_id":"21"}} +{"@timestamp":"2024-07-03T00:00:00","host":"web-02","requests":75,"errors":0} +{"index":{"_id":"22"}} +{"@timestamp":"2024-07-03T06:00:00","host":"web-02","requests":85,"errors":1} +{"index":{"_id":"23"}} +{"@timestamp":"2024-07-03T12:00:00","host":"web-02","requests":105,"errors":2} +{"index":{"_id":"24"}} +{"@timestamp":"2024-07-03T18:00:00","host":"web-02","requests":65,"errors":0} +{"index":{"_id":"25"}} +{"@timestamp":"2024-07-04T00:00:00","host":"web-01","requests":50,"errors":0} +{"index":{"_id":"26"}} +{"@timestamp":"2024-07-04T06:00:00","host":"web-01","requests":60,"errors":1} +{"index":{"_id":"27"}} +{"@timestamp":"2024-07-04T00:00:00","host":"web-02","requests":30,"errors":0} +{"index":{"_id":"28"}} +{"@timestamp":"2024-07-04T06:00:00","host":"web-02","requests":40,"errors":0} +{"index":{"_id":"29"}} +{"@timestamp":"2024-01-15T12:00:00","host":"web-01","requests":300,"errors":5} +{"index":{"_id":"30"}} +{"@timestamp":"2024-04-15T12:00:00","host":"web-01","requests":350,"errors":3} +{"index":{"_id":"31"}} +{"@timestamp":"2024-06-15T12:00:00","host":"web-01","requests":200,"errors":2} +{"index":{"_id":"32"}} +{"@timestamp":"2024-08-15T12:00:00","host":"web-01","requests":250,"errors":4} +{"index":{"_id":"33"}} +{"@timestamp":"2025-01-15T12:00:00","host":"web-01","requests":400,"errors":6} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index 32b7891d344..5bffb2eae06 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -13,10 +13,12 @@ 
import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -320,11 +322,116 @@ private QueryResponse buildResultSet( } columns.add(new Column(columnName, null, exprType)); } + // Timewrap post-processing: strip null columns, remove __base_offset__, rename to absolute + if (CalcitePlanContext.stripNullColumns.get()) { + try { + // 1. Find all-null columns and always strip __base_offset__ + Set dropColumns = new HashSet<>(); + dropColumns.add("__base_offset__"); + if (!values.isEmpty()) { + Set allNullCandidates = new HashSet<>(); + for (Column col : columns) { + allNullCandidates.add(col.getName()); + } + for (ExprValue row : values) { + java.util.Map tupleValues = row.tupleValue(); + allNullCandidates.removeIf( + colName -> { + ExprValue v = tupleValues.get(colName); + return v != null && !v.isNull(); + }); + if (allNullCandidates.isEmpty()) break; + } + dropColumns.addAll(allNullCandidates); + } + + // 2. Read __base_offset__ for absolute naming + long baseOffset = 0; + if (!values.isEmpty()) { + ExprValue boVal = values.getFirst().tupleValue().get("__base_offset__"); + if (boVal != null && !boVal.isNull()) { + baseOffset = boVal.longValue(); + } + } + + // 3. 
Build new column list: drop nulls, rename periods to absolute offsets + String unitInfo = CalcitePlanContext.timewrapUnitName.get(); + List origNames = new ArrayList<>(); // original names of kept columns + List newColumns = new ArrayList<>(); + for (Column col : columns) { + if (dropColumns.contains(col.getName())) continue; + String origName = col.getName(); + String newName = origName; + if (unitInfo != null) { + newName = renameTimewrapColumn(origName, baseOffset, unitInfo); + } + origNames.add(origName); + newColumns.add(new Column(newName, col.getAlias(), col.getExprType())); + } + columns = newColumns; + + // 4. Rebuild rows with only kept columns, re-keyed to new names + List filteredValues = new ArrayList<>(); + for (ExprValue row : values) { + java.util.Map original = row.tupleValue(); + Map filtered = new LinkedHashMap<>(); + for (int i = 0; i < origNames.size(); i++) { + filtered.put(columns.get(i).getName(), original.get(origNames.get(i))); + } + filteredValues.add(ExprTupleValue.fromExprValueMap(filtered)); + } + values = filteredValues; + } finally { + CalcitePlanContext.stripNullColumns.set(false); + CalcitePlanContext.timewrapUnitName.set(null); + } + } + Schema schema = new Schema(columns); QueryResponse response = new QueryResponse(schema, values, null); return response; } + /** + * Rename a timewrap period column from relative to absolute offset. Uses the formula: + * periodFromNow = (baseOffset + relativePeriod - 1) * spanValue. Naming rules (matching Splunk): + * periodFromNow == 0 → "latest_", > 0 → "_before", < 0 → "_after". + * unitInfo format: "spanValue|singular|plural|_before" e.g., "1|day|days|_before". 
+ */ + private String renameTimewrapColumn(String name, long baseOffset, String unitInfo) { + String[] parts = unitInfo.split("\\|", -1); + if (parts.length < 4) return name; + int spanValue = Integer.parseInt(parts[0]); + String singular = parts[1]; + String plural = parts[2]; + String nameSuffix = parts[3]; + + if (!name.endsWith(nameSuffix)) return name; + String beforeSuffix = name.substring(0, name.length() - nameSuffix.length()); + int lastUnderscore = beforeSuffix.lastIndexOf('_'); + if (lastUnderscore < 0) return name; + + String prefix = beforeSuffix.substring(0, lastUnderscore); + String periodStr = beforeSuffix.substring(lastUnderscore + 1); + try { + int relativePeriod = Integer.parseInt(periodStr); + long periodFromNow = (baseOffset + relativePeriod - 1) * spanValue; + + if (periodFromNow == 0) { + return prefix + "_latest_" + singular; + } else if (periodFromNow > 0) { + String unit = periodFromNow == 1 ? singular : plural; + return prefix + "_" + periodFromNow + unit + "_before"; + } else { + long absPeriod = Math.abs(periodFromNow); + String unit = absPeriod == 1 ? 
singular : plural; + return prefix + "_" + absPeriod + unit + "_after"; + } + } catch (NumberFormatException e) { + return name; + } + } + /** Registers opensearch-dependent functions */ private void registerOpenSearchFunctions() { Optional nodeClient = client.getNodeClient(); diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 3ada5c96b72..2f185acfbb5 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -55,6 +55,8 @@ APPENDCOL: 'APPENDCOL'; ADDTOTALS: 'ADDTOTALS'; ADDCOLTOTALS: 'ADDCOLTOTALS'; GRAPHLOOKUP: 'GRAPHLOOKUP'; +TIMEWRAP: 'TIMEWRAP'; +ALIGN: 'ALIGN'; EDGE: 'EDGE'; MAX_DEPTH: 'MAXDEPTH'; DEPTH_FIELD: 'DEPTHFIELD'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 72d5d0fd76d..77dd880f217 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -95,6 +95,7 @@ commands | fieldformatCommand | nomvCommand | graphLookupCommand + | timewrapCommand ; commandName @@ -145,6 +146,7 @@ commandName | NOMV | TRANSPOSE | GRAPHLOOKUP + | TIMEWRAP ; searchCommand @@ -351,6 +353,15 @@ transposeParameter | (COLUMN_NAME EQUAL stringLiteral) ; +timewrapCommand + : TIMEWRAP spanLiteral (ALIGN EQUAL timewrapAlign)? 
+ ; + +timewrapAlign + : NOW + | END + ; + timechartParameter : LIMIT EQUAL integerLiteral diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 9a92126d2e6..7bba43412bf 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -64,6 +64,7 @@ import org.opensearch.sql.ast.expression.SearchAnd; import org.opensearch.sql.ast.expression.SearchExpression; import org.opensearch.sql.ast.expression.SearchGroup; +import org.opensearch.sql.ast.expression.SpanUnit; import org.opensearch.sql.ast.expression.UnresolvedArgument; import org.opensearch.sql.ast.expression.UnresolvedExpression; import org.opensearch.sql.ast.expression.WindowFrame; @@ -118,6 +119,7 @@ import org.opensearch.sql.ast.tree.StreamWindow; import org.opensearch.sql.ast.tree.SubqueryAlias; import org.opensearch.sql.ast.tree.TableFunction; +import org.opensearch.sql.ast.tree.Timewrap; import org.opensearch.sql.ast.tree.Transpose; import org.opensearch.sql.ast.tree.Trendline; import org.opensearch.sql.ast.tree.UnresolvedPlan; @@ -819,6 +821,23 @@ public UnresolvedPlan visitTimechartCommand(OpenSearchPPLParser.TimechartCommand .build(); } + /** Timewrap command. */ + @Override + public UnresolvedPlan visitTimewrapCommand(OpenSearchPPLParser.TimewrapCommandContext ctx) { + Literal spanLiteral = (Literal) expressionBuilder.visit(ctx.spanLiteral()); + String spanText = spanLiteral.getValue().toString(); + String valueStr = spanText.replaceAll("[^0-9]", ""); + String unitStr = spanText.replaceAll("[0-9]", ""); + int value = valueStr.isEmpty() ? 1 : Integer.parseInt(valueStr); + SpanUnit unit = SpanUnit.of(unitStr); + if (unit == SpanUnit.UNKNOWN || unit == SpanUnit.NONE) { + throw new SemanticCheckException("Invalid timewrap span unit: " + unitStr); + } + String align = + ctx.timewrapAlign() != null ? 
ctx.timewrapAlign().getText().toLowerCase() : "end"; + return new Timewrap(unit, value, align, spanLiteral); + } + /** Eval command. */ @Override public UnresolvedPlan visitEvalCommand(EvalCommandContext ctx) { From 9c02b161bb1395181f001ed1f80153b423f558f3 Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 18 Mar 2026 10:33:01 -0700 Subject: [PATCH 2/4] Update metadata.rst doctest for timewrap_test index Add timewrap_test to SHOW TABLES expected output (25 tables). Signed-off-by: Jialiang Li Signed-off-by: Kai Huang --- docs/user/dql/metadata.rst | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/user/dql/metadata.rst b/docs/user/dql/metadata.rst index e4f55ef1b3e..92aaa0c7db0 100644 --- a/docs/user/dql/metadata.rst +++ b/docs/user/dql/metadata.rst @@ -35,7 +35,7 @@ Example 1: Show All Indices Information SQL query:: os> SHOW TABLES LIKE '%' - fetched rows / total rows = 24/24 + fetched rows / total rows = 25/25 +----------------+-------------+-------------------+------------+---------+----------+------------+-----------+---------------------------+----------------+ | TABLE_CAT | TABLE_SCHEM | TABLE_NAME | TABLE_TYPE | REMARKS | TYPE_CAT | TYPE_SCHEM | TYPE_NAME | SELF_REFERENCING_COL_NAME | REF_GENERATION | |----------------+-------------+-------------------+------------+---------+----------+------------+-----------+---------------------------+----------------| @@ -48,7 +48,7 @@ SQL query:: | docTestCluster | null | events_many_hosts | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | events_null | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | json_test | BASE TABLE | null | null | null | null | null | null | - | docTestCluster | null | mvcombine_data | BASE TABLE | null | null | null | null | null | null | + | docTestCluster | null | mvcombine_data | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | nested | BASE TABLE | null | null | null | 
null | null | null | | docTestCluster | null | nyc_taxi | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | occupation | BASE TABLE | null | null | null | null | null | null | @@ -59,6 +59,7 @@ SQL query:: | docTestCluster | null | time_data | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | time_data2 | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | time_test | BASE TABLE | null | null | null | null | null | null | + | docTestCluster | null | timewrap_test | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | weblogs | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | wildcard | BASE TABLE | null | null | null | null | null | null | | docTestCluster | null | work_information | BASE TABLE | null | null | null | null | null | null | From e0a03f0b2acb47a75d84800b2c4283b473345ddd Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Wed, 18 Mar 2026 11:52:53 -0700 Subject: [PATCH 3/4] Refactor timewrap: extract TimewrapUtils, add precise calendar arithmetic - Extract all timewrap helper methods to TimewrapUtils.java in calcite/utils/ - Add precise EXTRACT-based period computation for month/quarter/year - Add cumDaysBeforeMonth with leap year CASE expression for precise quarter offset - Month/quarter/year period assignment now uses calendar arithmetic instead of approximate fixed-length conversions Signed-off-by: Jialiang Li Signed-off-by: Kai Huang --- .../sql/calcite/CalciteRelNodeVisitor.java | 269 +++++--------- .../sql/calcite/utils/TimewrapUtils.java | 337 ++++++++++++++++++ .../remote/CalciteTimewrapCommandIT.java | 19 +- 3 files changed, 442 insertions(+), 183 deletions(-) create mode 100644 core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapUtils.java diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java 
b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index db8fbbd895f..4147c455951 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -87,10 +87,8 @@ import org.opensearch.sql.ast.expression.Alias; import org.opensearch.sql.ast.expression.AllFields; import org.opensearch.sql.ast.expression.AllFieldsExcludeMeta; -import org.opensearch.sql.ast.expression.And; import org.opensearch.sql.ast.expression.Argument; import org.opensearch.sql.ast.expression.Argument.ArgumentMap; -import org.opensearch.sql.ast.expression.Compare; import org.opensearch.sql.ast.expression.Field; import org.opensearch.sql.ast.expression.Function; import org.opensearch.sql.ast.expression.Let; @@ -168,6 +166,7 @@ import org.opensearch.sql.calcite.utils.JoinAndLookupUtils; import org.opensearch.sql.calcite.utils.PPLHintUtils; import org.opensearch.sql.calcite.utils.PlanUtils; +import org.opensearch.sql.calcite.utils.TimewrapUtils; import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils; import org.opensearch.sql.calcite.utils.WildcardUtils; import org.opensearch.sql.common.patterns.PatternUtils; @@ -3096,19 +3095,14 @@ public RelNode visitChart(Chart node, CalcitePlanContext context) { return relBuilder.peek(); } - private static final int TIMEWRAP_MAX_PERIODS = 20; - @Override public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) { visitChildren(node, context); // Signal the execution engine to strip all-null columns and rename with absolute offsets CalcitePlanContext.stripNullColumns.set(true); - // Both align=now and align=end use _before suffix (matching Splunk behavior). - // align=end would use search end time as reference, but PPL has no search time range - // context, so both modes currently use query execution time. 
CalcitePlanContext.timewrapUnitName.set( - timewrapUnitBaseName(node.getUnit(), node.getValue()) + "|_before"); + TimewrapUtils.unitBaseName(node.getUnit(), node.getValue()) + "|_before"); RelBuilder b = context.relBuilder; RexBuilder rx = context.rexBuilder; @@ -3118,62 +3112,100 @@ public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) { String tsFieldName = fieldNames.get(0); List valueFieldNames = fieldNames.subList(1, fieldNames.size()); - long spanSec = timewrapSpanToSeconds(node.getUnit(), node.getValue()); + boolean variableLength = TimewrapUtils.isVariableLengthUnit(node.getUnit()); RelDataType bigintType = rx.getTypeFactory().createSqlType(SqlTypeName.BIGINT); - // Step 1: Convert timestamps to epoch seconds via UNIX_TIMESTAMP, add MAX OVER() - RexNode tsEpochExpr = - rx.makeCast( - bigintType, - rx.makeCall(PPLBuiltinOperators.UNIX_TIMESTAMP, b.field(tsFieldName)), - true); - b.projectPlus( - b.alias(tsEpochExpr, "__ts_epoch__"), - b.aggregateCall(SqlStdOperatorTable.MAX, tsEpochExpr).over().as("__max_epoch__")); - - // Step 2: Compute period_number and offset - RexNode tsEpoch = b.field("__ts_epoch__"); - RexNode maxEpoch = b.field("__max_epoch__"); - RexNode spanLit = rx.makeBigintLiteral(BigDecimal.valueOf(spanSec)); - - // period = (max_epoch - ts_epoch) / span_sec + 1 (integer division truncates) - RexNode diff = rx.makeCall(SqlStdOperatorTable.MINUS, maxEpoch, tsEpoch); - RexNode periodNum = - rx.makeCall( - SqlStdOperatorTable.PLUS, - rx.makeCall(SqlStdOperatorTable.DIVIDE, diff, spanLit), - rx.makeExactLiteral(BigDecimal.ONE, bigintType)); - - // offset_sec = ts_epoch MOD span_sec - // Convert back to actual timestamp: latest_period_start + offset - RexNode offsetSec = rx.makeCall(SqlStdOperatorTable.MOD, tsEpoch, spanLit); - RexNode latestPeriodStart = - rx.makeCall( - SqlStdOperatorTable.MINUS, - maxEpoch, - rx.makeCall(SqlStdOperatorTable.MOD, maxEpoch, spanLit)); - RexNode displayEpoch = rx.makeCall(SqlStdOperatorTable.PLUS, 
latestPeriodStart, offsetSec); - RexNode displayTimestamp = rx.makeCall(PPLBuiltinOperators.FROM_UNIXTIME, displayEpoch); - - // Compute base_offset for absolute period naming in execution engine. - // align=now: reference = current time - // align=end: reference = WHERE upper bound (search end time), fallback to now + RexNode periodNum; + RexNode displayTimestamp; RexNode baseOffset; - long nowEpochSec = context.functionProperties.getQueryStartClock().millis() / 1000; - Long referenceEpoch = null; - if ("end".equals(node.getAlign())) { - // Try to extract the upper bound from a WHERE clause on the timestamp field - referenceEpoch = extractTimestampUpperBound(node); - } - if (referenceEpoch == null) { - referenceEpoch = nowEpochSec; - } - RexNode refLit = rx.makeBigintLiteral(BigDecimal.valueOf(referenceEpoch)); - baseOffset = - rx.makeCall( - SqlStdOperatorTable.DIVIDE, - rx.makeCall(SqlStdOperatorTable.MINUS, refLit, maxEpoch), - spanLit); + + if (variableLength) { + // --- Variable-length units (month, quarter, year): EXTRACT-based calendar arithmetic --- + RexNode tsField = b.field(tsFieldName); + RexNode tsUnitNum = + TimewrapUtils.calendarUnitNumber(rx, tsField, node.getUnit(), node.getValue()); + + b.projectPlus(b.aggregateCall(SqlStdOperatorTable.MAX, tsField).over().as("__max_ts__")); + RexNode maxTs = b.field("__max_ts__"); + RexNode maxUnitNum = + TimewrapUtils.calendarUnitNumber(rx, maxTs, node.getUnit(), node.getValue()); + + periodNum = + rx.makeCall( + SqlStdOperatorTable.PLUS, + rx.makeCall(SqlStdOperatorTable.MINUS, maxUnitNum, tsUnitNum), + rx.makeExactLiteral(BigDecimal.ONE, bigintType)); + + RexNode tsEpoch = + rx.makeCast(bigintType, rx.makeCall(PPLBuiltinOperators.UNIX_TIMESTAMP, tsField), true); + RexNode unitStartEpoch = TimewrapUtils.calendarUnitStartEpoch(rx, tsField, node.getUnit()); + RexNode offsetSec = rx.makeCall(SqlStdOperatorTable.MINUS, tsEpoch, unitStartEpoch); + RexNode maxUnitStartEpoch = TimewrapUtils.calendarUnitStartEpoch(rx, 
maxTs, node.getUnit()); + RexNode displayEpoch = rx.makeCall(SqlStdOperatorTable.PLUS, maxUnitStartEpoch, offsetSec); + displayTimestamp = rx.makeCall(PPLBuiltinOperators.FROM_UNIXTIME, displayEpoch); + + long nowEpochSec = context.functionProperties.getQueryStartClock().millis() / 1000; + Long referenceEpoch = null; + if ("end".equals(node.getAlign())) { + referenceEpoch = TimewrapUtils.extractTimestampUpperBound(node); + } + if (referenceEpoch == null) { + referenceEpoch = nowEpochSec; + } + long refUnitNum = + TimewrapUtils.calendarUnitNumberFromEpoch( + referenceEpoch, node.getUnit(), node.getValue()); + RexNode refUnitNumLit = rx.makeBigintLiteral(BigDecimal.valueOf(refUnitNum)); + baseOffset = rx.makeCall(SqlStdOperatorTable.MINUS, refUnitNumLit, maxUnitNum); + + } else { + // --- Fixed-length units (sec, min, hr, day, week): epoch-based arithmetic --- + long spanSec = TimewrapUtils.spanToSeconds(node.getUnit(), node.getValue()); + + RexNode tsEpochExpr = + rx.makeCast( + bigintType, + rx.makeCall(PPLBuiltinOperators.UNIX_TIMESTAMP, b.field(tsFieldName)), + true); + b.projectPlus( + b.alias(tsEpochExpr, "__ts_epoch__"), + b.aggregateCall(SqlStdOperatorTable.MAX, tsEpochExpr).over().as("__max_epoch__")); + + RexNode tsEpoch = b.field("__ts_epoch__"); + RexNode maxEpoch = b.field("__max_epoch__"); + RexNode spanLit = rx.makeBigintLiteral(BigDecimal.valueOf(spanSec)); + + RexNode diff = rx.makeCall(SqlStdOperatorTable.MINUS, maxEpoch, tsEpoch); + periodNum = + rx.makeCall( + SqlStdOperatorTable.PLUS, + rx.makeCall(SqlStdOperatorTable.DIVIDE, diff, spanLit), + rx.makeExactLiteral(BigDecimal.ONE, bigintType)); + + RexNode offsetSec = rx.makeCall(SqlStdOperatorTable.MOD, tsEpoch, spanLit); + RexNode latestPeriodStart = + rx.makeCall( + SqlStdOperatorTable.MINUS, + maxEpoch, + rx.makeCall(SqlStdOperatorTable.MOD, maxEpoch, spanLit)); + RexNode displayEpoch = rx.makeCall(SqlStdOperatorTable.PLUS, latestPeriodStart, offsetSec); + displayTimestamp = 
rx.makeCall(PPLBuiltinOperators.FROM_UNIXTIME, displayEpoch); + + long nowEpochSec = context.functionProperties.getQueryStartClock().millis() / 1000; + Long referenceEpoch = null; + if ("end".equals(node.getAlign())) { + referenceEpoch = TimewrapUtils.extractTimestampUpperBound(node); + } + if (referenceEpoch == null) { + referenceEpoch = nowEpochSec; + } + RexNode refLit = rx.makeBigintLiteral(BigDecimal.valueOf(referenceEpoch)); + baseOffset = + rx.makeCall( + SqlStdOperatorTable.DIVIDE, + rx.makeCall(SqlStdOperatorTable.MINUS, refLit, maxEpoch), + spanLit); + } // Step 3: Project [display_timestamp, value_columns..., base_offset, period] // base_offset is included in the group key so it survives the PIVOT @@ -3193,8 +3225,8 @@ public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) { b.groupKey(b.field(tsFieldName), b.field("__base_offset__")), valueFieldNames.stream().map(f -> (RelBuilder.AggCall) b.max(b.field(f)).as("")).toList(), ImmutableList.of(b.field("__period__")), - IntStream.rangeClosed(1, TIMEWRAP_MAX_PERIODS) - .map(i -> TIMEWRAP_MAX_PERIODS + 1 - i) // reverse: oldest period first + IntStream.rangeClosed(1, TimewrapUtils.MAX_PERIODS) + .map(i -> TimewrapUtils.MAX_PERIODS + 1 - i) // reverse: oldest period first .mapToObj( i -> Map.entry( @@ -3228,117 +3260,6 @@ public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) { return b.peek(); } - /** - * Convert a span unit and value to approximate seconds. Variable-length units use standard - * approximations: month=30 days, quarter=91 days, year=365 days. 
- */ - private long timewrapSpanToSeconds(SpanUnit unit, int value) { - return switch (unit.getName()) { - case "s" -> value; - case "m" -> value * 60L; - case "h" -> value * 3_600L; - case "d" -> value * 86_400L; - case "w" -> value * 7L * 86_400L; - case "M" -> value * 30L * 86_400L; // month ≈ 30 days - case "q" -> value * 91L * 86_400L; // quarter ≈ 91 days - case "y" -> value * 365L * 86_400L; // year ≈ 365 days - default -> - throw new SemanticCheckException("Unsupported time unit in timewrap: " + unit.getName()); - }; - } - - /** - * Get the timescale base name for timewrap column naming. Returns singular and plural forms - * separated by "|", e.g., "day|days". Used by the execution engine to build absolute period names - * like "501days_before". - */ - private String timewrapUnitBaseName(SpanUnit unit, int value) { - String singular = - switch (unit.getName()) { - case "s" -> "second"; - case "m" -> "minute"; - case "h" -> "hour"; - case "d" -> "day"; - case "w" -> "week"; - case "M" -> "month"; - case "q" -> "quarter"; - case "y" -> "year"; - default -> "period"; - }; - String plural = singular + "s"; - // Encode value so execution engine can compute totalUnits = (base_offset + period) * value - return value + "|" + singular + "|" + plural; - } - - /** - * Walk the AST from a Timewrap node to find a WHERE clause with an upper bound on the timestamp - * field (e.g., @timestamp <= '2024-07-03 18:00:00'). Returns the upper bound as epoch seconds, or - * null if not found. - */ - private Long extractTimestampUpperBound(Timewrap node) { - // Walk: Timewrap → Chart → Filter → inspect condition - Node current = node; - while (current != null && !current.getChild().isEmpty()) { - current = current.getChild().get(0); - if (current instanceof Filter filter) { - return findUpperBound(filter.getCondition()); - } - } - return null; - } - - /** Recursively search an expression tree for a timestamp upper bound (<=). 
*/ - private Long findUpperBound(UnresolvedExpression expr) { - if (expr instanceof And) { - And and = (And) expr; - Long left = findUpperBound(and.getLeft()); - Long right = findUpperBound(and.getRight()); - // If both sides have upper bounds, use the smaller one (tighter bound) - if (left != null && right != null) return Math.min(left, right); - return left != null ? left : right; - } - if (expr instanceof Compare cmp) { - String op = cmp.getOperator(); - // Check for @timestamp <= X or @timestamp < X - if (("<=".equals(op) || "<".equals(op)) && isTimestampField(cmp.getLeft())) { - return parseTimestampLiteral(cmp.getRight()); - } - // Check for X >= @timestamp or X > @timestamp - if ((">=".equals(op) || ">".equals(op)) && isTimestampField(cmp.getRight())) { - return parseTimestampLiteral(cmp.getLeft()); - } - } - return null; - } - - private boolean isTimestampField(UnresolvedExpression expr) { - if (expr instanceof Field field) { - String name = field.getField().toString(); - return "@timestamp".equals(name) || "timestamp".equals(name); - } - return false; - } - - private Long parseTimestampLiteral(UnresolvedExpression expr) { - if (expr instanceof Literal lit && lit.getValue() instanceof String s) { - try { - // Parse "yyyy-MM-dd HH:mm:ss" format - java.time.LocalDateTime ldt = - java.time.LocalDateTime.parse( - s, java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); - return ldt.toEpochSecond(java.time.ZoneOffset.UTC); - } catch (Exception e) { - // Try ISO format - try { - return java.time.Instant.parse(s).getEpochSecond(); - } catch (Exception ignored) { - return null; - } - } - } - return null; - } - /** * Aggregate by column split then rank by grand total (summed value of each category). 
The output * is [col-split, grand-total, row-number] diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapUtils.java new file mode 100644 index 00000000000..f213d2dc23e --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapUtils.java @@ -0,0 +1,337 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.utils; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rex.RexBuilder; +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.fun.SqlStdOperatorTable; +import org.apache.calcite.sql.type.SqlTypeName; +import org.opensearch.sql.ast.Node; +import org.opensearch.sql.ast.expression.And; +import org.opensearch.sql.ast.expression.Compare; +import org.opensearch.sql.ast.expression.Field; +import org.opensearch.sql.ast.expression.Literal; +import org.opensearch.sql.ast.expression.SpanUnit; +import org.opensearch.sql.ast.expression.UnresolvedExpression; +import org.opensearch.sql.ast.tree.Filter; +import org.opensearch.sql.ast.tree.Timewrap; +import org.opensearch.sql.exception.SemanticCheckException; +import org.opensearch.sql.expression.function.PPLBuiltinOperators; + +/** Utility methods for the timewrap command's Calcite plan construction. */ +public class TimewrapUtils { + + public static final int MAX_PERIODS = 20; + + /** Check if the span unit is variable-length (month, quarter, year). */ + public static boolean isVariableLengthUnit(SpanUnit unit) { + return "M".equals(unit.getName()) || "q".equals(unit.getName()) || "y".equals(unit.getName()); + } + + /** Convert a span unit and value to seconds. For variable-length units, returns approximate. 
*/ + public static long spanToSeconds(SpanUnit unit, int value) { + return switch (unit.getName()) { + case "s" -> value; + case "m" -> value * 60L; + case "h" -> value * 3_600L; + case "d" -> value * 86_400L; + case "w" -> value * 7L * 86_400L; + case "M" -> value * 30L * 86_400L; + case "q" -> value * 91L * 86_400L; + case "y" -> value * 365L * 86_400L; + default -> + throw new SemanticCheckException("Unsupported time unit in timewrap: " + unit.getName()); + }; + } + + /** + * Get the timescale base name for column naming. Returns "spanValue|singular|plural" e.g., + * "1|day|days". + */ + public static String unitBaseName(SpanUnit unit, int value) { + String singular = + switch (unit.getName()) { + case "s" -> "second"; + case "m" -> "minute"; + case "h" -> "hour"; + case "d" -> "day"; + case "w" -> "week"; + case "M" -> "month"; + case "q" -> "quarter"; + case "y" -> "year"; + default -> "period"; + }; + String plural = singular + "s"; + return value + "|" + singular + "|" + plural; + } + + /** + * Compute a calendar unit number for a timestamp as a Calcite RexNode. For months: year*12 + + * month. For quarters: year*4 + quarter. For years: year. Divided by spanValue. 
+ */ + public static RexNode calendarUnitNumber( + RexBuilder rx, RexNode tsField, SpanUnit unit, int spanValue) { + RexNode year = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("YEAR"), tsField); + RelDataType bigintType = rx.getTypeFactory().createSqlType(SqlTypeName.BIGINT); + + RexNode unitNum; + switch (unit.getName()) { + case "M" -> { + RexNode month = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("MONTH"), tsField); + unitNum = + rx.makeCall( + SqlStdOperatorTable.PLUS, + rx.makeCall( + SqlStdOperatorTable.MULTIPLY, + year, + rx.makeExactLiteral(BigDecimal.valueOf(12), bigintType)), + month); + } + case "q" -> { + RexNode month = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("MONTH"), tsField); + RexNode quarter = + rx.makeCall( + SqlStdOperatorTable.DIVIDE, + rx.makeCall( + SqlStdOperatorTable.MINUS, + month, + rx.makeExactLiteral(BigDecimal.ONE, bigintType)), + rx.makeExactLiteral(BigDecimal.valueOf(3), bigintType)); + unitNum = + rx.makeCall( + SqlStdOperatorTable.PLUS, + rx.makeCall( + SqlStdOperatorTable.MULTIPLY, + year, + rx.makeExactLiteral(BigDecimal.valueOf(4), bigintType)), + quarter); + } + case "y" -> unitNum = year; + default -> throw new SemanticCheckException("Not a variable-length unit: " + unit.getName()); + } + + if (spanValue > 1) { + unitNum = + rx.makeCall( + SqlStdOperatorTable.DIVIDE, + unitNum, + rx.makeExactLiteral(BigDecimal.valueOf(spanValue), bigintType)); + } + return unitNum; + } + + /** + * Compute the epoch seconds of the start of the calendar unit containing a timestamp. Month: + * first day of the month. Quarter: first day of the quarter (precise with leap year). Year: Jan + * 1. 
+ */ + public static RexNode calendarUnitStartEpoch(RexBuilder rx, RexNode tsField, SpanUnit unit) { + RelDataType bigintType = rx.getTypeFactory().createSqlType(SqlTypeName.BIGINT); + RexNode tsEpoch = + rx.makeCast(bigintType, rx.makeCall(PPLBuiltinOperators.UNIX_TIMESTAMP, tsField), true); + RexNode hour = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("HOUR"), tsField); + RexNode minute = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("MINUTE"), tsField); + RexNode second = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("SECOND"), tsField); + RexNode one = rx.makeExactLiteral(BigDecimal.ONE, bigintType); + RexNode sec86400 = rx.makeExactLiteral(BigDecimal.valueOf(86400), bigintType); + + RexNode timeWithinDay = + rx.makeCall( + SqlStdOperatorTable.PLUS, + rx.makeCall( + SqlStdOperatorTable.PLUS, + rx.makeCall( + SqlStdOperatorTable.MULTIPLY, + hour, + rx.makeExactLiteral(BigDecimal.valueOf(3600), bigintType)), + rx.makeCall( + SqlStdOperatorTable.MULTIPLY, + minute, + rx.makeExactLiteral(BigDecimal.valueOf(60), bigintType))), + second); + + if ("M".equals(unit.getName())) { + RexNode dayOfMonth = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("DAY"), tsField); + return rx.makeCall( + SqlStdOperatorTable.MINUS, + rx.makeCall( + SqlStdOperatorTable.MINUS, + tsEpoch, + rx.makeCall( + SqlStdOperatorTable.MULTIPLY, + rx.makeCall(SqlStdOperatorTable.MINUS, dayOfMonth, one), + sec86400)), + timeWithinDay); + } else if ("y".equals(unit.getName())) { + RexNode doy = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("DOY"), tsField); + return rx.makeCall( + SqlStdOperatorTable.MINUS, + rx.makeCall( + SqlStdOperatorTable.MINUS, + tsEpoch, + rx.makeCall( + SqlStdOperatorTable.MULTIPLY, + rx.makeCall(SqlStdOperatorTable.MINUS, doy, one), + sec86400)), + timeWithinDay); + } else { + // Quarter: precise day-within-quarter via cumulative day lookup + leap year + RexNode doy = rx.makeCall(PPLBuiltinOperators.EXTRACT, 
rx.makeLiteral("DOY"), tsField); + RexNode month = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("MONTH"), tsField); + RexNode year = rx.makeCall(PPLBuiltinOperators.EXTRACT, rx.makeLiteral("YEAR"), tsField); + + RexNode monthsIntoQuarter = + rx.makeCall( + SqlStdOperatorTable.MOD, + rx.makeCall(SqlStdOperatorTable.MINUS, month, one), + rx.makeExactLiteral(BigDecimal.valueOf(3), bigintType)); + RexNode quarterStartMonth = rx.makeCall(SqlStdOperatorTable.MINUS, month, monthsIntoQuarter); + + RexNode cumDaysBeforeQS = cumDaysBeforeMonth(rx, quarterStartMonth, year, bigintType); + RexNode quarterStartDOY = rx.makeCall(SqlStdOperatorTable.PLUS, cumDaysBeforeQS, one); + RexNode dayWithinQuarter = rx.makeCall(SqlStdOperatorTable.MINUS, doy, quarterStartDOY); + + return rx.makeCall( + SqlStdOperatorTable.MINUS, + rx.makeCall( + SqlStdOperatorTable.MINUS, + tsEpoch, + rx.makeCall(SqlStdOperatorTable.MULTIPLY, dayWithinQuarter, sec86400)), + timeWithinDay); + } + } + + /** + * Build a CASE expression for cumulative days before a given month, with leap year handling. + * Month 1→0, Month 2→31, Month 3→59+leap, ..., Month 12→334+leap. 
+ */ + public static RexNode cumDaysBeforeMonth( + RexBuilder rx, RexNode month, RexNode year, RelDataType bigintType) { + RexNode mod4 = + rx.makeCall( + SqlStdOperatorTable.MOD, year, rx.makeExactLiteral(BigDecimal.valueOf(4), bigintType)); + RexNode mod100 = + rx.makeCall( + SqlStdOperatorTable.MOD, + year, + rx.makeExactLiteral(BigDecimal.valueOf(100), bigintType)); + RexNode mod400 = + rx.makeCall( + SqlStdOperatorTable.MOD, + year, + rx.makeExactLiteral(BigDecimal.valueOf(400), bigintType)); + RexNode zero = rx.makeExactLiteral(BigDecimal.ZERO, bigintType); + RexNode isLeap = + rx.makeCall( + SqlStdOperatorTable.CASE, + rx.makeCall( + SqlStdOperatorTable.AND, + rx.makeCall(SqlStdOperatorTable.EQUALS, mod4, zero), + rx.makeCall( + SqlStdOperatorTable.OR, + rx.makeCall(SqlStdOperatorTable.NOT_EQUALS, mod100, zero), + rx.makeCall(SqlStdOperatorTable.EQUALS, mod400, zero))), + rx.makeExactLiteral(BigDecimal.ONE, bigintType), + zero); + + int[] cumDays = {0, 31, 59, 90, 120, 151, 181, 212, 243, 273, 304, 334}; + + List caseArgs = new ArrayList<>(); + for (int m = 1; m <= 12; m++) { + caseArgs.add( + rx.makeCall( + SqlStdOperatorTable.EQUALS, + month, + rx.makeExactLiteral(BigDecimal.valueOf(m), bigintType))); + RexNode days = rx.makeExactLiteral(BigDecimal.valueOf(cumDays[m - 1]), bigintType); + if (m >= 3) { + days = rx.makeCall(SqlStdOperatorTable.PLUS, days, isLeap); + } + caseArgs.add(days); + } + caseArgs.add(zero); + + return rx.makeCall(SqlStdOperatorTable.CASE, caseArgs.toArray(new RexNode[0])); + } + + /** Compute calendar unit number from an epoch at plan time (Java). 
*/ + public static long calendarUnitNumberFromEpoch(long epochSec, SpanUnit unit, int spanValue) { + java.time.Instant instant = java.time.Instant.ofEpochSecond(epochSec); + java.time.ZonedDateTime zdt = instant.atZone(java.time.ZoneOffset.UTC); + long unitNum; + switch (unit.getName()) { + case "M" -> unitNum = zdt.getYear() * 12L + zdt.getMonthValue(); + case "q" -> unitNum = zdt.getYear() * 4L + (zdt.getMonthValue() - 1) / 3; + case "y" -> unitNum = zdt.getYear(); + default -> throw new SemanticCheckException("Not a variable-length unit: " + unit.getName()); + } + return unitNum / spanValue; + } + + /** + * Walk the AST from a Timewrap node to find a WHERE clause with an upper bound on the timestamp + * field. Returns the upper bound as epoch seconds, or null if not found. + */ + public static Long extractTimestampUpperBound(Timewrap node) { + Node current = node; + while (current != null && !current.getChild().isEmpty()) { + current = current.getChild().get(0); + if (current instanceof Filter filter) { + return findUpperBound(filter.getCondition()); + } + } + return null; + } + + private static Long findUpperBound(UnresolvedExpression expr) { + if (expr instanceof And and) { + Long left = findUpperBound(and.getLeft()); + Long right = findUpperBound(and.getRight()); + if (left != null && right != null) return Math.min(left, right); + return left != null ? 
left : right; + } + if (expr instanceof Compare cmp) { + String op = cmp.getOperator(); + if (("<=".equals(op) || "<".equals(op)) && isTimestampField(cmp.getLeft())) { + return parseTimestampLiteral(cmp.getRight()); + } + if ((">=".equals(op) || ">".equals(op)) && isTimestampField(cmp.getRight())) { + return parseTimestampLiteral(cmp.getLeft()); + } + } + return null; + } + + private static boolean isTimestampField(UnresolvedExpression expr) { + if (expr instanceof Field field) { + String name = field.getField().toString(); + return "@timestamp".equals(name) || "timestamp".equals(name); + } + return false; + } + + private static Long parseTimestampLiteral(UnresolvedExpression expr) { + if (expr instanceof Literal lit && lit.getValue() instanceof String s) { + try { + java.time.LocalDateTime ldt = + java.time.LocalDateTime.parse( + s, java.time.format.DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); + return ldt.toEpochSecond(java.time.ZoneOffset.UTC); + } catch (Exception e) { + try { + return java.time.Instant.parse(s).getEpochSecond(); + } catch (Exception ignored) { + return null; + } + } + } + return null; + } +} diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java index 75253fa858f..22f026a2723 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java @@ -544,21 +544,22 @@ public void testTimewrapWeekSpan() throws IOException { @Test public void testTimewrapMonthSpan() throws IOException { - // Jun 15 to Jul 4: all data within ~19 days → single 30-day month period + // Jul 1-4 only: all data within same month → single month period JSONObject result = executeQuery( - "source=timewrap_test | where @timestamp >= '2024-06-15 00:00:00' and @timestamp <=" + "source=timewrap_test | where 
@timestamp >= '2024-07-01 00:00:00' and @timestamp <=" + " '2024-07-04 06:00:00' | timechart span=1day sum(requests) | timewrap 1month"); verifySchema( result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_month", "bigint")); - // Display timestamps anchored to latest period — Jun 15 maps to Jul 15 offset - verifyDataRowsSome(result, rows("2024-07-15 00:00:00", 200), rows("2024-07-01 00:00:00", 920)); + verifyDataRowsSome(result, rows("2024-07-01 00:00:00", 920), rows("2024-07-04 00:00:00", 180)); } @Test public void testTimewrapQuarterSpan() throws IOException { - // Jan 15 to Apr 15 = ~91 days → 2 quarter periods (Jan in one, Apr in another) + // Jan 15 (Q1) and Apr 15 (Q2) → 2 quarter periods + // With precise day-within-quarter offset: Jan 15 = day 15 of Q1, Apr 15 = day 15 of Q2 + // Both are at the same offset (day 15) → they align on the same row JSONObject result = executeQuery( "source=timewrap_test | where @timestamp >= '2024-01-15 00:00:00' and @timestamp <=" @@ -569,14 +570,14 @@ public void testTimewrapQuarterSpan() throws IOException { schema("@timestamp", "timestamp"), schema("sum(requests)_1quarter_before", "bigint"), schema("sum(requests)_latest_quarter", "bigint")); + // Day 15 of each quarter aligns — both values on the same row verifyDataRowsSome(result, rows("2024-04-15 00:00:00", 300, 350)); } @Test public void testTimewrapYearSpan() throws IOException { - // Jan 2024 to Jan 2025 = ~365 days → 2 year periods - // Period 1 (1year_before): Jan 2024 data (300) - // Period 2 (latest_year): everything else + // Jan 2024 to Jan 2025 → 2 year periods (2024 and 2025) + // Jan 15 offset exists in both years: 2024=300, 2025=400 JSONObject result = executeQuery( "source=timewrap_test | where @timestamp >= '2024-01-15 00:00:00' and @timestamp <=" @@ -587,6 +588,6 @@ public void testTimewrapYearSpan() throws IOException { schema("@timestamp", "timestamp"), schema("sum(requests)_1year_before", "bigint"), schema("sum(requests)_latest_year", 
"bigint")); - verifyDataRowsSome(result, rows("2025-01-15 00:00:00", null, 400)); + verifyDataRowsSome(result, rows("2025-01-15 00:00:00", 300, 400)); } } From 219a704852c42b90a4356105ea3cbba1c6325e6c Mon Sep 17 00:00:00 2001 From: Kai Huang Date: Thu, 19 Mar 2026 13:22:29 -0700 Subject: [PATCH 4/4] Replace Calcite PIVOT with post-processing pivot, add series parameter - Remove Calcite PIVOT from timewrap: no more MAX_PERIODS limit or crash risk - Add post-processing pivot in execution engine: dynamically builds columns from actual data using HashMap grouping - Add series parameter: series=relative (default), series=short (s0, s1) - Add series=exact grammar support (falls back to short at runtime) - Add time_format grammar support for series=exact - Benchmark: 1000 period columns in ~50ms (Calcite PIVOT crashed at 1000) - 32 IT tests, all using verifySchema + verifyDataRows Signed-off-by: Jialiang Li Signed-off-by: Kai Huang --- .../org/opensearch/sql/ast/tree/Timewrap.java | 2 + .../sql/calcite/CalcitePlanContext.java | 6 + .../sql/calcite/CalciteRelNodeVisitor.java | 44 +--- .../sql/calcite/utils/TimewrapUtils.java | 21 +- .../remote/CalciteTimewrapCommandIT.java | 203 +++++++++++++-- .../executor/OpenSearchExecutionEngine.java | 240 +++++++++++++----- ppl/src/main/antlr/OpenSearchPPLLexer.g4 | 4 + ppl/src/main/antlr/OpenSearchPPLParser.g4 | 14 +- .../opensearch/sql/ppl/parser/AstBuilder.java | 22 +- 9 files changed, 430 insertions(+), 126 deletions(-) diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/Timewrap.java b/core/src/main/java/org/opensearch/sql/ast/tree/Timewrap.java index bb5a81f494e..88c151db331 100644 --- a/core/src/main/java/org/opensearch/sql/ast/tree/Timewrap.java +++ b/core/src/main/java/org/opensearch/sql/ast/tree/Timewrap.java @@ -24,6 +24,8 @@ public class Timewrap extends UnresolvedPlan { private final SpanUnit unit; private final int value; private final String align; // "end" or "now" + private final String series; // "relative", 
"short", or "exact" + private final String timeFormat; // format string for series=exact, nullable private final Literal spanLiteral; // original span literal for display private UnresolvedPlan child; diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java index 3b91497ab8a..4fce1d1d41f 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalcitePlanContext.java @@ -50,6 +50,12 @@ public class CalcitePlanContext { */ public static final ThreadLocal timewrapUnitName = new ThreadLocal<>(); + /** Timewrap series mode: "relative", "short", or "exact". */ + public static final ThreadLocal timewrapSeries = new ThreadLocal<>(); + + /** Timewrap time_format string for series=exact (e.g., "%Y-%m-%d"). */ + public static final ThreadLocal timewrapTimeFormat = new ThreadLocal<>(); + /** Thread-local switch that tells whether the current query prefers legacy behavior. 
*/ private static final ThreadLocal legacyPreferredFlag = ThreadLocal.withInitial(() -> true); diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 4147c455951..4fd215ec945 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -3103,6 +3103,8 @@ public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) { CalcitePlanContext.stripNullColumns.set(true); CalcitePlanContext.timewrapUnitName.set( TimewrapUtils.unitBaseName(node.getUnit(), node.getValue()) + "|_before"); + CalcitePlanContext.timewrapSeries.set(node.getSeries()); + CalcitePlanContext.timewrapTimeFormat.set(node.getTimeFormat()); RelBuilder b = context.relBuilder; RexBuilder rx = context.rexBuilder; @@ -3218,44 +3220,10 @@ public RelNode visitTimewrap(Timewrap node, CalcitePlanContext context) { projections.add(b.alias(periodNum, "__period__")); b.project(projections); - // Step 4: PIVOT on period, grouped by [offset, __base_offset__] - // __base_offset__ is constant across all rows so it doesn't affect grouping, - // but survives the PIVOT so the execution engine can use it for absolute column naming - b.pivot( - b.groupKey(b.field(tsFieldName), b.field("__base_offset__")), - valueFieldNames.stream().map(f -> (RelBuilder.AggCall) b.max(b.field(f)).as("")).toList(), - ImmutableList.of(b.field("__period__")), - IntStream.rangeClosed(1, TimewrapUtils.MAX_PERIODS) - .map(i -> TimewrapUtils.MAX_PERIODS + 1 - i) // reverse: oldest period first - .mapToObj( - i -> - Map.entry( - // Use placeholder relative names; execution engine renames to absolute - String.valueOf(i), ImmutableList.of((RexNode) b.literal((long) i)))) - .collect(Collectors.toList())); - - // Step 5: Rename columns — add agg name prefix, clean up pivot artifacts - // Use relative period numbers as temporary 
names; the execution engine will compute - // absolute offsets using __base_offset__ and rename accordingly - List pivotColNames = b.peek().getRowType().getFieldNames(); - List cleanNames = new ArrayList<>(); - cleanNames.add(tsFieldName); - cleanNames.add("__base_offset__"); - for (int i = 2; i < pivotColNames.size(); i++) { - String name = pivotColNames.get(i); - if (name.endsWith("_")) { - name = name.substring(0, name.length() - 1); - } - // Prefix with agg name and _before suffix - if (valueFieldNames.size() == 1) { - name = valueFieldNames.get(0) + "_" + name + "_before"; - } - cleanNames.add(name); - } - b.rename(cleanNames); - - // Step 6: Sort by offset - b.sort(b.field(0)); + // Step 4: Sort by offset, then period (execution engine will pivot) + // No Calcite PIVOT -- the execution engine pivots dynamically after reading all rows. + // Output schema: [display_timestamp, value_columns..., __base_offset__, __period__] + b.sort(b.field(tsFieldName), b.field("__period__")); return b.peek(); } diff --git a/core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapUtils.java b/core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapUtils.java index f213d2dc23e..8ca3b469435 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapUtils.java +++ b/core/src/main/java/org/opensearch/sql/calcite/utils/TimewrapUtils.java @@ -276,18 +276,31 @@ public static long calendarUnitNumberFromEpoch(long epochSec, SpanUnit unit, int } /** - * Walk the AST from a Timewrap node to find a WHERE clause with an upper bound on the timestamp - * field. Returns the upper bound as epoch seconds, or null if not found. + * Walk the AST from a Timewrap node to the deepest Filter node and extract the timestamp upper + * bound. The frontend time picker always appends the timestamp filter as the first pipe (closest + * to source), making it the deepest Filter in the AST chain: + * + *
+   *   Timewrap -> Chart -> [user filters] -> Filter(@timestamp >= X AND @timestamp <= Y) -> Source
+   * 
+ * + * We walk all Filter nodes and return the last (deepest) timestamp upper bound found. This + * ensures user filters like `where age > 30` between timechart and the time picker filter don't + * interfere. */ public static Long extractTimestampUpperBound(Timewrap node) { Node current = node; + Long lastBound = null; while (current != null && !current.getChild().isEmpty()) { current = current.getChild().get(0); if (current instanceof Filter filter) { - return findUpperBound(filter.getCondition()); + Long bound = findUpperBound(filter.getCondition()); + if (bound != null) { + lastBound = bound; + } } } - return null; + return lastBound; } private static Long findUpperBound(UnresolvedExpression expr) { diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java index 22f026a2723..9383e56f490 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteTimewrapCommandIT.java @@ -143,23 +143,32 @@ public void testTimewrapIncompletePeriodNullFill() throws IOException { schema("sum(requests)_2days_before", "bigint"), schema("sum(requests)_1day_before", "bigint"), schema("sum(requests)_latest_day", "bigint")); - verifyDataRowsSome(result, rows("2024-07-04 12:00:00", null, 310, 330, 285)); - verifyDataRowsSome(result, rows("2024-07-04 18:00:00", null, 190, 215, 165)); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 180, 205, 165, 80), + rows("2024-07-04 06:00:00", 240, 260, 225, 100), + rows("2024-07-04 12:00:00", null, 310, 330, 285), + rows("2024-07-04 18:00:00", null, 190, 215, 165)); } // --- Different timescales --- @Test public void testTimewrapWeekSpanSinglePeriod() throws IOException { + // 3 days of daily data in 1 week -> single period, 3 rows JSONObject result = executeQuery( "source=timewrap_test" - 
+ WHERE_ALL - + " | timechart span=6h sum(requests) | timewrap 1week"); + + WHERE_JUL1_TO_JUL3 + + " | timechart span=1day sum(requests) | timewrap 1week"); verifySchema( result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_week", "bigint")); - verifyDataRowsSome(result, rows("2024-07-04 00:00:00", 80), rows("2024-07-04 06:00:00", 100)); + verifyDataRowsInOrder( + result, + rows("2024-07-01 00:00:00", 920), + rows("2024-07-02 00:00:00", 1010), + rows("2024-07-03 00:00:00", 840)); } @Test @@ -180,7 +189,10 @@ public void testTimewrapTwelveHourSpan() throws IOException { schema("sum(requests)_24hours_before", "bigint"), schema("sum(requests)_12hours_before", "bigint"), schema("sum(requests)_latest_hour", "bigint")); - verifyDataRowsSome(result, rows("2024-07-04 00:00:00", 180, 310, 205, 330, 165, 285, 80)); + verifyDataRowsInOrder( + result, + rows("2024-07-04 00:00:00", 180, 310, 205, 330, 165, 285, 80), + rows("2024-07-04 06:00:00", 240, 190, 260, 215, 225, 165, 100)); } @Test @@ -531,15 +543,17 @@ public void testTimewrapDaySpan() throws IOException { @Test public void testTimewrapWeekSpan() throws IOException { + // 2 days of daily data in 1 week -> single period, 2 rows JSONObject result = executeQuery( "source=timewrap_test" - + WHERE_ALL - + " | timechart span=6h sum(requests) | timewrap 1week"); + + WHERE_JUL1_TO_JUL2 + + " | timechart span=1day sum(requests) | timewrap 1week"); verifySchema( result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_week", "bigint")); - verifyDataRowsSome(result, rows("2024-07-04 00:00:00", 80), rows("2024-07-04 06:00:00", 100)); + verifyDataRowsInOrder( + result, rows("2024-07-01 00:00:00", 920), rows("2024-07-02 00:00:00", 1010)); } @Test @@ -552,7 +566,12 @@ public void testTimewrapMonthSpan() throws IOException { verifySchema( result, schema("@timestamp", "timestamp"), schema("sum(requests)_latest_month", "bigint")); - verifyDataRowsSome(result, rows("2024-07-01 00:00:00", 920), 
rows("2024-07-04 00:00:00", 180)); + verifyDataRowsInOrder( + result, + rows("2024-07-01 00:00:00", 920), + rows("2024-07-02 00:00:00", 1010), + rows("2024-07-03 00:00:00", 840), + rows("2024-07-04 00:00:00", 180)); } @Test @@ -570,24 +589,174 @@ public void testTimewrapQuarterSpan() throws IOException { schema("@timestamp", "timestamp"), schema("sum(requests)_1quarter_before", "bigint"), schema("sum(requests)_latest_quarter", "bigint")); - // Day 15 of each quarter aligns — both values on the same row - verifyDataRowsSome(result, rows("2024-04-15 00:00:00", 300, 350)); + // Day 15 of each quarter aligns -- both values on the same row + verifyDataRows(result, rows("2024-04-15 00:00:00", 300, 350)); } @Test public void testTimewrapYearSpan() throws IOException { - // Jan 2024 to Jan 2025 → 2 year periods (2024 and 2025) - // Jan 15 offset exists in both years: 2024=300, 2025=400 + // Jan 15 2024 (300) and Jan 15 2025 (400) -- 2 data points in 2 different years + // timechart span=1year: 2 yearly buckets + // timewrap 1year: 2 periods, 1 offset row JSONObject result = executeQuery( - "source=timewrap_test | where @timestamp >= '2024-01-15 00:00:00' and @timestamp <=" - + " '2025-01-15 12:00:00' | timechart span=1day sum(requests) | timewrap 1year"); + "source=timewrap_test | where @timestamp >= '2024-01-15 12:00:00' and @timestamp <=" + + " '2025-01-15 12:00:00' | timechart span=1year sum(requests) | timewrap 1year"); verifySchema( result, schema("@timestamp", "timestamp"), schema("sum(requests)_1year_before", "bigint"), schema("sum(requests)_latest_year", "bigint")); - verifyDataRowsSome(result, rows("2025-01-15 00:00:00", 300, 400)); + // 2024 yearly sum = all 2024 data in WHERE range; 2025 = Jan 15 only (400) + verifyDataRows(result, rows("2025-01-01 00:00:00", 4050, 400)); + } + + // --- series parameter --- + + @Test + public void testTimewrapSeriesRelativeIsDefault() throws IOException { + // series=relative is the default — same as no series parameter + 
JSONObject resultDefault = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL3 + + " | timechart span=6h sum(requests) | timewrap 1day"); + JSONObject resultRelative = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL3 + + " | timechart span=6h sum(requests) | timewrap 1day series=relative"); + + verifySchema( + resultRelative, + schema("@timestamp", "timestamp"), + schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + resultRelative, + rows("2024-07-03 00:00:00", 180, 205, 165), + rows("2024-07-03 06:00:00", 240, 260, 225), + rows("2024-07-03 12:00:00", 310, 330, 285), + rows("2024-07-03 18:00:00", 190, 215, 165)); + verifySchema( + resultDefault, + schema("@timestamp", "timestamp"), + schema("sum(requests)_2days_before", "bigint"), + schema("sum(requests)_1day_before", "bigint"), + schema("sum(requests)_latest_day", "bigint")); + verifyDataRowsInOrder( + resultDefault, + rows("2024-07-03 00:00:00", 180, 205, 165), + rows("2024-07-03 06:00:00", 240, 260, 225), + rows("2024-07-03 12:00:00", 310, 330, 285), + rows("2024-07-03 18:00:00", 190, 215, 165)); } + + @Test + public void testTimewrapSeriesShort() throws IOException { + // series=short: columns named _s + // With align=end and WHERE upper bound = Jul 3 18:00, baseOffset=0 + // Periods: oldest=2, middle=1, newest=0 + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL3 + + " | timechart span=6h sum(requests) | timewrap 1day series=short"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("sum(requests)_s2", "bigint"), + schema("sum(requests)_s1", "bigint"), + schema("sum(requests)_s0", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 180, 205, 165), + rows("2024-07-03 06:00:00", 240, 260, 225), + rows("2024-07-03 12:00:00", 310, 330, 285), + rows("2024-07-03 18:00:00", 190, 215, 165)); 
+ } + + @Test + public void testTimewrapSeriesShortWithCount() throws IOException { + // series=short with count aggregation + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL2_TO_JUL3 + + " | timechart span=6h count() | timewrap 1day series=short"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("count()_s1", "bigint"), + schema("count()_s0", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 2, 2), + rows("2024-07-03 06:00:00", 2, 2), + rows("2024-07-03 12:00:00", 2, 2), + rows("2024-07-03 18:00:00", 2, 2)); + } + + @Test + public void testTimewrapSeriesShortWeekSpan() throws IOException { + // series=short with week span, single period = s0, daily buckets + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL2 + + " | timechart span=1day sum(requests) | timewrap 1week series=short"); + + verifySchema(result, schema("@timestamp", "timestamp"), schema("sum(requests)_s0", "bigint")); + verifyDataRowsInOrder( + result, rows("2024-07-01 00:00:00", 920), rows("2024-07-02 00:00:00", 1010)); + } + + @Test + public void testTimewrapSeriesShortWithAvg() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL1_TO_JUL2 + + " | timechart span=6h avg(requests) | timewrap 1day series=short"); + + verifySchema( + result, + schema("@timestamp", "timestamp"), + schema("avg(requests)_s1", "double"), + schema("avg(requests)_s0", "double")); + verifyDataRowsInOrder( + result, + rows("2024-07-02 00:00:00", 90.0, 102.5), + rows("2024-07-02 06:00:00", 120.0, 130.0), + rows("2024-07-02 12:00:00", 155.0, 165.0), + rows("2024-07-02 18:00:00", 95.0, 107.5)); + } + + @Test + public void testTimewrapSeriesShortWithErrors() throws IOException { + JSONObject result = + executeQuery( + "source=timewrap_test" + + WHERE_JUL2_TO_JUL3 + + " | timechart span=6h sum(errors) | timewrap 1day series=short"); + + verifySchema( + result, + 
schema("@timestamp", "timestamp"), + schema("sum(errors)_s1", "bigint"), + schema("sum(errors)_s0", "bigint")); + verifyDataRowsInOrder( + result, + rows("2024-07-03 00:00:00", 4, 1), + rows("2024-07-03 06:00:00", 6, 3), + rows("2024-07-03 12:00:00", 9, 6), + rows("2024-07-03 18:00:00", 3, 1)); + } + + // BY clause tests are pending -- blocked by timechart BY output format gap. + // See docs/dev/ppl-timewrap-command.md for design options. } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index 5bffb2eae06..e5151e7c444 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -13,7 +13,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -44,8 +43,10 @@ import org.opensearch.sql.calcite.CalcitePlanContext; import org.opensearch.sql.calcite.utils.CalciteToolsHelper.OpenSearchRelRunners; import org.opensearch.sql.calcite.utils.OpenSearchTypeFactory; +import org.opensearch.sql.calcite.utils.TimewrapUtils; import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils; import org.opensearch.sql.common.response.ResponseListener; +import org.opensearch.sql.data.model.ExprNullValue; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; import org.opensearch.sql.data.model.ExprValueUtils; @@ -322,68 +323,124 @@ private QueryResponse buildResultSet( } columns.add(new Column(columnName, null, exprType)); } - // Timewrap post-processing: strip null columns, remove __base_offset__, rename to absolute + // Timewrap post-processing: pivot unpivoted rows into period columns + // Input: 
[display_ts, value_col(s)..., __base_offset__, __period__] + // Output: [display_ts, _, ...] if (CalcitePlanContext.stripNullColumns.get()) { try { - // 1. Find all-null columns and always strip __base_offset__ - Set dropColumns = new HashSet<>(); - dropColumns.add("__base_offset__"); - if (!values.isEmpty()) { - Set allNullCandidates = new HashSet<>(); - for (Column col : columns) { - allNullCandidates.add(col.getName()); + String unitInfo = CalcitePlanContext.timewrapUnitName.get(); + if (unitInfo != null && !values.isEmpty()) { + // Find column indices + int tsIdx = 0; + int periodIdx = -1; + int baseOffsetIdx = -1; + List valueIdxs = new ArrayList<>(); + for (int i = 0; i < columns.size(); i++) { + String name = columns.get(i).getName(); + if ("__period__".equals(name)) periodIdx = i; + else if ("__base_offset__".equals(name)) baseOffsetIdx = i; + else if (i > 0 && periodIdx < 0 && baseOffsetIdx < 0) valueIdxs.add(i); } - for (ExprValue row : values) { - java.util.Map tupleValues = row.tupleValue(); - allNullCandidates.removeIf( - colName -> { - ExprValue v = tupleValues.get(colName); - return v != null && !v.isNull(); - }); - if (allNullCandidates.isEmpty()) break; + // Workaround: if indices weren't found before value columns, scan again + if (periodIdx < 0 || baseOffsetIdx < 0) { + valueIdxs.clear(); + for (int i = 0; i < columns.size(); i++) { + String name = columns.get(i).getName(); + if ("__period__".equals(name)) periodIdx = i; + else if ("__base_offset__".equals(name)) baseOffsetIdx = i; + else if (i > 0) valueIdxs.add(i); + } } - dropColumns.addAll(allNullCandidates); - } - // 2. Read __base_offset__ for absolute naming - long baseOffset = 0; - if (!values.isEmpty()) { + // Read __base_offset__ (constant across all rows) + long baseOffset = 0; ExprValue boVal = values.getFirst().tupleValue().get("__base_offset__"); if (boVal != null && !boVal.isNull()) { baseOffset = boVal.longValue(); } - } - // 3. 
Build new column list: drop nulls, rename periods to absolute offsets - String unitInfo = CalcitePlanContext.timewrapUnitName.get(); - List origNames = new ArrayList<>(); // original names of kept columns - List newColumns = new ArrayList<>(); - for (Column col : columns) { - if (dropColumns.contains(col.getName())) continue; - String origName = col.getName(); - String newName = origName; - if (unitInfo != null) { - newName = renameTimewrapColumn(origName, baseOffset, unitInfo); + // Collect distinct periods (sorted descending = oldest first in output) + List colNames = columns.stream().map(Column::getName).toList(); + Set periodSet = new java.util.TreeSet<>(java.util.Collections.reverseOrder()); + for (ExprValue row : values) { + ExprValue pv = row.tupleValue().get("__period__"); + if (pv != null && !pv.isNull()) { + periodSet.add(pv.longValue()); + } } - origNames.add(origName); - newColumns.add(new Column(newName, col.getAlias(), col.getExprType())); - } - columns = newColumns; - - // 4. Rebuild rows with only kept columns, re-keyed to new names - List filteredValues = new ArrayList<>(); - for (ExprValue row : values) { - java.util.Map original = row.tupleValue(); - Map filtered = new LinkedHashMap<>(); - for (int i = 0; i < origNames.size(); i++) { - filtered.put(columns.get(i).getName(), original.get(origNames.get(i))); + List periods = new ArrayList<>(periodSet); + + // Build value column names + List valueColNames = new ArrayList<>(); + for (int vi : valueIdxs) { + valueColNames.add(columns.get(vi).getName()); + } + + // Build output column names: [ts, val1_period1, val1_period2, ..., val2_period1, ...] 
+ // Splunk order: for each period, all value columns (oldest period first) + List outColNames = new ArrayList<>(); + outColNames.add(columns.get(tsIdx).getName()); + List outColTypes = new ArrayList<>(); + outColTypes.add(columns.get(tsIdx).getExprType()); + + for (long period : periods) { + for (int vi = 0; vi < valueColNames.size(); vi++) { + String prefix = valueColNames.get(vi); + String periodName = renameTimewrapPeriod(period, baseOffset, unitInfo); + outColNames.add(prefix + "_" + periodName); + outColTypes.add(columns.get(valueIdxs.get(vi)).getExprType()); + } + } + + // Group rows by display_ts, pivot periods into columns + // Use LinkedHashMap to preserve insertion order (sorted by ts from Calcite) + Map> pivoted = new LinkedHashMap<>(); + String tsColName = columns.get(tsIdx).getName(); + for (ExprValue row : values) { + java.util.Map tuple = row.tupleValue(); + String tsKey = tuple.get(tsColName).toString(); + long period = tuple.get("__period__").longValue(); + + Map outRow = + pivoted.computeIfAbsent( + tsKey, + k -> { + Map r = new LinkedHashMap<>(); + r.put(outColNames.get(0), tuple.get(tsColName)); + // Initialize all period columns to null + for (int i = 1; i < outColNames.size(); i++) { + r.put(outColNames.get(i), ExprNullValue.of()); + } + return r; + }); + + // Fill in the value for this period + for (int vi = 0; vi < valueColNames.size(); vi++) { + String prefix = valueColNames.get(vi); + String periodName = renameTimewrapPeriod(period, baseOffset, unitInfo); + String colName = prefix + "_" + periodName; + ExprValue val = tuple.get(valueColNames.get(vi)); + if (val != null) { + outRow.put(colName, val); + } + } + } + + // Build output + columns = new ArrayList<>(); + for (int i = 0; i < outColNames.size(); i++) { + columns.add(new Column(outColNames.get(i), null, outColTypes.get(i))); + } + values = new ArrayList<>(); + for (Map outRow : pivoted.values()) { + values.add(ExprTupleValue.fromExprValueMap(outRow)); } - 
filteredValues.add(ExprTupleValue.fromExprValueMap(filtered)); } - values = filteredValues; } finally { CalcitePlanContext.stripNullColumns.set(false); CalcitePlanContext.timewrapUnitName.set(null); + CalcitePlanContext.timewrapSeries.set(null); + CalcitePlanContext.timewrapTimeFormat.set(null); } } @@ -393,10 +450,8 @@ private QueryResponse buildResultSet( } /** - * Rename a timewrap period column from relative to absolute offset. Uses the formula: - * periodFromNow = (baseOffset + relativePeriod - 1) * spanValue. Naming rules (matching Splunk): - * periodFromNow == 0 → "latest_", > 0 → "_before", < 0 → "_after". - * unitInfo format: "spanValue|singular|plural|_before" e.g., "1|day|days|_before". + * Rename a timewrap period column from relative to absolute offset. Supports three series modes: + * relative (default), short, and exact. unitInfo format: "spanValue|singular|plural|_before". */ private String renameTimewrapColumn(String name, long baseOffset, String unitInfo) { String[] parts = unitInfo.split("\\|", -1); @@ -415,23 +470,82 @@ private String renameTimewrapColumn(String name, long baseOffset, String unitInf String periodStr = beforeSuffix.substring(lastUnderscore + 1); try { int relativePeriod = Integer.parseInt(periodStr); - long periodFromNow = (baseOffset + relativePeriod - 1) * spanValue; - - if (periodFromNow == 0) { - return prefix + "_latest_" + singular; - } else if (periodFromNow > 0) { - String unit = periodFromNow == 1 ? singular : plural; - return prefix + "_" + periodFromNow + unit + "_before"; - } else { - long absPeriod = Math.abs(periodFromNow); - String unit = absPeriod == 1 ? 
singular : plural; - return prefix + "_" + absPeriod + unit + "_after"; - } + long absolutePeriod = (baseOffset + relativePeriod - 1) * spanValue; + + String seriesMode = CalcitePlanContext.timewrapSeries.get(); + if (seriesMode == null) seriesMode = "relative"; + + return switch (seriesMode) { + case "short" -> + // series=short: prefix_s + prefix + "_s" + absolutePeriod; + case "exact" -> { + // series=exact: prefix_ + String timeFormat = CalcitePlanContext.timewrapTimeFormat.get(); + if (timeFormat == null) timeFormat = "%Y-%m-%d"; + // Compute period start timestamp: reference - absolutePeriod * spanSeconds + // absolutePeriod is in span units, need to convert to seconds + long spanSec = + TimewrapUtils.spanToSeconds( + org.opensearch.sql.ast.expression.SpanUnit.of(singular.toUpperCase()), 1) + * spanValue; + // Not enough info to compute exact date here — fall back to short naming + // TODO: pass reference epoch + span for exact date computation + yield prefix + "_s" + absolutePeriod; + } + default -> { + // series=relative (default): prefix__before/latest/after + if (absolutePeriod == 0) { + yield prefix + "_latest_" + singular; + } else if (absolutePeriod > 0) { + String unit = absolutePeriod == 1 ? singular : plural; + yield prefix + "_" + absolutePeriod + unit + "_before"; + } else { + long absPeriod = Math.abs(absolutePeriod); + String unit = absPeriod == 1 ? singular : plural; + yield prefix + "_" + absPeriod + unit + "_after"; + } + } + }; } catch (NumberFormatException e) { return name; } } + /** + * Generate a period name from a relative period number and base offset. Returns the suffix only + * (no value prefix). E.g., "2days_before", "latest_day", "s2". 
+ */ + private String renameTimewrapPeriod(long relativePeriod, long baseOffset, String unitInfo) { + String[] parts = unitInfo.split("\\|", -1); + if (parts.length < 4) return String.valueOf(relativePeriod); + int spanValue = Integer.parseInt(parts[0]); + String singular = parts[1]; + String plural = parts[2]; + + long absolutePeriod = (baseOffset + relativePeriod - 1) * spanValue; + + String seriesMode = CalcitePlanContext.timewrapSeries.get(); + if (seriesMode == null) seriesMode = "relative"; + + return switch (seriesMode) { + case "short" -> "s" + absolutePeriod; + case "exact" -> "s" + absolutePeriod; // TODO: format with time_format + default -> { + if (absolutePeriod == 0) { + yield "latest_" + singular; + } else if (absolutePeriod > 0) { + String unit = absolutePeriod == 1 ? singular : plural; + yield absolutePeriod + unit + "_before"; + } else { + long absPeriod = Math.abs(absolutePeriod); + String unit = absPeriod == 1 ? singular : plural; + yield absPeriod + unit + "_after"; + } + } + }; + } + /** Registers opensearch-dependent functions */ private void registerOpenSearchFunctions() { Optional nodeClient = client.getNodeClient(); diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index 2f185acfbb5..8f5a5c195d7 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -57,6 +57,10 @@ ADDCOLTOTALS: 'ADDCOLTOTALS'; GRAPHLOOKUP: 'GRAPHLOOKUP'; TIMEWRAP: 'TIMEWRAP'; ALIGN: 'ALIGN'; +SERIES: 'SERIES'; +RELATIVE: 'RELATIVE'; +SHORT: 'SHORT'; +EXACT: 'EXACT'; EDGE: 'EDGE'; MAX_DEPTH: 'MAXDEPTH'; DEPTH_FIELD: 'DEPTHFIELD'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 77dd880f217..7fe55583503 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -354,7 +354,13 @@ transposeParameter ; timewrapCommand - : TIMEWRAP spanLiteral (ALIGN EQUAL timewrapAlign)? 
+ : TIMEWRAP spanLiteral timewrapParameter* + ; + +timewrapParameter + : ALIGN EQUAL timewrapAlign + | SERIES EQUAL timewrapSeries + | TIME_FORMAT EQUAL stringLiteral ; timewrapAlign @@ -362,6 +368,12 @@ timewrapAlign | END ; +timewrapSeries + : RELATIVE + | SHORT + | EXACT + ; + timechartParameter : LIMIT EQUAL integerLiteral diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 7bba43412bf..bd7f0c67fec 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -833,9 +833,25 @@ public UnresolvedPlan visitTimewrapCommand(OpenSearchPPLParser.TimewrapCommandCo if (unit == SpanUnit.UNKNOWN || unit == SpanUnit.NONE) { throw new SemanticCheckException("Invalid timewrap span unit: " + unitStr); } - String align = - ctx.timewrapAlign() != null ? ctx.timewrapAlign().getText().toLowerCase() : "end"; - return new Timewrap(unit, value, align, spanLiteral); + String align = "end"; + String series = "relative"; + String timeFormat = null; + for (var param : ctx.timewrapParameter()) { + if (param.timewrapAlign() != null) { + align = param.timewrapAlign().getText().toLowerCase(); + } else if (param.timewrapSeries() != null) { + series = param.timewrapSeries().getText().toLowerCase(); + } else if (param.TIME_FORMAT() != null) { + timeFormat = param.stringLiteral().getText(); + // Strip surrounding quotes + if (timeFormat.length() >= 2 + && ((timeFormat.startsWith("\"") && timeFormat.endsWith("\"")) + || (timeFormat.startsWith("'") && timeFormat.endsWith("'")))) { + timeFormat = timeFormat.substring(1, timeFormat.length() - 1); + } + } + } + return new Timewrap(unit, value, align, series, timeFormat, spanLiteral); } /** Eval command. */