Skip to content

Expose root execution info #1662

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/client/src/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ export async function executionInfoFromRaw<T>(
runId: raw.parentExecution.runId!,
}
: undefined,
rootExecution: raw.rootExecution
? {
workflowId: raw.rootExecution.workflowId!,
runId: raw.rootExecution.runId!,
}
: undefined,
raw: rawDataToEmbed,
};
}
Expand Down
1 change: 1 addition & 0 deletions packages/client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export interface WorkflowExecutionInfo {
searchAttributes: SearchAttributes; // eslint-disable-line deprecation/deprecation
typedSearchAttributes: TypedSearchAttributes;
parentExecution?: Required<proto.temporal.api.common.v1.IWorkflowExecution>;
rootExecution?: Required<proto.temporal.api.common.v1.IWorkflowExecution>;
raw: RawWorkflowExecutionInfo;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/core-bridge/sdk-core
Submodule sdk-core updated 40 files
+5 −0 .cargo/config.toml
+54 −0 client/src/lib.rs
+84 −12 client/src/raw.rs
+12 −14 core/src/telemetry/otel.rs
+1 −0 core/src/worker/activities/local_activities.rs
+11 −0 core/src/worker/client.rs
+1 −0 core/src/worker/workflow/machines/nexus_operation_state_machine.rs
+2 −2 sdk-core-protos/protos/api_upstream/Makefile
+2,317 −971 sdk-core-protos/protos/api_upstream/openapi/openapiv2.json
+2,304 −927 sdk-core-protos/protos/api_upstream/openapi/openapiv3.yaml
+23 −0 sdk-core-protos/protos/api_upstream/temporal/api/batch/v1/message.proto
+6 −0 sdk-core-protos/protos/api_upstream/temporal/api/command/v1/message.proto
+34 −0 sdk-core-protos/protos/api_upstream/temporal/api/common/v1/message.proto
+163 −0 sdk-core-protos/protos/api_upstream/temporal/api/deployment/v1/message.proto
+47 −0 sdk-core-protos/protos/api_upstream/temporal/api/enums/v1/deployment.proto
+2 −0 sdk-core-protos/protos/api_upstream/temporal/api/enums/v1/failed_cause.proto
+42 −0 sdk-core-protos/protos/api_upstream/temporal/api/enums/v1/nexus.proto
+36 −17 sdk-core-protos/protos/api_upstream/temporal/api/enums/v1/workflow.proto
+7 −0 sdk-core-protos/protos/api_upstream/temporal/api/failure/v1/message.proto
+35 −1 sdk-core-protos/protos/api_upstream/temporal/api/history/v1/message.proto
+10 −0 sdk-core-protos/protos/api_upstream/temporal/api/nexus/v1/message.proto
+46 −2 sdk-core-protos/protos/api_upstream/temporal/api/taskqueue/v1/message.proto
+98 −12 sdk-core-protos/protos/api_upstream/temporal/api/workflow/v1/message.proto
+337 −84 sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/request_response.proto
+154 −42 sdk-core-protos/protos/api_upstream/temporal/api/workflowservice/v1/service.proto
+2 −0 sdk-core-protos/protos/local/temporal/sdk/core/activity_task/activity_task.proto
+13 −0 sdk-core-protos/protos/local/temporal/sdk/core/workflow_activation/workflow_activation.proto
+5 −1 sdk-core-protos/protos/local/temporal/sdk/core/workflow_commands/workflow_commands.proto
+6 −11 sdk-core-protos/src/lib.rs
+5 −0 sdk/src/activity_context.rs
+9 −11 sdk/src/lib.rs
+22 −6 sdk/src/workflow_context.rs
+8 −1 sdk/src/workflow_context/options.rs
+10 −23 sdk/src/workflow_future.rs
+22 −2 tests/integ_tests/metrics_tests.rs
+1 −0 tests/integ_tests/workflow_tests.rs
+13 −1 tests/integ_tests/workflow_tests/child_workflows.rs
+4 −0 tests/integ_tests/workflow_tests/nexus.rs
+112 −0 tests/integ_tests/workflow_tests/priority.rs
+11 −2 tests/runner.rs
54 changes: 53 additions & 1 deletion packages/test/src/test-integration-workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import { activityStartedSignal } from './workflows/definitions';
import * as workflows from './workflows';
import { Context, createLocalTestEnvironment, helpers, makeTestFunction } from './helpers-integration';
import { overrideSdkInternalFlag } from './mock-internal-flags';
import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS } from './helpers';
import { asSdkLoggerSink, loadHistory, RUN_TIME_SKIPPING_TESTS, waitUntil } from './helpers';

