Skip to content

Commit 6b65e36

Browse files
authored
Merge branch 'main' into worker-deployment-versioning
2 parents 31853fe + fb3dccd commit 6b65e36

File tree

4 files changed

+127
-85
lines changed

4 files changed

+127
-85
lines changed

temporalio/contrib/opentelemetry.py

+27-12
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,22 @@ class should return the workflow interceptor subclass from
7474
def __init__(
7575
self,
7676
tracer: Optional[opentelemetry.trace.Tracer] = None,
77+
*,
78+
always_create_workflow_spans: bool = False,
7779
) -> None:
7880
"""Initialize a OpenTelemetry tracing interceptor.
7981
8082
Args:
8183
tracer: The tracer to use. Defaults to
8284
:py:func:`opentelemetry.trace.get_tracer`.
85+
always_create_workflow_spans: When false, the default, spans are
86+
only created in workflows when an overarching span from the
87+
client is present. In cases of starting a workflow elsewhere,
88+
e.g. CLI or schedules, a client-created span is not present and
89+
workflow spans will not be created. Setting this to true will
90+
create spans in workflows no matter what, but there is a risk of
91+
them being orphans since they may not have a parent span after
92+
replaying.
8393
"""
8494
self.tracer = tracer or opentelemetry.trace.get_tracer(__name__)
8595
# To customize any of this, users must subclass. We intentionally don't
@@ -90,6 +100,7 @@ def __init__(
90100
self.text_map_propagator: opentelemetry.propagators.textmap.TextMapPropagator = default_text_map_propagator
91101
# TODO(cretz): Should I be using the configured one at the client and activity level?
92102
self.payload_converter = temporalio.converter.PayloadConverter.default
103+
self._always_create_workflow_spans = always_create_workflow_spans
93104

94105
def intercept_client(
95106
self, next: temporalio.client.OutboundInterceptor
@@ -165,10 +176,15 @@ def _start_as_current_span(
165176

166177
def _completed_workflow_span(
167178
self, params: _CompletedWorkflowSpanParams
168-
) -> _CarrierDict:
179+
) -> Optional[_CarrierDict]:
169180
# Carrier to context, start span, set span as current on context,
170181
# context back to carrier
171182

183+
# If the parent is missing and user hasn't said to always create, do not
184+
# create
185+
if params.parent_missing and not self._always_create_workflow_spans:
186+
return None
187+
172188
# Extract the context
173189
context = self.text_map_propagator.extract(params.context)
174190
# Create link if there is a span present
@@ -286,7 +302,7 @@ class _InputWithHeaders(Protocol):
286302

287303
class _WorkflowExternFunctions(TypedDict):
288304
__temporal_opentelemetry_completed_span: Callable[
289-
[_CompletedWorkflowSpanParams], _CarrierDict
305+
[_CompletedWorkflowSpanParams], Optional[_CarrierDict]
290306
]
291307

292308

@@ -299,6 +315,7 @@ class _CompletedWorkflowSpanParams:
299315
link_context: Optional[_CarrierDict]
300316
exception: Optional[Exception]
301317
kind: opentelemetry.trace.SpanKind
318+
parent_missing: bool
302319

303320

304321
_interceptor_context_key = opentelemetry.context.create_key(
@@ -529,17 +546,13 @@ def _completed_span(
529546
exception: Optional[Exception] = None,
530547
kind: opentelemetry.trace.SpanKind = opentelemetry.trace.SpanKind.INTERNAL,
531548
) -> None:
532-
# If there is no span on the context, we do not create a span
533-
if opentelemetry.trace.get_current_span() is opentelemetry.trace.INVALID_SPAN:
534-
return None
535-
536549
# If we are replaying and they don't want a span on replay, no span
537550
if temporalio.workflow.unsafe.is_replaying() and not new_span_even_on_replay:
538551
return None
539552

540553
# Create the span. First serialize current context to carrier.
541-
context_carrier: _CarrierDict = {}
542-
self.text_map_propagator.inject(context_carrier)
554+
new_context_carrier: _CarrierDict = {}
555+
self.text_map_propagator.inject(new_context_carrier)
543556
# Invoke
544557
info = temporalio.workflow.info()
545558
attributes: Dict[str, opentelemetry.util.types.AttributeValue] = {
@@ -548,25 +561,27 @@ def _completed_span(
548561
}
549562
if additional_attributes:
550563
attributes.update(additional_attributes)
551-
context_carrier = self._extern_functions[
564+
updated_context_carrier = self._extern_functions[
552565
"__temporal_opentelemetry_completed_span"
553566
](
554567
_CompletedWorkflowSpanParams(
555-
context=context_carrier,
568+
context=new_context_carrier,
556569
name=span_name,
557570
# Always set span attributes as workflow ID and run ID
558571
attributes=attributes,
559572
time_ns=temporalio.workflow.time_ns(),
560573
link_context=link_context_carrier,
561574
exception=exception,
562575
kind=kind,
576+
parent_missing=opentelemetry.trace.get_current_span()
577+
is opentelemetry.trace.INVALID_SPAN,
563578
)
564579
)
565580

566581
# Add to outbound if needed
567-
if add_to_outbound:
582+
if add_to_outbound and updated_context_carrier:
568583
add_to_outbound.headers = self._context_carrier_to_headers(
569-
context_carrier, add_to_outbound.headers
584+
updated_context_carrier, add_to_outbound.headers
570585
)
571586

572587
def _set_on_context(

temporalio/worker/_workflow_instance.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -1718,10 +1718,11 @@ def _instantiate_workflow_object(self) -> Any:
17181718

17191719
def _is_workflow_failure_exception(self, err: BaseException) -> bool:
17201720
# An exception is a failure instead of a task fail if it's already a
1721-
# failure error or if it is an instance of any of the failure types in
1722-
# the worker or workflow-level setting
1721+
# failure error or if it is a timeout error or if it is an instance of
1722+
# any of the failure types in the worker or workflow-level setting
17231723
return (
17241724
isinstance(err, temporalio.exceptions.FailureError)
1725+
or isinstance(err, asyncio.TimeoutError)
17251726
or any(isinstance(err, typ) for typ in self._defn.failure_exception_types)
17261727
or any(
17271728
isinstance(err, typ)

tests/contrib/test_opentelemetry.py

+50
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,56 @@ def dump_spans(
332332
return ret
333333

334334

335+
@workflow.defn
336+
class SimpleWorkflow:
337+
@workflow.run
338+
async def run(self) -> str:
339+
return "done"
340+
341+
342+
async def test_opentelemetry_always_create_workflow_spans(client: Client):
343+
# Create a tracer that has an in-memory exporter
344+
exporter = InMemorySpanExporter()
345+
provider = TracerProvider()
346+
provider.add_span_processor(SimpleSpanProcessor(exporter))
347+
tracer = get_tracer(__name__, tracer_provider=provider)
348+
349+
# Create a worker with an interceptor without always create
350+
async with Worker(
351+
client,
352+
task_queue=f"task_queue_{uuid.uuid4()}",
353+
workflows=[SimpleWorkflow],
354+
interceptors=[TracingInterceptor(tracer)],
355+
) as worker:
356+
assert "done" == await client.execute_workflow(
357+
SimpleWorkflow.run,
358+
id=f"workflow_{uuid.uuid4()}",
359+
task_queue=worker.task_queue,
360+
)
361+
# Confirm the spans are not there
362+
spans = exporter.get_finished_spans()
363+
logging.debug("Spans:\n%s", "\n".join(dump_spans(spans, with_attributes=False)))
364+
assert len(spans) == 0
365+
366+
# Now create a worker with an interceptor with always create
367+
async with Worker(
368+
client,
369+
task_queue=f"task_queue_{uuid.uuid4()}",
370+
workflows=[SimpleWorkflow],
371+
interceptors=[TracingInterceptor(tracer, always_create_workflow_spans=True)],
372+
) as worker:
373+
assert "done" == await client.execute_workflow(
374+
SimpleWorkflow.run,
375+
id=f"workflow_{uuid.uuid4()}",
376+
task_queue=worker.task_queue,
377+
)
378+
# Confirm the spans are not there
379+
spans = exporter.get_finished_spans()
380+
logging.debug("Spans:\n%s", "\n".join(dump_spans(spans, with_attributes=False)))
381+
assert len(spans) > 0
382+
assert spans[0].name == "RunWorkflow:SimpleWorkflow"
383+
384+
335385
# TODO(cretz): Additional tests to write
336386
# * query without interceptor (no headers)
337387
# * workflow without interceptor (no headers) but query with interceptor (headers)

0 commit comments

Comments
 (0)