-
Notifications
You must be signed in to change notification settings - Fork 8
Added OTEL tracing #196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Added OTEL tracing #196
Changes from all commits
722bf3e
52cb04d
cc9ee12
f7cc658
8b2a2f1
0686e28
2d9e21c
3a5283a
4b999f1
4b86715
9c13d07
3e0b902
d446e80
16b0e10
7ad857f
468f940
7aae664
902a7df
9e5adb7
1d7457e
ff3679c
e6be925
aebe1ca
77eb9e9
5f94077
786bd00
33fbce2
ed6bc93
2369ba1
554baa2
7221ec4
b7e7999
2fd00fa
e4f69ed
a6fdbb1
229214f
30a67d2
6897e13
5aae712
0d4d825
106fcd8
d437b47
85eba77
4004a7d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,8 +3,11 @@ | |
| import functools | ||
| import logging | ||
| from collections.abc import Callable | ||
| from contextlib import ExitStack | ||
| from typing import Any | ||
|
|
||
| from opentelemetry import trace | ||
|
|
||
| from workflows.recipe.recipe import Recipe | ||
| from workflows.recipe.validate import validate_recipe | ||
| from workflows.recipe.wrapper import RecipeWrapper | ||
|
|
@@ -69,10 +72,35 @@ def unwrap_recipe(header, message): | |
| message = mangle_for_receiving(message) | ||
| if header.get("workflows-recipe") in {True, "True", "true", 1}: | ||
| rw = RecipeWrapper(message=message, transport=transport_layer) | ||
| if log_extender and rw.environment and rw.environment.get("ID"): | ||
| with log_extender("recipe_ID", rw.environment["ID"]): | ||
|
|
||
| if log_extender and rw.environment["ID"]: | ||
| # Extract recipe ID from environment and add to current span | ||
| span = trace.get_current_span() | ||
| recipe_id = rw.environment["ID"] | ||
| span.set_attribute("recipe_id", recipe_id) | ||
|
|
||
| # Extract span_id and trace_id for logging | ||
| span_context = span.get_span_context() | ||
| otel_logs = None | ||
| if span_context.is_valid: | ||
| span_id = span_context.span_id | ||
| trace_id = span_context.trace_id | ||
|
|
||
| otel_logs = { | ||
| "span_id": span_id, | ||
| "trace_id": trace_id, | ||
| "recipe_id": recipe_id, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. another note that this recipe_id is redundant as it is already injected onto the log message |
||
| } | ||
|
|
||
| with ExitStack() as stack: | ||
| # Configure the context depending on if service is emitting spans | ||
| stack.enter_context(log_extender("recipe_ID", recipe_id)) | ||
| if otel_logs: | ||
| stack.enter_context(log_extender("otel_logs", otel_logs)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| return callback(rw, header, message.get("payload")) | ||
|
|
||
| return callback(rw, header, message.get("payload")) | ||
|
|
||
| if allow_non_recipe_messages: | ||
| return callback(None, header, message) | ||
| # self.log.warning('Discarding non-recipe message:\n' + \ | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -9,8 +9,15 @@ | |||||||||||||
| import time | ||||||||||||||
| from typing import Any | ||||||||||||||
|
|
||||||||||||||
| from opentelemetry import trace | ||||||||||||||
| from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter | ||||||||||||||
| from opentelemetry.sdk.resources import SERVICE_NAME, Resource | ||||||||||||||
| from opentelemetry.sdk.trace import TracerProvider | ||||||||||||||
| from opentelemetry.sdk.trace.export import BatchSpanProcessor | ||||||||||||||
|
|
||||||||||||||
| import workflows | ||||||||||||||
| import workflows.logging | ||||||||||||||
| from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| class Status(enum.Enum): | ||||||||||||||
|
|
@@ -185,6 +192,40 @@ def start_transport(self): | |||||||||||||
| self.transport.subscription_callback_set_intercept( | ||||||||||||||
| self._transport_interceptor | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| # Configure OTELTracing if configuration is available | ||||||||||||||
| otel_config = ( | ||||||||||||||
| self.config._opentelemetry | ||||||||||||||
| if self.config and hasattr(self.config, "_opentelemetry") | ||||||||||||||
| else None | ||||||||||||||
| ) | ||||||||||||||
|
Comment on lines
+197
to
+201
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
| if otel_config: | ||||||||||||||
| # Configure OTELTracing | ||||||||||||||
| resource = Resource.create( | ||||||||||||||
| { | ||||||||||||||
| SERVICE_NAME: self._service_name, | ||||||||||||||
| } | ||||||||||||||
| ) | ||||||||||||||
|
|
||||||||||||||
| self.log.debug("Configuring OTELTracing") | ||||||||||||||
| provider = TracerProvider(resource=resource) | ||||||||||||||
| trace.set_tracer_provider(provider) | ||||||||||||||
|
|
||||||||||||||
| # Configure BatchProcessor and OTLPSpanExporter using config values | ||||||||||||||
| otlp_exporter = OTLPSpanExporter( | ||||||||||||||
| endpoint=otel_config["endpoint"], | ||||||||||||||
| timeout=otel_config.get("timeout", 10), | ||||||||||||||
| ) | ||||||||||||||
| span_processor = BatchSpanProcessor(otlp_exporter) | ||||||||||||||
| provider.add_span_processor(span_processor) | ||||||||||||||
|
|
||||||||||||||
| # Add OTELTracingMiddleware to the transport layer | ||||||||||||||
| tracer = trace.get_tracer(__name__) | ||||||||||||||
| otel_middleware = OTELTracingMiddleware( | ||||||||||||||
| tracer, service_name=self._service_name | ||||||||||||||
| ) | ||||||||||||||
| self._transport.add_middleware(otel_middleware) | ||||||||||||||
|
|
||||||||||||||
| metrics = self._environment.get("metrics") | ||||||||||||||
| if metrics: | ||||||||||||||
| import prometheus_client | ||||||||||||||
|
|
||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,227 @@ | ||
| from __future__ import annotations | ||
davidigandan marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| import functools | ||
| from collections.abc import Callable | ||
|
|
||
| from opentelemetry import trace | ||
| from opentelemetry.context import Context | ||
| from opentelemetry.propagate import extract, inject | ||
|
|
||
| from workflows.transport.common_transport import MessageCallback, TemporarySubscription | ||
|
|
||
|
|
||
| class OTELTracingMiddleware: | ||
| def __init__(self, tracer: trace.Tracer, service_name: str): | ||
| self.tracer = tracer | ||
| self.service_name = service_name | ||
|
|
||
| def _set_span_attributes(self, span, **attributes): | ||
| """Helper method to set common span attributes""" | ||
| span.set_attribute("service_name", self.service_name) | ||
| for key, value in attributes.items(): | ||
| if value is not None: | ||
| span.set_attribute(key, value) | ||
|
|
||
| def send(self, call_next: Callable, destination: str, message, **kwargs): | ||
| # Get current span context (may be None if this is the root span) | ||
| current_span = trace.get_current_span() | ||
| parent_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else None | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transport.send", | ||
| context=parent_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, destination=destination) | ||
|
|
||
| # Inject the current trace context into the message headers | ||
| headers = kwargs.get("headers", {}) | ||
| if headers is None: | ||
| headers = {} | ||
| inject(headers) # This modifies headers in-place | ||
| kwargs["headers"] = headers | ||
|
|
||
| return call_next(destination, message, **kwargs) | ||
|
|
||
| def subscribe( | ||
| self, call_next: Callable, channel: str, callback: Callable, **kwargs | ||
| ) -> int: | ||
| @functools.wraps(callback) | ||
| def wrapped_callback(header, message): | ||
| # Extract trace context from message headers | ||
| ctx = extract(header) if header else Context() | ||
|
|
||
| # Start a new span with the extracted context | ||
| with self.tracer.start_as_current_span( | ||
| "transport.subscribe", | ||
| context=ctx, | ||
| ) as span: | ||
| self._set_span_attributes(span, channel=channel) | ||
|
|
||
| # Call the original callback - this will process the message | ||
| # and potentially call send() which will pick up this context | ||
| return callback(header, message) | ||
|
|
||
| return call_next(channel, wrapped_callback, **kwargs) | ||
|
|
||
| def subscribe_broadcast( | ||
| self, call_next: Callable, channel: str, callback: Callable, **kwargs | ||
| ) -> int: | ||
| @functools.wraps(callback) | ||
| def wrapped_callback(header, message): | ||
| # Extract trace context from message headers | ||
| ctx = extract(header) if header else Context() | ||
|
|
||
| # Start a new span with the extracted context | ||
| with self.tracer.start_as_current_span( | ||
| "transport.subscribe_broadcast", | ||
| context=ctx, | ||
| ) as span: | ||
| self._set_span_attributes(span, channel=channel) | ||
|
|
||
| return callback(header, message) | ||
|
|
||
| return call_next(channel, wrapped_callback, **kwargs) | ||
|
|
||
| def subscribe_temporary( | ||
| self, | ||
| call_next: Callable, | ||
| channel_hint: str | None, | ||
| callback: MessageCallback, | ||
| **kwargs, | ||
| ) -> TemporarySubscription: | ||
| @functools.wraps(callback) | ||
| def wrapped_callback(header, message): | ||
| # Extract trace context from message headers | ||
| ctx = extract(header) if header else Context() | ||
|
|
||
| # Start a new span with the extracted context | ||
| with self.tracer.start_as_current_span( | ||
| "transport.subscribe_temporary", | ||
| context=ctx, | ||
| ) as span: | ||
| self._set_span_attributes(span, channel_hint=channel_hint) | ||
|
|
||
| return callback(header, message) | ||
|
|
||
| return call_next(channel_hint, wrapped_callback, **kwargs) | ||
|
|
||
| def unsubscribe( | ||
| self, | ||
| call_next: Callable, | ||
| subscription: int, | ||
| drop_callback_reference=False, | ||
| **kwargs, | ||
| ): | ||
| # Get current span context | ||
| current_span = trace.get_current_span() | ||
| current_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else Context() | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transport.unsubscribe", | ||
| context=current_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, subscription_id=subscription) | ||
|
|
||
| call_next( | ||
| subscription, drop_callback_reference=drop_callback_reference, **kwargs | ||
| ) | ||
|
|
||
| def ack( | ||
| self, | ||
| call_next: Callable, | ||
| message, | ||
| subscription_id: int | None = None, | ||
| **kwargs, | ||
| ): | ||
| # Get current span context | ||
| current_span = trace.get_current_span() | ||
| current_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else Context() | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transport.ack", | ||
| context=current_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, subscription_id=subscription_id) | ||
|
|
||
| call_next(message, subscription_id=subscription_id, **kwargs) | ||
|
|
||
| def nack( | ||
| self, | ||
| call_next: Callable, | ||
| message, | ||
| subscription_id: int | None = None, | ||
| **kwargs, | ||
| ): | ||
| # Get current span context | ||
| current_span = trace.get_current_span() | ||
| current_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else Context() | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transport.nack", | ||
| context=current_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, subscription_id=subscription_id) | ||
|
|
||
| call_next(message, subscription_id=subscription_id, **kwargs) | ||
|
|
||
| def transaction_begin( | ||
| self, call_next: Callable, subscription_id: int | None = None, **kwargs | ||
| ) -> int: | ||
| """Start a new transaction span""" | ||
| # Get current span context (may be None if this is the root span) | ||
| current_span = trace.get_current_span() | ||
| current_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else Context() | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transaction.begin", | ||
| context=current_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, subscription_id=subscription_id) | ||
|
|
||
| return call_next(subscription_id=subscription_id, **kwargs) | ||
|
|
||
| def transaction_abort( | ||
| self, call_next: Callable, transaction_id: int | None = None, **kwargs | ||
| ): | ||
| """Abort a transaction span""" | ||
| # Get current span context | ||
| current_span = trace.get_current_span() | ||
| current_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else Context() | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transaction.abort", | ||
| context=current_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, transaction_id=transaction_id) | ||
|
|
||
| call_next(transaction_id=transaction_id, **kwargs) | ||
|
|
||
| def transaction_commit( | ||
| self, call_next: Callable, transaction_id: int | None = None, **kwargs | ||
| ): | ||
| """Commit a transaction span""" | ||
| # Get current span context | ||
| current_span = trace.get_current_span() | ||
| current_context = ( | ||
| trace.set_span_in_context(current_span) if current_span else Context() | ||
| ) | ||
|
|
||
| with self.tracer.start_as_current_span( | ||
| "transaction.commit", | ||
| context=current_context, | ||
| ) as span: | ||
| self._set_span_attributes(span, transaction_id=transaction_id) | ||
|
|
||
| call_next(transaction_id=transaction_id, **kwargs) | ||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does this need to be
.get()- KeyError if "ID" is not in environment?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suddenly forgetting what is and isn't assumed to be present