Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,21 @@ func (c *conn) ExecContext(ctx context.Context, query string, args []driver.Name
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
stagingErr := c.execStagingOperation(exStmtResp, ctx)

// Telemetry: track statement execution
var statementID string
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
ctx = c.telemetry.BeforeExecute(ctx, statementID)
defer func() {
finalErr := err
if stagingErr != nil {
finalErr = stagingErr
}
c.telemetry.AfterExecute(ctx, finalErr)
c.telemetry.CompleteStatement(ctx, statementID, finalErr != nil)
}()
}

if exStmtResp != nil && exStmtResp.OperationHandle != nil {
// since we have an operation handle we can close the operation if necessary
alreadyClosed := exStmtResp.DirectResults != nil && exStmtResp.DirectResults.CloseOperation != nil
Expand Down Expand Up @@ -172,6 +187,17 @@ func (c *conn) QueryContext(ctx context.Context, query string, args []driver.Nam
log, ctx = client.LoggerAndContext(ctx, exStmtResp)
defer log.Duration(msg, start)

// Telemetry: track statement execution
var statementID string
if c.telemetry != nil && exStmtResp != nil && exStmtResp.OperationHandle != nil && exStmtResp.OperationHandle.OperationId != nil {
statementID = client.SprintGuid(exStmtResp.OperationHandle.OperationId.GUID)
ctx = c.telemetry.BeforeExecute(ctx, statementID)
defer func() {
c.telemetry.AfterExecute(ctx, err)
c.telemetry.CompleteStatement(ctx, statementID, err != nil)
}()
}

if err != nil {
log.Err(err).Msg("databricks: failed to run query") // To log query we need to redact credentials
return nil, dbsqlerrint.NewExecutionError(ctx, dbsqlerr.ErrQueryExecution, err, opStatusResp)
Expand Down
65 changes: 34 additions & 31 deletions telemetry/DESIGN.md
Original file line number Diff line number Diff line change
Expand Up @@ -2116,34 +2116,34 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
- [x] Test server error handling
- [x] Test unreachable server scenarios

### Phase 6: Collection & Aggregation (PECOBLR-1381)
- [ ] Implement `interceptor.go` for metric collection
- [ ] Implement beforeExecute() and afterExecute() hooks
- [ ] Implement context-based metric tracking with metricContext
- [ ] Implement latency measurement (startTime, latencyMs calculation)
- [ ] Add tag collection methods (addTag)
- [ ] Implement error swallowing with panic recovery
- [ ] Implement `aggregator.go` for batching
- [ ] Implement statement-level aggregation (statementMetrics)
- [ ] Implement batch size and flush interval logic
- [ ] Implement background flush goroutine (flushLoop)
- [ ] Add thread-safe metric recording
- [ ] Implement completeStatement() for final aggregation
- [ ] Implement error classification in `errors.go`
- [ ] Implement error type classification (terminal vs retryable)
- [ ] Implement HTTP status code classification
- [ ] Add error pattern matching
- [ ] Implement isTerminalError() function
- [ ] Update `client.go` to integrate aggregator
- [ ] Wire up aggregator with exporter
- [ ] Implement background flush timer
- [ ] Update start() and close() methods
- [ ] Add unit tests for collection and aggregation
- [ ] Test interceptor metric collection and latency tracking
- [ ] Test aggregation logic
- [ ] Test batch flushing (size-based and time-based)
- [ ] Test error classification
- [ ] Test client with aggregator integration
### Phase 6: Collection & Aggregation (PECOBLR-1381) ✅ COMPLETED
- [x] Implement `interceptor.go` for metric collection
- [x] Implement beforeExecute() and afterExecute() hooks
- [x] Implement context-based metric tracking with metricContext
- [x] Implement latency measurement (startTime, latencyMs calculation)
- [x] Add tag collection methods (addTag)
- [x] Implement error swallowing with panic recovery
- [x] Implement `aggregator.go` for batching
- [x] Implement statement-level aggregation (statementMetrics)
- [x] Implement batch size and flush interval logic
- [x] Implement background flush goroutine (flushLoop)
- [x] Add thread-safe metric recording
- [x] Implement completeStatement() for final aggregation
- [x] Implement error classification in `errors.go`
- [x] Implement error type classification (terminal vs retryable)
- [x] Implement HTTP status code classification
- [x] Add error pattern matching
- [x] Implement isTerminalError() function
- [x] Update `client.go` to integrate aggregator
- [x] Wire up aggregator with exporter
- [x] Implement background flush timer
- [x] Update start() and close() methods
- [x] Add unit tests for collection and aggregation
- [x] Test interceptor metric collection and latency tracking
- [x] Test aggregation logic
- [x] Test batch flushing (size-based and time-based)
- [x] Test error classification
- [x] Test client with aggregator integration

### Phase 7: Driver Integration ✅ COMPLETED
- [x] Add telemetry initialization to `connection.go`
Expand All @@ -2167,9 +2167,12 @@ func BenchmarkInterceptor_Disabled(b *testing.B) {
- [x] Test compilation with telemetry
- [x] Test no breaking changes to existing tests
- [x] Test graceful handling when disabled

Note: Statement execution hooks (beforeExecute/afterExecute in statement.go) for
actual metric collection can be added as a follow-up enhancement.
- [x] Statement execution hooks
- [x] Add beforeExecute() hook to QueryContext
- [x] Add afterExecute() and completeStatement() hooks to QueryContext
- [x] Add beforeExecute() hook to ExecContext
- [x] Add afterExecute() and completeStatement() hooks to ExecContext
- [x] Use operation handle GUID as statement ID

### Phase 8: Testing & Validation
- [ ] Run benchmark tests
Expand Down
9 changes: 0 additions & 9 deletions telemetry/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ type metricsAggregator struct {
}

// statementMetrics holds aggregated metrics for a statement.
//
//nolint:unused // Will be used in Phase 8+
type statementMetrics struct {
statementID string
sessionID string
Expand Down Expand Up @@ -63,8 +61,6 @@ func newMetricsAggregator(exporter *telemetryExporter, cfg *Config) *metricsAggr
}

// recordMetric records a metric for aggregation.
//
//nolint:unused // Will be used in Phase 8+
func (agg *metricsAggregator) recordMetric(ctx context.Context, metric *telemetryMetric) {
// Swallow all errors
defer func() {
Expand Down Expand Up @@ -136,8 +132,6 @@ func (agg *metricsAggregator) recordMetric(ctx context.Context, metric *telemetr
}

// completeStatement marks a statement as complete and emits aggregated metric.
//
//nolint:unused // Will be used in Phase 8+
func (agg *metricsAggregator) completeStatement(ctx context.Context, statementID string, failed bool) {
defer func() {
if r := recover(); r != nil {
Expand Down Expand Up @@ -248,13 +242,10 @@ func (agg *metricsAggregator) close(ctx context.Context) error {
}

// simpleError is a minimal error type that carries nothing but a message.
// It is described as a simple error implementation for testing.
type simpleError struct {
	msg string // text returned verbatim by Error
}

// Error returns the stored message, which makes *simpleError satisfy the
// built-in error interface.
func (se *simpleError) Error() string {
	return se.msg
}
16 changes: 5 additions & 11 deletions telemetry/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,7 @@ import (
// isTerminalError returns true if error is terminal (non-retryable).
// Terminal errors indicate user errors or permanent failures that won't
// be resolved by retrying the operation.
//
//nolint:unused // Will be used in Phase 8+

func isTerminalError(err error) bool {
if err == nil {
return false
Expand Down Expand Up @@ -45,8 +44,7 @@ func isTerminalError(err error) bool {

// classifyError classifies an error for telemetry purposes.
// Returns a string representation of the error type.
//
//nolint:unused // Will be used in Phase 8+

func classifyError(err error) string {
if err == nil {
return ""
Expand Down Expand Up @@ -89,14 +87,12 @@ func isRetryableError(err error) bool {
}

// httpError represents an HTTP error with status code.
//
// The comment sits directly above the declaration (no blank line in between)
// so that Go tooling treats it as the type's doc comment.
type httpError struct {
	statusCode int    // HTTP status code associated with the error
	message    string // text returned verbatim by Error
}

// Error returns the error message, which makes *httpError satisfy the
// built-in error interface. The status code is not included in the text.
func (e *httpError) Error() string {
	return e.message
}
Expand All @@ -112,16 +108,14 @@ func newHTTPError(statusCode int, message string) error {
}

// isTerminalHTTPStatus returns true for non-retryable HTTP status codes.
//
// A status is reported as terminal when it lies in the 4xx range
// (400 <= status < 500) and is not 429. Every other value, including 429
// and all statuses outside the 4xx range, yields false.
func isTerminalHTTPStatus(status int) bool {
	// 4xx errors (except 429) are terminal
	return status >= 400 && status < 500 && status != 429
}

// extractHTTPError extracts HTTP error information if available.
//
//nolint:unused // Will be used in Phase 8+

func extractHTTPError(err error) (*httpError, bool) {
var httpErr *httpError
if errors.As(err, &httpErr) {
Expand Down
36 changes: 12 additions & 24 deletions telemetry/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,14 @@ type Interceptor struct {
}

// metricContext is the metric collection state that travels inside a
// context.Context.
type metricContext struct {
	// statementID identifies the statement the metric belongs to.
	statementID string
	// startTime is a timestamp for the tracked statement.
	// NOTE(review): presumably captured when tracking begins and used to
	// compute latency — confirm against BeforeExecute/AfterExecute.
	startTime time.Time
	// tags holds arbitrary key/value tags attached to the metric.
	tags map[string]interface{}
}

// contextKey is the unexported type used for context keys declared in this
// file; using a dedicated type keeps them distinct from keys of other types.
type contextKey int

// metricContextKey is the context key under which the metric context is
// stored. Its value is 0.
const metricContextKey = contextKey(0)

// newInterceptor creates a new telemetry interceptor.
Expand All @@ -38,27 +34,22 @@ func newInterceptor(aggregator *metricsAggregator, enabled bool) *Interceptor {
}

// withMetricContext returns a context derived from ctx that carries mc under
// metricContextKey. The parent context itself is left unchanged.
func withMetricContext(ctx context.Context, mc *metricContext) context.Context {
	derived := context.WithValue(ctx, metricContextKey, mc)
	return derived
}

// getMetricContext looks up the value stored under metricContextKey in ctx
// and returns it as a *metricContext. It returns nil when nothing is stored
// under that key or when the stored value is not a *metricContext.
func getMetricContext(ctx context.Context) *metricContext {
	// A failed comma-ok assertion leaves found as the zero value of
	// *metricContext, i.e. nil, which is exactly the "not found" result.
	found, _ := ctx.Value(metricContextKey).(*metricContext)
	return found
}

// beforeExecute is called before statement execution.
// BeforeExecute is called before statement execution.
// Returns a new context with metric tracking attached.
//
//nolint:unused // Will be used in Phase 8+
func (i *Interceptor) beforeExecute(ctx context.Context, statementID string) context.Context {
// Exported for use by the driver package.
func (i *Interceptor) BeforeExecute(ctx context.Context, statementID string) context.Context {
if !i.enabled {
return ctx
}
Expand All @@ -72,11 +63,10 @@ func (i *Interceptor) beforeExecute(ctx context.Context, statementID string) con
return withMetricContext(ctx, mc)
}

// afterExecute is called after statement execution.
// AfterExecute is called after statement execution.
// Records the metric with timing and error information.
//
//nolint:unused // Will be used in Phase 8+
func (i *Interceptor) afterExecute(ctx context.Context, err error) {
// Exported for use by the driver package.
func (i *Interceptor) AfterExecute(ctx context.Context, err error) {
if !i.enabled {
return
}
Expand Down Expand Up @@ -109,10 +99,9 @@ func (i *Interceptor) afterExecute(ctx context.Context, err error) {
i.aggregator.recordMetric(ctx, metric)
}

// addTag adds a tag to the current metric context.
//
//nolint:unused // Will be used in Phase 8+
func (i *Interceptor) addTag(ctx context.Context, key string, value interface{}) {
// AddTag adds a tag to the current metric context.
// Exported for use by the driver package.
func (i *Interceptor) AddTag(ctx context.Context, key string, value interface{}) {
if !i.enabled {
return
}
Expand Down Expand Up @@ -146,10 +135,9 @@ func (i *Interceptor) recordConnection(ctx context.Context, tags map[string]inte
i.aggregator.recordMetric(ctx, metric)
}

// completeStatement marks a statement as complete and flushes aggregated metrics.
//
//nolint:unused // Will be used in Phase 8+
func (i *Interceptor) completeStatement(ctx context.Context, statementID string, failed bool) {
// CompleteStatement marks a statement as complete and flushes aggregated metrics.
// Exported for use by the driver package.
func (i *Interceptor) CompleteStatement(ctx context.Context, statementID string, failed bool) {
if !i.enabled {
return
}
Expand Down
Loading