Skip to content

Commit 1bbff88

Browse files
committed
Expose deployment version on contexts
1 parent f4f6483 commit 1bbff88

File tree

7 files changed

+100
-26
lines changed

7 files changed

+100
-26
lines changed

temporalio/bridge/Cargo.lock

+12-12
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

temporalio/bridge/src/worker.rs

+23-4
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use log::error;
55
use prost::Message;
66
use pyo3::exceptions::{PyException, PyRuntimeError, PyValueError};
77
use pyo3::prelude::*;
8-
use pyo3::types::{PyBytes, PyTuple};
8+
use pyo3::types::{PyBytes, PyDict, PyTuple};
99
use std::collections::HashMap;
1010
use std::collections::HashSet;
1111
use std::marker::PhantomData;
@@ -91,12 +91,31 @@ pub struct LegacyBuildIdBased {
9191
}
9292

9393
/// Recreates [temporal_sdk_core_api::worker::WorkerDeploymentVersion]
94-
#[derive(FromPyObject)]
94+
#[derive(FromPyObject, Clone)]
9595
pub struct WorkerDeploymentVersion {
9696
pub deployment_name: String,
9797
pub build_id: String,
9898
}
9999

100+
impl IntoPy<Py<PyAny>> for WorkerDeploymentVersion {
101+
fn into_py(self, py: Python) -> Py<PyAny> {
102+
let dict = PyDict::new(py);
103+
dict.set_item("deployment_name", self.deployment_name)
104+
.unwrap();
105+
dict.set_item("build_id", self.build_id).unwrap();
106+
dict.into()
107+
}
108+
}
109+
110+
impl From<temporal_sdk_core_api::worker::WorkerDeploymentVersion> for WorkerDeploymentVersion {
111+
fn from(version: temporal_sdk_core_api::worker::WorkerDeploymentVersion) -> Self {
112+
WorkerDeploymentVersion {
113+
deployment_name: version.deployment_name,
114+
build_id: version.build_id,
115+
}
116+
}
117+
}
118+
100119
#[derive(FromPyObject)]
101120
pub struct TunerHolder {
102121
workflow_slot_supplier: SlotSupplier,
@@ -134,7 +153,7 @@ pub struct SlotReserveCtx {
134153
#[pyo3(get)]
135154
pub worker_identity: String,
136155
#[pyo3(get)]
137-
pub worker_build_id: String,
156+
pub worker_deployment_version: Option<WorkerDeploymentVersion>,
138157
#[pyo3(get)]
139158
pub is_sticky: bool,
140159
}
@@ -150,7 +169,7 @@ impl SlotReserveCtx {
150169
},
151170
task_queue: ctx.task_queue().to_string(),
152171
worker_identity: ctx.worker_identity().to_string(),
153-
worker_build_id: ctx.worker_build_id().to_string(),
172+
worker_deployment_version: ctx.worker_deployment_version().clone().map(Into::into),
154173
is_sticky: ctx.is_sticky(),
155174
}
156175
}

temporalio/worker/_tuning.py

+15-3
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from typing_extensions import TypeAlias
99

1010
import temporalio.bridge.worker
11+
from temporalio.common import WorkerDeploymentVersion
1112

1213
_DEFAULT_RESOURCE_ACTIVITY_MAX = 500
1314

@@ -55,7 +56,7 @@ class ResourceBasedSlotConfig:
5556
ramp_throttle: Optional[timedelta] = None
5657
"""Minimum time we will wait (after passing the minimum slots number) between handing out new slots in milliseconds.
5758
Defaults to 0 for workflows and 50ms for activities.
58-
59+
5960
This value matters because how many resources a task will use cannot be determined ahead of time, and thus the
6061
system should wait to see how much resources are used before issuing more slots."""
6162

@@ -100,11 +101,22 @@ class SlotReserveContext(Protocol):
100101
"""The name of the task queue for which this reservation request is associated."""
101102
worker_identity: str
102103
"""The identity of the worker that is requesting the reservation."""
103-
worker_build_id: str
104-
"""The build id of the worker that is requesting the reservation."""
104+
worker_deployment_version: Optional[WorkerDeploymentVersion]
105+
"""The deployment version of the worker that is requesting the reservation, if any."""
105106
is_sticky: bool
106107
"""True iff this is a reservation for a sticky poll for a workflow task."""
107108

109+
@property
110+
def worker_build_id(self) -> str:
111+
"""The build id of the worker that is requesting the reservation.
112+
113+
.. warning::
114+
Deprecated, use :py:attr:`worker_deployment_version` instead.
115+
"""
116+
if not self.worker_deployment_version:
117+
return ""
118+
return self.worker_deployment_version.build_id
119+
108120

109121
# WARNING: This must match Rust worker::WorkflowSlotInfo
110122
@runtime_checkable

temporalio/worker/_workflow_instance.py

