Skip to content

Commit

Permalink
Delete Namespace: move Nexus endpoint validation to workflow (#7022)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Delete Namespace: move Nexus endpoint validation to workflow.

## Why?
<!-- Tell your future self why have you made these changes -->
Consolidate all delete namespace validation logic in one place.

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
Modified existing and new unit tests.

## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
This is a breaking change for `DeleteNamespace` workflow. But this
workflow is very short lived and chances of being upgraded while running
are very low.

## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
No.

## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
No.
  • Loading branch information
alexshtin authored Dec 20, 2024
1 parent 5ff0650 commit 26cdb1b
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 142 deletions.
43 changes: 5 additions & 38 deletions service/frontend/operator_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ import (
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
)

var _ OperatorHandler = (*OperatorHandlerImpl)(nil)
Expand Down Expand Up @@ -613,43 +614,9 @@ func (h *OperatorHandlerImpl) DeleteNamespace(
return nil, errRequestNotSet
}

if !h.config.AllowDeleteNamespaceIfNexusEndpointTarget() {
// Get the namespace name in case the request is to delete by ID so we can verify that this namespace is not
// associated with a Nexus endpoint.
requestNSName := request.GetNamespace()
if request.GetNamespaceId() != "" {
nsName, err := h.namespaceRegistry.GetNamespaceName(namespace.ID(request.GetNamespaceId()))
if err != nil {
return nil, err
}
requestNSName = nsName.String()
}

// Prevent deletion of a namespace that is targeted by a Nexus endpoint.
var nextPageToken []byte
for {
resp, err := h.nexusEndpointClient.List(ctx, &operatorservice.ListNexusEndpointsRequest{
// Don't specify PageSize and fallback to default.
NextPageToken: nextPageToken,
})
if err != nil {
return nil, err
}
for _, entry := range resp.GetEndpoints() {
if nsName := entry.GetSpec().GetTarget().GetWorker().GetNamespace(); nsName == requestNSName {
return nil, serviceerror.NewFailedPrecondition(fmt.Sprintf("cannot delete a namespace that is a target of a Nexus endpoint (%s)", entry.GetSpec().GetName()))
}
}
nextPageToken = resp.NextPageToken
if len(nextPageToken) == 0 {
break
}
}
}

namespaceDeleteDelay := h.config.DeleteNamespaceNamespaceDeleteDelay()
if request.NamespaceDeleteDelay != nil {
namespaceDeleteDelay = request.NamespaceDeleteDelay.AsDuration()
// If NamespaceDeleteDelay is not provided, the default delay configured in the cluster should be used.
if request.NamespaceDeleteDelay == nil {
request.NamespaceDeleteDelay = durationpb.New(h.config.DeleteNamespaceNamespaceDeleteDelay())
}

// Execute workflow.
Expand All @@ -662,7 +629,7 @@ func (h *OperatorHandlerImpl) DeleteNamespace(
PagesPerExecution: h.config.DeleteNamespacePagesPerExecution(),
ConcurrentDeleteExecutionsActivities: h.config.DeleteNamespaceConcurrentDeleteExecutionsActivities(),
},
NamespaceDeleteDelay: namespaceDeleteDelay,
NamespaceDeleteDelay: request.NamespaceDeleteDelay.AsDuration(),
}

sdkClient := h.sdkClientFactory.GetSystemClient()
Expand Down
69 changes: 5 additions & 64 deletions service/frontend/operator_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/cluster"
"go.temporal.io/server/common/dynamicconfig"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/persistence/sql/sqlplugin/mysql"
"go.temporal.io/server/common/persistence/visibility"
Expand Down Expand Up @@ -1129,52 +1128,10 @@ func (s *operatorHandlerSuite) Test_DeleteNamespace() {
handler := s.handler
ctx := context.Background()

type test struct {
Name string
Request *operatorservice.DeleteNamespaceRequest
Expected error
}
// request validation tests
testCases := []test{
{
Name: "nil request",
Request: nil,
Expected: &serviceerror.InvalidArgument{Message: "Request is nil."},
},
}
for _, testCase := range testCases {
s.T().Run(testCase.Name, func(t *testing.T) {
resp, err := handler.DeleteNamespace(ctx, testCase.Request)
s.Equal(testCase.Expected, err)
s.Nil(resp)
})
}

// The "fake" namespace ID is associated with a Nexus endoint.
s.nexusEndpointPersistenceManager.EXPECT().ListNexusEndpoints(gomock.Any(), gomock.Any()).Return(&persistence.ListNexusEndpointsResponse{
Entries: []*persistencespb.NexusEndpointEntry{
{
Endpoint: &persistencespb.NexusEndpoint{
Spec: &persistencespb.NexusEndpointSpec{
Name: "test-endpoint",
Target: &persistencespb.NexusEndpointTarget{
Variant: &persistencespb.NexusEndpointTarget_Worker_{
Worker: &persistencespb.NexusEndpointTarget_Worker{
NamespaceId: "fake",
},
},
},
},
},
},
},
}, nil).AnyTimes()
// Map "fake" namespace ID to the "namespace-with-nexus-endpoint" name.
s.mockResource.NamespaceCache.EXPECT().GetNamespaceName(namespace.ID("fake")).
Return(namespace.Name("test-namespace"), nil).AnyTimes()
// Map "c13c01a7-3887-4eda-ba4b-9a07a6359e7e" namespace ID to the "test-namespace-deleted-ka2te" name.
s.mockResource.NamespaceCache.EXPECT().GetNamespaceName(namespace.ID("c13c01a7-3887-4eda-ba4b-9a07a6359e7e")).
Return(namespace.Name("test-namespace-deleted-ka2te"), nil).AnyTimes()
// Nil request.
resp, err := handler.DeleteNamespace(ctx, nil)
s.Equal(&serviceerror.InvalidArgument{Message: "Request is nil."}, err)
s.Nil(resp)

mockSdkClient := mocksdk.NewMockClient(s.controller)
s.mockResource.SDKClientFactory.EXPECT().GetSystemClient().Return(mockSdkClient).AnyTimes()
Expand All @@ -1185,27 +1142,11 @@ func (s *operatorHandlerSuite) Test_DeleteNamespace() {
DeleteNamespacePagesPerExecution: dynamicconfig.GetIntPropertyFn(78),
DeleteNamespaceConcurrentDeleteExecutionsActivities: dynamicconfig.GetIntPropertyFn(3),
DeleteNamespaceNamespaceDeleteDelay: dynamicconfig.GetDurationPropertyFn(22 * time.Hour),
AllowDeleteNamespaceIfNexusEndpointTarget: dynamicconfig.GetBoolPropertyFn(false),
}

// Delete by name: Nexus endpoint associated.
_, err := handler.DeleteNamespace(ctx, &operatorservice.DeleteNamespaceRequest{
Namespace: "test-namespace",
})
s.ErrorContains(err, "cannot delete a namespace that is a target of a Nexus endpoint (test-endpoint)")

// Delete by ID: Nexus endpoint associated.
_, err = handler.DeleteNamespace(ctx, &operatorservice.DeleteNamespaceRequest{
NamespaceId: "fake",
})
s.ErrorContains(err, "cannot delete a namespace that is a target of a Nexus endpoint (test-endpoint)")

// Allow delete from now on.
handler.config.AllowDeleteNamespaceIfNexusEndpointTarget = dynamicconfig.GetBoolPropertyFn(true)

// Start workflow failed.
mockSdkClient.EXPECT().ExecuteWorkflow(gomock.Any(), gomock.Any(), "temporal-sys-delete-namespace-workflow", gomock.Any()).Return(nil, errors.New("start failed"))
resp, err := handler.DeleteNamespace(ctx, &operatorservice.DeleteNamespaceRequest{
resp, err = handler.DeleteNamespace(ctx, &operatorservice.DeleteNamespaceRequest{
Namespace: "test-namespace",
})
s.Error(err)
Expand Down
11 changes: 4 additions & 7 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,8 +203,6 @@ type Config struct {
// EnableNexusAPIs controls whether to allow invoking Nexus related APIs.
EnableNexusAPIs dynamicconfig.BoolPropertyFn

AllowDeleteNamespaceIfNexusEndpointTarget dynamicconfig.BoolPropertyFn

CallbackURLMaxLength dynamicconfig.IntPropertyFnWithNamespaceFilter
CallbackHeaderMaxSize dynamicconfig.IntPropertyFnWithNamespaceFilter
MaxCallbacksPerWorkflow dynamicconfig.IntPropertyFnWithNamespaceFilter
Expand Down Expand Up @@ -334,11 +332,10 @@ func NewConfig(
EnableWorkerVersioningWorkflow: dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs.Get(dc),
EnableWorkerVersioningRules: dynamicconfig.FrontendEnableWorkerVersioningRuleAPIs.Get(dc),

EnableNexusAPIs: dynamicconfig.EnableNexus.Get(dc),
AllowDeleteNamespaceIfNexusEndpointTarget: dynamicconfig.AllowDeleteNamespaceIfNexusEndpointTarget.Get(dc),
CallbackURLMaxLength: dynamicconfig.FrontendCallbackURLMaxLength.Get(dc),
CallbackHeaderMaxSize: dynamicconfig.FrontendCallbackHeaderMaxSize.Get(dc),
MaxCallbacksPerWorkflow: dynamicconfig.MaxCallbacksPerWorkflow.Get(dc),
EnableNexusAPIs: dynamicconfig.EnableNexus.Get(dc),
CallbackURLMaxLength: dynamicconfig.FrontendCallbackURLMaxLength.Get(dc),
CallbackHeaderMaxSize: dynamicconfig.FrontendCallbackHeaderMaxSize.Get(dc),
MaxCallbacksPerWorkflow: dynamicconfig.MaxCallbacksPerWorkflow.Get(dc),

NexusRequestHeadersBlacklist: dynamicconfig.NewGlobalCachedTypedValue(
dc,
Expand Down
53 changes: 46 additions & 7 deletions service/worker/deletenamespace/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,13 @@ import (

type (
localActivities struct {
metadataManager persistence.MetadataManager
protectedNamespaces dynamicconfig.TypedPropertyFn[[]string]
logger log.Logger
metadataManager persistence.MetadataManager
nexusEndpointManager persistence.NexusEndpointManager
logger log.Logger

protectedNamespaces dynamicconfig.TypedPropertyFn[[]string]
allowDeleteNamespaceIfNexusEndpointTarget dynamicconfig.BoolPropertyFn
nexusEndpointListDefaultPageSize dynamicconfig.IntPropertyFn
}

getNamespaceInfoResult struct {
Expand All @@ -58,13 +62,19 @@ type (

func newLocalActivities(
metadataManager persistence.MetadataManager,
protectedNamespaces dynamicconfig.TypedPropertyFn[[]string],
nexusEndpointManager persistence.NexusEndpointManager,
logger log.Logger,
protectedNamespaces dynamicconfig.TypedPropertyFn[[]string],
allowDeleteNamespaceIfNexusEndpointTarget dynamicconfig.BoolPropertyFn,
nexusEndpointListDefaultPageSize dynamicconfig.IntPropertyFn,
) *localActivities {
return &localActivities{
metadataManager: metadataManager,
protectedNamespaces: protectedNamespaces,
logger: logger,
metadataManager: metadataManager,
nexusEndpointManager: nexusEndpointManager,
logger: logger,
protectedNamespaces: protectedNamespaces,
allowDeleteNamespaceIfNexusEndpointTarget: allowDeleteNamespaceIfNexusEndpointTarget,
nexusEndpointListDefaultPageSize: nexusEndpointListDefaultPageSize,
}
}

Expand Down Expand Up @@ -99,6 +109,35 @@ func (a *localActivities) ValidateProtectedNamespacesActivity(_ context.Context,
return nil
}

func (a *localActivities) ValidateNexusEndpointsActivity(ctx context.Context, nsID namespace.ID, nsName namespace.Name) error {
if !a.allowDeleteNamespaceIfNexusEndpointTarget() {
// Prevent deletion of a namespace that is targeted by a Nexus endpoint.
var nextPageToken []byte
for {
resp, err := a.nexusEndpointManager.ListNexusEndpoints(ctx, &persistence.ListNexusEndpointsRequest{
LastKnownTableVersion: 0,
NextPageToken: nextPageToken,
PageSize: a.nexusEndpointListDefaultPageSize(),
})
if err != nil {
a.logger.Error("Unable to list Nexus endpoints from persistence.", tag.WorkflowNamespace(nsName.String()), tag.Error(err))
return fmt.Errorf("unable to list Nexus endpoints for namespace %s: %w", nsName, err)
}

for _, entry := range resp.Entries {
if endpointNsID := entry.GetEndpoint().GetSpec().GetTarget().GetWorker().GetNamespaceId(); endpointNsID == nsID.String() {
return temporal.NewNonRetryableApplicationError(fmt.Sprintf("cannot delete a namespace that is a target of a Nexus endpoint %s", entry.GetEndpoint().GetSpec().GetName()), errors.ValidationErrorErrType, nil, nil)
}
}
nextPageToken = resp.NextPageToken
if len(nextPageToken) == 0 {
break
}
}
}
return nil
}

func (a *localActivities) MarkNamespaceDeletedActivity(ctx context.Context, nsName namespace.Name) error {
ctx = headers.SetCallerName(ctx, nsName.String())

Expand Down
52 changes: 51 additions & 1 deletion service/worker/deletenamespace/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,13 @@ package deletenamespace

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/require"
"go.temporal.io/api/serviceerror"
"go.temporal.io/sdk/temporal"
persistencespb "go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/namespace"
"go.temporal.io/server/common/persistence"
Expand All @@ -42,7 +45,7 @@ func Test_GenerateDeletedNamespaceNameActivity(t *testing.T) {

a := &localActivities{
metadataManager: metadataManager,
logger: log.NewNoopLogger(),
logger: log.NewTestLogger(),
}

metadataManager.EXPECT().GetNamespace(gomock.Any(), &persistence.GetNamespaceRequest{
Expand All @@ -68,3 +71,50 @@ func Test_GenerateDeletedNamespaceNameActivity(t *testing.T) {

ctrl.Finish()
}
func Test_ValidateNexusEndpointsActivity(t *testing.T) {
ctrl := gomock.NewController(t)
nexusEndpointManager := persistence.NewMockNexusEndpointManager(ctrl)

a := &localActivities{
nexusEndpointManager: nexusEndpointManager,
logger: log.NewTestLogger(),

allowDeleteNamespaceIfNexusEndpointTarget: func() bool { return false },
nexusEndpointListDefaultPageSize: func() int { return 100 },
}

// The "fake" namespace ID is associated with a Nexus endoint.
nexusEndpointManager.EXPECT().ListNexusEndpoints(gomock.Any(), gomock.Any()).Return(&persistence.ListNexusEndpointsResponse{
Entries: []*persistencespb.NexusEndpointEntry{
{
Endpoint: &persistencespb.NexusEndpoint{
Spec: &persistencespb.NexusEndpointSpec{
Name: "test-endpoint",
Target: &persistencespb.NexusEndpointTarget{
Variant: &persistencespb.NexusEndpointTarget_Worker_{
Worker: &persistencespb.NexusEndpointTarget_Worker{
NamespaceId: "namespace-id",
},
},
},
},
},
},
},
}, nil).Times(2)

err := a.ValidateNexusEndpointsActivity(context.Background(), "namespace-id", "namespace")
require.Error(t, err)
var appErr *temporal.ApplicationError
require.ErrorAs(t, err, &appErr)

err = a.ValidateNexusEndpointsActivity(context.Background(), "namespace2-id", "namespace2")
require.NoError(t, err)

nexusEndpointManager.EXPECT().ListNexusEndpoints(gomock.Any(), gomock.Any()).Return(nil, errors.New("persistence failure"))
err = a.ValidateNexusEndpointsActivity(context.Background(), "namespace-id", "namespace")
require.Error(t, err)
require.Equal(t, err.Error(), "unable to list Nexus endpoints for namespace namespace: persistence failure")

ctrl.Finish()
}
Loading

0 comments on commit 26cdb1b

Please sign in to comment.