const test = makeTestFunction({
workflowsPath: __filename,
Expand Down Expand Up @@ -1337,3 +1337,55 @@ test('can register search attributes to dev server', async (t) => {
t.deepEqual(desc.searchAttributes, { 'new-search-attr': [12] }); // eslint-disable-line deprecation/deprecation
await env.teardown();
});

export async function ChildWorkflowInfo(): Promise<workflow.RootWorkflowInfo | undefined> {
let blocked = true;
workflow.setHandler(unblockSignal, () => {
blocked = false;
});
await workflow.condition(() => !blocked);
return workflow.workflowInfo().root;
}

export async function WithChildWorkflow(childWfId: string): Promise<workflow.RootWorkflowInfo | undefined> {
return await workflow.executeChild(ChildWorkflowInfo, {
workflowId: childWfId,
});
}

test('root execution is exposed', async (t) => {
const { createWorker, startWorkflow } = helpers(t);
const worker = await createWorker();

await worker.runUntil(async () => {
const childWfId = 'child-wf-id';
const handle = await startWorkflow(WithChildWorkflow, {
args: [childWfId],
});

const childHandle = t.context.env.client.workflow.getHandle(childWfId);
const childStarted = async (): Promise<boolean> => {
try {
await childHandle.describe();
return true;
} catch (e) {
if (e instanceof workflow.WorkflowNotFoundError) {
return false;
} else {
throw e;
}
}
};
await waitUntil(childStarted, 5000);
const childDesc = await childHandle.describe();
const parentDesc = await handle.describe();

t.true(childDesc.rootExecution?.workflowId === parentDesc.workflowId);
t.true(childDesc.rootExecution?.runId === parentDesc.runId);

await childHandle.signal(unblockSignal);
const childWfInfoRoot = await handle.result();
t.true(childWfInfoRoot?.workflowId === parentDesc.workflowId);
t.true(childWfInfoRoot?.runId === parentDesc.runId);
});
});
2 changes: 1 addition & 1 deletion packages/test/src/test-schedules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ if (RUN_INTEGRATION_TESTS) {
const exists =
desc.typedSearchAttributes.getAll().find((pair) => pair.key.name === attributeName) !== undefined;
return exists === shouldExist;
}, 300);
}, 5000);
return await handle.describe();
};

Expand Down
1 change: 1 addition & 0 deletions packages/test/src/test-sinks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ if (RUN_INTEGRATION_TESTS) {
lastResult: undefined,
memo: {},
parent: undefined,
root: undefined,
searchAttributes: {},
// FIXME: consider rehydrating the class before passing to sink functions or
// create a variant of WorkflowInfo that corresponds to what we actually get in sinks.
Expand Down
2 changes: 1 addition & 1 deletion packages/test/src/test-typed-search-attributes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ if (test?.serial?.before) {
Object.keys(untypedKeys).every((key) => key in resp.customAttributes) &&
Object.keys(typedKeys).every((key) => key in resp.customAttributes)
);
}, 300);
}, 5000);
});
}

Expand Down
20 changes: 18 additions & 2 deletions packages/worker/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { coresdk } from '@temporalio/proto';
import { IllegalStateError, ParentWorkflowInfo } from '@temporalio/workflow';
import type { coresdk, temporal } from '@temporalio/proto';
import { IllegalStateError, ParentWorkflowInfo, RootWorkflowInfo } from '@temporalio/workflow';

export const MiB = 1024 ** 2;

Expand Down Expand Up @@ -28,3 +28,19 @@ export function convertToParentWorkflowType(
namespace: parent.namespace,
};
}

export function convertToRootWorkflowType(
root: temporal.api.common.v1.IWorkflowExecution | null | undefined
): RootWorkflowInfo | undefined {
if (root == null) {
return undefined;
}
if (!root.workflowId || !root.runId) {
throw new IllegalStateError('Root workflow execution is missing a field that should be defined');
}

return {
workflowId: root.workflowId,
runId: root.runId,
};
}
4 changes: 3 additions & 1 deletion packages/worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ import {
} from './replay';
import { History, Runtime } from './runtime';
import { CloseableGroupedObservable, closeableGroupBy, mapWithState, mergeMapWithState } from './rxutils';
import { byteArrayToBuffer, convertToParentWorkflowType } from './utils';
import { byteArrayToBuffer, convertToParentWorkflowType, convertToRootWorkflowType } from './utils';
import {
CompiledWorkerOptions,
CompiledWorkerOptionsWithBuildId,
Expand Down Expand Up @@ -1258,6 +1258,7 @@ export class Worker {
randomnessSeed,
workflowType,
parentWorkflowInfo,
rootWorkflow,
workflowExecutionTimeout,
workflowRunTimeout,
workflowTaskTimeout,
Expand All @@ -1279,6 +1280,7 @@ export class Worker {
searchAttributes: {},
typedSearchAttributes: new TypedSearchAttributes(),
parent: convertToParentWorkflowType(parentWorkflowInfo),
root: convertToRootWorkflowType(rootWorkflow),
taskQueue: this.options.taskQueue,
namespace: this.options.namespace,
firstExecutionRunId,
Expand Down
1 change: 1 addition & 0 deletions packages/workflow/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ export {
StackTraceFileSlice,
ParentClosePolicy,
ParentWorkflowInfo,
RootWorkflowInfo,
StackTraceSDKInfo,
StackTrace,
UnsafeWorkflowInfo,
Expand Down
10 changes: 10 additions & 0 deletions packages/workflow/src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ export interface WorkflowInfo {
*/
readonly parent?: ParentWorkflowInfo;

/**
* Root Workflow info
*/
readonly root?: RootWorkflowInfo;

/**
* Result from the previous Run (present if this is a Cron Workflow or was Continued As New).
*
Expand Down Expand Up @@ -223,6 +228,11 @@ export interface ParentWorkflowInfo {
namespace: string;
}

export interface RootWorkflowInfo {
workflowId: string;
runId: string;
}

/**
* Not an actual error, used by the Workflow runtime to abort execution when {@link continueAsNew} is called
*/
Expand Down
Loading