Skip to content

Commit 6f2dd37

Browse files
authored
Handle metrics for benign application failures (#905)
* updated api * don't track failures for benign application errors * remove closure * linting/formatting * add workflow rule rpcs to client
1 parent 1cbca02 commit 6f2dd37

File tree

18 files changed

+1680
-39
lines changed

18 files changed

+1680
-39
lines changed

client/src/raw.rs

+45
Original file line numberDiff line numberDiff line change
@@ -847,6 +847,51 @@ proxier! {
847847
r.extensions_mut().insert(labels);
848848
}
849849
);
850+
(
851+
create_workflow_rule,
852+
CreateWorkflowRuleRequest,
853+
CreateWorkflowRuleResponse,
854+
|r| {
855+
let labels = namespaced_request!(r);
856+
r.extensions_mut().insert(labels);
857+
}
858+
);
859+
(
860+
describe_workflow_rule,
861+
DescribeWorkflowRuleRequest,
862+
DescribeWorkflowRuleResponse,
863+
|r| {
864+
let labels = namespaced_request!(r);
865+
r.extensions_mut().insert(labels);
866+
}
867+
);
868+
(
869+
delete_workflow_rule,
870+
DeleteWorkflowRuleRequest,
871+
DeleteWorkflowRuleResponse,
872+
|r| {
873+
let labels = namespaced_request!(r);
874+
r.extensions_mut().insert(labels);
875+
}
876+
);
877+
(
878+
list_workflow_rules,
879+
ListWorkflowRulesRequest,
880+
ListWorkflowRulesResponse,
881+
|r| {
882+
let labels = namespaced_request!(r);
883+
r.extensions_mut().insert(labels);
884+
}
885+
);
886+
(
887+
trigger_workflow_rule,
888+
TriggerWorkflowRuleRequest,
889+
TriggerWorkflowRuleResponse,
890+
|r| {
891+
let labels = namespaced_request!(r);
892+
r.extensions_mut().insert(labels);
893+
}
894+
);
850895
(
851896
get_search_attributes,
852897
GetSearchAttributesRequest,

core/src/telemetry/metrics.rs

+8
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use temporal_sdk_core_api::telemetry::metrics::{
1313
NoOpCoreMeter,
1414
};
1515
use temporal_sdk_core_protos::temporal::api::enums::v1::WorkflowTaskFailedCause;
16+
use temporal_sdk_core_protos::temporal::api::failure::v1::Failure;
1617

1718
/// Used to track context associated with metrics, and record/update them
1819
///
@@ -592,6 +593,13 @@ pub(super) const TASK_SLOTS_AVAILABLE_NAME: &str = "worker_task_slots_available"
592593
pub(super) const TASK_SLOTS_USED_NAME: &str = "worker_task_slots_used";
593594
pub(super) const STICKY_CACHE_SIZE_NAME: &str = "sticky_cache_size";
594595

596+
/// Track a failure metric if the failure is not a benign application failure.
597+
pub(crate) fn should_record_failure_metric(failure: &Option<Failure>) -> bool {
598+
!failure
599+
.as_ref()
600+
.is_some_and(|f| f.is_benign_application_failure())
601+
}
602+
595603
/// Helps define buckets once in terms of millis, but also generates a seconds version
596604
macro_rules! define_latency_buckets {
597605
($(($metric_name:pat, $name:ident, $sec_name:ident, [$($bucket:expr),*])),*) => {

core/src/worker/activities.rs

+6-2
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,9 @@ use crate::{
1313
UsedMeteredSemPermit,
1414
},
1515
pollers::{BoxedActPoller, PermittedTqResp, TrackedPermittedTqResp, new_activity_task_poller},
16-
telemetry::metrics::{MetricsContext, activity_type, eager, workflow_type},
16+
telemetry::metrics::{
17+
MetricsContext, activity_type, eager, should_record_failure_metric, workflow_type,
18+
},
1719
worker::{
1820
activities::activity_heartbeat_manager::ActivityHeartbeatError, client::WorkerClient,
1921
},
@@ -349,7 +351,9 @@ impl WorkerActivityTasks {
349351
.err()
350352
}
351353
aer::Status::Failed(ar::Failure { failure }) => {
352-
act_metrics.act_execution_failed();
354+
if should_record_failure_metric(&failure) {
355+
act_metrics.act_execution_failed();
356+
}
353357
client
354358
.fail_activity_task(task_token.clone(), failure)
355359
.await

core/src/worker/activities/local_activities.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::{
33
abstractions::{MeteredPermitDealer, OwnedMeteredSemPermit, UsedMeteredSemPermit, dbg_panic},
44
protosext::ValidScheduleLA,
55
retry_logic::RetryPolicyExt,
6-
telemetry::metrics::{activity_type, workflow_type},
6+
telemetry::metrics::{activity_type, should_record_failure_metric, workflow_type},
77
worker::workflow::HeartbeatTimeoutMsg,
88
};
99
use futures_util::{
@@ -583,7 +583,9 @@ impl LocalActivityManager {
583583
la_metrics.la_exec_latency(runtime);
584584
let outcome = match &status {
585585
LocalActivityExecutionResult::Failed(fail) => {
586-
la_metrics.la_execution_failed();
586+
if should_record_failure_metric(&fail.failure) {
587+
la_metrics.la_execution_failed()
588+
}
587589
Outcome::FailurePath {
588590
backoff: calc_backoff!(fail),
589591
}

core/src/worker/workflow/managed_run.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -1125,8 +1125,10 @@ impl ManagedRun {
11251125
Some(CmdAttribs::CompleteWorkflowExecutionCommandAttributes(_)) => {
11261126
self.metrics.wf_completed();
11271127
}
1128-
Some(CmdAttribs::FailWorkflowExecutionCommandAttributes(_)) => {
1129-
self.metrics.wf_failed();
1128+
Some(CmdAttribs::FailWorkflowExecutionCommandAttributes(attrs)) => {
1129+
if metrics::should_record_failure_metric(&attrs.failure) {
1130+
self.metrics.wf_failed();
1131+
}
11301132
}
11311133
Some(CmdAttribs::ContinueAsNewWorkflowExecutionCommandAttributes(_)) => {
11321134
self.metrics.wf_continued_as_new();

0 commit comments

Comments
 (0)