From 5d95a668d084ec429ba867836b46d08b0f962c02 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 27 Mar 2025 14:02:16 -0700 Subject: [PATCH 1/7] expose root execution info --- packages/client/src/helpers.ts | 6 ++++++ packages/client/src/types.ts | 1 + 2 files changed, 7 insertions(+) diff --git a/packages/client/src/helpers.ts b/packages/client/src/helpers.ts index a695f8f9c..dfd8706a8 100644 --- a/packages/client/src/helpers.ts +++ b/packages/client/src/helpers.ts @@ -78,6 +78,12 @@ export async function executionInfoFromRaw( runId: raw.parentExecution.runId!, } : undefined, + rootExecution: raw.rootExecution + ? { + workflowId: raw.rootExecution.workflowId!, + runId: raw.rootExecution.runId!, + } + : undefined, raw: rawDataToEmbed, priority: decodePriority(raw.priority), }; diff --git a/packages/client/src/types.ts b/packages/client/src/types.ts index eb0a63dfe..148781a19 100644 --- a/packages/client/src/types.ts +++ b/packages/client/src/types.ts @@ -51,6 +51,7 @@ export interface WorkflowExecutionInfo { searchAttributes: SearchAttributes; // eslint-disable-line deprecation/deprecation typedSearchAttributes: TypedSearchAttributes; parentExecution?: Required; + rootExecution?: Required; raw: RawWorkflowExecutionInfo; priority?: Priority; } From 58ba7d576d0fb4406a272dcf506e28fb0a7fd97f Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 27 Mar 2025 14:38:28 -0700 Subject: [PATCH 2/7] update core, add root execution info to workflow info --- packages/worker/src/utils.ts | 20 ++++++++++++++++++-- packages/worker/src/worker.ts | 4 +++- packages/workflow/src/index.ts | 1 + packages/workflow/src/interfaces.ts | 10 ++++++++++ 4 files changed, 32 insertions(+), 3 deletions(-) diff --git a/packages/worker/src/utils.ts b/packages/worker/src/utils.ts index 62b8198e0..84e36a068 100644 --- a/packages/worker/src/utils.ts +++ b/packages/worker/src/utils.ts @@ -1,5 +1,5 @@ -import type { coresdk } from '@temporalio/proto'; -import { IllegalStateError, ParentWorkflowInfo } from '@temporalio/workflow'; +import type { coresdk, temporal } from '@temporalio/proto'; +import { IllegalStateError, ParentWorkflowInfo, RootWorkflowInfo } from '@temporalio/workflow'; export const MiB = 1024 ** 2; @@ -28,3 +28,19 @@ export function convertToParentWorkflowType( namespace: parent.namespace, }; } + +export function convertToRootWorkflowType( + root: temporal.api.common.v1.IWorkflowExecution | null | undefined +): RootWorkflowInfo | undefined { + if (root == null) { + return undefined; + } + if (!root.workflowId || !root.runId) { + throw new IllegalStateError('Root workflow execution is missing a field that should be defined'); + } + + return { + workflowId: root.workflowId, + runId: root.runId, + }; +} diff --git a/packages/worker/src/worker.ts b/packages/worker/src/worker.ts index a47a22a36..0c2af33cf 100644 --- a/packages/worker/src/worker.ts +++ b/packages/worker/src/worker.ts @@ -71,7 +71,7 @@ import { } from './replay'; import { History, Runtime } from './runtime'; import { CloseableGroupedObservable, closeableGroupBy, mapWithState, mergeMapWithState } from './rxutils'; -import { byteArrayToBuffer, convertToParentWorkflowType } from './utils'; +import { byteArrayToBuffer, convertToParentWorkflowType, convertToRootWorkflowType } from './utils'; import { CompiledWorkerOptions, CompiledWorkerOptionsWithBuildId, @@ -1259,6 +1259,7 @@ export class Worker { randomnessSeed, workflowType, parentWorkflowInfo, + rootWorkflow, workflowExecutionTimeout, workflowRunTimeout, workflowTaskTimeout, @@ -1281,6 +1282,7 @@ export class Worker { searchAttributes: {}, typedSearchAttributes: new TypedSearchAttributes(), parent: convertToParentWorkflowType(parentWorkflowInfo), + root: convertToRootWorkflowType(rootWorkflow), taskQueue: this.options.taskQueue, namespace: this.options.namespace, firstExecutionRunId, diff --git a/packages/workflow/src/index.ts b/packages/workflow/src/index.ts index 8889e32c0..79882aa5a 100644 --- a/packages/workflow/src/index.ts +++ b/packages/workflow/src/index.ts @@ -96,6 +96,7 @@ export { StackTraceFileSlice, ParentClosePolicy, ParentWorkflowInfo, + RootWorkflowInfo, StackTraceSDKInfo, StackTrace, UnsafeWorkflowInfo, diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 7f8d6db2b..859177f72 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -62,6 +62,11 @@ export interface WorkflowInfo { */ readonly parent?: ParentWorkflowInfo; + /** + * Root Workflow info + */ + readonly root?: RootWorkflowInfo; + /** * Result from the previous Run (present if this is a Cron Workflow or was Continued As New). * @@ -228,6 +233,11 @@ export interface ParentWorkflowInfo { namespace: string; } +export interface RootWorkflowInfo { + workflowId: string; + runId: string; +} + /** * Not an actual error, used by the Workflow runtime to abort execution when {@link continueAsNew} is called */ From e18638dec7cb2018d0c981cd31e048c241869475 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 28 Mar 2025 14:20:12 -0700 Subject: [PATCH 3/7] added test, fixed broken test --- .../test/src/test-integration-workflows.ts | 54 ++++++++++++++++++- packages/test/src/test-sinks.ts | 1 + 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 9d55c87ef..4fa7f70e7 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -24,7 +24,7 @@ import { activityStartedSignal } from './workflows/definitions'; import * as workflows from './workflows'; import { Context, createLocalTestEnvironment, helpers, makeTestFunction } from './helpers-integration'; import { overrideSdkInternalFlag } from './mock-internal-flags'; -import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS } from './helpers'; +import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers'; const test = makeTestFunction({ workflowsPath: __filename, @@ -1337,3 +1337,55 @@ test('can register search attributes to dev server', async (t) => { t.deepEqual(desc.searchAttributes, { 'new-search-attr': [12] }); // eslint-disable-line deprecation/deprecation await env.teardown(); }); + +export async function ChildWorkflowInfo(): Promise { + let blocked = true; + workflow.setHandler(unblockSignal, () => { + blocked = false; + }); + await workflow.condition(() => !blocked); + return workflow.workflowInfo().root; +} + +export async function WithChildWorkflow(childWfId: string) { + return await workflow.executeChild(ChildWorkflowInfo, { + workflowId: childWfId, + }); +} + +test('root execution is exposed', async (t) => { + const { createWorker, startWorkflow } = helpers(t); + const worker = await createWorker(); + + await worker.runUntil(async () => { + const childWfId = 'child-wf-id'; + const handle = await startWorkflow(WithChildWorkflow, { + args: [childWfId], + }); + + const childHandle = t.context.env.client.workflow.getHandle(childWfId); + const childStarted = async (): Promise => { + try { + await childHandle.describe(); + return true; + } catch (e) { + if (e instanceof workflow.WorkflowNotFoundError) { + return false; + } else { + throw e; + } + } + }; + await waitUntil(childStarted, 3000); + const childDesc = await childHandle.describe(); + const parentDesc = await handle.describe(); + + t.true(childDesc.rootExecution?.workflowId == parentDesc.workflowId); + t.true(childDesc.rootExecution?.runId == parentDesc.runId); + + await childHandle.signal(unblockSignal); + const childWfInfoRoot = await handle.result(); + t.true(childWfInfoRoot?.workflowId == parentDesc.workflowId); + t.true(childWfInfoRoot?.runId == parentDesc.runId); + }); +}); diff --git a/packages/test/src/test-sinks.ts b/packages/test/src/test-sinks.ts index f3991fccb..d612664d4 100644 --- a/packages/test/src/test-sinks.ts +++ b/packages/test/src/test-sinks.ts @@ -117,6 +117,7 @@ if (RUN_INTEGRATION_TESTS) { lastResult: undefined, memo: {}, parent: undefined, + root: undefined, searchAttributes: {}, // FIXME: consider rehydrating the class before passing to sink functions or // create a variant of WorkflowInfo that corresponds to what we actually get in sinks. From 6bd0098a2280b09ee72d0193870055fee4aee262 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 28 Mar 2025 15:07:47 -0700 Subject: [PATCH 4/7] bump timeouts --- packages/test/src/test-integration-workflows.ts | 2 +- packages/test/src/test-schedules.ts | 2 +- packages/test/src/test-typed-search-attributes.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index 4fa7f70e7..e62082310 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -1376,7 +1376,7 @@ test('root execution is exposed', async (t) => { } } }; - await waitUntil(childStarted, 3000); + await waitUntil(childStarted, 5000); const childDesc = await childHandle.describe(); const parentDesc = await handle.describe(); diff --git a/packages/test/src/test-schedules.ts b/packages/test/src/test-schedules.ts index 5f1c0d96a..4b0e5588c 100644 --- a/packages/test/src/test-schedules.ts +++ b/packages/test/src/test-schedules.ts @@ -773,7 +773,7 @@ if (RUN_INTEGRATION_TESTS) { const exists = desc.typedSearchAttributes.getAll().find((pair) => pair.key.name === attributeName) !== undefined; return exists === shouldExist; - }, 300); + }, 5000); return await handle.describe(); }; diff --git a/packages/test/src/test-typed-search-attributes.ts b/packages/test/src/test-typed-search-attributes.ts index 6b8ce5318..9d4324aaf 100644 --- a/packages/test/src/test-typed-search-attributes.ts +++ b/packages/test/src/test-typed-search-attributes.ts @@ -135,7 +135,7 @@ if (test?.serial?.before) { Object.keys(untypedKeys).every((key) => key in resp.customAttributes) && Object.keys(typedKeys).every((key) => key in resp.customAttributes) ); - }, 300); + }, 5000); }); } From 31d02191381b59ef755f58827badba8509fbac85 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Fri, 28 Mar 2025 15:10:31 -0700 Subject: [PATCH 5/7] test/lint fixes --- packages/test/src/test-integration-workflows.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index e62082310..c7a16eea1 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -1347,7 +1347,7 @@ export async function ChildWorkflowInfo(): Promise { return await workflow.executeChild(ChildWorkflowInfo, { workflowId: childWfId, }); @@ -1380,12 +1380,12 @@ test('root execution is exposed', async (t) => { const childDesc = await childHandle.describe(); const parentDesc = await handle.describe(); - t.true(childDesc.rootExecution?.workflowId == parentDesc.workflowId); - t.true(childDesc.rootExecution?.runId == parentDesc.runId); + t.true(childDesc.rootExecution?.workflowId === parentDesc.workflowId); + t.true(childDesc.rootExecution?.runId === parentDesc.runId); await childHandle.signal(unblockSignal); const childWfInfoRoot = await handle.result(); - t.true(childWfInfoRoot?.workflowId == parentDesc.workflowId); - t.true(childWfInfoRoot?.runId == parentDesc.runId); + t.true(childWfInfoRoot?.workflowId === parentDesc.workflowId); + t.true(childWfInfoRoot?.runId === parentDesc.runId); }); }); From c43beb46e4b535fc1644e6de6379f07be93e7584 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 24 Apr 2025 10:55:23 -0700 Subject: [PATCH 6/7] add antonio test --- .../test/src/test-integration-workflows.ts | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/packages/test/src/test-integration-workflows.ts b/packages/test/src/test-integration-workflows.ts index c7a16eea1..72f0d85c4 100644 --- a/packages/test/src/test-integration-workflows.ts +++ b/packages/test/src/test-integration-workflows.ts @@ -1389,3 +1389,26 @@ test('root execution is exposed', async (t) => { t.true(childWfInfoRoot?.runId === parentDesc.runId); }); }); + +export async function rootWorkflow(): Promise { + let result = ''; + if (!workflow.workflowInfo().root) { + result += 'empty'; + } else { + result += workflow.workflowInfo().root!.workflowId; + } + if (!workflow.workflowInfo().parent) { + result += ' '; + result += await workflow.executeChild(rootWorkflow); + } + return result; +} + +test('Workflow can return root workflow', async (t) => { + const { createWorker, executeWorkflow } = helpers(t); + const worker = await createWorker(); + await worker.runUntil(async () => { + const result = await executeWorkflow(rootWorkflow, { workflowId: 'test-root-workflow-length' }); + t.deepEqual(result, 'empty test-root-workflow-length'); + }); +}); From 28e347e39534d105cc1db4e8c4ab225ef34957a5 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 24 Apr 2025 14:29:12 -0700 Subject: [PATCH 7/7] improve doc description of root workflow info in workflow info --- packages/workflow/src/interfaces.ts | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/packages/workflow/src/interfaces.ts b/packages/workflow/src/interfaces.ts index 859177f72..87500de01 100644 --- a/packages/workflow/src/interfaces.ts +++ b/packages/workflow/src/interfaces.ts @@ -63,7 +63,18 @@ export interface WorkflowInfo { readonly parent?: ParentWorkflowInfo; /** - * Root Workflow info + * The root workflow execution, defined as follows: + * 1. A workflow without a parent workflow is its own root workflow. + * 2. A workflow with a parent workflow has the same root workflow as + * its parent. + * + * When there is no parent workflow, i.e., the workflow is its own root workflow, + * this field is `undefined`. + * + * Note that Continue-as-New (or reset) propagates the workflow parentage relationship, + * and therefore, whether the new workflow has the same root workflow as the original one + * depends on whether it had a parent. + * */ readonly root?: RootWorkflowInfo;