+20-3
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import temporalio.api.sdk.v1
5151
import temporalio.bridge.proto.activity_result
5252
import temporalio.bridge.proto.child_workflow
53+
import temporalio.bridge.proto.common
5354
import temporalio.bridge.proto.workflow_activation
5455
import temporalio.bridge.proto.workflow_commands
5556
import temporalio.bridge.proto.workflow_completion
@@ -211,7 +212,9 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
211212
self._primary_task: Optional[asyncio.Task[None]] = None
212213
self._time_ns = 0
213214
self._cancel_requested = False
214-
self._current_build_id = ""
215+
self._deployment_version_for_current_task: Optional[
216+
temporalio.bridge.proto.common.WorkerDeploymentVersion
217+
] = None
215218
self._current_history_length = 0
216219
self._current_history_size = 0
217220
self._continue_as_new_suggested = False
@@ -351,7 +354,9 @@ def activate(
351354
else temporalio.api.enums.v1.VersioningBehavior.VERSIONING_BEHAVIOR_UNSPECIFIED
352355
)
353356
self._current_activation_error: Optional[Exception] = None
354-
self._current_build_id = act.build_id_for_current_task
357+
self._deployment_version_for_current_task = (
358+
act.deployment_version_for_current_task
359+
)
355360
self._current_history_length = act.history_length
356361
self._current_history_size = act.history_size_bytes
357362
self._continue_as_new_suggested = act.continue_as_new_suggested
@@ -985,7 +990,19 @@ def workflow_extern_functions(self) -> Mapping[str, Callable]:
985990
return self._extern_functions
986991

987992
def workflow_get_current_build_id(self) -> str:
988-
return self._current_build_id
993+
if not self._deployment_version_for_current_task:
994+
return ""
995+
return self._deployment_version_for_current_task.build_id
996+
997+
def workflow_get_current_deployment_version(
998+
self,
999+
) -> Optional[temporalio.common.WorkerDeploymentVersion]:
1000+
if not self._deployment_version_for_current_task:
1001+
return None
1002+
return temporalio.common.WorkerDeploymentVersion(
1003+
build_id=self._deployment_version_for_current_task.build_id,
1004+
deployment_name=self._deployment_version_for_current_task.deployment_name,
1005+
)
9891006

9901007
def workflow_get_current_history_length(self) -> int:
9911008
return self._current_history_length

temporalio/workflow.py

+24-3
Original file line numberDiff line numberDiff line change
@@ -480,12 +480,28 @@ def _logger_details(self) -> Mapping[str, Any]:
480480
def get_current_build_id(self) -> str:
481481
"""Get the Build ID of the worker which executed the current Workflow Task.
482482
483-
May be undefined if the task was completed by a worker without a Build ID. If this worker is the one executing
484-
this task for the first time and has a Build ID set, then its ID will be used. This value may change over the
485-
lifetime of the workflow run, but is deterministic and safe to use for branching.
483+
May be undefined if the task was completed by a worker without a Build ID. If this worker is
484+
the one executing this task for the first time and has a Build ID set, then its ID will be
485+
used. This value may change over the lifetime of the workflow run, but is deterministic and
486+
safe to use for branching.
487+
488+
.. deprecated::
489+
Use get_current_deployment_version instead.
486490
"""
487491
return _Runtime.current().workflow_get_current_build_id()
488492

493+
def get_current_deployment_version(
494+
self,
495+
) -> Optional[temporalio.common.WorkerDeploymentVersion]:
496+
"""Get the deployment version of the worker which executed the current Workflow Task.
497+
498+
May be None if the task was completed by a worker without a deployment version or build
499+
id. If this worker is the one executing this task for the first time and has a deployment
500+
version set, then its ID will be used. This value may change over the lifetime of the
501+
workflow run, but is deterministic and safe to use for branching.
502+
"""
503+
return _Runtime.current().workflow_get_current_deployment_version()
504+
489505
def get_current_history_length(self) -> int:
490506
"""Get the current number of events in history.
491507
@@ -613,6 +629,11 @@ def workflow_extern_functions(self) -> Mapping[str, Callable]: ...
613629
@abstractmethod
614630
def workflow_get_current_build_id(self) -> str: ...
615631

632+
@abstractmethod
633+
def workflow_get_current_deployment_version(
634+
self,
635+
) -> Optional[temporalio.common.WorkerDeploymentVersion]: ...
636+
616637
@abstractmethod
617638
def workflow_get_current_history_length(self) -> int: ...
618639

tests/worker/test_worker.py

+5
Original file line numberDiff line numberDiff line change
@@ -561,6 +561,11 @@ class DeploymentVersioningWorkflowV2Pinned:
561561
@workflow.run
562562
async def run(self):
563563
await workflow.wait_condition(lambda: self.finish)
564+
depver = workflow.info().get_current_deployment_version()
565+
assert depver
566+
assert depver.build_id == "2.0"
567+
# Just ensuring the rust object was converted properly and this method still works
568+
workflow.logger.debug(f"Dep string: {depver.to_canonical_string()}")
564569
return "version-v2"
565570

566571
@workflow.signal

0 commit comments

Comments
 (0)