Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StartWorkflow with VersioningOverride #6891

Merged
merged 11 commits into from
Nov 27, 2024
4,784 changes: 2,401 additions & 2,383 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ func CreateHistoryStartWorkflowRequest(
ContinuedFailure: startRequest.ContinuedFailure,
LastCompletionResult: startRequest.LastCompletionResult,
RootExecutionInfo: rootExecutionInfo,
VersioningOverride: startRequest.GetVersioningOverride(),
}
startRequest.ContinuedFailure = nil
startRequest.LastCompletionResult = nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,9 @@ message StartWorkflowExecutionRequest {
temporal.server.api.workflow.v1.RootExecutionInfo root_execution_info = 11;
// inherited build ID from parent/previous execution
string inherited_build_id = 12;
// If set, takes precedence over the Versioning Behavior sent by the SDK on Workflow Task completion.
// To unset the override after the workflow is running, use UpdateWorkflowExecutionOptions.
temporal.api.workflow.v1.VersioningOverride versioning_override = 13;
}

message StartWorkflowExecutionResponse {
Expand Down
4 changes: 2 additions & 2 deletions service/frontend/workflow_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ func (wh *WorkflowHandler) DeprecateNamespace(ctx context.Context, request *work
return resp, err
}

// StartWorkflowExecution starts a new long running workflow instance. It will create the instance with
// StartWorkflowExecution starts a new workflow instance (a "workflow execution"). It will create the instance with
// 'WorkflowExecutionStarted' event in history and also schedule the first WorkflowTask for the worker to make the
// first workflow task for this instance. It will return 'WorkflowExecutionAlreadyStartedError', if an instance already
// exists with same workflowId.
Expand Down Expand Up @@ -5331,7 +5331,7 @@ func (wh *WorkflowHandler) UpdateWorkflowExecutionOptions(
}
if opts.GetVersioningOverride().GetBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED &&
opts.GetVersioningOverride().GetDeployment() == nil {
return nil, serviceerror.NewInvalidArgument("Deployment must be set if behavior override is PINNED")
return nil, serviceerror.NewInvalidArgument("Deployment override must be set if behavior override is PINNED")
}

namespaceId, err := wh.namespaceRegistry.GetNamespaceID(namespace.Name(request.GetNamespace()))
Expand Down
10 changes: 10 additions & 0 deletions service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ package api
import (
"context"

enumspb "go.temporal.io/api/enums/v1"
workflowpb "go.temporal.io/api/workflow/v1"

commonpb "go.temporal.io/api/common/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
Expand Down Expand Up @@ -75,6 +78,9 @@ func NewWorkflowWithSignal(
workflowID,
runID,
)
newMutableState.GetExecutionInfo().VersioningInfo = &workflowpb.WorkflowExecutionVersioningInfo{
VersioningOverride: startRequest.GetVersioningOverride(),
}
carlydf marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -323,6 +329,10 @@ func ValidateStartWorkflowExecutionRequest(
if len(request.WorkflowType.GetName()) > maxIDLengthLimit {
return serviceerror.NewInvalidArgument("WorkflowType exceeds length limit.")
}
if request.GetVersioningOverride().GetBehavior() == enumspb.VERSIONING_BEHAVIOR_PINNED &&
request.GetVersioningOverride().GetDeployment() == nil {
return serviceerror.NewInvalidArgument("Deployment override must be set if behavior override is PINNED")
}
if err := retrypolicy.Validate(request.RetryPolicy); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions service/history/api/signalwithstartworkflow/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func ConvertToStartRequest(
WorkflowStartDelay: request.GetWorkflowStartDelay(),
UserMetadata: request.UserMetadata,
Links: request.GetLinks(),
VersioningOverride: request.GetVersioningOverride(),
}

return common.CreateHistoryStartWorkflowRequest(namespaceID.String(), req, nil, nil, now)
Expand Down
1 change: 1 addition & 0 deletions service/history/api/updateworkflowoptions/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package updateworkflowoptions
import (
"context"
"fmt"

"google.golang.org/protobuf/types/known/fieldmaskpb"

"go.temporal.io/api/serviceerror"
Expand Down
143 changes: 141 additions & 2 deletions tests/deployment_test.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to go through these tests after merge.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hopefully they will be cleaner by then

Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ package tests
import (
"context"
"fmt"
"testing"
"time"

workflowpb "go.temporal.io/api/workflow/v1"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/fieldmaskpb"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -907,3 +908,141 @@ func (d *DeploymentSuite) TestUpdateWorkflowExecutionOptions_SetPinnedSetUnpinne
versioningInfo = descResp.GetWorkflowExecutionInfo().GetVersioningInfo()
d.True(proto.Equal(unpinnedOpts.GetVersioningOverride(), versioningInfo.GetVersioningOverride()))
}

func (d *DeploymentSuite) TestStartWorkflowExecution_WithPinnedOverride() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
override := &workflowpb.VersioningOverride{
Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED,
Deployment: &deploymentpb.Deployment{
SeriesName: "seriesName",
BuildId: "A",
},
}

resp, err := d.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{
Namespace: d.Namespace(),
WorkflowId: "test-workflow-id",
WorkflowType: &commonpb.WorkflowType{Name: "test-wf-type"},
TaskQueue: &taskqueuepb.TaskQueue{Name: "test-tq", Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Identity: "test-id",
RequestId: "test-request-id",
VersioningOverride: override,
})

d.NoError(err)
d.True(resp.GetStarted())

descResp, err := d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: d.Namespace(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: "test-workflow-id",
RunId: resp.GetRunId(),
},
})
d.NoError(err)
d.True(proto.Equal(override, descResp.GetWorkflowExecutionInfo().GetVersioningInfo().GetVersioningOverride()))
}

func (d *DeploymentSuite) TestStartWorkflowExecution_WithUnpinnedOverride() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
override := &workflowpb.VersioningOverride{
Behavior: enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE,
Deployment: nil,
}

resp, err := d.FrontendClient().StartWorkflowExecution(ctx, &workflowservice.StartWorkflowExecutionRequest{
Namespace: d.Namespace(),
WorkflowId: "test-workflow-id",
WorkflowType: &commonpb.WorkflowType{Name: "test-wf-type"},
TaskQueue: &taskqueuepb.TaskQueue{Name: "test-tq", Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Identity: "test-id",
RequestId: "test-request-id",
VersioningOverride: override,
})

d.NoError(err)
d.True(resp.GetStarted())

descResp, err := d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: d.Namespace(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: "test-workflow-id",
RunId: resp.GetRunId(),
},
})
d.NoError(err)
d.True(proto.Equal(override, descResp.GetWorkflowExecutionInfo().GetVersioningInfo().GetVersioningOverride()))
}

func (d *DeploymentSuite) TestSignalWithStartWorkflowExecution_WithPinnedOverride() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
override := &workflowpb.VersioningOverride{
Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED,
Deployment: &deploymentpb.Deployment{
SeriesName: "seriesName",
BuildId: "A",
},
}

resp, err := d.FrontendClient().SignalWithStartWorkflowExecution(ctx, &workflowservice.SignalWithStartWorkflowExecutionRequest{
Namespace: d.Namespace(),
WorkflowId: "test-workflow-id",
WorkflowType: &commonpb.WorkflowType{Name: "test-wf-type"},
TaskQueue: &taskqueuepb.TaskQueue{Name: "test-tq", Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Identity: "test-id",
RequestId: "test-request-id",
SignalName: "test-signal",
SignalInput: nil,
VersioningOverride: override,
})

d.NoError(err)
d.True(resp.GetStarted())

descResp, err := d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: d.Namespace(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: "test-workflow-id",
RunId: resp.GetRunId(),
},
})
d.NoError(err)
d.True(proto.Equal(override, descResp.GetWorkflowExecutionInfo().GetVersioningInfo().GetVersioningOverride()))
}

func (d *DeploymentSuite) TestSignalWithStartWorkflowExecution_WithUnpinnedOverride() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
override := &workflowpb.VersioningOverride{
Behavior: enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE,
Deployment: nil,
}

resp, err := d.FrontendClient().SignalWithStartWorkflowExecution(ctx, &workflowservice.SignalWithStartWorkflowExecutionRequest{
Namespace: d.Namespace(),
WorkflowId: "test-workflow-id",
WorkflowType: &commonpb.WorkflowType{Name: "test-wf-type"},
TaskQueue: &taskqueuepb.TaskQueue{Name: "test-tq", Kind: enumspb.TASK_QUEUE_KIND_NORMAL},
Identity: "test-id",
RequestId: "test-request-id",
SignalName: "test-signal",
SignalInput: nil,
VersioningOverride: override,
})

d.NoError(err)
d.True(resp.GetStarted())

descResp, err := d.FrontendClient().DescribeWorkflowExecution(ctx, &workflowservice.DescribeWorkflowExecutionRequest{
Namespace: d.Namespace(),
Execution: &commonpb.WorkflowExecution{
WorkflowId: "test-workflow-id",
RunId: resp.GetRunId(),
},
})
d.NoError(err)
d.True(proto.Equal(override, descResp.GetWorkflowExecutionInfo().GetVersioningInfo().GetVersioningOverride()))
}
Loading